# -*- encoding: utf-8 -*- """ The REST endpoints. """ from aiohttp import web import asyncio from concurrent.futures import ProcessPoolExecutor import datetime import logging import os import pkg_resources import typing as T import weakref from aiohttp_session import new_session, get_session, Session from passlib.hash import bcrypt from bot_z.async_operator import AsyncOperator, push_to_loop from api.async_bot import login, logout, checkin, checkout, status from api import BASE_URI, DEBUG alog = logging.getLogger("api") routes = web.RouteTableDef() OPERATORS = weakref.WeakKeyDictionary( {} ) # type: weakref.WeakKeyDictionary[UserSession, AsyncOperator] USERS = {} # type: T.Dict[T.Text, UserSession] BASE_PATH = pkg_resources.resource_filename(__name__, "assets") EXECUTOR = ProcessPoolExecutor() # WARN: the default il 12 rounds; both the server and the client shall compute # this hash. The target client is a smartphone with poor performance on # crypto computations, so we should keep this low, as the verification step # is optionally enforced by the client. ROUNDS = 6 class UserSession(object): """ Placeholder object to manipulate session life. """ def __init__(self, user): self._user = user def _reckon_token_response(base_uri: T.Text) -> T.Text: return bcrypt.using(rounds=ROUNDS, truncate_error=True).hash(base_uri) async def reckon_token_response( base_uri: T.Text, loop: asyncio.AbstractEventLoop ) -> T.Text: """ A client and the server should agree if the pairing is adequate. This could be accomplished calculating on both sides an cryptographic secret. The current implementation uses bcrypt to compute the hash on server side and the client should verify the secret on its side. """ return await push_to_loop(loop, EXECUTOR, _reckon_token_response, base_uri) async def get_set_operator( request: web.Request, user: T.Text, password: T.Text ) -> T.Tuple[AsyncOperator, Session]: session = await get_session(request) op = None if "async_operator" in session: user_session = USERS.get(session["async_operator"]) op = OPERATORS.get(user_session) else: session = await new_session(request) if op is None or session.new: base_uri = request.app["base_uri"] debug = request.app["debug"] headless = request.app["headless"] op = AsyncOperator(base_uri, name=user, headless=headless, debug=debug) USERS[user] = UserSession(user) session["async_operator"] = user OPERATORS[USERS[user]] = op return op, session def add_static_routes(log: logging.Logger) -> None: static_assets = [ os.path.abspath(os.path.join(BASE_PATH, path)) for path in os.listdir(BASE_PATH) if os.path.isdir(os.path.join(BASE_PATH, path)) ] for asset in static_assets: asset_path = os.path.relpath(asset, BASE_PATH) log.debug(f"Linking: {asset_path} -> {asset}") routes.static(f"/{asset_path}", asset) @routes.get("/") async def home_handle(request: web.Request) -> web.Response: return web.FileResponse(os.path.join(BASE_PATH, "index.html")) @routes.get("/favicon.ico") async def favicon_handle(request: web.Request) -> web.Response: return web.FileResponse(os.path.join(BASE_PATH, "favicon.ico")) @routes.get("/api/login") @routes.get("/api/badge") @routes.get("/api/status") async def routing_handler(request: web.Request) -> web.Response: alog.debug("(%s) %s", request.method, request.path) session = await get_session(request) op = session.get("async_operator") _logged_in = True if op else False alog.info("%s - is%s in session", request.path, " NOT" if not _logged_in else "") return web.json_response({"logged_in": _logged_in}) @routes.get("/api/ping") async def ping_handler(request: web.Request) -> web.Response: alog.debug("pinged on %s", request.path) resp_data = await reckon_token_response(request.app["base_uri"], request.app.loop) alog.debug("ping response: %s", resp_data) return web.json_response({"hash": resp_data}) @routes.post("/api/login") async def login_handler(request: web.Request) -> web.Response: data = await request.json() user = data.get("username") password = data.get("password") if not user or not password: alog.debug("login - missing username or password: %s", data) return web.json_response({"error": "Missing username or password"}, status=401) op, session = await get_set_operator(request, user, password) alog.debug("login - user: %s, password: %s", user, password) res = await login(op, user, password) alog.debug("login result: %s", res) if not res: session.invalidate() alog.info("Login failed; session invalidated.") return web.json_response({"logged_in": res}, status=200) @routes.post("/api/logout") async def logout_handler(request: web.Request) -> web.Response: alog.debug("logout") session = await get_session(request) user_session = USERS.get(session["async_operator"], UserSession("NOONE")) op = OPERATORS.get(user_session) if not op: return web.json_response( {"error": "No session", "logged_in": False}, status=401 ) res = await logout(op) session.invalidate() alog.debug("logout result: %s", res) # FIX: assess if better to invalidate session and dump the browser instance. del user_session return web.json_response({"logged_in": res}, status=200) @routes.post("/api/checkin") async def checkin_handler(request: web.Request) -> web.Response: alog.debug("checkin") session = await get_session(request) user_session = USERS.get(session["async_operator"], UserSession("NOONE")) op = OPERATORS.get(user_session) if not op: return web.json_response( {"error": "No session", "logged_in": False}, status=401 ) res = await checkin(op) alog.debug("checkin result: %s", res) return web.json_response({"checked_in": res, "logged_in": True}, status=200) @routes.post("/api/checkout") async def checkout_handler(request: web.Request) -> web.Response: alog.debug("checkout") session = await get_session(request) user_session = USERS.get(session["async_operator"], UserSession("NOONE")) op = OPERATORS.get(user_session) if not op: return web.json_response( {"error": "No session", "logged_in": False}, status=401 ) res = await checkout(op) alog.debug("checkout result: %s", res) return web.json_response({"checked_in": res, "logged_in": True}, status=200) @routes.get("/api/movements") async def movements_handle(request: web.Request) -> web.Response: alog.debug("movements") session = await get_session(request) user_session = USERS.get(session.get("async_operator"), UserSession("NOONE")) op = OPERATORS.get(user_session) if not op: alog.debug("Missing session") return web.json_response( {"error": "No session", "logged_in": False}, status=401 ) res = await status(op) alog.debug("movements result: %s", res) if not res: return web.json_response( {"error": "No movements found", "logged_in": True, "checked_in": False}, status=404, ) movements = [] for r in res: if r and len(r) == 2: movements.append({"time": r[1], "type": r[0]}) resp_data: T.Dict[T.Text, T.Any] = {"movements": movements} resp_data["logged_in"] = True try: last_movement = list(movements[-1]) resp_data["checked_in"] = True if last_movement == "Entrata" else False except IndexError: alog.info("No movements found") return web.json_response(resp_data, status=200)