Refactoring the daemon: one daemon, many clients.

Leonardo Barcaroli 2019-02-05 09:07:03 +01:00
parent 40b5d3f09e
commit 632d6a5460
3 changed files with 225 additions and 58 deletions

View File

@ -6,21 +6,31 @@ import errno
import logging import logging
import os import os
import sys import sys
import time
import typing as T import typing as T
import click import click
import lockfile import lockfile
from bot_z.zdaemon import daemon_process, Fifo from bot_z.zdaemon import daemon_process
from bot_z.utils import Fifo, PLifo, cmd_marshal
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO) logger.setLevel(os.environ.get("BOTZ_LOGLEVEL", logging.INFO))
sh = logging.StreamHandler(stream=sys.stdout) sh = logging.StreamHandler(stream=sys.stdout)
sh.setFormatter(logging.Formatter("%(message)s")) sh.setFormatter(logging.Formatter("%(message)s"))
logger.addHandler(sh) logger.addHandler(sh)
def _check_name(lifo_path: str, name: T.Optional[str]) -> str:
with PLifo(lifo_path) as lifo:
if name is None:
return lifo.last()
if name not in lifo:
logger.error("Daemon not present. Exiting.")
sys.exit(2)
return name
@click.group() @click.group()
@click.option("-d", "--debug", is_flag=True, default=False, help="Enable debug mode.") @click.option("-d", "--debug", is_flag=True, default=False, help="Enable debug mode.")
@click.option( @click.option(
@ -58,9 +68,10 @@ def cli(
ctx.obj["verbose"] = verbose ctx.obj["verbose"] = verbose
ctx.obj["fifo"] = fifo ctx.obj["fifo"] = fifo
ctx.obj["workdir"] = workdir ctx.obj["workdir"] = workdir
ctx.obj["lifo"] = os.path.join(workdir, "botz_open_daemons.list")
@cli.command("start") @cli.command("start-daemon")
@click.option( @click.option(
"-t", "--timeout", type=click.INT, default=20, help="Browser requests timeout." "-t", "--timeout", type=click.INT, default=20, help="Browser requests timeout."
) )
@ -71,13 +82,6 @@ def cli(
default=0o002, default=0o002,
help="The umask of the control fifo.", help="The umask of the control fifo.",
) )
@click.option(
"-n",
"--name",
type=click.STRING,
default="default_instance",
help="The daemon instance name.",
)
@click.option( @click.option(
"-p", "-p",
"--proxy", "--proxy",
@ -93,10 +97,9 @@ def cli(
) )
@click.argument("baseuri", type=click.STRING) @click.argument("baseuri", type=click.STRING)
@click.pass_context @click.pass_context
def start_command( def start_daemon_command(
ctx: click.Context, ctx: click.Context,
baseuri: str, baseuri: str,
name: str,
umask: int, umask: int,
timeout: int, timeout: int,
proxy: T.Optional[str] = None, proxy: T.Optional[str] = None,
@ -117,7 +120,6 @@ def start_command(
umask=umask, umask=umask,
pidfile=lf, pidfile=lf,
base_uri=baseuri, base_uri=baseuri,
name=name,
timeout=timeout, timeout=timeout,
proxy=proxy_tuple, proxy=proxy_tuple,
headless=not ctx.obj["debug"], headless=not ctx.obj["debug"],
@ -127,38 +129,86 @@ def start_command(
logger.info("Daemon started.") logger.info("Daemon started.")
@cli.command("stop") @cli.command("list")
@click.option("-s", "--status", is_flag=True, default=False, help="Show also the status of each client.")
@click.pass_context @click.pass_context
def stop_command(ctx: click.Context): def list_command(ctx: click.Context, status: bool) -> None:
"""
Shows the clients attached
"""
logger.debug("Invoked the \"list\" command.")
with PLifo(ctx.obj["lifo"]) as lifo:
if len(lifo) == 0:
logger.info("No clients.")
return
for client in lifo.all():
logger.info(client)
if status:
_status_command(ctx, client)
@cli.command("start")
@click.option(
"-n",
"--name",
type=click.STRING,
prompt="Give the instance a name",
help="The daemon instance name.",
)
@click.pass_context
def start_command(ctx: click.Context, name: str) -> None:
"""
Writes on the fifo. Invokes the start of a client.
"""
logger.debug("Sending the start command down the pipe: %s", ctx.obj["fifo"])
with open(ctx.obj["fifo"], "w") as fifo:
fifo.write(cmd_marshal(name=name, cmd="start"))
logger.info("Start sent.")
@cli.command("stop")
@click.option("-n", "--name", default=None, help="The instance to interact with.")
@click.pass_context
def stop_command(ctx: click.Context, name: T.Optional[str]) -> None:
""" """
Writes on the fifo. Invokes the stop. Writes on the fifo. Invokes the stop.
""" """
logger.debug("Sending the stop command down the pipe: %s", ctx.obj["fifo"]) logger.debug("Sending the stop command down the pipe: %s", ctx.obj["fifo"])
name = _check_name(ctx.obj["lifo"], name)
logging.info("Stopping instance: %s", name)
with open(ctx.obj["fifo"], "w") as fifo: with open(ctx.obj["fifo"], "w") as fifo:
fifo.write("stop") fifo.write(cmd_marshal(name=name, cmd="stop"))
fifo.flush() fifo.flush()
logger.info("Stop sent.") logger.info("Stop sent.")
@cli.command("reload") @cli.command("reload")
@click.option("-n", "--name", default=None, help="The instance to interact with.")
@click.pass_context @click.pass_context
def reload_command(ctx: click.Context): def reload_command(ctx: click.Context, name: T.Optional[str]) -> None:
""" """
Writes on the fifo. Invokes the reload. Writes on the fifo. Invokes the reload.
""" """
logger.debug("Sending the reload command down the pipe: %s", ctx.obj["fifo"]) logger.debug("Sending the reload command down the pipe: %s", ctx.obj["fifo"])
name = _check_name(ctx.obj["lifo"], name)
with open(ctx.obj["fifo"], "w") as fifo: with open(ctx.obj["fifo"], "w") as fifo:
fifo.write("reload") fifo.write(cmd_marshal(name=name, cmd="reload"))
fifo.flush() fifo.flush()
logger.info("Reload sent.") logger.info("Reload sent.")
@cli.command("status") @cli.command("status")
@click.option("-n", "--name", default=None, help="The instance to interact with.")
@click.pass_context @click.pass_context
def status_command(ctx: click.Context): def status_command(ctx: click.Context, name: T.Optional[str]) -> None:
""" """
Writes on the fifo. Queries the status. Writes on the fifo. Queries the status.
""" """
name = _check_name(ctx.obj["lifo"], name)
_status_command(ctx, name)
def _status_command(ctx: click.Context, name: str) -> None:
rfifo_path = os.path.join(ctx.obj["workdir"], "rfifo-{}".format(id(ctx))) rfifo_path = os.path.join(ctx.obj["workdir"], "rfifo-{}".format(id(ctx)))
logger.debug("Awaiting response on fifo: %s", rfifo_path) logger.debug("Awaiting response on fifo: %s", rfifo_path)
try: try:
@ -170,7 +220,7 @@ def status_command(ctx: click.Context):
logger.debug("Response fifo exists.") logger.debug("Response fifo exists.")
with Fifo(ctx.obj["fifo"], "w") as fifo: with Fifo(ctx.obj["fifo"], "w") as fifo:
fifo.write("status|{}".format(rfifo_path)) fifo.write(cmd_marshal(name=name, cmd="status", rfifo_path=rfifo_path))
logger.debug("Awaiting response...") logger.debug("Awaiting response...")
done = False done = False
while not done: while not done:

View File

@ -4,12 +4,33 @@
import errno import errno
import logging import logging
import json
import os import os
import typing as T import typing as T
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
ENC = json.JSONEncoder()
DEC = json.JSONDecoder()
def cmd_marshal(name: str, cmd: str, **kwargs) -> str:
"""
Serializes a command (a python dict) in a JSON object string.
"""
cmd_struct = dict(name=name, cmd=cmd, **kwargs)
logger.debug("Command to be marshalled: %s", cmd_struct)
return ENC.encode(cmd_struct)
def cmd_unmarshal(cmd: str) -> T.Dict[str, T.Any]:
"""
Deserializes a command (a python dict) in a JSON object string.
"""
logger.debug("Command to be unmarshalled: %s", cmd)
return DEC.decode(cmd)
class Fifo: class Fifo:
""" """
@ -46,6 +67,7 @@ class Fifo:
self.fh.close() self.fh.close()
def write(self, data) -> None: def write(self, data) -> None:
logger.debug("%s ~ Writing: %s", self.path, data)
self.fh.write(data) self.fh.write(data)
self.fh.flush() self.fh.flush()
@ -58,6 +80,9 @@ class PLifo:
def __init__(self, path: str) -> None: def __init__(self, path: str) -> None:
self.path = path self.path = path
if not os.path.exists(path):
with open(path, "w") as lifo:
lifo.write("")
self.content = [] self.content = []
self.fh = None self.fh = None

View File

@ -2,17 +2,19 @@
"""Daemon for local execution of commands.""" """Daemon for local execution of commands."""
import errno
import functools import functools
import json import json
import lockfile import lockfile
import logging import logging
import logging.handlers import logging.handlers
import os import os
import queue
import signal import signal
import threading
import typing as T import typing as T
from bot_z.bot_z import Operator from bot_z.bot_z import Operator
from bot_z.utils import Fifo, PLifo, cmd_marshal, cmd_unmarshal
import daemon import daemon
from selenium.webdriver.common.keys import Keys from selenium.webdriver.common.keys import Keys
@ -32,8 +34,8 @@ logger.debug("Init at debug")
def start_daemon( def start_daemon(
base_uri: str,
name: str, name: str,
base_uri: str,
timeout: int = None, timeout: int = None,
proxy: T.Optional[T.Tuple[str, int]] = None, proxy: T.Optional[T.Tuple[str, int]] = None,
headless: bool = True, headless: bool = True,
@ -42,7 +44,7 @@ def start_daemon(
""" """
Spawns the Operator object. Spawns the Operator object.
""" """
optional_args = {} # type: T.Dict[str, T.Any] optional_args: T.Dict[str, T.Any] = {}
if timeout is not None: if timeout is not None:
optional_args["timeout"] = timeout optional_args["timeout"] = timeout
if proxy is not None: if proxy is not None:
@ -63,7 +65,7 @@ def start_daemon(
return Operator(base_uri=base_uri, name=name, **optional_args) return Operator(base_uri=base_uri, name=name, **optional_args)
def drop_operator(op: Operator) -> None: def operator_drop(op: Operator) -> None:
""" """
Stops the Operator. Stops the Operator.
""" """
@ -73,7 +75,7 @@ def drop_operator(op: Operator) -> None:
logger.info("Stopped Operator.") logger.info("Stopped Operator.")
def reload_operator(op: Operator) -> None: def operator_reload(op: Operator) -> None:
""" """
Reload the Operator using the same object. Reload the Operator using the same object.
""" """
@ -97,13 +99,30 @@ def operator_status(op: Operator, debug: bool = False) -> T.Dict[str, str]:
return status return status
def operator_login(op: Operator, username: str, password: str, force: bool) -> None:
"""
Instructs the operator to perform login.
"""
logger.debug("Performing login.")
op.login(username, password, force)
logger.info("Login done.")
def operator_logout(op: Operator, username: str, force: bool) -> None:
"""
Instructs the operator to perform logout.
"""
logger.debug("Performing logout.")
op.logout(username, password, force)
logger.info("Logout done.")
def daemon_process( def daemon_process(
fifo_path: str, fifo_path: str,
working_dir: str, working_dir: str,
umask: int, umask: int,
pidfile: lockfile.FileLock, pidfile: lockfile.FileLock,
base_uri: str, base_uri: str,
name: str,
timeout: int = None, timeout: int = None,
proxy: T.Optional[T.Tuple[str, int]] = None, proxy: T.Optional[T.Tuple[str, int]] = None,
headless: bool = True, headless: bool = True,
@ -113,50 +132,123 @@ def daemon_process(
""" """
The daemon function. The daemon function.
""" """
op = start_daemon(base_uri, name, timeout, proxy, headless, debug) lifo_path = os.path.join(working_dir, "botz_open_daemons.list")
logger.debug("Started operator id: %s", id(op))
starter = functools.partial(
start_daemon,
base_uri=base_uri,
timeout=timeout,
proxy=proxy,
headless=headless,
debug=debug,
)
def stop_all() -> None:
pass
def reload_all() -> None:
pass
def list_all() -> None:
pass
context = daemon.DaemonContext( context = daemon.DaemonContext(
working_directory=working_dir, umask=umask, pidfile=pidfile working_directory=working_dir, umask=umask, pidfile=pidfile
) )
_stop = functools.partial(drop_operator, op) logger.debug("context defined")
_reload = functools.partial(reload_operator, op)
_status = functools.partial(operator_status, op)
context.signal_map = { context.signal_map = {
signal.SIGTERM: _stop, signal.SIGTERM: stop_all,
signal.SIGHUP: "terminate", signal.SIGHUP: "terminate",
signal.SIGUSR1: _reload, signal.SIGUSR1: reload_all,
signal.SIGUSR2: _status, signal.SIGUSR2: list_all,
} }
cmd_map = {"stop": _stop, "reload": _reload, "status": _status} logger.debug("signal map defined")
cmd_map = {
"start": starter,
"stop": operator_drop,
"reload": operator_reload,
"status": operator_status,
"login": operator_login,
"logout": operator_logout,
}
logger.debug("command map defined")
if foreground: if foreground:
listen_commands(fifo_path, cmd_map) logger.debug("Started in foreground")
try:
listen_commands(lifo_path, fifo_path, cmd_map)
except KeyboardInterrupt:
logger.info("Process killed!")
finally:
os.remove(lifo_path)
return return
with context: with context:
listen_commands(fifo_path, cmd_map) logger.debug("Started in background")
listen_commands(lifo_path, fifo_path, cmd_map)
os.remove(lifo_path)
def listen_commands(name: str, fifopath: str, cmd_map: T.Dict[str, T.Callable]) -> None: def listen_client(
name: str, chan: queue.Queue, cmd_map: T.Dict[str, T.Callable]
) -> None:
"""
The listen loop for an operator instance.
"""
op = cmd_map["start"](name)
logger.debug("Started operator id: %s", id(op))
while True:
cmd = chan.get()
if cmd["name"] != name:
log.warning("Command sent to the wrong instance: %s\n%s", name, cmd)
continue
if cmd["cmd"] == "start":
logger.error("Alredy started: %s", name)
continue
if cmd["cmd"] == "stop":
logger.debug("Received command: stop")
cmd_map["stop"](op)
return
elif cmd["cmd"] == "reload":
logger.debug("Received command: reload")
cmd_map["reload"](op)
elif cmd["cmd"] == "status":
logger.debug("Received command: status")
status = cmd_map["status"](op)
logger.debug("Status: %s", status)
with Fifo(cmd["rfifo_path"], "w") as rfifo:
rfifo.write(cmd_marshal(name=cmd["name"], cmd=status))
def listen_commands(
lifopath: str, fifopath: str, cmd_map: T.Dict[str, T.Callable]
) -> None:
""" """
The main loop to listen incoming commands. The main loop to listen incoming commands.
""" """
client_chans = {}
while True: while True:
fifo = Fifo(fifopath, "r") with Fifo(fifopath) as fifo:
for cmd_str in fifo: for cmd_str in fifo:
rcpt, command = cmd_str.split('\u2193') logger.debug("Command received in main loop: %s", cmd_str)
if rcpt != name: cmd = cmd_unmarshal(cmd_str)
continue if cmd["name"] not in client_chans:
if command == "stop": client_chans[cmd["name"]] = queue.Queue(1)
logger.debug("Received command: %s", "stop") if cmd["name"] in PLifo.All(lifopath):
cmd_map["stop"]() logger.warning("Name %s is yet used. Not proceeding.", name)
return continue
elif command == "reload": PLifo.Push(lifopath, cmd["name"])
logger.debug("Received command: %s", "reload") logger.debug("Client %s has been newly created.", cmd["name"])
cmd_map["reload"]() logger.debug("Opening client: %s", cmd["name"])
elif command.startswith("status"): q = client_chans[cmd["name"]]
logger.debug("Received command: %s", "status") if cmd["cmd"] == "start":
status = cmd_map["status"]() logger.debug("Starting new client in a thread...")
logger.debug("Status: %s", status) client = threading.Thread(
jenc = json.JSONEncoder() name=cmd["name"],
with Fifo(command.split("|")[-1], "w") as rfifo: target=functools.partial(
rfifo.write(jenc.encode(status)) listen_client, name=cmd["name"], chan=q, cmd_map=cmd_map
),
)
client.start()
logger.debug('Client "%s" started.', cmd["name"])
continue
q.put(cmd)