Users and group dictionaries

This commit is contained in:
uid 2020-10-05 14:51:16 +02:00
parent 8efd4bd75c
commit 6d14ed9246
5 changed files with 174 additions and 159 deletions

View File

@ -1,124 +0,0 @@
from ldap3 import ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES, MODIFY_ADD
from phi.logging import get_logger
log = get_logger(__name__)
def get_response(client, response_id):
response, result, request = client.connection.get_response(
response_id, get_request=True
)
log.debug("Request: {}".format(request))
log.debug("Response: {}".format(response))
log.debug("Result: {}".format(result))
if result['description'] is not 'success':
raise Exception(result['description'])
return response
def get_entry_by_uid(client, uid):
log.info("Searching entry with identifier: {}".format(uid))
filter_ = "({}={})".format('uid', uid)
log.debug("Search filter: {}".format(filter_))
response_id = client.connection.search(
client.base_dn, filter_,
search_scope='SUBTREE',
attributes=[ALL_ATTRIBUTES]
)
response = get_response(client, response_id)
if not response:
return None
if len(response) > 1:
log.error("Looking for exactly one result but server gave {}. "
"Taking the first and ignoring the rest."
.format(len(response)))
return response[0]
def get_entries_by_ou(client, ou):
log.info("Searching entries with organizational unit: {}".format(ou))
dn = 'ou={},{}'.format(ou, client.base_dn)
log.debug("Search dn: {}".format(dn))
response_id = client.connection.search(
dn, '(objectclass=person)',
search_scope='SUBTREE',
attributes=[ALL_ATTRIBUTES]
)
response = get_response(client, response_id)
return response
def get_group_by_cn(client, cn):
log.info("Searching groups with common name: {}".format(cn))
dn = 'cn={},ou=Groups,{}'.format(cn, client.base_dn)
log.debug("Search dn: {}".format(dn))
response_id = client.connection.search(
dn, '(objectclass=groupOfNames)',
search_scope='SUBTREE',
attributes=[ALL_ATTRIBUTES]
)
response = get_response(client, response_id)
if not response:
return None
if len(response) > 1:
log.error("Looking for exactly one result but server gave {}. "
"Taking the first and ignoring the rest."
.format(len(response)))
return response[0]
def add_entry(client, dn, attributes):
log.info('Adding entry with distinguiscet name: {}'
'and attributes {}'.format(dn, attributes))
response_id = client.connection.add(dn, attributes=attributes)
response = get_response(client, response_id)
return response
def delete_entry(client, dn):
log.info('Deleting entry with distinguiscet name: {}')
response_id = client.connection.delete(dn)
response = get_response(client, response_id)
return response
def get_group_members(client, group_cn):
group = get_group_by_cn(client, group_cn)
members = group['attributes']['member']
# log.debug('Found members: {}'.format(members))
return members
def add_group_member(client, group_cn, member_uid):
member_dn = 'uid={},ou=Hackers,dc=unit,dc=macaomilano,dc=org'.format(
member_uid)
group_dn = 'cn={},ou=Groups,dc=unit,dc=macaomilano,dc=org'.format(
group_cn)
# log.debug('Found adding {} to {}'.format(member_uid, group_cn))
response_id = client.connection.modify(
group_dn,
{'member': [(MODIFY_ADD, [member_dn])]}
)
return get_response(client, response_id)

47
src/phi/ldap/group.py Normal file
View File

@ -0,0 +1,47 @@
from ldap3 import ALL_ATTRIBUTES, MODIFY_ADD
from phi.ldap.utils import get_response, make_group_dict
from phi.logging import get_logger
log = get_logger(__name__)
def get_group_by_cn(client, cn):
log.info("Searching groups with common name: {}".format(cn))
dn = 'cn={},ou=Groups,{}'.format(cn, client.base_dn)
log.debug("Search dn: {}".format(dn))
response_id = client.connection.search(
dn, '(objectclass=groupOfNames)',
search_scope='SUBTREE',
attributes=[ALL_ATTRIBUTES]
)
response = get_response(client, response_id)
if not response:
return None
if len(response) > 1:
log.error("Looking for exactly one result but server gave {}. "
"Taking the first and ignoring the rest."
.format(len(response)))
group = make_group_dict(client, response[0])
return group
# TODO: get_all_groups -> [group_dicts]
def add_group_member(client, group, user):
group_dn = group['dn']
member_dn = user['dn']
log.debug('Found adding {} to {}'.format(member_dn, group_dn))
response_id = client.connection.modify(
group_dn,
{'member': [(MODIFY_ADD, [member_dn])]}
)
return get_response(client, response_id)

View File

@ -1,35 +1,50 @@
from phi.ldap.entry import get_entry_by_uid, get_entries_by_ou, \
add_entry, delete_entry
from ldap3 import ALL_ATTRIBUTES
from phi.ldap.utils import get_response, make_user_dict, add_entry, delete_entry
from phi.logging import get_logger
shown_keys = ['uid', 'mail', 'createTimestamp', 'modifyTimestamp']
def flatten_attribute(attr):
if isinstance(attr, list) and len(attr)==1:
return attr[0]
else:
return attr
def flatten_user(entry):
return {k: flatten_attribute(attr)
for k, attr in entry['attributes'].items()
if k in shown_keys}
log = get_logger(__name__)
def get_user_by_uid(client, uid):
entry = get_entry_by_uid(client, uid)
log.info("Searching entry with identifier: {}".format(uid))
if not entry:
filter_ = "({}={})".format('uid', uid)
log.debug("Search filter: {}".format(filter_))
response_id = client.connection.search(
client.base_dn, filter_,
search_scope='SUBTREE',
attributes=[ALL_ATTRIBUTES]
)
response = get_response(client, response_id)
if not response:
return None
return flatten_user(entry)
if len(response) > 1:
log.error("Looking for exactly one result but server gave {}. "
"Taking the first and ignoring the rest."
.format(len(response)))
return make_user_dict(client, response[0])
def get_users_by_ou(client, ou):
entries = get_entries_by_ou(client, ou)
users = [flatten_user(entry) for entry in entries]
def get_all_users(client):
log.info("Searching all the users")
dn = 'ou=Hackers,{}'.format(client.base_dn)
log.debug("Search dn: {}".format(dn))
response_id = client.connection.search(
dn, '(objectclass=person)',
search_scope='SUBTREE',
attributes=[ALL_ATTRIBUTES]
)
response = get_response(client, response_id)
users = [make_user_dict(client, entry) for entry in response]
return users
@ -50,6 +65,10 @@ def add_user(client, uid, cn, sn, mail):
add_entry(client, dn, attributes)
def delete_user(client, uid):
def delete_user(client, user):
delete_entry(client, user['dn'])
def delete_user_by_uid(client, uid):
dn = 'uid={},ou=Hackers,{}'.format(uid, client.base_dn)
delete_entry(client, dn)

64
src/phi/ldap/utils.py Normal file
View File

@ -0,0 +1,64 @@
from phi.logging import get_logger
log = get_logger(__name__)
def flatten_attribute(attr):
if isinstance(attr, list) and len(attr)==1:
return attr[0]
else:
return attr
def flatten_entry(entry):
return {k: flatten_attribute(attr)
for k, attr in entry['attributes'].items()}
def make_user_dict(client, entry):
user = flatten_entry(entry)
dn = 'uid={},ou=Hackers,{}'.format(user['uid'], client.base_dn)
user['dn'] = dn
return user
def make_group_dict(client, entry):
group = flatten_entry(entry)
dn = 'cn={},ou=Groups,{}'.format(group['cn'], client.base_dn)
group['dn'] = dn
# unflatten members, they have to be a list even is there is a single one
group['member'] = entry['attributes']['member']
return group
def get_response(client, response_id):
response, result, request = client.connection.get_response(
response_id, get_request=True
)
log.debug("Request: {}".format(request))
log.debug("Response: {}".format(response))
log.debug("Result: {}".format(result))
if result['description'] is not 'success':
raise Exception(result['description'])
return response
def add_entry(client, dn, attributes):
log.info('Adding entry with distinguiscet name: {}'
'and attributes {}'.format(dn, attributes))
response_id = client.connection.add(dn, attributes=attributes)
response = get_response(client, response_id)
return response
def delete_entry(client, dn):
log.info('Deleting entry with distinguiscet name: {}')
response_id = client.connection.delete(dn)
response = get_response(client, response_id)
return response

View File

@ -1,7 +1,7 @@
from phi.ldap.user import get_user_by_uid, get_users_by_ou, \
add_user, delete_user
from phi.ldap.user import get_user_by_uid, get_all_users, \
add_user, delete_user_by_uid, delete_user
from phi.ldap.entry import add_group_member, get_group_members
from phi.ldap.group import add_group_member, get_group_by_cn
def test_connection(ldap_client):
@ -16,7 +16,7 @@ def test_get_user_by_id(ldap_client):
def test_get_users_by_ou(ldap_client):
users = get_users_by_ou(ldap_client, 'Hackers')
users = get_all_users(ldap_client)
assert 'conte_mascetti' in [u['uid'] for u in users]
@ -33,7 +33,7 @@ def test_add_delete_user(ldap_client):
assert user['uid'] == uid
assert user['mail'] == mail
delete_user(ldap_client, uid)
delete_user(ldap_client, user)
user = get_user_by_uid(ldap_client, uid)
assert user is None
@ -53,7 +53,7 @@ def test_failing_delete_user(ldap_client):
uid = 'rosa_rossi'
try:
delete_user(ldap_client, uid)
delete_user_by_uid(ldap_client, uid)
except: # User already not existing
pass
else:
@ -67,16 +67,25 @@ def test_add_to_group(ldap_client):
member_uid = 'rosa_rossi'
add_user(client, member_uid, '.', '.', '.')
group_members = get_group_members(client, 'WikiUsers')
user = get_user_by_uid(client, member_uid)
print(user)
group = get_group_by_cn(client, group_cn)
group_members = group['member']
assert len(group_members) == 1
# print(group_members)
add_group_member(client, group_cn, member_uid)
add_group_member(client, group, user)
group = get_group_by_cn(client, group_cn)
group_members = group['member']
group_members = get_group_members(client, 'WikiUsers')
assert len(group_members) == 2
assert 'uid=rosa_rossi,ou=Hackers,dc=unit,dc=macaomilano,dc=org' \
in group_members
assert user['dn'] in group_members
# print(group_members)
delete_user(client, member_uid)
delete_user(client, user)
print(group)
# assert False