from ssl import CERT_REQUIRED, PROTOCOL_TLSv1_2
from ldap3 import Tls, Server, Connection, ASYNC
from phi.logging import get_logger
# Module-level logger; every connection step in this module reports through it.
log = get_logger(__name__)
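def make_connection(host=None, port=389,
                    encryption=None, ciphers=None, validate=False,
                    ca_certs=None, username=None, password=None):
    """Build (but do not open) an ldap3 ``Connection`` to *host*.

    Args:
        host: LDAP server hostname or address, handed straight to ``Server``.
        port: TCP port; 389 is the plain LDAP port. TLS is negotiated later
            with StartTLS by ``open_connection`` when a ``tls`` config is set.
        encryption: ``None`` for a cleartext session or ``"TLSv1.2"``; any
            other value raises ``NotImplementedError``.
        ciphers: optional OpenSSL cipher string applied to the TLS config.
        validate: when truthy, require and verify the server certificate
            (``ssl.CERT_REQUIRED``). Defaults to ``False``: the certificate
            is NOT verified unless the caller opts in.
        ca_certs: path to a CA bundle file used for verification; only
            meaningful together with ``validate``.
        username: bind DN / user handed to ``Connection``.
        password: bind password handed to ``Connection``.

    Returns:
        An unopened ``ldap3.Connection`` using the ASYNC client strategy.

    Raises:
        NotImplementedError: for an unsupported ``encryption`` value.
        ldap3 may additionally raise its own configuration error from
        ``Tls(...)`` for invalid settings (e.g. an unreadable ``ca_certs``).
    """
    if encryption is None:
        log.warning("The connection to the LDAP server will not be encrypted.")
        if ciphers is not None or validate or ca_certs is not None:
            # Silently dropping TLS settings hides a misconfiguration.
            log.warning("ciphers/validate/ca_certs are ignored because "
                        "encryption is None.")
        tls = None
    elif encryption == "TLSv1.2":
        log.info("The connection to the LDAP server will use TLSv1.2.")
        # NOTE: PROTOCOL_TLSv1_2 pins exactly TLS 1.2 and is deprecated since
        # Python 3.10 in favour of PROTOCOL_TLS_CLIENT (which also allows
        # TLS 1.3). Kept so the documented "TLSv1.2" contract is unchanged.
        tls_options = {"version": PROTOCOL_TLSv1_2}
        if ciphers is not None:
            log.info("The connection to the LDAP server will use the "
                     "following ciphers: {}".format(ciphers))
            tls_options["ciphers"] = ciphers
        # Truthiness rather than `is True`: a caller passing e.g. 1 or
        # CERT_REQUIRED previously got *no* verification without any notice.
        if validate:
            log.info("The certificate hostname will be checked to match the "
                     "remote hostname.")
            # CERT_REQUIRED makes ldap3 verify the certificate chain; whether
            # the hostname is matched as well is done by ldap3 itself —
            # confirm against the ldap3 version in use.
            tls_options["validate"] = CERT_REQUIRED
            if ca_certs is not None:
                log.info("Using the following CA certificates: {}"
                         .format(ca_certs))
                tls_options["ca_certs_file"] = ca_certs
        else:
            log.warning("The LDAP server certificate will NOT be verified; "
                        "pass validate=True to enable verification.")
            if ca_certs is not None:
                log.warning("ca_certs is ignored because validate is False.")
        # Passing the settings to the constructor instead of assigning
        # attributes afterwards lets ldap3 check them up front.
        tls = Tls(**tls_options)
    else:
        raise NotImplementedError("Sorry, use TLSv1.2.")

    server = Server(host=host, port=port, tls=tls)
    connection = Connection(server, user=username, password=password,
                            client_strategy=ASYNC)

    return connection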
def open_connection(connection):
    """Open *connection*, upgrade it with StartTLS when configured, then bind.

    StartTLS is issued only when the server was built with a ``tls``
    configuration and the socket is not already an LDAPS (``ssl``) socket;
    it runs before ``bind()`` so the credentials travel over TLS.

    NOTE(review): the results of ``start_tls()`` and ``bind()`` are not
    inspected here. With the ASYNC strategy the bind outcome has to be
    collected by the caller (``get_response``). Whether a failed StartTLS
    raises or merely returns ``False`` depends on the ldap3 version — TODO
    confirm, since a silent failure would let ``bind()`` proceed in
    cleartext.
    """
    log.info("Opening connection to LDAP server.")
    connection.open()

    # Read the server after open(): a pooled connection may only pick it then.
    server = connection.server
    wants_starttls = server.tls is not None and server.ssl is False
    if wants_starttls:
        log.info("Issuing StartTLS command.")
        connection.start_tls()

    log.info("Issuing BIND command.")
    connection.bind()
def close_connection(connection):
    """Send an UNBIND on *connection* to end the LDAP session.

    Both informational messages are logged before the request is issued.
    """
    for message in ("Closing connection to LDAP server.",
                    "Issuing UNBIND command."):
        log.info(message)
    connection.unbind()