Compare commits

..

No commits in common. "master" and "make-things-work-brutally" have entirely different histories.

14 changed files with 79 additions and 545 deletions

View File

@ -9,50 +9,3 @@ APIs for the Unit hacklab.
Requirements: Requirements:
* Python >= 3.5 * Python >= 3.5
Create a virtual environment and activate it (optional):
```
virtualenv --python=/usr/bin/python3 env
source env/bin/activate
```
Run the setup:
```
python setup.py install
```
## Setup
In the ldap section of `config.yml` change host, port and password according to
your setup.
## Command Line
```
usage: phicli [-h] [--config config.yml]
{showuser,adduser,deluser,showgroup,listgroups,addtogroup} ...
optional arguments:
-h, --help show this help message and exit
--config config.yml custom configuration file
actions:
showuser dispaly user fields
adduser add a new user
deluser delete an user
showgroup show a group
listgroups list all groups
addtogroup add an user to a group
```
```
phicli showuser [-h] user_id
phicli adduser [-h] user_id
phicli deluser [-h] user_id
phicli showgroup [-h] common_name
phicli listgroups [-h]
phicli addtogroup [-h] user_id group_common_name
```

View File

@ -14,12 +14,12 @@ ldap:
validate: True # Can either be True or False. Default: False validate: True # Can either be True or False. Default: False
ca_certs: openldap/cert.pem ca_certs: openldap/cert.pem
# username: uid=phi,ou=Services,dc=unit,dc=macaomilano,dc=org username: uid=phi,ou=Services,dc=unit,dc=macaomilano,dc=org
# password: phi password: phi
username: cn=root,dc=unit,dc=macaomilano,dc=org
password: root
base_dn: dc=unit,dc=macaomilano,dc=org base_dn: dc=unit,dc=macaomilano,dc=org
attribute_id: uid
attribute_mail: mail
logging: logging:
@ -40,10 +40,10 @@ logging:
loggers: loggers:
phi: phi:
level: WARNING level: DEBUG
handlers: [console, file] handlers: [console, file]
aiohttp: aiohttp:
level: WARNING level: DEBUG
handlers: [console, file] handlers: [console, file]
ldap3: ldap3:
level: WARNING level: WARNING

View File

@ -38,9 +38,3 @@ sn: Mascetti
mail: rmascetti@autistici.org mail: rmascetti@autistici.org
uid: conte_mascetti uid: conte_mascetti
userPassword: {SHA}oLY7P6V+DWaMJhix7vbMYGIfA+E= userPassword: {SHA}oLY7P6V+DWaMJhix7vbMYGIfA+E=
dn: cn=WikiUsers,ou=Groups,dc=unit,dc=macaomilano,dc=org
objectClass: groupOfNames
objectClass: top
cn: WikiUsers
member: uid=conte_mascetti,ou=Hackers,dc=unit,dc=macaomilano,dc=org

View File

@ -14,7 +14,7 @@ setup(
package_dir={'': 'src'}, package_dir={'': 'src'},
packages=['phi', 'phi.api', 'phi.ldap'], packages=['phi', 'phi.api', 'phi.ldap'],
scripts=['src/phid', 'src/phicli'], scripts=['src/phid'],
setup_requires=['pytest-runner'], setup_requires=['pytest-runner'],
install_requires=['pyYAML', 'ldap3'], install_requires=['pyYAML', 'ldap3'],

View File

@ -1,47 +0,0 @@
import sys
import argparse
import inspect
from phi.logging import setup_logging, get_logger
log = get_logger(__name__)
parser = argparse.ArgumentParser()
subparses = parser.add_subparsers(title='actions', dest='action')
cli_callbacks = {}
def register(action_info='', param_infos=[]):
def decorator(action):
# Get function name and arguments
action_name = action.__name__
param_names = inspect.getfullargspec(action)[0]
# Create subparser for specific action
subparser = subparses.add_parser(action_name, help=action_info)
for i, name in enumerate(param_names):
info = param_infos[i] if i<len(param_infos) else ''
subparser.add_argument(dest=name, help=info)
# Register action
cli_callbacks[action_name] = action, param_names
return action
return decorator
def run(args):
for action_name, (action, param_names) in cli_callbacks.items():
if args['action'] == action_name:
action(**{pname: args[pname] for pname in param_names})
def add_arg(name, example, info):
parser.add_argument(name, metavar=example, help=info)
def get_args():
namespace = parser.parse_args(sys.argv[1:])
args = namespace.__dict__
return args

View File

@ -13,16 +13,12 @@ CONFIG_FILES = [os.path.join(p, CONFIG_FILE)
for p in CONFIG_PATHS] for p in CONFIG_PATHS]
def get_config(custom_config=None): def get_config():
"""Return the path of the found configuration file and its content """Return the path of the found configuration file and its content
:returns: (path, config) :returns: (path, config)
:rtype: (str, dict) :rtype: (str, dict)
""" """
if custom_config:
global CONFIG_FILES
CONFIG_FILES = [custom_config]
for f in CONFIG_FILES: for f in CONFIG_FILES:
try: try:
with open(f, 'r') as c: with open(f, 'r') as c:
@ -35,10 +31,5 @@ def get_config(custom_config=None):
# in any of CONFIG_PATHS. # in any of CONFIG_PATHS.
pass pass
else: else:
if custom_config: raise FileNotFoundError("Could not find {} in any of {}."
raise FileNotFoundError('Config file {} not found.' .format(CONFIG_FILE, ', '.join(CONFIG_PATHS)))
.format(custom_config))
else:
raise FileNotFoundError("Could not find {} in any of {}."
.format(CONFIG_FILE,
', '.join(CONFIG_PATHS)))

View File

@ -14,7 +14,8 @@ class Client:
host=None, port=389, host=None, port=389,
encryption=None, ciphers=None, validate=False, ca_certs=None, encryption=None, ciphers=None, validate=False, ca_certs=None,
username=None, password=None, username=None, password=None,
base_dn=None): base_dn=None,
attribute_id='uid', attribute_mail='mail'):
log.info("Initializing LDAP Client.") log.info("Initializing LDAP Client.")
self.host = host self.host = host
@ -30,6 +31,9 @@ class Client:
self.base_dn = base_dn self.base_dn = base_dn
self.attribute_id = attribute_id
self.attribute_mail = attribute_mail
self.connection_lock = Lock() self.connection_lock = Lock()
self.connection = make_connection(host=self.host, port=self.port, self.connection = make_connection(host=self.host, port=self.port,
encryption=self.encryption, encryption=self.encryption,

36
src/phi/ldap/entry.py Normal file
View File

@ -0,0 +1,36 @@
from ldap3 import ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES
from phi.logging import get_logger
log = get_logger(__name__)
def get_entry_by_uid(client, uid):
log.info("Searching entry with identifier: {}".format(uid))
filter_ = "({}={})".format(client.attribute_id, uid)
log.debug("Search filter: {}".format(filter_))
response_id = client.connection.search(
client.base_dn, filter_,
search_scope='SUBTREE',
attributes=[ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES]
)
response, result, request = client.connection.get_response(
response_id, get_request=True
)
log.debug("Request: {}".format(request))
log.debug("Response: {}".format(response))
log.debug("Result: {}".format(result))
if not response:
return None
if response[1:]:
log.erorr("Looking for exactly one result but server gave {}. "
"Taking the first and ignoring the rest."
.format(len(response)))
return response[0]

View File

@ -1,61 +0,0 @@
from ldap3 import ALL_ATTRIBUTES, MODIFY_ADD
from phi.ldap.utils import get_response, make_group_dict
from phi.logging import get_logger
log = get_logger(__name__)
def get_group_by_cn(client, cn):
log.info("Searching groups with common name: {}".format(cn))
dn = 'cn={},ou=Groups,{}'.format(cn, client.base_dn)
log.debug("Search dn: {}".format(dn))
response_id = client.connection.search(
dn, '(objectclass=groupOfNames)',
search_scope='SUBTREE',
attributes=[ALL_ATTRIBUTES]
)
response = get_response(client, response_id)
if not response:
return None
if len(response) > 1:
log.error("Looking for exactly one result but server gave {}. "
"Taking the first and ignoring the rest."
.format(len(response)))
group = make_group_dict(client, response[0])
return group
def get_all_groups(client):
log.info("Searching all the groups")
dn = 'ou=Groups,{}'.format(client.base_dn)
log.debug("Search dn: {}".format(dn))
response_id = client.connection.search(
dn, '(objectclass=groupOfNames)',
search_scope='SUBTREE',
attributes=[ALL_ATTRIBUTES]
)
response = get_response(client, response_id)
groups = [make_group_dict(client, entry) for entry in response]
return groups
def add_group_member(client, group, user):
group_dn = group['dn']
member_dn = user['dn']
log.debug('Found adding {} to {}'.format(member_dn, group_dn))
response_id = client.connection.modify(
group_dn,
{'member': [(MODIFY_ADD, [member_dn])]}
)
return get_response(client, response_id)

View File

@ -1,77 +1,26 @@
from ldap3 import ALL_ATTRIBUTES, HASHED_SALTED_SHA from phi.ldap.entry import get_entry_by_uid
from ldap3.utils.hashed import hashed from phi.ldap.utils import flatten_attributes
from phi.ldap.utils import get_response, make_user_dict, add_entry, delete_entry
from phi.logging import get_logger
log = get_logger(__name__)
def user_attributes_mapping(client):
return {
client.attribute_id: 'uid',
client.attribute_mail: 'mail',
'createTimestamp': 'created_at',
'modifyTimestamp': 'modified_at'
}
def get_user_by_uid(client, uid): def get_user_by_uid(client, uid):
log.info("Searching entry with identifier: {}".format(uid)) entry = get_entry_by_uid(client, uid)
filter_ = "({}={})".format('uid', uid) if not entry:
log.debug("Search filter: {}".format(filter_))
response_id = client.connection.search(
client.base_dn, filter_,
search_scope='SUBTREE',
attributes=[ALL_ATTRIBUTES]
)
response = get_response(client, response_id)
if not response:
return None return None
if len(response) > 1: mapping = user_attributes_mapping(client)
log.error("Looking for exactly one result but server gave {}. "
"Taking the first and ignoring the rest."
.format(len(response)))
return make_user_dict(client, response[0]) user = {mapping[k]: v
for k, v in entry['attributes'].items()
if k in mapping.keys()}
return flatten_attributes(user)
def get_all_users(client):
log.info("Searching all the users")
dn = 'ou=Hackers,{}'.format(client.base_dn)
log.debug("Search dn: {}".format(dn))
response_id = client.connection.search(
dn, '(objectclass=person)',
search_scope='SUBTREE',
attributes=[ALL_ATTRIBUTES]
)
response = get_response(client, response_id)
users = [make_user_dict(client, entry) for entry in response]
return users
def add_user(client, uid, cn, sn, mail, password):
dn = 'uid={},ou=Hackers,{}'.format(uid, client.base_dn)
hashed_password = hashed(HASHED_SALTED_SHA, password)
attributes={
'objectClass': [
'inetOrgPerson',
'organizationalPerson',
'person', 'top'
],
'cn': cn,
'sn': sn,
'mail': mail,
'userPassword': hashed_password
}
add_entry(client, dn, attributes)
def delete_user(client, user):
delete_entry(client, user['dn'])
def delete_user_by_uid(client, uid):
dn = 'uid={},ou=Hackers,{}'.format(uid, client.base_dn)
delete_entry(client, dn)

View File

@ -1,70 +1,3 @@
import re def flatten_attributes(d):
from phi.logging import get_logger return {k: (v[0] if isinstance(v, list) else v)
for k, v in d.items()}
log = get_logger(__name__)
def make_user_dict(client, entry):
attributes = entry['attributes']
user = {}
user['uid'] = attributes['uid'][0]
user['dn'] = 'uid={},ou=Hackers,{}'.format(user['uid'], client.base_dn)
user['cn'] = attributes['cn'][0]
user['sn'] = attributes['sn'][0]
user['mail'] = attributes['mail'][0]
user['password'] = attributes['userPassword'][0]
return user
def get_uid_from_dn(client, dn):
uid = re.search('uid=(.+?),ou=Hackers,{}'.format(client.base_dn),
dn).group(1)
return uid
def make_group_dict(client, entry):
attributes = entry['attributes']
cn = attributes['cn'][0]
dn = 'cn={},ou=Groups,{}'.format(cn, client.base_dn)
members = [get_uid_from_dn(client, u_dn)
for u_dn in attributes['member']]
group = {}
group['dn'] = dn
group['cn'] = cn
group['members'] = members
return group
def get_response(client, response_id):
response, result, request = client.connection.get_response(
response_id, get_request=True
)
log.debug("Request: {}".format(request))
log.debug("Response: {}".format(response))
log.debug("Result: {}".format(result))
if result['description'] is not 'success':
raise Exception(result['description'])
return response
def add_entry(client, dn, attributes):
log.info('Adding entry with distinguiscet name: {}'
'and attributes {}'.format(dn, attributes))
response_id = client.connection.add(dn, attributes=attributes)
response = get_response(client, response_id)
return response
def delete_entry(client, dn):
log.info('Deleting entry with distinguiscet name: {}')
response_id = client.connection.delete(dn)
response = get_response(client, response_id)
return response

View File

@ -1,130 +0,0 @@
#!/usr/bin/env python3
from pprint import pformat as pp
from getpass import getpass
from phi.config import get_config
from phi.logging import setup_logging, get_logger
from phi import cli
import phi.ldap.client
from phi.ldap.user import get_user_by_uid, add_user, delete_user
from phi.ldap.group import get_group_by_cn, get_all_groups, add_group_member
log = get_logger(__name__)
@cli.register('dispaly user fields', ['user identifier'])
def showuser(uid):
user = get_user_by_uid(client, uid)
if user is None:
print('User {} not found'.format(uid))
return
print(pp(user))
@cli.register('add a new user', ['user identifier'])
def adduser(uid):
def ask(prompt, default):
full_prompt = '{} [{}] '.format(prompt, default)
return input(full_prompt) or default
user = get_user_by_uid(client, uid)
if user is not None:
print("User {} already existing".format(uid))
return
cn = ask('Common name:', uid)
sn = ask('Last name:', uid)
mail = ask('Mail:', '{}@localhost'.format(uid))
password = getpass()
pass_check = getpass('Retype password: ')
if password != pass_check:
print('Password not matching')
return
add_user(client, uid, cn, sn, mail, password)
# Check
user = get_user_by_uid(client, uid)
print()
print(pp(user))
@cli.register('delete an user', ['user identifier'])
def deluser(uid):
check = input('Are you sure? [y/N] ') or 'N'
if check.lower() != 'y':
print('Ok then')
return
user = get_user_by_uid(client, uid)
if user is not None:
delete_user(client, user)
print('Done')
else:
print('User {} not found'.format(uid))
@cli.register('show a group', ['group common name'])
def showgroup(cn):
group = get_group_by_cn(client, cn)
if group is None:
print('Group {} not found'.format(gcn))
return
print(pp(group))
@cli.register('list all groups')
def listgroups():
groups = get_all_groups(client)
for group in groups:
print(group['cn'])
@cli.register('add an user to a group',
['user identifier', 'group common name'])
def addtogroup(uid, gcn):
user = get_user_by_uid(client, uid)
group = get_group_by_cn(client, gcn)
if user is None:
print('User {} not found'.format(uid))
return
if group is None:
print('Group {} not found'.format(gcn))
return
if uid in group['members']:
print('User {} is already in group {}'.format(uid, gcn))
return
add_group_member(client, group, user)
if __name__ == '__main__':
cli.add_arg('--config', 'config.yml', 'custom configuration file')
args = cli.get_args()
config_file = args['config']
config_file, config = get_config(config_file)
setup_logging(config.get('logging', {}))
log.info("Using configuration at '{}':\n{}"
.format(config_file, pp(config)))
# TODO: check fields in config
client = phi.ldap.client.Client(**config['ldap'])
log.info('Opening LDAP client')
client.open()
log.info('Arguments: {}'.format(pp(args)))
cli.run(args)
log.info('Closing LDAP client')
client.close()

View File

@ -9,11 +9,10 @@ def ldap_client():
host='localhost', port=389, host='localhost', port=389,
encryption='TLSv1.2', ciphers='HIGH', encryption='TLSv1.2', ciphers='HIGH',
validate=False, validate=False,
# username='uid=phi,ou=Services,dc=unit,dc=macaomilano,dc=org', username='uid=phi,ou=Services,dc=unit,dc=macaomilano,dc=org',
# password='phi', password='phi',
username='cn=root,dc=unit,dc=macaomilano,dc=org', base_dn='dc=unit,dc=macaomilano,dc=org',
password='root', attribute_id='uid', attribute_mail='mail')
base_dn='dc=unit,dc=macaomilano,dc=org')
client.open() client.open()
yield client yield client
client.close() client.close()

View File

@ -1,7 +1,4 @@
from phi.ldap.user import get_user_by_uid, get_all_users, \ from phi.ldap.user import get_user_by_uid
add_user, delete_user_by_uid, delete_user
from phi.ldap.group import add_group_member, get_group_by_cn, get_all_groups
def test_connection(ldap_client): def test_connection(ldap_client):
@ -10,90 +7,6 @@ def test_connection(ldap_client):
def test_get_user_by_id(ldap_client): def test_get_user_by_id(ldap_client):
user = get_user_by_uid(ldap_client, 'conte_mascetti') entry = get_user_by_uid(ldap_client, 'conte_mascetti')
assert user['uid'] == 'conte_mascetti' assert entry['uid'] == 'conte_mascetti'
assert user['mail'] == 'rmascetti@autistici.org' assert entry['mail'] == 'rmascetti@autistici.org'
def test_get_all_users(ldap_client):
users = get_all_users(ldap_client)
# print(users)
assert 'conte_mascetti' in [u['uid'] for u in users]
def test_add_delete_user(ldap_client):
uid = 'rosa_rossi'
cn = 'Rosa'
sn = 'Rossi'
mail = 'foo@autistici.org'
password = 'changeme'
add_user(ldap_client, uid, cn, sn, mail, password)
user = get_user_by_uid(ldap_client, uid)
assert user['uid'] == uid
assert user['mail'] == mail
delete_user(ldap_client, user)
# print(user)
user = get_user_by_uid(ldap_client, uid)
assert user is None
def test_failing_add_user(ldap_client):
uid = 'conte_mascetti'
try:
add_user(ldap_client, uid, 'name', 'surname', 'mail', 'pass')
except: # User alrady existing
pass
else:
assert False
def test_failing_delete_user(ldap_client):
uid = 'rosa_rossi'
try:
delete_user_by_uid(ldap_client, uid)
except: # User already not existing
pass
else:
assert False
def test_get_all_groups(ldap_client):
groups = get_all_groups(ldap_client)
cns = [g['cn'] for g in groups]
assert 'WikiUsers' in cns
def test_add_to_group(ldap_client):
client = ldap_client
group_cn = 'WikiUsers'
member_uid = 'rosa_rossi'
add_user(client, member_uid, 'name', 'surname', 'mail', 'pass')
user = get_user_by_uid(client, member_uid)
# print(user)
group = get_group_by_cn(client, group_cn)
group_members = group['members']
assert len(group_members) == 1
# print(group_members)
add_group_member(client, group, user)
group = get_group_by_cn(client, group_cn)
group_members = group['members']
assert len(group_members) == 2
assert user['uid'] in group_members
# print(group_members)
# print(user)
delete_user(client, user)