# -*- coding: utf-8 -*-
"""
Operator is the main object that interacts with the foreign service.

It exposes methods to login, logout and check in and out.
"""
from datetime import datetime, timedelta
import functools
import logging
import os
import pkg_resources
import shutil
import sys
import time
import typing as T
from urllib.parse import urlparse

# Locate geckodriver: first on PATH, then in the "bin" directory shipped with
# this package. If neither has it, report the problem and re-raise.
geckoexe = shutil.which("geckodriver")
if geckoexe is None:
    local_path = pkg_resources.resource_filename(__name__, "bin")
    try:
        os.stat(os.path.join(local_path, "geckodriver"))
        # os.pathsep keeps the PATH entry separator correct on every platform.
        os.environ["PATH"] = os.environ["PATH"] + os.pathsep + local_path
    except FileNotFoundError:
        print("Missing geckodriver executable in path", file=sys.stderr)
        raise

from selenium import webdriver as wd  # noqa
from selenium.common.exceptions import (
    WebDriverException,
    NoSuchElementException,
)  # noqa

# TODO: read it from configuration.
RETRIES = 3

log_fmt = logging.Formatter("%(levelname)s: [%(name)s] -> %(message)s")
console = logging.StreamHandler(stream=sys.stdout)
console.setFormatter(log_fmt)
logger = logging.getLogger(__name__)
logger.addHandler(console)
logger.setLevel(os.environ.get("BOTZ_LOGLEVEL", logging.INFO))
logger.debug("Init at debug")


def safely(retries: int = 0) -> T.Callable:
    """
    Build a decorator that retries a driver method on ``WebDriverException``.

    The decorated callable must be a method of an object exposing ``logger``
    and ``switch_to`` (as ``Operator`` does).

    :param retries: number of attempts to make. Values below 1 are treated
        as 1, so the wrapped method is always executed at least once.
    :return: a decorator. The decorated method returns whatever the wrapped
        method returns, or ``None`` when every attempt failed.
    """
    attempts = max(1, retries)

    def _safely(f: T.Callable) -> T.Callable:
        @functools.wraps(f)
        def _protection(self, *args, **kwargs):
            for attempt in range(attempts):
                try:
                    result = f(self, *args, **kwargs)
                    logger.debug("Success executing %s", f.__name__)
                    self.switch_to.default_content()
                    return result
                except WebDriverException as e:
                    self.logger.error(
                        "Something went wrong: %s [tentative #%s]", e, attempt
                    )
                    # The failed attempt may have left the driver inside an
                    # iframe; go back to the top document so the next attempt
                    # starts from the same context as the first one.
                    try:
                        self.switch_to.default_content()
                    except WebDriverException:
                        self.logger.debug(
                            "Could not reset frame context before retrying"
                        )
            self.logger.error(
                "Giving up on %s after %s attempts", f.__name__, attempts
            )
            return None

        return _protection

    return _safely


def _is_present(driver: wd.Firefox, xpath: str) -> bool:
    """Return True if ``xpath`` matches an element right now, False otherwise."""
    try:
        driver.find_element_by_xpath(xpath)
        return True
    except NoSuchElementException:
        return False


def is_present(
    driver: wd.Firefox, xpath: str, timeout: T.Optional[timedelta] = None
) -> bool:
    """
    Helper function. If an element is present in the DOM tree, returns true.
    False otherwise.

    :param driver: the driver to query.
    :param xpath: XPath expression identifying the element.
    :param timeout: when given, poll every half second until the element
        shows up or ``timeout`` has elapsed; when ``None``, check only once.
    """
    if timeout is None:
        return _is_present(driver, xpath)
    _now = datetime.now()
    _elapsed = timedelta(seconds=0)
    while _elapsed < timeout:
        logger.debug("Not yet present: %s", xpath)
        if _is_present(driver, xpath):
            logger.debug("Present: %s", xpath)
            return True
        time.sleep(0.5)
        _elapsed = datetime.now() - _now
    return False


class Operator(wd.Firefox):
    """Firefox driver specialised to log in/out and check in/out on the service."""

    def __init__(
        self,
        base_uri: str,
        name: T.Optional[str] = None,
        timeout: int = 20,
        proxy: T.Optional[T.Tuple[str, int]] = None,
        headless: bool = True,
        debug: bool = False,
        *args,
        **kwargs
    ) -> None:
        """
        Adds some configuration to Firefox.

        :param base_uri: address of the login page.
        :param name: label used in this instance's logger name; defaults to
            the module name.
        :param timeout: seconds to wait for pages/elements to show up.
        :param proxy: optional ``(host, port)`` used for both HTTP and SSL.
        :param headless: run Firefox without a visible window.
        :param debug: enable debug logging and the interactive login prompt.
        :param args: extra positional arguments forwarded to ``wd.Firefox``.
        :param kwargs: extra keyword arguments forwarded to ``wd.Firefox``.
        """
        self.retries = RETRIES
        self.profile = wd.FirefoxProfile()
        # Do not send telemetry
        self.profile.set_preference("datareporting.policy.dataSubmissionEnabled", False)
        self.profile.set_preference("datareporting.healthreport.service.enabled", False)
        self.profile.set_preference("datareporting.healthreport.uploadEnabled", False)

        self.opts = wd.firefox.options.Options()
        self.opts.headless = headless
        self.debug = debug
        self.base_uri = base_uri
        self.uri = urlparse(base_uri)
        self.timeout = timedelta(seconds=timeout)

        if proxy:
            self.profile.set_preference("network.proxy.type", 1)
            self.profile.set_preference("network.proxy.http", proxy[0])
            self.profile.set_preference("network.proxy.http_port", proxy[1])
            self.profile.set_preference("network.proxy.ssl", proxy[0])
            self.profile.set_preference("network.proxy.ssl_port", proxy[1])

        super().__init__(
            firefox_profile=self.profile, options=self.opts, *args, **kwargs
        )
        self.fullscreen_window()

        self.z_name = name if name is not None else __name__
        # Use the label computed above: ``self.name`` is the browser name
        # reported by the WebDriver base class, not the caller's ``name``.
        self.logger = logging.getLogger("{}.{}".format(__name__, self.z_name))
        if debug:
            self.logger.setLevel(logging.DEBUG)
            self.logger.debug("Debug level")

        self._logged_in = False
        self._checked_in = False
        self.username = None

        # Clean preceding session
        self.delete_all_cookies()

    @safely(RETRIES)
    def login(self, user: str, password: str, force: bool = False) -> None:
        """
        Do the login and proceed.

        :param user: username typed into the login form.
        :param password: password typed into the login form.
        :param force: log in even if already logged in, and skip the
            interactive confirmation asked in debug mode.
        """
        if self._logged_in:
            self.logger.warning("Already logged in: %s", user)
            if not force:
                return
            self.logger.info("Forcing login: %s", user)

        # Retrieve login page
        self.get(self.base_uri)
        _correct_url = "cpccchk" in self.current_url
        _now = datetime.now()
        _elapsed = timedelta(seconds=0)
        # Wait (up to self.timeout) for the browser to land on the login page.
        while not _correct_url:
            self.logger.debug("Not yet on login page: %s", self.current_url)
            time.sleep(0.5)
            _correct_url = "cpccchk" in self.current_url
            _elapsed = datetime.now() - _now
            if _elapsed > self.timeout:
                break
        self.logger.debug("After login get: %s", self.current_url)
        time.sleep(1)

        # Username
        user_form = self.find_element_by_name("m_cUserName")
        # Password
        pass_form = self.find_element_by_name("m_cPassword")
        # Login button
        login_butt = self.find_element_by_xpath('//input[contains(@id, "_Accedi")]')

        # Compile and submit
        user_form.send_keys(user)
        pass_form.send_keys(password)
        do_it = True
        if self.debug and not force:
            _do_it = input("Really do the login? [y/n] ").lower()
            do_it = _do_it == "y"
        if do_it:
            login_butt.submit()
            time.sleep(5)
            self.logger.debug("Login result: %s", self.title)
            if "Routine window" in self.title:
                self.logger.debug("Reloading...")
                self.refresh()
                self.switch_to.alert.accept()

        if is_present(self, '//a[contains(@class, "imgMenu_ctrl")]', self.timeout):
            self._logged_in = True
            self.logger.info("Login success for user: %s", user)
        else:
            self.logger.error("Login failed: %s", user)
        self.username = user

    @safely(RETRIES)
    def logout(self, force: bool = False) -> None:
        """
        Do the logout.

        :param force: try to log out even if no login was recorded.
        """
        if not self._logged_in:
            self.logger.warning("Not yet logged in")
            if not force:
                return
            self.logger.info("Forcing logout: %s", self.username)

        # Find the Profile menu and open it
        profile_butt = self.find_element_by_xpath(
            '//span[contains(@id, "imgNoPhotoLabel")]'
        )
        profile_butt.click()
        time.sleep(1)

        # Find the logout button
        logout_butt = self.find_element_by_xpath('//input[@value="Logout user"]')
        logout_butt.click()

        if "jsp/usut_wapplogout_portlet.jsp" in self.current_url:
            self.logger.info("User successfully logged out: %s", self.username)
            self.username = None
            # Forget the session so a later login() is not skipped as
            # "Already logged in".
            self._logged_in = False
        else:
            self.logger.warning("Logout failed: %s", self.username)

    @property
    def logged_in(self) -> bool:
        """
        Check if already logged in.

        Checks if page is '/jsp/home.jsp' and if a 'dtLatC' cookie exists for
        the base domain of the service.
        """
        _base_domain = ".".join(self.uri.netloc.split(".")[-2:])
        cookies = [c["name"] for c in self.get_cookies() if _base_domain in c["domain"]]
        _right_url = "/jsp/home.jsp" in self.current_url
        _cookies = "dtLatC" in cookies
        return _right_url and _cookies

    @property
    def checked_in(self) -> bool:
        """
        Check if the user is checked in already.
        """
        return self._checked_in

    @safely(RETRIES)
    def check_in(self, force: bool = False) -> None:
        """
        Click the check in button.

        :param force: skip the logged-in and already-checked-in guards.
        """
        if not force and not self.logged_in:
            self.logger.warning("Not logged in!")
            return
        if self._checked_in:
            self.logger.warning("Already checked in!")
            if not force:
                return

        iframe = self.find_element_by_xpath(
            '//iframe[contains(@id, "gsmd_container.jsp")]'
        )
        self.switch_to.frame(iframe)
        enter_butt = self.find_element_by_xpath('//input[@value="Entrata"]')
        enter_butt.click()
        # Click the check in button and change
        # self._checked_in state in case of success
        self._checked_in = True

    @safely(RETRIES)
    def check_out(self, force: bool = False) -> None:
        """
        Click the check out button.

        :param force: skip the logged-in and not-yet-checked-in guards.
        """
        if not force and not self.logged_in:
            self.logger.warning("Not logged in!")
            return
        if not self._checked_in:
            self.logger.warning("Not yet checked in!")
            if not force:
                return

        iframe = self.find_element_by_xpath(
            '//iframe[contains(@id, "gsmd_container.jsp")]'
        )
        self.switch_to.frame(iframe)
        exit_butt = self.find_element_by_xpath('//input[@value="Uscita"]')
        exit_butt.click()
        # Click the check out button and change
        # self._checked_in state in case of success
        self._checked_in = False

    def __del__(self) -> None:
        """Quit the browser when the operator is garbage collected."""
        self.quit()