BotZ/bot_z/operator.py

342 lines
11 KiB
Python
Raw Normal View History

2019-01-07 11:16:16 +01:00
# -*- coding: utf-8 -*-
2019-01-14 12:11:17 +01:00
"""
Operator is the main object that interacts with the foreign
service. It exposes methods to login, logout and check in and out.
"""
2019-01-14 12:11:17 +01:00
from datetime import datetime, timedelta
import logging
import os
2019-01-14 17:51:19 +01:00
import pkg_resources
2019-01-21 09:32:52 +01:00
import shutil
import sys
import time
import typing as T
2019-01-14 15:41:08 +01:00
from urllib.parse import urlparse
2019-01-21 09:32:52 +01:00
geckoexe = shutil.which("geckodriver")
if geckoexe is None:
local_path = pkg_resources.resource_filename(__name__, "bin")
try:
os.stat(os.path.join(local_path, "geckodriver"))
os.environ["PATH"] = os.environ["PATH"] + ":" + local_path
except FileNotFoundError:
print("Missing geckodriver executable in path", file=sys.stderr)
raise
2019-01-21 09:32:52 +01:00
from selenium import webdriver as wd # noqa
from selenium.common.exceptions import (
WebDriverException,
NoSuchElementException,
) # noqa
2019-01-28 18:15:22 +01:00
# TODO: read it from configuration.
RETRIES = 3
2019-02-05 15:10:45 +01:00
log_fmt = logging.Formatter("%(levelname)s: [%(name)s] -> %(message)s")
console = logging.StreamHandler(stream=sys.stdout)
console.setFormatter(log_fmt)
logger = logging.getLogger(__name__)
logger.addHandler(console)
logger.setLevel(os.environ.get("BOTZ_LOGLEVEL", logging.INFO))
logger.debug("Init at debug")
2019-01-28 18:15:22 +01:00
def safely(retries: int = 0) -> T.Callable:
retr = retries
2019-01-28 18:15:22 +01:00
def _safely(f: T.Callable) -> T.Callable:
ret = retr
2019-01-28 18:15:22 +01:00
def _protection(self, *args, **kwargs):
r = ret
2019-01-28 18:15:22 +01:00
done = False
while r > 0:
2019-01-28 18:15:22 +01:00
try:
2019-03-05 14:48:17 +01:00
val = f(self, *args, **kwargs)
logger.debug("Success executing %s", f.__name__)
self.switch_to.default_content()
2019-03-05 14:48:17 +01:00
return val
2019-01-28 18:15:22 +01:00
except WebDriverException as e:
self.logger.error(
"Something went wrong: %s [tentative #%s]", e, ret - r
2019-01-28 18:15:22 +01:00
)
r -= 1
2019-03-05 14:48:17 +01:00
time.sleep(1)
2019-01-14 17:51:19 +01:00
2019-01-28 18:15:22 +01:00
return _protection
2019-01-28 18:15:22 +01:00
return _safely
2019-01-14 12:11:17 +01:00
def _is_present(driver: wd.Firefox, xpath: str) -> bool:
try:
driver.find_element_by_xpath(xpath)
return True
except NoSuchElementException:
return False
2019-01-14 17:51:19 +01:00
def is_present(
driver: wd.Firefox, xpath: str, timeout: T.Optional[timedelta] = None
) -> bool:
2019-01-14 12:11:17 +01:00
"""
Helper function. If an element is present in the DOM tree,
returns true. False otherwise.
"""
if timeout is None:
return _is_present(driver, xpath)
_now = datetime.now()
_elapsed = timedelta(seconds=0)
while _elapsed < timeout:
2019-02-05 15:10:45 +01:00
logger.debug("Not yet present: %s", xpath)
2019-01-14 12:11:17 +01:00
if _is_present(driver, xpath):
2019-02-05 15:10:45 +01:00
logger.debug("Present: %s", xpath)
2019-01-14 12:11:17 +01:00
return True
time.sleep(0.5)
_elapsed = datetime.now() - _now
return False
class Operator(wd.Firefox):
def __init__(
2019-01-14 17:51:19 +01:00
self,
base_uri: str,
name: str = None,
timeout: int = 20,
proxy: T.Optional[T.Tuple[str, int]] = None,
headless: bool = True,
debug: bool = False,
*args,
2019-07-29 12:21:21 +02:00
**kwargs,
2019-01-14 17:51:19 +01:00
) -> None:
"""
Adds some configuration to Firefox.
"""
2019-01-28 18:15:22 +01:00
self.retries = RETRIES
self.profile = wd.FirefoxProfile()
# Do not send telemetry
2019-01-14 17:51:19 +01:00
self.profile.set_preference("datareporting.policy.dataSubmissionEnabled", False)
self.profile.set_preference("datareporting.healthreport.service.enabled", False)
self.profile.set_preference("datareporting.healthreport.uploadEnabled", False)
self.opts = wd.firefox.options.Options()
self.opts.headless = headless
self.debug = debug
self.base_uri = base_uri
2019-01-14 15:41:08 +01:00
self.uri = urlparse(base_uri)
2019-01-14 12:11:17 +01:00
self.timeout = timedelta(seconds=timeout)
if proxy:
2019-01-14 17:51:19 +01:00
self.profile.set_preference("network.proxy.type", 1)
self.profile.set_preference("network.proxy.http", proxy[0])
self.profile.set_preference("network.proxy.http_port", proxy[1])
self.profile.set_preference("network.proxy.ssl", proxy[0])
self.profile.set_preference("network.proxy.ssl_port", proxy[1])
2019-01-14 17:51:19 +01:00
super().__init__(
firefox_profile=self.profile, options=self.opts, *args, **kwargs
)
2019-01-14 15:41:08 +01:00
self.fullscreen_window()
self.z_name = name if name is not None else __name__
self.logger = logging.getLogger("{}.{}".format(__name__, self.name))
if debug:
self.logger.setLevel(logging.DEBUG)
self.logger.debug("Debug level")
self._logged_in = False
self.username = None
# Clean preceding session
self.delete_all_cookies()
2019-01-28 18:15:22 +01:00
@safely(RETRIES)
2019-01-14 17:51:19 +01:00
def login(self, user: str, password: str, force: bool = False) -> None:
"""
Do the login and proceed.
"""
2019-03-05 14:48:17 +01:00
self.logger.info("Logging in - user: %s - session: %s", user, self.name)
if self.logged_in:
2019-01-14 12:11:17 +01:00
self.logger.warning("Already logged in: %s", user)
if not force:
return
self.logger.info("Forcing login: %s", user)
2019-03-05 14:48:17 +01:00
# Retrieve login page if not yet on it
if self.base_uri not in self.current_url:
self.get(self.base_uri)
time.sleep(1)
2019-01-14 17:51:19 +01:00
_correct_url = "cpccchk" in self.current_url
2019-01-14 12:11:17 +01:00
_now = datetime.now()
_elapsed = timedelta(seconds=0)
2019-02-18 12:19:19 +01:00
while (
not is_present(self, '//input[contains(@id, "_Accedi")]') and _correct_url
):
2019-01-14 12:11:17 +01:00
self.logger.debug("Not yet on login page: %s", self.current_url)
time.sleep(0.5)
2019-01-14 17:51:19 +01:00
_correct_url = "cpccchk" in self.current_url
2019-01-14 12:11:17 +01:00
_elapsed = datetime.now() - _now
if _elapsed > self.timeout:
2019-02-18 12:19:19 +01:00
self.logger.error(
"Login page did not load properly: %s", self.current_url
)
return
self.logger.debug("After login get: %s", self.current_url)
2019-01-14 12:11:17 +01:00
time.sleep(1)
2019-03-05 14:48:17 +01:00
self.logger.info("Looking for fields in session %s", self.name)
# Username
2019-01-14 17:51:19 +01:00
user_form = self.find_element_by_name("m_cUserName")
# Password
2019-01-14 17:51:19 +01:00
pass_form = self.find_element_by_name("m_cPassword")
# Login button
login_butt = self.find_element_by_xpath('//input[contains(@id, "_Accedi")]')
# Compile and submit
user_form.send_keys(user)
pass_form.send_keys(password)
2019-03-05 14:48:17 +01:00
login_butt.submit()
time.sleep(5)
self.logger.debug("Login result: %s", self.title)
while "Routine window" in self.title:
self.logger.debug("Reloading...")
self.refresh()
self.logger.info("Reloaded %s", self.name)
self.switch_to.alert.accept()
if is_present(self, '//a[contains(@class, "imgMenu_ctrl")]', self.timeout):
self._logged_in = True
self.logger.info("Login succeeded for user: %s", user)
else:
self.logger.error("Login failed: %s", user)
self.username = user
2019-01-14 12:11:17 +01:00
2019-01-28 18:15:22 +01:00
@safely(RETRIES)
def logout(self, force: bool = False) -> None:
2019-01-14 12:11:17 +01:00
"""
Do the logout.
"""
if not self._logged_in:
self.logger.warning("Not yet logged in")
2019-01-14 12:11:17 +01:00
if not force:
return
self.logger.info("Forcing logout: %s", self.username)
2019-01-14 12:11:17 +01:00
# Find the Profile menu and open it
2019-01-14 17:51:19 +01:00
profile_butt = self.find_element_by_xpath(
'//span[contains(@id, "imgNoPhotoLabel")]'
)
2019-01-14 12:11:17 +01:00
profile_butt.click()
time.sleep(1)
# Find the logout button
logout_butt = self.find_element_by_xpath('//input[@value="Logout user"]')
logout_butt.click()
2019-03-05 14:48:17 +01:00
if self._back_to_login():
self.logger.debug("Back on login page")
@safely(RETRIES)
def _back_to_login(self) -> bool:
_now = datetime.now()
_elapsed = timedelta(seconds=0)
_is_logout_page = "jsp/usut_wapplogout_portlet.jsp" in self.current_url
while not _is_logout_page or _elapsed > self.timeout:
time.sleep(1)
self.logger.debug("Waiting to land on logout page...")
_is_logout_page = "jsp/usut_wapplogout_portlet.jsp" in self.current_url
_elapsed = datetime.now() - _now
if _is_logout_page:
self.logger.info("User successfully logged out: %s", self.username)
self.username = None
2019-03-05 14:48:17 +01:00
back_to_login_butt = self.find_element_by_xpath(
'//input[contains(@id, "_button15")]'
)
back_to_login_butt.click()
self.delete_all_cookies()
2019-03-05 14:48:17 +01:00
return True
2019-01-14 12:11:17 +01:00
else:
self.logger.warning("Logout failed: %s", self.username)
2019-03-05 14:48:17 +01:00
return False
2019-01-14 12:11:17 +01:00
2019-01-14 15:41:08 +01:00
@property
def logged_in(self) -> bool:
2019-01-14 12:11:17 +01:00
"""
Check if already logged in. Checks if page is '/jsp/home.jsp'
and if login cookie is set (and not expired).
"""
2019-01-14 17:51:19 +01:00
_base_domain = ".".join(self.uri.netloc.split(".")[-2:])
cookies = [c["name"] for c in self.get_cookies() if _base_domain in c["domain"]]
2019-02-16 19:51:51 +01:00
self.logger.debug("Cookies: %s", cookies)
# _right_url = "/jsp/home.jsp" in self.current_url
_cookies = all(c in cookies for c in ("spcookie", "JSESSIONID", "ipclientid"))
return _cookies
2019-07-30 12:25:32 +02:00
def _switch_to_container(self) -> None:
try:
iframe = self.find_element_by_xpath(
'//iframe[contains(@id, "gsmd_container.jsp")]'
)
self.switch_to.frame(iframe)
except NoSuchElementException:
pass
def get_movements(self) -> T.List[T.Tuple[T.Text]]:
self._switch_to_container()
try:
result = [] # type: T.List[T.Tuple[T.Text]]
movements_table = self.find_elements_by_xpath(
'//div[contains(@class, "ushp_wterminale_container")]//div[@ps-name="Grid2"]//table/tbody/tr'
)
for row in movements_table:
data = row.text.strip().split("\n") # type: T.List[T.Text]
result.append(tuple(i.strip() for i in data if i.strip())) # noqa
except NoSuchElementException:
return []
return result
2019-01-28 18:15:22 +01:00
@property
def checked_in(self) -> bool:
"""
Check if the user is checked in already.
"""
2019-07-30 12:25:32 +02:00
if not self.logged_in:
return False
dates = self.get_movements()
if not dates:
return False
if dates[-1][0] == "Entrata":
return True
return False
2019-01-28 18:15:22 +01:00
@safely(RETRIES)
2019-01-14 17:51:19 +01:00
def check_in(self, force: bool = False) -> None:
"""
Click the check in button.
"""
2019-01-14 15:41:08 +01:00
if not force and not self.logged_in:
self.logger.warning("Not logged in!")
return
2019-07-30 12:25:32 +02:00
if self.checked_in:
2019-01-14 15:41:08 +01:00
self.logger.warn("Already checked in!")
if not force:
return
2019-07-30 12:25:32 +02:00
self._switch_to_container()
2019-01-14 12:11:17 +01:00
enter_butt = self.find_element_by_xpath('//input[@value="Entrata"]')
2019-01-14 15:41:08 +01:00
enter_butt.click()
2019-01-28 18:15:22 +01:00
@safely(RETRIES)
2019-01-14 17:51:19 +01:00
def check_out(self, force: bool = False) -> None:
"""
Click the check out button.
"""
2019-01-14 15:41:08 +01:00
if not force and not self.logged_in:
self.logger.warning("Not logged in!")
return
2019-07-30 12:25:32 +02:00
if not self.checked_in:
2019-01-14 15:41:08 +01:00
self.logger.warn("Not yet checked in!")
if not force:
return
2019-07-30 12:25:32 +02:00
self._switch_to_container()
2019-01-14 12:11:17 +01:00
exit_butt = self.find_element_by_xpath('//input[@value="Uscita"]')
2019-01-14 15:41:08 +01:00
exit_butt.click()
2019-01-14 12:11:17 +01:00
def __del__(self) -> None:
self.quit()