388 lines
13 KiB
Python
388 lines
13 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
Operator is the main object that interacts with the foreign
service. It exposes methods to log in, log out, and check in and out.
|
|
"""
|
|
|
|
from datetime import datetime, timedelta
|
|
import logging
|
|
import os
|
|
import platform
|
|
import pkg_resources
|
|
import shutil
|
|
import sys
|
|
import time
|
|
import typing as T
|
|
from urllib.parse import urlparse
|
|
|
|
|
|
def path_from_platform() -> T.Text:
    """
    Return the platform tag used to pick a platform-specific directory.

    The tag is ``"macos"`` on darwin; on win32 and linux it is ``"win"`` or
    ``"linux"`` followed by ``"64"`` or ``"32"`` when the interpreter
    architecture string contains that width (no suffix otherwise).

    :raises RuntimeError: if ``sys.platform`` is none of the above.
    """
    system = sys.platform
    bits = platform.architecture()[0]

    # macOS is not split by word size.
    if system == "darwin":
        return "macos"

    if system == "win32":
        prefix = "win"
    elif system.startswith("linux"):
        prefix = "linux"
    else:
        raise RuntimeError("Platform unknown: {}".format(system))

    # "64" is tested first, exactly as a 64/32 if-elif chain would.
    for width in ("64", "32"):
        if width in bits:
            return prefix + width
    return prefix
|
|
|
|
|
|
# Make sure a geckodriver executable can be found through PATH. When none is
# already reachable, fall back to the copy expected under this package's
# ``bin/<platform tag>`` directory and append that directory to PATH.
geckoname = "geckodriver"
geckoexe = shutil.which(geckoname)
if geckoexe is None:
    _plat_path = path_from_platform()
    local_path = pkg_resources.resource_filename(
        __name__, os.path.join("bin", _plat_path)
    )
    try:
        if "win" in _plat_path:
            geckoname = f"{geckoname}.exe"
        # os.stat raises FileNotFoundError when the expected file is absent.
        os.stat(os.path.join(local_path, geckoname))
        # Use the platform's PATH separator (";" on Windows, ":" elsewhere)
        # and tolerate an unset or empty PATH.
        _current_path = os.environ.get("PATH", "")
        os.environ["PATH"] = os.pathsep.join(
            part for part in (_current_path, local_path) if part
        )
    except FileNotFoundError:
        print("Missing geckodriver executable in path", file=sys.stderr)
        raise
|
|
|
|
from selenium import webdriver as wd # noqa
|
|
from selenium.common.exceptions import (
|
|
WebDriverException,
|
|
NoSuchElementException,
|
|
) # noqa
|
|
|
|
|
|
# Number of attempts handed to the ``safely`` decorator by the driver methods.
# TODO: read it from configuration.
RETRIES = 3

# Module logger: records go to stdout as "LEVEL: [logger name] -> message".
# The level comes from the BOTZ_LOGLEVEL environment variable (INFO if unset).
logger = logging.getLogger(__name__)
log_fmt = logging.Formatter("%(levelname)s: [%(name)s] -> %(message)s")
console = logging.StreamHandler(stream=sys.stdout)
console.setFormatter(log_fmt)
logger.addHandler(console)
logger.setLevel(os.environ.get("BOTZ_LOGLEVEL", logging.INFO))
logger.debug("Init at debug")
|
|
|
|
|
|
def safely(retries: int = 0) -> T.Callable:
    """
    Build a decorator that retries a driver method on ``WebDriverException``.

    The decorated callable must be a method of an object exposing ``logger``
    and ``switch_to`` (it receives ``self`` as first argument). After a
    successful call the driver is switched back to the default content and
    the method's return value is handed back to the caller.

    :param retries: maximum number of attempts. Values below 1 are treated
        as 1 so that the wrapped method is always executed at least once.
    :returns: a decorator; the wrapper it produces returns the wrapped
        method's value, or ``None`` when every attempt failed.
    """
    import functools  # local import keeps this block self-contained

    attempts = max(1, retries)

    def _safely(f: T.Callable) -> T.Callable:
        @functools.wraps(f)  # keep the method's name and docstring
        def _protection(self, *args, **kwargs):
            for attempt in range(attempts):
                try:
                    val = f(self, *args, **kwargs)
                    logger.debug("Success executing %s", f.__name__)
                    self.switch_to.default_content()
                    return val
                except WebDriverException as e:
                    self.logger.error(
                        "Something went wrong: %s [tentative #%s]", e, attempt
                    )
                    # Pause only when another attempt is going to follow.
                    if attempt + 1 < attempts:
                        time.sleep(2)  # TODO: set value from config.
            self.logger.error(
                "Giving up on %s after %s attempts", f.__name__, attempts
            )
            return None

        return _protection

    return _safely
|
|
|
|
|
|
def _is_present(driver: wd.Firefox, xpath: str) -> bool:
    """
    Tell whether ``driver`` can currently find an element matching ``xpath``.

    ``NoSuchElementException`` is translated into ``False``; any other
    exception raised by the lookup propagates to the caller.
    """
    try:
        driver.find_element_by_xpath(xpath)
    except NoSuchElementException:
        return False
    else:
        return True
|
|
|
|
|
|
def is_present(
    driver: wd.Firefox, xpath: str, timeout: T.Optional[timedelta] = None
) -> bool:
    """
    Helper function. If an element is present in the DOM tree,
    returns true. False otherwise.

    :param driver: the driver used for the lookup.
    :param xpath: XPath expression of the element to look for.
    :param timeout: when ``None`` the DOM is probed exactly once. Otherwise
        the lookup is repeated every half second until the element shows up
        or ``timeout`` has elapsed; the DOM is always probed at least once,
        even for a zero or negative timeout.
    :returns: ``True`` as soon as the element is found, ``False`` otherwise.
    """
    if timeout is None:
        return _is_present(driver, xpath)

    # A monotonic clock is immune to wall-clock adjustments during the wait.
    deadline = time.monotonic() + timeout.total_seconds()
    while True:
        if _is_present(driver, xpath):
            logger.debug("Present: %s", xpath)
            return True
        logger.debug("Not yet present: %s", xpath)
        if time.monotonic() >= deadline:
            return False
        time.sleep(0.5)
|
|
|
|
|
|
class Operator(wd.Firefox):
    """
    Firefox web driver that operates the pages of the foreign service.

    On top of the plain driver it offers ``login``/``logout``,
    ``check_in``/``check_out`` and read access to the movements table.
    Methods decorated with ``safely`` are retried when the driver raises
    ``WebDriverException``.
    """

    def __init__(
        self,
        base_uri: str,
        name: T.Optional[str] = None,
        timeout: int = 20,
        proxy: T.Optional[T.Tuple[str, int]] = None,
        headless: bool = True,
        debug: bool = False,
        *args,
        **kwargs,
    ) -> None:
        """
        Adds some configuration to Firefox.

        :param base_uri: address of the service; ``login`` opens it.
        :param name: label stored in ``z_name`` (module name when omitted).
        :param timeout: seconds used as upper bound when waiting for pages.
        :param proxy: optional ``(host, port)`` applied to HTTP and SSL.
        :param headless: value assigned to the Firefox ``headless`` option.
        :param debug: when true the instance logger is set to DEBUG.
        :param args: extra positional arguments for the parent constructor.
        :param kwargs: extra keyword arguments for the parent constructor.
        """
        self.retries = RETRIES
        self.profile = wd.FirefoxProfile()
        # Do not send telemetry
        self.profile.set_preference("datareporting.policy.dataSubmissionEnabled", False)
        self.profile.set_preference("datareporting.healthreport.service.enabled", False)
        self.profile.set_preference("datareporting.healthreport.uploadEnabled", False)
        self.profile.set_preference("dom.webnotifications.enabled", False)

        self.opts = wd.firefox.options.Options()
        self.opts.headless = headless

        self.debug = debug
        self.base_uri = base_uri
        self.uri = urlparse(base_uri)
        self.timeout = timedelta(seconds=timeout)

        if proxy:
            # Same host/port for both plain HTTP and SSL traffic.
            self.profile.set_preference("network.proxy.type", 1)
            self.profile.set_preference("network.proxy.http", proxy[0])
            self.profile.set_preference("network.proxy.http_port", proxy[1])
            self.profile.set_preference("network.proxy.ssl", proxy[0])
            self.profile.set_preference("network.proxy.ssl_port", proxy[1])

        super().__init__(
            *args, firefox_profile=self.profile, options=self.opts, **kwargs
        )
        self.fullscreen_window()

        self.z_name = name if name is not None else __name__
        # NOTE(review): the logger (and the "session" log fields below) read
        # ``self.name``, not ``self.z_name``; ``name`` is presumably the
        # parent driver's own attribute -- confirm which one was intended.
        self.logger = logging.getLogger("{}.{}".format(__name__, self.name))
        if debug:
            self.logger.setLevel(logging.DEBUG)
            self.logger.debug("Debug level")
        self._logged_in = False
        self.username = None
        # Clean preceding session
        self.delete_all_cookies()
        # Fix incompatibility
        if not hasattr(self, "switch_to_default_content"):
            self.logger.debug("switch_to_default_content patched.")
            self.switch_to_default_content = self.switch_to.default_content

    @safely(RETRIES)
    def login(self, user: str, password: str, force: bool = False) -> None:
        """
        Do the login and proceed.

        :param user: user name typed into the login form.
        :param password: password typed into the login form.
        :param force: run the login even if the session looks logged in.
        :raises RuntimeError: when the page keeps showing "Routine window"
            after too many reloads.
        """
        self.logger.info("Logging in - user: %s - session: %s", user, self.name)
        if self.logged_in:
            self.logger.warning("Already logged in: %s", user)
            if not force:
                return
            self.logger.info("Forcing login: %s", user)
        # Retrieve login page if not yet on it
        if self.base_uri not in self.current_url:
            self.get(self.base_uri)
            time.sleep(1)
        _correct_url = "cpccchk" in self.current_url
        _now = datetime.now()
        _elapsed = timedelta(seconds=0)
        # Wait for the login button, giving up once ``self.timeout`` elapsed.
        while (
            not is_present(self, '//input[contains(@id, "_Accedi")]') and _correct_url
        ):
            self.logger.debug("Not yet on login page: %s", self.current_url)
            time.sleep(0.5)
            _correct_url = "cpccchk" in self.current_url
            _elapsed = datetime.now() - _now
            if _elapsed > self.timeout:
                self.logger.error(
                    "Login page did not load properly: %s", self.current_url
                )
                return
        self.logger.debug("After login get: %s", self.current_url)
        time.sleep(1)
        self.logger.info("Looking for fields in session %s", self.name)
        # Username
        user_form = self.find_element_by_name("m_cUserName")
        # Password
        pass_form = self.find_element_by_name("m_cPassword")
        # Login button
        login_butt = self.find_element_by_xpath('//input[contains(@id, "_Accedi")]')
        # Compile and submit
        user_form.send_keys(user)
        pass_form.send_keys(password)
        login_butt.submit()
        time.sleep(5)
        self.logger.debug("Login result: %s", self.title)
        safe_counter = 0
        # Reload (accepting the alert each time) while the title still says
        # "Routine window"; the counter bounds the number of reloads.
        while "Routine window" in self.title:
            self.logger.debug("Reloading...")
            self.refresh()
            self.logger.info("Reloaded %s", self.name)
            self.switch_to.alert.accept()
            time.sleep(0.5)
            if safe_counter > 5:
                self.logger.error("Too many reloads. Aborting.")
                raise RuntimeError("Too many reloads.")
            safe_counter += 1
        if is_present(self, '//a[contains(@class, "imgMenu_ctrl")]', self.timeout):
            self._logged_in = True
            self.logger.info("Login succeeded for user: %s", user)
        else:
            self.logger.error("Login failed: %s", user)
        self.username = user

    @safely(RETRIES)
    def logout(self, force: bool = False) -> None:
        """
        Do the logout.

        :param force: go through the logout steps even when this object does
            not consider itself logged in.
        """
        if not self._logged_in:
            self.logger.warning("Not yet logged in")
            if not force:
                return
            self.logger.info("Forcing logout: %s", self.username)
        # Find the Profile menu and open it
        profile_butt = self.find_element_by_xpath(
            '//span[contains(@id, "imgNoPhotoLabel")]'
        )
        profile_butt.click()
        time.sleep(1)
        # Find the logout button
        logout_butt = self.find_element_by_xpath('//input[@value="Logout user"]')
        logout_butt.click()
        if self._back_to_login():
            self.logger.debug("Back on login page")

    @safely(RETRIES)
    def _back_to_login(self) -> bool:
        """
        Wait for the logout page, then click back to the login page.

        Polls ``current_url`` once per second until it contains the logout
        page path or ``self.timeout`` has elapsed.

        :returns: ``True`` when the logout page was reached (cookies are
            cleared and the logged-in state is reset), ``False`` on timeout.
        """
        _now = datetime.now()
        _elapsed = timedelta(seconds=0)
        _is_logout_page = "jsp/usut_wapplogout_portlet.jsp" in self.current_url
        # Keep waiting only while the page is missing AND time is left.
        while not _is_logout_page and _elapsed < self.timeout:
            time.sleep(1)
            self.logger.debug("Waiting to land on logout page...")
            _is_logout_page = "jsp/usut_wapplogout_portlet.jsp" in self.current_url
            _elapsed = datetime.now() - _now
        if not _is_logout_page:
            self.logger.warning("Logout failed: %s", self.username)
            return False
        self.logger.info("User successfully logged out: %s", self.username)
        self.username = None
        # Without this reset a later ``logout`` would still believe that a
        # session is open.
        self._logged_in = False
        back_to_login_butt = self.find_element_by_xpath(
            '//input[contains(@id, "_button15")]'
        )
        back_to_login_butt.click()
        self.delete_all_cookies()
        return True

    @property
    def logged_in(self) -> bool:
        """
        Check if already logged in.

        True when the cookies whose domain contains the base domain of
        ``base_uri`` include all of ``spcookie``, ``JSESSIONID`` and
        ``ipclientid``. Only cookie names are inspected: neither the current
        URL nor cookie expiry is checked here.
        """
        _base_domain = ".".join(self.uri.netloc.split(".")[-2:])
        cookies = [c["name"] for c in self.get_cookies() if _base_domain in c["domain"]]
        self.logger.debug("Cookies: %s", cookies)
        return all(c in cookies for c in ("spcookie", "JSESSIONID", "ipclientid"))

    def _switch_to_container(self) -> None:
        """
        Switch into the ``gsmd_container.jsp`` iframe when it exists.

        A missing iframe is not an error: the driver simply stays in the
        current browsing context.
        """
        try:
            iframe = self.find_element_by_xpath(
                '//iframe[contains(@id, "gsmd_container.jsp")]'
            )
            self.switch_to.frame(iframe)
        except NoSuchElementException:
            pass

    def get_movements(self) -> T.List[T.Optional[T.Tuple[T.Text, T.Text]]]:
        """
        Read the rows of the movements table.

        :returns: one tuple per table row holding the stripped, non-empty
            lines of the row text; an empty list when the lookup raises
            ``NoSuchElementException``. The driver is always switched back
            to the default content afterwards.
        """
        self._switch_to_container()
        result = []  # type: T.List[T.Tuple[T.Text, ...]]
        try:
            movements_table = self.find_elements_by_xpath(
                '//div[contains(@class, "ushp_wterminale_container")]//div[@ps-name="Grid2"]//table/tbody/tr'
            )
            for row in movements_table:
                data = row.text.strip().split("\n")  # type: T.List[T.Text]
                result.append(tuple(i.strip() for i in data if i.strip()))
        except NoSuchElementException:
            result = []
        finally:
            self.switch_to_default_content()
        return result  # type: ignore

    @property
    def checked_in(self) -> bool:
        """
        Check if the user is checked in already.

        True only when logged in and the first field of the last movement
        row is ``"Entrata"``. An empty last row counts as not checked in.
        """
        if not self.logged_in:
            return False
        dates = self.get_movements()
        if not dates:
            return False
        last = dates[-1]
        # Guard against a row without text, which yields an empty tuple.
        return bool(last) and last[0] == "Entrata"

    def _click_in_container(self, value: str) -> None:
        """Click the ``<input>`` whose value is ``value`` inside the container."""
        self._switch_to_container()
        try:
            self.find_element_by_xpath('//input[@value="{}"]'.format(value)).click()
        finally:
            # Leave the iframe even when the click failed, so that a retry
            # starts again from the top-level document.
            self.switch_to_default_content()

    @safely(RETRIES)
    def check_in(self, force: bool = False) -> None:
        """
        Click the check in button.

        :param force: click even when not logged in or already checked in.
        """
        if not force and not self.logged_in:
            self.logger.warning("Not logged in!")
            return
        if self.checked_in:
            self.logger.warning("Already checked in!")
            if not force:
                return
        self._click_in_container("Entrata")

    @safely(RETRIES)
    def check_out(self, force: bool = False) -> None:
        """
        Click the check out button.

        :param force: click even when not logged in or not checked in.
        """
        if not force and not self.logged_in:
            self.logger.warning("Not logged in!")
            return
        if not self.checked_in:
            self.logger.warning("Not yet checked in!")
            if not force:
                return
        self._click_in_container("Uscita")

    def __del__(self) -> None:
        """Quit the browser when the object is collected (best effort)."""
        try:
            self.quit()
        except Exception:
            # A finalizer must not raise: ``__init__`` may have failed before
            # the browser was started, or the session may already be closed.
            pass
|