diff --git a/src/phi/ldap/async_model.py b/src/phi/ldap/async_model.py new file mode 100644 index 0000000..3936a23 --- /dev/null +++ b/src/phi/ldap/async_model.py @@ -0,0 +1,141 @@ +import asyncio + +from phi.logging import get_logger + + +log = get_logger(__name__) + + +def get_class(obj): + if type(obj) is type: + return obj + return obj.__class__ + + +def recall(cls): + """ + Prints the name of the parent class. + """ + _cls = get_class(cls) + return _cls.__bases__[0] + + +def call_if_callable(cls, attr): + """ + Tell between methods and properties. Call if method. + """ + _attr = getattr(cls, attr) + if "__call__" in dir(_attr): + return _attr() + return _attr + + +def inheritance(obj, attr, root_cls=object): + """ + Concatenates the value obtained from invoking the method attr + on the class and on any parent class until root_cls. + """ + res = call_if_callable(obj, attr) + base = get_class(obj) + while base is not root_cls: + if base is not get_class(obj): + res += ",{}".format(call_if_callable(base, attr)) + base = recall(base) + return res.strip(",") + + +class Entry: + """ + LDAP Entry. Interface to LDAP. + """ + + kind = None + _name = None + + @classmethod + def name(cls): + if "_name" in dir(cls) and cls._name is not None: + return cls._name + return cls.__name__ + + @classmethod + def qualified_name(cls): + return "{}={}".format(cls.kind, cls.name()) + + def __init__(self, client): + self.client = client + self.base_dn = client.base_dn + + @property + def dn(self): + return "{},{}".format(inheritance(self, "qualified_name", Entry), self.base_dn) + + async def describe(self): + async with self.client.connect(is_async=True) as conn: + res = await conn.search(self.dn, 2) + return res + + +class Hackers(Entry): + """ + This class is where Users belong. + """ + + kind = "ou" + + +class Robots(Entry): + """ + This class is where Services belong. + """ + + kind = "ou" + _name = "Services" + + +class User(Hackers): + """ + This class models a user. User may have attributes + and belong to Groups. + """ + + kind = "uid" + + def __init__(self, name, *args, **kwargs): + super().__init__(*args, **kwargs) + self.name = name + + def qualified_name(self): + return "{}={}".format(self.kind, self.name) + + @property + def name(self): + return self._name + + @name.setter + def name(self, name): + self._name = name + + +class Service(Robots): + """ + This class models a user. User may have attributes + and belong to Groups. + """ + + kind = "uid" + + def __init__(self, name, *args, **kwargs): + super().__init__(*args, **kwargs) + self.name = name + + def qualified_name(self): + return "{}={}".format(self.kind, self.name) + + @property + def name(self): + return self._name + + @name.setter + def name(self, name): + self._name = name diff --git a/test/test_ldap_async_model.py b/test/test_ldap_async_model.py new file mode 100644 index 0000000..06c8e62 --- /dev/null +++ b/test/test_ldap_async_model.py @@ -0,0 +1,175 @@ +from argparse import Namespace +import asyncio + +from async_generator import asynccontextmanager +import pytest + +from phi.ldap.async_model import ( + get_class, + recall, + call_if_callable, + inheritance, + Entry, + Hackers, + Robots, + Service, + User, +) + + +BASE_DN = "dc=test,dc=abbiamoundominio,dc=org" + + +class MockClient: + def __init__(self, base_dn): + self._base_dn = base_dn + + @property + def base_dn(self): + return self._base_dn + + @asynccontextmanager + async def connect(self, *args, **kwargs): + conn = Namespace() + async def _search(*args, **kwargs): + await asyncio.sleep(0.01) + return "Fake description" + conn.search = _search + yield conn + + +@pytest.fixture +def client_fixture(): + return MockClient(BASE_DN) + + +@pytest.fixture +def lineage_fixture(): + class Grand: + @classmethod + def name(cls): + return cls.__name__ + + class Ma(Grand): + pass + + class Child(Ma): + pass + + grand = Grand() + ma = Ma() + child = Child() + + return Grand, Ma, Child, grand, ma, child + + +def test_get_class(): + c = MockClient(BASE_DN) + + assert get_class(c) is MockClient + + +def test_recall(lineage_fixture): + Grand, Ma, Child, grand, ma, child = lineage_fixture + + assert object is recall(grand) + assert recall(grand) is recall(Grand) + assert Grand is recall(ma) + assert recall(ma) is recall(Ma) + assert Ma is recall(child) + assert recall(child) is recall(Child) + + +def test_call_if_callable(): + class Dummy: + classattr = "classattr" + + @property + def prop(self): + return "prop" + + def func(self): + return "func" + + @classmethod + def clsmth(cls): + return "clsmth" + + d = Dummy() + + assert "classattr" == call_if_callable(d, "classattr") + assert "prop" == call_if_callable(d, "prop") + assert "func" == call_if_callable(d, "func") + assert "clsmth" == call_if_callable(d, "clsmth") + + +def test_inheritance(lineage_fixture): + Grand, Ma, Child, _, _, _ = lineage_fixture + + assert inheritance(Grand, "name") == "Grand" + assert inheritance(Ma, "name") == "Ma,Grand" + assert inheritance(Child, "name") == "Child,Ma,Grand" + assert inheritance(Child, "name", Grand) == "Child,Ma" + assert inheritance(Child, "name", Ma) == "Child" + + +def test_Entry(client_fixture): + e = Entry(client_fixture) + + assert e.base_dn == BASE_DN + assert e.client is not None + + +def test_Entry_name(client_fixture): + e = Entry(client_fixture) + + assert e.name() == "Entry" + + +def test_Entry_qualified_name(client_fixture): + e = Entry(client_fixture) + + assert e.qualified_name() == "None=Entry" + + +def test_Entry_dn(client_fixture): + e = Entry(client_fixture) + + assert e.dn == "None=Entry,{}".format(BASE_DN) + + +@pytest.mark.asyncio +async def test_Entry_describe(client_fixture): + e = Entry(client_fixture) + + assert "Fake description" == await e.describe() + + +def test_Hackers(client_fixture): + h = Hackers(client_fixture) + + assert h.kind == "ou" + assert h.dn == "ou={},{}".format(h.name(), BASE_DN) + + +def test_Robots(client_fixture): + r = Robots(client_fixture) + + assert r.kind == "ou" + assert r.dn == "ou={},{}".format(r.name(), BASE_DN) + + +def test_User(client_fixture): + c = User("conte_mascetti", client_fixture) + + assert c.kind == "uid" + assert c.name == "conte_mascetti" + assert c.dn == "uid=conte_mascetti,ou=Hackers,{}".format(BASE_DN) + + +def test_Service(client_fixture): + c = Service("phi", client_fixture) + + assert c.kind == "uid" + assert c.name == "phi" + assert c.dn == "uid=phi,ou=Services,{}".format(BASE_DN)