# -*- encoding: utf-8 -*-
"""
AsyncOperator is an async wrapper around the synchronous Operator.

Each blocking Operator method is executed in a thread-pool executor so
that the event loop is not blocked while the operation is in progress.
"""

import asyncio
from concurrent.futures import ThreadPoolExecutor, Executor
import functools
import logging
import typing as T

from bot_z.operator import Operator
from bot_z.exceptions import OperationFailed


alog = logging.getLogger("asyncio")


async def push_to_loop(
    loop: asyncio.AbstractEventLoop,
    executor: Executor,
    func: T.Callable,
    *args,
    **kwargs
) -> T.Any:
    """
    Run the blocking callable ``func`` in ``executor`` and await its result.

    :param loop: event loop used to schedule the call.
    :param executor: executor in which ``func`` is run.
    :param func: synchronous callable to execute.
    :param args: positional arguments forwarded to ``func``.
    :param kwargs: keyword arguments forwarded to ``func``.
    :return: whatever ``func(*args, **kwargs)`` returns.
    :raises Exception: any exception raised by ``func`` is re-raised here.
    """
    # run_in_executor() only forwards positional arguments, hence the
    # partial that binds both positionals and keywords up front.
    call = functools.partial(func, *args, **kwargs)
    # Await the future directly: asyncio.wait(..., loop=...) is not usable
    # any more because its ``loop`` argument was removed in Python 3.10.
    return await loop.run_in_executor(executor, call)


# TODO: make it JSON-serializable
class AsyncOperator:
    """
    Async version of the Operator.

    This class DOES NOT inherit from Operator: it holds an Operator
    instance and runs its methods through a thread-pool executor.
    """

    def __init__(self, base_uri: str, name: str, *args, **kwargs) -> None:
        """
        Build the wrapped Operator and the executor used to run it.

        :param base_uri: forwarded to Operator and kept as ``self.base_uri``.
        :param name: forwarded to Operator; also used to tag log messages.
        :param args: extra positional arguments forwarded to Operator.
        :param kwargs: extra keyword arguments forwarded to Operator.
        """
        self.name = name
        self.base_uri = base_uri
        self.op = Operator(base_uri, name, *args, **kwargs)
        self.executor = ThreadPoolExecutor(max_workers=2)
        # NOTE(review): the loop is captured at construction time; confirm
        # instances are always used from this same event loop.
        self.loop = asyncio.get_event_loop()

    async def login(self, username: str, password: str) -> None:
        """
        Perform the login.

        :raises OperationFailed: if the operator is not logged in afterwards.
        """
        alog.debug("Logging in [%s]", self.name)
        await push_to_loop(
            self.loop, self.executor, self.op.login, username, password
        )
        if not self.op.logged_in:
            raise OperationFailed("Failed to login.")
        alog.info("Logged in [%s]", self.name)

    async def logout(self) -> None:
        """
        Perform the logout.

        :raises OperationFailed: if the operator is still logged in afterwards.
        """
        alog.debug("Logging out [%s]", self.name)
        await push_to_loop(self.loop, self.executor, self.op.logout)
        if self.op.logged_in:
            raise OperationFailed("Failed to logout.")
        alog.info("Logged out [%s]", self.name)

    async def checkin(self) -> None:
        """
        Perform the checkin.

        :raises OperationFailed: if the operator is not checked in afterwards.
        """
        alog.debug("Checking in [%s]", self.name)
        await push_to_loop(self.loop, self.executor, self.op.check_in)
        if not self.op.checked_in:
            raise OperationFailed("Failed to checkin.")
        alog.info("Checked in [%s]", self.name)

    async def checkout(self) -> None:
        """
        Perform the checkout.

        :raises OperationFailed: if the operator is still checked in afterwards.
        """
        alog.debug("Checking out [%s]", self.name)
        await push_to_loop(self.loop, self.executor, self.op.check_out)
        if self.op.checked_in:
            raise OperationFailed("Failed to checkout.")
        alog.info("Checked out [%s]", self.name)

    async def get_movements(self) -> T.List[T.Optional[T.Tuple[T.Text, T.Text]]]:
        """
        Retrieve the list of movements as a list of tuples.

        The list may be empty.

        :return: the value returned by the wrapped Operator's
            ``get_movements``.
        """
        alog.debug("Retrieving the list of movements [%s]", self.name)
        res = await push_to_loop(self.loop, self.executor, self.op.get_movements)
        alog.info("List of movements [%s]: %s", self.name, res)
        return res

    @property
    def logged_in(self) -> bool:
        """Expose the wrapped Operator's ``logged_in`` flag."""
        return self.op.logged_in

    @property
    def checked_in(self) -> bool:
        """Expose the wrapped Operator's ``checked_in`` flag."""
        return self.op.checked_in