223 lines
7.5 KiB
Python
223 lines
7.5 KiB
Python
# -*- encoding: utf-8 -*-
|
|
"""
|
|
The REST endpoints.
|
|
"""
|
|
|
|
from aiohttp import web
|
|
import asyncio
|
|
from concurrent.futures import ProcessPoolExecutor
|
|
import datetime
|
|
import logging
|
|
import os
|
|
import pkg_resources
|
|
import typing as T
|
|
import weakref
|
|
|
|
from aiohttp_session import new_session, get_session, Session
|
|
from passlib.hash import bcrypt
|
|
|
|
from bot_z.async_operator import AsyncOperator, push_to_loop
|
|
from api.async_bot import login, logout, checkin, checkout, status
|
|
from api import BASE_URI, DEBUG
|
|
|
|
alog = logging.getLogger("api")
|
|
routes = web.RouteTableDef()
|
|
OPERATORS = weakref.WeakKeyDictionary(
|
|
{}
|
|
) # type: weakref.WeakKeyDictionary[UserSession, AsyncOperator]
|
|
USERS = {} # type: T.Dict[T.Text, UserSession]
|
|
BASE_PATH = pkg_resources.resource_filename(__name__, "assets")
|
|
EXECUTOR = ProcessPoolExecutor()
|
|
# WARN: the default il 12 rounds; both the server and the client shall compute
|
|
# this hash. The target client is a smartphone with poor performance on
|
|
# crypto computations, so we should keep this low, as the verification step
|
|
# is optionally enforced by the client.
|
|
ROUNDS = 6
|
|
|
|
|
|
class UserSession(object):
|
|
"""
|
|
Placeholder object to manipulate session life.
|
|
"""
|
|
|
|
def __init__(self, user):
|
|
self._user = user
|
|
|
|
|
|
def _reckon_token_response(base_uri: T.Text) -> T.Text:
|
|
return bcrypt.using(rounds=ROUNDS, truncate_error=True).hash(base_uri)
|
|
|
|
|
|
async def reckon_token_response(
|
|
base_uri: T.Text, loop: asyncio.AbstractEventLoop
|
|
) -> T.Text:
|
|
"""
|
|
A client and the server should agree if the pairing is adequate.
|
|
This could be accomplished calculating on both sides an cryptographic
|
|
secret. The current implementation uses bcrypt to compute the hash on
|
|
server side and the client should verify the secret on its side.
|
|
"""
|
|
return await push_to_loop(loop, EXECUTOR, _reckon_token_response, base_uri)
|
|
|
|
|
|
async def get_set_operator(
|
|
request: web.Request, user: T.Text, password: T.Text
|
|
) -> T.Tuple[AsyncOperator, Session]:
|
|
session = await get_session(request)
|
|
op = None
|
|
if "async_operator" in session:
|
|
user_session = USERS.get(session["async_operator"])
|
|
op = OPERATORS.get(user_session)
|
|
else:
|
|
session = await new_session(request)
|
|
|
|
if op is None or session.new:
|
|
base_uri = request.app["base_uri"]
|
|
debug = request.app["debug"]
|
|
headless = request.app["headless"]
|
|
op = AsyncOperator(base_uri, name=user, headless=headless, debug=debug)
|
|
USERS[user] = UserSession(user)
|
|
session["async_operator"] = user
|
|
OPERATORS[USERS[user]] = op
|
|
|
|
return op, session
|
|
|
|
|
|
def add_static_routes(log: logging.Logger) -> None:
|
|
static_assets = [
|
|
os.path.abspath(os.path.join(BASE_PATH, path))
|
|
for path in os.listdir(BASE_PATH)
|
|
if os.path.isdir(os.path.join(BASE_PATH, path))
|
|
]
|
|
for asset in static_assets:
|
|
asset_path = os.path.relpath(asset, BASE_PATH)
|
|
log.debug(f"Linking: {asset_path} -> {asset}")
|
|
routes.static(f"/{asset_path}", asset)
|
|
|
|
|
|
@routes.get("/")
|
|
async def home_handle(request: web.Request) -> web.Response:
|
|
return web.FileResponse(os.path.join(BASE_PATH, "index.html"))
|
|
|
|
|
|
@routes.get("/favicon.ico")
|
|
async def favicon_handle(request: web.Request) -> web.Response:
|
|
return web.FileResponse(os.path.join(BASE_PATH, "favicon.ico"))
|
|
|
|
|
|
@routes.get("/api/login")
|
|
@routes.get("/api/badge")
|
|
@routes.get("/api/status")
|
|
async def routing_handler(request: web.Request) -> web.Response:
|
|
alog.debug("(%s) %s", request.method, request.path)
|
|
session = await get_session(request)
|
|
op = session.get("async_operator")
|
|
_logged_in = True if op else False
|
|
alog.info("%s - is%s in session", request.path, " NOT" if not _logged_in else "")
|
|
return web.json_response({"logged_in": _logged_in})
|
|
|
|
|
|
@routes.get("/api/ping")
|
|
async def ping_handler(request: web.Request) -> web.Response:
|
|
alog.debug("pinged on %s", request.path)
|
|
resp_data = await reckon_token_response(request.app["base_uri"], request.app.loop)
|
|
alog.debug("ping response: %s", resp_data)
|
|
return web.json_response({"hash": resp_data})
|
|
|
|
|
|
@routes.post("/api/login")
|
|
async def login_handler(request: web.Request) -> web.Response:
|
|
data = await request.json()
|
|
user = data.get("username")
|
|
password = data.get("password")
|
|
if not user or not password:
|
|
alog.debug("login - missing username or password: %s", data)
|
|
return web.json_response({"error": "Missing username or password"}, status=401)
|
|
op, session = await get_set_operator(request, user, password)
|
|
alog.debug("login - user: %s, password: %s", user, password)
|
|
res = await login(op, user, password)
|
|
alog.debug("login result: %s", res)
|
|
if not res:
|
|
session.invalidate()
|
|
alog.info("Login failed; session invalidated.")
|
|
return web.json_response({"logged_in": res}, status=200)
|
|
|
|
|
|
@routes.post("/api/logout")
|
|
async def logout_handler(request: web.Request) -> web.Response:
|
|
alog.debug("logout")
|
|
session = await get_session(request)
|
|
user_session = USERS.get(session["async_operator"], UserSession("NOONE"))
|
|
op = OPERATORS.get(user_session)
|
|
if not op:
|
|
return web.json_response(
|
|
{"error": "No session", "logged_in": False}, status=401
|
|
)
|
|
res = await logout(op)
|
|
session.invalidate()
|
|
alog.debug("logout result: %s", res)
|
|
# FIX: assess if better to invalidate session and dump the browser instance.
|
|
del user_session
|
|
return web.json_response({"logged_in": res}, status=200)
|
|
|
|
|
|
@routes.post("/api/checkin")
|
|
async def checkin_handler(request: web.Request) -> web.Response:
|
|
alog.debug("checkin")
|
|
session = await get_session(request)
|
|
op = OPERATORS.get(session.get("async_operator"))
|
|
if not op:
|
|
return web.json_response(
|
|
{"error": "No session", "logged_in": False}, status=401
|
|
)
|
|
res = await checkin(op)
|
|
alog.debug("checkin result: %s", res)
|
|
return web.json_response({"checked_in": res, "logged_in": True}, status=200)
|
|
|
|
|
|
@routes.post("/api/checkout")
|
|
async def checkout_handler(request: web.Request) -> web.Response:
|
|
alog.debug("checkout")
|
|
session = await get_session(request)
|
|
op = OPERATORS.get(session.get("async_operator"))
|
|
if not op:
|
|
return web.json_response(
|
|
{"error": "No session", "logged_in": False}, status=401
|
|
)
|
|
res = await checkout(op)
|
|
alog.debug("checkout result: %s", res)
|
|
return web.json_response({"checked_in": res, "logged_in": True}, status=200)
|
|
|
|
|
|
@routes.get("/api/movements")
|
|
async def movements_handle(request: web.Request) -> web.Response:
|
|
alog.debug("movements")
|
|
session = await get_session(request)
|
|
user_session = USERS.get(session.get("async_operator"), UserSession("NOONE"))
|
|
op = OPERATORS.get(user_session)
|
|
if not op:
|
|
alog.debug("Missing session")
|
|
return web.json_response(
|
|
{"error": "No session", "logged_in": False}, status=401
|
|
)
|
|
res = await status(op)
|
|
alog.debug("movements result: %s", res)
|
|
if not res:
|
|
return web.json_response(
|
|
{"error": "No movements found", "logged_in": True, "checked_in": False},
|
|
status=404,
|
|
)
|
|
movements = []
|
|
for r in res:
|
|
if r and len(r) == 2:
|
|
movements.append({"time": r[1], "type": r[0]})
|
|
resp_data: T.Dict[T.Text, T.Any] = {"movements": movements}
|
|
resp_data["logged_in"] = True
|
|
try:
|
|
last_movement = list(movements[-1])
|
|
resp_data["checked_in"] = True if last_movement == "Entrata" else False
|
|
except IndexError:
|
|
alog.info("No movements found")
|
|
|
|
return web.json_response(resp_data, status=200)
|