BotZ/bot_z/bot_z.py

298 lines
9.7 KiB
Python

# -*- coding: utf-8 -*-
"""
Operator is the main object that interacts with the foreign
service. It exposes methods to login, logout and check in and out.
"""
from datetime import datetime, timedelta
import logging
import os
import pkg_resources
import shutil
import sys
import time
import typing as T
from urllib.parse import urlparse
geckoexe = shutil.which("geckodriver")
if geckoexe is None:
local_path = pkg_resources.resource_filename(__name__, "bin")
try:
os.stat(os.path.join(local_path, "geckodriver"))
os.environ["PATH"] = os.environ["PATH"] + ":" + local_path
except FileNotFoundError:
print("Missing geckodriver executable in path", file=sys.stderr)
raise
from selenium import webdriver as wd # noqa
from selenium.common.exceptions import (
WebDriverException,
NoSuchElementException,
) # noqa
# TODO: read it from configuration.
RETRIES = 3
log_fmt = logging.Formatter("%(levelname)s: [%(name)s] -> %(message)s")
console = logging.StreamHandler(stream=sys.stdout)
console.setFormatter(log_fmt)
logger = logging.getLogger(__name__)
logger.addHandler(console)
logger.setLevel(os.environ.get("BOTZ_LOGLEVEL", logging.INFO))
logger.debug("Init at debug")
def safely(retries: int = 0) -> T.Callable:
retr = retries
def _safely(f: T.Callable) -> T.Callable:
ret = retr
def _protection(self, *args, **kwargs):
r = ret
done = False
while r > 0:
try:
f(self, *args, **kwargs)
logger.debug("Success executing %s", f.__name__)
self.switch_to.default_content()
return
except WebDriverException as e:
self.logger.error(
"Something went wrong: %s [tentative #%s]", e, ret - r
)
r -= 1
return _protection
return _safely
def _is_present(driver: wd.Firefox, xpath: str) -> bool:
try:
driver.find_element_by_xpath(xpath)
return True
except NoSuchElementException:
return False
def is_present(
driver: wd.Firefox, xpath: str, timeout: T.Optional[timedelta] = None
) -> bool:
"""
Helper function. If an element is present in the DOM tree,
returns true. False otherwise.
"""
if timeout is None:
return _is_present(driver, xpath)
_now = datetime.now()
_elapsed = timedelta(seconds=0)
while _elapsed < timeout:
logger.debug("Not yet present: %s", xpath)
if _is_present(driver, xpath):
logger.debug("Present: %s", xpath)
return True
time.sleep(0.5)
_elapsed = datetime.now() - _now
return False
class Operator(wd.Firefox):
def __init__(
self,
base_uri: str,
name: str = None,
timeout: int = 20,
proxy: T.Optional[T.Tuple[str, int]] = None,
headless: bool = True,
debug: bool = False,
*args,
**kwargs
) -> None:
"""
Adds some configuration to Firefox.
"""
self.retries = RETRIES
self.profile = wd.FirefoxProfile()
# Do not send telemetry
self.profile.set_preference("datareporting.policy.dataSubmissionEnabled", False)
self.profile.set_preference("datareporting.healthreport.service.enabled", False)
self.profile.set_preference("datareporting.healthreport.uploadEnabled", False)
self.opts = wd.firefox.options.Options()
self.opts.headless = headless
self.debug = debug
self.base_uri = base_uri
self.uri = urlparse(base_uri)
self.timeout = timedelta(seconds=timeout)
if proxy:
self.profile.set_preference("network.proxy.type", 1)
self.profile.set_preference("network.proxy.http", proxy[0])
self.profile.set_preference("network.proxy.http_port", proxy[1])
self.profile.set_preference("network.proxy.ssl", proxy[0])
self.profile.set_preference("network.proxy.ssl_port", proxy[1])
super().__init__(
firefox_profile=self.profile, options=self.opts, *args, **kwargs
)
self.fullscreen_window()
self.z_name = name if name is not None else __name__
self.logger = logging.getLogger("{}.{}".format(__name__, self.name))
if debug:
self.logger.setLevel(logging.DEBUG)
self.logger.debug("Debug level")
self._logged_in = False
self._checked_in = False
self.username = None
# Clean preceding session
self.delete_all_cookies()
@safely(RETRIES)
def login(self, user: str, password: str, force: bool = False) -> None:
"""
Do the login and proceed.
"""
if self._logged_in:
self.logger.warning("Already logged in: %s", user)
if not force:
return
self.logger.info("Forcing login: %s", user)
# Retrieve login page
self.get(self.base_uri)
_correct_url = "cpccchk" in self.current_url
_now = datetime.now()
_elapsed = timedelta(seconds=0)
while not _correct_url:
self.logger.debug("Not yet on login page: %s", self.current_url)
time.sleep(0.5)
_correct_url = "cpccchk" in self.current_url
_elapsed = datetime.now() - _now
if _elapsed > self.timeout:
break
self.logger.debug("After login get: %s", self.current_url)
time.sleep(1)
# Username
user_form = self.find_element_by_name("m_cUserName")
# Password
pass_form = self.find_element_by_name("m_cPassword")
# Login button
login_butt = self.find_element_by_xpath('//input[contains(@id, "_Accedi")]')
# Compile and submit
user_form.send_keys(user)
pass_form.send_keys(password)
do_it = True
if self.debug and not force:
_do_it = input("Really do the login? [y/n] ").lower()
do_it = True if _do_it == "y" else False
if do_it:
login_butt.submit()
time.sleep(5)
self.logger.debug("Login result: %s", self.title)
if "Routine window" in self.title:
self.logger.debug("Reloading...")
self.refresh()
self.switch_to.alert.accept()
if is_present(self, '//a[contains(@class, "imgMenu_ctrl")]', self.timeout):
self._logged_in = True
self.logger.info("Login success for user: %s", user)
else:
self.logger.error("Login failed: %s", user)
self.username = user
@safely(RETRIES)
def logout(self, force: bool = False) -> None:
"""
Do the logout.
"""
if not self._logged_in:
self.logger.warning("Not yet logged in")
if not force:
return
self.logger.info("Forcing logout: %s", self.username)
# Find the Profile menu and open it
profile_butt = self.find_element_by_xpath(
'//span[contains(@id, "imgNoPhotoLabel")]'
)
profile_butt.click()
time.sleep(1)
# Find the logout button
logout_butt = self.find_element_by_xpath('//input[@value="Logout user"]')
logout_butt.click()
if "jsp/usut_wapplogout_portlet.jsp" in self.current_url:
self.logger.info("User successfully logged out: %s", self.username)
self.username = None
else:
self.logger.warning("Logout failed: %s", self.username)
@property
def logged_in(self) -> bool:
"""
Check if already logged in. Checks if page is '/jsp/home.jsp'
and if login cookie is set (and not expired).
"""
_base_domain = ".".join(self.uri.netloc.split(".")[-2:])
cookies = [c["name"] for c in self.get_cookies() if _base_domain in c["domain"]]
_right_url = "/jsp/home.jsp" in self.current_url
_cookies = "dtLatC" in cookies
return _right_url and _cookies
@property
def checked_in(self) -> bool:
"""
Check if the user is checked in already.
"""
return self._checked_in
@safely(RETRIES)
def check_in(self, force: bool = False) -> None:
"""
Click the check in button.
"""
if not force and not self.logged_in:
self.logger.warning("Not logged in!")
return
if self._checked_in:
self.logger.warn("Already checked in!")
if not force:
return
iframe = self.find_element_by_xpath(
'//iframe[contains(@id, "gsmd_container.jsp")]'
)
self.switch_to.frame(iframe)
enter_butt = self.find_element_by_xpath('//input[@value="Entrata"]')
enter_butt.click()
# Click the check in button and change
# self._checked_in state in case of success
self._checked_in = True
@safely(RETRIES)
def check_out(self, force: bool = False) -> None:
"""
Click the check out button.
"""
if not force and not self.logged_in:
self.logger.warning("Not logged in!")
return
if not self._checked_in:
self.logger.warn("Not yet checked in!")
if not force:
return
iframe = self.find_element_by_xpath(
'//iframe[contains(@id, "gsmd_container.jsp")]'
)
self.switch_to.frame(iframe)
exit_butt = self.find_element_by_xpath('//input[@value="Uscita"]')
exit_butt.click()
# Click the check in button and change
# self._checked_in state in case of success
self._checked_in = False
def __del__(self) -> None:
self.quit()