# BotZ/bot_z/zdaemon.py (345 lines, 10 KiB, Python)

# -*- coding: utf-8 -*-
"""Daemon for local execution of commands."""
import functools
import json
import lockfile
import logging
import logging.handlers
import os
import queue
import signal
import sys
import threading
import time
import typing as T
from bot_z.operator import Operator
from bot_z.utils import Fifo, PLifo, cmd_marshal, cmd_unmarshal
import daemon
from selenium.webdriver.common.keys import Keys
# Module-level logging setup.
# Every handler shares one format: "LEVEL: [logger name] -> message".
daemon_format = logging.Formatter("%(levelname)s: [%(name)s] -> %(message)s")
# Console handler writing to standard output.
console = logging.StreamHandler(stream=sys.stdout)
console.setFormatter(daemon_format)
# Only records issued by this module's logger (or its children) pass the filter.
console.addFilter(logging.Filter(__name__))
# Syslog handler bound to the local "/dev/log" socket.
syslog = logging.handlers.SysLogHandler(address="/dev/log")
syslog.setFormatter(daemon_format)
logger = logging.getLogger(__name__)
# The level is read from the BOTZ_LOGLEVEL environment variable; INFO when unset.
logger.setLevel(os.environ.get("BOTZ_LOGLEVEL", logging.INFO))
logger.addHandler(console)
logger.addHandler(syslog)
# Only emitted when the level configured above lets DEBUG records through.
logger.debug("Init at debug")
class StopDaemon(Exception):
    """Raised by the command loop to ask the daemon process to shut down."""
def start_daemon(
    name: str,
    base_uri: str,
    timeout: T.Optional[int] = None,
    proxy: T.Optional[T.Tuple[str, int]] = None,
    headless: bool = True,
    debug: bool = False,
) -> Operator:
    """
    Spawn a new Operator object.

    ``timeout`` and ``proxy`` are passed to the Operator only when they are
    not None; ``headless`` and ``debug`` are always passed.

    :param name: name given to the Operator.
    :param base_uri: base URI given to the Operator.
    :param timeout: optional timeout; left out of the call when None.
    :param proxy: optional proxy pair (presumably host and port -- confirm
        against the Operator); left out of the call when None.
    :param headless: forwarded to the Operator as ``headless``.
    :param debug: forwarded to the Operator as ``debug``.
    :return: the newly created Operator.
    """
    optional_args: T.Dict[str, T.Any] = {}
    if timeout is not None:
        optional_args["timeout"] = timeout
    if proxy is not None:
        optional_args["proxy"] = proxy
    optional_args["headless"] = headless
    optional_args["debug"] = debug
    # Log the parameters first: they are the useful bit if construction fails.
    logger.debug(
        "Operator parameters: "
        "{base_uri: %s, name: %s, timeout: %s, proxy: %s, headless: %s, debug: %s}",
        base_uri,
        name,
        timeout,
        proxy,
        headless,
        debug,
    )
    op = Operator(base_uri=base_uri, name=name, **optional_args)
    # Report the start only once the Operator has actually been created.
    logger.info("Started Operator.")
    return op
def operator_drop(op: Operator) -> None:
    """
    Shut the given Operator down by calling its ``quit`` method.
    """
    operator_id = id(op)
    logger.debug("Stopping Operator...")
    logger.debug("Operator id: %s", operator_id)
    op.quit()
    logger.info("Stopped Operator.")
def operator_reload(op: Operator) -> None:
    """
    Clean the Operator session in place, without creating a new object.

    A key sequence is sent to the currently focused element, then all the
    cookies are deleted.
    """
    # NOTE(review): this is the CONTROL modifier followed by the literal text
    # "Escape", not the Escape key (``Keys.ESCAPE``) -- confirm the intent.
    key_sequence = Keys.CONTROL + "Escape"
    op.switch_to.active_element.send_keys(key_sequence)
    op.delete_all_cookies()
    logger.info("Operator session cleaned.")
def operator_status(op: Operator, debug: bool = False) -> T.Dict[str, str]:
    """
    Collect the current state of the Operator as a dict of strings.

    :param op: the Operator to inspect.
    :param debug: when true, the ``dir()`` listing of the Operator is added.
    :return: a mapping holding "is_logged_in" and "is_checked_in", plus
        "interal_state" when ``debug`` is set.
    """
    status: T.Dict[str, str] = {
        "is_logged_in": str(op.logged_in),
        "is_checked_in": str(op.checked_in),
    }
    if debug:
        # The key spelling is kept as it is: it is part of the returned payload.
        status["interal_state"] = str(dir(op))
    logger.info("Status: %s", status)
    return status
def operator_login(op: Operator, username: str, password: str, force: bool) -> None:
    """
    Ask the Operator to log in with the given credentials.

    :param op: the Operator performing the login.
    :param username: forwarded to ``op.login``.
    :param password: forwarded to ``op.login``.
    :param force: forwarded unchanged to ``op.login``.
    """
    logger.debug("Performing login...")
    credentials = (username, password)
    op.login(*credentials, force)
    logger.info("Login done.")
def operator_logout(op: Operator, force: bool) -> None:
    """
    Ask the Operator to log out.

    :param op: the Operator performing the logout.
    :param force: forwarded unchanged to ``op.logout``.
    """
    logout = op.logout
    logger.debug("Performing logout...")
    logout(force)
    logger.info("Logout done.")
def operator_checkin(op: Operator, force: bool) -> None:
    """
    Ask the Operator to check in.

    :param op: the Operator performing the check in.
    :param force: forwarded unchanged to ``op.check_in``.
    """
    check_in = op.check_in
    logger.debug("Performing check in...")
    check_in(force)
    logger.info("Check in done.")
def operator_checkout(op: Operator, force: bool) -> None:
    """
    Ask the Operator to check out.

    :param op: the Operator performing the check out.
    :param force: forwarded unchanged to ``op.check_out``.
    """
    check_out = op.check_out
    logger.debug("Performing check out...")
    check_out(force)
    logger.info("Check out done.")
def daemon_process(
    fifo_path: str,
    working_dir: str,
    umask: int,
    pidfile: lockfile.FileLock,
    base_uri: str,
    timeout: T.Optional[int] = None,
    proxy: T.Optional[T.Tuple[str, int]] = None,
    headless: bool = True,
    debug: bool = False,
    foreground: bool = False,
) -> None:
    """
    Run the command listener, in the foreground or inside a daemon context.

    The list of open clients lives in "botz_open_daemons.list" under
    ``working_dir``. A leftover copy is removed at start-up; the file is
    removed again when the foreground listener ends, or when the background
    listener is stopped through ``StopDaemon``.

    :param fifo_path: path of the fifo the commands are read from.
    :param working_dir: working directory of the daemon context; it also
        hosts the client list file.
    :param umask: umask handed to the daemon context.
    :param pidfile: pidfile handed to the daemon context.
    :param base_uri: forwarded to ``start_daemon`` for every new client.
    :param timeout: forwarded to ``start_daemon`` for every new client.
    :param proxy: forwarded to ``start_daemon`` for every new client.
    :param headless: forwarded to ``start_daemon`` for every new client.
    :param debug: forwarded to ``start_daemon``; when true this module's
        logger is also switched to DEBUG.
    :param foreground: when true, listen in the calling process instead of
        entering the daemon context.
    :raises StopDaemon: in foreground mode only, where the exception raised
        by the command loop is not caught here.
    """
    if debug:
        logger.setLevel(logging.DEBUG)

    lifo_path = os.path.join(working_dir, "botz_open_daemons.list")
    if os.path.exists(lifo_path):
        logger.warning("Lifo (%s) exists. Removing!", lifo_path)
        os.remove(lifo_path)

    starter = functools.partial(
        start_daemon,
        base_uri=base_uri,
        timeout=timeout,
        proxy=proxy,
        headless=headless,
        debug=debug,
    )

    def _broadcast(command: str) -> None:
        """Write ``command`` on the command fifo once for every listed client."""
        clients = PLifo.All(lifo_path)
        # Open for writing explicitly, like the reply fifo in listen_client.
        with Fifo(fifo_path, "w") as fifo:
            for client in clients:
                fifo.write(cmd_marshal(name=client, cmd=command))

    def _remove_lifo() -> None:
        """Delete the client list file; a missing file is not an error."""
        try:
            os.remove(lifo_path)
        except FileNotFoundError:
            # No client was ever registered, so there is nothing to clean up.
            logger.debug("Lifo (%s) not found. Nothing to remove.", lifo_path)

    # Signal handlers are invoked with (signum, frame): they must accept both,
    # otherwise the call itself fails with a TypeError.
    def _stop_all(signum: int, frame: T.Any) -> None:
        logger.debug("SIGTERM...stopping all the clients...")
        _broadcast("stop")

    def _reload_all(signum: int, frame: T.Any) -> None:
        logger.debug("SIGUSR1 received here.")
        _broadcast("reload")

    def _list_all(signum: int, frame: T.Any) -> None:
        logger.debug("SIGUSR2 received here.")
        logger.info("Client list: %s", PLifo.All(lifo_path))

    context = daemon.DaemonContext(
        working_directory=working_dir, umask=umask, pidfile=pidfile
    )
    logger.debug("context defined")
    context.signal_map = {
        signal.SIGTERM: _stop_all,
        signal.SIGHUP: "terminate",
        signal.SIGUSR1: _reload_all,
        signal.SIGUSR2: _list_all,
    }
    logger.debug("signal map defined")

    cmd_map = {
        "start": starter,
        "stop": operator_drop,
        "reload": operator_reload,
        "status": operator_status,
        "login": operator_login,
        "logout": operator_logout,
        "checkin": operator_checkin,
        "checkout": operator_checkout,
    }
    logger.debug("command map defined")

    if foreground:
        logger.debug("Started in foreground")
        try:
            listen_commands(lifo_path, fifo_path, cmd_map)
        except KeyboardInterrupt:
            logger.info("Process killed!")
        finally:
            _remove_lifo()
        return

    with context:
        # This is necessary to trigger the daemon start.
        time.sleep(1)
        logger.info("Started in background")
        try:
            listen_commands(lifo_path, fifo_path, cmd_map)
        except StopDaemon:
            _remove_lifo()
def _run_client_command(
    op: T.Any, cmd: T.Dict[str, T.Any], cmd_map: T.Dict[str, T.Callable]
) -> None:
    """Run one operation command (anything but start/stop) on the operator."""
    action = cmd["cmd"]
    if action == "reload":
        cmd_map["reload"](op)
    elif action == "status":
        status = cmd_map["status"](op)
        logger.debug("Status: %s", status)
        # The status is sent back on the reply fifo named by the command.
        with Fifo(cmd["rfifo_path"], "w") as rfifo:
            rfifo.write(cmd_marshal(name=cmd["name"], cmd=status))
    elif action == "login":
        cmd_map["login"](op, cmd["username"], cmd["password"], cmd["force"])
    elif action in ("logout", "checkin", "checkout"):
        cmd_map[action](op, cmd["force"])
    else:
        logger.warning('Unknown command "%s" for client "%s".', action, cmd["name"])


def listen_client(
    name: str, chan: queue.Queue, cmd_map: T.Dict[str, T.Callable]
) -> None:
    """
    The listen loop for an operator instance.

    The operator is created through ``cmd_map["start"]``; commands are then
    taken from ``chan`` one at a time until a "stop" command is received.

    :param name: name of this client; commands carrying another name are
        skipped with a warning.
    :param chan: queue the commands (dicts with at least "name" and "cmd")
        are taken from.
    :param cmd_map: maps command names to the callables implementing them.
    """
    op = cmd_map["start"](name)
    logger.debug("Started operator id: %s", id(op))
    while True:
        cmd = chan.get()
        if cmd["name"] != name:
            logger.warning("Command sent to the wrong instance: %s\n%s", name, cmd)
            continue
        action = cmd["cmd"]
        if action == "start":
            logger.error("Alredy started: %s", name)
            continue
        logger.debug("Received command: %s", action)
        if action == "stop":
            cmd_map["stop"](op)
            return
        try:
            _run_client_command(op, cmd, cmd_map)
        except Exception:
            # A failing operation must not kill the client thread: a dead
            # thread would leave its queue without a consumer.
            logger.exception('Command "%s" failed for client "%s".', action, name)
def listen_commands(
    lifopath: str, fifopath: str, cmd_map: T.Dict[str, T.Callable]
) -> None:
    """
    The main loop to listen incoming commands.

    Commands are read from the fifo at ``fifopath``, which is reopened every
    time its iteration ends. A "start" command spawns a client thread running
    ``listen_client``; "stop-daemon" stops every client and raises
    ``StopDaemon``; any other command is queued on the channel of the client
    named in the command.

    :param lifopath: path of the list holding the names of the open clients.
    :param fifopath: path of the fifo the commands are read from.
    :param cmd_map: maps command names to callables; handed to each client.
    :raises StopDaemon: when a "stop-daemon" command is received.
    """
    client_chans: T.Dict[str, queue.Queue] = {}
    client_threads: T.Dict[str, threading.Thread] = {}

    def _forget_dead_clients() -> None:
        """Drop the channels of the clients whose thread has exited."""
        dead = [
            client for client, thread in client_threads.items()
            if not thread.is_alive()
        ]
        for client in dead:
            # Nobody reads this queue any more: a put on it could block forever.
            logger.debug('Client "%s" is not running. Dropping its queue.', client)
            del client_threads[client]
            client_chans.pop(client, None)

    while True:
        with Fifo(fifopath) as fifo:
            for cmd_str in fifo:
                logger.debug("Command received in main loop: %s", cmd_str)
                cmd = cmd_unmarshal(cmd_str)
                logger.debug("Cmd unmarshalled: %s", cmd)
                _forget_dead_clients()

                if cmd["cmd"] == "start":
                    name = cmd["name"]
                    # Check the name before creating anything, so a refused
                    # start does not leave a queue without a consumer behind.
                    if name in PLifo.All(lifopath):
                        logger.warning("Name %s is yet used. Not proceeding.", name)
                        continue
                    if name not in client_chans:
                        client_chans[name] = queue.Queue(1)
                        logger.debug('Queue created for "%s".', name)
                    logger.debug('"%s" being pushed onto "%s"...', name, lifopath)
                    PLifo.Push(lifopath, name)
                    logger.debug("Client %s has been newly created.", name)
                    logger.debug("Starting new client in a thread...")
                    client = threading.Thread(
                        name=name,
                        target=listen_client,
                        kwargs={
                            "name": name,
                            "chan": client_chans[name],
                            "cmd_map": cmd_map,
                        },
                    )
                    client.start()
                    client_threads[name] = client
                    logger.debug('Client "%s" started.', name)
                    continue

                if cmd["cmd"] == "stop-daemon":
                    stop_all(client_chans)
                    logger.info("Daemon clients stopped. Exiting...")
                    raise StopDaemon

                logger.debug("Opening client: %s", cmd["name"])
                chan = client_chans.get(cmd["name"])
                if chan is None:
                    logger.warning(
                        'No client found with name "%s". Skipping.', cmd["name"]
                    )
                    continue
                chan.put(cmd)
def stop_all(client_chans: T.Dict[str, queue.Queue]) -> None:
    """
    Queue a "stop" command on the channel of every client.

    :param client_chans: maps each client name to its command queue.
    """
    for client_name in client_chans:
        stop_cmd = {"name": client_name, "cmd": "stop"}
        logger.debug('Stopping "%s"...', client_name)
        client_chans[client_name].put(stop_cmd)
        logger.info("Stopped %s.", client_name)