Use typing.

This commit is contained in:
sfigato 2019-08-01 16:12:55 +02:00 committed by blallo
parent 2015bd40c2
commit 8a11946627
Signed by: blallo
GPG Key ID: 0CBE577C9B72DC3F
4 changed files with 17 additions and 7 deletions

View File

@ -6,6 +6,7 @@ Asyncio wrapper for AsyncOperator methods.
import asyncio import asyncio
from contextlib import contextmanager from contextlib import contextmanager
import typing as T
from bot_z.async_operator import AsyncOperator from bot_z.async_operator import AsyncOperator
from bot_z.exceptions import OperationFailed from bot_z.exceptions import OperationFailed
@ -19,14 +20,14 @@ def this_loop(loop):
asyncio.set_event_loop(_loop) asyncio.set_event_loop(_loop)
def init(base_url: str, name: str) -> AsyncOperator: def init(base_url: T.Text, name: T.Text) -> AsyncOperator:
""" """
Initialize the stateful object. Initialize the stateful object.
""" """
return AsyncOperator(base_url, name) return AsyncOperator(base_url, name)
async def login(op: AsyncOperator, username: str, password: str) -> bool: async def login(op: AsyncOperator, username: T.Text, password: T.Text) -> bool:
""" """
Executes the login asynchronously and returns Executes the login asynchronously and returns
the AsyncOperator object. the AsyncOperator object.

View File

@ -17,12 +17,12 @@ from api import BASE_URI
alog = logging.getLogger("api") alog = logging.getLogger("api")
routes = web.RouteTableDef() routes = web.RouteTableDef()
OPERATORS = {} OPERATORS = {} # type: T.Dict[T.Text, AsyncOperator]
async def get_set_operator( async def get_set_operator(
request: web.Request, user: T.Text, password: T.Text request: web.Request, user: T.Text, password: T.Text
) -> AsyncOperator: ) -> T.Tuple[AsyncOperator, Session]:
session = await get_session(request) session = await get_session(request)
if "async_operator" in session: if "async_operator" in session:
op = OPERATORS[session["async_operator"]] op = OPERATORS[session["async_operator"]]

View File

@ -10,6 +10,7 @@ import asyncio
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
import functools import functools
import logging import logging
import typing as T
from bot_z.operator import Operator from bot_z.operator import Operator
from bot_z.exceptions import OperationFailed from bot_z.exceptions import OperationFailed
@ -18,7 +19,13 @@ from bot_z.exceptions import OperationFailed
alog = logging.getLogger("asyncio") alog = logging.getLogger("asyncio")
async def _push_to_loop(loop, executor, func, *args, **kwargs): async def _push_to_loop(
loop: asyncio.AbstractEventLoop,
executor: ThreadPoolExecutor,
func: T.Callable,
*args,
**kwargs
) -> T.Any:
sync_task = [ sync_task = [
loop.run_in_executor(executor, functools.partial(func, **kwargs), *args) loop.run_in_executor(executor, functools.partial(func, **kwargs), *args)
] ]

View File

@ -277,7 +277,7 @@ class Operator(wd.Firefox):
except NoSuchElementException: except NoSuchElementException:
pass pass
def get_movements(self) -> T.List[T.Tuple[T.Text]]: def get_movements(self) -> T.List[T.Optional[T.Tuple[T.Text, T.Text]]]:
self._switch_to_container() self._switch_to_container()
try: try:
result = [] # type: T.List[T.Tuple[T.Text]] result = [] # type: T.List[T.Tuple[T.Text]]
@ -286,7 +286,9 @@ class Operator(wd.Firefox):
) )
for row in movements_table: for row in movements_table:
data = row.text.strip().split("\n") # type: T.List[T.Text] data = row.text.strip().split("\n") # type: T.List[T.Text]
result.append(tuple(i.strip() for i in data if i.strip())) # noqa result.append( # type: ignore
tuple(i.strip() for i in data if i.strip())
)
except NoSuchElementException: except NoSuchElementException:
return [] return []
return result return result