# -*- encoding: utf-8 -*-
from bonsai import LDAPEntry, LDAPModOp, NoSuchObjectError  # type: ignore

from phi.exceptions import (
    PhiAttributeMissing,
    PhiEntryDoesNotExist,
    PhiEntryExists,
    PhiUnauthorized,
    PhiUnexpectedRuntimeValue,
)

# Prefix marking the attributes/methods of an Entry that map to LDAP
# attributes; the prefix is stripped when the attribute is exported.
ATTR_TAG = "_ldap_"
ATTR_TAG_LEN = len(ATTR_TAG)


async def build_heritage(obj, child_class, attribute_id="uid"):
    """
    Given the object and the child class, yield the instances of the children.

    :param obj: object exposing an async ``get_children()`` generator and a
        ``client`` attribute.
    :param child_class: class instantiated for every child found.
    :param attribute_id: name of the attribute whose first value is used as
        the child name; children lacking it are skipped.
    """
    async for child in obj.get_children():
        if attribute_id not in child:
            continue
        _name = child[attribute_id][0]
        # Client first, then name: this is the argument order used by
        # Singleton.__new__ and Child.__init__ in this module.
        yield child_class(obj.client, _name)


class Singleton(object):
    """
    Mixin to singletonize a class.

    The class is crafted to be used with the mixins that implement the
    compatible __init__, i.e. ``__init__(self, client)`` or
    ``__init__(self, client, name)``.

    Instances are cached per class name, entry name (when given) and client
    identity.
    """

    # Cache of the instances already created, indexed by the key built in
    # __new__. Shared by the subclasses unless they define their own.
    _instances: dict = {}

    def __new__(cls, client, *args, **kwargs):
        """
        Return the cached instance for this class/name/client, creating it
        the first time.

        :param client: client the instance is bound to.
        :param args: optional positional arguments; the first one, when
            present, is taken as the entry name.
        :param kwargs: optional keyword arguments; ``name``, when present,
            is taken as the entry name.
        """
        if "name" in kwargs:
            entry_name = kwargs["name"]
        elif args:
            entry_name = args[0]
        else:
            entry_name = None

        if entry_name is None:
            key = f"{cls.__name__}-{id(client)}"
        else:
            key = f"{cls.__name__}-{entry_name}-{id(client)}"

        if key not in cls._instances:
            cls._instances[key] = object.__new__(cls)
        return cls._instances[key]


class Entry(object):
    """
    Mixin to interact with LDAP.

    It relies on ``dn``, ``name``, ``client`` and ``object_class`` being
    provided by the class it is mixed into.
    """

    def get_all_ldap_attributes(self):
        """Return the names of the members whose name starts with ATTR_TAG."""
        return [attr for attr in dir(self) if attr.startswith(ATTR_TAG)]

    def __repr__(self):
        return f"<{self.__class__.__name__} {self.dn}>"

    def __str__(self):
        return f"<{self.__class__.__name__} {self.dn}>"

    def __iter__(self):
        """
        Yield ``(name, value)`` pairs for every tagged member, with the tag
        stripped from the name. Methods are called to obtain the value;
        properties and plain attributes are read.
        """
        for attr in self.get_all_ldap_attributes():
            # Attributes set only on the instance do not exist on the class:
            # default to None so that they are handled as plain values.
            class_attr = getattr(type(self), attr, None)
            value = getattr(self, attr)
            if callable(class_attr) and not isinstance(class_attr, property):
                value = value()
            yield attr[ATTR_TAG_LEN:], value

    # NOTE(review): this name shadows the attribute Python normally uses for
    # the instance namespace; consider renaming it. ``dict(entry)`` gives the
    # same result.
    def __dict__(self):
        """Return the tagged members as a dictionary."""
        return dict(self)

    async def _create_new(self):
        """
        Build the LDAP entry for this object, add it through the client
        connection and return it.
        """
        entry = LDAPEntry(self.dn)
        entry["objectClass"] = self.object_class
        # update() needs name/value pairs: get_all_ldap_attributes() only
        # provides the names, iterating over self provides the pairs.
        entry.update(dict(self))
        async with self.client.connect(is_async=True) as conn:
            await conn.add(entry)
        return entry

    async def _get(self):
        """
        Search for the entry at ``self.dn``.

        :return: the entry found, or None when the search returns nothing.
        :raises PhiUnexpectedRuntimeValue: when more than one result is
            returned.
        """
        async with self.client.connect(is_async=True) as conn:
            # This returns a list of dicts. It should always contain only one
            # item: the one we are interested in.
            _res = await conn.search(self.dn, 0)

        if not _res:
            return None
        if len(_res) > 1:
            raise PhiUnexpectedRuntimeValue(
                "return value should be no more than one", _res
            )
        return _res[0]

    async def describe(self):
        """
        Return the entry as a dictionary, with ``dn`` and ``name`` added.

        :raises PhiEntryDoesNotExist: when the entry is not found.
        """
        entry = await self._get()
        if entry is None:
            raise PhiEntryDoesNotExist(self.dn)
        res = dict(entry)
        res["dn"] = self.dn
        res["name"] = self.name
        return res


class OrganizationalUnit(object):
    """
    Mixin for an organizational unit. It can be iterated asynchronously to
    obtain its children as instances of ``child_class``.

    ``child_class`` and ``ou_name`` have to be provided by the class it is
    mixed into.
    """

    def __init__(self, client):
        self.client = client
        self.base_dn = client.base_dn
        self.name = self.__class__.__name__
        self.children = build_heritage(self, self.child_class)
        self._entry = LDAPEntry(self.dn)

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            return await self.children.__anext__()
        except StopAsyncIteration:
            # Prepare a fresh generator so that the unit can be iterated
            # again, then let the current iteration terminate.
            self.children = build_heritage(self, self.child_class)
            raise

    async def get_children(self):
        """Yield every result of a search with scope 2 on ``self.dn``."""
        async with self.client.connect(is_async=True) as conn:
            for el in await conn.search(self.dn, 2):
                yield el

    @property
    def dn(self):
        return f"ou={self.ou_name},{self.base_dn}"


class Child(object):
    """
    Mixin for an entry living inside an organizational unit.

    ``id_tag`` and ``ou_name`` have to be provided by the class it is mixed
    into.
    """

    object_class = ["inetOrgPerson", "organizationalPerson", "person", "top"]

    def __init__(self, client, name):
        self.client = client
        self.base_dn = client.base_dn
        self.name = name
        self._entry = LDAPEntry(self.dn)

    @property
    def dn(self):
        return f"{self.id_tag}={self.name},ou={self.ou_name},{self.base_dn}"