# -*- coding: utf-8 -*-
"""Console script to control the bot_z daemon"""

import errno
from getpass import getpass
import logging
import os
import sys
import typing as T

import click
import lockfile

from bot_z.zdaemon import daemon_process
from bot_z.utils import Fifo, PLifo, cmd_marshal

# Module logger: level comes from BOTZ_LOGLEVEL (INFO when unset) and records
# are printed bare ("%(message)s") on stdout, filtered to this logger's name.
logger = logging.getLogger(__name__)
logger.setLevel(os.environ.get("BOTZ_LOGLEVEL", logging.INFO))
sh = logging.StreamHandler(stream=sys.stdout)
cli_filter = logging.Filter(__name__)
sh.addFilter(cli_filter)
sh.setFormatter(logging.Formatter("%(message)s"))
logger.addHandler(sh)

# NOTE: click renders each command's docstring as its ``--help`` text, so the
# command docstrings below are deliberately kept short; implementation notes
# live in ``#`` comments instead.


def _check_name(lifo_path: str, name: T.Optional[str]) -> str:
    """Resolve the instance name a command should act on.

    Args:
        lifo_path: Path handed to ``PLifo`` (the list of open instances).
        name: The name given by the user, or ``None`` to let the list choose.

    Returns:
        ``name`` itself when it is contained in the list, or the value of
        ``lifo.last()`` when ``name`` is ``None`` (presumably the most
        recently registered instance -- confirm against ``PLifo``).

    Exits the process with status 2 when ``name`` is given but not in the list.
    """
    with PLifo(lifo_path) as lifo:
        if name is None:
            return lifo.last()
        if name not in lifo:
            logger.error("Daemon not present. Exiting.")
            sys.exit(2)
        return name


def _send_command(ctx: click.Context, **fields: T.Any) -> None:
    """Marshal ``fields`` with ``cmd_marshal`` and write them to the control fifo.

    Args:
        ctx: The click context; ``ctx.obj["fifo"]`` is the control fifo path.
        **fields: Keyword arguments forwarded verbatim to ``cmd_marshal``.
    """
    with open(ctx.obj["fifo"], "w") as fifo:
        fifo.write(cmd_marshal(**fields))


@click.group()
@click.option("-d", "--debug", is_flag=True, default=False, help="Enable debug mode.")
@click.option(
    "--no-headless",
    "headless",
    is_flag=True,
    default=True,
    help="Start the clients in foreground.",
)
@click.option(
    "-v",
    "--verbose",
    is_flag=True,
    default=False,
    help="More verbose output from this cli.",
)
@click.option(
    "-f",
    "--fifo",
    type=click.STRING,
    default="bot_z.cmd",
    help="Path to the control fifo.",
)
@click.option(
    "-w",
    "--workdir",
    type=click.Path(exists=True, readable=True, writable=True, resolve_path=True),
    default="/tmp",
    help="The working dir where to launch the daemon and where the lockfile is put.",
)
@click.pass_context
def cli(
    ctx: click.Context,
    debug: bool,
    headless: bool,
    verbose: bool,
    fifo: str,
    workdir: str,
) -> None:
    """
    Group cli.
    """
    if verbose:
        logger.setLevel(logging.DEBUG)
    # Share the global options with the sub-commands through ctx.obj.
    ctx.ensure_object(dict)
    ctx.obj["debug"] = debug
    ctx.obj["headless"] = headless
    ctx.obj["verbose"] = verbose
    # The control fifo and the instance list both live inside the workdir.
    ctx.obj["fifo"] = os.path.join(workdir, fifo)
    ctx.obj["workdir"] = workdir
    ctx.obj["lifo"] = os.path.join(workdir, "botz_open_daemons.list")


@cli.command("start-daemon")
@click.option(
    "-t", "--timeout", type=click.INT, default=20, help="Browser requests timeout."
)
@click.option(
    "-m",
    "--umask",
    type=click.INT,
    default=0o002,
    help="The umask of the control fifo.",
)
@click.option(
    "-p",
    "--proxy",
    type=click.STRING,
    default=None,
    help="An optional string for the proxy with the form 'address:port'.",
)
@click.option(
    "--foreground",
    is_flag=True,
    default=False,
    help="Keep the process in foreground (do not daemonize).",
)
@click.argument("baseuri", type=click.STRING)
@click.pass_context
def start_daemon_command(
    ctx: click.Context,
    baseuri: str,
    umask: int,
    timeout: int,
    proxy: T.Optional[str] = None,
    foreground: bool = False,
) -> None:
    """
    Invokes daemon_process for the first time.
    """
    lf = lockfile.FileLock(os.path.join(ctx.obj["workdir"], "bot_z"))
    logger.debug("Daemon starting. Lockfile: %s", lf)
    # 'address:port' is split on ':' and passed on as a tuple of strings.
    proxy_tuple = None
    if proxy:
        proxy_tuple = tuple(proxy.split(":"))
    daemon_process(
        fifo_path=ctx.obj["fifo"],
        working_dir=ctx.obj["workdir"],
        umask=umask,
        pidfile=lf,
        base_uri=baseuri,
        timeout=timeout,
        proxy=proxy_tuple,
        headless=ctx.obj["headless"],
        debug=ctx.obj["debug"],
        foreground=foreground,
    )
    if not foreground:
        logger.info("Daemon started.")


@cli.command("list")
@click.option(
    "-s",
    "--status",
    is_flag=True,
    default=False,
    help="Show also the status of each client.",
)
@click.pass_context
def list_command(ctx: click.Context, status: bool) -> None:
    """
    Lists the clients attached
    """
    logger.debug('Invoked the "list" command.')
    with PLifo(ctx.obj["lifo"]) as lifo:
        if len(lifo) == 0:
            logger.info("No clients.")
            return
        for client in lifo.all():
            logger.info(client)
            if status:
                _status_command(ctx, client)


@cli.command("start")
@click.option(
    "-n",
    "--name",
    type=click.STRING,
    prompt="Give the instance a name",
    help="The daemon instance name.",
)
@click.pass_context
def start_command(ctx: click.Context, name: str) -> None:
    """
    Starts a client.
    """
    logger.debug("Sending the start command down the pipe: %s", ctx.obj["fifo"])
    _send_command(ctx, name=name, cmd="start")
    logger.info("Start sent.")


@cli.command("stop-daemon")
@click.pass_context
def stop_daemon_command(ctx: click.Context) -> None:
    """
    Stops all the clients and shuts down the daemon.
    """
    logger.debug("Sending the stop-daemon command down the pipe: %s", ctx.obj["fifo"])
    _send_command(ctx, name="", cmd="stop-daemon")
    logger.info("Stop-daemon sent.")


@cli.command("stop")
@click.option("-n", "--name", default=None, help="The instance to interact with.")
@click.pass_context
def stop_command(ctx: click.Context, name: T.Optional[str]) -> None:
    """
    Stops a client.
    """
    logger.debug("Sending the stop command down the pipe: %s", ctx.obj["fifo"])
    name = _check_name(ctx.obj["lifo"], name)
    logger.info("Stopping instance: %s", name)
    _send_command(ctx, name=name, cmd="stop")
    logger.info("Stop sent.")


def prompt_for_creds(param: str, hidden: bool = False) -> str:
    """Interactively ask the user for a credential.

    Args:
        param: Label shown in the ``"Insert <param>: "`` prompt.
        hidden: When true the value is read with ``getpass`` instead of
            ``input``.

    Returns:
        The text typed by the user.
    """
    if hidden:
        return getpass("Insert {}: ".format(param))
    return input("Insert {}: ".format(param))


@cli.command("login")
@click.option("-n", "--name", default=None, help="The instance to interact with.")
@click.option(
    "-c",
    "--credfile",
    default=None,
    type=click.File(),
    help="The file containing username and password, on one line each.",
)
@click.option("-u", "--username", default=None, help="The username to login with.")
@click.option(
    "-p",
    "--password",
    default=None,
    help="[USE ONLY WHEN DEBUGGING!] The password to login with. Use --credfile.",
)
@click.option(
    "-f",
    "--force",
    is_flag=True,
    default=False,
    help="Force logout, bypass login check.",
)
@click.pass_context
def login_command(
    ctx: click.Context,
    name: T.Optional[str],
    username: T.Optional[str],
    password: T.Optional[str],
    credfile: T.Optional[T.TextIO],
    force: bool,
) -> None:
    """
    Logs a client in using the provided credentials.
    """
    if credfile is not None:
        # The credfile always wins. Previously, when -u and -p were both given
        # together with a credfile, the warning below was logged but the
        # command line values were sent anyway.
        if username is not None and password is not None:
            logger.warning("Ignoring command line provided username and password.")
        # One credential per line; drop the line terminator so it is not sent
        # as part of the username/password.
        username = credfile.readline().rstrip("\r\n")
        password = credfile.readline().rstrip("\r\n")
    elif username is None or password is None:
        logger.warning("Missing username or password and credfile.")
        if username is None:
            username = prompt_for_creds("username")
        if password is None:
            password = prompt_for_creds("password", hidden=True)
    else:
        logger.warning("Do not use command line provided password in production!")
    logger.debug("Sending the login command down the pipe: %s", ctx.obj["fifo"])
    name = _check_name(ctx.obj["lifo"], name)
    logger.info('Logging in on instance "%s" with username "%s".', name, username)
    _send_command(
        ctx,
        name=name,
        cmd="login",
        username=username,
        password=password,
        force=force,
    )
    logger.info("Login sent.")


@cli.command("logout")
@click.option("-n", "--name", default=None, help="The instance to interact with.")
@click.option(
    "-f",
    "--force",
    is_flag=True,
    default=False,
    help="Force logout, bypass login check.",
)
@click.pass_context
def logout_command(ctx: click.Context, name: T.Optional[str], force: bool) -> None:
    """
    Logs a logged in client out.
    """
    logger.debug("Sending the logout command down the pipe: %s", ctx.obj["fifo"])
    name = _check_name(ctx.obj["lifo"], name)
    logger.info("Logging out on instance: %s", name)
    _send_command(ctx, name=name, cmd="logout", force=force)
    logger.info("Logout sent.")


@cli.command("checkin")
@click.option("-n", "--name", default=None, help="The instance to interact with.")
@click.option(
    "-f",
    "--force",
    is_flag=True,
    default=False,
    help="Force logout, bypass login check.",
)
@click.pass_context
def checkin_command(ctx: click.Context, name: T.Optional[str], force: bool) -> None:
    """
    Checks in on a logged in client.
    """
    logger.debug("Sending the check in command down the pipe: %s", ctx.obj["fifo"])
    name = _check_name(ctx.obj["lifo"], name)
    logger.info("Checking in on instance: %s", name)
    _send_command(ctx, name=name, cmd="checkin", force=force)
    logger.info("Check in sent.")


@cli.command("checkout")
@click.option("-n", "--name", default=None, help="The instance to interact with.")
@click.option(
    "-f",
    "--force",
    is_flag=True,
    default=False,
    help="Force logout, bypass login check.",
)
@click.pass_context
def checkout_command(ctx: click.Context, name: T.Optional[str], force: bool) -> None:
    """
    Checks out on a logged in client and checked in client.
    """
    logger.debug("Sending the check out command down the pipe: %s", ctx.obj["fifo"])
    name = _check_name(ctx.obj["lifo"], name)
    logger.info("Checking out on instance: %s", name)
    _send_command(ctx, name=name, cmd="checkout", force=force)
    logger.info("Check out sent.")


@cli.command("reload")
@click.option("-n", "--name", default=None, help="The instance to interact with.")
@click.pass_context
def reload_command(ctx: click.Context, name: T.Optional[str]) -> None:
    """
    Resets a client to a clean state (discards the session).
    """
    logger.debug("Sending the reload command down the pipe: %s", ctx.obj["fifo"])
    name = _check_name(ctx.obj["lifo"], name)
    _send_command(ctx, name=name, cmd="reload")
    logger.info("Reload sent.")


@cli.command("status")
@click.option("-n", "--name", default=None, help="The instance to interact with.")
@click.pass_context
def status_command(ctx: click.Context, name: T.Optional[str]) -> None:
    """
    Displays basic info on the state of a client.
    """
    try:
        name = _check_name(ctx.obj["lifo"], name)
    except IndexError:
        # Presumably raised by lifo.last() on an empty list (confirm against
        # PLifo); any IndexError with a non-empty list is unexpected, re-raise.
        if len(PLifo.All(ctx.obj["lifo"])) != 0:
            raise
        logger.warning("No clients registered.")
        return
    _status_command(ctx, name)


def _status_command(ctx: click.Context, name: str) -> None:
    """Ask the daemon for the status of ``name`` and log the reply.

    A response fifo named ``rfifo-<id(ctx)>`` is created in the workdir, its
    path is sent along with the ``status`` command on the control fifo, and
    whatever is read back from it is logged at INFO level. The response fifo
    is removed afterwards, also when sending or reading fails.

    Args:
        ctx: The click context carrying ``workdir`` and ``fifo`` in ``ctx.obj``.
        name: The instance whose status is requested.

    Raises:
        OSError: If the response fifo cannot be created for a reason other
            than already existing.
    """
    rfifo_path = os.path.join(ctx.obj["workdir"], "rfifo-{}".format(id(ctx)))
    logger.debug("Awaiting response on fifo: %s", rfifo_path)
    try:
        os.mkfifo(rfifo_path)
        logger.debug("Response fifo newly created.")
    except OSError as err:
        # A leftover fifo from a previous run is reused.
        if err.errno != errno.EEXIST:
            raise
        logger.debug("Response fifo exists.")
    try:
        with Fifo(ctx.obj["fifo"], "w") as fifo:
            fifo.write(cmd_marshal(name=name, cmd="status", rfifo_path=rfifo_path))
        logger.debug("Awaiting response...")
        resp = None
        # Retry opening until the read succeeds; read() returns a str (never
        # None), so a single successful read ends the loop.
        while resp is None:
            try:
                with open(rfifo_path, "r") as rfifo:
                    resp = rfifo.read()
            except FileNotFoundError as e:
                logger.warning("File not found: %s", e)
        logger.info(resp)
    finally:
        # Best-effort cleanup: a failure to remove is reported, not raised.
        try:
            os.remove(rfifo_path)
        except OSError as e:
            logger.warning("Failed removing response fifo %s: %s", rfifo_path, e)


if __name__ == "__main__":
    cli()