latecomers/latecomers/parse.py

232 lines
7.0 KiB
Python
Raw Permalink Normal View History

2022-08-18 18:49:47 +02:00
# -*- encoding: utf-8 -*-
2022-08-24 12:43:32 +02:00
from dataclasses import dataclass
2022-09-07 23:44:53 +02:00
from datetime import datetime
2022-08-18 18:49:47 +02:00
from enum import Enum
2022-08-25 13:11:33 +02:00
import logging
2022-08-18 18:49:47 +02:00
import re
import typing as T
2022-08-25 13:11:33 +02:00
from latecomers.helpers import logit
2022-08-18 18:49:47 +02:00
from lxml import etree as et
# Matches a clock time with one or two hour digits, e.g. "9:05" or "21:40".
TIME_RE = re.compile(r"\d\d?:\d\d")
# NOTE(review): \s and \S together cover every character, so this pattern
# matches any non-empty string. It is not referenced in the visible part of
# this module -- confirm whether it is still needed.
AIRPORT_RE = re.compile(r"[\w\d\s\S]+")
# Captures one of the four known status labels (the non-UNKNOWN Status values).
STATUS_RE = re.compile(r"(Arrivato|In Arrivo|Schedulato|Cancellato)")
# Module-level HTML parser instance, passed to et.fromstring() by find_table.
PARSER = et.HTMLParser()
2022-08-25 13:11:33 +02:00
logger = logging.getLogger(__name__)
2022-08-18 18:49:47 +02:00
def not_empty(obj: et._Element) -> bool:
    """
    Tell whether an element looks like a populated table row.

    An element is considered populated when it has exactly 5 or 6 ``<h5>``
    descendants.

    :param obj: the lxml element to inspect.
    :returns: ``True`` if ``obj`` has 5 or 6 ``<h5>`` descendants.
    :raises RuntimeError: if ``obj`` is not an lxml element.
    """
    # isinstance (rather than an exact type comparison) also accepts
    # subclasses of _Element instead of rejecting them as unsupported.
    if not isinstance(obj, et._Element):
        raise RuntimeError(f"provided argument is of unsupported type: {type(obj)}")
    return len(obj.xpath(".//h5")) in (5, 6)
2022-08-25 13:11:33 +02:00
@logit(logger)
def find_table(html_content: T.Text) -> T.List[et._ElementTree]:
    """
    Locate the rows of the data table inside the HTML response.

    The rows are the ``<tr>`` descendants of the single ``<tbody>`` whose
    class contains ``table-data``; only rows accepted by ``not_empty`` are
    kept. An empty list is returned when no such ``<tbody>`` exists, and a
    ``ValueError`` is raised when more than one is found.
    """
    document = et.fromstring(html_content, parser=PARSER)
    matches = document.xpath("//tbody[contains(@class, 'table-data')]")
    if not matches:
        return []
    if len(matches) != 1:
        raise ValueError(f"Unexpected parsing result: found {len(matches)} results")
    (table_body,) = matches
    return list(filter(not_empty, table_body.xpath(".//tr")))
class Status(Enum):
    """Flight status; each value is the label text ``from_str`` searches for."""

    ARRIVED = "Arrivato"
    ARRIVING = "In Arrivo"
    SCHEDULED = "Schedulato"
    CANCELED = "Cancellato"
    UNKNOWN = "Sconosciuto"

    @classmethod
    def from_str(cls, text: T.Text) -> "Status":
        """
        Return the first status whose label occurs in ``text``.

        The search is a case-sensitive substring match; ``UNKNOWN`` is
        returned when no label is found.
        """
        # Members are visited in definition order, so the precedence is
        # ARRIVED, ARRIVING, SCHEDULED, CANCELED; UNKNOWN comes last and is
        # also the fallback.
        for member in cls:
            if member.value in text:
                return member
        return cls.UNKNOWN
2022-08-24 12:43:32 +02:00
@dataclass
class Details(object):
    """
    Data extracted from one row of the arrivals table.

    Each ``maybe_parse_*`` method inspects a single ``<h5>`` element and fills
    the matching field only when the element passes that method's checks.
    Elements without text (``h5.text is None``) are treated as empty text
    instead of raising.
    """

    # Theoric (scheduled) arrival time, as matched by TIME_RE.
    th_arrival: T.Optional[T.Text] = None
    # Real arrival time, as matched by TIME_RE.
    real_arrival: T.Optional[T.Text] = None
    # Flight code with all spaces removed.
    code: T.Optional[T.Text] = None
    # Text of the airport element.
    origin: T.Optional[T.Text] = None
    status: Status = Status.UNKNOWN
    # Value found in the auxiliary data under this flight's code, if any.
    fr24_landing_time: T.Optional[T.Text] = None

    def maybe_parse_hour_th(self, h5: et._Element) -> None:
        """
        This function fills the field related to the theoric arrival hour,
        if the input matches some heuristics.

        The field is set only when exactly one time is found in the element
        text. In that case, if the element is styled as struck through, the
        real arrival hour is reset to ``None``.
        """
        # .text is None when the element has no leading text node.
        hour = TIME_RE.findall(h5.text or "")
        if len(hour) == 1:
            self.th_arrival = hour[0]
            if "text-decoration: line-through" in h5.attrib.get("style", ""):
                self.real_arrival = None

    def maybe_parse_hour_real(self, h5: et._Element) -> None:
        """
        This function fills the field related to the real arrival hour,
        if the input matches some heuristics.

        The field is set only when exactly one time is found in the element
        text.
        """
        hour = TIME_RE.findall(h5.text or "")
        if len(hour) == 1:
            self.real_arrival = hour[0]

    def maybe_parse_code(self, h5: et._Element) -> None:
        """
        This function fills the field related to the flight code,
        if present and the input matches some heuristics.

        The element must carry ``flight-numb`` in its class and contain
        exactly one ``<strong>`` descendant with text.
        """
        if "flight-numb" not in h5.attrib.get("class", ""):
            return
        child = h5.xpath(".//strong")
        if len(child) != 1:
            return
        raw_code = child[0].text
        if raw_code is None:
            # An empty <strong> carries no code; leave the field untouched.
            return
        self.code = raw_code.strip("\t\n ").replace(" ", "")

    def maybe_parse_airport(self, h5: et._Element) -> None:
        """
        This function fills the field for the airport, if the input matches
        some heuristics.

        The element must carry ``blue-title`` in its class and have non-empty
        text once leading/trailing tabs and newlines are removed.
        """
        airport = (h5.text or "").strip("\t\n")
        if len(airport) > 0 and "blue-title" in h5.attrib.get("class", ""):
            self.origin = airport

    def maybe_parse_status(self, h5: et._Element) -> None:
        """
        This function fills the field for the status, if the input matches
        some heuristics.

        When the element class mentions ``arrivato`` or ``schedulato`` the
        whole text is handed to ``Status.from_str``; otherwise the status is
        set only if STATUS_RE finds exactly one label in the text.
        """
        css_class = h5.attrib.get("class", "")
        text = h5.text or ""
        if "arrivato" in css_class or "schedulato" in css_class:
            self.status = Status.from_str(text)
        else:
            parsed = STATUS_RE.findall(text)
            if len(parsed) == 1:
                self.status = Status.from_str(parsed[0])

    def maybe_add_aux_data(self, aux_data: T.Dict[T.Text, T.Text]) -> None:
        """
        This function extends the current data with auxiliary sources
        (currently only FlightRadar24 data).

        Nothing happens when no flight code has been parsed; otherwise the
        landing time is whatever ``aux_data`` holds for the code (``None`` if
        the code is missing).
        """
        if not self.code:
            return
        self.fr24_landing_time = aux_data.get(self.code)

    def __str__(self) -> T.Text:
        """
        Render the details as ``Detail<key=value,...>``.

        Optional fields are listed only when set; origin and status are
        always listed.
        """
        res: T.Dict[T.Text, T.Optional[T.Text]] = {}
        if self.th_arrival:
            res["theoric"] = self.th_arrival
        if self.real_arrival:
            res["real"] = self.real_arrival
        if self.code:
            res["code"] = self.code
        res["origin"] = self.origin
        res["status"] = self.status.value
        if self.fr24_landing_time:
            res["fr24_landing_time"] = self.fr24_landing_time
        desc = ",".join([f"{k}={v}" for k, v in res.items()])
        return f"Detail<{desc}>"
2022-09-07 23:44:53 +02:00
def get_details(
    table_entry: et._ElementTree,
    aux_data: T.Optional[T.Dict[T.Text, T.Text]] = None,
    debug: bool = False,
) -> Details:
    """
    Build a ``Details`` object from the ``<h5>`` elements of a table row.

    Two row layouts are understood:

    * 5 elements: theoric hour, flight code, airport, status;
    * 6 elements: theoric hour, real hour, flight code, airport, status.

    In both layouts the last element is not parsed. Rows with fewer than 5
    elements yield a ``Details`` holding only default values.

    :param table_entry: the row element to inspect.
    :param aux_data: optional mapping from flight code to landing time; when
        non-empty it is used to fill the auxiliary landing time.
    :param debug: when true, print text and attributes of every ``<h5>``.
    :returns: the parsed ``Details``.
    :raises ValueError: if the row holds more than 6 ``<h5>`` elements.
    """
    cells = table_entry.xpath(".//h5")
    if len(cells) > 6:
        raise ValueError(f"Unexpected number of h5 found in line: {len(cells)}")

    if debug:
        for cell in cells:
            # .text is None when the element has no leading text node; fall
            # back to an empty string so the debug dump cannot crash.
            txt = (cell.text or "").strip("\t\n ")
            print(f"[DEBUG] text={txt} attrs={cell.attrib}")

    d = Details()
    if len(cells) == 5:
        d.maybe_parse_hour_th(cells[0])
        d.maybe_parse_code(cells[1])
        d.maybe_parse_airport(cells[2])
        d.maybe_parse_status(cells[3])
    elif len(cells) == 6:
        d.maybe_parse_hour_th(cells[0])
        d.maybe_parse_hour_real(cells[1])
        d.maybe_parse_code(cells[2])
        d.maybe_parse_airport(cells[3])
        d.maybe_parse_status(cells[4])

    if aux_data:
        d.maybe_add_aux_data(aux_data)
    return d
2022-09-07 23:44:53 +02:00
def parse_fr24(
    data: T.Optional[T.Dict[T.Text, T.Any]]
) -> T.Optional[T.Dict[T.Text, T.Text]]:
    """
    Extract the real arrival times from the given FlightRadar24 data.

    The arrivals are read from
    ``data["result"]["response"]["airport"]["pluginData"]["schedule"]["arrivals"]["data"]``.
    For each flight the code is the ``default`` flight number, falling back
    to the ``alternative`` one, and the time is the real arrival timestamp
    formatted as ``HH:MM``.

    Parsing is best-effort: a flight that lacks a code or a usable timestamp
    is skipped, and a payload that does not have the expected layout yields
    ``None``. Only ``Exception`` subclasses are absorbed, so
    ``KeyboardInterrupt`` and ``SystemExit`` still propagate.

    :param data: the decoded FlightRadar24 payload, or ``None``.
    :returns: a mapping from flight code to ``HH:MM`` arrival time, or
        ``None`` when ``data`` is empty or malformed.
    """
    logger.debug("fr24 raw data: %s", data)
    if not data:
        return None

    try:
        schedule = data["result"]["response"]["airport"]["pluginData"]["schedule"]
        results: T.Dict[T.Text, T.Text] = {}
        for flight in schedule["arrivals"]["data"]:
            try:
                id_num = flight["flight"]["identification"]["number"]
                code = id_num.get("default") or id_num.get("alternative")
                if not code:
                    # skip if no flight code found
                    continue
                ts = flight["flight"]["time"]["real"]["arrival"]
                # NOTE: fromtimestamp() converts to the local timezone of the
                # host running this code.
                real_arrival = datetime.fromtimestamp(ts).strftime("%H:%M")
                results[code] = real_arrival
                logger.debug("%s -> %s", code, real_arrival)
            except Exception:
                # Malformed entry or missing timestamp: ignore this flight
                # and keep going with the others.
                continue
        return results
    except Exception:
        logger.debug("unexpected fr24 payload layout", exc_info=True)
        return None