# -*- encoding: utf-8 -*- """ AsyncOperator is an async wrapper around the Operator sync operations. It puts in the loop background the execution of the same methods in the Operator. """ import asyncio from concurrent.futures import ThreadPoolExecutor import functools import logging from bot_z.operator import Operator alog = logging.getLogger("asyncio") async def _push_to_loop(loop, executor, func, *args, **kwargs): sync_task = [ loop.run_in_executor(executor, functools.partial(func, **kwargs), *args) ] res_set, _ = await asyncio.wait(sync_task, loop=loop) res = res_set.pop() return res.result() class AsyncOperator(object): """ This is the async version of the Operator. This class DOES NOT inherit from Operator: it contains an active instance of it. """ def __init__(self, base_uri: str, name: str, *args, **kwargs) -> None: self.name = name self.base_uri = base_uri self.op = Operator(base_uri, name, *args, **kwargs) self.executor = ThreadPoolExecutor(max_workers=2) self.loop = asyncio.get_event_loop() async def login(self, username: str, password: str) -> None: """Perform the login. Raise if failing.""" alog.debug("Logging in [%s]", self.name) _ = await _push_to_loop( self.loop, self.executor, self.op.login, username, password ) if not self.op.logged_in: raise RuntimeError("Failed to login.") alog.info("Logged in [%s]", self.name) async def logout(self) -> None: """Perform the logout. Raise if failing.""" alog.debug("Logging out [%s]", self.name) _ = await _push_to_loop(self.loop, self.executor, self.op.logout) if self.op.logged_in: raise RuntimeError("Failed to logout.") alog.info("Logged out [%s]", self.name) async def checkin(self) -> None: """Perform the checkin. Raise if failing.""" alog.debug("Checking in [%s]", self.name) _ = await _push_to_loop(self.loop, self.executor, self.op.check_in) if not self.op.checked_in: raise RuntimeError("Failed to checkin.") alog.info("Checked in [%s]", self.name) async def checkout(self) -> None: """Perform the checkout. Raise if failing.""" alog.debug("Checking out [%s]", self.name) _ = await _push_to_loop(self.loop, self.executor, self.op.check_out) if self.op.checked_in: raise RuntimeError("Failed to checkout.") alog.info("Checked out [%s]", self.name) @property def logged_in(self) -> bool: return self.op.logged_in @property def checked_in(self) -> bool: return self.op.checked_in