BotZ/bot_z/operator.py

388 lines
13 KiB
Python

# -*- coding: utf-8 -*-
"""
Operator is the main object that interacts with the foreign
service. It exposes methods to login, logout and check in and out.
"""
from datetime import datetime, timedelta
import logging
import os
import platform
import pkg_resources
import shutil
import sys
import time
import typing as T
from urllib.parse import urlparse
def path_from_platform() -> T.Text:
    """
    Return the platform-specific directory name used for bundled binaries.

    The name is derived from ``sys.platform`` and, except on macOS, from
    the first element of ``platform.architecture()``:

    * ``"darwin"`` always maps to ``"macos"``;
    * ``"win32"`` maps to ``"win"`` and any ``"linux*"`` to ``"linux"``,
      followed by ``"64"`` or ``"32"`` when the architecture string
      contains that number (no suffix otherwise).

    Raises:
        RuntimeError: if ``sys.platform`` is none of the above.
    """
    system = sys.platform
    bits = platform.architecture()[0]

    if system == "darwin":
        return "macos"

    if system == "win32":
        family = "win"
    elif system.startswith("linux"):
        family = "linux"
    else:
        raise RuntimeError("Platform unknown: {}".format(system))

    # "64" is tested before "32", mirroring the precedence of the suffix rule.
    for width in ("64", "32"):
        if width in bits:
            return family + width
    return family
# Make sure a geckodriver executable is reachable through PATH before
# selenium is imported: prefer one already on PATH, otherwise fall back to
# the binary shipped inside this package under bin/<platform>/.
geckoname = "geckodriver"
geckoexe = shutil.which(geckoname)
if geckoexe is None:
    _plat_path = path_from_platform()
    local_path = pkg_resources.resource_filename(
        __name__, os.path.join("bin", _plat_path)
    )
    try:
        if "win" in _plat_path:
            geckoname = f"{geckoname}.exe"
        # os.stat raises FileNotFoundError when the bundled binary is absent.
        os.stat(os.path.join(local_path, geckoname))
    except FileNotFoundError:
        print("Missing geckodriver executable in path", file=sys.stderr)
        raise
    # Use os.pathsep (";" on Windows, ":" elsewhere) instead of a literal
    # ":" so the appended entry is valid on every supported platform, and
    # tolerate an unset/empty PATH instead of failing with KeyError.
    _current_path = os.environ.get("PATH", "")
    if _current_path:
        os.environ["PATH"] = _current_path + os.pathsep + local_path
    else:
        os.environ["PATH"] = local_path
from selenium import webdriver as wd # noqa
from selenium.common.exceptions import (
WebDriverException,
NoSuchElementException,
) # noqa
# TODO: read it from configuration.
RETRIES = 3

# Module-level logger writing to stdout. Its level comes from the
# BOTZ_LOGLEVEL environment variable and falls back to INFO when unset.
logger = logging.getLogger(__name__)
log_fmt = logging.Formatter("%(levelname)s: [%(name)s] -> %(message)s")
console = logging.StreamHandler(stream=sys.stdout)
console.setFormatter(log_fmt)
logger.addHandler(console)
logger.setLevel(os.environ.get("BOTZ_LOGLEVEL", logging.INFO))
logger.debug("Init at debug")
def safely(retries: int = 0) -> T.Callable:
    """
    Build a decorator that retries a driver method on WebDriverException.

    The decorated callable must be a method (or take as first argument an
    object) exposing ``logger`` and ``switch_to``. On each attempt the
    wrapped function is called; on success ``switch_to.default_content()``
    is invoked and the function's value is returned. When a
    ``WebDriverException`` is raised the error is logged and, unless it was
    the last attempt, the wrapper sleeps two seconds and tries again.

    Args:
        retries: total number of attempts. Values below 1 are treated as 1
            so that the wrapped function is always called at least once.

    Returns:
        A decorator. The decorated function returns the wrapped function's
        value, or ``None`` when every attempt raised WebDriverException.
    """
    # Function-scope import keeps this block self-contained.
    import functools

    attempts = max(1, retries)

    def _safely(f: T.Callable) -> T.Callable:
        # functools.wraps preserves the wrapped method's name and docstring.
        @functools.wraps(f)
        def _protection(self, *args, **kwargs):
            for attempt in range(attempts):
                try:
                    val = f(self, *args, **kwargs)
                    # Use the instance logger, as the error path below does.
                    self.logger.debug("Success executing %s", f.__name__)
                    self.switch_to.default_content()
                    return val
                except WebDriverException as e:
                    self.logger.error(
                        "Something went wrong: %s [tentative #%s]", e, attempt
                    )
                    # No point in waiting when there is no attempt left.
                    if attempt < attempts - 1:
                        time.sleep(2)  # TODO: set value from config.
            # Every attempt failed: best-effort contract, nothing is raised.
            return None

        return _protection

    return _safely
def _is_present(driver: wd.Firefox, xpath: str) -> bool:
    """
    Probe the DOM once: True if ``xpath`` matches an element, False when
    the lookup raises NoSuchElementException.
    """
    try:
        driver.find_element_by_xpath(xpath)
    except NoSuchElementException:
        return False
    else:
        return True
def is_present(
    driver: wd.Firefox, xpath: str, timeout: T.Optional[timedelta] = None
) -> bool:
    """
    Helper function. If an element is present in the DOM tree,
    returns true. False otherwise.

    Args:
        driver: the driver whose current page is inspected.
        xpath: XPath expression identifying the element.
        timeout: when None the DOM is probed exactly once. Otherwise the
            DOM is polled every half second until the element shows up or
            the timeout elapses. The DOM is always probed at least once,
            even for a zero or negative timeout.

    Returns:
        True as soon as the element is found, False otherwise.
    """
    if timeout is None:
        return _is_present(driver, xpath)
    # A monotonic clock keeps the wait immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout.total_seconds()
    while True:
        if _is_present(driver, xpath):
            logger.debug("Present: %s", xpath)
            return True
        if time.monotonic() >= deadline:
            return False
        logger.debug("Not yet present: %s", xpath)
        time.sleep(0.5)
class Operator(wd.Firefox):
    """
    Firefox driver that knows how to log in, log out, check in and check
    out on the remote service reachable at ``base_uri``.
    """

    def __init__(
        self,
        base_uri: str,
        name: T.Optional[str] = None,
        timeout: int = 20,
        proxy: T.Optional[T.Tuple[str, int]] = None,
        headless: bool = True,
        debug: bool = False,
        *args,
        **kwargs,
    ) -> None:
        """
        Adds some configuration to Firefox.

        Args:
            base_uri: URI of the service; fetched by ``login`` when the
                browser is not already on it.
            name: stored as ``z_name``; defaults to the module name.
            timeout: maximum wait, in seconds, used by the page-polling
                loops of this class.
            proxy: optional ``(host, port)`` pair applied to both the HTTP
                and the SSL proxy preferences.
            headless: value assigned to the Firefox ``headless`` option.
            debug: when true the instance logger is set to DEBUG.
            *args, **kwargs: forwarded to ``wd.Firefox.__init__``.
        """
        self.retries = RETRIES
        self.profile = wd.FirefoxProfile()
        # Do not send telemetry
        self.profile.set_preference("datareporting.policy.dataSubmissionEnabled", False)
        self.profile.set_preference("datareporting.healthreport.service.enabled", False)
        self.profile.set_preference("datareporting.healthreport.uploadEnabled", False)
        self.profile.set_preference("dom.webnotifications.enabled", False)
        self.opts = wd.firefox.options.Options()
        self.opts.headless = headless
        self.debug = debug
        self.base_uri = base_uri
        self.uri = urlparse(base_uri)
        self.timeout = timedelta(seconds=timeout)
        if proxy:
            self.profile.set_preference("network.proxy.type", 1)
            self.profile.set_preference("network.proxy.http", proxy[0])
            self.profile.set_preference("network.proxy.http_port", proxy[1])
            self.profile.set_preference("network.proxy.ssl", proxy[0])
            self.profile.set_preference("network.proxy.ssl_port", proxy[1])
        super().__init__(
            firefox_profile=self.profile, options=self.opts, *args, **kwargs
        )
        self.fullscreen_window()
        self.z_name = name if name is not None else __name__
        # NOTE(review): the logger is keyed on self.name, not on z_name;
        # presumably self.name is the driver's own attribute rather than the
        # `name` argument -- confirm which one is intended here.
        self.logger = logging.getLogger("{}.{}".format(__name__, self.name))
        if debug:
            self.logger.setLevel(logging.DEBUG)
            self.logger.debug("Debug level")
        self._logged_in = False
        self.username = None
        # Clean preceding session
        self.delete_all_cookies()
        # Fix incompatibility
        if not hasattr(self, "switch_to_default_content"):
            self.logger.debug("switch_to_default_content patched.")
            self.switch_to_default_content = self.switch_to.default_content

    @safely(RETRIES)
    def login(self, user: str, password: str, force: bool = False) -> None:
        """
        Do the login and proceed.

        Args:
            user: username typed in the login form.
            password: password typed in the login form.
            force: submit the form even when ``logged_in`` is already true.

        Raises:
            RuntimeError: when the post-login page keeps showing
                "Routine window" after too many reloads.
        """
        login_button_xpath = '//input[contains(@id, "_Accedi")]'
        self.logger.info("Logging in - user: %s - session: %s", user, self.name)
        if self.logged_in:
            self.logger.warning("Already logged in: %s", user)
            if not force:
                return
            self.logger.info("Forcing login: %s", user)
        # Retrieve login page if not yet on it
        if self.base_uri not in self.current_url:
            self.get(self.base_uri)
            time.sleep(1)
        _correct_url = "cpccchk" in self.current_url
        _now = datetime.now()
        # Wait while the login button is missing and the URL still contains
        # "cpccchk"; give up once self.timeout has elapsed.
        while not is_present(self, login_button_xpath) and _correct_url:
            self.logger.debug("Not yet on login page: %s", self.current_url)
            time.sleep(0.5)
            _correct_url = "cpccchk" in self.current_url
            if datetime.now() - _now > self.timeout:
                self.logger.error(
                    "Login page did not load properly: %s", self.current_url
                )
                return
        self.logger.debug("After login get: %s", self.current_url)
        time.sleep(1)
        self.logger.info("Looking for fields in session %s", self.name)
        # Username
        user_form = self.find_element_by_name("m_cUserName")
        # Password
        pass_form = self.find_element_by_name("m_cPassword")
        # Login button
        login_butt = self.find_element_by_xpath(login_button_xpath)
        # Compile and submit
        user_form.send_keys(user)
        pass_form.send_keys(password)
        login_butt.submit()
        time.sleep(5)
        self.logger.debug("Login result: %s", self.title)
        safe_counter = 0
        # Reload (accepting the alert each time) while the page title still
        # contains "Routine window"; the counter bounds the number of reloads.
        while "Routine window" in self.title:
            self.logger.debug("Reloading...")
            self.refresh()
            self.logger.info("Reloaded %s", self.name)
            self.switch_to.alert.accept()
            time.sleep(0.5)
            if safe_counter > 5:
                self.logger.error("Too many reloads. Aborting.")
                raise RuntimeError("Too many reloads.")
            safe_counter += 1
        if is_present(self, '//a[contains(@class, "imgMenu_ctrl")]', self.timeout):
            self._logged_in = True
            self.logger.info("Login succeeded for user: %s", user)
        else:
            self.logger.error("Login failed: %s", user)
        self.username = user

    @safely(RETRIES)
    def logout(self, force: bool = False) -> None:
        """
        Do the logout.

        Args:
            force: attempt the logout even when no login was recorded by
                this instance.
        """
        if not self._logged_in:
            self.logger.warning("Not yet logged in")
            if not force:
                return
            self.logger.info("Forcing logout: %s", self.username)
        # Find the Profile menu and open it
        profile_butt = self.find_element_by_xpath(
            '//span[contains(@id, "imgNoPhotoLabel")]'
        )
        profile_butt.click()
        time.sleep(1)
        # Find the logout button
        logout_butt = self.find_element_by_xpath('//input[@value="Logout user"]')
        logout_butt.click()
        if self._back_to_login():
            self.logger.debug("Back on login page")

    @safely(RETRIES)
    def _back_to_login(self) -> bool:
        """
        Wait for the logout page, then click back to the login page.

        Polls the current URL once per second until it contains the logout
        page path or ``self.timeout`` has elapsed.

        Returns:
            True when the logout page was reached (the username and the
            logged-in flag are cleared, cookies deleted), False otherwise.
        """
        logout_path = "jsp/usut_wapplogout_portlet.jsp"
        _now = datetime.now()
        _is_logout_page = logout_path in self.current_url
        # Keep waiting only while the page has not shown up AND the timeout
        # has not expired yet, so the wait is always bounded.
        while not _is_logout_page and datetime.now() - _now <= self.timeout:
            time.sleep(1)
            self.logger.debug("Waiting to land on logout page...")
            _is_logout_page = logout_path in self.current_url
        if _is_logout_page:
            self.logger.info("User successfully logged out: %s", self.username)
            self.username = None
            back_to_login_butt = self.find_element_by_xpath(
                '//input[contains(@id, "_button15")]'
            )
            back_to_login_butt.click()
            self.delete_all_cookies()
            # Keep the flag read by logout() in sync with the real state.
            self._logged_in = False
            return True
        self.logger.warning("Logout failed: %s", self.username)
        return False

    @property
    def logged_in(self) -> bool:
        """
        Check if already logged in.

        Returns True when the cookies whose domain contains the base domain
        of ``base_uri`` include all of ``spcookie``, ``JSESSIONID`` and
        ``ipclientid``. The current URL is not taken into account.
        """
        # Base domain = last two dot-separated labels of the network location.
        _base_domain = ".".join(self.uri.netloc.split(".")[-2:])
        cookies = [c["name"] for c in self.get_cookies() if _base_domain in c["domain"]]
        self.logger.debug("Cookies: %s", cookies)
        return all(c in cookies for c in ("spcookie", "JSESSIONID", "ipclientid"))

    def _switch_to_container(self) -> None:
        """
        Switch into the "gsmd_container.jsp" iframe when it exists; do
        nothing when the iframe is not found.
        """
        try:
            iframe = self.find_element_by_xpath(
                '//iframe[contains(@id, "gsmd_container.jsp")]'
            )
            self.switch_to.frame(iframe)
        except NoSuchElementException:
            # Deliberate best effort: stay in the current frame.
            pass

    def get_movements(self) -> T.List[T.Optional[T.Tuple[T.Text, T.Text]]]:
        """
        Read the rows of the movements table.

        Each table row's text is split on newlines; the stripped, non-empty
        pieces form one tuple. The driver is always switched back to the
        default content before returning.

        Returns:
            One tuple per table row, in page order; an empty list when the
            lookup raises NoSuchElementException.
        """
        self._switch_to_container()
        try:
            result = []  # type: T.List[T.Tuple[T.Text]]
            movements_table = self.find_elements_by_xpath(
                '//div[contains(@class, "ushp_wterminale_container")]//div[@ps-name="Grid2"]//table/tbody/tr'
            )
            for row in movements_table:
                data = row.text.strip().split("\n")  # type: T.List[T.Text]
                result.append(  # type: ignore
                    tuple(i.strip() for i in data if i.strip())
                )
        except NoSuchElementException:
            result = []
        finally:
            self.switch_to_default_content()
        return result  # type: ignore

    @property
    def checked_in(self) -> bool:
        """
        Check if the user is checked in already.

        True only when logged in and the first field of the last movement
        row is "Entrata".
        """
        if not self.logged_in:
            return False
        dates = self.get_movements()
        if not dates:
            return False
        last = dates[-1]
        # A row with no text yields an empty tuple: treat it as "not in"
        # instead of failing on last[0].
        return bool(last) and last[0] == "Entrata"

    @safely(RETRIES)
    def check_in(self, force: bool = False) -> None:
        """
        Click the check in button.

        Args:
            force: click even when not logged in or already checked in.
        """
        if not force and not self.logged_in:
            self.logger.warning("Not logged in!")
            return
        if self.checked_in:
            self.logger.warning("Already checked in!")
            if not force:
                return
        self._switch_to_container()
        enter_butt = self.find_element_by_xpath('//input[@value="Entrata"]')
        enter_butt.click()
        self.switch_to_default_content()

    @safely(RETRIES)
    def check_out(self, force: bool = False) -> None:
        """
        Click the check out button.

        Args:
            force: click even when not logged in or not yet checked in.
        """
        if not force and not self.logged_in:
            self.logger.warning("Not logged in!")
            return
        if not self.checked_in:
            self.logger.warning("Not yet checked in!")
            if not force:
                return
        self._switch_to_container()
        exit_butt = self.find_element_by_xpath('//input[@value="Uscita"]')
        exit_butt.click()
        self.switch_to_default_content()

    def __del__(self) -> None:
        """Quit the browser when the instance is garbage collected."""
        self.quit()