diff --git a/bot_z/async_operator.py b/bot_z/async_operator.py new file mode 100644 index 0000000..559a6ec --- /dev/null +++ b/bot_z/async_operator.py @@ -0,0 +1,83 @@ +# -*- encoding: utf-8 -*- + +""" +AsyncOperator is an async wrapper around the Operator sync operations. +It puts in the loop background the execution of the same methods in +the Operator. +""" + +import asyncio +from concurrent.futures import ThreadPoolExecutor +import functools +import logging + +from bot_z.operator import Operator + + +alog = logging.getLogger("asyncio") + + +async def _push_to_loop(loop, executor, func, *args, **kwargs): + sync_task = [ + loop.run_in_executor(executor, functools.partial(func, **kwargs), *args) + ] + res_set, _ = await asyncio.wait(sync_task, loop=loop) + res = res_set.pop() + return res.result() + + +class AsyncOperator(object): + """ + This is the async version of the Operator. + This class DOES NOT inherit from Operator: it contains an + active instance of it. + """ + + def __init__(self, base_uri: str, name: str, *args, **kwargs) -> None: + self.name = name + self.base_uri = base_uri + self.op = Operator(base_uri, name, *args, **kwargs) + self.executor = ThreadPoolExecutor(max_workers=2) + self.loop = asyncio.get_event_loop() + + async def login(self, username: str, password: str) -> None: + """Perform the login. Raise if failing.""" + alog.debug("Logging in [%s]", self.name) + _ = await _push_to_loop( + self.loop, self.executor, self.op.login, username, password + ) + if not self.op.logged_in: + raise RuntimeError("Failed to login.") + alog.info("Logged in [%s]", self.name) + + async def logout(self) -> None: + """Perform the logout. Raise if failing.""" + alog.debug("Logging out [%s]", self.name) + _ = await _push_to_loop(self.loop, self.executor, self.op.logout) + if self.op.logged_in: + raise RuntimeError("Failed to logout.") + alog.info("Logged out [%s]", self.name) + + async def checkin(self) -> None: + """Perform the checkin. Raise if failing.""" + alog.debug("Checking in [%s]", self.name) + _ = await _push_to_loop(self.loop, self.executor, self.op.check_in) + if not self.op.checked_in: + raise RuntimeError("Failed to checkin.") + alog.info("Checked in [%s]", self.name) + + async def checkout(self) -> None: + """Perform the checkout. Raise if failing.""" + alog.debug("Checking out [%s]", self.name) + _ = await _push_to_loop(self.loop, self.executor, self.op.check_out) + if self.op.checked_in: + raise RuntimeError("Failed to checkout.") + alog.info("Checked out [%s]", self.name) + + @property + def logged_in(self) -> bool: + return self.op.logged_in + + @property + def checked_in(self) -> bool: + return self.op.checked_in