Compare commits

...

24 Commits

Author SHA1 Message Date
uid d90151ce25 Handle user already in group 2020-10-12 18:55:36 +02:00
uid 1333c03954 Nicer prints 2020-10-12 18:42:09 +02:00
uid eb9827bb33 Lower log level 2020-10-12 18:38:13 +02:00
uid 6003807e44 Update readme 2020-10-11 22:03:21 +02:00
uid 69959d0b18 Add to group 2020-10-11 14:53:32 +02:00
uid 8b901ca5d5 Add and delete users 2020-10-11 12:41:44 +02:00
uid ae81ec3885 Move cli abstraction 2020-10-11 11:55:36 +02:00
uid d762251476 Command line sugar 2020-10-11 01:11:56 +02:00
uid a5bb63fb14 Show user 2020-10-10 23:27:08 +02:00
uid d6f48e4861 Load custom configs 2020-10-10 21:14:21 +02:00
uid 75e2ee1b04 Begin phicli 2020-10-10 18:45:16 +02:00
uid 86e8ed669b Hash password 2020-10-10 12:59:41 +02:00
uid affcc47fe9 Clean user and group dictionaries 2020-10-10 12:54:28 +02:00
uid b46b4fa01e Get all groups 2020-10-10 11:25:14 +02:00
uid 6d14ed9246 Users and group dictionaries 2020-10-05 14:51:16 +02:00
uid 8efd4bd75c Add to groups 2020-10-04 23:01:38 +02:00
uid cdddc250fb Clean abstraction 2020-10-04 17:45:09 +02:00
User Identifier 0b91ee1f22 Delete user after adding it 2020-10-04 16:47:33 +02:00
User Identifier 81b81c6a50 Log as root 2020-10-04 14:23:26 +02:00
User Identifier 93328aa2cf Failing add user test 2020-10-03 18:42:18 +02:00
User Identifier dbf6ca966f Search by organizational unit 2020-09-30 18:42:28 +02:00
User Identifier 3e299f96b1 Fix typo 2020-09-30 16:42:53 +02:00
User Identifier 4dd11a881e Remove attribute mapping 2020-09-30 16:38:31 +02:00
crudo 7883b2d1c0 Make tests run again
I had to remove all the aiohttp crap for the moment.
2020-09-30 12:43:15 +02:00
15 changed files with 573 additions and 107 deletions

View File

@ -9,3 +9,50 @@ APIs for the Unit hacklab.
Requirements:
* Python >= 3.5
Create a virtual environment and activate it (optional):
```
virtualenv --python=/usr/bin/python3 env
source env/bin/activate
```
Run the setup:
```
python setup.py install
```
## Setup
In the ldap section of `config.yml` change host, port and password according to
your setup.
## Command Line
```
usage: phicli [-h] [--config config.yml]
{showuser,adduser,deluser,showgroup,listgroups,addtogroup} ...
optional arguments:
-h, --help show this help message and exit
--config config.yml custom configuration file
actions:
showuser dispaly user fields
adduser add a new user
deluser delete an user
showgroup show a group
listgroups list all groups
addtogroup add an user to a group
```
```
phicli showuser [-h] user_id
phicli adduser [-h] user_id
phicli deluser [-h] user_id
phicli showgroup [-h] common_name
phicli listgroups [-h]
phicli addtogroup [-h] user_id group_common_name
```

View File

@ -14,12 +14,12 @@ ldap:
validate: True # Can either be True or False. Default: False
ca_certs: openldap/cert.pem
username: uid=phi,ou=Services,dc=unit,dc=macaomilano,dc=org
password: phi
# username: uid=phi,ou=Services,dc=unit,dc=macaomilano,dc=org
# password: phi
username: cn=root,dc=unit,dc=macaomilano,dc=org
password: root
base_dn: dc=unit,dc=macaomilano,dc=org
attribute_id: uid
attribute_mail: mail
logging:
@ -40,10 +40,10 @@ logging:
loggers:
phi:
level: DEBUG
level: WARNING
handlers: [console, file]
aiohttp:
level: DEBUG
level: WARNING
handlers: [console, file]
ldap3:
level: WARNING

View File

@ -38,3 +38,9 @@ sn: Mascetti
mail: rmascetti@autistici.org
uid: conte_mascetti
userPassword: {SHA}oLY7P6V+DWaMJhix7vbMYGIfA+E=
dn: cn=WikiUsers,ou=Groups,dc=unit,dc=macaomilano,dc=org
objectClass: groupOfNames
objectClass: top
cn: WikiUsers
member: uid=conte_mascetti,ou=Hackers,dc=unit,dc=macaomilano,dc=org

View File

@ -14,9 +14,9 @@ setup(
package_dir={'': 'src'},
packages=['phi', 'phi.api', 'phi.ldap'],
scripts=['src/phid'],
scripts=['src/phid', 'src/phicli'],
setup_requires=['pytest-runner'],
install_requires=['aiohttp==2.3.8', 'pyYAML', 'ldap3'],
tests_require=['pytest', 'pytest-aiohttp']
install_requires=['pyYAML', 'ldap3'],
tests_require=['pytest']
)

47
src/phi/cli.py 100644
View File

@ -0,0 +1,47 @@
import sys
import argparse
import inspect
from phi.logging import setup_logging, get_logger
log = get_logger(__name__)
parser = argparse.ArgumentParser()
subparses = parser.add_subparsers(title='actions', dest='action')
cli_callbacks = {}
def register(action_info='', param_infos=[]):
def decorator(action):
# Get function name and arguments
action_name = action.__name__
param_names = inspect.getfullargspec(action)[0]
# Create subparser for specific action
subparser = subparses.add_parser(action_name, help=action_info)
for i, name in enumerate(param_names):
info = param_infos[i] if i<len(param_infos) else ''
subparser.add_argument(dest=name, help=info)
# Register action
cli_callbacks[action_name] = action, param_names
return action
return decorator
def run(args):
for action_name, (action, param_names) in cli_callbacks.items():
if args['action'] == action_name:
action(**{pname: args[pname] for pname in param_names})
def add_arg(name, example, info):
parser.add_argument(name, metavar=example, help=info)
def get_args():
namespace = parser.parse_args(sys.argv[1:])
args = namespace.__dict__
return args

View File

@ -13,12 +13,16 @@ CONFIG_FILES = [os.path.join(p, CONFIG_FILE)
for p in CONFIG_PATHS]
def get_config():
def get_config(custom_config=None):
"""Return the path of the found configuration file and its content
:returns: (path, config)
:rtype: (str, dict)
"""
if custom_config:
global CONFIG_FILES
CONFIG_FILES = [custom_config]
for f in CONFIG_FILES:
try:
with open(f, 'r') as c:
@ -31,5 +35,10 @@ def get_config():
# in any of CONFIG_PATHS.
pass
else:
raise FileNotFoundError("Could not find {} in any of {}."
.format(CONFIG_FILE, ', '.join(CONFIG_PATHS)))
if custom_config:
raise FileNotFoundError('Config file {} not found.'
.format(custom_config))
else:
raise FileNotFoundError("Could not find {} in any of {}."
.format(CONFIG_FILE,
', '.join(CONFIG_PATHS)))

View File

@ -14,8 +14,7 @@ class Client:
host=None, port=389,
encryption=None, ciphers=None, validate=False, ca_certs=None,
username=None, password=None,
base_dn=None,
attribute_id='uid', attribute_mail='mail'):
base_dn=None):
log.info("Initializing LDAP Client.")
self.host = host
@ -31,9 +30,6 @@ class Client:
self.base_dn = base_dn
self.attribute_id = attribute_id
self.attribute_mail = attribute_mail
self.connection_lock = Lock()
self.connection = make_connection(host=self.host, port=self.port,
encryption=self.encryption,

View File

@ -1,36 +0,0 @@
from ldap3 import ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES
from phi.logging import get_logger
log = get_logger(__name__)
def get_entry_by_uid(client, uid):
log.info("Searching entry with identifier: {}".format(uid))
filter_ = "({}={})".format(client.attribute_id, uid)
log.debug("Search filter: {}".format(filter_))
response_id = client.connection.search(
client.base_dn, filter_,
search_scope='SUBTREE',
attributes=[ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES]
)
response, result, request = client.connection.get_response(
response_id, get_request=True
)
log.debug("Request: {}".format(request))
log.debug("Response: {}".format(response))
log.debug("Result: {}".format(result))
if not response:
return None
if response[1:]:
log.erorr("Looking for exactly one result but server gave {}. "
"Taking the first and ignoring the rest."
.format(len(response)))
return response[0]

View File

@ -0,0 +1,61 @@
from ldap3 import ALL_ATTRIBUTES, MODIFY_ADD
from phi.ldap.utils import get_response, make_group_dict
from phi.logging import get_logger
log = get_logger(__name__)
def get_group_by_cn(client, cn):
log.info("Searching groups with common name: {}".format(cn))
dn = 'cn={},ou=Groups,{}'.format(cn, client.base_dn)
log.debug("Search dn: {}".format(dn))
response_id = client.connection.search(
dn, '(objectclass=groupOfNames)',
search_scope='SUBTREE',
attributes=[ALL_ATTRIBUTES]
)
response = get_response(client, response_id)
if not response:
return None
if len(response) > 1:
log.error("Looking for exactly one result but server gave {}. "
"Taking the first and ignoring the rest."
.format(len(response)))
group = make_group_dict(client, response[0])
return group
def get_all_groups(client):
log.info("Searching all the groups")
dn = 'ou=Groups,{}'.format(client.base_dn)
log.debug("Search dn: {}".format(dn))
response_id = client.connection.search(
dn, '(objectclass=groupOfNames)',
search_scope='SUBTREE',
attributes=[ALL_ATTRIBUTES]
)
response = get_response(client, response_id)
groups = [make_group_dict(client, entry) for entry in response]
return groups
def add_group_member(client, group, user):
group_dn = group['dn']
member_dn = user['dn']
log.debug('Found adding {} to {}'.format(member_dn, group_dn))
response_id = client.connection.modify(
group_dn,
{'member': [(MODIFY_ADD, [member_dn])]}
)
return get_response(client, response_id)

View File

@ -1,26 +1,77 @@
from phi.ldap.entry import get_entry_by_uid
from phi.ldap.utils import flatten_attributes
from ldap3 import ALL_ATTRIBUTES, HASHED_SALTED_SHA
from ldap3.utils.hashed import hashed
from phi.ldap.utils import get_response, make_user_dict, add_entry, delete_entry
from phi.logging import get_logger
def user_attributes_mapping(client):
return {
client.attribute_id: 'uid',
client.attribute_mail: 'mail',
'createTimestamp': 'created_at',
'modifyTimestamp': 'modified_at'
}
log = get_logger(__name__)
def get_user_by_uid(client, uid):
entry = get_entry_by_uid(client, uid)
log.info("Searching entry with identifier: {}".format(uid))
if not entry:
filter_ = "({}={})".format('uid', uid)
log.debug("Search filter: {}".format(filter_))
response_id = client.connection.search(
client.base_dn, filter_,
search_scope='SUBTREE',
attributes=[ALL_ATTRIBUTES]
)
response = get_response(client, response_id)
if not response:
return None
mapping = user_attributes_mapping(client)
if len(response) > 1:
log.error("Looking for exactly one result but server gave {}. "
"Taking the first and ignoring the rest."
.format(len(response)))
user = {mapping[k]: v
for k, v in entry['attributes'].items()
if k in mapping.keys()}
return make_user_dict(client, response[0])
return flatten_attributes(user)
def get_all_users(client):
log.info("Searching all the users")
dn = 'ou=Hackers,{}'.format(client.base_dn)
log.debug("Search dn: {}".format(dn))
response_id = client.connection.search(
dn, '(objectclass=person)',
search_scope='SUBTREE',
attributes=[ALL_ATTRIBUTES]
)
response = get_response(client, response_id)
users = [make_user_dict(client, entry) for entry in response]
return users
def add_user(client, uid, cn, sn, mail, password):
dn = 'uid={},ou=Hackers,{}'.format(uid, client.base_dn)
hashed_password = hashed(HASHED_SALTED_SHA, password)
attributes={
'objectClass': [
'inetOrgPerson',
'organizationalPerson',
'person', 'top'
],
'cn': cn,
'sn': sn,
'mail': mail,
'userPassword': hashed_password
}
add_entry(client, dn, attributes)
def delete_user(client, user):
delete_entry(client, user['dn'])
def delete_user_by_uid(client, uid):
dn = 'uid={},ou=Hackers,{}'.format(uid, client.base_dn)
delete_entry(client, dn)

View File

@ -1,3 +1,70 @@
def flatten_attributes(d):
return {k: (v[0] if isinstance(v, list) else v)
for k, v in d.items()}
import re
from phi.logging import get_logger
log = get_logger(__name__)
def make_user_dict(client, entry):
attributes = entry['attributes']
user = {}
user['uid'] = attributes['uid'][0]
user['dn'] = 'uid={},ou=Hackers,{}'.format(user['uid'], client.base_dn)
user['cn'] = attributes['cn'][0]
user['sn'] = attributes['sn'][0]
user['mail'] = attributes['mail'][0]
user['password'] = attributes['userPassword'][0]
return user
def get_uid_from_dn(client, dn):
uid = re.search('uid=(.+?),ou=Hackers,{}'.format(client.base_dn),
dn).group(1)
return uid
def make_group_dict(client, entry):
attributes = entry['attributes']
cn = attributes['cn'][0]
dn = 'cn={},ou=Groups,{}'.format(cn, client.base_dn)
members = [get_uid_from_dn(client, u_dn)
for u_dn in attributes['member']]
group = {}
group['dn'] = dn
group['cn'] = cn
group['members'] = members
return group
def get_response(client, response_id):
response, result, request = client.connection.get_response(
response_id, get_request=True
)
log.debug("Request: {}".format(request))
log.debug("Response: {}".format(response))
log.debug("Result: {}".format(result))
if result['description'] is not 'success':
raise Exception(result['description'])
return response
def add_entry(client, dn, attributes):
log.info('Adding entry with distinguiscet name: {}'
'and attributes {}'.format(dn, attributes))
response_id = client.connection.add(dn, attributes=attributes)
response = get_response(client, response_id)
return response
def delete_entry(client, dn):
log.info('Deleting entry with distinguiscet name: {}')
response_id = client.connection.delete(dn)
response = get_response(client, response_id)
return response

130
src/phicli 100755
View File

@ -0,0 +1,130 @@
#!/usr/bin/env python3
from pprint import pformat as pp
from getpass import getpass
from phi.config import get_config
from phi.logging import setup_logging, get_logger
from phi import cli
import phi.ldap.client
from phi.ldap.user import get_user_by_uid, add_user, delete_user
from phi.ldap.group import get_group_by_cn, get_all_groups, add_group_member
log = get_logger(__name__)
@cli.register('dispaly user fields', ['user identifier'])
def showuser(uid):
user = get_user_by_uid(client, uid)
if user is None:
print('User {} not found'.format(uid))
return
print(pp(user))
@cli.register('add a new user', ['user identifier'])
def adduser(uid):
def ask(prompt, default):
full_prompt = '{} [{}] '.format(prompt, default)
return input(full_prompt) or default
user = get_user_by_uid(client, uid)
if user is not None:
print("User {} already existing".format(uid))
return
cn = ask('Common name:', uid)
sn = ask('Last name:', uid)
mail = ask('Mail:', '{}@localhost'.format(uid))
password = getpass()
pass_check = getpass('Retype password: ')
if password != pass_check:
print('Password not matching')
return
add_user(client, uid, cn, sn, mail, password)
# Check
user = get_user_by_uid(client, uid)
print()
print(pp(user))
@cli.register('delete an user', ['user identifier'])
def deluser(uid):
check = input('Are you sure? [y/N] ') or 'N'
if check.lower() != 'y':
print('Ok then')
return
user = get_user_by_uid(client, uid)
if user is not None:
delete_user(client, user)
print('Done')
else:
print('User {} not found'.format(uid))
@cli.register('show a group', ['group common name'])
def showgroup(cn):
group = get_group_by_cn(client, cn)
if group is None:
print('Group {} not found'.format(gcn))
return
print(pp(group))
@cli.register('list all groups')
def listgroups():
groups = get_all_groups(client)
for group in groups:
print(group['cn'])
@cli.register('add an user to a group',
['user identifier', 'group common name'])
def addtogroup(uid, gcn):
user = get_user_by_uid(client, uid)
group = get_group_by_cn(client, gcn)
if user is None:
print('User {} not found'.format(uid))
return
if group is None:
print('Group {} not found'.format(gcn))
return
if uid in group['members']:
print('User {} is already in group {}'.format(uid, gcn))
return
add_group_member(client, group, user)
if __name__ == '__main__':
cli.add_arg('--config', 'config.yml', 'custom configuration file')
args = cli.get_args()
config_file = args['config']
config_file, config = get_config(config_file)
setup_logging(config.get('logging', {}))
log.info("Using configuration at '{}':\n{}"
.format(config_file, pp(config)))
# TODO: check fields in config
client = phi.ldap.client.Client(**config['ldap'])
log.info('Opening LDAP client')
client.open()
log.info('Arguments: {}'.format(pp(args)))
cli.run(args)
log.info('Closing LDAP client')
client.close()

View File

@ -1,6 +1,6 @@
import pytest
import phi.ldap.client
import phi.api.app
# import phi.api.app
@pytest.fixture
@ -9,26 +9,27 @@ def ldap_client():
host='localhost', port=389,
encryption='TLSv1.2', ciphers='HIGH',
validate=False,
username='uid=phi,ou=Services,dc=unit,dc=macaomilano,dc=org',
password='phi',
base_dn='dc=unit,dc=macaomilano,dc=org',
attribute_id='uid', attribute_mail='mail')
# username='uid=phi,ou=Services,dc=unit,dc=macaomilano,dc=org',
# password='phi',
username='cn=root,dc=unit,dc=macaomilano,dc=org',
password='root',
base_dn='dc=unit,dc=macaomilano,dc=org')
client.open()
yield client
client.close()
@pytest.fixture
def api_app():
return phi.api.app.api_app({
'ldap': {
'host': 'localhost',
'port': 389,
'encryption': 'TLSv1.2',
'ciphers': 'HIGH',
'validate': 'False',
'username': 'uid=phi,ou=Services,dc=unit,dc=macaomilano,dc=org',
'password': 'phi',
'base_dn': 'dc=unit,dc=macaomilano,dc=org',
'attribute_id': 'uid',
'attribute_mail': 'mail'}})
# @pytest.fixture
# def api_app():
# return phi.api.app.api_app({
# 'ldap': {
# 'host': 'localhost',
# 'port': 389,
# 'encryption': 'TLSv1.2',
# 'ciphers': 'HIGH',
# 'validate': 'False',
# 'username': 'uid=phi,ou=Services,dc=unit,dc=macaomilano,dc=org',
# 'password': 'phi',
# 'base_dn': 'dc=unit,dc=macaomilano,dc=org',
# 'attribute_id': 'uid',
# 'attribute_mail': 'mail'}})

View File

@ -1,15 +1,15 @@
async def test_user_request_not_valid(test_client, api_app):
client = await test_client(api_app)
# async def test_user_request_not_valid(test_client, api_app):
# client = await test_client(api_app)
resp = await client.get('/user')
assert resp.status == 422
resp = None
# resp = await client.get('/user')
# assert resp.status == 422
# resp = None
resp = await client.get('/user/')
assert resp.status == 422
# resp = await client.get('/user/')
# assert resp.status == 422
async def test_user_not_found(test_client, api_app):
client = await test_client(api_app)
resp = await client.get('/user/nonexistent')
assert resp.status == 404
# async def test_user_not_found(test_client, api_app):
# client = await test_client(api_app)
# resp = await client.get('/user/nonexistent')
# assert resp.status == 404

View File

@ -1,4 +1,7 @@
from phi.ldap.user import get_user_by_uid
from phi.ldap.user import get_user_by_uid, get_all_users, \
add_user, delete_user_by_uid, delete_user
from phi.ldap.group import add_group_member, get_group_by_cn, get_all_groups
def test_connection(ldap_client):
@ -7,6 +10,90 @@ def test_connection(ldap_client):
def test_get_user_by_id(ldap_client):
entry = get_user_by_uid(ldap_client, 'conte_mascetti')
assert entry['uid'] == 'conte_mascetti'
assert entry['mail'] == 'rmascetti@autistici.org'
user = get_user_by_uid(ldap_client, 'conte_mascetti')
assert user['uid'] == 'conte_mascetti'
assert user['mail'] == 'rmascetti@autistici.org'
def test_get_all_users(ldap_client):
users = get_all_users(ldap_client)
# print(users)
assert 'conte_mascetti' in [u['uid'] for u in users]
def test_add_delete_user(ldap_client):
uid = 'rosa_rossi'
cn = 'Rosa'
sn = 'Rossi'
mail = 'foo@autistici.org'
password = 'changeme'
add_user(ldap_client, uid, cn, sn, mail, password)
user = get_user_by_uid(ldap_client, uid)
assert user['uid'] == uid
assert user['mail'] == mail
delete_user(ldap_client, user)
# print(user)
user = get_user_by_uid(ldap_client, uid)
assert user is None
def test_failing_add_user(ldap_client):
uid = 'conte_mascetti'
try:
add_user(ldap_client, uid, 'name', 'surname', 'mail', 'pass')
except: # User alrady existing
pass
else:
assert False
def test_failing_delete_user(ldap_client):
uid = 'rosa_rossi'
try:
delete_user_by_uid(ldap_client, uid)
except: # User already not existing
pass
else:
assert False
def test_get_all_groups(ldap_client):
groups = get_all_groups(ldap_client)
cns = [g['cn'] for g in groups]
assert 'WikiUsers' in cns
def test_add_to_group(ldap_client):
client = ldap_client
group_cn = 'WikiUsers'
member_uid = 'rosa_rossi'
add_user(client, member_uid, 'name', 'surname', 'mail', 'pass')
user = get_user_by_uid(client, member_uid)
# print(user)
group = get_group_by_cn(client, group_cn)
group_members = group['members']
assert len(group_members) == 1
# print(group_members)
add_group_member(client, group, user)
group = get_group_by_cn(client, group_cn)
group_members = group['members']
assert len(group_members) == 2
assert user['uid'] in group_members
# print(group_members)
# print(user)
delete_user(client, user)