latecomers/latecomers/notifier.py

117 lines
3.4 KiB
Python
Raw Permalink Normal View History

2022-08-24 11:13:12 +02:00
# -*- encoding: utf-8 -*-
2022-08-25 13:11:53 +02:00
from contextlib import contextmanager
from datetime import datetime
2022-08-24 11:13:12 +02:00
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import logging
import smtplib
import ssl
import typing as T
2022-09-18 23:29:02 +02:00
from latecomers.helpers import retry_and_log, get_date
2022-08-24 11:13:12 +02:00
2022-08-24 12:42:23 +02:00
RETRIES = 3
logger = logging.getLogger(__name__)
2022-08-24 11:13:12 +02:00
class Notifier(object):
def __init__(
self,
smtp_addr: T.Text,
smtp_port: int,
smtp_starttls: bool,
smtp_user: T.Text,
smtp_pass: T.Text,
smtp_from: T.Optional[T.Text] = None,
) -> None:
2022-08-25 13:11:53 +02:00
self._starttls = smtp_starttls
2022-08-24 11:13:12 +02:00
self._addr = smtp_addr
self._port = smtp_port
self._user = smtp_user
self._pass = smtp_pass
self._from = smtp_from if smtp_from else smtp_user
logger.info("%s initialized", self)
2022-08-25 13:11:53 +02:00
@contextmanager
def connect(self):
context = ssl.create_default_context()
if self._starttls:
with smtplib.SMTP(self._addr, self._port) as conn:
conn.starttls(context=context)
conn.login(self._user, self._pass)
yield conn
else:
with smtplib.SMTP_SSL(self._addr, self._port) as conn:
conn.login(self._user, self._pass)
yield conn
2022-08-24 11:13:12 +02:00
def __str__(self) -> T.Text:
return f"Notifier<{self._addr}:{self._port},from={self._from}>"
def send(self, to: T.List[T.Text], email: T.Text) -> None:
2022-08-25 13:11:53 +02:00
with self.connect() as conn:
conn.sendmail(self._from, to, email)
2022-08-24 11:13:12 +02:00
2022-08-24 12:42:23 +02:00
@retry_and_log(logger, RETRIES)
2022-09-20 22:29:59 +02:00
def send_result(
self, to: T.List[T.Text], cc: T.List[T.Text], result: bytes
) -> None:
2022-09-18 23:29:02 +02:00
date = get_date()
2022-08-24 11:13:12 +02:00
body = f"Resoconto dei voli dal sito di AdR per l'aereoporto di Ciampino in data {date}" # noqa: E501
message = MIMEMultipart()
message["Date"] = datetime.now().strftime("%a, %d %b %Y %H:%M:%S %z")
2022-08-24 11:13:12 +02:00
message["From"] = self._from
message["To"] = ",".join(to)
2022-09-20 22:29:59 +02:00
if cc:
message["Cc"] = ",".join(cc)
2022-08-24 11:13:12 +02:00
message["Subject"] = f"[{date}] Resoconto CIA da AdR"
message.attach(MIMEText(body, "plain"))
payload = MIMEBase("application", "octet-stream")
payload.set_payload(result)
encoders.encode_base64(payload)
payload.add_header(
"Content-Disposition",
f"attachment; filename={date}.xlsx",
)
message.attach(payload)
email = message.as_string()
2022-09-27 23:15:50 +02:00
rcpt = to
rcpt.extend(cc)
self.send(rcpt, email)
2022-08-24 12:42:23 +02:00
@retry_and_log(logger, RETRIES)
2022-09-20 22:29:59 +02:00
def send_no_data(self, to: T.List[T.Text], cc: T.List[T.Text]) -> None:
2022-09-18 23:29:02 +02:00
date = get_date()
2022-08-24 12:42:23 +02:00
body = f"""Attenzione
Nessun dato è stato trovato per i voli in data {date} dal sito di AdR per
l'aereoporto di Ciampino. Questo è probabile indice di errore. Contattate
il vostro tecnico preferito.
"""
message = MIMEMultipart()
message["Date"] = datetime.now().strftime("%a, %d %b %Y %H:%M:%S %z")
2022-08-24 12:42:23 +02:00
message["From"] = self._from
message["To"] = ",".join(to)
2022-09-20 22:29:59 +02:00
if cc:
message["Cc"] = ",".join(cc)
2022-08-24 12:42:23 +02:00
message["Subject"] = f"ATTENZIONE: [{date}] Resoconto CIA da AdR"
message.attach(MIMEText(body, "plain"))
email = message.as_string()
2022-09-27 23:15:50 +02:00
rcpt = to
rcpt.extend(cc)
self.send(rcpt, email)