# -*- coding: utf-8 -*- """Daemon for local execution of commands.""" import functools import json import lockfile import logging import logging.handlers import os import queue import signal import sys import threading import time import typing as T from bot_z.operator import Operator from bot_z.utils import Fifo, PLifo, cmd_marshal, cmd_unmarshal import daemon from selenium.webdriver.common.keys import Keys daemon_format = logging.Formatter("%(levelname)s: [%(name)s] -> %(message)s") console = logging.StreamHandler(stream=sys.stdout) console.setFormatter(daemon_format) console.addFilter(logging.Filter(__name__)) syslog = logging.handlers.SysLogHandler(address="/dev/log") syslog.setFormatter(daemon_format) logger = logging.getLogger(__name__) logger.setLevel(os.environ.get("BOTZ_LOGLEVEL", logging.INFO)) logger.addHandler(console) logger.addHandler(syslog) logger.debug("Init at debug") class StopDaemon(Exception): """Auxiliary exception to help stop the program in daemon mode.""" pass def start_daemon( name: str, base_uri: str, timeout: int = None, proxy: T.Optional[T.Tuple[str, int]] = None, headless: bool = True, debug: bool = False, ) -> Operator: """ Spawns the Operator object. """ optional_args: T.Dict[str, T.Any] = {} if timeout is not None: optional_args["timeout"] = timeout if proxy is not None: optional_args["proxy"] = proxy optional_args["headless"] = headless optional_args["debug"] = debug logger.info("Started Operator.") logger.debug( "Operator parameters: " "{base_uri: %s, name: %s, timeout: %s, proxy: %s, headless: %s, debug: %s}", base_uri, name, timeout, proxy, headless, debug, ) return Operator(base_uri=base_uri, name=name, **optional_args) def operator_drop(op: Operator) -> None: """ Stops the Operator. """ logger.debug("Stopping Operator...") logger.debug("Operator id: %s", id(op)) op.quit() logger.info("Stopped Operator.") def operator_reload(op: Operator) -> None: """ Reload the Operator using the same object. """ op_focus = op.switch_to.active_element op_focus.send_keys(Keys.CONTROL + "Escape") op.delete_all_cookies() logger.info("Operator session cleaned.") def operator_status(op: Operator, debug: bool = False) -> T.Dict[str, str]: """ Get the current operator status. """ status = {} # type: T.Dict[str, str] status["is_logged_in"] = str(op.logged_in) status["is_checked_in"] = str(op.checked_in) if debug: status["interal_state"] = str(dir(op)) logger.info("Status: %s", status) return status def operator_login(op: Operator, username: str, password: str, force: bool) -> None: """ Instructs the operator to perform login. """ logger.debug("Performing login...") op.login(username, password, force) logger.info("Login done.") def operator_logout(op: Operator, force: bool) -> None: """ Instructs the operator to perform logout. """ logger.debug("Performing logout...") op.logout(force) logger.info("Logout done.") def operator_checkin(op: Operator, force: bool) -> None: """ Instructs the operator to perform the check in. """ logger.debug("Performing check in...") op.check_in(force) logger.info("Check in done.") def operator_checkout(op: Operator, force: bool) -> None: """ Instructs the operator to perform the check out. """ logger.debug("Performing check out...") op.check_out(force) logger.info("Check out done.") def daemon_process( fifo_path: str, working_dir: str, umask: int, pidfile: lockfile.FileLock, base_uri: str, timeout: int = None, proxy: T.Optional[T.Tuple[str, int]] = None, headless: bool = True, debug: bool = False, foreground: bool = False, ) -> None: """ The daemon function. """ if debug: logger.setLevel(logging.DEBUG) lifo_path = os.path.join(working_dir, "botz_open_daemons.list") if os.path.exists(lifo_path): logger.warning("Lifo (%s) exists. Removing!", lifo_path) os.remove(lifo_path) starter = functools.partial( start_daemon, base_uri=base_uri, timeout=timeout, proxy=proxy, headless=headless, debug=debug, ) def _stop_all() -> None: logger.debug("SIGTERM...stopping all the clients...") clients = PLifo.All(lifo_path) with Fifo(fifo_path) as fifo: for client in clients: fifo.write(cmd_marshal(name=client, cmd="stop")) def _reload_all() -> None: logger.debug("SIGUSR1 received here.") clients = PLifo.All(lifo_path) with Fifo(fifo_path) as fifo: for client in clients: fifo.write(cmd_marshal(name=client, cmd="reload")) def _list_all() -> None: logger.debug("SIGUSR2 received here.") clients = PLifo.All(lifo_path) logger.info("Client list: %s", clients) context = daemon.DaemonContext( working_directory=working_dir, umask=umask, pidfile=pidfile ) logger.debug("context defined") context.signal_map = { signal.SIGTERM: _stop_all, signal.SIGHUP: "terminate", signal.SIGUSR1: _reload_all, signal.SIGUSR2: _list_all, } logger.debug("signal map defined") cmd_map = { "start": starter, "stop": operator_drop, "reload": operator_reload, "status": operator_status, "login": operator_login, "logout": operator_logout, "checkin": operator_checkin, "checkout": operator_checkout, } logger.debug("command map defined") if foreground: logger.debug("Started in foreground") try: listen_commands(lifo_path, fifo_path, cmd_map) except KeyboardInterrupt: logger.info("Process killed!") finally: os.remove(lifo_path) return with context: # This is necessary to trigger the daemon start. time.sleep(1) logger.info("Started in background") try: listen_commands(lifo_path, fifo_path, cmd_map) except StopDaemon: os.remove(lifo_path) return def listen_client( name: str, chan: queue.Queue, cmd_map: T.Dict[str, T.Callable] ) -> None: """ The listen loop for an operator instance. """ op = cmd_map["start"](name) logger.debug("Started operator id: %s", id(op)) while True: cmd = chan.get() if cmd["name"] != name: log.warning("Command sent to the wrong instance: %s\n%s", name, cmd) continue if cmd["cmd"] == "start": logger.error("Alredy started: %s", name) continue if cmd["cmd"] == "stop": logger.debug("Received command: stop") cmd_map["stop"](op) return elif cmd["cmd"] == "reload": logger.debug("Received command: reload") cmd_map["reload"](op) elif cmd["cmd"] == "status": logger.debug("Received command: status") status = cmd_map["status"](op) logger.debug("Status: %s", status) with Fifo(cmd["rfifo_path"], "w") as rfifo: rfifo.write(cmd_marshal(name=cmd["name"], cmd=status)) elif cmd["cmd"] == "login": logger.debug("Received command: login") cmd_map["login"](op, cmd["username"], cmd["password"], cmd["force"]) elif cmd["cmd"] == "logout": logger.debug("Received command: logout") cmd_map["logout"](op, cmd["force"]) elif cmd["cmd"] == "checkin": logger.debug("Received command: checkin") cmd_map["checkin"](op, cmd["force"]) elif cmd["cmd"] == "checkout": logger.debug("Received command: checkout") cmd_map["checkout"](op, cmd["force"]) def listen_commands( lifopath: str, fifopath: str, cmd_map: T.Dict[str, T.Callable] ) -> None: """ The main loop to listen incoming commands. """ client_chans = {} while True: with Fifo(fifopath) as fifo: for cmd_str in fifo: logger.debug("Command received in main loop: %s", cmd_str) cmd = cmd_unmarshal(cmd_str) logger.debug("Cmd unmarshalled: %s", cmd) if cmd["cmd"] == "start": if cmd["name"] not in client_chans: client_chans[cmd["name"]] = queue.Queue(1) logger.debug('Queue created for "%s".', cmd["name"]) if cmd["name"] in PLifo.All(lifopath): logger.warning("Name %s is yet used. Not proceeding.", name) continue logger.debug( '"%s" being pushed onto "%s"...', cmd["name"], lifopath ) PLifo.Push(lifopath, cmd["name"]) logger.debug("Client %s has been newly created.", cmd["name"]) chan = client_chans.get(cmd["name"], None) logger.debug("Starting new client in a thread...") client = threading.Thread( name=cmd["name"], target=functools.partial( listen_client, name=cmd["name"], chan=chan, cmd_map=cmd_map ), ) client.start() logger.debug('Client "%s" started.', cmd["name"]) continue if cmd["cmd"] == "stop-daemon": stop_all(client_chans) logger.info("Daemon clients stopped. Exiting...") raise StopDaemon logger.debug("Opening client: %s", cmd["name"]) chan = client_chans.get(cmd["name"], None) if chan is None: logger.warning( 'No client found with name "%s". Skipping.', cmd["name"] ) continue chan.put(cmd) def stop_all(client_chans: T.Dict[str, queue.Queue]) -> None: """ Send the stop command to all the clients. """ for name, chan in client_chans.items(): logger.debug('Stopping "%s"...', name) chan.put({"name": name, "cmd": "stop"}) logger.info("Stopped %s.", name)