185 lines
5.1 KiB
Python
185 lines
5.1 KiB
Python
#! -*- encoding: utf-8 -*-
|
|
|
|
"""Utilities to better handle the project."""
|
|
|
|
import errno
|
|
import logging
|
|
import json
|
|
import os
|
|
import sys
|
|
import typing as T
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
logger.setLevel(os.environ.get("BOTZ_LOGLEVEL", logging.INFO))
|
|
sh = logging.StreamHandler(stream=sys.stdout)
|
|
cli_filter = logging.Filter(__name__)
|
|
sh.addFilter(cli_filter)
|
|
sh.setFormatter(logging.Formatter("%(message)s"))
|
|
logger.addHandler(sh)
|
|
|
|
ENC = json.JSONEncoder()
|
|
DEC = json.JSONDecoder()
|
|
|
|
|
|
def cmd_marshal(name: str, cmd: str, **kwargs) -> str:
|
|
"""
|
|
Serializes a command (a python dict) in a JSON object string.
|
|
"""
|
|
cmd_struct = dict(name=name, cmd=cmd, **kwargs)
|
|
logger.debug("Command to be marshalled: %s", cmd_struct)
|
|
return ENC.encode(cmd_struct)
|
|
|
|
|
|
def cmd_unmarshal(cmd: str) -> T.Dict[str, T.Any]:
|
|
"""
|
|
Deserializes a command (a python dict) in a JSON object string.
|
|
"""
|
|
logger.debug("Command to be unmarshalled: %s", cmd)
|
|
return DEC.decode(cmd)
|
|
|
|
|
|
class Fifo:
|
|
"""
|
|
Iterator to continuously read the command fifo.
|
|
Supports also write operations if opened with mode="w".
|
|
"""
|
|
|
|
def __init__(self, fifopath: str, mode: str = "r") -> None:
|
|
try:
|
|
os.mkfifo(fifopath)
|
|
logger.info("Control pipe opened at: %s", fifopath)
|
|
except OSError as err:
|
|
if err.errno != errno.EEXIST:
|
|
logger.critical("Could not open control pipe at: %s", fifopath)
|
|
raise
|
|
self.path = fifopath
|
|
self.fh = open(fifopath, mode)
|
|
|
|
def __iter__(self): # pragma: noqa
|
|
return self
|
|
|
|
def __next__(self) -> str:
|
|
data = self.fh.read()
|
|
# This is a never ending iterator. We should exit the
|
|
# loop independently.
|
|
return data
|
|
|
|
def __del__(self) -> None:
|
|
self.fh.close()
|
|
|
|
def __enter__(self) -> T.TextIO:
|
|
return self.fh
|
|
|
|
def __exit__(self, *args) -> None:
|
|
self.fh.close()
|
|
|
|
def write(self, data) -> None:
|
|
logger.debug("%s ~ Writing: %s", self.path, data)
|
|
self.fh.write(data)
|
|
self.fh.flush()
|
|
|
|
|
|
class PLifo:
|
|
"""
|
|
Implements an on-disk persistent LIFO.
|
|
Casts everything to str.
|
|
"""
|
|
|
|
def __init__(self, path: str) -> None:
|
|
self.path = path
|
|
if not os.path.exists(path):
|
|
with open(path, "w") as lifo:
|
|
lifo.write("")
|
|
self.content = []
|
|
self.fh = None
|
|
|
|
def __enter__(self) -> T.TextIO:
|
|
with open(self.path, "r") as fr:
|
|
self.content = [line.strip("\n") for line in fr.readlines()]
|
|
self.fh = open(self.path, "w")
|
|
return self
|
|
|
|
def __exit__(self, *args) -> None:
|
|
for line in self.content:
|
|
self.fh.write("{}\n".format(line.strip("\n")))
|
|
self.fh.close()
|
|
self.fh = None
|
|
|
|
def push(self, value: T.Any) -> None:
|
|
"""
|
|
Pushes on the stack. It is an in-memory structure.
|
|
The variations are written on disk when exiting the context.
|
|
"""
|
|
if self.fh is None:
|
|
raise RuntimeError("Called out of context. Open the PLifo firts.")
|
|
self.content.append(str(value))
|
|
|
|
def pop(self) -> str:
|
|
"""
|
|
Pops on the stack. It is an in-memory structure.
|
|
The variations are written on disk when exiting the context.
|
|
"""
|
|
if self.fh is None:
|
|
raise RuntimeError("Called out of context. Open the PLifo firts.")
|
|
return self.content.pop()
|
|
|
|
@classmethod
|
|
def Push(cls, path: str, value: T.Any) -> None:
|
|
"""
|
|
Deploys the context and pushes onto the stack, then exits the
|
|
context.
|
|
"""
|
|
with cls(path) as _lifo:
|
|
_lifo.push(value)
|
|
|
|
@classmethod
|
|
def Pop(cls, path: str) -> str:
|
|
"""
|
|
Deploys the context and pops from the stack, then exits the
|
|
context.
|
|
"""
|
|
with cls(path) as _lifo:
|
|
_value = _lifo.pop()
|
|
return _value
|
|
|
|
def all(self) -> T.Iterable[str]:
|
|
"""
|
|
Returns
|
|
"""
|
|
if self.fh is None:
|
|
raise RuntimeError("Called out of context. Open the PLifo firts.")
|
|
return self.content
|
|
|
|
@classmethod
|
|
def All(cls, path: str) -> T.Iterable[str]:
|
|
"""
|
|
Deploys the context and returns the full list of elements,
|
|
then exits the context.
|
|
"""
|
|
with cls(path) as _lifo:
|
|
return _lifo.content
|
|
|
|
def __contains__(self, *args) -> bool:
|
|
return [el in self.content for el in args]
|
|
|
|
def __len__(self) -> int:
|
|
if self.fh is None:
|
|
raise RuntimeError("Called out of context. Open the PLifo firts.")
|
|
return len(self.content)
|
|
|
|
def __iter__(self) -> T.Iterable:
|
|
if self.fh is None:
|
|
raise RuntimeError("Called out of context. Open the PLifo firts.")
|
|
return self.content
|
|
|
|
def __next__(self) -> str:
|
|
if self.fh is None:
|
|
raise RuntimeError("Called out of context. Open the PLifo firts.")
|
|
return next(self.content)
|
|
|
|
def last(self) -> str:
|
|
if self.fh is None:
|
|
raise RuntimeError("Called out of context. Open the PLifo firts.")
|
|
return self.content[-1]
|