diff --git a/bot_z/cli.py b/bot_z/cli.py index 30033e7..1142d67 100644 --- a/bot_z/cli.py +++ b/bot_z/cli.py @@ -6,21 +6,31 @@ import errno import logging import os import sys -import time import typing as T import click import lockfile -from bot_z.zdaemon import daemon_process, Fifo +from bot_z.zdaemon import daemon_process +from bot_z.utils import Fifo, PLifo, cmd_marshal logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) +logger.setLevel(os.environ.get("BOTZ_LOGLEVEL", logging.INFO)) sh = logging.StreamHandler(stream=sys.stdout) sh.setFormatter(logging.Formatter("%(message)s")) logger.addHandler(sh) +def _check_name(lifo_path: str, name: T.Optional[str]) -> str: + with PLifo(lifo_path) as lifo: + if name is None: + return lifo.last() + if name not in lifo: + logger.error("Daemon not present. Exiting.") + sys.exit(2) + return name + + @click.group() @click.option("-d", "--debug", is_flag=True, default=False, help="Enable debug mode.") @click.option( @@ -58,9 +68,10 @@ def cli( ctx.obj["verbose"] = verbose ctx.obj["fifo"] = fifo ctx.obj["workdir"] = workdir + ctx.obj["lifo"] = os.path.join(workdir, "botz_open_daemons.list") -@cli.command("start") +@cli.command("start-daemon") @click.option( "-t", "--timeout", type=click.INT, default=20, help="Browser requests timeout." ) @@ -71,13 +82,6 @@ def cli( default=0o002, help="The umask of the control fifo.", ) -@click.option( - "-n", - "--name", - type=click.STRING, - default="default_instance", - help="The daemon instance name.", -) @click.option( "-p", "--proxy", @@ -93,10 +97,9 @@ def cli( ) @click.argument("baseuri", type=click.STRING) @click.pass_context -def start_command( +def start_daemon_command( ctx: click.Context, baseuri: str, - name: str, umask: int, timeout: int, proxy: T.Optional[str] = None, @@ -117,7 +120,6 @@ def start_command( umask=umask, pidfile=lf, base_uri=baseuri, - name=name, timeout=timeout, proxy=proxy_tuple, headless=not ctx.obj["debug"], @@ -127,38 +129,86 @@ def start_command( logger.info("Daemon started.") -@cli.command("stop") +@cli.command("list") +@click.option("-s", "--status", is_flag=True, default=False, help="Show also the status of each client.") @click.pass_context -def stop_command(ctx: click.Context): +def list_command(ctx: click.Context, status: bool) -> None: + """ + Shows the clients attached + """ + logger.debug("Invoked the \"list\" command.") + with PLifo(ctx.obj["lifo"]) as lifo: + if len(lifo) == 0: + logger.info("No clients.") + return + for client in lifo.all(): + logger.info(client) + if status: + _status_command(ctx, client) + + +@cli.command("start") +@click.option( + "-n", + "--name", + type=click.STRING, + prompt="Give the instance a name", + help="The daemon instance name.", +) +@click.pass_context +def start_command(ctx: click.Context, name: str) -> None: + """ + Writes on the fifo. Invokes the start of a client. + """ + logger.debug("Sending the start command down the pipe: %s", ctx.obj["fifo"]) + with open(ctx.obj["fifo"], "w") as fifo: + fifo.write(cmd_marshal(name=name, cmd="start")) + logger.info("Start sent.") + + +@cli.command("stop") +@click.option("-n", "--name", default=None, help="The instance to interact with.") +@click.pass_context +def stop_command(ctx: click.Context, name: T.Optional[str]) -> None: """ Writes on the fifo. Invokes the stop. """ logger.debug("Sending the stop command down the pipe: %s", ctx.obj["fifo"]) + name = _check_name(ctx.obj["lifo"], name) + logging.info("Stopping instance: %s", name) with open(ctx.obj["fifo"], "w") as fifo: - fifo.write("stop") + fifo.write(cmd_marshal(name=name, cmd="stop")) fifo.flush() logger.info("Stop sent.") @cli.command("reload") +@click.option("-n", "--name", default=None, help="The instance to interact with.") @click.pass_context -def reload_command(ctx: click.Context): +def reload_command(ctx: click.Context, name: T.Optional[str]) -> None: """ Writes on the fifo. Invokes the reload. """ logger.debug("Sending the reload command down the pipe: %s", ctx.obj["fifo"]) + name = _check_name(ctx.obj["lifo"], name) with open(ctx.obj["fifo"], "w") as fifo: - fifo.write("reload") + fifo.write(cmd_marshal(name=name, cmd="reload")) fifo.flush() logger.info("Reload sent.") @cli.command("status") +@click.option("-n", "--name", default=None, help="The instance to interact with.") @click.pass_context -def status_command(ctx: click.Context): +def status_command(ctx: click.Context, name: T.Optional[str]) -> None: """ Writes on the fifo. Queries the status. """ + name = _check_name(ctx.obj["lifo"], name) + _status_command(ctx, name) + + +def _status_command(ctx: click.Context, name: str) -> None: rfifo_path = os.path.join(ctx.obj["workdir"], "rfifo-{}".format(id(ctx))) logger.debug("Awaiting response on fifo: %s", rfifo_path) try: @@ -170,7 +220,7 @@ def status_command(ctx: click.Context): logger.debug("Response fifo exists.") with Fifo(ctx.obj["fifo"], "w") as fifo: - fifo.write("status|{}".format(rfifo_path)) + fifo.write(cmd_marshal(name=name, cmd="status", rfifo_path=rfifo_path)) logger.debug("Awaiting response...") done = False while not done: diff --git a/bot_z/utils.py b/bot_z/utils.py index b1b4c49..821a8ba 100644 --- a/bot_z/utils.py +++ b/bot_z/utils.py @@ -4,12 +4,33 @@ import errno import logging +import json import os import typing as T logger = logging.getLogger(__name__) +ENC = json.JSONEncoder() +DEC = json.JSONDecoder() + + +def cmd_marshal(name: str, cmd: str, **kwargs) -> str: + """ + Serializes a command (a python dict) in a JSON object string. + """ + cmd_struct = dict(name=name, cmd=cmd, **kwargs) + logger.debug("Command to be marshalled: %s", cmd_struct) + return ENC.encode(cmd_struct) + + +def cmd_unmarshal(cmd: str) -> T.Dict[str, T.Any]: + """ + Deserializes a command (a python dict) in a JSON object string. + """ + logger.debug("Command to be unmarshalled: %s", cmd) + return DEC.decode(cmd) + class Fifo: """ @@ -46,6 +67,7 @@ class Fifo: self.fh.close() def write(self, data) -> None: + logger.debug("%s ~ Writing: %s", self.path, data) self.fh.write(data) self.fh.flush() @@ -58,6 +80,9 @@ class PLifo: def __init__(self, path: str) -> None: self.path = path + if not os.path.exists(path): + with open(path, "w") as lifo: + lifo.write("") self.content = [] self.fh = None diff --git a/bot_z/zdaemon.py b/bot_z/zdaemon.py index acf2cdf..138076b 100644 --- a/bot_z/zdaemon.py +++ b/bot_z/zdaemon.py @@ -2,17 +2,19 @@ """Daemon for local execution of commands.""" -import errno import functools import json import lockfile import logging import logging.handlers import os +import queue import signal +import threading import typing as T from bot_z.bot_z import Operator +from bot_z.utils import Fifo, PLifo, cmd_marshal, cmd_unmarshal import daemon from selenium.webdriver.common.keys import Keys @@ -32,8 +34,8 @@ logger.debug("Init at debug") def start_daemon( - base_uri: str, name: str, + base_uri: str, timeout: int = None, proxy: T.Optional[T.Tuple[str, int]] = None, headless: bool = True, @@ -42,7 +44,7 @@ def start_daemon( """ Spawns the Operator object. """ - optional_args = {} # type: T.Dict[str, T.Any] + optional_args: T.Dict[str, T.Any] = {} if timeout is not None: optional_args["timeout"] = timeout if proxy is not None: @@ -63,7 +65,7 @@ def start_daemon( return Operator(base_uri=base_uri, name=name, **optional_args) -def drop_operator(op: Operator) -> None: +def operator_drop(op: Operator) -> None: """ Stops the Operator. """ @@ -73,7 +75,7 @@ def drop_operator(op: Operator) -> None: logger.info("Stopped Operator.") -def reload_operator(op: Operator) -> None: +def operator_reload(op: Operator) -> None: """ Reload the Operator using the same object. """ @@ -97,13 +99,30 @@ def operator_status(op: Operator, debug: bool = False) -> T.Dict[str, str]: return status +def operator_login(op: Operator, username: str, password: str, force: bool) -> None: + """ + Instructs the operator to perform login. + """ + logger.debug("Performing login.") + op.login(username, password, force) + logger.info("Login done.") + + +def operator_logout(op: Operator, username: str, force: bool) -> None: + """ + Instructs the operator to perform logout. + """ + logger.debug("Performing logout.") + op.logout(username, password, force) + logger.info("Logout done.") + + def daemon_process( fifo_path: str, working_dir: str, umask: int, pidfile: lockfile.FileLock, base_uri: str, - name: str, timeout: int = None, proxy: T.Optional[T.Tuple[str, int]] = None, headless: bool = True, @@ -113,50 +132,123 @@ def daemon_process( """ The daemon function. """ - op = start_daemon(base_uri, name, timeout, proxy, headless, debug) - logger.debug("Started operator id: %s", id(op)) + lifo_path = os.path.join(working_dir, "botz_open_daemons.list") + + starter = functools.partial( + start_daemon, + base_uri=base_uri, + timeout=timeout, + proxy=proxy, + headless=headless, + debug=debug, + ) + + def stop_all() -> None: + pass + + def reload_all() -> None: + pass + + def list_all() -> None: + pass + context = daemon.DaemonContext( working_directory=working_dir, umask=umask, pidfile=pidfile ) - _stop = functools.partial(drop_operator, op) - _reload = functools.partial(reload_operator, op) - _status = functools.partial(operator_status, op) + logger.debug("context defined") context.signal_map = { - signal.SIGTERM: _stop, + signal.SIGTERM: stop_all, signal.SIGHUP: "terminate", - signal.SIGUSR1: _reload, - signal.SIGUSR2: _status, + signal.SIGUSR1: reload_all, + signal.SIGUSR2: list_all, } - cmd_map = {"stop": _stop, "reload": _reload, "status": _status} + logger.debug("signal map defined") + cmd_map = { + "start": starter, + "stop": operator_drop, + "reload": operator_reload, + "status": operator_status, + "login": operator_login, + "logout": operator_logout, + } + logger.debug("command map defined") if foreground: - listen_commands(fifo_path, cmd_map) + logger.debug("Started in foreground") + try: + listen_commands(lifo_path, fifo_path, cmd_map) + except KeyboardInterrupt: + logger.info("Process killed!") + finally: + os.remove(lifo_path) return with context: - listen_commands(fifo_path, cmd_map) + logger.debug("Started in background") + listen_commands(lifo_path, fifo_path, cmd_map) + os.remove(lifo_path) -def listen_commands(name: str, fifopath: str, cmd_map: T.Dict[str, T.Callable]) -> None: +def listen_client( + name: str, chan: queue.Queue, cmd_map: T.Dict[str, T.Callable] +) -> None: + """ + The listen loop for an operator instance. + """ + op = cmd_map["start"](name) + logger.debug("Started operator id: %s", id(op)) + while True: + cmd = chan.get() + if cmd["name"] != name: + log.warning("Command sent to the wrong instance: %s\n%s", name, cmd) + continue + if cmd["cmd"] == "start": + logger.error("Alredy started: %s", name) + continue + if cmd["cmd"] == "stop": + logger.debug("Received command: stop") + cmd_map["stop"](op) + return + elif cmd["cmd"] == "reload": + logger.debug("Received command: reload") + cmd_map["reload"](op) + elif cmd["cmd"] == "status": + logger.debug("Received command: status") + status = cmd_map["status"](op) + logger.debug("Status: %s", status) + with Fifo(cmd["rfifo_path"], "w") as rfifo: + rfifo.write(cmd_marshal(name=cmd["name"], cmd=status)) + + +def listen_commands( + lifopath: str, fifopath: str, cmd_map: T.Dict[str, T.Callable] +) -> None: """ The main loop to listen incoming commands. """ + client_chans = {} while True: - fifo = Fifo(fifopath, "r") - for cmd_str in fifo: - rcpt, command = cmd_str.split('\u2193') - if rcpt != name: - continue - if command == "stop": - logger.debug("Received command: %s", "stop") - cmd_map["stop"]() - return - elif command == "reload": - logger.debug("Received command: %s", "reload") - cmd_map["reload"]() - elif command.startswith("status"): - logger.debug("Received command: %s", "status") - status = cmd_map["status"]() - logger.debug("Status: %s", status) - jenc = json.JSONEncoder() - with Fifo(command.split("|")[-1], "w") as rfifo: - rfifo.write(jenc.encode(status)) + with Fifo(fifopath) as fifo: + for cmd_str in fifo: + logger.debug("Command received in main loop: %s", cmd_str) + cmd = cmd_unmarshal(cmd_str) + if cmd["name"] not in client_chans: + client_chans[cmd["name"]] = queue.Queue(1) + if cmd["name"] in PLifo.All(lifopath): + logger.warning("Name %s is yet used. Not proceeding.", name) + continue + PLifo.Push(lifopath, cmd["name"]) + logger.debug("Client %s has been newly created.", cmd["name"]) + logger.debug("Opening client: %s", cmd["name"]) + q = client_chans[cmd["name"]] + if cmd["cmd"] == "start": + logger.debug("Starting new client in a thread...") + client = threading.Thread( + name=cmd["name"], + target=functools.partial( + listen_client, name=cmd["name"], chan=q, cmd_map=cmd_map + ), + ) + client.start() + logger.debug('Client "%s" started.', cmd["name"]) + continue + q.put(cmd)