BotZ/bot_z/cli.py

378 lines
10 KiB
Python

# -*- coding: utf-8 -*-
"""Console script to control the bot_z daemon"""
import errno
from getpass import getpass
import logging
import os
import sys
import typing as T
import click
import lockfile
from bot_z.zdaemon import daemon_process
from bot_z.utils import Fifo, PLifo, cmd_marshal
logger = logging.getLogger(__name__)
logger.setLevel(os.environ.get("BOTZ_LOGLEVEL", logging.INFO))
sh = logging.StreamHandler(stream=sys.stdout)
cli_filter = logging.Filter(__name__)
sh.addFilter(cli_filter)
sh.setFormatter(logging.Formatter("%(message)s"))
logger.addHandler(sh)
def _check_name(lifo_path: str, name: T.Optional[str]) -> str:
with PLifo(lifo_path) as lifo:
if name is None:
return lifo.last()
if name not in lifo:
logger.error("Daemon not present. Exiting.")
sys.exit(2)
return name
@click.group()
@click.option("-d", "--debug", is_flag=True, default=False, help="Enable debug mode.")
@click.option(
"--no-headless",
"headless",
is_flag=True,
default=True,
help="Start the clients in foreground.",
)
@click.option(
"-v",
"--verbose",
is_flag=True,
default=False,
help="More verbose output from this cli.",
)
@click.option(
"-f",
"--fifo",
type=click.STRING,
default="bot_z.cmd",
help="Path to the control fifo.",
)
@click.option(
"-w",
"--workdir",
type=click.Path(exists=True, readable=True, writable=True, resolve_path=True),
default="/tmp",
help="The working dir where to launch the daemon and where the lockfile is put.",
)
@click.pass_context
def cli(
ctx: click.Context,
debug: bool,
headless: bool,
verbose: bool,
fifo: str,
workdir: str,
) -> None:
"""
Group cli.
"""
if verbose:
logger.setLevel(logging.DEBUG)
ctx.ensure_object(dict)
ctx.obj["debug"] = debug
ctx.obj["headless"] = headless
ctx.obj["verbose"] = verbose
ctx.obj["fifo"] = os.path.join(workdir, fifo)
ctx.obj["workdir"] = workdir
ctx.obj["lifo"] = os.path.join(workdir, "botz_open_daemons.list")
@cli.command("start-daemon")
@click.option(
"-t", "--timeout", type=click.INT, default=20, help="Browser requests timeout."
)
@click.option(
"-m",
"--umask",
type=click.INT,
default=0o002,
help="The umask of the control fifo.",
)
@click.option(
"-p",
"--proxy",
type=click.STRING,
default=None,
help="An optional string for the proxy with the form 'address:port'.",
)
@click.option(
"--foreground",
is_flag=True,
default=False,
help="Keep the process in foreground (do not daemonize).",
)
@click.argument("baseuri", type=click.STRING)
@click.pass_context
def start_daemon_command(
ctx: click.Context,
baseuri: str,
umask: int,
timeout: int,
proxy: T.Optional[str] = None,
foreground: bool = False,
) -> None:
"""
Invokes daemon_process for the first time.
"""
lf = lockfile.FileLock(os.path.join(ctx.obj["workdir"], "bot_z"))
logger.debug("Daemon starting. Lockfile: %s", lf)
proxy_tuple = None
if proxy:
proxy_tuple = tuple(proxy.split(":"))
daemon_process(
fifo_path=ctx.obj["fifo"],
working_dir=ctx.obj["workdir"],
umask=umask,
pidfile=lf,
base_uri=baseuri,
timeout=timeout,
proxy=proxy_tuple,
headless=ctx.obj["headless"],
debug=ctx.obj["debug"],
foreground=foreground,
)
if not foreground:
logger.info("Daemon started.")
@cli.command("list")
@click.option(
"-s",
"--status",
is_flag=True,
default=False,
help="Show also the status of each client.",
)
@click.pass_context
def list_command(ctx: click.Context, status: bool) -> None:
"""
Shows the clients attached
"""
logger.debug('Invoked the "list" command.')
with PLifo(ctx.obj["lifo"]) as lifo:
if len(lifo) == 0:
logger.info("No clients.")
return
for client in lifo.all():
logger.info(client)
if status:
_status_command(ctx, client)
@cli.command("start")
@click.option(
"-n",
"--name",
type=click.STRING,
prompt="Give the instance a name",
help="The daemon instance name.",
)
@click.pass_context
def start_command(ctx: click.Context, name: str) -> None:
"""
Writes on the fifo. Invokes the start of a client.
"""
logger.debug("Sending the start command down the pipe: %s", ctx.obj["fifo"])
with open(ctx.obj["fifo"], "w") as fifo:
fifo.write(cmd_marshal(name=name, cmd="start"))
logger.info("Start sent.")
@cli.command("stop-daemon")
@click.pass_context
def stop_daemon_command(ctx: click.Context) -> None:
"""
Writes on the fifo. Invokes the stop of all the clients and the
subsequent shutdown of the daemon.
"""
logger.debug("Sending the stop-daemon command down the pipe: %s", ctx.obj["fifo"])
with open(ctx.obj["fifo"], "w") as fifo:
fifo.write(cmd_marshal(name="", cmd="stop-daemon"))
logger.info("Stop-daemon sent.")
@cli.command("stop")
@click.option("-n", "--name", default=None, help="The instance to interact with.")
@click.pass_context
def stop_command(ctx: click.Context, name: T.Optional[str]) -> None:
"""
Writes on the fifo. Invokes the stop.
"""
logger.debug("Sending the stop command down the pipe: %s", ctx.obj["fifo"])
name = _check_name(ctx.obj["lifo"], name)
logging.info("Stopping instance: %s", name)
with open(ctx.obj["fifo"], "w") as fifo:
fifo.write(cmd_marshal(name=name, cmd="stop"))
logger.info("Stop sent.")
def prompt_for_creds(param: str, hidden: bool = False) -> str:
if hidden:
return getpass("Insert {}: ".format(param))
return input("Insert {}: ".format(param))
@cli.command("login")
@click.option("-n", "--name", default=None, help="The instance to interact with.")
@click.option(
"-c",
"--credfile",
default=None,
type=click.File(),
help="The file containing username and password, on one line each.",
)
@click.option("-u", "--username", default=None, help="The username to login with.")
@click.option(
"-p",
"--password",
default=None,
help="[USE ONLY WHEN DEBUGGING!] The password to login with. Use --credfile.",
)
@click.option(
"-f",
"--force",
is_flag=True,
default=False,
help="Force logout, bypass login check.",
)
@click.pass_context
def login_command(
ctx: click.Context,
name: T.Optional[str],
username: T.Optional[str],
password: T.Optional[str],
credfile: T.Optional[T.TextIO],
force: bool,
) -> None:
"""
Writes on the fifo. Invokes the stop.
"""
no_userpass = username is None or password is None
if credfile is not None and no_userpass:
username = credfile.readline()
password = credfile.readline()
elif credfile is not None and not no_userpass:
logger.warning("Ignoring command line provided username and password.")
elif credfile is None and no_userpass:
logger.warning("Missing username or password and credfile.")
if username is None:
username = prompt_for_creds("username")
if password is None:
password = prompt_for_creds("password", hidden=True)
else:
logger.warning("Do not use command line provided password in production!")
logger.debug("Sending the login command down the pipe: %s", ctx.obj["fifo"])
name = _check_name(ctx.obj["lifo"], name)
logging.info('Logging in on instance "%s" with username "%s".', name, username)
with open(ctx.obj["fifo"], "w") as fifo:
fifo.write(
cmd_marshal(
name=name,
cmd="login",
username=username,
password=password,
force=force,
)
)
logger.info("Login sent.")
@cli.command("logout")
@click.option("-n", "--name", default=None, help="The instance to interact with.")
@click.option(
"-f",
"--force",
is_flag=True,
default=False,
help="Force logout, bypass login check.",
)
@click.pass_context
def logout_command(ctx: click.Context, name: T.Optional[str], force: bool) -> None:
"""
Writes on the fifo. Invokes the stop.
"""
logger.debug("Sending the logout command down the pipe: %s", ctx.obj["fifo"])
name = _check_name(ctx.obj["lifo"], name)
logging.info("Logging out on instance: %s", name)
with open(ctx.obj["fifo"], "w") as fifo:
fifo.write(cmd_marshal(name=name, cmd="logout", force=force))
logger.info("Logout sent.")
@cli.command("reload")
@click.option("-n", "--name", default=None, help="The instance to interact with.")
@click.pass_context
def reload_command(ctx: click.Context, name: T.Optional[str]) -> None:
"""
Writes on the fifo. Invokes the reload.
"""
logger.debug("Sending the reload command down the pipe: %s", ctx.obj["fifo"])
name = _check_name(ctx.obj["lifo"], name)
with open(ctx.obj["fifo"], "w") as fifo:
fifo.write(cmd_marshal(name=name, cmd="reload"))
logger.info("Reload sent.")
@cli.command("status")
@click.option("-n", "--name", default=None, help="The instance to interact with.")
@click.pass_context
def status_command(ctx: click.Context, name: T.Optional[str]) -> None:
"""
Writes on the fifo. Queries the status.
"""
try:
name = _check_name(ctx.obj["lifo"], name)
except IndexError:
if len(PLifo.All(ctx.obj["lifo"])) != 0:
raise
logger.warning("No clients registered.")
return
_status_command(ctx, name)
def _status_command(ctx: click.Context, name: str) -> None:
rfifo_path = os.path.join(ctx.obj["workdir"], "rfifo-{}".format(id(ctx)))
logger.debug("Awaiting response on fifo: %s", rfifo_path)
try:
os.mkfifo(rfifo_path)
logger.debug("Response fifo newly created.")
except OSError as err:
if err.errno != errno.EEXIST:
raise
logger.debug("Response fifo exists.")
with Fifo(ctx.obj["fifo"], "w") as fifo:
fifo.write(cmd_marshal(name=name, cmd="status", rfifo_path=rfifo_path))
logger.debug("Awaiting response...")
done = False
while not done:
try:
with open(rfifo_path, "r") as rfifo:
resp = rfifo.read()
done = True
except FileNotFoundError as e:
logger.warning("File not found: %s", e)
pass
logger.info(resp)
try:
os.remove(rfifo_path)
except OSError as e:
logger.warning("Failed removing response fifo %s: %s", rfifo_path, e)
pass
if __name__ == "__main__":
cli()