BotZ/bot_z/async_operator.py

103 lines
3.3 KiB
Python

# -*- encoding: utf-8 -*-
"""
AsyncOperator is an async wrapper around the synchronous Operator.
It runs the Operator's blocking methods in a background executor and
lets the event loop await their results.
"""
import asyncio
from concurrent.futures import ThreadPoolExecutor, Executor
import functools
import logging
import typing as T
from bot_z.operator import Operator
from bot_z.exceptions import OperationFailed
# Logger shared by every function and method in this module.
# NOTE(review): "asyncio" is also the logger name used by the standard
# asyncio package, so these records share that logger's handlers and
# level -- confirm this is intended rather than a dedicated module logger.
alog = logging.getLogger("asyncio")
async def push_to_loop(
    loop: asyncio.AbstractEventLoop,
    executor: Executor,
    func: T.Callable,
    *args,
    **kwargs
) -> T.Any:
    """
    Run a blocking callable in ``executor`` and await its result.

    :param loop: event loop whose ``run_in_executor`` schedules the call.
    :param executor: executor in which ``func`` is executed.
    :param func: the synchronous callable to run.
    :param args: positional arguments forwarded to ``func``.
    :param kwargs: keyword arguments forwarded to ``func``.
    :return: the value returned by ``func``.
    :raises Exception: whatever ``func`` raises is re-raised to the awaiter.
    """
    # run_in_executor() forwards positional arguments only, so bind both the
    # positional and the keyword arguments up front.
    bound_call = functools.partial(func, *args, **kwargs)
    # Await the executor future directly. The previous implementation went
    # through asyncio.wait(..., loop=loop); that ``loop`` keyword was removed
    # in Python 3.10 and made every call fail with TypeError.
    return await loop.run_in_executor(executor, bound_call)
# TODO: make it JSON-serializable
class AsyncOperator:
    """
    Asynchronous facade over a synchronous Operator.

    The Operator is held in the ``op`` attribute (composition, not
    inheritance). Each coroutine below hands the matching Operator
    method to ``push_to_loop`` together with this instance's event loop
    and thread pool, then checks the Operator's state flags.
    """

    def __init__(self, base_uri: str, name: str, *args, **kwargs) -> None:
        """
        Create the wrapped Operator plus the loop/executor pair used to run it.

        ``base_uri``, ``name`` and any extra arguments are passed straight
        through to the Operator constructor.
        """
        self.base_uri = base_uri
        self.name = name
        self.op = Operator(base_uri, name, *args, **kwargs)
        self.executor = ThreadPoolExecutor(max_workers=2)
        self.loop = asyncio.get_event_loop()

    async def _offload(self, blocking_call: T.Callable, *call_args) -> T.Any:
        """Await ``blocking_call(*call_args)`` through this instance's loop and executor."""
        return await push_to_loop(
            self.loop, self.executor, blocking_call, *call_args
        )

    async def login(self, username: str, password: str) -> None:
        """Log in; raise OperationFailed if the Operator is not logged in afterwards."""
        alog.debug("Logging in [%s]", self.name)
        await self._offload(self.op.login, username, password)
        if not self.op.logged_in:
            raise OperationFailed("Failed to login.")
        alog.info("Logged in [%s]", self.name)

    async def logout(self) -> None:
        """Log out; raise OperationFailed if the Operator is still logged in afterwards."""
        alog.debug("Logging out [%s]", self.name)
        await self._offload(self.op.logout)
        if self.op.logged_in:
            raise OperationFailed("Failed to logout.")
        alog.info("Logged out [%s]", self.name)

    async def checkin(self) -> None:
        """Check in; raise OperationFailed if the Operator is not checked in afterwards."""
        alog.debug("Checking in [%s]", self.name)
        await self._offload(self.op.check_in)
        if not self.op.checked_in:
            raise OperationFailed("Failed to checkin.")
        alog.info("Checked in [%s]", self.name)

    async def checkout(self) -> None:
        """Check out; raise OperationFailed if the Operator is still checked in afterwards."""
        alog.debug("Checking out [%s]", self.name)
        await self._offload(self.op.check_out)
        if self.op.checked_in:
            raise OperationFailed("Failed to checkout.")
        alog.info("Checked out [%s]", self.name)

    async def get_movements(self) -> T.List[T.Optional[T.Tuple[T.Text, T.Text]]]:
        """
        Fetch the movements reported by the Operator.

        The value returned by ``Operator.get_movements`` is logged and
        handed back unchanged.
        """
        alog.debug("Retrieving the list of movements [%s]", self.name)
        movements = await self._offload(self.op.get_movements)
        alog.info("List of movements [%s]: %s", self.name, movements)
        return movements

    @property
    def logged_in(self) -> bool:
        """Mirror of the wrapped Operator's ``logged_in`` flag."""
        return self.op.logged_in

    @property
    def checked_in(self) -> bool:
        """Mirror of the wrapped Operator's ``checked_in`` flag."""
        return self.op.checked_in