Compare commits
No commits in common. "master" and "make-things-work-brutally" have entirely different histories.
master
...
make-thing
47
README.md
47
README.md
|
@ -9,50 +9,3 @@ APIs for the Unit hacklab.
|
|||
Requirements:
|
||||
|
||||
* Python >= 3.5
|
||||
|
||||
|
||||
Create a virtual environment and activate it (optional):
|
||||
```
|
||||
virtualenv --python=/usr/bin/python3 env
|
||||
source env/bin/activate
|
||||
```
|
||||
|
||||
Run the setup:
|
||||
```
|
||||
python setup.py install
|
||||
```
|
||||
|
||||
## Setup
|
||||
|
||||
In the ldap section of `config.yml` change host, port and password according to
|
||||
your setup.
|
||||
|
||||
|
||||
## Command Line
|
||||
|
||||
```
|
||||
usage: phicli [-h] [--config config.yml]
|
||||
{showuser,adduser,deluser,showgroup,listgroups,addtogroup} ...
|
||||
|
||||
optional arguments:
|
||||
-h, --help show this help message and exit
|
||||
--config config.yml custom configuration file
|
||||
|
||||
actions:
|
||||
showuser dispaly user fields
|
||||
adduser add a new user
|
||||
deluser delete an user
|
||||
showgroup show a group
|
||||
listgroups list all groups
|
||||
addtogroup add an user to a group
|
||||
```
|
||||
|
||||
```
|
||||
phicli showuser [-h] user_id
|
||||
phicli adduser [-h] user_id
|
||||
phicli deluser [-h] user_id
|
||||
|
||||
phicli showgroup [-h] common_name
|
||||
phicli listgroups [-h]
|
||||
phicli addtogroup [-h] user_id group_common_name
|
||||
```
|
||||
|
|
12
config.yml
12
config.yml
|
@ -14,12 +14,12 @@ ldap:
|
|||
validate: True # Can either be True or False. Default: False
|
||||
ca_certs: openldap/cert.pem
|
||||
|
||||
# username: uid=phi,ou=Services,dc=unit,dc=macaomilano,dc=org
|
||||
# password: phi
|
||||
username: cn=root,dc=unit,dc=macaomilano,dc=org
|
||||
password: root
|
||||
username: uid=phi,ou=Services,dc=unit,dc=macaomilano,dc=org
|
||||
password: phi
|
||||
|
||||
base_dn: dc=unit,dc=macaomilano,dc=org
|
||||
attribute_id: uid
|
||||
attribute_mail: mail
|
||||
|
||||
|
||||
logging:
|
||||
|
@ -40,10 +40,10 @@ logging:
|
|||
|
||||
loggers:
|
||||
phi:
|
||||
level: WARNING
|
||||
level: DEBUG
|
||||
handlers: [console, file]
|
||||
aiohttp:
|
||||
level: WARNING
|
||||
level: DEBUG
|
||||
handlers: [console, file]
|
||||
ldap3:
|
||||
level: WARNING
|
||||
|
|
|
@ -38,9 +38,3 @@ sn: Mascetti
|
|||
mail: rmascetti@autistici.org
|
||||
uid: conte_mascetti
|
||||
userPassword: {SHA}oLY7P6V+DWaMJhix7vbMYGIfA+E=
|
||||
|
||||
dn: cn=WikiUsers,ou=Groups,dc=unit,dc=macaomilano,dc=org
|
||||
objectClass: groupOfNames
|
||||
objectClass: top
|
||||
cn: WikiUsers
|
||||
member: uid=conte_mascetti,ou=Hackers,dc=unit,dc=macaomilano,dc=org
|
||||
|
|
2
setup.py
2
setup.py
|
@ -14,7 +14,7 @@ setup(
|
|||
|
||||
package_dir={'': 'src'},
|
||||
packages=['phi', 'phi.api', 'phi.ldap'],
|
||||
scripts=['src/phid', 'src/phicli'],
|
||||
scripts=['src/phid'],
|
||||
|
||||
setup_requires=['pytest-runner'],
|
||||
install_requires=['pyYAML', 'ldap3'],
|
||||
|
|
|
@ -1,47 +0,0 @@
|
|||
import sys
|
||||
import argparse
|
||||
import inspect
|
||||
from phi.logging import setup_logging, get_logger
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
subparses = parser.add_subparsers(title='actions', dest='action')
|
||||
|
||||
cli_callbacks = {}
|
||||
|
||||
|
||||
def register(action_info='', param_infos=[]):
|
||||
def decorator(action):
|
||||
# Get function name and arguments
|
||||
action_name = action.__name__
|
||||
param_names = inspect.getfullargspec(action)[0]
|
||||
|
||||
# Create subparser for specific action
|
||||
subparser = subparses.add_parser(action_name, help=action_info)
|
||||
|
||||
for i, name in enumerate(param_names):
|
||||
info = param_infos[i] if i<len(param_infos) else ''
|
||||
subparser.add_argument(dest=name, help=info)
|
||||
|
||||
# Register action
|
||||
cli_callbacks[action_name] = action, param_names
|
||||
return action
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
def run(args):
|
||||
for action_name, (action, param_names) in cli_callbacks.items():
|
||||
if args['action'] == action_name:
|
||||
action(**{pname: args[pname] for pname in param_names})
|
||||
|
||||
|
||||
def add_arg(name, example, info):
|
||||
parser.add_argument(name, metavar=example, help=info)
|
||||
|
||||
|
||||
def get_args():
|
||||
namespace = parser.parse_args(sys.argv[1:])
|
||||
args = namespace.__dict__
|
||||
return args
|
|
@ -13,16 +13,12 @@ CONFIG_FILES = [os.path.join(p, CONFIG_FILE)
|
|||
for p in CONFIG_PATHS]
|
||||
|
||||
|
||||
def get_config(custom_config=None):
|
||||
def get_config():
|
||||
"""Return the path of the found configuration file and its content
|
||||
|
||||
:returns: (path, config)
|
||||
:rtype: (str, dict)
|
||||
"""
|
||||
if custom_config:
|
||||
global CONFIG_FILES
|
||||
CONFIG_FILES = [custom_config]
|
||||
|
||||
for f in CONFIG_FILES:
|
||||
try:
|
||||
with open(f, 'r') as c:
|
||||
|
@ -35,10 +31,5 @@ def get_config(custom_config=None):
|
|||
# in any of CONFIG_PATHS.
|
||||
pass
|
||||
else:
|
||||
if custom_config:
|
||||
raise FileNotFoundError('Config file {} not found.'
|
||||
.format(custom_config))
|
||||
else:
|
||||
raise FileNotFoundError("Could not find {} in any of {}."
|
||||
.format(CONFIG_FILE,
|
||||
', '.join(CONFIG_PATHS)))
|
||||
raise FileNotFoundError("Could not find {} in any of {}."
|
||||
.format(CONFIG_FILE, ', '.join(CONFIG_PATHS)))
|
||||
|
|
|
@ -14,7 +14,8 @@ class Client:
|
|||
host=None, port=389,
|
||||
encryption=None, ciphers=None, validate=False, ca_certs=None,
|
||||
username=None, password=None,
|
||||
base_dn=None):
|
||||
base_dn=None,
|
||||
attribute_id='uid', attribute_mail='mail'):
|
||||
log.info("Initializing LDAP Client.")
|
||||
|
||||
self.host = host
|
||||
|
@ -30,6 +31,9 @@ class Client:
|
|||
|
||||
self.base_dn = base_dn
|
||||
|
||||
self.attribute_id = attribute_id
|
||||
self.attribute_mail = attribute_mail
|
||||
|
||||
self.connection_lock = Lock()
|
||||
self.connection = make_connection(host=self.host, port=self.port,
|
||||
encryption=self.encryption,
|
||||
|
|
36
src/phi/ldap/entry.py
Normal file
36
src/phi/ldap/entry.py
Normal file
|
@ -0,0 +1,36 @@
|
|||
from ldap3 import ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES
|
||||
|
||||
from phi.logging import get_logger
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
||||
def get_entry_by_uid(client, uid):
|
||||
log.info("Searching entry with identifier: {}".format(uid))
|
||||
|
||||
filter_ = "({}={})".format(client.attribute_id, uid)
|
||||
log.debug("Search filter: {}".format(filter_))
|
||||
|
||||
response_id = client.connection.search(
|
||||
client.base_dn, filter_,
|
||||
search_scope='SUBTREE',
|
||||
attributes=[ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES]
|
||||
)
|
||||
|
||||
response, result, request = client.connection.get_response(
|
||||
response_id, get_request=True
|
||||
)
|
||||
|
||||
log.debug("Request: {}".format(request))
|
||||
log.debug("Response: {}".format(response))
|
||||
log.debug("Result: {}".format(result))
|
||||
|
||||
if not response:
|
||||
return None
|
||||
|
||||
if response[1:]:
|
||||
log.erorr("Looking for exactly one result but server gave {}. "
|
||||
"Taking the first and ignoring the rest."
|
||||
.format(len(response)))
|
||||
|
||||
return response[0]
|
|
@ -1,61 +0,0 @@
|
|||
from ldap3 import ALL_ATTRIBUTES, MODIFY_ADD
|
||||
from phi.ldap.utils import get_response, make_group_dict
|
||||
from phi.logging import get_logger
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
||||
def get_group_by_cn(client, cn):
|
||||
log.info("Searching groups with common name: {}".format(cn))
|
||||
|
||||
dn = 'cn={},ou=Groups,{}'.format(cn, client.base_dn)
|
||||
log.debug("Search dn: {}".format(dn))
|
||||
|
||||
response_id = client.connection.search(
|
||||
dn, '(objectclass=groupOfNames)',
|
||||
search_scope='SUBTREE',
|
||||
attributes=[ALL_ATTRIBUTES]
|
||||
)
|
||||
|
||||
response = get_response(client, response_id)
|
||||
|
||||
if not response:
|
||||
return None
|
||||
|
||||
if len(response) > 1:
|
||||
log.error("Looking for exactly one result but server gave {}. "
|
||||
"Taking the first and ignoring the rest."
|
||||
.format(len(response)))
|
||||
|
||||
group = make_group_dict(client, response[0])
|
||||
return group
|
||||
|
||||
|
||||
def get_all_groups(client):
|
||||
log.info("Searching all the groups")
|
||||
dn = 'ou=Groups,{}'.format(client.base_dn)
|
||||
|
||||
log.debug("Search dn: {}".format(dn))
|
||||
|
||||
response_id = client.connection.search(
|
||||
dn, '(objectclass=groupOfNames)',
|
||||
search_scope='SUBTREE',
|
||||
attributes=[ALL_ATTRIBUTES]
|
||||
)
|
||||
|
||||
response = get_response(client, response_id)
|
||||
groups = [make_group_dict(client, entry) for entry in response]
|
||||
return groups
|
||||
|
||||
|
||||
def add_group_member(client, group, user):
|
||||
group_dn = group['dn']
|
||||
member_dn = user['dn']
|
||||
log.debug('Found adding {} to {}'.format(member_dn, group_dn))
|
||||
|
||||
response_id = client.connection.modify(
|
||||
group_dn,
|
||||
{'member': [(MODIFY_ADD, [member_dn])]}
|
||||
)
|
||||
|
||||
return get_response(client, response_id)
|
|
@ -1,77 +1,26 @@
|
|||
from ldap3 import ALL_ATTRIBUTES, HASHED_SALTED_SHA
|
||||
from ldap3.utils.hashed import hashed
|
||||
from phi.ldap.utils import get_response, make_user_dict, add_entry, delete_entry
|
||||
from phi.logging import get_logger
|
||||
from phi.ldap.entry import get_entry_by_uid
|
||||
from phi.ldap.utils import flatten_attributes
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
def user_attributes_mapping(client):
|
||||
return {
|
||||
client.attribute_id: 'uid',
|
||||
client.attribute_mail: 'mail',
|
||||
'createTimestamp': 'created_at',
|
||||
'modifyTimestamp': 'modified_at'
|
||||
}
|
||||
|
||||
|
||||
def get_user_by_uid(client, uid):
|
||||
log.info("Searching entry with identifier: {}".format(uid))
|
||||
entry = get_entry_by_uid(client, uid)
|
||||
|
||||
filter_ = "({}={})".format('uid', uid)
|
||||
log.debug("Search filter: {}".format(filter_))
|
||||
|
||||
response_id = client.connection.search(
|
||||
client.base_dn, filter_,
|
||||
search_scope='SUBTREE',
|
||||
attributes=[ALL_ATTRIBUTES]
|
||||
)
|
||||
|
||||
response = get_response(client, response_id)
|
||||
|
||||
if not response:
|
||||
if not entry:
|
||||
return None
|
||||
|
||||
if len(response) > 1:
|
||||
log.error("Looking for exactly one result but server gave {}. "
|
||||
"Taking the first and ignoring the rest."
|
||||
.format(len(response)))
|
||||
mapping = user_attributes_mapping(client)
|
||||
|
||||
return make_user_dict(client, response[0])
|
||||
user = {mapping[k]: v
|
||||
for k, v in entry['attributes'].items()
|
||||
if k in mapping.keys()}
|
||||
|
||||
|
||||
def get_all_users(client):
|
||||
log.info("Searching all the users")
|
||||
|
||||
dn = 'ou=Hackers,{}'.format(client.base_dn)
|
||||
log.debug("Search dn: {}".format(dn))
|
||||
|
||||
response_id = client.connection.search(
|
||||
dn, '(objectclass=person)',
|
||||
search_scope='SUBTREE',
|
||||
attributes=[ALL_ATTRIBUTES]
|
||||
)
|
||||
|
||||
response = get_response(client, response_id)
|
||||
|
||||
users = [make_user_dict(client, entry) for entry in response]
|
||||
return users
|
||||
|
||||
|
||||
def add_user(client, uid, cn, sn, mail, password):
|
||||
dn = 'uid={},ou=Hackers,{}'.format(uid, client.base_dn)
|
||||
hashed_password = hashed(HASHED_SALTED_SHA, password)
|
||||
|
||||
attributes={
|
||||
'objectClass': [
|
||||
'inetOrgPerson',
|
||||
'organizationalPerson',
|
||||
'person', 'top'
|
||||
],
|
||||
'cn': cn,
|
||||
'sn': sn,
|
||||
'mail': mail,
|
||||
'userPassword': hashed_password
|
||||
}
|
||||
|
||||
add_entry(client, dn, attributes)
|
||||
|
||||
|
||||
def delete_user(client, user):
|
||||
delete_entry(client, user['dn'])
|
||||
|
||||
|
||||
def delete_user_by_uid(client, uid):
|
||||
dn = 'uid={},ou=Hackers,{}'.format(uid, client.base_dn)
|
||||
delete_entry(client, dn)
|
||||
return flatten_attributes(user)
|
||||
|
|
|
@ -1,70 +1,3 @@
|
|||
import re
|
||||
from phi.logging import get_logger
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
||||
def make_user_dict(client, entry):
|
||||
attributes = entry['attributes']
|
||||
|
||||
user = {}
|
||||
user['uid'] = attributes['uid'][0]
|
||||
user['dn'] = 'uid={},ou=Hackers,{}'.format(user['uid'], client.base_dn)
|
||||
user['cn'] = attributes['cn'][0]
|
||||
user['sn'] = attributes['sn'][0]
|
||||
user['mail'] = attributes['mail'][0]
|
||||
user['password'] = attributes['userPassword'][0]
|
||||
|
||||
return user
|
||||
|
||||
|
||||
def get_uid_from_dn(client, dn):
|
||||
uid = re.search('uid=(.+?),ou=Hackers,{}'.format(client.base_dn),
|
||||
dn).group(1)
|
||||
return uid
|
||||
|
||||
|
||||
def make_group_dict(client, entry):
|
||||
attributes = entry['attributes']
|
||||
|
||||
cn = attributes['cn'][0]
|
||||
dn = 'cn={},ou=Groups,{}'.format(cn, client.base_dn)
|
||||
members = [get_uid_from_dn(client, u_dn)
|
||||
for u_dn in attributes['member']]
|
||||
|
||||
group = {}
|
||||
group['dn'] = dn
|
||||
group['cn'] = cn
|
||||
group['members'] = members
|
||||
|
||||
return group
|
||||
|
||||
|
||||
def get_response(client, response_id):
|
||||
response, result, request = client.connection.get_response(
|
||||
response_id, get_request=True
|
||||
)
|
||||
|
||||
log.debug("Request: {}".format(request))
|
||||
log.debug("Response: {}".format(response))
|
||||
log.debug("Result: {}".format(result))
|
||||
|
||||
if result['description'] is not 'success':
|
||||
raise Exception(result['description'])
|
||||
|
||||
return response
|
||||
|
||||
|
||||
def add_entry(client, dn, attributes):
|
||||
log.info('Adding entry with distinguiscet name: {}'
|
||||
'and attributes {}'.format(dn, attributes))
|
||||
response_id = client.connection.add(dn, attributes=attributes)
|
||||
response = get_response(client, response_id)
|
||||
return response
|
||||
|
||||
|
||||
def delete_entry(client, dn):
|
||||
log.info('Deleting entry with distinguiscet name: {}')
|
||||
response_id = client.connection.delete(dn)
|
||||
response = get_response(client, response_id)
|
||||
return response
|
||||
def flatten_attributes(d):
|
||||
return {k: (v[0] if isinstance(v, list) else v)
|
||||
for k, v in d.items()}
|
||||
|
|
130
src/phicli
130
src/phicli
|
@ -1,130 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
from pprint import pformat as pp
|
||||
from getpass import getpass
|
||||
|
||||
from phi.config import get_config
|
||||
from phi.logging import setup_logging, get_logger
|
||||
from phi import cli
|
||||
import phi.ldap.client
|
||||
from phi.ldap.user import get_user_by_uid, add_user, delete_user
|
||||
from phi.ldap.group import get_group_by_cn, get_all_groups, add_group_member
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
||||
@cli.register('dispaly user fields', ['user identifier'])
|
||||
def showuser(uid):
|
||||
user = get_user_by_uid(client, uid)
|
||||
if user is None:
|
||||
print('User {} not found'.format(uid))
|
||||
return
|
||||
|
||||
print(pp(user))
|
||||
|
||||
|
||||
@cli.register('add a new user', ['user identifier'])
|
||||
def adduser(uid):
|
||||
def ask(prompt, default):
|
||||
full_prompt = '{} [{}] '.format(prompt, default)
|
||||
return input(full_prompt) or default
|
||||
|
||||
user = get_user_by_uid(client, uid)
|
||||
if user is not None:
|
||||
print("User {} already existing".format(uid))
|
||||
return
|
||||
|
||||
cn = ask('Common name:', uid)
|
||||
sn = ask('Last name:', uid)
|
||||
mail = ask('Mail:', '{}@localhost'.format(uid))
|
||||
|
||||
password = getpass()
|
||||
pass_check = getpass('Retype password: ')
|
||||
if password != pass_check:
|
||||
print('Password not matching')
|
||||
return
|
||||
|
||||
add_user(client, uid, cn, sn, mail, password)
|
||||
|
||||
# Check
|
||||
user = get_user_by_uid(client, uid)
|
||||
print()
|
||||
print(pp(user))
|
||||
|
||||
|
||||
@cli.register('delete an user', ['user identifier'])
|
||||
def deluser(uid):
|
||||
check = input('Are you sure? [y/N] ') or 'N'
|
||||
if check.lower() != 'y':
|
||||
print('Ok then')
|
||||
return
|
||||
|
||||
user = get_user_by_uid(client, uid)
|
||||
if user is not None:
|
||||
delete_user(client, user)
|
||||
print('Done')
|
||||
else:
|
||||
print('User {} not found'.format(uid))
|
||||
|
||||
|
||||
@cli.register('show a group', ['group common name'])
|
||||
def showgroup(cn):
|
||||
group = get_group_by_cn(client, cn)
|
||||
if group is None:
|
||||
print('Group {} not found'.format(gcn))
|
||||
return
|
||||
|
||||
print(pp(group))
|
||||
|
||||
|
||||
@cli.register('list all groups')
|
||||
def listgroups():
|
||||
groups = get_all_groups(client)
|
||||
|
||||
for group in groups:
|
||||
print(group['cn'])
|
||||
|
||||
|
||||
@cli.register('add an user to a group',
|
||||
['user identifier', 'group common name'])
|
||||
def addtogroup(uid, gcn):
|
||||
user = get_user_by_uid(client, uid)
|
||||
group = get_group_by_cn(client, gcn)
|
||||
|
||||
if user is None:
|
||||
print('User {} not found'.format(uid))
|
||||
return
|
||||
|
||||
if group is None:
|
||||
print('Group {} not found'.format(gcn))
|
||||
return
|
||||
|
||||
if uid in group['members']:
|
||||
print('User {} is already in group {}'.format(uid, gcn))
|
||||
return
|
||||
|
||||
add_group_member(client, group, user)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
cli.add_arg('--config', 'config.yml', 'custom configuration file')
|
||||
args = cli.get_args()
|
||||
|
||||
config_file = args['config']
|
||||
|
||||
config_file, config = get_config(config_file)
|
||||
setup_logging(config.get('logging', {}))
|
||||
log.info("Using configuration at '{}':\n{}"
|
||||
.format(config_file, pp(config)))
|
||||
|
||||
# TODO: check fields in config
|
||||
client = phi.ldap.client.Client(**config['ldap'])
|
||||
|
||||
log.info('Opening LDAP client')
|
||||
client.open()
|
||||
|
||||
log.info('Arguments: {}'.format(pp(args)))
|
||||
|
||||
cli.run(args)
|
||||
|
||||
log.info('Closing LDAP client')
|
||||
client.close()
|
|
@ -9,11 +9,10 @@ def ldap_client():
|
|||
host='localhost', port=389,
|
||||
encryption='TLSv1.2', ciphers='HIGH',
|
||||
validate=False,
|
||||
# username='uid=phi,ou=Services,dc=unit,dc=macaomilano,dc=org',
|
||||
# password='phi',
|
||||
username='cn=root,dc=unit,dc=macaomilano,dc=org',
|
||||
password='root',
|
||||
base_dn='dc=unit,dc=macaomilano,dc=org')
|
||||
username='uid=phi,ou=Services,dc=unit,dc=macaomilano,dc=org',
|
||||
password='phi',
|
||||
base_dn='dc=unit,dc=macaomilano,dc=org',
|
||||
attribute_id='uid', attribute_mail='mail')
|
||||
client.open()
|
||||
yield client
|
||||
client.close()
|
||||
|
|
|
@ -1,7 +1,4 @@
|
|||
from phi.ldap.user import get_user_by_uid, get_all_users, \
|
||||
add_user, delete_user_by_uid, delete_user
|
||||
|
||||
from phi.ldap.group import add_group_member, get_group_by_cn, get_all_groups
|
||||
from phi.ldap.user import get_user_by_uid
|
||||
|
||||
|
||||
def test_connection(ldap_client):
|
||||
|
@ -10,90 +7,6 @@ def test_connection(ldap_client):
|
|||
|
||||
|
||||
def test_get_user_by_id(ldap_client):
|
||||
user = get_user_by_uid(ldap_client, 'conte_mascetti')
|
||||
assert user['uid'] == 'conte_mascetti'
|
||||
assert user['mail'] == 'rmascetti@autistici.org'
|
||||
|
||||
|
||||
def test_get_all_users(ldap_client):
|
||||
users = get_all_users(ldap_client)
|
||||
# print(users)
|
||||
assert 'conte_mascetti' in [u['uid'] for u in users]
|
||||
|
||||
|
||||
def test_add_delete_user(ldap_client):
|
||||
uid = 'rosa_rossi'
|
||||
cn = 'Rosa'
|
||||
sn = 'Rossi'
|
||||
mail = 'foo@autistici.org'
|
||||
password = 'changeme'
|
||||
|
||||
add_user(ldap_client, uid, cn, sn, mail, password)
|
||||
|
||||
user = get_user_by_uid(ldap_client, uid)
|
||||
assert user['uid'] == uid
|
||||
assert user['mail'] == mail
|
||||
|
||||
delete_user(ldap_client, user)
|
||||
# print(user)
|
||||
|
||||
user = get_user_by_uid(ldap_client, uid)
|
||||
assert user is None
|
||||
|
||||
|
||||
def test_failing_add_user(ldap_client):
|
||||
uid = 'conte_mascetti'
|
||||
|
||||
try:
|
||||
add_user(ldap_client, uid, 'name', 'surname', 'mail', 'pass')
|
||||
except: # User alrady existing
|
||||
pass
|
||||
else:
|
||||
assert False
|
||||
|
||||
def test_failing_delete_user(ldap_client):
|
||||
uid = 'rosa_rossi'
|
||||
|
||||
try:
|
||||
delete_user_by_uid(ldap_client, uid)
|
||||
except: # User already not existing
|
||||
pass
|
||||
else:
|
||||
assert False
|
||||
|
||||
|
||||
def test_get_all_groups(ldap_client):
|
||||
groups = get_all_groups(ldap_client)
|
||||
|
||||
cns = [g['cn'] for g in groups]
|
||||
assert 'WikiUsers' in cns
|
||||
|
||||
|
||||
def test_add_to_group(ldap_client):
|
||||
client = ldap_client
|
||||
|
||||
group_cn = 'WikiUsers'
|
||||
member_uid = 'rosa_rossi'
|
||||
add_user(client, member_uid, 'name', 'surname', 'mail', 'pass')
|
||||
|
||||
user = get_user_by_uid(client, member_uid)
|
||||
# print(user)
|
||||
|
||||
group = get_group_by_cn(client, group_cn)
|
||||
group_members = group['members']
|
||||
|
||||
assert len(group_members) == 1
|
||||
# print(group_members)
|
||||
|
||||
add_group_member(client, group, user)
|
||||
|
||||
group = get_group_by_cn(client, group_cn)
|
||||
group_members = group['members']
|
||||
|
||||
assert len(group_members) == 2
|
||||
assert user['uid'] in group_members
|
||||
|
||||
# print(group_members)
|
||||
# print(user)
|
||||
|
||||
delete_user(client, user)
|
||||
entry = get_user_by_uid(ldap_client, 'conte_mascetti')
|
||||
assert entry['uid'] == 'conte_mascetti'
|
||||
assert entry['mail'] == 'rmascetti@autistici.org'
|
||||
|
|
Loading…
Reference in New Issue
Block a user