Search by organizational unit

This commit is contained in:
User Identifier 2020-09-30 18:42:28 +02:00
parent 3e299f96b1
commit dbf6ca966f
3 changed files with 49 additions and 14 deletions

View File

@ -14,7 +14,7 @@ def get_entry_by_uid(client, uid):
response_id = client.connection.search(
client.base_dn, filter_,
search_scope='SUBTREE',
attributes=[ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES]
attributes=[ALL_ATTRIBUTES]
)
response, result, request = client.connection.get_response(
@ -34,3 +34,22 @@ def get_entry_by_uid(client, uid):
.format(len(response)))
return response[0]
def get_entries_by_ou(client, ou):
log.info("Searching entries with organizational unit: {}".format(ou))
dn = 'ou={},{}'.format(ou, client.base_dn)
log.debug("Search dn: {}".format(dn))
response_id = client.connection.search(
dn, '(objectclass=person)',
search_scope='SUBTREE',
attributes=[ALL_ATTRIBUTES]
)
response, result, request = client.connection.get_response(
response_id, get_request=True
)
return response

View File

@ -1,22 +1,32 @@
from phi.ldap.entry import get_entry_by_uid
from phi.ldap.entry import get_entry_by_uid, get_entries_by_ou
shown_keys = ['uid', 'mail', 'createTimestamp', 'modifyTimestamp']
def flatten_attribute(attr):
if isinstance(attr, list) and len(attr)==1:
return attr[0]
else:
return attr
def flatten_user(entry):
return {k: flatten_attribute(attr)
for k, attr in entry['attributes'].items()
if k in shown_keys}
def get_user_by_uid(client, uid):
shown_keys = ['uid', 'mail', 'createTimestamp', 'modifyTimestamp']
entry = get_entry_by_uid(client, uid)
if not entry:
return None
def flatten(attr):
if isinstance(attr, list) and len(attr)==1:
return attr[0]
else:
return attr
return flatten_user(entry)
user = {k: flatten(attr)
for k, attr in entry['attributes'].items()
if k in shown_keys}
return user
def get_users_by_ou(client, ou):
entries = get_entries_by_ou(client, ou)
users = [flatten_user(entry) for entry in entries]
return users

View File

@ -1,4 +1,4 @@
from phi.ldap.user import get_user_by_uid
from phi.ldap.user import get_user_by_uid, get_users_by_ou
def test_connection(ldap_client):
@ -10,3 +10,9 @@ def test_get_user_by_id(ldap_client):
entry = get_user_by_uid(ldap_client, 'conte_mascetti')
assert entry['uid'] == 'conte_mascetti'
assert entry['mail'] == 'rmascetti@autistici.org'
def test_get_users_by_ou(ldap_client):
users = get_users_by_ou(ldap_client, 'Hackers')
assert 'conte_mascetti' in [u['uid'] for u in users]