latecomers/latecomers/notifier.py

99 lines
2.9 KiB
Python

# -*- encoding: utf-8 -*-
from datetime import datetime
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import logging
import smtplib
import ssl
import typing as T
from latecomers.helpers import retry_and_log
RETRIES = 3
logger = logging.getLogger(__name__)
class Notifier(object):
def __init__(
self,
smtp_addr: T.Text,
smtp_port: int,
smtp_starttls: bool,
smtp_user: T.Text,
smtp_pass: T.Text,
smtp_from: T.Optional[T.Text] = None,
) -> None:
context = ssl.create_default_context()
if smtp_starttls:
self._conn = smtplib.SMTP_SSL(smtp_addr, smtp_port)
self._greet = self._conn.ehlo
else:
self._conn = smtplib.SMTP(smtp_addr, smtp_port)
self._greet = lambda: self._conn.starttls(context=context)
self._addr = smtp_addr
self._port = smtp_port
self._user = smtp_user
self._pass = smtp_pass
self._from = smtp_from if smtp_from else smtp_user
logger.info("%s initialized", self)
def __str__(self) -> T.Text:
return f"Notifier<{self._addr}:{self._port},from={self._from}>"
def send(self, to: T.List[T.Text], email: T.Text) -> None:
self._greet()
self._conn.login(self._user, self._pass)
self._conn.sendmail(self._from, to, email)
self._conn.close()
@retry_and_log(logger, RETRIES)
def send_result(self, to: T.List[T.Text], result: bytes) -> None:
date = datetime.now().strftime("%Y-%m-%d")
body = f"Resoconto dei voli dal sito di AdR per l'aereoporto di Ciampino in data {date}" # noqa: E501
message = MIMEMultipart()
message["From"] = self._from
message["To"] = ",".join(to)
message["Subject"] = f"[{date}] Resoconto CIA da AdR"
message.attach(MIMEText(body, "plain"))
payload = MIMEBase("application", "octet-stream")
payload.set_payload(result)
encoders.encode_base64(payload)
payload.add_header(
"Content-Disposition",
f"attachment; filename={date}.xlsx",
)
message.attach(payload)
email = message.as_string()
self.send(to, email)
@retry_and_log(logger, RETRIES)
def send_no_data(self, to: T.List[T.Text]) -> None:
date = datetime.now().strftime("%Y-%m-%d")
body = f"""Attenzione
Nessun dato è stato trovato per i voli in data {date} dal sito di AdR per
l'aereoporto di Ciampino. Questo è probabile indice di errore. Contattate
il vostro tecnico preferito.
"""
message = MIMEMultipart()
message["From"] = self._from
message["To"] = ",".join(to)
message["Subject"] = f"ATTENZIONE: [{date}] Resoconto CIA da AdR"
message.attach(MIMEText(body, "plain"))
email = message.as_string()
self.send(to, email)