User modify_password and verify_password. Failing Service tests.
This commit is contained in:
parent
7634f0f530
commit
ce4c085e3f
|
@ -558,6 +558,7 @@ async def test_User_modify_not_existing(client_fixture):
|
|||
assert c.dn in str(e.value)
|
||||
assert "snafu" in str(e.value)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_User_remove_existing(client_fixture, caplog):
|
||||
c = User("existing_user", client_fixture)
|
||||
|
@ -595,6 +596,79 @@ async def test_User_remove_not_existing(client_fixture):
|
|||
|
||||
assert c.dn in str(e.value)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_User_modify_password(client_fixture, caplog):
|
||||
c = User("existing_user", client_fixture)
|
||||
|
||||
def _modify():
|
||||
coro = mock.Mock(name="coroutine")
|
||||
fn = mock.MagicMock(side_effect=asyncio.coroutine(coro))
|
||||
return fn
|
||||
|
||||
modify = _modify()
|
||||
c.modify = modify
|
||||
|
||||
await c.modify_password("new-password")
|
||||
|
||||
modify.assert_called_with("userPassword", hash_pass("new-password"))
|
||||
assert "User(existing_user): password modified" in caplog.text
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_User_modify_password_raises(client_fixture):
|
||||
c = User("existing_user", client_fixture)
|
||||
|
||||
def _modify():
|
||||
coro = mock.Mock(
|
||||
name="coroutine", side_effect=PhiAttributeMissing("TESTDN", "TESTATTR")
|
||||
)
|
||||
fn = mock.MagicMock(side_effect=asyncio.coroutine(coro))
|
||||
return fn
|
||||
|
||||
modify = _modify()
|
||||
c.modify = modify
|
||||
c.connection = mock.MagicMock()
|
||||
c.connection.username = "existing_user"
|
||||
|
||||
with pytest.raises(PhiUnauthorized) as e:
|
||||
await c.modify_password("randombytes")
|
||||
|
||||
modify.assert_called_with("userPassword", hash_pass("randombytes"))
|
||||
assert f"uid=mock_connection,ou=Services,{BASE_DN}" in str(e.value.user)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_User_verify_password(client_fixture):
|
||||
c = User("existing_user", client_fixture)
|
||||
|
||||
res_true = await c.verify_password("password")
|
||||
res_false = await c.verify_password("wrong")
|
||||
|
||||
assert res_true
|
||||
assert not res_false
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_User_verify_password_raises_not_existing(client_fixture):
|
||||
c = User("not_existing", client_fixture)
|
||||
|
||||
with pytest.raises(PhiEntryDoesNotExist) as e:
|
||||
await c.verify_password("randombytes")
|
||||
|
||||
assert f"uid=not_existing,ou=Hackers,{BASE_DN}" in str(e.value)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_User_verify_password_raises_unauthorized(client_fixture):
|
||||
c = User("missing_auth_user", client_fixture)
|
||||
|
||||
with pytest.raises(PhiUnauthorized) as e:
|
||||
await c.verify_password("password")
|
||||
|
||||
assert f"uid=mock_connection,ou=Services,{BASE_DN}" in str(e.value.user)
|
||||
|
||||
|
||||
def test_Service(client_fixture):
|
||||
c = Service("phi", client_fixture)
|
||||
|
||||
|
@ -626,6 +700,28 @@ def test_Service_singleton(client_fixture):
|
|||
assert c4 is not c1
|
||||
|
||||
|
||||
@pytest.mark.xfail
|
||||
@pytest.mark.asyncio
|
||||
async def test_Service_create_existing(client_fixture):
|
||||
s = Service("existing_user", client_fixture)
|
||||
|
||||
with pytest.raises(PhiEntryExists) as e:
|
||||
await s.create("existing@mail.org", sn="exists", cn="Existing Service")
|
||||
|
||||
assert s.dn in str(e.value)
|
||||
|
||||
|
||||
@pytest.mark.xfail
|
||||
@pytest.mark.asyncio
|
||||
async def test_Service_create_not_existing(client_fixture):
|
||||
s = Service("not_existing", client_fixture)
|
||||
|
||||
await s.create("not@existing.org")
|
||||
|
||||
assert client_fixture.called_with_args["search"]["args"] == (s.dn, 0)
|
||||
assert s._entry["mail"][0] == "not@existing.org"
|
||||
|
||||
|
||||
def test_Group(client_fixture):
|
||||
c = Group("amici_miei", client_fixture)
|
||||
|
||||
|
|
|
@ -298,6 +298,28 @@ class User(Hackers):
|
|||
except NoSuchObjectError:
|
||||
raise PhiEntryDoesNotExist(self.dn)
|
||||
|
||||
async def modify_password(self, new_pass, old_pass=None):
|
||||
try:
|
||||
await self.modify("userPassword", hash_pass(new_pass))
|
||||
except PhiAttributeMissing:
|
||||
raise PhiUnauthorized(
|
||||
user=self.client.username,
|
||||
)
|
||||
alog.info("User(%s): password modified", self.name)
|
||||
|
||||
async def verify_password(self, given_pass):
|
||||
async with self.client.connect(is_async=True) as conn:
|
||||
res = await conn.search(self.dn, 0)
|
||||
if len(res) == 0:
|
||||
raise PhiEntryDoesNotExist(self.dn)
|
||||
try:
|
||||
match_pass = res[0]["userPassword"][0] == hash_pass(given_pass)
|
||||
except KeyError:
|
||||
raise PhiUnauthorized(
|
||||
user=self.client.username,
|
||||
)
|
||||
return match_pass
|
||||
|
||||
|
||||
class PhiEntryExists(Exception):
|
||||
def __init__(self, dn):
|
||||
|
@ -356,6 +378,14 @@ class Service(Robots):
|
|||
def name(self, name):
|
||||
raise RuntimeError("Name property is not modifiable.")
|
||||
|
||||
async def create(self, password):
|
||||
async with self.client.connect(is_async=True) as conn:
|
||||
res = await conn.search(self.dn, 0)
|
||||
if len(res) > 0:
|
||||
raise PhiEntryExists(self.dn)
|
||||
self._entry = await create_new_(self, uid=self.name, userPassword=password)
|
||||
alog.info("Service(%s): created", self.name)
|
||||
|
||||
|
||||
class Group(Congregations):
|
||||
"""
|
||||
|
|
Loading…
Reference in New Issue
Block a user