306 lines
9.0 KiB
Python
306 lines
9.0 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
"""Daemon for local execution of commands."""
|
|
|
|
import functools
|
|
import json
|
|
import lockfile
|
|
import logging
|
|
import logging.handlers
|
|
import os
|
|
import queue
|
|
import signal
|
|
import sys
|
|
import threading
|
|
import typing as T
|
|
|
|
from bot_z.bot_z import Operator
|
|
from bot_z.utils import Fifo, PLifo, cmd_marshal, cmd_unmarshal
|
|
import daemon
|
|
from selenium.webdriver.common.keys import Keys
|
|
|
|
|
|
# Module-wide logging configuration.
# One formatter is shared by every handler:
#   "<LEVEL>: [<logger name>] -> <message>"
daemon_format = logging.Formatter("%(levelname)s: [%(name)s] -> %(message)s")

# The module logger. Its threshold comes from the BOTZ_LOGLEVEL environment
# variable when that is set, and falls back to INFO otherwise.
logger = logging.getLogger(__name__)
logger.setLevel(os.environ.get("BOTZ_LOGLEVEL", logging.INFO))

# Console output goes to stdout; the filter only lets through records whose
# logger name belongs to this module's logger hierarchy.
console = logging.StreamHandler(stream=sys.stdout)
console.addFilter(logging.Filter(__name__))
console.setFormatter(daemon_format)
logger.addHandler(console)

# Syslog output goes through the local /dev/log socket.
syslog = logging.handlers.SysLogHandler(address="/dev/log")
syslog.setFormatter(daemon_format)
logger.addHandler(syslog)

# Only visible when the threshold configured above is DEBUG or lower.
logger.debug("Init at debug")
|
|
|
|
|
|
class StopDaemon(Exception):
    """
    Raised to leave the daemon's command loop and shut the program down.

    It carries no extra state; it only exists so the shutdown request can be
    told apart from genuine errors.
    """
|
|
|
|
|
|
def start_daemon(
    name: str,
    base_uri: str,
    timeout: T.Optional[int] = None,
    proxy: T.Optional[T.Tuple[str, int]] = None,
    headless: bool = True,
    debug: bool = False,
) -> Operator:
    """
    Spawns the Operator object.

    :param name: name handed to the Operator.
    :param base_uri: base URI handed to the Operator.
    :param timeout: forwarded to the Operator only when it is not None.
    :param proxy: forwarded to the Operator only when it is not None.
    :param headless: always forwarded to the Operator.
    :param debug: always forwarded to the Operator.
    :returns: the newly created Operator.
    """
    # ``timeout`` and ``proxy`` are left out entirely when unset, so that the
    # Operator applies its own defaults for them.
    optional_args: T.Dict[str, T.Any] = {}
    if timeout is not None:
        optional_args["timeout"] = timeout
    if proxy is not None:
        optional_args["proxy"] = proxy
    optional_args["headless"] = headless
    optional_args["debug"] = debug
    # Parameters are logged before construction so they are still available
    # in the log if the constructor fails.
    logger.debug(
        "Operator parameters: "
        "{base_uri: %s, name: %s, timeout: %s, proxy: %s, headless: %s, debug: %s}",
        base_uri,
        name,
        timeout,
        proxy,
        headless,
        debug,
    )
    operator = Operator(base_uri=base_uri, name=name, **optional_args)
    # Report success only once the Operator really exists.
    logger.info("Started Operator.")
    return operator
|
|
|
|
|
|
def operator_drop(op: Operator) -> None:
    """
    Shut the given Operator down by calling its ``quit`` method.
    """
    operator_id = id(op)
    logger.debug("Stopping Operator...")
    logger.debug("Operator id: %s", operator_id)
    op.quit()
    logger.info("Stopped Operator.")
|
|
|
|
|
|
def operator_reload(op: Operator) -> None:
    """
    Clean the Operator session in place, without creating a new object.

    A key sequence is sent to the currently focused element and then all the
    cookies are deleted.
    """
    # NOTE(review): the sequence is Keys.CONTROL followed by the literal text
    # "Escape", not the escape key; confirm whether Keys.ESCAPE was intended.
    key_sequence = Keys.CONTROL + "Escape"
    op.switch_to.active_element.send_keys(key_sequence)
    op.delete_all_cookies()
    logger.info("Operator session cleaned.")
|
|
|
|
|
|
def operator_status(op: Operator, debug: bool = False) -> T.Dict[str, str]:
    """
    Collect the current state of the operator as a dict of strings.

    The result always holds the stringified ``logged_in`` and ``checked_in``
    attributes of the operator; with ``debug`` enabled it also holds the
    stringified ``dir(op)`` listing. The result is logged before being
    returned.
    """
    report: T.Dict[str, str] = {
        "is_logged_in": str(op.logged_in),
        "is_checked_in": str(op.checked_in),
    }
    if debug:
        report["interal_state"] = str(dir(op))
    logger.info("Status: %s", report)

    return report
|
|
|
|
|
|
def operator_login(op: Operator, username: str, password: str, force: bool) -> None:
    """
    Ask the operator to log in.

    ``username``, ``password`` and ``force`` are handed over, in this order,
    to the operator's ``login`` method. The password itself is never logged.
    """
    credentials = (username, password)
    logger.debug("Performing login.")
    op.login(*credentials, force)
    logger.info("Login done.")
|
|
|
|
|
|
def operator_logout(op: Operator, force: bool) -> None:
    """
    Ask the operator to log out.

    ``force`` is handed over unchanged to the operator's ``logout`` method.
    """
    do_logout = op.logout
    logger.debug("Performing logout.")
    do_logout(force)
    logger.info("Logout done.")
|
|
|
|
|
|
def daemon_process(
    fifo_path: str,
    working_dir: str,
    umask: int,
    pidfile: lockfile.FileLock,
    base_uri: str,
    timeout: T.Optional[int] = None,
    proxy: T.Optional[T.Tuple[str, int]] = None,
    headless: bool = True,
    debug: bool = False,
    foreground: bool = False,
) -> None:
    """
    The daemon function.

    Prepares the command map and runs the command loop, either directly in
    the calling process (``foreground``) or inside a ``DaemonContext``.

    :param fifo_path: path of the fifo the commands are read from.
    :param working_dir: working directory of the daemon; the list of open
        clients ("botz_open_daemons.list") is kept in it.
    :param umask: umask given to the daemon context.
    :param pidfile: pidfile given to the daemon context.
    :param base_uri: base URI for the Operators started by the daemon.
    :param timeout: optional timeout for the Operators.
    :param proxy: optional proxy for the Operators.
    :param headless: whether the Operators are started headless.
    :param debug: forces the DEBUG log level and is passed to the Operators.
    :param foreground: run the loop without entering the daemon context.
    """
    if debug:
        logger.setLevel(logging.DEBUG)
    lifo_path = os.path.join(working_dir, "botz_open_daemons.list")
    # A list file found at startup is treated as a leftover and dropped.
    if os.path.exists(lifo_path):
        logger.warning("Lifo (%s) exists. Removing!", lifo_path)
        os.remove(lifo_path)

    def remove_lifo() -> None:
        # The file may be absent (presumably when no client was ever
        # registered), so a missing file must not turn shutdown into a crash.
        try:
            os.remove(lifo_path)
        except FileNotFoundError:
            logger.debug("Lifo (%s) not found. Nothing to remove.", lifo_path)

    starter = functools.partial(
        start_daemon,
        base_uri=base_uri,
        timeout=timeout,
        proxy=proxy,
        headless=headless,
        debug=debug,
    )

    # Signal handlers are invoked with (signal number, stack frame), so they
    # have to accept both arguments.
    def stop_on_signal(signum: int, frame: T.Any) -> None:
        # Leave the command loop through the same exception the "stop-daemon"
        # command uses. The client threads are not notified from here.
        raise StopDaemon

    def reload_on_signal(signum: int, frame: T.Any) -> None:
        # Placeholder: no action is implemented for this signal yet.
        pass

    def list_on_signal(signum: int, frame: T.Any) -> None:
        # Placeholder: no action is implemented for this signal yet.
        pass

    context = daemon.DaemonContext(
        working_directory=working_dir, umask=umask, pidfile=pidfile
    )
    logger.debug("context defined")
    context.signal_map = {
        signal.SIGTERM: stop_on_signal,
        signal.SIGHUP: "terminate",
        signal.SIGUSR1: reload_on_signal,
        signal.SIGUSR2: list_on_signal,
    }
    logger.debug("signal map defined")
    cmd_map = {
        "start": starter,
        "stop": operator_drop,
        "reload": operator_reload,
        "status": operator_status,
        "login": operator_login,
        "logout": operator_logout,
    }
    logger.debug("command map defined")

    if foreground:
        # The daemon context is never entered here, so the signal map above
        # is not installed in this mode.
        logger.debug("Started in foreground")
        try:
            listen_commands(lifo_path, fifo_path, cmd_map)
        except KeyboardInterrupt:
            logger.info("Process killed!")
        except StopDaemon:
            # Raised by the "stop-daemon" command: a regular shutdown.
            logger.debug("Stop requested. Leaving foreground loop.")
        finally:
            remove_lifo()
        return
    with context:
        logger.debug("Started in background")
        try:
            listen_commands(lifo_path, fifo_path, cmd_map)
        except StopDaemon:
            remove_lifo()
            return
|
|
|
|
|
|
def listen_client(
    name: str, chan: queue.Queue, cmd_map: T.Dict[str, T.Callable]
) -> None:
    """
    The listen loop for an operator instance.

    An operator is created through ``cmd_map["start"]`` and then commands are
    taken from ``chan`` one at a time until a "stop" command is received.

    :param name: name of this client; commands carrying another name are
        discarded with a warning.
    :param chan: queue the commands (dicts with at least the "name" and "cmd"
        keys) are read from.
    :param cmd_map: maps a command name to the callable implementing it.
    """
    op = cmd_map["start"](name)
    logger.debug("Started operator id: %s", id(op))
    while True:
        # Blocks until the main loop hands over the next command.
        cmd = chan.get()
        if cmd["name"] != name:
            logger.warning("Command sent to the wrong instance: %s\n%s", name, cmd)
            continue
        if cmd["cmd"] == "start":
            logger.error("Alredy started: %s", name)
            continue
        if cmd["cmd"] == "stop":
            logger.debug("Received command: stop")
            cmd_map["stop"](op)
            # Returning ends the loop, and with it the calling thread's target.
            return
        elif cmd["cmd"] == "reload":
            logger.debug("Received command: reload")
            cmd_map["reload"](op)
        elif cmd["cmd"] == "status":
            logger.debug("Received command: status")
            status = cmd_map["status"](op)
            logger.debug("Status: %s", status)
            # The answer is written to the reply fifo named by the command.
            with Fifo(cmd["rfifo_path"], "w") as rfifo:
                rfifo.write(cmd_marshal(name=cmd["name"], cmd=status))
        elif cmd["cmd"] == "login":
            logger.debug("Received command: login")
            cmd_map["login"](op, cmd["username"], cmd["password"], cmd["force"])
        elif cmd["cmd"] == "logout":
            logger.debug("Received command: logout")
            cmd_map["logout"](op, cmd["force"])
        else:
            # Make unsupported commands visible instead of dropping them.
            logger.warning(
                'Unknown command "%s" for client "%s". Ignoring.', cmd["cmd"], name
            )
|
|
|
|
|
|
def listen_commands(
    lifopath: str, fifopath: str, cmd_map: T.Dict[str, T.Callable]
) -> None:
    """
    The main loop to listen incoming commands.

    Commands are read from the fifo at ``fifopath`` and unmarshalled:

    * "start" registers the client name and runs ``listen_client`` for it in
      a new thread;
    * "stop-daemon" sends the stop command to every client and raises
      ``StopDaemon``;
    * any other command is put on the queue of the client it names.

    :param lifopath: path of the file holding the names of the open clients.
    :param fifopath: path of the fifo the commands are read from.
    :param cmd_map: maps a command name to the callable implementing it;
        passed on to every client.
    :raises StopDaemon: when the "stop-daemon" command is received.
    """
    client_chans: T.Dict[str, queue.Queue] = {}
    while True:
        # The fifo is opened again every time the previous iteration over it
        # is exhausted.
        with Fifo(fifopath) as fifo:
            for cmd_str in fifo:
                logger.debug("Command received in main loop: %s", cmd_str)
                cmd = cmd_unmarshal(cmd_str)
                logger.debug("Cmd unmarshalled: %s", cmd)
                if cmd["cmd"] == "start":
                    if cmd["name"] not in client_chans:
                        # Queue of size 1: a put blocks while the client has
                        # a command still waiting to be taken.
                        client_chans[cmd["name"]] = queue.Queue(1)
                        logger.debug('Queue created for "%s".', cmd["name"])
                    # PLifo.All presumably returns the names already
                    # registered -- TODO confirm against bot_z.utils.
                    if cmd["name"] in PLifo.All(lifopath):
                        logger.warning(
                            "Name %s is yet used. Not proceeding.", cmd["name"]
                        )
                        continue
                    logger.debug(
                        '"%s" being pushed onto "%s"...', cmd["name"], lifopath
                    )
                    PLifo.Push(lifopath, cmd["name"])
                    logger.debug("Client %s has been newly created.", cmd["name"])
                    chan = client_chans.get(cmd["name"], None)
                    logger.debug("Starting new client in a thread...")
                    client = threading.Thread(
                        name=cmd["name"],
                        target=functools.partial(
                            listen_client, name=cmd["name"], chan=chan, cmd_map=cmd_map
                        ),
                    )
                    client.start()
                    logger.debug('Client "%s" started.', cmd["name"])
                    continue

                if cmd["cmd"] == "stop-daemon":
                    stop_all(client_chans)
                    logger.info("Daemon clients stopped. Exiting...")
                    raise StopDaemon

                logger.debug("Opening client: %s", cmd["name"])
                chan = client_chans.get(cmd["name"], None)
                if chan is None:
                    logger.warning(
                        'No client found with name "%s". Skipping.', cmd["name"]
                    )
                    continue

                chan.put(cmd)
|
|
|
|
|
|
def stop_all(client_chans: T.Dict[str, queue.Queue]) -> None:
    """
    Put a stop command on the queue of every known client.

    The "Stopped" record is written as soon as the command has been queued;
    this function does not wait for the client to act on it.
    """
    for client_name, client_queue in client_chans.items():
        stop_command = {"name": client_name, "cmd": "stop"}
        logger.debug('Stopping "%s"...', client_name)
        client_queue.put(stop_command)
        logger.info("Stopped %s.", client_name)
|