232 lines
7.0 KiB
Python
232 lines
7.0 KiB
Python
# -*- encoding: utf-8 -*-
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from enum import Enum
|
|
import logging
|
|
import re
|
|
import typing as T
|
|
|
|
from latecomers.helpers import logit
|
|
|
|
from lxml import etree as et
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Single lxml HTML parser instance, passed to et.fromstring() by find_table.
PARSER = et.HTMLParser()

# A clock time: one or two hour digits, a colon, two minute digits.
TIME_RE = re.compile(r"\d\d?:\d\d")
# Airport text: one or more characters of any kind.
AIRPORT_RE = re.compile(r"[\w\d\s\S]+")
# The four status labels that can be recognised inside a status cell.
STATUS_RE = re.compile(r"(Arrivato|In Arrivo|Schedulato|Cancellato)")
|
|
|
|
|
|
def not_empty(obj: et._Element) -> bool:
    """
    Tell whether a table row is populated.

    A row counts as populated when it has exactly 5 or 6 ``<h5>``
    descendants, which are the two layouts handled by ``get_details``.

    Raises RuntimeError when ``obj`` is not an lxml element.
    """
    # isinstance (instead of an exact type comparison) also accepts
    # subclasses of the lxml element type.
    if not isinstance(obj, et._Element):
        raise RuntimeError(f"provided argument is of unsupported type: {type(obj)}")

    return len(obj.xpath(".//h5")) in (5, 6)
|
|
|
|
|
|
@logit(logger)
def find_table(html_content: T.Text) -> T.List[et._Element]:
    """
    Find the rows of the table that holds the data in the html response.

    The document is searched for a single ``<tbody>`` whose class contains
    ``table-data``. Its ``<tr>`` descendants are returned, keeping only
    those accepted by ``not_empty``.

    Returns an empty list when the document has no root element or no
    matching ``<tbody>``.

    Raises ValueError when more than one matching ``<tbody>`` is found.
    """
    root = et.fromstring(html_content, parser=PARSER)
    # With an HTML parser, fromstring can return None instead of an element
    # when the input contains no element at all; treat it as "no table".
    if root is None:
        return []

    tbody = root.xpath("//tbody[contains(@class, 'table-data')]")
    if not tbody:
        return []

    if len(tbody) != 1:
        raise ValueError(f"Unexpected parsing result: found {len(tbody)} results")

    return [row for row in tbody[0].xpath(".//tr") if not_empty(row)]
|
|
|
|
|
|
class Status(Enum):
    """
    Arrival status of a flight.

    Each member's value is the label text that ``from_str`` looks for;
    ``UNKNOWN`` is the fallback when no label is recognised.
    """

    ARRIVED = "Arrivato"
    ARRIVING = "In Arrivo"
    SCHEDULED = "Schedulato"
    CANCELED = "Cancellato"
    UNKNOWN = "Sconosciuto"

    @classmethod
    def from_str(cls, text: T.Optional[T.Text]) -> "Status":
        """
        Return the status whose label is contained in ``text``.

        Missing (``None``) or empty text, and text holding none of the
        known labels, map to ``UNKNOWN``.
        """
        if not text:
            return cls.UNKNOWN

        # The order only matters for text holding several labels: the
        # first one listed here wins.
        for candidate in (cls.ARRIVED, cls.ARRIVING, cls.SCHEDULED, cls.CANCELED):
            if candidate.value in text:
                return candidate

        return cls.UNKNOWN
|
|
|
|
|
|
@dataclass
class Details(object):
    """
    Data extracted from one row of the arrivals table.

    Every field starts out empty. Each ``maybe_*`` method inspects one
    input and fills the matching field only when the input looks right,
    leaving the instance untouched otherwise.
    """

    # scheduled ("theoric") arrival time, as the text matched by TIME_RE
    th_arrival: T.Optional[T.Text] = None
    # real arrival time, as the text matched by TIME_RE
    real_arrival: T.Optional[T.Text] = None
    # flight code with all spaces removed
    code: T.Optional[T.Text] = None
    # origin airport text
    origin: T.Optional[T.Text] = None
    status: Status = Status.UNKNOWN
    # landing time looked up by flight code in the auxiliary data
    fr24_landing_time: T.Optional[T.Text] = None

    @staticmethod
    def _text_of(node: et._Element) -> T.Text:
        """
        Return the text of ``node``, or an empty string when it has none.
        """
        # lxml reports a missing leading text as None, not as "".
        return node.text or ""

    def maybe_parse_hour_th(self, h5: et._Element) -> None:
        """
        This function fills the fields related to the theoric arrival hour,
        if the input matches some heuristics.
        """
        hour = TIME_RE.findall(self._text_of(h5))
        if len(hour) == 1:
            self.th_arrival = hour[0]
        # A struck-through scheduled time clears any stored real arrival.
        if "text-decoration: line-through" in h5.attrib.get("style", ""):
            self.real_arrival = None

    def maybe_parse_hour_real(self, h5: et._Element) -> None:
        """
        This function fills the field related to the real arrival hour,
        if the input matches some heuristics.
        """
        hour = TIME_RE.findall(self._text_of(h5))
        if len(hour) == 1:
            self.real_arrival = hour[0]

    def maybe_parse_code(self, h5: et._Element) -> None:
        """
        This function fills the field related to the flight code,
        if present and the input matches some heuristics.
        """
        if "flight-numb" not in h5.attrib.get("class", ""):
            return
        child = h5.xpath(".//strong")
        if len(child) == 1:
            # Trim the surrounding whitespace, then drop inner spaces.
            self.code = self._text_of(child[0]).strip("\t\n ").replace(" ", "")

    def maybe_parse_airport(self, h5: et._Element) -> None:
        """
        This function fills the field for the airport, if the input matches some
        heuristics.
        """
        # Only tabs and newlines are trimmed; spaces are kept.
        airport = self._text_of(h5).strip("\t\n")
        if airport and "blue-title" in h5.attrib.get("class", ""):
            self.origin = airport

    def maybe_parse_status(self, h5: et._Element) -> None:
        """
        This function fills the field for the status, if the input matches some
        heuristics.
        """
        text = self._text_of(h5)
        _class = h5.attrib.get("class", "")
        if "arrivato" in _class or "schedulato" in _class:
            # The class already marks this as a status cell: take whatever
            # label the text holds (UNKNOWN when there is none).
            self.status = Status.from_str(text)
        else:
            # Otherwise only trust the text when exactly one label shows up.
            parsed = STATUS_RE.findall(text)
            if len(parsed) == 1:
                self.status = Status.from_str(parsed[0])

    def maybe_add_aux_data(self, aux_data: T.Dict[T.Text, T.Text]) -> None:
        """
        This function extends the current data with auxiliary sources (currently
        only FlightRadar24 data).
        """
        if not self.code:
            return

        # A code missing from aux_data stores None.
        self.fr24_landing_time = aux_data.get(self.code)

    def __str__(self) -> T.Text:
        """
        Render as ``Detail<key=value,...>``; empty optional fields are
        omitted, origin and status are always present.
        """
        res: T.Dict[T.Text, T.Optional[T.Text]] = {}
        if self.th_arrival:
            res["theoric"] = self.th_arrival
        if self.real_arrival:
            res["real"] = self.real_arrival
        if self.code:
            res["code"] = self.code
        res["origin"] = self.origin
        res["status"] = self.status.value
        if self.fr24_landing_time:
            res["fr24_landing_time"] = self.fr24_landing_time
        desc = ",".join(f"{k}={v}" for k, v in res.items())
        return f"Detail<{desc}>"
|
|
|
|
|
|
def get_details(
    table_entry: et._Element,
    aux_data: T.Optional[T.Dict[T.Text, T.Text]] = None,
    debug: bool = False,
) -> Details:
    """
    Build the Details of one table row.

    The ``<h5>`` descendants of the row are read by position. With 5 of
    them the order is: scheduled time, flight code, airport, status. With
    6 of them a real arrival time sits between the scheduled time and the
    flight code. The last ``<h5>`` of either layout is not read, and a row
    with fewer than 5 yields a Details holding only default values.

    When ``aux_data`` is given and not empty, it is used to fill in the
    auxiliary fields. With ``debug`` set, the text and attributes of every
    ``<h5>`` are printed.

    Raises ValueError when the row holds more than 6 ``<h5>`` elements.
    """
    res = table_entry.xpath(".//h5")
    if len(res) > 6:
        raise ValueError(f"Unexpected number of h5 found in line: {len(res)}")

    if debug:
        for r in res:
            # An element without leading text has text None in lxml.
            txt = (r.text or "").strip("\t\n ")
            print(f"[DEBUG] text={txt} attrs={r.attrib}")

    d = Details()

    if len(res) == 5:
        d.maybe_parse_hour_th(res[0])
        d.maybe_parse_code(res[1])
        d.maybe_parse_airport(res[2])
        d.maybe_parse_status(res[3])
    elif len(res) == 6:
        d.maybe_parse_hour_th(res[0])
        d.maybe_parse_hour_real(res[1])
        d.maybe_parse_code(res[2])
        d.maybe_parse_airport(res[3])
        d.maybe_parse_status(res[4])

    if aux_data:
        d.maybe_add_aux_data(aux_data)

    return d
|
|
|
|
|
|
def parse_fr24(
    data: T.Optional[T.Dict[T.Text, T.Any]]
) -> T.Optional[T.Dict[T.Text, T.Text]]:
    """
    This function parses the given FlightRadar24 data into a dictionary
    mapping each flight code to its real arrival time, formatted as
    ``HH:MM``.

    The flight code is the ``default`` number of the flight, or its
    ``alternative`` one when the default is missing. Flights without a code
    or without a usable real arrival timestamp are skipped.

    Returns None when ``data`` is empty or does not hold the arrivals list
    at the expected place.
    """
    logger.debug("fr24 raw data: %s", data)

    if not data:
        return None

    try:
        schedule = data["result"]["response"]["airport"]["pluginData"]["schedule"]
        # iter() makes a non-iterable payload fail here, not in the loop.
        flights = iter(schedule["arrivals"]["data"])
    except (KeyError, TypeError):
        return None

    results: T.Dict[T.Text, T.Text] = {}
    for flight in flights:
        try:
            id_num = flight["flight"]["identification"]["number"]
            code = id_num.get("default") or id_num.get("alternative")
            if not code:
                # skip if no flight code found
                continue
            ts = flight["flight"]["time"]["real"]["arrival"]
            # Without a tz argument the time is rendered in the local
            # timezone of the host.
            real_arrival = datetime.fromtimestamp(ts).strftime("%H:%M")
            results[code] = real_arrival
        except (
            KeyError,
            TypeError,
            AttributeError,
            ValueError,
            OverflowError,
            OSError,
        ):
            # Malformed entry (missing keys, null or out-of-range
            # timestamp, ...): ignore it and keep the others.
            continue
        logger.debug("%s -> %s", code, real_arrival)

    return results
|