BotZ/api/rest.py

223 lines
7.5 KiB
Python

# -*- encoding: utf-8 -*-
"""
The REST endpoints.
"""
from aiohttp import web
import asyncio
from concurrent.futures import ProcessPoolExecutor
import datetime
import logging
import os
import pkg_resources
import typing as T
import weakref
from aiohttp_session import new_session, get_session, Session
from passlib.hash import bcrypt
from bot_z.async_operator import AsyncOperator, push_to_loop
from api.async_bot import login, logout, checkin, checkout, status
from api import BASE_URI, DEBUG
# Logger shared by every handler in this module (logger name "api").
alog = logging.getLogger("api")
# Route table the handlers below register on through its decorators.
routes = web.RouteTableDef()
# Operators indexed by the UserSession that owns them. Keys are weakly
# referenced, so an entry goes away once its UserSession is no longer
# strongly referenced (USERS is what keeps those objects alive).
OPERATORS = weakref.WeakKeyDictionary(
{}
) # type: weakref.WeakKeyDictionary[UserSession, AsyncOperator]
# User name -> UserSession handle.
USERS = {} # type: T.Dict[T.Text, UserSession]
# Location of this package's "assets" resource, as resolved by pkg_resources.
BASE_PATH = pkg_resources.resource_filename(__name__, "assets")
# Process pool handed to push_to_loop for the token hash computation.
EXECUTOR = ProcessPoolExecutor()
# WARN: the default is 12 rounds; both the server and the client shall compute
# this hash. The target client is a smartphone with poor performance on
# crypto computations, so we should keep this low, as the verification step
# is optionally enforced by the client.
ROUNDS = 6
class UserSession:
    """
    Opaque handle whose lifetime stands for a user's session.

    Instances are ordinary, weak-referenceable objects that serve as the
    weak keys of ``OPERATORS``; they only remember the user they were
    created for.
    """

    def __init__(self, user):
        # Remember which user this handle was created for.
        self._user = user
def _reckon_token_response(base_uri: T.Text) -> T.Text:
    """
    Return the bcrypt hash of ``base_uri`` computed with ``ROUNDS`` rounds.
    """
    hasher = bcrypt.using(rounds=ROUNDS, truncate_error=True)
    return hasher.hash(base_uri)
async def reckon_token_response(
    base_uri: T.Text, loop: asyncio.AbstractEventLoop
) -> T.Text:
    """
    Produce the pairing token a client can use to recognise this server.

    Client and server are meant to agree on a cryptographic secret derived
    from the same base URI: the server computes a bcrypt hash here and the
    client verifies it on its own side. The hashing is delegated to
    ``push_to_loop`` together with the module-level ``EXECUTOR``.
    """
    token = await push_to_loop(loop, EXECUTOR, _reckon_token_response, base_uri)
    return token
async def get_set_operator(
    request: web.Request, user: T.Text, password: T.Text
) -> T.Tuple[AsyncOperator, Session]:
    """
    Return the operator bound to the caller's session, creating it if needed.

    The session cookie stores the user name under ``"async_operator"``; that
    name is resolved to a ``UserSession`` through ``USERS`` and then to the
    operator through ``OPERATORS``. When no operator can be found (no
    session yet, or a session that refers to a user this process does not
    know), a new ``AsyncOperator`` is built from the application settings
    and registered for ``user``.

    :param request: the incoming request, used for the session and app config.
    :param user: user name the operator is created for and stored under.
    :param password: accepted for interface compatibility; not used here.
    :return: the ``(operator, session)`` pair.
    """
    session = await get_session(request)
    op = None
    if "async_operator" in session:
        user_session = USERS.get(session["async_operator"])
        # OPERATORS is a WeakKeyDictionary: looking up ``None`` raises
        # TypeError (None cannot be weakly referenced), so only query it
        # when the session maps to a known UserSession.
        if user_session is not None:
            op = OPERATORS.get(user_session)
    else:
        session = await new_session(request)
    if op is None or session.new:
        base_uri = request.app["base_uri"]
        debug = request.app["debug"]
        headless = request.app["headless"]
        op = AsyncOperator(base_uri, name=user, headless=headless, debug=debug)
        # USERS holds the only strong reference to the UserSession, which
        # in turn keeps the weakly-keyed OPERATORS entry alive.
        USERS[user] = UserSession(user)
        session["async_operator"] = user
        OPERATORS[USERS[user]] = op
    return op, session
def add_static_routes(log: logging.Logger) -> None:
    """
    Register a static route for every sub-directory of ``BASE_PATH``.

    Each directory found directly under ``BASE_PATH`` is served under a URL
    prefix equal to its path relative to ``BASE_PATH``.

    :param log: logger that receives one debug line per linked directory.
    """
    # First pass: collect the absolute paths of the asset directories.
    static_assets = []
    for entry in os.listdir(BASE_PATH):
        candidate = os.path.join(BASE_PATH, entry)
        if os.path.isdir(candidate):
            static_assets.append(os.path.abspath(candidate))
    # Second pass: publish each directory on the route table.
    for asset in static_assets:
        asset_path = os.path.relpath(asset, BASE_PATH)
        log.debug(f"Linking: {asset_path} -> {asset}")
        routes.static(f"/{asset_path}", asset)
@routes.get("/")
async def home_handle(request: web.Request) -> web.Response:
    """
    Serve the ``index.html`` file located in ``BASE_PATH``.
    """
    index_file = os.path.join(BASE_PATH, "index.html")
    return web.FileResponse(index_file)
@routes.get("/favicon.ico")
async def favicon_handle(request: web.Request) -> web.Response:
    """
    Serve the ``favicon.ico`` file located in ``BASE_PATH``.
    """
    icon_file = os.path.join(BASE_PATH, "favicon.ico")
    return web.FileResponse(icon_file)
@routes.get("/api/login")
@routes.get("/api/badge")
@routes.get("/api/status")
async def routing_handler(request: web.Request) -> web.Response:
    """
    Tell the caller whether its session already carries an operator.

    Shared GET handler for ``/api/login``, ``/api/badge`` and
    ``/api/status``; it answers with ``{"logged_in": <bool>}``.
    """
    alog.debug("(%s) %s", request.method, request.path)
    session = await get_session(request)
    # Any truthy value stored under "async_operator" counts as logged in.
    logged_in = bool(session.get("async_operator"))
    negation = "" if logged_in else " NOT"
    alog.info("%s - is%s in session", request.path, negation)
    return web.json_response({"logged_in": logged_in})
@routes.get("/api/ping")
async def ping_handler(request: web.Request) -> web.Response:
    """
    Answer a ping with the token hash derived from the app's base URI.

    The response body is ``{"hash": <token>}``.
    """
    alog.debug("pinged on %s", request.path)
    app = request.app
    token = await reckon_token_response(app["base_uri"], app.loop)
    alog.debug("ping response: %s", token)
    return web.json_response({"hash": token})
@routes.post("/api/login")
async def login_handler(request: web.Request) -> web.Response:
    """
    Log a user in with the credentials found in the JSON request body.

    The body is expected to be a JSON object with ``username`` and
    ``password``. When either is missing (or the body is not a JSON object)
    the handler answers 401 with an error message. Otherwise an operator is
    obtained for the session and the login is attempted; on failure the
    session is invalidated. The response is ``{"logged_in": <result>}``.

    Credentials are never written to the log.
    """
    try:
        data = await request.json()
    except ValueError:
        # Malformed (or empty) JSON: handle it like missing credentials
        # instead of letting the decode error escape the handler.
        alog.debug("login - request body is not valid JSON")
        data = None
    if not isinstance(data, dict):
        data = {}
    user = data.get("username")
    password = data.get("password")
    if not user or not password:
        # Log only the field names: the payload may contain a password.
        alog.debug("login - missing username or password; fields: %s", list(data))
        return web.json_response({"error": "Missing username or password"}, status=401)
    op, session = await get_set_operator(request, user, password)
    alog.debug("login - user: %s", user)
    res = await login(op, user, password)
    alog.debug("login result: %s", res)
    if not res:
        session.invalidate()
        alog.info("Login failed; session invalidated.")
    return web.json_response({"logged_in": res}, status=200)
@routes.post("/api/logout")
async def logout_handler(request: web.Request) -> web.Response:
    """
    Log out the operator bound to the caller's session.

    Answers 401 with ``{"error": "No session", "logged_in": False}`` when
    the session does not map to a known operator. Otherwise the operator is
    logged out, the session is invalidated and the response is
    ``{"logged_in": <logout result>}``.
    """
    alog.debug("logout")
    session = await get_session(request)
    # ``.get`` keeps a request without a stored login on the 401 path below
    # instead of raising KeyError. The throwaway UserSession fallback gives
    # the weak-key lookup a valid (never registered) key.
    user_session = USERS.get(session.get("async_operator"), UserSession("NOONE"))
    op = OPERATORS.get(user_session)
    if not op:
        return web.json_response(
            {"error": "No session", "logged_in": False}, status=401
        )
    res = await logout(op)
    session.invalidate()
    alog.debug("logout result: %s", res)
    # FIX: assess if better to invalidate session and dump the browser instance.
    # NOTE(review): this only drops the local name; USERS still references
    # the UserSession, so the OPERATORS entry is not released here.
    del user_session
    return web.json_response({"logged_in": res}, status=200)
@routes.post("/api/checkin")
async def checkin_handler(request: web.Request) -> web.Response:
    """
    Perform a check-in with the operator bound to the caller's session.

    Answers 401 with ``{"error": "No session", "logged_in": False}`` when
    the session does not map to a known operator; otherwise responds with
    ``{"checked_in": <checkin result>, "logged_in": True}``.
    """
    alog.debug("checkin")
    session = await get_session(request)
    # The session stores the user name, while OPERATORS is weakly keyed by
    # UserSession objects: querying it with a str or None raises TypeError.
    # Resolve the name through USERS first; the throwaway UserSession
    # fallback is a valid key that is never registered.
    user_session = USERS.get(session.get("async_operator"), UserSession("NOONE"))
    op = OPERATORS.get(user_session)
    if not op:
        return web.json_response(
            {"error": "No session", "logged_in": False}, status=401
        )
    res = await checkin(op)
    alog.debug("checkin result: %s", res)
    return web.json_response({"checked_in": res, "logged_in": True}, status=200)
@routes.post("/api/checkout")
async def checkout_handler(request: web.Request) -> web.Response:
    """
    Perform a check-out with the operator bound to the caller's session.

    Answers 401 with ``{"error": "No session", "logged_in": False}`` when
    the session does not map to a known operator; otherwise responds with
    ``{"checked_in": <checkout result>, "logged_in": True}``.
    """
    alog.debug("checkout")
    session = await get_session(request)
    # The session stores the user name, while OPERATORS is weakly keyed by
    # UserSession objects: querying it with a str or None raises TypeError.
    # Resolve the name through USERS first; the throwaway UserSession
    # fallback is a valid key that is never registered.
    user_session = USERS.get(session.get("async_operator"), UserSession("NOONE"))
    op = OPERATORS.get(user_session)
    if not op:
        return web.json_response(
            {"error": "No session", "logged_in": False}, status=401
        )
    res = await checkout(op)
    alog.debug("checkout result: %s", res)
    return web.json_response({"checked_in": res, "logged_in": True}, status=200)
@routes.get("/api/movements")
async def movements_handle(request: web.Request) -> web.Response:
    """
    List the movements reported by the operator bound to the session.

    Responses:

    * 401 ``{"error": "No session", "logged_in": False}`` when the session
      does not map to a known operator;
    * 404 when the operator reports nothing;
    * 200 ``{"movements": [...], "logged_in": True, "checked_in": <bool>}``
      otherwise, where each movement is ``{"time": ..., "type": ...}`` and
      ``checked_in`` is True when the last movement's type is "Entrata".
      ``checked_in`` is omitted when no well-formed movement was found.
    """
    alog.debug("movements")
    session = await get_session(request)
    user_session = USERS.get(session.get("async_operator"), UserSession("NOONE"))
    op = OPERATORS.get(user_session)
    if not op:
        alog.debug("Missing session")
        return web.json_response(
            {"error": "No session", "logged_in": False}, status=401
        )
    res = await status(op)
    alog.debug("movements result: %s", res)
    if not res:
        return web.json_response(
            {"error": "No movements found", "logged_in": True, "checked_in": False},
            status=404,
        )
    # Keep only well-formed (type, time) pairs.
    movements = [
        {"time": r[1], "type": r[0]} for r in res if r and len(r) == 2
    ]
    resp_data: T.Dict[T.Text, T.Any] = {"movements": movements}
    resp_data["logged_in"] = True
    if movements:
        # Compare the *type* of the last movement; turning the dict into a
        # list would only yield its keys and never match "Entrata".
        resp_data["checked_in"] = movements[-1]["type"] == "Entrata"
    else:
        alog.info("No movements found")
    return web.json_response(resp_data, status=200)