# -*- coding: utf-8 -*- """ Operator is the main object that interacts with the foreign service. It exposes methods to login, logout and check in and out. """ from datetime import datetime, timedelta import logging import os import time import typing as T os.environ['PATH'] = os.environ['PATH'] + \ ':' + os.path.join(os.path.abspath(os.path.curdir), 'bin') from selenium import webdriver as wd from selenium.common.exceptions import WebDriverException, NoSuchElementException logging.basicConfig( level=os.environ.get('BOTZ_LOGLEVEL', logging.INFO), format='%(levelname)s: [%(name)s] -> %(message)s' ) m_logger = logging.getLogger(__name__) m_logger.debug("Init at debug") def safely(f: T.Callable) -> T.Callable: def _protection(self, *args, **kwargs): try: f(self, *args, **kwargs) except WebDriverException as e: self.logger.error("Something went wrong: %s", e) return _protection def _is_present(driver: wd.Firefox, xpath: str) -> bool: try: driver.find_element_by_xpath(xpath) return True except NoSuchElementException: return False def is_present(driver: wd.Firefox, xpath: str, timeout: T.Optional[timedelta]=None ) -> bool: """ Helper function. If an element is present in the DOM tree, returns true. False otherwise. """ if timeout is None: return _is_present(driver, xpath) _now = datetime.now() _elapsed = timedelta(seconds=0) while _elapsed < timeout: m_logger.debug("Not yet present: %s", xpath) if _is_present(driver, xpath): m_logger.debug("Present: %s", xpath) return True time.sleep(0.5) _elapsed = datetime.now() - _now return False class Operator(wd.Firefox): def __init__( self, base_uri: str, name: str = None, timeout: int=20, proxy: T.Optional[T.Tuple[str, int]] = None, headless: bool = True, debug: bool = False, *args, **kwargs) -> None: """ Adds some configuration to Firefox. """ self.profile = wd.FirefoxProfile() # Do not send telemetry self.profile.set_preference('datareporting.policy.dataSubmissionEnabled', False) self.profile.set_preference('datareporting.healthreport.service.enabled', False) self.profile.set_preference('datareporting.healthreport.uploadEnabled', False) self.opts = wd.firefox.options.Options() self.opts.headless = headless self.debug = debug self.base_uri = base_uri self.timeout = timedelta(seconds=timeout) if proxy: self.profile.set_preference('network.proxy.type', 1) self.profile.set_preference('network.proxy.http', proxy[0]) self.profile.set_preference('network.proxy.http_port', proxy[1]) self.profile.set_preference('network.proxy.ssl', proxy[0]) self.profile.set_preference('network.proxy.ssl_port', proxy[1]) super().__init__(firefox_profile=self.profile, options=self.opts, *args, **kwargs) self.z_name = name if name is not None else __name__ self.logger = logging.getLogger("{}.{}".format(__name__, self.name)) if debug: self.logger.setLevel(logging.DEBUG) self.logger.debug("Debug level") self._logged_in = False self._checked_in = False @safely def login(self, user: str, password: str, force: bool=False) -> None: """ Do the login and proceed. """ if self._logged_in: self.logger.warning("Already logged in: %s", user) if not force: return self.logger.info("Forcing login: %s", user) # Retrieve login page self.get(self.base_uri) _correct_url = 'cpccchk' in self.current_url _now = datetime.now() _elapsed = timedelta(seconds=0) while not _correct_url: self.logger.debug("Not yet on login page: %s", self.current_url) time.sleep(0.5) _correct_url = 'cpccchk' in self.current_url _elapsed = datetime.now() - _now if _elapsed > self.timeout: break self.logger.debug("After login get: %s", self.current_url) time.sleep(1) # Username user_form = self.find_element_by_name('m_cUserName') # Password pass_form = self.find_element_by_name('m_cPassword') # Login button login_butt = self.find_element_by_xpath('//input[contains(@id, "_Accedi")]') # Compile and submit user_form.send_keys(user) pass_form.send_keys(password) do_it = True if self.debug and not force: _do_it = input("Really do the login? [y/n] ").lower() do_it = True if _do_it == "y" else False if do_it: login_butt.submit() time.sleep(5) self.logger.debug("Login result: %s", self.title) if 'Routine window' in self.title: self.logger.debug("Reloading...") self.refresh() self.switch_to.alert.accept() if is_present(self, '//a[contains(@class, "imgMenu_ctrl")]', self.timeout): self._logged_in = True self.logger.info("Login success for user: %s", user) else: self.logger.error("Login failed: %s", user) @safely def logout(self, user: str, force: bool=False) -> None: """ Do the logout. """ if not self._logged_in: self.logger.warning("Not yet logged in for user: %s", user) if not force: return self.logger.info("Forcing logout: %s", user) # Find the Profile menu and open it profile_butt = self.find_element_by_xpath('//span[contains(@id, "imgNoPhotoLabel")]') profile_butt.click() time.sleep(1) # Find the logout button logout_butt = self.find_element_by_xpath('//input[@value="Logout user"]') logout_butt.click() if "jsp/usut_wapplogout_portlet.jsp" in self.current_url: self.logger.info("User successfully logged out: %s", user) else: self.logger.warning("Logout failed: %s", user) def logged_in(self): """ Check if already logged in. Checks if page is '/jsp/home.jsp' and if login cookie is set (and not expired). """ pass @safely def check_in(self, force: bool=False) -> None: """ Click the check in button. """ # TODO: Check if the page displays you're already # checked in. if self._checked_in: self.logger.warn("Already logged in!") if not force: return enter_butt = self.find_element_by_xpath('//input[@value="Entrata"]') enter_butt.submit() # Click the check in button and change # self._checked_in state in case of success pass @safely def check_out(self, force: bool=False) -> None: """ Click the check out button. """ # TODO: Check if the page displays you're already # checked out. if not self._checked_in: self.logger.warn("Not yet logged in!") if not force: return exit_butt = self.find_element_by_xpath('//input[@value="Uscita"]') exit_butt.submit() # Click the check in button and change # self._checked_in state in case of success pass def __del__(self) -> None: self.quit()