# -*- encoding: utf-8 -*- from contextlib import contextmanager from email import encoders from email.mime.base import MIMEBase from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText import logging import smtplib import ssl import typing as T from latecomers.helpers import retry_and_log, get_date RETRIES = 3 logger = logging.getLogger(__name__) class Notifier(object): def __init__( self, smtp_addr: T.Text, smtp_port: int, smtp_starttls: bool, smtp_user: T.Text, smtp_pass: T.Text, smtp_from: T.Optional[T.Text] = None, ) -> None: self._starttls = smtp_starttls self._addr = smtp_addr self._port = smtp_port self._user = smtp_user self._pass = smtp_pass self._from = smtp_from if smtp_from else smtp_user logger.info("%s initialized", self) @contextmanager def connect(self): context = ssl.create_default_context() if self._starttls: with smtplib.SMTP(self._addr, self._port) as conn: conn.starttls(context=context) conn.login(self._user, self._pass) yield conn else: with smtplib.SMTP_SSL(self._addr, self._port) as conn: conn.login(self._user, self._pass) yield conn def __str__(self) -> T.Text: return f"Notifier<{self._addr}:{self._port},from={self._from}>" def send(self, to: T.List[T.Text], email: T.Text) -> None: with self.connect() as conn: conn.sendmail(self._from, to, email) @retry_and_log(logger, RETRIES) def send_result(self, to: T.List[T.Text], result: bytes) -> None: date = get_date() body = f"Resoconto dei voli dal sito di AdR per l'aereoporto di Ciampino in data {date}" # noqa: E501 message = MIMEMultipart() message["From"] = self._from message["To"] = ",".join(to) message["Subject"] = f"[{date}] Resoconto CIA da AdR" message.attach(MIMEText(body, "plain")) payload = MIMEBase("application", "octet-stream") payload.set_payload(result) encoders.encode_base64(payload) payload.add_header( "Content-Disposition", f"attachment; filename={date}.xlsx", ) message.attach(payload) email = message.as_string() self.send(to, email) @retry_and_log(logger, RETRIES) def send_no_data(self, to: T.List[T.Text]) -> None: date = get_date() body = f"""Attenzione Nessun dato è stato trovato per i voli in data {date} dal sito di AdR per l'aereoporto di Ciampino. Questo è probabile indice di errore. Contattate il vostro tecnico preferito. """ message = MIMEMultipart() message["From"] = self._from message["To"] = ",".join(to) message["Subject"] = f"ATTENZIONE: [{date}] Resoconto CIA da AdR" message.attach(MIMEText(body, "plain")) email = message.as_string() self.send(to, email)