Clean abstraction

This commit is contained in:
uid 2020-10-04 17:45:09 +02:00
parent 0b91ee1f22
commit cdddc250fb
3 changed files with 93 additions and 59 deletions

View File

@ -5,6 +5,21 @@ from phi.logging import get_logger
log = get_logger(__name__)
def get_response(client, response_id):
response, result, request = client.connection.get_response(
response_id, get_request=True
)
log.debug("Request: {}".format(request))
log.debug("Response: {}".format(response))
log.debug("Result: {}".format(result))
if result['description'] is not 'success':
raise Exception(result['description'])
return response
def get_entry_by_uid(client, uid):
log.info("Searching entry with identifier: {}".format(uid))
@ -17,18 +32,12 @@ def get_entry_by_uid(client, uid):
attributes=[ALL_ATTRIBUTES]
)
response, result, request = client.connection.get_response(
response_id, get_request=True
)
log.debug("Request: {}".format(request))
log.debug("Response: {}".format(response))
log.debug("Result: {}".format(result))
response = get_response(client, response_id)
if not response:
return None
if response[1:]:
if len(response) > 1:
log.error("Looking for exactly one result but server gave {}. "
"Taking the first and ignoring the rest."
.format(len(response)))
@ -48,8 +57,20 @@ def get_entries_by_ou(client, ou):
attributes=[ALL_ATTRIBUTES]
)
response, result, request = client.connection.get_response(
response_id, get_request=True
)
response = get_response(client, response_id)
return response
def add_entry(client, dn, attributes):
log.info('Adding entry with distinguiscet name: {}'
'and attributes {}'.format(dn, attributes))
response_id = client.connection.add(dn, attributes=attributes)
response = get_response(client, response_id)
return response
def delete_entry(client, dn):
log.info('Deleting entry with distinguiscet name: {}')
response_id = client.connection.delete(dn)
response = get_response(client, response_id)
return response

View File

@ -1,4 +1,5 @@
from phi.ldap.entry import get_entry_by_uid, get_entries_by_ou
from phi.ldap.entry import get_entry_by_uid, get_entries_by_ou, \
add_entry, delete_entry
shown_keys = ['uid', 'mail', 'createTimestamp', 'modifyTimestamp']
@ -30,3 +31,25 @@ def get_users_by_ou(client, ou):
users = [flatten_user(entry) for entry in entries]
return users
def add_user(client, uid, cn, sn, mail):
dn = 'uid={},ou=Hackers,{}'.format(uid, client.base_dn)
attributes={
'objectClass': [
'inetOrgPerson',
'organizationalPerson',
'person', 'top'
],
'cn': cn,
'sn': sn,
'mail': mail
}
add_entry(client, dn, attributes)
def delete_user(client, uid):
dn = 'uid={},ou=Hackers,{}'.format(uid, client.base_dn)
delete_entry(client, dn)

View File

@ -1,4 +1,5 @@
from phi.ldap.user import get_user_by_uid, get_users_by_ou
from phi.ldap.user import get_user_by_uid, get_users_by_ou, \
add_user, delete_user
def test_connection(ldap_client):
@ -7,9 +8,9 @@ def test_connection(ldap_client):
def test_get_user_by_id(ldap_client):
entry = get_user_by_uid(ldap_client, 'conte_mascetti')
assert entry['uid'] == 'conte_mascetti'
assert entry['mail'] == 'rmascetti@autistici.org'
user = get_user_by_uid(ldap_client, 'conte_mascetti')
assert user['uid'] == 'conte_mascetti'
assert user['mail'] == 'rmascetti@autistici.org'
def test_get_users_by_ou(ldap_client):
@ -19,50 +20,39 @@ def test_get_users_by_ou(ldap_client):
def test_add_delete_user(ldap_client):
client = ldap_client
uid = 'rosa_rossi'
cn = 'Rosa'
sn = 'Rossi'
mail = 'foo@autistici.org'
add_user(ldap_client, uid, cn, sn, mail)
user = get_user_by_uid(ldap_client, uid)
assert user['uid'] == uid
assert user['mail'] == mail
delete_user(ldap_client, uid)
user = get_user_by_uid(ldap_client, uid)
assert user is None
def test_failing_add_user(ldap_client):
uid = 'conte_mascetti'
try:
add_user(ldap_client, uid, '.', '.', '.')
except: # User alrady existing
pass
else:
assert False
def test_failing_delete_user(ldap_client):
uid = 'rosa_rossi'
dn = 'uid={},ou=Hackers,{}'.format(uid, client.base_dn)
response_id = client.connection.add(
dn,
attributes={
'objectClass': [
'inetOrgPerson',
'organizationalPerson',
'person', 'top'
],
'cn': cn,
'sn': sn,
'mail': mail
}
)
response, result, request = client.connection.get_response(
response_id, get_request=True
)
print("Request: {}".format(request))
print("Response: {}".format(response))
print("Result: {}".format(result))
entry = get_user_by_uid(ldap_client, uid)
assert entry['uid'] == uid
assert entry['mail'] == mail
response_id = client.connection.delete(dn)
response, result, request = client.connection.get_response(
response_id, get_request=True
)
print("Request: {}".format(request))
print("Response: {}".format(response))
print("Result: {}".format(result))
entry = get_user_by_uid(ldap_client, uid)
assert entry is None
try:
delete_user(ldap_client, uid)
except: # User already not existing
pass
else:
assert False