from ssl import CERT_REQUIRED, PROTOCOL_TLSv1_2
from ldap3 import Tls, Server, Connection, ASYNC
from phi.logging import get_logger
# Module-level logger; every connection step in this module reports through it.
log = get_logger(__name__)
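def make_connection(
    host=None,
    port=389,
    encryption=None,
    ciphers=None,
    validate=False,
    ca_certs=None,
    username=None,
    password=None,
):
    """Build an unopened ldap3 ``Connection`` to *host*:*port*.

    Nothing is sent on the wire here: the socket is opened, upgraded with
    StartTLS and bound by ``open_connection``.

    Args:
        host: LDAP server hostname or address, passed straight to ``Server``.
        port: LDAP server port (389 is plain LDAP; the TLS upgrade, if any,
            is done later with StartTLS).
        encryption: ``None`` for a cleartext connection, or ``"TLSv1.2"``.
            Any other value raises ``NotImplementedError``.
        ciphers: OpenSSL cipher string assigned to the ``Tls`` object.
            Ignored (with a warning) when ``encryption`` is ``None``.
        validate: truthy to require and verify the server certificate
            (``ssl.CERT_REQUIRED``). When falsy -- the default -- the ldap3
            ``Tls`` default applies and the server certificate is NOT checked:
            the channel is encrypted but the server is not authenticated.
        ca_certs: path to the CA bundle used for verification. Only consulted
            when ``validate`` is truthy; otherwise ignored (with a warning).
        username: bind DN / user passed to ``Connection``.
        password: bind password passed to ``Connection``.

    Returns:
        An ldap3 ``Connection`` using the ``ASYNC`` client strategy.

    Raises:
        NotImplementedError: if ``encryption`` is neither ``None`` nor
            ``"TLSv1.2"``.
    """
    if encryption is None:
        log.warning("The connection to the LDAP server will not be encrypted.")
        tls = None
    elif encryption == "TLSv1.2":
        # TLSv1.2 is supported since Python 3.4.
        # NOTE(review): ssl.PROTOCOL_TLSv1_2 is deprecated from Python 3.10;
        # the ldap3 Tls "version" parameter is what decides the replacement.
        log.info("The connection to the LDAP server will use TLSv1.2.")
        tls = Tls(version=PROTOCOL_TLSv1_2)
    else:
        raise NotImplementedError("Sorry, use TLSv1.2.")

    if tls is None:
        # Without TLS the remaining options have nothing to attach to; say so
        # instead of dropping them silently.
        if ciphers is not None or validate or ca_certs is not None:
            log.warning(
                "ciphers, validate and ca_certs have no effect because "
                "encryption is disabled."
            )
    else:
        if ciphers is not None:
            log.info(
                "The connection to the LDAP server will use the "
                "following ciphers: {}".format(ciphers)
            )
            tls.ciphers = ciphers

        # Truthiness rather than "is True": a value like 1 or a non-empty
        # string from a config file must not silently disable validation.
        if validate:
            log.info(
                "The certificate hostname will be checked to match the " "remote hostname."
            )
            tls.validate = CERT_REQUIRED
            if ca_certs is not None:
                log.info("Using the following CA certificates: {}".format(ca_certs))
                tls.ca_certs_file = ca_certs
        else:
            # ldap3's Tls default is CERT_NONE: the peer certificate is never
            # checked, so the TLS session can be terminated by anyone on path.
            log.warning(
                "TLS is enabled but the server certificate will not be "
                "validated; set validate=True (and ca_certs) to authenticate "
                "the LDAP server."
            )
            if ca_certs is not None:
                log.warning("ca_certs is ignored because validate is not set.")

    server = Server(host=host, port=port, tls=tls)
    connection = Connection(
        server, user=username, password=password, client_strategy=ASYNC
    )

    return connection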
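def open_connection(connection):
    """Open *connection*, upgrade it with StartTLS when configured, then bind.

    Args:
        connection: an ldap3 ``Connection`` as returned by ``make_connection``.

    Raises:
        RuntimeError: if a ``Tls`` object is attached to the server but the
            StartTLS upgrade does not report success. The socket is closed
            before raising so that the BIND (and the password it carries) is
            never sent in cleartext.
    """
    log.info("Opening connection to LDAP server.")
    connection.open()

    # A Tls object on a non-LDAPS (ssl is False) server means "upgrade with
    # StartTLS" rather than "TLS from the first byte".
    if connection.server.tls is not None and connection.server.ssl is False:
        log.info("Issuing StartTLS command.")
        # ldap3 may either raise or return a falsy value when the upgrade
        # fails; a falsy result must not fall through to a cleartext bind.
        if not connection.start_tls():
            log.error("StartTLS failed; closing the connection without binding.")
            connection.unbind()
            raise RuntimeError("StartTLS failed; refusing to bind in cleartext.")

    log.info("Issuing BIND command.")
    connection.bind()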
def close_connection(connection):
    """Tear down *connection* by issuing an LDAP UNBIND.

    Args:
        connection: the ldap3 ``Connection`` to unbind.
    """
    steps = (
        "Closing connection to LDAP server.",
        "Issuing UNBIND command.",
    )
    for message in steps:
        log.info(message)
    connection.unbind()