# -*- coding: utf-8 -*- """Daemon for local execution of commands.""" import errno import functools import json import lockfile import logging import logging.handlers import os import signal import typing as T from bot_z.bot_z import Operator import daemon from selenium.webdriver.common.keys import Keys logging.basicConfig( level=os.environ.get("BOTZ_LOGLEVEL", logging.INFO), format="%(levelname)s: [%(name)s] -> %(message)s", ) syslog = logging.handlers.SysLogHandler() logger = logging.getLogger(__name__) logger.addHandler(syslog) logger.debug("Init at debug") def start_daemon( base_uri: str, name: str, timeout: int = None, proxy: T.Optional[T.Tuple[str, int]] = None, headless: bool = True, debug: bool = False, ) -> Operator: """ Spawns the Operator object. """ optional_args = {} # type: T.Dict[str, T.Any] if timeout is not None: optional_args["timeout"] = timeout if proxy is not None: optional_args["proxy"] = proxy optional_args["headless"] = headless optional_args["debug"] = debug logger.info("Started Operator.") logger.debug( "Operator parameters: " "{base_uri: %s, name: %s, timeout: %s, proxy: %s, headless: %s, debug: %s}", base_uri, name, timeout, proxy, headless, debug, ) return Operator(base_uri=base_uri, name=name, **optional_args) def drop_operator(op: Operator) -> None: """ Stops the Operator. """ logger.debug("Stopping Operator...") logger.debug("Operator id: %s", id(op)) op.quit() logger.info("Stopped Operator.") def reload_operator(op: Operator) -> None: """ Reload the Operator using the same object. """ op_focus = op.switch_to.active_element op_focus.send_keys(Keys.CONTROL + "Escape") op.delete_all_cookies() logger.info("Operator session cleaned.") def operator_status(op: Operator, debug: bool = False) -> T.Dict[str, str]: """ Get the current operator status. """ status = {} # type: T.Dict[str, str] status["is_logged_in"] = str(op.logged_in) status["is_checked_in"] = str(op.checked_in) if debug: status["interal_state"] = str(dir(op)) logger.info("Status: %s", status) return status def daemon_process( fifo_path: str, working_dir: str, umask: int, pidfile: lockfile.FileLock, base_uri: str, name: str, timeout: int = None, proxy: T.Optional[T.Tuple[str, int]] = None, headless: bool = True, debug: bool = False, foreground: bool = False, ) -> None: """ The daemon function. """ op = start_daemon(base_uri, name, timeout, proxy, headless, debug) logger.debug("Started operator id: %s", id(op)) context = daemon.DaemonContext( working_directory=working_dir, umask=umask, pidfile=pidfile ) _stop = functools.partial(drop_operator, op) _reload = functools.partial(reload_operator, op) _status = functools.partial(operator_status, op) context.signal_map = { signal.SIGTERM: _stop, signal.SIGHUP: "terminate", signal.SIGUSR1: _reload, signal.SIGUSR2: _status, } cmd_map = {"stop": _stop, "reload": _reload, "status": _status} if foreground: listen_commands(fifo_path, cmd_map) return with context: listen_commands(fifo_path, cmd_map) class Fifo(object): """ Iterator to continuously read the command fifo. Supports also write operations if opened with mode="r". """ def __init__(self, fifopath: str, mode: str = "r") -> None: try: os.mkfifo(fifopath) logger.info("Control pipe opened at: %s", fifopath) except OSError as err: if err.errno != errno.EEXIST: logger.critical("Could not open control pipe at: %s", fifopath) raise self.fh = open(fifopath, mode) def __iter__(self): # pragma: noqa return self def __next__(self) -> str: data = self.fh.read() # This is a never ending iterator. We should exit the # loop independently. return data def __del__(self) -> None: self.fh.close() def __enter__(self) -> T.TextIO: return self.fh def __exit__(self, *args) -> None: self.fh.close() def write(self, data) -> None: self.fh.write(data) self.fh.flush() def listen_commands(fifopath: str, cmd_map: T.Dict[str, T.Callable]) -> None: """ The main loop to listen incoming commands. """ while True: fifo = Fifo(fifopath, "r") for command in fifo: if command == "stop": logger.debug("Received command: %s", "stop") cmd_map["stop"]() return elif command == "reload": logger.debug("Received command: %s", "reload") cmd_map["reload"]() elif command.startswith("status"): logger.debug("Received command: %s", "status") status = cmd_map["status"]() logger.debug("Status: %s", status) jenc = json.JSONEncoder() with Fifo(command.split("|")[-1], "w") as rfifo: rfifo.write(jenc.encode(status))