phi/src/phi/async_ldap/model.py

297 lines
7.2 KiB
Python

# -*- encoding: utf-8 -*-
import asyncio
import logging
import typing as T
from phi.logging import get_logger
# Module-level logger obtained from the project's logging helper.
log = get_logger(__name__)
# Handle on the standard library's "asyncio" logger. Its level is forced to
# DEBUG as a side effect of importing this module.
alog = logging.getLogger("asyncio")
alog.setLevel(logging.DEBUG)
def get_class(obj):
    """
    Return the class associated with obj.

    If obj is itself a class it is returned unchanged; otherwise the
    class of the instance is returned.
    """
    # isinstance() rather than an exact ``type(obj) is type`` comparison:
    # a class created by a custom metaclass is still a class and must be
    # returned as-is instead of being replaced by its metaclass.
    if isinstance(obj, type):
        return obj
    return obj.__class__
def recall(cls):
    """
    Return the first base class of the given class, or of the class of
    the given instance.
    """
    bases = get_class(cls).__bases__
    return bases[0]
def call_if_callable(cls, attr):
    """
    Fetch the attribute named attr from cls (a class or an instance).

    If the attribute is callable (e.g. a method) it is invoked without
    arguments and the result is returned; otherwise (e.g. a property
    value or plain attribute) the attribute itself is returned.
    """
    value = getattr(cls, attr)
    # callable() is the built-in check; scanning dir() for "__call__" is
    # slower and is fooled by an instance-level "__call__" attribute,
    # which does not actually make the object callable.
    if callable(value):
        return value()
    return value
def inheritance(obj, attr, root_cls=object):
    """
    Build a comma-separated string from the value of attr on obj
    followed by the value of attr on each ancestor class, walking up
    the first-base chain and stopping before root_cls.
    """
    collected = call_if_callable(obj, attr)
    own_cls = get_class(obj)
    if own_cls is not root_cls:
        # The object's own class is already covered by the value taken
        # from obj itself, so the walk starts at its first base.
        ancestor = recall(own_cls)
        while ancestor is not root_cls:
            collected += ",{}".format(call_if_callable(ancestor, attr))
            ancestor = recall(ancestor)
    return collected.strip(",")
async def iter_children(children):
    """
    Consume the asynchronous iterable children and return its items
    as a list, in iteration order.
    """
    collected = []
    async for child in children:
        collected.append(child)
    return collected
class Entry:
    """
    Base LDAP entry. Interface to LDAP.

    Subclasses set ``kind`` (the attribute type used in the DN
    component) and may set ``_name`` to override the component value,
    which otherwise defaults to the class name.
    """

    kind: T.Union[None, str] = None
    _name: T.Union[None, str] = None

    @classmethod
    def name(cls):
        """
        Return ``_name`` when it is set, otherwise the class name.
        """
        if "_name" not in dir(cls) or cls._name is None:
            return cls.__name__
        return cls._name

    @classmethod
    def qualified_name(cls):
        """
        Return the ``kind=name`` component for this class.
        """
        return f"{cls.kind}={cls.name()}"

    def __init__(self, client):
        self.client = client
        self.base_dn = client.base_dn

    def __repr__(self):
        label = call_if_callable(self, "name")
        return "<{} {}>".format(label, self.dn)

    def __aiter__(self):
        # The entry is its own asynchronous iterator; ``__anext__`` is
        # not defined on this class.
        return self

    async def get_children(self):
        """
        Asynchronously yield each result of searching under this
        entry's DN (search scope argument 2).
        """
        async with self.client.connect(is_async=True) as conn:
            results = await conn.search(self.dn, 2)
            for result in results:
                yield result

    @property
    def children(self):
        """
        Synchronous property to enumerate the children: runs the
        asynchronous search to completion with ``asyncio.run``.
        """
        pending = self.get_children()
        return asyncio.run(iter_children(pending))

    @property
    def dn(self):
        """
        Return the components collected by ``inheritance`` for
        ``qualified_name`` (up to ``Entry``), followed by a comma and
        the client's base DN.
        """
        relative = inheritance(self, "qualified_name", Entry)
        return "{},{}".format(relative, self.base_dn)

    async def describe(self):
        """
        Return the result of searching this entry's DN with search
        scope argument 0.
        """
        async with self.client.connect(is_async=True) as conn:
            found = await conn.search(self.dn, 0)
        return found
async def build_heritage(obj, child_class, attribute_id="uid"):
    """
    Asynchronously yield one child_class instance per child of obj.

    Children that lack the attribute_id key are skipped. For the
    others, the first value stored under attribute_id is used as the
    name, and the instance is built as ``child_class(name, obj.client)``.
    """
    async for record in obj.get_children():
        if attribute_id not in record:
            continue
        yield child_class(record[attribute_id][0], obj.client)
class Hackers(Entry):
    """
    This class is where Users belong.

    One shared instance is kept per class. Asynchronous iteration
    yields ``User`` objects built from this entry's children.
    """

    kind = "ou"
    _instance = None

    def __new__(cls, *args, **kwargs):
        # Create the shared instance on first use and reuse it afterwards.
        existing = cls._instance
        if existing is None:
            existing = cls._instance = object.__new__(cls)
        return existing

    def __init__(self, client, *args, **kwargs):
        super().__init__(client)
        self._hackers = build_heritage(self, User)

    async def __anext__(self):
        try:
            user = await self._hackers.__anext__()
        except StopAsyncIteration:
            # Re-arm the generator so the entry can be iterated again.
            self._hackers = build_heritage(self, User)
            raise
        return user
class Robots(Entry):
    """
    This class is where Services belong.

    One shared instance is kept per class. Asynchronous iteration
    yields ``Service`` objects built from this entry's children.
    """

    kind = "ou"
    _name = "Services"
    _instance = None

    def __new__(cls, *args, **kwargs):
        # Create the shared instance on first use and reuse it afterwards.
        existing = cls._instance
        if existing is None:
            existing = cls._instance = object.__new__(cls)
        return existing

    def __init__(self, client, *args, **kwargs):
        super().__init__(client)
        self._robots = build_heritage(self, Service)

    async def __anext__(self):
        try:
            service = await self._robots.__anext__()
        except StopAsyncIteration:
            # Re-arm the generator so the entry can be iterated again.
            self._robots = build_heritage(self, Service)
            raise
        return service
class Congregations(Entry):
    """
    This class is where Groups belong.

    One shared instance is kept per class. Asynchronous iteration
    yields ``Group`` objects built from this entry's children, using
    the "cn" attribute as the group name.
    """

    kind = "ou"
    _name = "Groups"
    _instance = None

    def __new__(cls, *args, **kwargs):
        # Create the shared instance on first use and reuse it afterwards.
        existing = cls._instance
        if existing is None:
            existing = cls._instance = object.__new__(cls)
        return existing

    def __init__(self, client, *args, **kwargs):
        super().__init__(client)
        self._groups = build_heritage(self, Group, attribute_id="cn")

    async def __anext__(self):
        try:
            group = await self._groups.__anext__()
        except StopAsyncIteration:
            # Re-arm the generator so the entry can be iterated again.
            self._groups = build_heritage(self, Group, attribute_id="cn")
            raise
        return group
class User(Hackers):
    """
    This class models a user. Users may have attributes
    and belong one or more Group(s).

    Instances are cached per (name, client identity) pair.
    """

    kind = "uid"
    _instances: T.Dict[str, Entry] = {}

    def __new__(cls, name, client, *args, **kwargs):
        # One instance per name and client object; the key combines both.
        cache_key = "{}-{}".format(name, id(client))
        registry = cls._instances
        if cache_key not in registry:
            registry[cache_key] = object.__new__(cls)
        return registry[cache_key]

    def __init__(self, name, client, *args, **kwargs):
        super().__init__(client, *args, **kwargs)
        self.name = name

    def __repr__(self):
        class_name = get_class(self).__name__
        return "<{}({}) {}>".format(class_name, self.name, self.dn)

    def qualified_name(self):
        """
        Return the ``kind=name`` component for this user.
        """
        return f"{self.kind}={self.name}"

    @property
    def name(self):
        """
        The user's name, stored on the instance as ``_name``.
        """
        return self._name

    @name.setter
    def name(self, name):
        self._name = name
class Service(Robots):
    """
    This class models a system user (i.e. users that are ancillary to
    services on a machine). System user may have attributes
    and belong to one or more Group(s).

    Instances are cached per (name, client identity) pair.
    """

    kind = "uid"
    _instances: T.Dict[str, Entry] = {}

    def __new__(cls, name, client, *args, **kwargs):
        # One instance per name and client object; the key combines both.
        cache_key = "{}-{}".format(name, id(client))
        registry = cls._instances
        if cache_key not in registry:
            registry[cache_key] = object.__new__(cls)
        return registry[cache_key]

    def __init__(self, name, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = name

    def __repr__(self):
        class_name = get_class(self).__name__
        return "<{}({}) {}>".format(class_name, self.name, self.dn)

    def qualified_name(self):
        """
        Return the ``kind=name`` component for this service.
        """
        return f"{self.kind}={self.name}"

    @property
    def name(self):
        """
        The service's name, stored on the instance as ``_name``.
        """
        return self._name

    @name.setter
    def name(self, name):
        self._name = name
class Group(Congregations):
    """
    This class models a group. Groups may have attributes
    and may have Users and Services belonging to them.

    Instances are cached per (name, client identity) pair.
    """

    kind = "cn"
    _instances: T.Dict[str, Entry] = {}

    def __new__(cls, name, client, *args, **kwargs):
        # One instance per name and client object; the key combines both.
        cache_key = "{}-{}".format(name, id(client))
        registry = cls._instances
        if cache_key not in registry:
            registry[cache_key] = object.__new__(cls)
        return registry[cache_key]

    def __init__(self, name, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = name

    def __repr__(self):
        class_name = get_class(self).__name__
        return "<{}({}) {}>".format(class_name, self.name, self.dn)

    def qualified_name(self):
        """
        Return the ``kind=name`` component for this group.
        """
        return f"{self.kind}={self.name}"

    @property
    def name(self):
        """
        The group's name, stored on the instance as ``_name``.
        """
        return self._name

    @name.setter
    def name(self, name):
        self._name = name