Moving utils to bot_z.utils

master
blallo 2019-01-29 15:56:00 +01:00 committed by blallo
parent 7290ec6883
commit 1805327d39
Signed by: blallo
GPG Key ID: 0CBE577C9B72DC3F
2 changed files with 156 additions and 41 deletions

151
bot_z/utils.py 100644
View File

@ -0,0 +1,151 @@
#! -*- encoding: utf-8 -*-
"""Utilities to better handle the project."""
import errno
import logging
import os
import typing as T
logger = logging.getLogger(__name__)
class Fifo:
"""
Iterator to continuously read the command fifo.
Supports also write operations if opened with mode="w".
"""
def __init__(self, fifopath: str, mode: str = "r") -> None:
try:
os.mkfifo(fifopath)
logger.info("Control pipe opened at: %s", fifopath)
except OSError as err:
if err.errno != errno.EEXIST:
logger.critical("Could not open control pipe at: %s", fifopath)
raise
self.fh = open(fifopath, mode)
def __iter__(self): # pragma: noqa
return self
def __next__(self) -> str:
data = self.fh.read()
# This is a never ending iterator. We should exit the
# loop independently.
return data
def __del__(self) -> None:
self.fh.close()
def __enter__(self) -> T.TextIO:
return self.fh
def __exit__(self, *args) -> None:
self.fh.close()
def write(self, data) -> None:
self.fh.write(data)
self.fh.flush()
class PLifo:
"""
Implements an on-disk persistent LIFO.
Casts everything to str.
"""
def __init__(self, path: str) -> None:
self.path = path
self.content = []
self.fh = None
def __enter__(self) -> T.TextIO:
with open(self.path, "r") as fr:
self.content = [line.strip("\n") for line in fr.readlines()]
self.fh = open(self.path, "w")
return self
def __exit__(self, *args) -> None:
for line in self.content:
self.fh.write("{}\n".format(line.strip("\n")))
self.fh.close()
self.fh = None
def push(self, value: T.Any) -> None:
"""
Pushes on the stack. It is an in-memory structure.
The variations are written on disk when exiting the context.
"""
if self.fh is None:
raise RuntimeError("Called out of context. Open the PLifo firts.")
self.content.append(str(value))
def pop(self) -> str:
"""
Pops on the stack. It is an in-memory structure.
The variations are written on disk when exiting the context.
"""
if self.fh is None:
raise RuntimeError("Called out of context. Open the PLifo firts.")
return self.content.pop()
@classmethod
def Push(cls, path: str, value: T.Any) -> None:
"""
Deploys the context and pushes onto the stack, then exits the
context.
"""
with cls(path) as _lifo:
_lifo.push(value)
@classmethod
def Pop(cls, path: str) -> str:
"""
Deploys the context and pops from the stack, then exits the
context.
"""
with cls(path) as _lifo:
_value = _lifo.pop()
return _value
def all(self) -> T.Iterable[str]:
"""
Returns
"""
if self.fh is None:
raise RuntimeError("Called out of context. Open the PLifo firts.")
return self.content
@classmethod
def All(cls, path: str) -> T.Iterable[str]:
"""
Deploys the context and returns the full list of elements,
then exits the context.
"""
with cls(path) as _lifo:
return _lifo.content
def __contains__(self, *args) -> bool:
return [el in self.content for el in args]
def __len__(self) -> int:
if self.fh is None:
raise RuntimeError("Called out of context. Open the PLifo firts.")
return len(self.content)
def __iter__(self) -> T.Iterable:
if self.fh is None:
raise RuntimeError("Called out of context. Open the PLifo firts.")
return self.content
def __next__(self) -> str:
if self.fh is None:
raise RuntimeError("Called out of context. Open the PLifo firts.")
return next(self.content)
def last(self) -> str:
if self.fh is None:
raise RuntimeError("Called out of context. Open the PLifo firts.")
return self.content[-1]

View File

@ -136,52 +136,16 @@ def daemon_process(
listen_commands(fifo_path, cmd_map)
class Fifo(object):
"""
Iterator to continuously read the command fifo.
Supports also write operations if opened with mode="r".
"""
def __init__(self, fifopath: str, mode: str = "r") -> None:
try:
os.mkfifo(fifopath)
logger.info("Control pipe opened at: %s", fifopath)
except OSError as err:
if err.errno != errno.EEXIST:
logger.critical("Could not open control pipe at: %s", fifopath)
raise
self.fh = open(fifopath, mode)
def __iter__(self): # pragma: noqa
return self
def __next__(self) -> str:
data = self.fh.read()
# This is a never ending iterator. We should exit the
# loop independently.
return data
def __del__(self) -> None:
self.fh.close()
def __enter__(self) -> T.TextIO:
return self.fh
def __exit__(self, *args) -> None:
self.fh.close()
def write(self, data) -> None:
self.fh.write(data)
self.fh.flush()
def listen_commands(fifopath: str, cmd_map: T.Dict[str, T.Callable]) -> None:
def listen_commands(name: str, fifopath: str, cmd_map: T.Dict[str, T.Callable]) -> None:
"""
The main loop to listen incoming commands.
"""
while True:
fifo = Fifo(fifopath, "r")
for command in fifo:
for cmd_str in fifo:
rcpt, command = cmd_str.split('\u2193')
if rcpt != name:
continue
if command == "stop":
logger.debug("Received command: %s", "stop")
cmd_map["stop"]()