229 lines
7.6 KiB
Python
229 lines
7.6 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
Operator is the main object that interacts with the foreign
|
|
service. It exposes methods to login, logout and check in and out.
|
|
"""
|
|
|
|
from datetime import datetime, timedelta
|
|
import logging
|
|
import os
|
|
import time
|
|
import typing as T
|
|
|
|
os.environ['PATH'] = os.environ['PATH'] + \
|
|
':' + os.path.join(os.path.abspath(os.path.curdir), 'bin')
|
|
|
|
from selenium import webdriver as wd
|
|
from selenium.common.exceptions import WebDriverException, NoSuchElementException
|
|
|
|
|
|
logging.basicConfig(
|
|
level=os.environ.get('BOTZ_LOGLEVEL', logging.INFO),
|
|
format='%(levelname)s: [%(name)s] -> %(message)s'
|
|
)
|
|
|
|
m_logger = logging.getLogger(__name__)
|
|
m_logger.debug("Init at debug")
|
|
|
|
def safely(f: T.Callable) -> T.Callable:
|
|
def _protection(self, *args, **kwargs):
|
|
try:
|
|
f(self, *args, **kwargs)
|
|
except WebDriverException as e:
|
|
self.logger.error("Something went wrong: %s", e)
|
|
|
|
return _protection
|
|
|
|
|
|
def _is_present(driver: wd.Firefox, xpath: str) -> bool:
|
|
try:
|
|
driver.find_element_by_xpath(xpath)
|
|
return True
|
|
except NoSuchElementException:
|
|
return False
|
|
|
|
|
|
def is_present(driver: wd.Firefox,
|
|
xpath: str,
|
|
timeout: T.Optional[timedelta]=None
|
|
) -> bool:
|
|
"""
|
|
Helper function. If an element is present in the DOM tree,
|
|
returns true. False otherwise.
|
|
"""
|
|
if timeout is None:
|
|
return _is_present(driver, xpath)
|
|
|
|
_now = datetime.now()
|
|
_elapsed = timedelta(seconds=0)
|
|
while _elapsed < timeout:
|
|
m_logger.debug("Not yet present: %s", xpath)
|
|
if _is_present(driver, xpath):
|
|
m_logger.debug("Present: %s", xpath)
|
|
return True
|
|
time.sleep(0.5)
|
|
_elapsed = datetime.now() - _now
|
|
return False
|
|
|
|
|
|
class Operator(wd.Firefox):
|
|
def __init__(
|
|
self,
|
|
base_uri: str,
|
|
name: str = None,
|
|
timeout: int=20,
|
|
proxy: T.Optional[T.Tuple[str, int]] = None,
|
|
headless: bool = True,
|
|
debug: bool = False,
|
|
*args, **kwargs) -> None:
|
|
"""
|
|
Adds some configuration to Firefox.
|
|
"""
|
|
self.profile = wd.FirefoxProfile()
|
|
# Do not send telemetry
|
|
self.profile.set_preference('datareporting.policy.dataSubmissionEnabled', False)
|
|
self.profile.set_preference('datareporting.healthreport.service.enabled', False)
|
|
self.profile.set_preference('datareporting.healthreport.uploadEnabled', False)
|
|
|
|
self.opts = wd.firefox.options.Options()
|
|
self.opts.headless = headless
|
|
|
|
self.debug = debug
|
|
self.base_uri = base_uri
|
|
self.timeout = timedelta(seconds=timeout)
|
|
|
|
if proxy:
|
|
self.profile.set_preference('network.proxy.type', 1)
|
|
self.profile.set_preference('network.proxy.http', proxy[0])
|
|
self.profile.set_preference('network.proxy.http_port', proxy[1])
|
|
self.profile.set_preference('network.proxy.ssl', proxy[0])
|
|
self.profile.set_preference('network.proxy.ssl_port', proxy[1])
|
|
|
|
super().__init__(firefox_profile=self.profile, options=self.opts, *args, **kwargs)
|
|
|
|
self.z_name = name if name is not None else __name__
|
|
self.logger = logging.getLogger("{}.{}".format(__name__, self.name))
|
|
if debug:
|
|
self.logger.setLevel(logging.DEBUG)
|
|
self.logger.debug("Debug level")
|
|
self._logged_in = False
|
|
self._checked_in = False
|
|
|
|
@safely
|
|
def login(self, user: str, password: str, force: bool=False) -> None:
|
|
"""
|
|
Do the login and proceed.
|
|
"""
|
|
if self._logged_in:
|
|
self.logger.warning("Already logged in: %s", user)
|
|
if not force:
|
|
return
|
|
self.logger.info("Forcing login: %s", user)
|
|
# Retrieve login page
|
|
self.get(self.base_uri)
|
|
_correct_url = 'cpccchk' in self.current_url
|
|
_now = datetime.now()
|
|
_elapsed = timedelta(seconds=0)
|
|
while not _correct_url:
|
|
self.logger.debug("Not yet on login page: %s", self.current_url)
|
|
time.sleep(0.5)
|
|
_correct_url = 'cpccchk' in self.current_url
|
|
_elapsed = datetime.now() - _now
|
|
if _elapsed > self.timeout:
|
|
break
|
|
self.logger.debug("After login get: %s", self.current_url)
|
|
time.sleep(1)
|
|
# Username
|
|
user_form = self.find_element_by_name('m_cUserName')
|
|
# Password
|
|
pass_form = self.find_element_by_name('m_cPassword')
|
|
# Login button
|
|
login_butt = self.find_element_by_xpath('//input[contains(@id, "_Accedi")]')
|
|
# Compile and submit
|
|
user_form.send_keys(user)
|
|
pass_form.send_keys(password)
|
|
do_it = True
|
|
if self.debug and not force:
|
|
_do_it = input("Really do the login? [y/n] ").lower()
|
|
do_it = True if _do_it == "y" else False
|
|
if do_it:
|
|
login_butt.submit()
|
|
time.sleep(5)
|
|
self.logger.debug("Login result: %s", self.title)
|
|
if 'Routine window' in self.title:
|
|
self.logger.debug("Reloading...")
|
|
self.refresh()
|
|
self.switch_to.alert.accept()
|
|
if is_present(self, '//a[contains(@class, "imgMenu_ctrl")]', self.timeout):
|
|
self._logged_in = True
|
|
self.logger.info("Login success for user: %s", user)
|
|
else:
|
|
self.logger.error("Login failed: %s", user)
|
|
|
|
@safely
|
|
def logout(self, user: str, force: bool=False) -> None:
|
|
"""
|
|
Do the logout.
|
|
"""
|
|
if not self._logged_in:
|
|
self.logger.warning("Not yet logged in for user: %s", user)
|
|
if not force:
|
|
return
|
|
self.logger.info("Forcing logout: %s", user)
|
|
# Find the Profile menu and open it
|
|
profile_butt = self.find_element_by_xpath('//span[contains(@id, "imgNoPhotoLabel")]')
|
|
profile_butt.click()
|
|
time.sleep(1)
|
|
# Find the logout button
|
|
logout_butt = self.find_element_by_xpath('//input[@value="Logout user"]')
|
|
logout_butt.click()
|
|
if "jsp/usut_wapplogout_portlet.jsp" in self.current_url:
|
|
self.logger.info("User successfully logged out: %s", user)
|
|
else:
|
|
self.logger.warning("Logout failed: %s", user)
|
|
|
|
def logged_in(self):
|
|
"""
|
|
Check if already logged in. Checks if page is '/jsp/home.jsp'
|
|
and if login cookie is set (and not expired).
|
|
"""
|
|
pass
|
|
|
|
@safely
|
|
def check_in(self, force: bool=False) -> None:
|
|
"""
|
|
Click the check in button.
|
|
"""
|
|
# TODO: Check if the page displays you're already
|
|
# checked in.
|
|
if self._checked_in:
|
|
self.logger.warn("Already logged in!")
|
|
if not force:
|
|
return
|
|
enter_butt = self.find_element_by_xpath('//input[@value="Entrata"]')
|
|
enter_butt.submit()
|
|
# Click the check in button and change
|
|
# self._checked_in state in case of success
|
|
pass
|
|
|
|
@safely
|
|
def check_out(self, force: bool=False) -> None:
|
|
"""
|
|
Click the check out button.
|
|
"""
|
|
# TODO: Check if the page displays you're already
|
|
# checked out.
|
|
if not self._checked_in:
|
|
self.logger.warn("Not yet logged in!")
|
|
if not force:
|
|
return
|
|
exit_butt = self.find_element_by_xpath('//input[@value="Uscita"]')
|
|
exit_butt.submit()
|
|
# Click the check in button and change
|
|
# self._checked_in state in case of success
|
|
pass
|
|
|
|
def __del__(self) -> None:
|
|
self.quit()
|