latecomers/latecomers/notifier.py

117 lines
3.4 KiB
Python

# -*- encoding: utf-8 -*-
from contextlib import contextmanager
from datetime import datetime
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import logging
import smtplib
import ssl
import typing as T
from latecomers.helpers import retry_and_log, get_date
RETRIES = 3
logger = logging.getLogger(__name__)
class Notifier(object):
def __init__(
self,
smtp_addr: T.Text,
smtp_port: int,
smtp_starttls: bool,
smtp_user: T.Text,
smtp_pass: T.Text,
smtp_from: T.Optional[T.Text] = None,
) -> None:
self._starttls = smtp_starttls
self._addr = smtp_addr
self._port = smtp_port
self._user = smtp_user
self._pass = smtp_pass
self._from = smtp_from if smtp_from else smtp_user
logger.info("%s initialized", self)
@contextmanager
def connect(self):
context = ssl.create_default_context()
if self._starttls:
with smtplib.SMTP(self._addr, self._port) as conn:
conn.starttls(context=context)
conn.login(self._user, self._pass)
yield conn
else:
with smtplib.SMTP_SSL(self._addr, self._port) as conn:
conn.login(self._user, self._pass)
yield conn
def __str__(self) -> T.Text:
return f"Notifier<{self._addr}:{self._port},from={self._from}>"
def send(self, to: T.List[T.Text], email: T.Text) -> None:
with self.connect() as conn:
conn.sendmail(self._from, to, email)
@retry_and_log(logger, RETRIES)
def send_result(
self, to: T.List[T.Text], cc: T.List[T.Text], result: bytes
) -> None:
date = get_date()
body = f"Resoconto dei voli dal sito di AdR per l'aereoporto di Ciampino in data {date}" # noqa: E501
message = MIMEMultipart()
message["Date"] = datetime.now().strftime("%a, %d %b %Y %H:%M:%S %z")
message["From"] = self._from
message["To"] = ",".join(to)
if cc:
message["Cc"] = ",".join(cc)
message["Subject"] = f"[{date}] Resoconto CIA da AdR"
message.attach(MIMEText(body, "plain"))
payload = MIMEBase("application", "octet-stream")
payload.set_payload(result)
encoders.encode_base64(payload)
payload.add_header(
"Content-Disposition",
f"attachment; filename={date}.xlsx",
)
message.attach(payload)
email = message.as_string()
rcpt = to
rcpt.extend(cc)
self.send(rcpt, email)
@retry_and_log(logger, RETRIES)
def send_no_data(self, to: T.List[T.Text], cc: T.List[T.Text]) -> None:
date = get_date()
body = f"""Attenzione
Nessun dato è stato trovato per i voli in data {date} dal sito di AdR per
l'aereoporto di Ciampino. Questo è probabile indice di errore. Contattate
il vostro tecnico preferito.
"""
message = MIMEMultipart()
message["Date"] = datetime.now().strftime("%a, %d %b %Y %H:%M:%S %z")
message["From"] = self._from
message["To"] = ",".join(to)
if cc:
message["Cc"] = ",".join(cc)
message["Subject"] = f"ATTENZIONE: [{date}] Resoconto CIA da AdR"
message.attach(MIMEText(body, "plain"))
email = message.as_string()
rcpt = to
rcpt.extend(cc)
self.send(rcpt, email)