196 lines
5.2 KiB
Python
196 lines
5.2 KiB
Python
|
# -*- coding: utf-8 -*-
|
||
|
|
||
|
"""Daemon for local execution of commands."""
|
||
|
|
||
|
import errno
|
||
|
import functools
|
||
|
import json
|
||
|
import lockfile
|
||
|
import logging
|
||
|
import logging.handlers
|
||
|
import os
|
||
|
import signal
|
||
|
import typing as T
|
||
|
|
||
|
from bot_z.bot_z import Operator
|
||
|
import daemon
|
||
|
from selenium.webdriver.common.keys import Keys
|
||
|
|
||
|
|
||
|
# Root logger: the level comes from the BOTZ_LOGLEVEL environment variable
# and falls back to INFO when the variable is not set.
logging.basicConfig(
    format="%(levelname)s: [%(name)s] -> %(message)s",
    level=os.environ.get("BOTZ_LOGLEVEL", logging.INFO),
)

# Module logger; its records are additionally sent to a syslog handler
# created with the handler's default address.
logger = logging.getLogger(__name__)
syslog = logging.handlers.SysLogHandler()
logger.addHandler(syslog)
logger.debug("Init at debug")
|
||
|
|
||
|
|
||
|
def start_daemon(
    base_uri: str,
    name: str,
    timeout: T.Optional[int] = None,
    proxy: T.Optional[T.Tuple[str, int]] = None,
    headless: bool = True,
    debug: bool = False,
) -> Operator:
    """
    Spawns the Operator object.

    ``timeout`` and ``proxy`` are forwarded to ``Operator`` only when they
    are not ``None``; ``headless`` and ``debug`` are always forwarded.

    :param base_uri: forwarded to ``Operator`` as ``base_uri``.
    :param name: forwarded to ``Operator`` as ``name``.
    :param timeout: optional timeout forwarded to ``Operator``.
    :param proxy: optional ``(host, port)`` tuple forwarded to ``Operator``.
    :param headless: forwarded to ``Operator`` as ``headless``.
    :param debug: forwarded to ``Operator`` as ``debug``.
    :returns: the newly created ``Operator``.
    """
    optional_args = {
        "headless": headless,
        "debug": debug,
    }  # type: T.Dict[str, T.Any]
    if timeout is not None:
        optional_args["timeout"] = timeout
    if proxy is not None:
        optional_args["proxy"] = proxy
    logger.debug(
        "Operator parameters: "
        "{base_uri: %s, name: %s, timeout: %s, proxy: %s, headless: %s, debug: %s}",
        base_uri,
        name,
        timeout,
        proxy,
        headless,
        debug,
    )
    operator = Operator(base_uri=base_uri, name=name, **optional_args)
    # Report success only once the Operator really exists, so that a failing
    # constructor does not leave a misleading "Started" line in the log.
    logger.info("Started Operator.")
    return operator
|
||
|
|
||
|
|
||
|
def drop_operator(op: Operator) -> None:
    """
    Shut the given Operator down by calling its ``quit()`` method.

    The shutdown is logged before (debug) and after (info) the call.
    """
    logger.debug("Stopping Operator...")
    operator_id = id(op)
    logger.debug("Operator id: %s", operator_id)
    op.quit()
    logger.info("Stopped Operator.")
|
||
|
|
||
|
|
||
|
def reload_operator(op: Operator) -> None:
    """
    Clean the session of the Operator while keeping the same object.

    A key sequence is sent to the currently focused element and then all
    the cookies are deleted.
    """
    # NOTE(review): this sends the CONTROL key followed by the literal text
    # "Escape", not ``Keys.ESCAPE`` -- confirm that this is intended.
    key_sequence = Keys.CONTROL + "Escape"
    op.switch_to.active_element.send_keys(key_sequence)
    op.delete_all_cookies()
    logger.info("Operator session cleaned.")
|
||
|
|
||
|
|
||
|
def operator_status(op: Operator, debug: bool = False) -> T.Dict[str, str]:
    """
    Collect the current status of the Operator as a dict of strings.

    The result always holds ``is_logged_in`` and ``is_checked_in``; with
    ``debug`` set, the attribute listing of the operator is added under
    the ``interal_state`` key. The status is logged before being returned.
    """
    status = {
        "is_logged_in": str(op.logged_in),
        "is_checked_in": str(op.checked_in),
    }  # type: T.Dict[str, str]
    if debug:
        # NOTE(review): the key is spelled "interal_state"; kept as is,
        # since readers of the status may rely on it.
        status["interal_state"] = str(dir(op))
    logger.info("Status: %s", status)
    return status
|
||
|
|
||
|
|
||
|
def _signal_handler(
    action: T.Callable[[], T.Any]
) -> T.Callable[[int, T.Any], None]:
    """
    Adapt a zero-argument action to the ``(signum, frame)`` handler signature.
    """

    def _handler(signum: int, frame: T.Any) -> None:
        logger.debug("Received signal: %s", signum)
        action()

    return _handler


def daemon_process(
    fifo_path: str,
    working_dir: str,
    umask: int,
    pidfile: lockfile.FileLock,
    base_uri: str,
    name: str,
    timeout: T.Optional[int] = None,
    proxy: T.Optional[T.Tuple[str, int]] = None,
    headless: bool = True,
    debug: bool = False,
    foreground: bool = False,
) -> None:
    """
    The daemon function.

    Starts the Operator, binds the stop/reload/status actions to it and
    listens for commands on the control fifo until a stop is requested.

    :param fifo_path: path of the control fifo to listen on.
    :param working_dir: working directory given to the daemon context.
    :param umask: umask given to the daemon context.
    :param pidfile: pid file lock given to the daemon context.
    :param base_uri: forwarded to ``start_daemon``.
    :param name: forwarded to ``start_daemon``.
    :param timeout: forwarded to ``start_daemon``.
    :param proxy: forwarded to ``start_daemon``.
    :param headless: forwarded to ``start_daemon``.
    :param debug: forwarded to ``start_daemon``.
    :param foreground: when true, listen in the current process without
        entering the daemon context; the signal map is then not installed.
    """
    op = start_daemon(base_uri, name, timeout, proxy, headless, debug)
    logger.debug("Started operator id: %s", id(op))
    context = daemon.DaemonContext(
        working_directory=working_dir, umask=umask, pidfile=pidfile
    )
    _stop = functools.partial(drop_operator, op)
    _reload = functools.partial(reload_operator, op)
    _status = functools.partial(operator_status, op)
    # Signal handlers are invoked as ``handler(signum, frame)``. The bare
    # partials would receive those two extra positional arguments and fail
    # with TypeError, so they are wrapped in an adapter that discards them.
    # NOTE(review): SIGTERM only stops the Operator, the command loop keeps
    # running afterwards -- confirm that this is intended.
    context.signal_map = {
        signal.SIGTERM: _signal_handler(_stop),
        signal.SIGHUP: "terminate",
        signal.SIGUSR1: _signal_handler(_reload),
        signal.SIGUSR2: _signal_handler(_status),
    }
    cmd_map = {"stop": _stop, "reload": _reload, "status": _status}

    if foreground:
        listen_commands(fifo_path, cmd_map)
        return
    with context:
        listen_commands(fifo_path, cmd_map)
|
||
|
|
||
|
|
||
|
class Fifo(object):
    """
    Iterator to continuously read the command fifo.

    The named pipe is created when it does not exist yet and is then
    opened with the requested mode. The object can be used as an iterator
    (each step is one ``read()`` on the pipe) or as a context manager
    (yields the underlying file handle and closes it on exit).
    """

    def __init__(self, fifopath: str, mode: str = "r") -> None:
        """
        Create the pipe at ``fifopath`` if needed and open it with ``mode``.

        :raises OSError: when the pipe cannot be created for any reason
            other than it already existing.
        """
        try:
            os.mkfifo(fifopath)
            logger.info("Control pipe opened at: %s", fifopath)
        except OSError as err:
            # An already existing pipe is simply reused.
            if err.errno != errno.EEXIST:
                logger.critical("Could not open control pipe at: %s", fifopath)
                raise
        self.fh = open(fifopath, mode)

    def __iter__(self) -> "Fifo":  # pragma: noqa
        return self

    def __next__(self) -> str:
        """
        Return whatever a single ``read()`` on the pipe yields.
        """
        data = self.fh.read()
        # This is a never ending iterator: StopIteration is never raised and
        # an exhausted pipe yields "". We should exit the loop independently.
        return data

    def __del__(self) -> None:
        # ``fh`` does not exist when __init__ raised before opening the pipe;
        # closing unconditionally would raise AttributeError in the finalizer.
        fh = getattr(self, "fh", None)
        if fh is not None:
            fh.close()

    def __enter__(self) -> T.TextIO:
        return self.fh

    def __exit__(self, *args) -> None:
        self.fh.close()

    def write(self, data: str) -> None:
        """
        Write ``data`` to the pipe and flush it right away.
        """
        self.fh.write(data)
        self.fh.flush()
|
||
|
|
||
|
|
||
|
def listen_commands(fifopath: str, cmd_map: T.Dict[str, T.Callable]) -> None:
    """
    The main loop to listen incoming commands.

    Each round opens the control fifo, reads one message from it and closes
    it again. Recognised messages are:

    - ``stop``: run ``cmd_map["stop"]`` and leave the loop;
    - ``reload``: run ``cmd_map["reload"]``;
    - ``status|<path>``: run ``cmd_map["status"]`` and write the result,
      JSON encoded, to the fifo at ``<path>``.

    Surrounding whitespace (e.g. the newline added by ``echo``) is ignored,
    and so are empty and unknown messages.

    :param fifopath: path of the control fifo.
    :param cmd_map: callables for the ``stop``, ``reload`` and ``status``
        commands.
    """
    while True:
        # Reopen the pipe for every message: opening a fifo for reading waits
        # for a writer and read() returns once the writer has closed its end.
        # Reading again from an exhausted pipe would return "" immediately
        # and make the loop spin.
        with Fifo(fifopath, "r") as pipe:
            command = pipe.read().strip()
        if not command:
            continue

        if command == "stop":
            logger.debug("Received command: %s", "stop")
            cmd_map["stop"]()
            return

        if command == "reload":
            logger.debug("Received command: %s", "reload")
            cmd_map["reload"]()
        elif command.startswith("status"):
            logger.debug("Received command: %s", "status")
            status = cmd_map["status"]()
            logger.debug("Status: %s", status)
            # The reply path is whatever follows the last "|".
            _, separator, reply_path = command.rpartition("|")
            if not separator or not reply_path:
                # Without a reply path there is nowhere to answer.
                logger.warning("No reply pipe given for command: %s", "status")
                continue
            try:
                with Fifo(reply_path, "w") as rfifo:
                    rfifo.write(json.dumps(status))
            except OSError:
                # A bad reply pipe must not take the whole daemon down.
                logger.exception("Could not send status to: %s", reply_path)
        else:
            logger.warning("Ignoring unknown command: %r", command)
|