# -*- encoding: utf-8 -*-
"""Parse flight-arrival HTML tables and FlightRadar24 schedule payloads."""
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
import logging
import re
import typing as T

from latecomers.helpers import logit
from lxml import etree as et

TIME_RE = re.compile(r"\d\d?:\d\d")
AIRPORT_RE = re.compile(r"[\w\d\s\S]+")
STATUS_RE = re.compile(r"(Arrivato|In Arrivo|Schedulato|Cancellato)")
PARSER = et.HTMLParser()

logger = logging.getLogger(__name__)


def not_empty(obj: et._Element) -> bool:
    """
    Tell whether a table row looks like a real data row.

    A row is rejected when its ``class`` attribute contains ``lfr-template``;
    otherwise it is accepted only if it holds 5 or 6 ``<td>`` descendants.

    Raises ``RuntimeError`` when ``obj`` is not an lxml element.
    """
    if not isinstance(obj, et._Element):
        raise RuntimeError(f"provided argument is of unsupported type: {type(obj)}")
    # Rows without a ``class`` attribute must not crash the membership test.
    if "lfr-template" in obj.attrib.get("class", ""):
        return False
    return len(obj.xpath(".//td")) in (5, 6)


@logit(logger)
def count_pages(html_content: T.Text) -> int:
    """
    Count how many pages there are to be accessed.

    Returns 1 when no paginator is found in the document.
    """
    root = et.fromstring(html_content, parser=PARSER)
    il_items = root.xpath(
        "//div[contains(@data-qa-id, 'paginator')]/ul[contains(@class, 'pagination')]/li"
    )
    if not il_items:
        return 1
    # Two entries are discounted (presumably the previous/next controls --
    # TODO confirm against the page markup). The page being parsed always
    # counts, so never report fewer than one page.
    return max(1, len(il_items) - 2)


@logit(logger)
def find_table(html_content: T.Text) -> T.List[et._Element]:
    """
    Find the rows of the table that holds the data in the html response.

    Returns an empty list when the document has no ``<tbody>``; raises
    ``ValueError`` when it has more than one. Only rows accepted by
    ``not_empty`` are returned.
    """
    root = et.fromstring(html_content, parser=PARSER)
    tbody = root.xpath("//tbody")
    if not tbody:
        return []
    if len(tbody) != 1:
        raise ValueError(f"Unexpected parsing result: found {len(tbody)} results")
    return [
        child
        for child in tbody[0].xpath(".//tr[contains(@data-qa-id, 'row')]")
        if not_empty(child)
    ]


class Status(Enum):
    """Flight status, keyed by the label used in the scraped page."""

    ARRIVED = "Arrivato"
    ARRIVING = "In Arrivo"
    SCHEDULED = "Schedulato"
    CANCELED = "Cancellato"
    UNKNOWN = "Sconosciuto"

    @classmethod
    def from_str(cls, text: T.Text) -> "Status":
        """
        Map a free-form status text to a ``Status`` member.

        The first known label contained in ``text`` wins; empty or missing
        text, as well as text without any known label, maps to ``UNKNOWN``.
        """
        if not text:
            return cls.UNKNOWN
        # Checked in the same order as the labels are declared.
        for candidate in (cls.ARRIVED, cls.ARRIVING, cls.SCHEDULED, cls.CANCELED):
            if candidate.value in text:
                return candidate
        return cls.UNKNOWN


@dataclass
class Details(object):
    """
    Data extracted from one table row.

    The explicit ``__init__`` below is kept by ``@dataclass`` (it never
    overwrites a user-defined one), so instances are built from a row and
    every field starts at its class-level default until a ``maybe_*``
    method fills it.
    """

    th_arrival: T.Optional[T.Text] = None
    real_arrival: T.Optional[T.Text] = None
    code: T.Optional[T.Text] = None
    origin: T.Optional[T.Text] = None
    status: Status = Status.UNKNOWN
    fr24_landing_time: T.Optional[T.Text] = None

    def __init__(self, row: et._Element) -> None:
        self.row = row

    def _find_single(
        self, query: T.Text, label: T.Text
    ) -> T.Optional[et._Element]:
        """
        Return the only element matching ``query`` in the row, or ``None``
        (after a debug log) when there are zero or several matches.
        """
        matches = self.row.xpath(query)
        if len(matches) != 1:
            logger.debug("Cannot parse %s", label)
            return None
        return matches[0]

    def maybe_parse_hour_th(self) -> None:
        """
        This function fills the field related to the theoretical arrival
        hour, if the input matches some heuristics.
        """
        node = self._find_single(
            ".//span[contains(@class, 'date-estimated__time')]", "estimated time"
        )
        # lxml elements without children are falsy: always compare to None.
        if node is None:
            return
        self.th_arrival = node.text

    def maybe_parse_hour_real(self) -> None:
        """
        This function fills the field related to the actual arrival hour,
        if the input matches some heuristics.
        """
        node = self._find_single(
            ".//span[contains(@class, 'date-actual__time')]", "actual time"
        )
        if node is None:
            return
        self.real_arrival = node.text

    def maybe_parse_code(self) -> None:
        """
        This function fills the field related to the flight code, if present
        and the input matches some heuristics.
        """
        node = self._find_single(
            ".//td[contains(@class, 'lfr-departure-column-column')]//div/a/strong",
            "code",
        )
        if node is None:
            return
        if node.text is None:
            # The element exists but carries no text: nothing to normalise.
            logger.debug("Cannot parse code")
            return
        self.code = node.text.strip("\t\n ").replace(" ", "")

    def maybe_parse_airport(self) -> None:
        """
        This function fills the field for the airport, if the input matches
        some heuristics.
        """
        node = self._find_single(
            ".//td[contains(@class, 'lfr-flight-departure-column')]/h5", "airport"
        )
        if node is None:
            return
        if node.text is None:
            logger.debug("Cannot parse airport")
            return
        self.origin = node.text.strip("\t\n")

    def maybe_parse_status(self) -> None:
        """
        This function fills the field for the status, if the input matches
        some heuristics.
        """
        node = self._find_single(
            ".//td[contains(@class, 'lfr-flight-status-column')]/h5", "status"
        )
        if node is None:
            return
        text = node.text or ""
        _class = node.attrib.get("class", "")
        if "arrivato" in _class or "schedulato" in _class:
            self.status = Status.from_str(text)
        else:
            # Without a telling CSS class, accept the text only when it
            # contains exactly one known label.
            parsed = STATUS_RE.findall(text)
            if len(parsed) == 1:
                self.status = Status.from_str(parsed[0])

    def maybe_add_aux_data(self, aux_data: T.Dict[T.Text, T.Text]) -> None:
        """
        This function extends the current data with auxiliary sources
        (currently only FlightRadar24 data).
        """
        if not self.code:
            logger.debug("Cannot add aux data: missing code")
            return
        self.fr24_landing_time = aux_data.get(self.code)

    def __str__(self) -> T.Text:
        res: T.Dict[T.Text, T.Optional[T.Text]] = {}
        if self.th_arrival:
            res["theoric"] = self.th_arrival
        if self.real_arrival:
            res["real"] = self.real_arrival
        if self.code:
            res["code"] = self.code
        res["origin"] = self.origin
        res["status"] = self.status.value
        if self.fr24_landing_time:
            res["fr24_landing_time"] = self.fr24_landing_time
        desc = ",".join([f"{k}={v}" for k, v in res.items()])
        return f"Detail<{desc}>"


def get_details(
    table_entry: et._Element,
    aux_data: T.Optional[T.Dict[T.Text, T.Text]] = None,
) -> Details:
    """
    Build a ``Details`` object out of a table row.

    Every ``maybe_parse_*`` step is attempted; fields that cannot be parsed
    keep their defaults. When ``aux_data`` is given and not empty, it is
    used to fill the FlightRadar24 landing time.
    """
    d = Details(table_entry)
    d.maybe_parse_hour_th()
    d.maybe_parse_hour_real()
    d.maybe_parse_code()
    d.maybe_parse_airport()
    d.maybe_parse_status()
    if aux_data:
        d.maybe_add_aux_data(aux_data)
    return d


def parse_fr24(
    data: T.Optional[T.Dict[T.Text, T.Any]],
) -> T.Optional[T.Dict[T.Text, T.Text]]:
    """
    This function parses the given FlightRadar24 data into a dictionary
    mapping each flight code to its real arrival time (``HH:MM``).

    Returns ``None`` when ``data`` is empty or does not have the expected
    layout. Flights that cannot be parsed are skipped.
    """
    logger.debug("fr24 raw data: %s", data)
    if not data:
        return None
    try:
        results: T.Dict[T.Text, T.Text] = {}
        schedule = data["result"]["response"]["airport"]["pluginData"]["schedule"]
        for flight in schedule["arrivals"]["data"]:
            try:
                id_num = flight["flight"]["identification"]["number"]
                code = id_num.get("default") or id_num.get("alternative")
                if not code:
                    # skip if no flight code found
                    continue
                ts = flight["flight"]["time"]["real"]["arrival"]
                # NOTE: the timestamp is rendered in the local timezone of
                # the machine running this code.
                real_arrival = datetime.fromtimestamp(ts).strftime("%H:%M")
                results[code] = real_arrival
                logger.debug("%s -> %s", code, real_arrival)
            except Exception:
                # Best effort: a malformed entry must not discard the others.
                logger.debug("Skipping unparsable fr24 flight", exc_info=True)
                continue
        return results
    except Exception:
        # Best effort: an unexpected payload layout yields no data at all.
        logger.debug("Cannot parse fr24 data", exc_info=True)
        return None