# -*- coding: utf-8 -*- """Daemon for local execution of commands.""" import functools import json import lockfile import logging import logging.handlers import os import queue import signal import sys import threading import typing as T from bot_z.bot_z import Operator from bot_z.utils import Fifo, PLifo, cmd_marshal, cmd_unmarshal import daemon from selenium.webdriver.common.keys import Keys daemon_format = logging.Formatter("%(levelname)s: [%(name)s] -> %(message)s") console = logging.StreamHandler(stream=sys.stdout) console.setFormatter(daemon_format) console.addFilter(logging.Filter(__name__)) syslog = logging.handlers.SysLogHandler(address="/dev/log") syslog.setFormatter(daemon_format) logger = logging.getLogger(__name__) logger.setLevel(os.environ.get("BOTZ_LOGLEVEL", logging.INFO)) logger.addHandler(console) logger.addHandler(syslog) logger.debug("Init at debug") class StopDaemon(Exception): """Auxiliary exception to help stop the program in daemon mode.""" pass def start_daemon( name: str, base_uri: str, timeout: int = None, proxy: T.Optional[T.Tuple[str, int]] = None, headless: bool = True, debug: bool = False, ) -> Operator: """ Spawns the Operator object. """ optional_args: T.Dict[str, T.Any] = {} if timeout is not None: optional_args["timeout"] = timeout if proxy is not None: optional_args["proxy"] = proxy optional_args["headless"] = headless optional_args["debug"] = debug logger.info("Started Operator.") logger.debug( "Operator parameters: " "{base_uri: %s, name: %s, timeout: %s, proxy: %s, headless: %s, debug: %s}", base_uri, name, timeout, proxy, headless, debug, ) return Operator(base_uri=base_uri, name=name, **optional_args) def operator_drop(op: Operator) -> None: """ Stops the Operator. """ logger.debug("Stopping Operator...") logger.debug("Operator id: %s", id(op)) op.quit() logger.info("Stopped Operator.") def operator_reload(op: Operator) -> None: """ Reload the Operator using the same object. """ op_focus = op.switch_to.active_element op_focus.send_keys(Keys.CONTROL + "Escape") op.delete_all_cookies() logger.info("Operator session cleaned.") def operator_status(op: Operator, debug: bool = False) -> T.Dict[str, str]: """ Get the current operator status. """ status = {} # type: T.Dict[str, str] status["is_logged_in"] = str(op.logged_in) status["is_checked_in"] = str(op.checked_in) if debug: status["interal_state"] = str(dir(op)) logger.info("Status: %s", status) return status def operator_login(op: Operator, username: str, password: str, force: bool) -> None: """ Instructs the operator to perform login. """ logger.debug("Performing login.") op.login(username, password, force) logger.info("Login done.") def operator_logout(op: Operator, username: str, force: bool) -> None: """ Instructs the operator to perform logout. """ logger.debug("Performing logout.") op.logout(username, password, force) logger.info("Logout done.") def daemon_process( fifo_path: str, working_dir: str, umask: int, pidfile: lockfile.FileLock, base_uri: str, timeout: int = None, proxy: T.Optional[T.Tuple[str, int]] = None, headless: bool = True, debug: bool = False, foreground: bool = False, ) -> None: """ The daemon function. """ if debug: logger.setLevel(logging.DEBUG) lifo_path = os.path.join(working_dir, "botz_open_daemons.list") if os.path.exists(lifo_path): logger.warning("Lifo (%s) exists. Removing!", lifo_path) os.remove(lifo_path) starter = functools.partial( start_daemon, base_uri=base_uri, timeout=timeout, proxy=proxy, headless=headless, debug=debug, ) def stop_all() -> None: pass def reload_all() -> None: pass def list_all() -> None: pass context = daemon.DaemonContext( working_directory=working_dir, umask=umask, pidfile=pidfile ) logger.debug("context defined") context.signal_map = { signal.SIGTERM: stop_all, signal.SIGHUP: "terminate", signal.SIGUSR1: reload_all, signal.SIGUSR2: list_all, } logger.debug("signal map defined") cmd_map = { "start": starter, "stop": operator_drop, "reload": operator_reload, "status": operator_status, "login": operator_login, "logout": operator_logout, } logger.debug("command map defined") if foreground: logger.debug("Started in foreground") try: listen_commands(lifo_path, fifo_path, cmd_map) except KeyboardInterrupt: logger.info("Process killed!") finally: os.remove(lifo_path) return with context: logger.debug("Started in background") try: listen_commands(lifo_path, fifo_path, cmd_map) except StopDaemon: os.remove(lifo_path) return def listen_client( name: str, chan: queue.Queue, cmd_map: T.Dict[str, T.Callable] ) -> None: """ The listen loop for an operator instance. """ op = cmd_map["start"](name) logger.debug("Started operator id: %s", id(op)) while True: cmd = chan.get() if cmd["name"] != name: log.warning("Command sent to the wrong instance: %s\n%s", name, cmd) continue if cmd["cmd"] == "start": logger.error("Alredy started: %s", name) continue if cmd["cmd"] == "stop": logger.debug("Received command: stop") cmd_map["stop"](op) return elif cmd["cmd"] == "reload": logger.debug("Received command: reload") cmd_map["reload"](op) elif cmd["cmd"] == "status": logger.debug("Received command: status") status = cmd_map["status"](op) logger.debug("Status: %s", status) with Fifo(cmd["rfifo_path"], "w") as rfifo: rfifo.write(cmd_marshal(name=cmd["name"], cmd=status)) def listen_commands( lifopath: str, fifopath: str, cmd_map: T.Dict[str, T.Callable] ) -> None: """ The main loop to listen incoming commands. """ client_chans = {} while True: with Fifo(fifopath) as fifo: for cmd_str in fifo: logger.debug("Command received in main loop: %s", cmd_str) cmd = cmd_unmarshal(cmd_str) logger.debug("Cmd unmarshalled: %s", cmd) if cmd["cmd"] == "start": if cmd["name"] not in client_chans: client_chans[cmd["name"]] = queue.Queue(1) logger.debug('Queue created for "%s".', cmd["name"]) if cmd["name"] in PLifo.All(lifopath): logger.warning("Name %s is yet used. Not proceeding.", name) continue logger.debug( '"%s" being pushed onto "%s"...', cmd["name"], lifopath ) PLifo.Push(lifopath, cmd["name"]) logger.debug("Client %s has been newly created.", cmd["name"]) chan = client_chans.get(cmd["name"], None) logger.debug("Starting new client in a thread...") client = threading.Thread( name=cmd["name"], target=functools.partial( listen_client, name=cmd["name"], chan=chan, cmd_map=cmd_map ), ) client.start() logger.debug('Client "%s" started.', cmd["name"]) continue if cmd["cmd"] == "stop-daemon": stop_all(client_chans) logger.info("Daemon clients stopped. Exiting...") raise StopDaemon logger.debug("Opening client: %s", cmd["name"]) chan = client_chans.get(cmd["name"], None) if chan is None: logger.warning( 'No client found with name "%s". Skipping.', cmd["name"] ) continue chan.put(cmd) def stop_all(client_chans: T.Dict[str, queue.Queue]) -> None: """ Send the stop command to all the clients. """ for name, chan in client_chans.items(): logger.debug('Stopping "%s"...', name) chan.put({"name": name, "cmd": "stop"}) logger.info("Stopped %s.", name)