133 lines
3.2 KiB
Python
133 lines
3.2 KiB
Python
|
from urllib.parse import urlparse
|
||
|
|
||
|
from bonsai import LDAPClient
|
||
|
|
||
|
from phi.logging import get_logger
|
||
|
|
||
|
|
||
|
# Module-level logger used by checked_port() and AsyncClient in this file.
log = get_logger(__name__)
|
||
|
|
||
|
|
||
|
def parse_host(host):
    """
    Split a host specification into protocol, address and port.

    ``host`` may be a bare address ("example.com"), an address with a
    port ("example.com:1389") or a full URL ("ldaps://example.com").

    Defaults applied when a part is missing:

    - the protocol falls back to "ldap";
    - the port falls back to 389 for "ldap" and to 636 for "ldaps";
      for any other protocol without an explicit port it stays ``None``.

    Bracketed IPv6 literals keep their brackets in the returned address,
    e.g. "ldap://[::1]:1389" yields ``("ldap", "[::1]", 1389)``.

    Returns a ``(proto, addr, port)`` tuple.
    """
    # urlparse only recognises a network location when it is introduced
    # by "//"; add it for scheme-less input such as "example.com:389".
    if "://" not in host:
        host = f"//{host}"

    parsed = urlparse(host)
    proto = parsed.scheme or "ldap"
    port = parsed.port

    if port is not None:
        # Strip only the trailing ":<port>". Splitting on the first colon
        # would truncate an IPv6 literal such as "[::1]:389" to "[".
        addr = parsed.netloc.rpartition(":")[0]
    else:
        addr = parsed.netloc
        # Unknown protocols have no default port and keep port = None.
        port = {"ldap": 389, "ldaps": 636}.get(proto)

    return proto, addr, port
|
||
|
|
||
|
|
||
|
def checked_port(provided, auto):
    """
    Pick the port to use between the explicit one and the derived one.

    ``provided`` is the port passed explicitly by the caller, ``auto``
    the one derived from the connection string. An explicit port always
    wins; a warning is logged when it disagrees with the derived one.
    When no explicit port is given, ``auto`` is returned unchanged.
    """
    # Nothing explicit: the derived port is the only candidate.
    if provided is None:
        return auto

    if provided != auto:
        log.warning(
            "Explicitly provided port ({}) does not match "
            "the automatically provided one ({}). The former prevails.".format(
                provided, auto
            )
        )

    return provided
|
||
|
|
||
|
|
||
|
def compose_dn_username(username, base_dn, ou=None, attribute_id=None):
    """
    Output the distinguished name of the user to use as login.

    If ``username`` already contains ``base_dn`` it is taken to be a
    full DN and returned unchanged. Otherwise the DN is assembled as
    ``<attribute_id>=<username>[,ou=<ou>],<base_dn>``.

    ``attribute_id`` falls back to "uid" when left as ``None`` (the same
    default AsyncClient uses); previously the literal text "None" ended
    up in the DN.

    Raises ``TypeError`` if ``username`` or ``base_dn`` is ``None``,
    since the containment check requires two strings.
    """
    # Already a DN under base_dn: use it untouched.
    if base_dn in username:
        return username

    if attribute_id is None:
        attribute_id = "uid"

    parts = [f"{attribute_id}={username}"]
    if ou is not None:
        parts.append(f"ou={ou}")
    parts.append(base_dn)

    return ",".join(parts)
|
||
|
|
||
|
|
||
|
class AsyncClient(LDAPClient):
    """
    Wrapper around LDAPClient.

    Builds the server URI from a loosely specified host/port pair,
    applies the TLS and certificate settings, and registers the bind
    credentials on the underlying client.
    """

    def __init__(
        self,
        host=None,
        port=None,
        encryption=None,
        ciphers=None,
        validate=False,
        ca_cert=None,
        username=None,
        password=None,
        base_dn=None,
        attribute_id="uid",
        ou=None,
        method="SIMPLE",
    ):
        """
        Configure the client.

        host: address, "address:port" or full URL of the server, as
            accepted by parse_host(). Required in practice: parse_host()
            cannot handle ``None``.
        port: explicit port; overrides the one derived from ``host``
            (see checked_port()).
        encryption: truthy to enable the TLS flag handed to LDAPClient.
            Must be truthy when the protocol is "ldaps".
        ciphers: accepted for interface compatibility; not used by this
            constructor.
        validate: when falsy, the certificate policy is set to "never".
            NOTE(review): this presumably disables server certificate
            verification, and it is the default -- confirm it is intended.
        ca_cert: if not ``None``, passed to set_ca_cert().
        username, base_dn, ou, attribute_id: combined into the bind DN
            by compose_dn_username(); ``username`` and ``base_dn`` must
            be strings.
        password: bind password.
        method: authentication mechanism passed to set_credentials().

        Raises ValueError if the protocol is "ldaps" while ``encryption``
        is falsy.
        """
        self.proto, self.host, _port = parse_host(host)
        self.port = checked_port(port, _port)
        if self.port is None:
            # No explicit port and no known default for this protocol:
            # leave the port out rather than embedding the text "None".
            self.full_uri = "{}://{}".format(self.proto, self.host)
        else:
            self.full_uri = "{}://{}:{}".format(self.proto, self.host, self.port)
        self.base_dn = base_dn

        if self.proto == "ldaps" and not encryption:
            # Exceptions do not interpolate %-style arguments the way
            # logging calls do, so the message is formatted explicitly.
            raise ValueError(
                'Incompatible provided protocol ("%s") and encryption configuration: TLS=%s'
                % (self.proto, encryption)
            )
        self._tls = bool(encryption)

        super().__init__(self.full_uri, self._tls)
        # NOTE(review): the message says "Connected" although no connect
        # call is visible in this constructor. self.tls is expected to be
        # provided by the LDAPClient base class.
        log.info(
            "Connected at %s (TLS -> %s)", self.full_uri, "ON" if self.tls else "OFF"
        )

        if not validate:
            self.set_cert_policy("never")

        if ca_cert is not None:
            self.set_ca_cert(ca_cert)

        self.username = compose_dn_username(username, self.base_dn, ou, attribute_id)
        self.password = password
        self.method = method

        self.set_auto_page_acquire(True)
        self.set_credentials(self.method, user=self.username, password=self.password)
|