latecomers/latecomers/parse.py
2022-09-18 23:29:49 +02:00

232 lines
7.0 KiB
Python

# -*- encoding: utf-8 -*-
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
import logging
import re
import typing as T
from latecomers.helpers import logit
from lxml import etree as et
# Matches a clock-like time such as "9:05" or "21:40" (one or two hour digits).
TIME_RE = re.compile(r"\d\d?:\d\d")
# NOTE(review): the class combines `\s` and `\S`, so this accepts any
# non-empty string; it is not referenced in the visible part of this file.
AIRPORT_RE = re.compile(r"[\w\d\s\S]+")
# Matches one of the four status labels that Status.from_str recognises.
STATUS_RE = re.compile(r"(Arrivato|In Arrivo|Schedulato|Cancellato)")
# Single lxml HTML parser instance, passed to et.fromstring in find_table.
PARSER = et.HTMLParser()
logger = logging.getLogger(__name__)
def not_empty(obj: et._Element) -> bool:
    """
    Tell whether a table row holds flight data.

    A row is considered non-empty when it has exactly 5 or 6 ``h5``
    descendants, which are the two layouts handled by ``get_details``.

    Raises RuntimeError when ``obj`` is not an lxml element.
    """
    # isinstance (rather than an exact type comparison) also accepts
    # subclasses of the lxml element type.
    if not isinstance(obj, et._Element):
        raise RuntimeError(f"provided argument is of unsupported type: {type(obj)}")
    return len(obj.xpath(".//h5")) in (5, 6)
@logit(logger)
def find_table(html_content: T.Text) -> T.List[et._ElementTree]:
    """
    Extract the data rows from the table contained in an HTML response.

    The document is searched for a ``tbody`` whose class contains
    ``table-data``. An empty list is returned when there is none, and a
    ValueError is raised when there is more than one. Only the ``tr``
    descendants accepted by ``not_empty`` are returned.
    """
    document = et.fromstring(html_content, parser=PARSER)
    bodies = document.xpath("//tbody[contains(@class, 'table-data')]")
    if len(bodies) == 0:
        return []
    if len(bodies) > 1:
        raise ValueError(f"Unexpected parsing result: found {len(bodies)} results")
    (body,) = bodies
    rows = []
    for row in body.xpath(".//tr"):
        if not_empty(row):
            rows.append(row)
    return rows
class Status(Enum):
    """
    Flight status, with each member's value being the label looked for in
    the status text.
    """

    ARRIVED = "Arrivato"
    ARRIVING = "In Arrivo"
    SCHEDULED = "Schedulato"
    CANCELED = "Cancellato"
    UNKNOWN = "Sconosciuto"

    @classmethod
    def from_str(cls, text: T.Text) -> "Status":
        """
        Return the first status whose label occurs in ``text``.

        Labels are tried in declaration order (case-sensitive substring
        match); UNKNOWN is returned when none of them is found.
        """
        candidates = (cls.ARRIVED, cls.ARRIVING, cls.SCHEDULED, cls.CANCELED)
        for candidate in candidates:
            if candidate.value in text:
                return candidate
        return cls.UNKNOWN
@dataclass
class Details(object):
    """
    Data collected from one row of the arrivals table.

    Every field starts unset. Each ``maybe_parse_*`` method inspects a single
    ``h5`` cell and fills its field only when the cell matches the method's
    heuristic; otherwise the field is left untouched. Cells whose ``text`` is
    ``None`` (lxml reports this when an element has no leading text) are
    treated as empty text instead of raising.
    """

    # Scheduled ("theoric") arrival time, as matched by TIME_RE.
    th_arrival: T.Optional[T.Text] = None
    # Actual arrival time, as matched by TIME_RE.
    real_arrival: T.Optional[T.Text] = None
    # Flight code with surrounding whitespace and inner spaces removed.
    code: T.Optional[T.Text] = None
    # Airport cell text with leading/trailing tabs and newlines removed.
    origin: T.Optional[T.Text] = None
    status: Status = Status.UNKNOWN
    # Value found for `code` in the auxiliary (FlightRadar24) mapping.
    fr24_landing_time: T.Optional[T.Text] = None

    def maybe_parse_hour_th(self, h5: et._ElementTree) -> None:
        """
        This function fills the field for the theoric arrival hour, if the
        cell text contains exactly one time.
        """
        hour = TIME_RE.findall(h5.text or "")
        if len(hour) == 1:
            self.th_arrival = hour[0]
            if "text-decoration: line-through" in h5.attrib.get("style", ""):
                # NOTE(review): a struck-through scheduled time resets the
                # real arrival; presumably the real time is then provided by
                # a separate cell (see maybe_parse_hour_real) -- confirm.
                self.real_arrival = None

    def maybe_parse_hour_real(self, h5: et._ElementTree) -> None:
        """
        This function fills the field for the real arrival hour, if the
        cell text contains exactly one time.
        """
        hour = TIME_RE.findall(h5.text or "")
        if len(hour) == 1:
            self.real_arrival = hour[0]

    def maybe_parse_code(self, h5: et._ElementTree) -> None:
        """
        This function fills the field for the flight code, if the cell has
        the ``flight-numb`` class and exactly one ``strong`` descendant
        carrying text.
        """
        if "flight-numb" not in h5.attrib.get("class", ""):
            return
        child = h5.xpath(".//strong")
        if len(child) != 1:
            return
        raw_code = child[0].text
        if raw_code is not None:
            self.code = raw_code.strip("\t\n ").replace(" ", "")

    def maybe_parse_airport(self, h5: et._ElementTree) -> None:
        """
        This function fills the field for the airport, if the cell has the
        ``blue-title`` class and non-empty text.
        """
        # Only tabs and newlines are trimmed; spaces are kept as-is.
        airport = (h5.text or "").strip("\t\n")
        if airport and "blue-title" in h5.attrib.get("class", ""):
            self.origin = airport

    def maybe_parse_status(self, h5: et._ElementTree) -> None:
        """
        This function fills the field for the status, if the input matches
        some heuristics.

        When the cell class mentions ``arrivato`` or ``schedulato`` the whole
        text is handed to ``Status.from_str``; otherwise the status is set
        only if the text contains exactly one known status label.
        """
        text = h5.text or ""
        _class = h5.attrib.get("class", "")
        if "arrivato" in _class or "schedulato" in _class:
            self.status = Status.from_str(text)
        else:
            parsed = STATUS_RE.findall(text)
            if len(parsed) == 1:
                self.status = Status.from_str(parsed[0])

    def maybe_add_aux_data(self, aux_data: T.Dict[T.Text, T.Text]) -> None:
        """
        This function extends the current data with auxiliary sources
        (currently only FlightRadar24 data), looked up by flight code.
        Nothing happens when no flight code has been parsed.
        """
        if not self.code:
            return
        self.fr24_landing_time = aux_data.get(self.code)

    def __str__(self) -> T.Text:
        """
        Render as ``Detail<key=value,...>``; ``origin`` and ``status`` are
        always present, the other keys only when their field is set.
        """
        res: T.Dict[T.Text, T.Optional[T.Text]] = {}
        if self.th_arrival:
            res["theoric"] = self.th_arrival
        if self.real_arrival:
            res["real"] = self.real_arrival
        if self.code:
            res["code"] = self.code
        res["origin"] = self.origin
        res["status"] = self.status.value
        if self.fr24_landing_time:
            res["fr24_landing_time"] = self.fr24_landing_time
        desc = ",".join(f"{k}={v}" for k, v in res.items())
        return f"Detail<{desc}>"
def get_details(
    table_entry: et._ElementTree,
    aux_data: T.Optional[T.Dict[T.Text, T.Text]] = None,
    debug: bool = False,
) -> Details:
    """
    Build a Details record from the ``h5`` cells of a table row.

    Two layouts are understood, by cell position:

    * 5 cells: theoric hour, flight code, airport, status;
    * 6 cells: theoric hour, real hour, flight code, airport, status.

    The last cell of either layout is not inspected. A row with fewer than
    5 cells yields a Details with default values. When ``aux_data`` is
    given and non-empty, it is used to add the auxiliary landing time.

    With ``debug`` set, the text and attributes of every cell are printed.

    Raises ValueError when the row has more than 6 ``h5`` cells.
    """
    res = table_entry.xpath(".//h5")
    if len(res) > 6:
        raise ValueError(f"Unexpected number of h5 found in line: {len(res)}")
    if debug:
        for r in res:
            # lxml reports text as None for cells without leading text.
            txt = (r.text or "").strip("\t\n ")
            print(f"[DEBUG] text={txt} attrs={r.attrib}")
    d = Details()
    if len(res) == 5:
        d.maybe_parse_hour_th(res[0])
        d.maybe_parse_code(res[1])
        d.maybe_parse_airport(res[2])
        d.maybe_parse_status(res[3])
    elif len(res) == 6:
        # The extra cell (index 1) carries the real arrival hour.
        d.maybe_parse_hour_th(res[0])
        d.maybe_parse_hour_real(res[1])
        d.maybe_parse_code(res[2])
        d.maybe_parse_airport(res[3])
        d.maybe_parse_status(res[4])
    if aux_data:
        d.maybe_add_aux_data(aux_data)
    return d
def parse_fr24(
    data: T.Optional[T.Dict[T.Text, T.Any]]
) -> T.Optional[T.Dict[T.Text, T.Text]]:
    """
    This function parses the given FlightRadar24 data into a dictionary
    mapping each flight code to its real arrival time, formatted as "HH:MM".

    The flight code is the ``default`` identification number, falling back
    to the ``alternative`` one. Flights without a code, or whose entry cannot
    be parsed (for instance no real arrival timestamp), are skipped.

    Returns None when ``data`` is empty or does not have the expected
    structure.
    """
    logger.debug("fr24 raw data: %s", data)
    if not data:
        return None
    try:
        schedule = data["result"]["response"]["airport"]["pluginData"]["schedule"]
        results = {}
        for flight in schedule["arrivals"]["data"]:
            try:
                id_num = flight["flight"]["identification"]["number"]
                code = id_num.get("default") or id_num.get("alternative")
                if not code:
                    # skip if no flight code found
                    continue
                ts = flight["flight"]["time"]["real"]["arrival"]
                # NOTE: fromtimestamp() converts to the machine's local time.
                real_arrival = datetime.fromtimestamp(ts).strftime("%H:%M")
                results[code] = real_arrival
                logger.debug("%s -> %s", code, real_arrival)
            except Exception:
                # Best effort: one malformed entry must not discard the others.
                # `Exception` (not a bare except) lets KeyboardInterrupt and
                # SystemExit propagate.
                logger.debug("skipping unparsable fr24 entry", exc_info=True)
                continue
        return results
    except Exception:
        logger.debug("unexpected fr24 payload structure", exc_info=True)
        return None