# -*- encoding: utf-8 -*-
"""Tests for ``phi.async_ldap.model`` driven by an in-memory ``MockClient``.

NOTE(review): several expected values in this module look truncated
(``f""`` as the expected ``repr`` and a trailing ``>`` in the fixture ``dn``
strings). They are kept byte-for-byte here -- confirm against the model's
``__repr__`` implementations before relying on those assertions.
"""
from argparse import Namespace
import asyncio  # kept for import stability; ``asyncio.coroutine`` is no longer used
from async_generator import asynccontextmanager
from bonsai import NoSuchObjectError
import mock
import pytest

from phi.async_ldap.model import (
    get_class,
    recall,
    call_if_callable,
    inheritance,
    iter_children,
    Entry,
    build_heritage,
    Hackers,
    Robots,
    Congregations,
    Service,
    create_new_,
    User,
    PhiEntryExists,
    PhiEntryDoesNotExist,
    PhiAttributeMissing,
    PhiUnauthorized,
    Group,
)
from phi.security import hash_pass


BASE_DN = "dc=test,dc=abbiamoundominio,dc=org"

# Canned search results returned by ``MockClient`` for container listings.
USER_LIST = [{"uid": ["conte_mascetti"]}, {"uid": ["perozzi"]}, {"uid": ["necchi"]}]
SERVICE_LIST = [{"uid": ["phi"]}, {"uid": ["irc"]}]
GROUP_LIST = [{"cn": ["amici_miei"]}, {"cn": ["antani"]}]

# Canned single-entry search results.
EXISTING_USER = [
    {
        "dn": f"uid=existing_user,ou=Hackers,{BASE_DN}>",
        "objectClass": ["inetOrgPerson", "organizationalPerson", "person", "top"],
        "cn": ["exists"],
        "sn": ["Existing User"],
        "mail": ["existing@mail.org"],
        "uid": ["existing_user"],
        "userPassword": ["{SHA}W6ph5Mm5Pz8GgiULbPgzG37mj9g="],
    }
]

# Same shape as EXISTING_USER but without ``userPassword``.
# NOTE(review): the dn still says ``existing_user`` although the entry is
# served for ``missing_auth_user`` -- looks like a copy/paste; confirm.
MISSING_AUTH_USER = [
    {
        "dn": f"uid=existing_user,ou=Hackers,{BASE_DN}>",
        "objectClass": ["inetOrgPerson", "organizationalPerson", "person", "top"],
        "cn": ["exists"],
        "sn": ["Existing User"],
        "mail": ["existing@mail.org"],
        "uid": ["existing_user"],
    }
]

EXISTING_SERVICE = [
    {
        "dn": f"uid=existing_service,ou=Services,{BASE_DN}>",
        "objectClass": ["account", "simpleSecurityObject", "top"],
        "uid": ["existing_service"],
        "userPassword": ["{SHA}W6ph5Mm5Pz8GgiULbPgzG37mj9g="],
    }
]


class MockClient:
    """Fake client that records calls and answers searches with canned data.

    The extra positional ``args`` given at construction select which listing
    (``"Users"``, ``"Services"`` or ``"Groups"``) a search returns; otherwise
    the searched DN decides the result.
    """

    def __init__(self, base_dn, *args):
        self._base_dn = base_dn
        self.args = args
        # Maps operation name ("search", "add", "modify") to the arguments of
        # the most recent call of that operation.
        self.called_with_args = {}
        self.username = f"uid=mock_connection,ou=Services,{base_dn}"

    @property
    def base_dn(self):
        return self._base_dn

    @asynccontextmanager
    async def connect(self, *args, **kwargs):
        """Yield a namespace exposing async ``search``, ``add`` and ``modify``."""
        conn = Namespace()

        async def _search(*args, **kwargs):
            self.called_with_args["search"] = {"args": args, "kwargs": kwargs}
            # Listing selected by the client's construction arguments.
            if "Services" in self.args:
                return SERVICE_LIST
            if "Groups" in self.args:
                return GROUP_LIST
            if "Users" in self.args or f"None=Entry,{BASE_DN}" in args:
                return USER_LIST
            # Otherwise the result is selected by the searched DN.
            if f"uid=existing_user,ou=Hackers,{BASE_DN}" in args:
                return EXISTING_USER
            if f"uid=existing_service,ou=Services,{BASE_DN}" in args:
                return EXISTING_SERVICE
            if f"uid=not_existing,ou=Hackers,{BASE_DN}" in args:
                return []
            if f"uid=not_existing,ou=Services,{BASE_DN}" in args:
                return []
            if f"uid=missing_auth_user,ou=Hackers,{BASE_DN}" in args:
                return MISSING_AUTH_USER
            # Unknown queries fall through and yield None, as before.
            return None

        async def _add(*args, **kwargs):
            self.called_with_args["add"] = {"args": args, "kwargs": kwargs}

        async def _modify(*args, **kwargs):
            self.called_with_args["modify"] = {"args": args, "kwargs": kwargs}

        conn.search = _search
        conn.add = _add
        conn.modify = _modify

        yield conn


def _awaitable_mock(inner):
    """Return a ``MagicMock`` whose call yields an awaitable delegating to ``inner``.

    Replaces the former ``MagicMock(side_effect=asyncio.coroutine(inner))``
    construction: ``asyncio.coroutine`` was removed in Python 3.11. The
    returned mock still records its calls, and whatever ``inner`` returns or
    raises surfaces when the result is awaited.
    """

    async def _delegate(*args, **kwargs):
        return inner(*args, **kwargs)

    return mock.MagicMock(side_effect=_delegate)


@pytest.fixture
def client_fixture():
    return MockClient(BASE_DN)


@pytest.fixture
def client_fixture_multi():
    res = Namespace()
    res.users = MockClient(BASE_DN, "Users")
    res.services = MockClient(BASE_DN, "Services")
    res.groups = MockClient(BASE_DN, "Groups")
    return res


@pytest.fixture
def lineage_fixture():
    """Provide a three-level class hierarchy plus one instance of each class."""

    class Grand:
        @classmethod
        def name(cls):
            return cls.__name__

    class Ma(Grand):
        pass

    class Child(Ma):
        pass

    grand = Grand()
    ma = Ma()
    child = Child()

    return Grand, Ma, Child, grand, ma, child


def test_get_class():
    c = MockClient(BASE_DN)
    assert get_class(c) is MockClient


def test_recall(lineage_fixture):
    Grand, Ma, Child, grand, ma, child = lineage_fixture

    assert object is recall(grand)
    assert recall(grand) is recall(Grand)
    assert Grand is recall(ma)
    assert recall(ma) is recall(Ma)
    assert Ma is recall(child)
    assert recall(child) is recall(Child)


def test_call_if_callable():
    class Dummy:
        classattr = "classattr"

        @property
        def prop(self):
            return "prop"

        def func(self):
            return "func"

        @classmethod
        def clsmth(cls):
            return "clsmth"

    d = Dummy()

    assert "classattr" == call_if_callable(d, "classattr")
    assert "prop" == call_if_callable(d, "prop")
    assert "func" == call_if_callable(d, "func")
    assert "clsmth" == call_if_callable(d, "clsmth")


def test_inheritance(lineage_fixture):
    Grand, Ma, Child, _, _, _ = lineage_fixture

    assert inheritance(Grand, "name") == "Grand"
    assert inheritance(Ma, "name") == "Ma,Grand"
    assert inheritance(Child, "name") == "Child,Ma,Grand"
    assert inheritance(Child, "name", Grand) == "Child,Ma"
    assert inheritance(Child, "name", Ma) == "Child"


@pytest.mark.asyncio
async def test_iter_children():
    LIST = [1, 2, 3, 4]

    async def _async_gen():
        for i in LIST:
            yield i

    ALIST = _async_gen()

    assert LIST == await iter_children(ALIST)


def test_Entry(client_fixture):
    e = Entry(client_fixture)

    assert e.base_dn == BASE_DN
    assert e.client is not None


def test_Entry_name(client_fixture):
    e = Entry(client_fixture)

    assert e.name() == "Entry"


def test_Entry_qualified_name(client_fixture):
    e = Entry(client_fixture)

    assert e.qualified_name() == "None=Entry"


def test_Entry_dn(client_fixture):
    e = Entry(client_fixture)

    assert e.dn == "None=Entry,{}".format(BASE_DN)


@pytest.mark.asyncio
async def test_Entry_describe(client_fixture):
    e = Entry(client_fixture)

    assert USER_LIST == await e.describe()


@pytest.mark.asyncio
async def test_build_heritage(client_fixture):
    class MockAIterable(object):
        client = None

        def __init__(self, elements):
            self.elements = elements

        async def get_children(self):
            for el in self.elements:
                yield el

    class Dummy(object):
        def __init__(self, *args, **kwargs):
            # Identity-based fingerprint: two Dummies compare equal when they
            # were built from the very same argument objects.
            self.id = sum([id(el) for el in args]) + sum(
                [id(v) for _, v in kwargs.items()]
            )

        def __repr__(self):
            return f""

        def __eq__(self, other):
            return self.id == other.id

    m = MockAIterable(USER_LIST)

    res = [el async for el in build_heritage(m, Dummy, attribute_id="uid")]
    exp_res = [Dummy(el["uid"][0], None) for el in USER_LIST]

    assert res == exp_res


def test_Hackers(client_fixture_multi):
    h = Hackers(client_fixture_multi.users)

    assert h.kind == "ou"
    assert h.dn == "ou={},{}".format(h.name(), BASE_DN)
    assert repr(h) == f""


def test_Hackers_singleton(client_fixture):
    other_client = MockClient(BASE_DN)
    h1 = Hackers(client_fixture)
    h2 = Hackers(other_client)
    h3 = Hackers(client_fixture)

    assert client_fixture is not other_client
    assert h1 is h3
    assert h2 is not h1


@pytest.mark.asyncio
async def test_Entry_get_children(client_fixture_multi):
    h = Hackers(client_fixture_multi.users)

    assert USER_LIST == [el async for el in h.get_children()]


@pytest.mark.asyncio
async def test_Hackers_anext(client_fixture_multi):
    h = Hackers(client_fixture_multi.users)
    exp_res = [User(el["uid"][0], client_fixture_multi.users) for el in USER_LIST]

    assert exp_res == [el async for el in h]


def test_Hackers_children(client_fixture_multi):
    h = Hackers(client_fixture_multi.users)

    assert USER_LIST == h.children


@pytest.mark.asyncio
async def test_Hackers_get_by_attr(client_fixture):
    h = Hackers(client_fixture)

    res = await h.get_by_attr("uid", "existing_user")

    assert len(res) == 1
    assert res[0] is User("existing_user", client_fixture)
    assert client_fixture.called_with_args["search"]["args"] == (
        f"uid=existing_user,ou=Hackers,{BASE_DN}",
        0,
    )


@pytest.mark.asyncio
async def test_Hackers_get_by_attr_empty(client_fixture):
    h = Hackers(client_fixture)

    res = await h.get_by_attr("uid", "not_existing")

    assert res is None


@pytest.mark.asyncio
async def test_Hackers_get_by_uid(client_fixture):
    h = Hackers(client_fixture)

    res = await h.get_by_uid("existing_user")

    assert res is User("existing_user", client_fixture)


def test_Robots(client_fixture_multi):
    r = Robots(client_fixture_multi.services)

    assert r.kind == "ou"
    assert r.dn == "ou={},{}".format(r.name(), BASE_DN)
    assert repr(r) == f""


def test_Robots_singleton(client_fixture):
    other_client = MockClient(BASE_DN)
    r1 = Robots(client_fixture)
    r2 = Robots(other_client)
    r3 = Robots(client_fixture)

    assert client_fixture is not other_client
    assert r1 is r3
    assert r2 is not r1


@pytest.mark.asyncio
async def test_Robots_anext(client_fixture_multi):
    r = Robots(client_fixture_multi.services)
    exp_res = [
        Service(el["uid"][0], client_fixture_multi.services) for el in SERVICE_LIST
    ]

    assert exp_res == [el async for el in r]


def test_Robots_children(client_fixture_multi):
    r = Robots(client_fixture_multi.services)

    assert SERVICE_LIST == r.children


def test_Congregations(client_fixture_multi):
    g = Congregations(client_fixture_multi.groups)

    assert g.kind == "ou"
    assert g.dn == "ou={},{}".format(g.name(), BASE_DN)
    assert repr(g) == f""


def test_Congregations_singleton(client_fixture):
    other_client = MockClient(BASE_DN)
    g1 = Congregations(client_fixture)
    g2 = Congregations(other_client)
    g3 = Congregations(client_fixture)

    assert client_fixture is not other_client
    assert g1 is g3
    assert g2 is not g1


@pytest.mark.asyncio
async def test_Congregations_anext(client_fixture_multi):
    c = Congregations(client_fixture_multi.groups)
    exp_res = [Group(el["cn"][0], client_fixture_multi.groups) for el in GROUP_LIST]

    assert exp_res == [el async for el in c]


def test_Congregations_children(client_fixture_multi):
    c = Congregations(client_fixture_multi.groups)

    assert GROUP_LIST == c.children


@pytest.mark.asyncio
async def test_create_new_(client_fixture):
    # Minimal stand-in carrying only the attributes set below.
    stub = Namespace()
    stub.dn = f"uid=test,ou=Tests,{BASE_DN}"
    stub.client = client_fixture
    stub.object_class = ["a", "b", "c"]

    ENTRY_DICT = {"attr1": "val1", "attr2": "val2", "attr3": "val3", "attr4": "val4"}

    res = await create_new_(stub, **ENTRY_DICT)

    assert res is not None
    assert client_fixture.called_with_args["add"]["args"][0] == res
    assert res["objectClass"] == stub.object_class
    for k, v in ENTRY_DICT.items():
        assert v == res[k][0]


def test_User(client_fixture):
    c = User("conte_mascetti", client_fixture)

    assert c.kind == "uid"
    assert c.name == "conte_mascetti"
    assert c.dn == "uid=conte_mascetti,ou=Hackers,{}".format(BASE_DN)
    assert repr(c) == f""


def test_User_unsettable_name(client_fixture):
    c = User("conte_mascetti", client_fixture)

    with pytest.raises(RuntimeError) as e:
        c.name = "totò"

    # Checked outside the ``with`` block so the assertion actually runs.
    assert "Name property is not modifiable." in str(e.value)


def test_User_singleton(client_fixture):
    other_client = MockClient(BASE_DN)
    c1 = User("conte_mascetti", client_fixture)
    c2 = User("perozzi", client_fixture)
    c3 = User("conte_mascetti", client_fixture)
    c4 = User("conte_mascetti", other_client)

    assert client_fixture is not other_client
    assert c1 is c3
    assert c2 is not c1
    assert c4 is not c1


@pytest.mark.asyncio
async def test_User_create_existing(client_fixture):
    c = User("existing_user", client_fixture)

    with pytest.raises(PhiEntryExists) as e:
        await c.create(
            "existing@mail.org", password="password", sn="exists", cn="Existing User"
        )

    assert c.dn in str(e.value)


@pytest.mark.asyncio
async def test_User_create_not_existing(client_fixture):
    c = User("not_existing", client_fixture)

    await c.create("not@existing.org", "password")

    assert client_fixture.called_with_args["search"]["args"] == (c.dn, 0)
    assert c._entry["mail"][0] == "not@existing.org"


@pytest.mark.asyncio
async def test_User_sync_existing(client_fixture, caplog):
    c = User("existing_user", client_fixture)

    await c.sync()

    assert f"User({c.name}): synced" in caplog.text
    assert client_fixture.called_with_args["search"]["args"] == (c.dn, 0)
    for k, v in EXISTING_USER[0].items():
        # The "dn" key of the canned result is deliberately not compared.
        if k != "dn":
            assert c._entry[k] == v


@pytest.mark.asyncio
async def test_User_sync_not_existing(client_fixture, caplog):
    c = User("not_existing", client_fixture)

    with pytest.raises(PhiEntryDoesNotExist) as e:
        await c.sync()

    assert c.dn in str(e.value)
    assert client_fixture.called_with_args["search"]["args"] == (c.dn, 0)


@pytest.mark.asyncio
async def test_User_modify_existing(client_fixture, caplog):
    c = User("existing_user", client_fixture)
    c._entry = mock.MagicMock()

    async def _modify():
        return

    c._entry.modify = _modify

    await c.sync()
    await c.modify("mail", "other@existing.org")

    assert f"User({c.name}): modified (mail)" in caplog.text
    c._entry.__setitem__.assert_called_with("mail", "other@existing.org")
    c._entry.__delitem__.assert_called_with("mail")


@pytest.mark.asyncio
async def test_User_modify_existing_append(client_fixture, caplog):
    c = User("existing_user", client_fixture)
    c._entry = mock.MagicMock()

    async def _modify():
        return

    c._entry.modify = _modify

    await c.sync()
    await c.modify("mail", "other@existing.org", append=True)

    assert f"User({c.name}): modified (mail)" in caplog.text
    c._entry.__setitem__.assert_called_with("mail", "other@existing.org")
    c._entry.__delitem__.assert_not_called()


@pytest.mark.asyncio
async def test_User_modify_not_existing(client_fixture):
    c = User("existing_user", client_fixture)
    c._entry = mock.MagicMock()
    # Deleting any key from the mocked entry raises KeyError.
    attr = {"__delitem__.side_effect": KeyError}
    c._entry.configure_mock(**attr)

    async def _modify():
        return

    c._entry.modify = _modify

    await c.sync()

    with pytest.raises(PhiAttributeMissing) as e:
        await c.modify("snafu", "modified")

    assert c.dn in str(e.value)
    assert "snafu" in str(e.value)


@pytest.mark.asyncio
async def test_User_remove_existing(client_fixture, caplog):
    c = User("existing_user", client_fixture)
    c._entry = mock.MagicMock()

    delete = _awaitable_mock(mock.Mock(name="coroutine"))
    c._entry.delete = delete

    await c.sync()
    await c.remove()

    assert f"User({c.name}): removed" in caplog.text
    delete.assert_called_once()


@pytest.mark.asyncio
async def test_User_remove_not_existing(client_fixture):
    c = User("not_existing", client_fixture)
    c._entry = mock.MagicMock()

    # Raises synchronously when called, before anything is awaited.
    delete = mock.MagicMock(side_effect=NoSuchObjectError)
    c._entry.delete = delete

    with pytest.raises(PhiEntryDoesNotExist) as e:
        await c.remove()

    assert c.dn in str(e.value)


@pytest.mark.asyncio
async def test_User_modify_password(client_fixture, caplog):
    c = User("existing_user", client_fixture)

    modify = _awaitable_mock(mock.Mock(name="coroutine"))
    c.modify = modify

    await c.modify_password("new-password")

    modify.assert_called_with("userPassword", hash_pass("new-password"))
    assert "User(existing_user): password modified" in caplog.text


@pytest.mark.asyncio
async def test_User_modify_password_raises(client_fixture):
    c = User("existing_user", client_fixture)

    # The inner mock raises when the awaitable returned by ``modify`` runs.
    modify = _awaitable_mock(
        mock.Mock(
            name="coroutine", side_effect=PhiAttributeMissing("TESTDN", "TESTATTR")
        )
    )
    c.modify = modify
    c.connection = mock.MagicMock()
    c.connection.username = "existing_user"

    with pytest.raises(PhiUnauthorized) as e:
        await c.modify_password("randombytes")

    modify.assert_called_with("userPassword", hash_pass("randombytes"))
    assert f"uid=mock_connection,ou=Services,{BASE_DN}" in str(e.value.user)


@pytest.mark.asyncio
async def test_User_verify_password(client_fixture):
    c = User("existing_user", client_fixture)

    res_true = await c.verify_password("password")
    res_false = await c.verify_password("wrong")

    assert res_true
    assert not res_false


@pytest.mark.asyncio
async def test_User_verify_password_raises_not_existing(client_fixture):
    c = User("not_existing", client_fixture)

    with pytest.raises(PhiEntryDoesNotExist) as e:
        await c.verify_password("randombytes")

    assert f"uid=not_existing,ou=Hackers,{BASE_DN}" in str(e.value)


@pytest.mark.asyncio
async def test_User_verify_password_raises_unauthorized(client_fixture):
    c = User("missing_auth_user", client_fixture)

    with pytest.raises(PhiUnauthorized) as e:
        await c.verify_password("password")

    assert f"uid=mock_connection,ou=Services,{BASE_DN}" in str(e.value.user)


def test_Service(client_fixture):
    c = Service("phi", client_fixture)

    assert c.kind == "uid"
    assert c.name == "phi"
    assert c.dn == "uid=phi,ou=Services,{}".format(BASE_DN)
    assert repr(c) == f""


def test_Service_unsettable_name(client_fixture):
    c = Service("phi", client_fixture)

    with pytest.raises(RuntimeError) as e:
        c.name = "theta"

    assert "Name property is not modifiable." in str(e.value)


def test_Service_singleton(client_fixture):
    other_client = MockClient(BASE_DN)
    c1 = Service("phi", client_fixture)
    c2 = Service("irc", client_fixture)
    c3 = Service("phi", client_fixture)
    c4 = Service("phi", other_client)

    assert client_fixture is not other_client
    assert c1 is c3
    assert c2 is not c1
    assert c4 is not c1


@pytest.mark.asyncio
async def test_Service_create_existing(client_fixture):
    s = Service("existing_service", client_fixture)

    with pytest.raises(PhiEntryExists) as e:
        await s.create("password")

    assert s.dn in str(e.value)


@pytest.mark.asyncio
async def test_Service_create_not_existing(client_fixture):
    s = Service("not_existing", client_fixture)

    await s.create("password")

    assert client_fixture.called_with_args["search"]["args"] == (s.dn, 0)


@pytest.mark.asyncio
async def test_Service_sync_existing(client_fixture):
    s = Service("existing_service", client_fixture)

    await s.sync()

    assert client_fixture.called_with_args["search"]["args"] == (s.dn, 0)


@pytest.mark.asyncio
async def test_Service_sync_non_existing(client_fixture):
    # TODO: placeholder -- this test currently asserts nothing.
    pass


def test_Group(client_fixture):
    c = Group("amici_miei", client_fixture)

    assert c.kind == "cn"
    assert c.name == "amici_miei"
    assert c.dn == "cn=amici_miei,ou=Groups,{}".format(BASE_DN)
    assert repr(c) == f""


def test_Group_unsettable_name(client_fixture):
    c = Group("amici_miei", client_fixture)

    with pytest.raises(RuntimeError) as e:
        c.name = "nemici"

    assert "Name property is not modifiable." in str(e.value)


def test_Group_singleton(client_fixture):
    other_client = MockClient(BASE_DN)
    c1 = Group("amici_miei", client_fixture)
    c2 = Group("antani", client_fixture)
    c3 = Group("amici_miei", client_fixture)
    c4 = Group("amici_miei", other_client)

    assert client_fixture is not other_client
    assert c1 is c3
    assert c2 is not c1
    assert c4 is not c1