from ssl import CERT_REQUIRED, PROTOCOL_TLSv1_2 from ldap3 import Tls, Server, Connection, ASYNC from phi.logging import get_logger log = get_logger(__name__) def make_connection(host=None, port=389, encryption=None, ciphers=None, validate=False, ca_certs=None, username=None, password=None): # TLSv1.2 is supported since Python 3.4 if encryption is None: log.warning("The connection to the LDAP server will not be encrypted.") tls = None elif encryption == "TLSv1.2": log.info("The connection to the LDAP server will use TLSv1.2.") tls = Tls(version=PROTOCOL_TLSv1_2) else: raise NotImplementedError("Sorry, use TLSv1.2.") if encryption is not None and ciphers is not None: log.info("The connection to the LDAP server will use the " "following ciphers: {}".format(ciphers)) tls.ciphers = ciphers if encryption is not None and validate is True: log.info("The certificate hostname will be checked to match the " "remote hostname.") tls.validate = CERT_REQUIRED if encryption is not None and validate is True and ca_certs is not None: log.info("Using the following CA certificates: {}" .format(ca_certs)) tls.ca_certs_file = ca_certs server = Server(host=host, port=port, tls=tls) connection = Connection(server, user=username, password=password, client_strategy=ASYNC) return connection def open_connection(connection): log.info("Opening connection to LDAP server.") connection.open() if connection.server.tls is not None and connection.server.ssl is False: log.info("Issuing StartTLS command.") connection.start_tls() log.info("Issuing BIND command.") connection.bind() def close_connection(connection): log.info("Closing connection to LDAP server.") log.info("Issuing UNBIND command.") connection.unbind()