Compare commits

...

No commits in common. "0.1.0" and "master" have entirely different histories.

48 changed files with 13621 additions and 502 deletions

5
.dockerignore 100644
View File

@ -0,0 +1,5 @@
/dist/*
/bot.z_web/node_modules/*
/.*yaml
/.*yml
/Makefile

View File

@ -1,15 +0,0 @@
* Bot_Z version:
* Python version:
* Operating System:
### Description
Describe what you were trying to get done.
Tell us what happened, what went wrong, and what you expected to happen.
### What I Did
```
Paste the command(s) you ran and the output.
If there was a crash, please include the traceback here.
```

5
.gitignore vendored
View File

@ -1,5 +1,8 @@
# Postinstall bineries
# Postinstall binaries and assets
/bot_z/bin/*
/api/assets/*
# debug config file
/.botz.yaml
# Byte-compiled / optimized / DLL files
__pycache__/

3
.pylintrc 100644
View File

@ -0,0 +1,3 @@
[FORMAT]
#py3k
max-line-length=88

36
Dockerfile 100644
View File

@ -0,0 +1,36 @@
FROM node AS node
FROM selenium/standalone-firefox AS ff
FROM python:3.7-buster
LABEL author="Blallo"
LABEL email="blallo@autistici.org"
LABEL io.troubles.botz.release-date="2019-09-25"
LABEL io.troubles.botz.version="1.1.3"
ENV DEBIAN_FRONTEND=noninteractive
COPY --from=node /usr/local /usr/local
COPY --from=node /opt /opt
COPY --from=ff /opt/firefox-latest /opt/firefox-latest
COPY --from=ff /opt/geckodriver* /opt/
COPY . /app
COPY entrypoint.sh /srv/
WORKDIR /app/bot.z_web
RUN apt-get update \
&& apt-get install -y $(apt-cache depends firefox-esr| awk '/Depends:/{print$2}') \
&& ln -s /opt/firefox-latest/firefox /usr/bin/firefox \
&& ln -s /opt/geckodriver* /usr/bin/geckodriver \
&& yarn install \
&& yarn build \
&& mkdir -p /app/api/assets \
&& cp -r build/* /app/api/assets/ \
&& rm -r node_modules/ /var/lib/apt/lists/* /var/cache/apt/* /tmp/*
WORKDIR /app
RUN python setup.py develop
EXPOSE 3003
VOLUME ["/app/bot.z_web/node_modules"]
ENTRYPOINT ["/srv/entrypoint.sh"]

View File

@ -1,40 +1,16 @@
.PHONY: clean clean-test clean-pyc clean-build docs help
.DEFAULT_GOAL := help
define BROWSER_PYSCRIPT
import os, webbrowser, sys
try:
from urllib import pathname2url
except:
from urllib.request import pathname2url
.PHONY: clean clean-test clean-pyc clean-build
webbrowser.open("file://" + pathname2url(os.path.abspath(sys.argv[1])))
endef
export BROWSER_PYSCRIPT
define PRINT_HELP_PYSCRIPT
import re, sys
for line in sys.stdin:
match = re.match(r'^([a-zA-Z_-]+):.*?## (.*)$$', line)
if match:
target, help = match.groups()
print("%-20s %s" % (target, help))
endef
export PRINT_HELP_PYSCRIPT
BROWSER := python -c "$$BROWSER_PYSCRIPT"
help:
@python -c "$$PRINT_HELP_PYSCRIPT" < $(MAKEFILE_LIST)
VERSION = 1.1.3
clean: clean-build clean-pyc clean-test ## remove all build, test, coverage and Python artifacts
clean-build: ## remove build artifacts
rm -fr build/
rm -fr dist/
rm -fr .eggs/
find . -name '*.egg-info' -exec rm -fr {} +
find . -name '*.egg' -exec rm -f {} +
rm -fr bot_z.web/build/
clean-pyc: ## remove Python file artifacts
find . -name '*.pyc' -exec rm -f {} +
@ -47,41 +23,41 @@ clean-test: ## remove test and coverage artifacts
rm -f .coverage
rm -fr htmlcov/
lint: ## check style with flake8
flake8 bot_z tests
release: clean build docker-release
test: ## run tests quickly with the default Python
python setup.py test
test-all: ## run tests on every Python version with tox
tox
coverage: ## check code coverage quickly with the default Python
coverage run --source bot_z setup.py test
coverage report -m
coverage html
$(BROWSER) htmlcov/index.html
docs: ## generate Sphinx HTML documentation, including API docs
rm -f docs/bot_z.rst
rm -f docs/modules.rst
sphinx-apidoc -o docs/ bot_z
$(MAKE) -C docs clean
$(MAKE) -C docs html
$(BROWSER) docs/_build/html/index.html
servedocs: docs ## compile the docs watching for changes
watchmedo shell-command -p '*.rst' -c '$(MAKE) -C docs html' -R -D .
release: clean ## package and upload a release
python setup.py sdist upload
python setup.py bdist_wheel upload
dist: clean ## builds source and wheel package
sdist:
python setup.py sdist
python setup.py bdist_wheel
ls -l dist
build: sdist build-linux64 build-linux32 build-win32 build-win64 build-macos
build-linux64:
python setup.py bdist_wheel --plat-name manylinux1-x86_64
sdist/bot_z-$(VERSION)-py3-none-manylinux1_x86_64.whl: build-linux64
build-linux32:
python setup.py bdist_wheel --plat-name manylinux1-i686
sdist/bot_z-$(VERSION)-py3-none-manylinux1_i686.whl: build-linux32
build-win32:
python setup.py bdist_wheel --plat-name win32
sdist/bot_z-$(VERSION)-py3-none-win32.whl: build-win32
build-win64:
python setup.py bdist_wheel --plat-name win-amd64
sdist/bot_z-$(VERSION)-py3-none-manylinux1_win-amd64.whl: build-win64
build-macos:
python setup.py bdist_wheel --plat-name macosx
sdist/bot_z-$(VERSION)-py3-none-macosx.whl: build-macos
install: clean ## install the package to the active Python's site-packages
python setup.py install
docker-release:
docker build -t botz:latest .
docker tag botz:latest botz:$(VERSION)

View File

@ -13,6 +13,62 @@ Features
* Login/Logout
* Check in/Check out
* web ui
Develop
-------
To allow local development you either need a configuration file named
`.botz.yaml` in the root of the project with something like this:
```
---
base_uri: "<your_target_uri>"
debug: true
headless: true
log:
level: DEBUG
syslog: false
http:
bind_addr:
- "127.0.0.1"
port: 3003
cookie_secure: false
cors_allow: "http://localhost:3000"
```
Read the docstring in `api/conf.py` to understand the menaning of the
various parameters.
*Do not `pip install -e .`*. It will miss the geckodriver download step.
Take a look at the provided `Dockerfile`.
You can either run it:
```
$ docker build -t botz:latest .
$ docker run -v $PWD:/app -p "3003:3003" botz
```
and find a working app at `http://localhost:3003`.
Or you can `python setup.py bdist_wheel && python setup.py develop`.
You will need:
- `python >= 3.7`
- `yarn` (`npm` support coming soon...)
If you want to develop the ui, you can also serve it via yarn (as it
supports hot reload):
```
$ cd bot.z_web
$ yarn start
```
You will find a working ui at `localhost:3000` (but you need the last two
lines of the previous example config file).
Install
@ -33,7 +89,7 @@ You can either proceed in one of the following ways:
TODO
----
[x] Check in/out
[ ] systemd {unit, timer}
[ ] APIs
[ ] Mailer
- [x] Check in/out
- [ ] systemd {unit, timer}
- [x] APIs
- [ ] Mailer

7
api/__init__.py 100644
View File

@ -0,0 +1,7 @@
# -*- encoding: utf-8 -*-
from distutils.util import strtobool
import os
# TODO: create config module and put this constant there.
BASE_URI = os.environ.get("BOTZ_BASE_URI", "http://localhost")
DEBUG = strtobool(os.environ.get("BOTZ_DEBUG", "False"))

108
api/app.py 100644
View File

@ -0,0 +1,108 @@
# -*- encoding: utf-8 -*-
"""
The application entrypoint.
"""
from aiohttp import web
import base64
from cryptography import fernet
import logging
import logging.handlers
import os
import typing as T
import click
from aiohttp_session import setup, session_middleware
from aiohttp_session.cookie_storage import EncryptedCookieStorage
from bot_z.async_operator import AsyncOperator
from api.rest import routes, add_static_routes
from api.conf import read_conf
def setup_log(level: str, syslog: bool) -> logging.Logger:
alog = logging.getLogger("api")
alog.setLevel(os.environ.get("BOTZ_LOGLEVEL", level))
h = logging.StreamHandler()
f = logging.Formatter("%(levelname)s [%(name)s] -> %(message)s")
h.setFormatter(f)
alog.addHandler(h)
if syslog:
sh = logging.handlers.SysLogHandler()
alog.addHandler(sh)
return alog
def init_secret() -> bytes:
fernet_key = fernet.Fernet.generate_key()
return base64.urlsafe_b64decode(fernet_key)
def setup_session(app: web.Application, secure: bool, max_age: int):
secret = init_secret()
setup(
app,
EncryptedCookieStorage(
secret_key=secret,
cookie_name="BOTZ_SESSION",
httponly=False,
secure=secure,
max_age=max_age,
),
)
def run(
address: T.Optional[T.Text], port: T.Optional[int], conf_path: T.Optional[T.Text]
) -> None:
"""Application entrypoint."""
conf = read_conf(conf_path)
# This closure is needed to intercept the to-be-prepared response
# and add the right CORS headers
async def on_prepare_cors(request, response):
response.headers["Access-Control-Allow-Origin"] = conf["http"].get("cors_allow")
response.headers["Access-Control-Allow-Credentials"] = "true"
alog = setup_log(conf["log"]["level"], conf["log"]["syslog"])
alog.debug("conf %s", conf)
app = web.Application(logger=alog)
app["base_uri"] = conf["base_uri"]
app["debug"] = conf["debug"]
app["headless"] = conf["headless"]
if conf["http"].get("cors_allow"):
app.on_response_prepare.append(on_prepare_cors)
setup_session(app, conf["http"]["cookie_secure"], conf["http"]["session_timeout"])
add_static_routes(alog)
app.add_routes(routes)
addr = []
if address is not None:
addr = [address]
elif conf["http"].get("bind_addr"):
addr.extend(conf["http"]["bind_addr"])
else:
addr = ["127.0.0.1"]
if port is None:
port = conf["http"]["port"]
alog.debug("Starting app with: addr -> %s, port -> %d", addr, port)
web.run_app(app, host=addr, port=port)
@click.command()
@click.option(
"-a", "--address", type=click.STRING, help="Address to bind the server to."
)
@click.option("-p", "--port", type=click.INT, help="Port to bind to.")
@click.option(
"-c",
"--conf",
type=click.Path(exists=False),
help="A path to a configuration file.",
)
def cli(
address: T.Optional[T.Text],
port: T.Optional[int] = None,
conf: T.Optional[T.Text] = None,
) -> None:
run(address, port, conf)

92
api/async_bot.py 100644
View File

@ -0,0 +1,92 @@
# -*- encoding: utf-8 -*-
"""
Asyncio wrapper for AsyncOperator methods.
"""
import asyncio
from contextlib import contextmanager
import typing as T
from bot_z.async_operator import AsyncOperator
from bot_z.exceptions import OperationFailed
@contextmanager
def this_loop(loop):
_loop = asyncio.get_event_loop()
asyncio.set_event_loop(loop)
yield
asyncio.set_event_loop(_loop)
def init(base_url: T.Text, name: T.Text) -> AsyncOperator:
"""
Initialize the stateful object.
"""
return AsyncOperator(base_url, name)
async def login(op: AsyncOperator, username: T.Text, password: T.Text) -> bool:
"""
Executes the login asynchronously and returns
the AsyncOperator object.
"""
if not op.logged_in:
try:
await op.login(username, password)
except OperationFailed:
pass
return op.logged_in
async def logout(op: AsyncOperator) -> bool:
"""
Executes the logout asynchronously and returns
a bool showing the success or failure of the operation.
"""
if op.logged_in:
try:
await op.logout()
except OperationFailed:
pass
return op.logged_in
async def checkin(op: AsyncOperator) -> bool:
"""
Executes the check in asynchronously and returns
a bool showing the success or failure of the operation.
"""
if op.logged_in and not op.checked_in:
try:
await op.checkin()
except OperationFailed:
pass
return op.checked_in
async def checkout(op: AsyncOperator) -> bool:
"""
Executes the checkout asynchronously and returns
a bool showing the success or failure of the operation.
"""
if op.logged_in and op.checked_in:
try:
await op.checkout()
except OperationFailed:
pass
return op.checked_in
async def status(op: AsyncOperator) -> T.List[T.Optional[T.Tuple[T.Text, T.Text]]]:
"""
Asks the list of movements asynchronously and returns a list
of tuples containing the succession of movements. If the operation
fails, the list is empty.
"""
movements = []
if op.logged_in:
res = await op.get_movements()
movements.extend(res)
return movements

99
api/conf.py 100644
View File

@ -0,0 +1,99 @@
# -*- encoding: utf-8 -*-
import os
from pprint import pprint
import typing as T
import yaml
from api import DEBUG, BASE_URI
def read_conf(path: T.Optional[T.Text]) -> T.Dict:
"""
Read the configuration from the provided path.
Such configuration may provide the following information:
---
base_uri: <base uri of the target instance>
debug: <bool, set debug on, defaults to false>
headless: <bool, use headless mode, defaults to true>
log:
level: <may be DEBUG, INFO, WARN, ..., defaults at INFO>
syslog: <bool, whether to redirect to standard syslog, defaults to false>
http:
bind_addr: <a list of addresses to bind to>
port: <int, the port to bind to>
cookie_name: <defaults to BOTZ_SESSION>
cookie_secure: <bool, whether to set Secure cookie flag, defaults to true>
session_timeout: <int, the expiration time of the session ins secs, defaults to 300>
cors_allow: <an optional single allowed Cross Origin domain>
"""
if path is None:
path = seek_path()
if path is not None:
with open(path) as f:
conf = yaml.safe_load(f)
else:
conf = {}
if "base_uri" not in conf:
if BASE_URI is None:
raise RuntimeError("Missing base_uri")
conf["base_uri"] = BASE_URI
if "debug" not in conf:
conf["debug"] = DEBUG
if "headless" not in conf:
conf["headless"] = True
conf = validate_log_conf(conf)
conf = validate_http_log(conf)
pprint(conf)
return conf
def seek_path() -> T.Optional[T.Text]:
"""
Seeks the path to a config file, in the following order:
- $BOTZ_CONFIG
- $PWD/.botz.yaml
- ~/.botz.yaml
- /etc/botz/conf.yaml
"""
paths = [
os.path.join(os.path.curdir, ".botz.yaml"),
os.path.expanduser("~/.botz.yaml"),
"/etc/botz/conf.yaml",
]
env = os.environ.get("BOTZ_CONFIG")
if env is not None:
paths.insert(0, env)
for path in paths:
if os.path.exists(path):
return path
return None
def validate_log_conf(conf: T.Dict[T.Text, T.Any]) -> T.Dict[T.Text, T.Any]:
if "log" not in conf:
conf["log"] = {}
if conf["log"].get("level") is None:
conf["log"]["level"] = "INFO"
if conf["log"].get("syslog") is None:
conf["log"]["syslog"] = False
return conf
def validate_http_log(conf: T.Dict[T.Text, T.Any]) -> T.Dict[T.Text, T.Any]:
if "http" not in conf:
conf["http"] = {}
if conf["http"].get("bind_addr") is None:
conf["http"]["bind_addr"] = ["127.0.0.1"]
if conf["http"].get("port") is None:
conf["http"]["port"] = 3003
if conf["http"].get("cookie_name") is None:
conf["http"]["cookie_name"] = "BOTZ_SESSION"
if conf["http"].get("cookie_secure") is None:
conf["http"]["cookie_secure"] = True
if conf["http"].get("session_timeout") is None:
conf["http"]["session_timeout"] = 300
elif isinstance(conf["http"]["session_timeout"], str):
conf["http"]["session_timeout"] = int(conf["http"]["session_timeout"])
return conf

82
api/dev_server.py 100644
View File

@ -0,0 +1,82 @@
# -*- encoding: utf-8 -*-
from aiohttp import web
import logging
import click
logging.basicConfig(
format="%(message)s", level=logging.INFO, handlers=[logging.StreamHandler()]
)
alog = logging.getLogger(__name__)
routes = web.RouteTableDef()
@routes.get("/{tail:.*}")
async def get_handler(request: web.Request) -> web.Response:
alog.info("GET -> [%s]: %s", request.path, request.query)
return web.json_response(
{"method": request.method, "path": request.path},
headers={"Access-Control-Allow-Origin": "*"},
)
@routes.post("/{tail:.*}")
async def post_handler(request: web.Request) -> web.Response:
data = await request.post()
alog.info("POST -> [%s]: %s", request.path, data)
return web.json_response(
{"method": request.method, "path": request.path},
headers={"Access-Control-Allow-Origin": "*"},
)
@routes.put("/{tail:.*}")
async def put_handler(request: web.Request) -> web.Response:
data = await request.post()
alog.info("PUT -> [%s]: %s", request.path, data)
return web.json_response(
{"method": request.method, "path": request.path},
headers={"Access-Control-Allow-Origin": "*"},
)
@routes.delete("/{tail:.*}")
async def delete_handler(request: web.Request) -> web.Response:
data = await request.post()
alog.info("DELETE -> [%s]: %s", request.path, data)
return web.json_response(
{"method": request.method, "path": request.path},
headers={"Access-Control-Allow-Origin": "*"},
)
async def options_handler(request: web.Request) -> web.Response:
alog.info("OPTIONS -> [%s]", request.path)
return web.Response(status=200)
def run(address: str, port: int) -> None:
"""Application entrypoint."""
app = web.Application(logger=alog)
app.add_routes(routes)
app.router.add_route("OPTIONS", "/{tail:.*}", options_handler)
web.run_app(app, host=address, port=port)
@click.command()
@click.option(
"-a",
"--address",
type=click.STRING,
help="Address to bind the server to.",
default="127.0.0.1",
)
@click.option("-p", "--port", type=click.INT, help="Port to bind to", default=3003)
def cli(address: str, port: int) -> None:
run(address, port)
if __name__ == "__main__":
cli()

224
api/rest.py 100644
View File

@ -0,0 +1,224 @@
# -*- encoding: utf-8 -*-
"""
The REST endpoints.
"""
from aiohttp import web
import asyncio
from concurrent.futures import ProcessPoolExecutor
import datetime
import logging
import os
import pkg_resources
import typing as T
import weakref
from aiohttp_session import new_session, get_session, Session
from passlib.hash import bcrypt
from bot_z.async_operator import AsyncOperator, push_to_loop
from api.async_bot import login, logout, checkin, checkout, status
from api import BASE_URI, DEBUG
alog = logging.getLogger("api")
routes = web.RouteTableDef()
OPERATORS = weakref.WeakKeyDictionary(
{}
) # type: weakref.WeakKeyDictionary[UserSession, AsyncOperator]
USERS = {} # type: T.Dict[T.Text, UserSession]
BASE_PATH = pkg_resources.resource_filename(__name__, "assets")
EXECUTOR = ProcessPoolExecutor()
# WARN: the default il 12 rounds; both the server and the client shall compute
# this hash. The target client is a smartphone with poor performance on
# crypto computations, so we should keep this low, as the verification step
# is optionally enforced by the client.
ROUNDS = 6
class UserSession(object):
"""
Placeholder object to manipulate session life.
"""
def __init__(self, user):
self._user = user
def _reckon_token_response(base_uri: T.Text) -> T.Text:
return bcrypt.using(rounds=ROUNDS, truncate_error=True).hash(base_uri)
async def reckon_token_response(
base_uri: T.Text, loop: asyncio.AbstractEventLoop
) -> T.Text:
"""
A client and the server should agree if the pairing is adequate.
This could be accomplished calculating on both sides an cryptographic
secret. The current implementation uses bcrypt to compute the hash on
server side and the client should verify the secret on its side.
"""
return await push_to_loop(loop, EXECUTOR, _reckon_token_response, base_uri)
async def get_set_operator(
request: web.Request, user: T.Text, password: T.Text
) -> T.Tuple[AsyncOperator, Session]:
session = await get_session(request)
op = None
if "async_operator" in session:
user_session = USERS.get(session["async_operator"])
op = OPERATORS.get(user_session)
else:
session = await new_session(request)
if op is None or session.new:
base_uri = request.app["base_uri"]
debug = request.app["debug"]
headless = request.app["headless"]
op = AsyncOperator(base_uri, name=user, headless=headless, debug=debug)
USERS[user] = UserSession(user)
session["async_operator"] = user
OPERATORS[USERS[user]] = op
return op, session
def add_static_routes(log: logging.Logger) -> None:
static_assets = [
os.path.abspath(os.path.join(BASE_PATH, path))
for path in os.listdir(BASE_PATH)
if os.path.isdir(os.path.join(BASE_PATH, path))
]
for asset in static_assets:
asset_path = os.path.relpath(asset, BASE_PATH)
log.debug(f"Linking: {asset_path} -> {asset}")
routes.static(f"/{asset_path}", asset)
@routes.get("/")
async def home_handle(request: web.Request) -> web.Response:
return web.FileResponse(os.path.join(BASE_PATH, "index.html"))
@routes.get("/favicon.ico")
async def favicon_handle(request: web.Request) -> web.Response:
return web.FileResponse(os.path.join(BASE_PATH, "favicon.ico"))
@routes.get("/api/login")
@routes.get("/api/badge")
@routes.get("/api/status")
async def routing_handler(request: web.Request) -> web.Response:
alog.debug("(%s) %s", request.method, request.path)
session = await get_session(request)
op = session.get("async_operator")
_logged_in = True if op else False
alog.info("%s - is%s in session", request.path, " NOT" if not _logged_in else "")
return web.json_response({"logged_in": _logged_in})
@routes.get("/api/ping")
async def ping_handler(request: web.Request) -> web.Response:
alog.debug("pinged on %s", request.path)
resp_data = await reckon_token_response(request.app["base_uri"], request.app.loop)
alog.debug("ping response: %s", resp_data)
return web.json_response({"hash": resp_data})
@routes.post("/api/login")
async def login_handler(request: web.Request) -> web.Response:
data = await request.json()
user = data.get("username")
password = data.get("password")
if not user or not password:
alog.debug("login - missing username or password: %s", data)
return web.json_response({"error": "Missing username or password"}, status=401)
op, session = await get_set_operator(request, user, password)
alog.debug("login - user: %s, password: %s", user, password)
res = await login(op, user, password)
alog.debug("login result: %s", res)
if not res:
session.invalidate()
alog.info("Login failed; session invalidated.")
return web.json_response({"logged_in": res}, status=200)
@routes.post("/api/logout")
async def logout_handler(request: web.Request) -> web.Response:
alog.debug("logout")
session = await get_session(request)
user_session = USERS.get(session["async_operator"], UserSession("NOONE"))
op = OPERATORS.get(user_session)
if not op:
return web.json_response(
{"error": "No session", "logged_in": False}, status=401
)
res = await logout(op)
session.invalidate()
alog.debug("logout result: %s", res)
# FIX: assess if better to invalidate session and dump the browser instance.
del user_session
return web.json_response({"logged_in": res}, status=200)
@routes.post("/api/checkin")
async def checkin_handler(request: web.Request) -> web.Response:
alog.debug("checkin")
session = await get_session(request)
user_session = USERS.get(session["async_operator"], UserSession("NOONE"))
op = OPERATORS.get(user_session)
if not op:
return web.json_response(
{"error": "No session", "logged_in": False}, status=401
)
res = await checkin(op)
alog.debug("checkin result: %s", res)
return web.json_response({"checked_in": res, "logged_in": True}, status=200)
@routes.post("/api/checkout")
async def checkout_handler(request: web.Request) -> web.Response:
alog.debug("checkout")
session = await get_session(request)
user_session = USERS.get(session["async_operator"], UserSession("NOONE"))
op = OPERATORS.get(user_session)
if not op:
return web.json_response(
{"error": "No session", "logged_in": False}, status=401
)
res = await checkout(op)
alog.debug("checkout result: %s", res)
return web.json_response({"checked_in": res, "logged_in": True}, status=200)
@routes.get("/api/movements")
async def movements_handle(request: web.Request) -> web.Response:
alog.debug("movements")
session = await get_session(request)
user_session = USERS.get(session.get("async_operator"), UserSession("NOONE"))
op = OPERATORS.get(user_session)
if not op:
alog.debug("Missing session")
return web.json_response(
{"error": "No session", "logged_in": False}, status=401
)
res = await status(op)
alog.debug("movements result: %s", res)
if not res:
return web.json_response(
{"error": "No movements found", "logged_in": True, "checked_in": False},
status=404,
)
movements = []
for r in res:
if r and len(r) == 2:
movements.append({"time": r[1], "type": r[0]})
resp_data: T.Dict[T.Text, T.Any] = {"movements": movements}
resp_data["logged_in"] = True
try:
last_movement = list(movements[-1])
resp_data["checked_in"] = True if last_movement == "Entrata" else False
except IndexError:
alog.info("No movements found")
return web.json_response(resp_data, status=200)

23
bot.z_web/.gitignore vendored 100644
View File

@ -0,0 +1,23 @@
# See https://help.github.com/articles/ignoring-files/ for more about ignoring files.
# dependencies
/node_modules
/.pnp
.pnp.js
# testing
/coverage
# production
/build
# misc
.DS_Store
.env.local
.env.development.local
.env.test.local
.env.production.local
npm-debug.log*
yarn-debug.log*
yarn-error.log*

View File

@ -0,0 +1,68 @@
This project was bootstrapped with [Create React App](https://github.com/facebook/create-react-app).
## Available Scripts
In the project directory, you can run:
### `npm start`
Runs the app in the development mode.<br>
Open [http://localhost:3000](http://localhost:3000) to view it in the browser.
The page will reload if you make edits.<br>
You will also see any lint errors in the console.
### `npm test`
Launches the test runner in the interactive watch mode.<br>
See the section about [running tests](https://facebook.github.io/create-react-app/docs/running-tests) for more information.
### `npm run build`
Builds the app for production to the `build` folder.<br>
It correctly bundles React in production mode and optimizes the build for the best performance.
The build is minified and the filenames include the hashes.<br>
Your app is ready to be deployed!
See the section about [deployment](https://facebook.github.io/create-react-app/docs/deployment) for more information.
### `npm run eject`
**Note: this is a one-way operation. Once you `eject`, you cant go back!**
If you arent satisfied with the build tool and configuration choices, you can `eject` at any time. This command will remove the single build dependency from your project.
Instead, it will copy all the configuration files and the transitive dependencies (Webpack, Babel, ESLint, etc) right into your project so you have full control over them. All of the commands except `eject` will still work, but they will point to the copied scripts so you can tweak them. At this point youre on your own.
You dont have to ever use `eject`. The curated feature set is suitable for small and middle deployments, and you shouldnt feel obligated to use this feature. However we understand that this tool wouldnt be useful if you couldnt customize it when you are ready for it.
## Learn More
You can learn more in the [Create React App documentation](https://facebook.github.io/create-react-app/docs/getting-started).
To learn React, check out the [React documentation](https://reactjs.org/).
### Code Splitting
This section has moved here: https://facebook.github.io/create-react-app/docs/code-splitting
### Analyzing the Bundle Size
This section has moved here: https://facebook.github.io/create-react-app/docs/analyzing-the-bundle-size
### Making a Progressive Web App
This section has moved here: https://facebook.github.io/create-react-app/docs/making-a-progressive-web-app
### Advanced Configuration
This section has moved here: https://facebook.github.io/create-react-app/docs/advanced-configuration
### Deployment
This section has moved here: https://facebook.github.io/create-react-app/docs/deployment
### `npm run build` fails to minify
This section has moved here: https://facebook.github.io/create-react-app/docs/troubleshooting#npm-run-build-fails-to-minify

View File

@ -0,0 +1,34 @@
{
"name": "bot.z_web",
"version": "1.0.0",
"private": true,
"dependencies": {
"bootstrap": "^4.3.1",
"react": "^16.8.6",
"react-bootstrap": "^1.0.0-beta.10",
"react-dom": "^16.8.6",
"react-router-dom": "^5.0.1",
"react-scripts": "3.0.1"
},
"scripts": {
"start": "react-scripts start",
"build": "react-scripts build",
"test": "react-scripts test",
"eject": "react-scripts eject"
},
"eslintConfig": {
"extends": "react-app"
},
"browserslist": {
"production": [
">0.2%",
"not dead",
"not op_mini all"
],
"development": [
"last 1 chrome version",
"last 1 firefox version",
"last 1 safari version"
]
}
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 59 KiB

View File

@ -0,0 +1,38 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<link rel="shortcut icon" href="%PUBLIC_URL%/favicon.ico" />
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
<meta name="theme-color" content="#000000" />
<!--
manifest.json provides metadata used when your web app is installed on a
user's mobile device or desktop. See https://developers.google.com/web/fundamentals/web-app-manifest/
-->
<link rel="manifest" href="%PUBLIC_URL%/manifest.json" />
<!--
Notice the use of %PUBLIC_URL% in the tags above.
It will be replaced with the URL of the `public` folder during the build.
Only files inside the `public` folder can be referenced from the HTML.
Unlike "/favicon.ico" or "favicon.ico", "%PUBLIC_URL%/favicon.ico" will
work correctly both with client-side routing and a non-root public URL.
Learn how to configure a non-root public URL by running `npm run build`.
-->
<title>BotZ</title>
</head>
<body>
<noscript>You need to enable JavaScript to run this app.</noscript>
<div id="root"></div>
<!--
This HTML file is a template.
If you open it directly in the browser, you will see an empty page.
You can add webfonts, meta tags, or analytics to this file.
The build step will place the bundled scripts into the <body> tag.
To begin the development, run `npm start` or `yarn start`.
To create a production bundle, use `npm run build` or `yarn build`.
-->
</body>
</html>

View File

@ -0,0 +1,15 @@
{
"short_name": "BotZ",
"name": "BotZ web interface",
"icons": [
{
"src": "favicon.ico",
"sizes": "64x64 32x32 24x24 16x16",
"type": "image/x-icon"
}
],
"start_url": ".",
"display": "standalone",
"theme_color": "#000000",
"background_color": "#ffffff"
}

View File

@ -0,0 +1,45 @@
body {
padding-top: 15%;
padding-bottom: 20%;
background-color: #eee;
}
#main-form {
max-width: 330px;
padding: 15px;
margin: 0 auto;
}
.App {
text-align: center;
}
.App-logo {
animation: App-logo-spin infinite 20s linear;
height: 40vmin;
pointer-events: none;
}
.App-header {
background-color: #282c34;
min-height: 100vh;
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
font-size: calc(10px + 2vmin);
color: white;
}
.App-link {
color: #61dafb;
}
@keyframes App-logo-spin {
from {
transform: rotate(0deg);
}
to {
transform: rotate(360deg);
}
}

View File

@ -0,0 +1,28 @@
import React from 'react';
import IndexPage from './IndexPage.js';
class App extends React.Component {
constructor(props) {
super(props)
this.state = {targetUrl: props.targetUrl, loggedIn: false}
}
componentDidMount() {
fetch(this.targetUrl + '/status', {credentials: 'include'})
.then(response => response.json())
.then(data => this.setState({loggedIn: data.logged_in}));
}
render() {
return (
<div id="main" className="container">
<IndexPage targetUrl={this.state.targetUrl} loggedIn={this.state.loggedIn} />
</div>
);
}
}
export default App;

View File

@ -0,0 +1,9 @@
import React from 'react';
import ReactDOM from 'react-dom';
import App from './App';
it('renders without crashing', () => {
const div = document.createElement('div');
ReactDOM.render(<App />, div);
ReactDOM.unmountComponentAtNode(div);
});

View File

@ -0,0 +1,101 @@
import React from 'react';
import ModalBox from './ModalBox.js';
import { Button, Spinner } from 'react-bootstrap';
import './App.css';
function isCheckedIn(movements) {
if (movements.length === 0) {
return false;
}
const lastMov = movements[movements.length - 1].type;
if (lastMov === "Entrata") {
return true;
} else {
return false;
}
}
class BadgePage extends React.Component {
constructor(props) {
super(props);
this.state = {loggedIn: props.loggedIn, checkedIn: false, loading: false};
}
_isMounted = false
componentDidMount() {
this._isMounted = true;
fetch(this.props.targetUrl + '/movements', {credentials: 'include'})
.then(response => response.json())
.then(data => {if (this._isMounted) {this.setState({
checkedIn: isCheckedIn(data.movements), loggedIn: data.logged_in
})}})
.catch(error => console.log(error));
}
componentWillUnmount() {
this._isMounted = false;
}
handleToggle = (event) => {
this.setState({loading: true})
var path;
if (this.state.checkedIn) {
path = '/checkout';
} else {
path = '/checkin';
}
fetch(this.props.targetUrl + path, {
method: 'POST',
credentials: 'include'
})
.then(response => response.json())
.then(data => this.setState({
checkedIn: data.checked_in,
loggedIn: data.logged_in,
loading: false
}));
}
buttonValue() {
if (this.state.checkedIn) {
return "CHECKOUT";
} else {
return "CHECKIN";
}
}
dynButton() {
const badge = (
<ModalBox
title="Badge"
data={<Button variant="primary" type="button" onClick={this.handleToggle}>{this.buttonValue()}</Button>}
/>
);
const loading =
<ModalBox
title="Badge"
data=<Spinner animation="border" variant="primary" />
/>;
if (this.state.loading) {
return loading;
} else {
return badge;
}
}
render() {
if (this.state.loggedIn) {
return this.dynButton();
} else {
return (<ModalBox title="Badge" data=<h2>You have to login first!</h2> />);
}
}
}
export default BadgePage;

View File

@ -0,0 +1,69 @@
import React from 'react';
import { BrowserRouter as Router, Route, Link, Switch } from "react-router-dom";
import { Nav, Navbar, Container } from "react-bootstrap";
import LoginForm from './LoginForm.js';
import MovementsPage from './Movements.js';
import BadgePage from './Badge.js';
import LogoffPage from './Logoff.js';
class IndexPage extends React.Component {
constructor(props) {
super(props);
this.state = {loggedIn: props.loggedIn};
}
componentDidMount() {
fetch(this.props.targetUrl + '/status', {credentials: 'include'})
.then(response => response.json())
.then(data => this.setState({loggedIn: data.logged_in}))
.catch(error => {console.log(error)});
}
render() {
return (
<Router>
<Navbar bg="light" expand="lg">
<Navbar.Brand href="/">BotZ</Navbar.Brand>
<Navbar.Toggle aria-controls="basic-navbar-nav" />
<Navbar.Collapse id="basic-navbar-nav">
<Nav variant="tabs" className="mr-auto">
<Nav.Item>
<Nav.Link as='div' eventKey="login">
<Link to='/login'>Login</Link>
</Nav.Link>
</Nav.Item>
<Nav.Item>
<Nav.Link as='div' eventKey="movements">
<Link to='/movements'>Movements</Link>
</Nav.Link>
</Nav.Item>
<Nav.Item>
<Nav.Link as='div' eventKey="badge">
<Link to='/badge'>Badge</Link>
</Nav.Link>
</Nav.Item>
<Nav.Item>
<Nav.Link as='div' eventKey="logout">
<Link to='/logout'>Logout</Link>
</Nav.Link>
</Nav.Item>
</Nav>
</Navbar.Collapse>
</Navbar>
<div>
<Container>
<Switch>
<Route path="/login" component={() => <LoginForm targetUrl={this.props.targetUrl} loggedIn={this.state.loggedIn} />} />
<Route path="/movements" component={() => <MovementsPage targetUrl={this.props.targetUrl} loggedIn={this.state.loggedIn} />} />
<Route path="/badge" component={() => <BadgePage targetUrl={this.props.targetUrl} loggedIn={this.state.loggedIn} />} />
<Route path="/logout" component={() => <LogoffPage targetUrl={this.props.targetUrl} loggedIn={this.state.loggedIn} />} />
</Switch>
</Container>
</div>
</Router>
);
}
}
export default IndexPage;

View File

@ -0,0 +1,111 @@
import React from 'react';
import { Form, Button, Spinner } from 'react-bootstrap';
import ModalBox from './ModalBox.js';
import './App.css';
class LoginForm extends React.Component {
constructor(props) {
super(props);
this.state = {
user: '', password: '',
loggedIn: props.loggedIn,
loading: false,
error: false
};
}
componentDidUpdate() {
if (this.state.error) {
console.log(this.state);
this.timeout = setTimeout(this.resetComponentState, 3000);
}
}
componentDidMount() {
fetch(this.props.targetUrl + '/status', {credentials: 'include'})
.then(response => response.json())
.then(data => this.setState({
loggedIn: data.logged_in
})
);
}
componentWillUnmount() {
clearTimeout(this.timeout)
}
handleUserChange = (event) => {
this.setState({user: event.target.value});
};
handlePasswordChange = (event) => {
this.setState({password: event.target.value});
};
handleSubmit = (event) => {
event.preventDefault();
this.setState({loading: true});
const formData = event.target.elements;
var data = {username: formData["username"].value, password: formData["password"].value};
fetch(this.props.targetUrl + '/login', {
method: 'POST',
body: JSON.stringify(data),
credentials: 'include'
})
.then(response => response.json())
.then(jsonData => this.setState({loggedIn: jsonData.logged_in, loading: false}))
.catch((error) => {console.log(error); this.setState({error: true})})
}
dynForm() {
const form =
<ModalBox
title=<h2>Login Form</h2>
data=
<Form onSubmit={this.handleSubmit}>
<Form.Group controlId="username">
<Form.Control onChange={this.handleUserChange} type="text" name="username" placeholder="Username" />
</Form.Group>
<Form.Group controlId="password">
<Form.Control onChange={this.handlePasswordChange} type="password" name="password" placeholder="Password" />
</Form.Group>
<Button variant="primary" type="submit">Send</Button>
</Form>
/>;
const loading =
<ModalBox
title="Login"
data=<Spinner animation="border" variant="primary" />
/>;
if (this.state.loading) {
return loading;
} else {
return form;
}
}
resetComponentState = () => {
this.setState({error: false, loading: false, username: '', password: ''})
}
render() {
if (this.state.loggedIn) {
return (<ModalBox title="Login" data=<h2>Yet logged in!</h2> />);
}
if (this.state.error) {
return (<div><h2>Error!</h2></div>);
} else {
return this.dynForm();
}
}
}
export default LoginForm;

View File

@ -0,0 +1,73 @@
import React from 'react';
import { Spinner, Button } from 'react-bootstrap';
import ModalBox from './ModalBox.js';
import './App.css';
import './bootstrap.min.css';
class LogoffPage extends React.Component {
constructor(props) {
super(props);
this.state = {logged_in: props.loggedIn, loading: false};
}
componentDidMount() {
fetch(this.props.targetUrl + '/status', {credentials: 'include'})
.then(response => response.json())
.then(data => this.setState({
loggedIn: data.logged_in
}));
}
handleToggle = (event) => {
this.setState({loading: true})
fetch(this.props.targetUrl + '/logout', {
method: 'POST',
credentials: 'include'
})
.then(response => response.json())
.then(data => this.setState({
loggedIn: data.logged_in, loading: false
}));
}
buttonValue() {
if (this.state.loggedIn) {
return "LOGOUT";
} else {
return "LOGIN";
}
}
dynButton() {
const logout = (
<ModalBox
title="Logoff"
data=<Button variant="primary" type="button" onClick={this.handleToggle}>{this.buttonValue()}</Button>
/>
);
const loading =
<ModalBox
title="Logout"
data=<Spinner animation="border" variant="primary" />
/>;
if (this.state.loading) {
return loading;
} else {
return logout;
}
}
render() {
if (this.state.loggedIn) {
return this.dynButton();
} else {
return (<ModalBox title="Logout" data=<h2>You have to login first!</h2> />);
}
}
}
export default LogoffPage;

View File

@ -0,0 +1,22 @@
import React from 'react';
import { Modal } from 'react-bootstrap';
import './App.css';
class ModalBox extends React.Component {
render() {
return (
<Modal.Dialog>
<Modal.Header>
<Modal.Title>{this.props.title}</Modal.Title>
</Modal.Header>
<Modal.Body>
{this.props.data}
</Modal.Body>
</Modal.Dialog>
);
}
}
export default ModalBox;

View File

@ -0,0 +1,51 @@
import React from 'react';
import ModalBox from './ModalBox.js';
import './App.css';
const Empty = <ModalBox title="Movements" data=<h3>Empty</h3> />;
const Error = <ModalBox title="Movements" data=<h3>Error</h3> />;
class MovementsPage extends React.Component {
constructor(props) {
super(props);
this.state = {loggedIn: props.loggedIn, movements: []};
}
componentDidMount() {
fetch(this.props.targetUrl + '/movements', {credentials: 'include'})
.then(response => response.json())
.then(data => this.setState({movements: data.movements, loggedIn: data.logged_in}))
.catch(error => {console.log(error); this.setState({error: true})});
}
componentWillUnmount() {
this.setState({error: false, movements: []});
}
handleTable() {
if (this.state.movements !== undefined) {
const table =
<ul>{this.state.movements.map(movs => <li key={movs.time}>{movs.time}&nbsp; - &nbsp;{movs.type}</li>)}</ul>;
return <ModalBox title="Movements" data={table} />;
} else {
return Empty;
}
}
render() {
const movements = this.handleTable();
if (this.state.error) {
return Error;
}
if (this.state.loggedIn) {
return movements;
} else {
return (<ModalBox title="Movements" data=<h2>You have to login first!</h2> />);
}
}
}
export default MovementsPage;

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,13 @@
body {
margin: 0;
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", "Roboto", "Oxygen",
"Ubuntu", "Cantarell", "Fira Sans", "Droid Sans", "Helvetica Neue",
sans-serif;
-webkit-font-smoothing: antialiased;
-moz-osx-font-smoothing: grayscale;
}
code {
font-family: source-code-pro, Menlo, Monaco, Consolas, "Courier New",
monospace;
}

View File

@ -0,0 +1,18 @@
import 'bootstrap/dist/css/bootstrap.css';
import React from 'react';
import ReactDOM from 'react-dom';
import './index.css';
import App from './App';
import * as serviceWorker from './serviceWorker';
const targetUrl = "/api";
// // Dev address //
// const targetUrl = "http://localhost:3003/api";
ReactDOM.render(<App targetUrl={targetUrl}/>, document.getElementById('root'));
// If you want your app to work offline and load faster, you can change
// unregister() to register() below. Note this comes with some pitfalls.
// Learn more about service workers: https://bit.ly/CRA-PWA
serviceWorker.unregister();

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 2.6 KiB

View File

@ -0,0 +1,135 @@
// This optional code is used to register a service worker.
// register() is not called by default.
// This lets the app load faster on subsequent visits in production, and gives
// it offline capabilities. However, it also means that developers (and users)
// will only see deployed updates on subsequent visits to a page, after all the
// existing tabs open on the page have been closed, since previously cached
// resources are updated in the background.
// To learn more about the benefits of this model and instructions on how to
// opt-in, read https://bit.ly/CRA-PWA
const isLocalhost = Boolean(
window.location.hostname === 'localhost' ||
// [::1] is the IPv6 localhost address.
window.location.hostname === '[::1]' ||
// 127.0.0.1/8 is considered localhost for IPv4.
window.location.hostname.match(
/^127(?:\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)){3}$/
)
);
export function register(config) {
if (process.env.NODE_ENV === 'production' && 'serviceWorker' in navigator) {
// The URL constructor is available in all browsers that support SW.
const publicUrl = new URL(process.env.PUBLIC_URL, window.location.href);
if (publicUrl.origin !== window.location.origin) {
// Our service worker won't work if PUBLIC_URL is on a different origin
// from what our page is served on. This might happen if a CDN is used to
// serve assets; see https://github.com/facebook/create-react-app/issues/2374
return;
}
window.addEventListener('load', () => {
const swUrl = `${process.env.PUBLIC_URL}/service-worker.js`;
if (isLocalhost) {
// This is running on localhost. Let's check if a service worker still exists or not.
checkValidServiceWorker(swUrl, config);
// Add some additional logging to localhost, pointing developers to the
// service worker/PWA documentation.
navigator.serviceWorker.ready.then(() => {
console.log(
'This web app is being served cache-first by a service ' +
'worker. To learn more, visit https://bit.ly/CRA-PWA'
);
});
} else {
// Is not localhost. Just register service worker
registerValidSW(swUrl, config);
}
});
}
}
function registerValidSW(swUrl, config) {
navigator.serviceWorker
.register(swUrl)
.then(registration => {
registration.onupdatefound = () => {
const installingWorker = registration.installing;
if (installingWorker == null) {
return;
}
installingWorker.onstatechange = () => {
if (installingWorker.state === 'installed') {
if (navigator.serviceWorker.controller) {
// At this point, the updated precached content has been fetched,
// but the previous service worker will still serve the older
// content until all client tabs are closed.
console.log(
'New content is available and will be used when all ' +
'tabs for this page are closed. See https://bit.ly/CRA-PWA.'
);
// Execute callback
if (config && config.onUpdate) {
config.onUpdate(registration);
}
} else {
// At this point, everything has been precached.
// It's the perfect time to display a
// "Content is cached for offline use." message.
console.log('Content is cached for offline use.');
// Execute callback
if (config && config.onSuccess) {
config.onSuccess(registration);
}
}
}
};
};
})
.catch(error => {
console.error('Error during service worker registration:', error);
});
}
function checkValidServiceWorker(swUrl, config) {
// Check if the service worker can be found. If it can't reload the page.
fetch(swUrl)
.then(response => {
// Ensure service worker exists, and that we really are getting a JS file.
const contentType = response.headers.get('content-type');
if (
response.status === 404 ||
(contentType != null && contentType.indexOf('javascript') === -1)
) {
// No service worker found. Probably a different app. Reload the page.
navigator.serviceWorker.ready.then(registration => {
registration.unregister().then(() => {
window.location.reload();
});
});
} else {
// Service worker found. Proceed as normal.
registerValidSW(swUrl, config);
}
})
.catch(() => {
console.log(
'No internet connection found. App is running in offline mode.'
);
});
}
export function unregister() {
if ('serviceWorker' in navigator) {
navigator.serviceWorker.ready.then(registration => {
registration.unregister();
});
}
}

10291
bot.z_web/yarn.lock 100644

File diff suppressed because it is too large Load Diff

View File

@ -2,6 +2,6 @@
"""Top-level package for Bot_Z."""
__author__ = """Leonardo Barcaroli"""
__email__ = 'leonardo.barcaroli@deustechnology.com'
__version__ = '0.1.0'
__author__ = """Blallo"""
__email__ = "blallo@autistici.org"
__version__ = "1.1.3"

View File

@ -0,0 +1,102 @@
# -*- encoding: utf-8 -*-
"""
AsyncOperator is an async wrapper around the Operator sync operations.
It puts in the loop background the execution of the same methods in
the Operator.
"""
import asyncio
from concurrent.futures import ThreadPoolExecutor, Executor
import functools
import logging
import typing as T
from bot_z.operator import Operator
from bot_z.exceptions import OperationFailed
alog = logging.getLogger("asyncio")
async def push_to_loop(
loop: asyncio.AbstractEventLoop,
executor: Executor,
func: T.Callable,
*args,
**kwargs
) -> T.Any:
sync_task = [
loop.run_in_executor(executor, functools.partial(func, **kwargs), *args)
]
res_set, _ = await asyncio.wait(sync_task, loop=loop)
res = res_set.pop()
return res.result()
# TODO: make it JSON-serializable
class AsyncOperator(object):
"""
This is the async version of the Operator.
This class DOES NOT inherit from Operator: it contains an
active instance of it.
"""
def __init__(self, base_uri: str, name: str, *args, **kwargs) -> None:
self.name = name
self.base_uri = base_uri
self.op = Operator(base_uri, name, *args, **kwargs)
self.executor = ThreadPoolExecutor(max_workers=2)
self.loop = asyncio.get_event_loop()
async def login(self, username: str, password: str) -> None:
"""Perform the login. Raise if failing."""
alog.debug("Logging in [%s]", self.name)
_ = await push_to_loop(
self.loop, self.executor, self.op.login, username, password
)
if not self.op.logged_in:
raise OperationFailed("Failed to login.")
alog.info("Logged in [%s]", self.name)
async def logout(self) -> None:
"""Perform the logout. Raise if failing."""
alog.debug("Logging out [%s]", self.name)
_ = await push_to_loop(self.loop, self.executor, self.op.logout)
if self.op.logged_in:
raise OperationFailed("Failed to logout.")
alog.info("Logged out [%s]", self.name)
async def checkin(self) -> None:
"""Perform the checkin. Raise if failing."""
alog.debug("Checking in [%s]", self.name)
_ = await push_to_loop(self.loop, self.executor, self.op.check_in)
if not self.op.checked_in:
raise OperationFailed("Failed to checkin.")
alog.info("Checked in [%s]", self.name)
async def checkout(self) -> None:
"""Perform the checkout. Raise if failing."""
alog.debug("Checking out [%s]", self.name)
_ = await push_to_loop(self.loop, self.executor, self.op.check_out)
if self.op.checked_in:
raise OperationFailed("Failed to checkout.")
alog.info("Checked out [%s]", self.name)
async def get_movements(self) -> T.List[T.Optional[T.Tuple[T.Text, T.Text]]]:
"""
Retrieves the list of movements as a list of tuples.
The list may be empty.
"""
alog.debug("Retrieving the list of movements [%s]", self.name)
res = await push_to_loop(self.loop, self.executor, self.op.get_movements)
alog.info("List of movements [%s]: %s", self.name, res)
return res
@property
def logged_in(self) -> bool:
return self.op.logged_in
@property
def checked_in(self) -> bool:
return self.op.checked_in

View File

@ -1,255 +0,0 @@
# -*- coding: utf-8 -*-
"""
Operator is the main object that interacts with the foreign
service. It exposes methods to login, logout and check in and out.
"""
from datetime import datetime, timedelta
import logging
import os
import pkg_resources
import shutil
import sys
import time
import typing as T
from urllib.parse import urlparse
geckoexe = shutil.which('geckodriver')
if geckoexe is None:
local_path = pkg_resources.resource_filename(__name__, 'bin')
try:
os.stat(os.path.join(local_path, 'geckodriver'))
os.environ['PATH'] = os.environ['PATH'] + \
':' + local_path
except FileNotFoundError:
print("Missing geckodriver executable in path", file=sys.stderr)
raise
from selenium import webdriver as wd # noqa
from selenium.common.exceptions import WebDriverException, NoSuchElementException # noqa
logging.basicConfig(
level=os.environ.get('BOTZ_LOGLEVEL', logging.INFO),
format='%(levelname)s: [%(name)s] -> %(message)s'
)
m_logger = logging.getLogger(__name__)
m_logger.debug("Init at debug")
def safely(f: T.Callable) -> T.Callable:
def _protection(self, *args, **kwargs):
try:
f(self, *args, **kwargs)
except WebDriverException as e:
self.logger.error("Something went wrong: %s", e)
finally:
self.switch_to.default_content()
return _protection
def _is_present(driver: wd.Firefox, xpath: str) -> bool:
try:
driver.find_element_by_xpath(xpath)
return True
except NoSuchElementException:
return False
def is_present(driver: wd.Firefox,
xpath: str,
timeout: T.Optional[timedelta]=None
) -> bool:
"""
Helper function. If an element is present in the DOM tree,
returns true. False otherwise.
"""
if timeout is None:
return _is_present(driver, xpath)
_now = datetime.now()
_elapsed = timedelta(seconds=0)
while _elapsed < timeout:
m_logger.debug("Not yet present: %s", xpath)
if _is_present(driver, xpath):
m_logger.debug("Present: %s", xpath)
return True
time.sleep(0.5)
_elapsed = datetime.now() - _now
return False
class Operator(wd.Firefox):
def __init__(
self,
base_uri: str,
name: str = None,
timeout: int=20,
proxy: T.Optional[T.Tuple[str, int]] = None,
headless: bool = True,
debug: bool = False,
*args, **kwargs) -> None:
"""
Adds some configuration to Firefox.
"""
self.profile = wd.FirefoxProfile()
# Do not send telemetry
self.profile.set_preference('datareporting.policy.dataSubmissionEnabled', False)
self.profile.set_preference('datareporting.healthreport.service.enabled', False)
self.profile.set_preference('datareporting.healthreport.uploadEnabled', False)
self.opts = wd.firefox.options.Options()
self.opts.headless = headless
self.debug = debug
self.base_uri = base_uri
self.uri = urlparse(base_uri)
self.timeout = timedelta(seconds=timeout)
if proxy:
self.profile.set_preference('network.proxy.type', 1)
self.profile.set_preference('network.proxy.http', proxy[0])
self.profile.set_preference('network.proxy.http_port', proxy[1])
self.profile.set_preference('network.proxy.ssl', proxy[0])
self.profile.set_preference('network.proxy.ssl_port', proxy[1])
super().__init__(firefox_profile=self.profile, options=self.opts, *args, **kwargs)
self.fullscreen_window()
self.z_name = name if name is not None else __name__
self.logger = logging.getLogger("{}.{}".format(__name__, self.name))
if debug:
self.logger.setLevel(logging.DEBUG)
self.logger.debug("Debug level")
self._logged_in = False
self._checked_in = False
@safely
def login(self, user: str, password: str, force: bool=False) -> None:
"""
Do the login and proceed.
"""
if self._logged_in:
self.logger.warning("Already logged in: %s", user)
if not force:
return
self.logger.info("Forcing login: %s", user)
# Retrieve login page
self.get(self.base_uri)
_correct_url = 'cpccchk' in self.current_url
_now = datetime.now()
_elapsed = timedelta(seconds=0)
while not _correct_url:
self.logger.debug("Not yet on login page: %s", self.current_url)
time.sleep(0.5)
_correct_url = 'cpccchk' in self.current_url
_elapsed = datetime.now() - _now
if _elapsed > self.timeout:
break
self.logger.debug("After login get: %s", self.current_url)
time.sleep(1)
# Username
user_form = self.find_element_by_name('m_cUserName')
# Password
pass_form = self.find_element_by_name('m_cPassword')
# Login button
login_butt = self.find_element_by_xpath('//input[contains(@id, "_Accedi")]')
# Compile and submit
user_form.send_keys(user)
pass_form.send_keys(password)
do_it = True
if self.debug and not force:
_do_it = input("Really do the login? [y/n] ").lower()
do_it = True if _do_it == "y" else False
if do_it:
login_butt.submit()
time.sleep(5)
self.logger.debug("Login result: %s", self.title)
if 'Routine window' in self.title:
self.logger.debug("Reloading...")
self.refresh()
self.switch_to.alert.accept()
if is_present(self, '//a[contains(@class, "imgMenu_ctrl")]', self.timeout):
self._logged_in = True
self.logger.info("Login success for user: %s", user)
else:
self.logger.error("Login failed: %s", user)
@safely
def logout(self, user: str, force: bool=False) -> None:
"""
Do the logout.
"""
if not self._logged_in:
self.logger.warning("Not yet logged in for user: %s", user)
if not force:
return
self.logger.info("Forcing logout: %s", user)
# Find the Profile menu and open it
profile_butt = self.find_element_by_xpath('//span[contains(@id, "imgNoPhotoLabel")]')
profile_butt.click()
time.sleep(1)
# Find the logout button
logout_butt = self.find_element_by_xpath('//input[@value="Logout user"]')
logout_butt.click()
if "jsp/usut_wapplogout_portlet.jsp" in self.current_url:
self.logger.info("User successfully logged out: %s", user)
else:
self.logger.warning("Logout failed: %s", user)
@property
def logged_in(self) -> bool:
"""
Check if already logged in. Checks if page is '/jsp/home.jsp'
and if login cookie is set (and not expired).
"""
_base_domain = '.'.join(self.uri.netloc.split('.')[-2:])
cookies = [c['name'] for c in self.get_cookies() if _base_domain in c['domain']]
_right_url = "/jsp/home.jsp" in self.current_url
_cookies = "dtLatC" in cookies
return _right_url and _cookies
@safely
def check_in(self, force: bool=False) -> None:
"""
Click the check in button.
"""
if not force and not self.logged_in:
self.logger.warning("Not logged in!")
return
if self._checked_in:
self.logger.warn("Already checked in!")
if not force:
return
iframe = self.find_element_by_xpath('//iframe[contains(@id, "gsmd_container.jsp")]')
self.switch_to.frame(iframe)
enter_butt = self.find_element_by_xpath('//input[@value="Entrata"]')
enter_butt.click()
# Click the check in button and change
# self._checked_in state in case of success
pass
@safely
def check_out(self, force: bool=False) -> None:
"""
Click the check out button.
"""
if not force and not self.logged_in:
self.logger.warning("Not logged in!")
return
if not self._checked_in:
self.logger.warn("Not yet checked in!")
if not force:
return
iframe = self.find_element_by_xpath('//iframe[contains(@id, "gsmd_container.jsp")]')
self.switch_to.frame(iframe)
exit_butt = self.find_element_by_xpath('//input[@value="Uscita"]')
exit_butt.click()
# Click the check in button and change
# self._checked_in state in case of success
pass
def __del__(self) -> None:
self.quit()

View File

@ -1,17 +1,420 @@
# -*- coding: utf-8 -*-
"""Console script for bot_z."""
"""Console script to control the bot_z daemon"""
import errno
from getpass import getpass
import logging
import os
import sys
import typing as T
import click
import lockfile
from bot_z.zdaemon import daemon_process
from bot_z.utils import Fifo, PLifo, cmd_marshal
logger = logging.getLogger(__name__)
logger.setLevel(os.environ.get("BOTZ_LOGLEVEL", logging.INFO))
sh = logging.StreamHandler(stream=sys.stdout)
cli_filter = logging.Filter(__name__)
sh.addFilter(cli_filter)
sh.setFormatter(logging.Formatter("%(message)s"))
logger.addHandler(sh)
@click.command()
def main(args=None):
"""Console script for bot_z."""
click.echo("Replace this message by putting your code into "
"bot_z.cli.main")
click.echo("See click documentation at http://click.pocoo.org/")
def _check_name(lifo_path: str, name: T.Optional[str]) -> str:
with PLifo(lifo_path) as lifo:
if name is None:
return lifo.last()
if name not in lifo:
logger.error("Daemon not present. Exiting.")
sys.exit(2)
return name
@click.group()
@click.option("-d", "--debug", is_flag=True, default=False, help="Enable debug mode.")
@click.option(
"--no-headless",
"headless",
is_flag=True,
default=True,
help="Start the clients in foreground.",
)
@click.option(
"-v",
"--verbose",
is_flag=True,
default=False,
help="More verbose output from this cli.",
)
@click.option(
"-f",
"--fifo",
type=click.STRING,
default="bot_z.cmd",
help="Path to the control fifo.",
)
@click.option(
"-w",
"--workdir",
type=click.Path(exists=True, readable=True, writable=True, resolve_path=True),
default="/tmp",
help="The working dir where to launch the daemon and where the lockfile is put.",
)
@click.pass_context
def cli(
ctx: click.Context,
debug: bool,
headless: bool,
verbose: bool,
fifo: str,
workdir: str,
) -> None:
"""
Group cli.
"""
if verbose:
logger.setLevel(logging.DEBUG)
ctx.ensure_object(dict)
ctx.obj["debug"] = debug
ctx.obj["headless"] = headless
ctx.obj["verbose"] = verbose
ctx.obj["fifo"] = os.path.join(workdir, fifo)
ctx.obj["workdir"] = workdir
ctx.obj["lifo"] = os.path.join(workdir, "botz_open_daemons.list")
@cli.command("start-daemon")
@click.option(
"-t", "--timeout", type=click.INT, default=20, help="Browser requests timeout."
)
@click.option(
"-m",
"--umask",
type=click.INT,
default=0o002,
help="The umask of the control fifo.",
)
@click.option(
"-p",
"--proxy",
type=click.STRING,
default=None,
help="An optional string for the proxy with the form 'address:port'.",
)
@click.option(
"--foreground",
is_flag=True,
default=False,
help="Keep the process in foreground (do not daemonize).",
)
@click.argument("baseuri", type=click.STRING)
@click.pass_context
def start_daemon_command(
ctx: click.Context,
baseuri: str,
umask: int,
timeout: int,
proxy: T.Optional[str] = None,
foreground: bool = False,
) -> None:
"""
Invokes daemon_process for the first time.
"""
lf = lockfile.FileLock(os.path.join(ctx.obj["workdir"], "bot_z"))
logger.debug("Daemon starting. Lockfile: %s", lf)
proxy_tuple = None
if proxy:
proxy_tuple = tuple(proxy.split(":"))
daemon_process(
fifo_path=ctx.obj["fifo"],
working_dir=ctx.obj["workdir"],
umask=umask,
pidfile=lf,
base_uri=baseuri,
timeout=timeout,
proxy=proxy_tuple,
headless=ctx.obj["headless"],
debug=ctx.obj["debug"],
foreground=foreground,
)
if not foreground:
logger.info("Daemon started.")
@cli.command("list")
@click.option(
"-s",
"--status",
is_flag=True,
default=False,
help="Show also the status of each client.",
)
@click.pass_context
def list_command(ctx: click.Context, status: bool) -> None:
"""
Lists the clients attached
"""
logger.debug('Invoked the "list" command.')
with PLifo(ctx.obj["lifo"]) as lifo:
if len(lifo) == 0:
logger.info("No clients.")
return
for client in lifo.all():
logger.info(client)
if status:
_status_command(ctx, client)
@cli.command("start")
@click.option(
"-n",
"--name",
type=click.STRING,
prompt="Give the instance a name",
help="The daemon instance name.",
)
@click.pass_context
def start_command(ctx: click.Context, name: str) -> None:
"""
Starts a client.
"""
logger.debug("Sending the start command down the pipe: %s", ctx.obj["fifo"])
with open(ctx.obj["fifo"], "w") as fifo:
fifo.write(cmd_marshal(name=name, cmd="start"))
logger.info("Start sent.")
@cli.command("stop-daemon")
@click.pass_context
def stop_daemon_command(ctx: click.Context) -> None:
"""
Stops all the clients and shuts down the daemon.
"""
logger.debug("Sending the stop-daemon command down the pipe: %s", ctx.obj["fifo"])
with open(ctx.obj["fifo"], "w") as fifo:
fifo.write(cmd_marshal(name="", cmd="stop-daemon"))
logger.info("Stop-daemon sent.")
@cli.command("stop")
@click.option("-n", "--name", default=None, help="The instance to interact with.")
@click.pass_context
def stop_command(ctx: click.Context, name: T.Optional[str]) -> None:
"""
Stops a client.
"""
logger.debug("Sending the stop command down the pipe: %s", ctx.obj["fifo"])
name = _check_name(ctx.obj["lifo"], name)
logger.info("Stopping instance: %s", name)
with open(ctx.obj["fifo"], "w") as fifo:
fifo.write(cmd_marshal(name=name, cmd="stop"))
logger.info("Stop sent.")
def prompt_for_creds(param: str, hidden: bool = False) -> str:
if hidden:
return getpass("Insert {}: ".format(param))
return input("Insert {}: ".format(param))
@cli.command("login")
@click.option("-n", "--name", default=None, help="The instance to interact with.")
@click.option(
"-c",
"--credfile",
default=None,
type=click.File(),
help="The file containing username and password, on one line each.",
)
@click.option("-u", "--username", default=None, help="The username to login with.")
@click.option(
"-p",
"--password",
default=None,
help="[USE ONLY WHEN DEBUGGING!] The password to login with. Use --credfile.",
)
@click.option(
"-f",
"--force",
is_flag=True,
default=False,
help="Force logout, bypass login check.",
)
@click.pass_context
def login_command(
ctx: click.Context,
name: T.Optional[str],
username: T.Optional[str],
password: T.Optional[str],
credfile: T.Optional[T.TextIO],
force: bool,
) -> None:
"""
Logs a client in using the provided credentials.
"""
no_userpass = username is None or password is None
if credfile is not None and no_userpass:
username = credfile.readline()
password = credfile.readline()
elif credfile is not None and not no_userpass:
logger.warning("Ignoring command line provided username and password.")
elif credfile is None and no_userpass:
logger.warning("Missing username or password and credfile.")
if username is None:
username = prompt_for_creds("username")
if password is None:
password = prompt_for_creds("password", hidden=True)
else:
logger.warning("Do not use command line provided password in production!")
logger.debug("Sending the login command down the pipe: %s", ctx.obj["fifo"])
name = _check_name(ctx.obj["lifo"], name)
logger.info('Logging in on instance "%s" with username "%s".', name, username)
with open(ctx.obj["fifo"], "w") as fifo:
fifo.write(
cmd_marshal(
name=name,
cmd="login",
username=username,
password=password,
force=force,
)
)
logger.info("Login sent.")
@cli.command("logout")
@click.option("-n", "--name", default=None, help="The instance to interact with.")
@click.option(
"-f",
"--force",
is_flag=True,
default=False,
help="Force logout, bypass login check.",
)
@click.pass_context
def logout_command(ctx: click.Context, name: T.Optional[str], force: bool) -> None:
"""
Logs a logged in client out.
"""
logger.debug("Sending the logout command down the pipe: %s", ctx.obj["fifo"])
name = _check_name(ctx.obj["lifo"], name)
logger.info("Logging out on instance: %s", name)
with open(ctx.obj["fifo"], "w") as fifo:
fifo.write(cmd_marshal(name=name, cmd="logout", force=force))
logger.info("Logout sent.")
@cli.command("checkin")
@click.option("-n", "--name", default=None, help="The instance to interact with.")
@click.option(
"-f",
"--force",
is_flag=True,
default=False,
help="Force logout, bypass login check.",
)
@click.pass_context
def checkin_command(ctx: click.Context, name: T.Optional[str], force: bool) -> None:
"""
Checks in on a logged in client.
"""
logger.debug("Sending the check in command down the pipe: %s", ctx.obj["fifo"])
name = _check_name(ctx.obj["lifo"], name)
logger.info("Checking in on instance: %s", name)
with open(ctx.obj["fifo"], "w") as fifo:
fifo.write(cmd_marshal(name=name, cmd="checkin", force=force))
logger.info("Check in sent.")
@cli.command("checkout")
@click.option("-n", "--name", default=None, help="The instance to interact with.")
@click.option(
"-f",
"--force",
is_flag=True,
default=False,
help="Force logout, bypass login check.",
)
@click.pass_context
def checkout_command(ctx: click.Context, name: T.Optional[str], force: bool) -> None:
"""
Checks out on a logged in client and checked in client.
"""
logger.debug("Sending the check out command down the pipe: %s", ctx.obj["fifo"])
name = _check_name(ctx.obj["lifo"], name)
logger.info("Checking out on instance: %s", name)
with open(ctx.obj["fifo"], "w") as fifo:
fifo.write(cmd_marshal(name=name, cmd="checkout", force=force))
logger.info("Check out sent.")
@cli.command("reload")
@click.option("-n", "--name", default=None, help="The instance to interact with.")
@click.pass_context
def reload_command(ctx: click.Context, name: T.Optional[str]) -> None:
"""
Resets a client to a clean state (discards the session).
"""
logger.debug("Sending the reload command down the pipe: %s", ctx.obj["fifo"])
name = _check_name(ctx.obj["lifo"], name)
with open(ctx.obj["fifo"], "w") as fifo:
fifo.write(cmd_marshal(name=name, cmd="reload"))
logger.info("Reload sent.")
@cli.command("status")
@click.option("-n", "--name", default=None, help="The instance to interact with.")
@click.pass_context
def status_command(ctx: click.Context, name: T.Optional[str]) -> None:
"""
Displays basic info on the state of a client.
"""
try:
name = _check_name(ctx.obj["lifo"], name)
except IndexError:
if len(PLifo.All(ctx.obj["lifo"])) != 0:
raise
logger.warning("No clients registered.")
return
_status_command(ctx, name)
def _status_command(ctx: click.Context, name: str) -> None:
rfifo_path = os.path.join(ctx.obj["workdir"], "rfifo-{}".format(id(ctx)))
logger.debug("Awaiting response on fifo: %s", rfifo_path)
try:
os.mkfifo(rfifo_path)
logger.debug("Response fifo newly created.")
except OSError as err:
if err.errno != errno.EEXIST:
raise
logger.debug("Response fifo exists.")
with Fifo(ctx.obj["fifo"], "w") as fifo:
fifo.write(cmd_marshal(name=name, cmd="status", rfifo_path=rfifo_path))
logger.debug("Awaiting response...")
done = False
while not done:
try:
with open(rfifo_path, "r") as rfifo:
resp = rfifo.read()
done = True
except FileNotFoundError as e:
logger.warning("File not found: %s", e)
pass
logger.info(resp)
try:
os.remove(rfifo_path)
except OSError as e:
logger.warning("Failed removing response fifo %s: %s", rfifo_path, e)
pass
if __name__ == "__main__":
main()
cli()

View File

@ -0,0 +1,2 @@
class OperationFailed(RuntimeError):
pass

387
bot_z/operator.py 100644
View File

@ -0,0 +1,387 @@
# -*- coding: utf-8 -*-
"""
Operator is the main object that interacts with the foreign
service. It exposes methods to login, logout and check in and out.
"""
from datetime import datetime, timedelta
import logging
import os
import platform
import pkg_resources
import shutil
import sys
import time
import typing as T
from urllib.parse import urlparse
def path_from_platform() -> T.Text:
res = None
_plat = sys.platform
_arch = platform.architecture()
if _plat == "darwin":
return "macos"
if _plat == "win32":
res = "win"
elif _plat.startswith("linux"):
res = "linux"
if res is None:
raise RuntimeError("Platform unknown: {}".format(_plat))
if "64" in _arch[0]:
res += "64"
elif "32" in _arch[0]:
res += "32"
return res
geckoname = "geckodriver"
geckoexe = shutil.which(geckoname)
if geckoexe is None:
_plat_path = path_from_platform()
local_path = pkg_resources.resource_filename(
__name__, os.path.join("bin", _plat_path)
)
try:
if "win" in _plat_path:
geckoname = f"{geckoname}.exe"
os.stat(os.path.join(local_path, geckoname))
os.environ["PATH"] = os.environ["PATH"] + ":" + local_path
except FileNotFoundError:
print("Missing geckodriver executable in path", file=sys.stderr)
raise
from selenium import webdriver as wd # noqa
from selenium.common.exceptions import (
WebDriverException,
NoSuchElementException,
) # noqa
# TODO: read it from configuration.
RETRIES = 3
log_fmt = logging.Formatter("%(levelname)s: [%(name)s] -> %(message)s")
console = logging.StreamHandler(stream=sys.stdout)
console.setFormatter(log_fmt)
logger = logging.getLogger(__name__)
logger.addHandler(console)
logger.setLevel(os.environ.get("BOTZ_LOGLEVEL", logging.INFO))
logger.debug("Init at debug")
def safely(retries: int = 0) -> T.Callable:
retr = retries
def _safely(f: T.Callable) -> T.Callable:
ret = retr
def _protection(self, *args, **kwargs):
r = ret
while r > 0:
try:
val = f(self, *args, **kwargs)
logger.debug("Success executing %s", f.__name__)
self.switch_to.default_content()
return val
except WebDriverException as e:
self.logger.error(
"Something went wrong: %s [tentative #%s]", e, ret - r
)
r -= 1
time.sleep(2) # TODO: set value from config.
return _protection
return _safely
def _is_present(driver: wd.Firefox, xpath: str) -> bool:
try:
driver.find_element_by_xpath(xpath)
return True
except NoSuchElementException:
return False
def is_present(
driver: wd.Firefox, xpath: str, timeout: T.Optional[timedelta] = None
) -> bool:
"""
Helper function. If an element is present in the DOM tree,
returns true. False otherwise.
"""
if timeout is None:
return _is_present(driver, xpath)
_now = datetime.now()
_elapsed = timedelta(seconds=0)
while _elapsed < timeout:
logger.debug("Not yet present: %s", xpath)
if _is_present(driver, xpath):
logger.debug("Present: %s", xpath)
return True
time.sleep(0.5)
_elapsed = datetime.now() - _now
return False
class Operator(wd.Firefox):
def __init__(
self,
base_uri: str,
name: str = None,
timeout: int = 20,
proxy: T.Optional[T.Tuple[str, int]] = None,
headless: bool = True,
debug: bool = False,
*args,
**kwargs,
) -> None:
"""
Adds some configuration to Firefox.
"""
self.retries = RETRIES
self.profile = wd.FirefoxProfile()
# Do not send telemetry
self.profile.set_preference("datareporting.policy.dataSubmissionEnabled", False)
self.profile.set_preference("datareporting.healthreport.service.enabled", False)
self.profile.set_preference("datareporting.healthreport.uploadEnabled", False)
self.profile.set_preference("dom.webnotifications.enabled", False)
self.opts = wd.firefox.options.Options()
self.opts.headless = headless
self.debug = debug
self.base_uri = base_uri
self.uri = urlparse(base_uri)
self.timeout = timedelta(seconds=timeout)
if proxy:
self.profile.set_preference("network.proxy.type", 1)
self.profile.set_preference("network.proxy.http", proxy[0])
self.profile.set_preference("network.proxy.http_port", proxy[1])
self.profile.set_preference("network.proxy.ssl", proxy[0])
self.profile.set_preference("network.proxy.ssl_port", proxy[1])
super().__init__(
firefox_profile=self.profile, options=self.opts, *args, **kwargs
)
self.fullscreen_window()
self.z_name = name if name is not None else __name__
self.logger = logging.getLogger("{}.{}".format(__name__, self.name))
if debug:
self.logger.setLevel(logging.DEBUG)
self.logger.debug("Debug level")
self._logged_in = False
self.username = None
# Clean preceding session
self.delete_all_cookies()
# Fix incompatibility
if not hasattr(self, "switch_to_default_content"):
self.logger.debug("switch_to_default_content patched.")
self.switch_to_default_content = self.switch_to.default_content
@safely(RETRIES)
def login(self, user: str, password: str, force: bool = False) -> None:
"""
Do the login and proceed.
"""
self.logger.info("Logging in - user: %s - session: %s", user, self.name)
if self.logged_in:
self.logger.warning("Already logged in: %s", user)
if not force:
return
self.logger.info("Forcing login: %s", user)
# Retrieve login page if not yet on it
if self.base_uri not in self.current_url:
self.get(self.base_uri)
time.sleep(1)
_correct_url = "cpccchk" in self.current_url
_now = datetime.now()
_elapsed = timedelta(seconds=0)
while (
not is_present(self, '//input[contains(@id, "_Accedi")]') and _correct_url
):
self.logger.debug("Not yet on login page: %s", self.current_url)
time.sleep(0.5)
_correct_url = "cpccchk" in self.current_url
_elapsed = datetime.now() - _now
if _elapsed > self.timeout:
self.logger.error(
"Login page did not load properly: %s", self.current_url
)
return
self.logger.debug("After login get: %s", self.current_url)
time.sleep(1)
self.logger.info("Looking for fields in session %s", self.name)
# Username
user_form = self.find_element_by_name("m_cUserName")
# Password
pass_form = self.find_element_by_name("m_cPassword")
# Login button
login_butt = self.find_element_by_xpath('//input[contains(@id, "_Accedi")]')
# Compile and submit
user_form.send_keys(user)
pass_form.send_keys(password)
login_butt.submit()
time.sleep(5)
self.logger.debug("Login result: %s", self.title)
safe_counter = 0
while "Routine window" in self.title:
self.logger.debug("Reloading...")
self.refresh()
self.logger.info("Reloaded %s", self.name)
self.switch_to.alert.accept()
time.sleep(0.5)
if safe_counter > 5:
self.logger.error("Too many reloads. Aborting.")
raise RuntimeError("Too many reloads.")
safe_counter += 1
if is_present(self, '//a[contains(@class, "imgMenu_ctrl")]', self.timeout):
self._logged_in = True
self.logger.info("Login succeeded for user: %s", user)
else:
self.logger.error("Login failed: %s", user)
self.username = user
@safely(RETRIES)
def logout(self, force: bool = False) -> None:
"""
Do the logout.
"""
if not self._logged_in:
self.logger.warning("Not yet logged in")
if not force:
return
self.logger.info("Forcing logout: %s", self.username)
# Find the Profile menu and open it
profile_butt = self.find_element_by_xpath(
'//span[contains(@id, "imgNoPhotoLabel")]'
)
profile_butt.click()
time.sleep(1)
# Find the logout button
logout_butt = self.find_element_by_xpath('//input[@value="Logout user"]')
logout_butt.click()
if self._back_to_login():
self.logger.debug("Back on login page")
@safely(RETRIES)
def _back_to_login(self) -> bool:
_now = datetime.now()
_elapsed = timedelta(seconds=0)
_is_logout_page = "jsp/usut_wapplogout_portlet.jsp" in self.current_url
while not _is_logout_page or _elapsed > self.timeout:
time.sleep(1)
self.logger.debug("Waiting to land on logout page...")
_is_logout_page = "jsp/usut_wapplogout_portlet.jsp" in self.current_url
_elapsed = datetime.now() - _now
if _is_logout_page:
self.logger.info("User successfully logged out: %s", self.username)
self.username = None
back_to_login_butt = self.find_element_by_xpath(
'//input[contains(@id, "_button15")]'
)
back_to_login_butt.click()
self.delete_all_cookies()
return True
else:
self.logger.warning("Logout failed: %s", self.username)
return False
@property
def logged_in(self) -> bool:
"""
Check if already logged in. Checks if page is '/jsp/home.jsp'
and if login cookie is set (and not expired).
"""
_base_domain = ".".join(self.uri.netloc.split(".")[-2:])
cookies = [c["name"] for c in self.get_cookies() if _base_domain in c["domain"]]
self.logger.debug("Cookies: %s", cookies)
# _right_url = "/jsp/home.jsp" in self.current_url
_cookies = all(c in cookies for c in ("spcookie", "JSESSIONID", "ipclientid"))
return _cookies
def _switch_to_container(self) -> None:
try:
iframe = self.find_element_by_xpath(
'//iframe[contains(@id, "gsmd_container.jsp")]'
)
self.switch_to.frame(iframe)
except NoSuchElementException:
pass
def get_movements(self) -> T.List[T.Optional[T.Tuple[T.Text, T.Text]]]:
self._switch_to_container()
try:
result = [] # type: T.List[T.Tuple[T.Text]]
movements_table = self.find_elements_by_xpath(
'//div[contains(@class, "ushp_wterminale_container")]//div[@ps-name="Grid2"]//table/tbody/tr'
)
for row in movements_table:
data = row.text.strip().split("\n") # type: T.List[T.Text]
result.append( # type: ignore
tuple(i.strip() for i in data if i.strip())
)
except NoSuchElementException:
result = []
finally:
self.switch_to_default_content()
return result # type: ignore
@property
def checked_in(self) -> bool:
"""
Check if the user is checked in already.
"""
if not self.logged_in:
return False
dates = self.get_movements()
if not dates:
return False
if dates[-1][0] == "Entrata":
return True
return False
@safely(RETRIES)
def check_in(self, force: bool = False) -> None:
"""
Click the check in button.
"""
if not force and not self.logged_in:
self.logger.warning("Not logged in!")
return
if self.checked_in:
self.logger.warn("Already checked in!")
if not force:
return
self._switch_to_container()
enter_butt = self.find_element_by_xpath('//input[@value="Entrata"]')
enter_butt.click()
self.switch_to_default_content()
@safely(RETRIES)
def check_out(self, force: bool = False) -> None:
"""
Click the check out button.
"""
if not force and not self.logged_in:
self.logger.warning("Not logged in!")
return
if not self.checked_in:
self.logger.warn("Not yet checked in!")
if not force:
return
self._switch_to_container()
exit_butt = self.find_element_by_xpath('//input[@value="Uscita"]')
exit_butt.click()
self.switch_to_default_content()
def __del__(self) -> None:
self.quit()

184
bot_z/utils.py 100644
View File

@ -0,0 +1,184 @@
#! -*- encoding: utf-8 -*-
"""Utilities to better handle the project."""
import errno
import logging
import json
import os
import sys
import typing as T
logger = logging.getLogger(__name__)
logger.setLevel(os.environ.get("BOTZ_LOGLEVEL", logging.INFO))
sh = logging.StreamHandler(stream=sys.stdout)
cli_filter = logging.Filter(__name__)
sh.addFilter(cli_filter)
sh.setFormatter(logging.Formatter("%(message)s"))
logger.addHandler(sh)
ENC = json.JSONEncoder()
DEC = json.JSONDecoder()
def cmd_marshal(name: str, cmd: str, **kwargs) -> str:
"""
Serializes a command (a python dict) in a JSON object string.
"""
cmd_struct = dict(name=name, cmd=cmd, **kwargs)
logger.debug("Command to be marshalled: %s", cmd_struct)
return ENC.encode(cmd_struct)
def cmd_unmarshal(cmd: str) -> T.Dict[str, T.Any]:
"""
Deserializes a command (a python dict) in a JSON object string.
"""
logger.debug("Command to be unmarshalled: %s", cmd)
return DEC.decode(cmd)
class Fifo:
"""
Iterator to continuously read the command fifo.
Supports also write operations if opened with mode="w".
"""
def __init__(self, fifopath: str, mode: str = "r") -> None:
try:
os.mkfifo(fifopath)
logger.info("Control pipe opened at: %s", fifopath)
except OSError as err:
if err.errno != errno.EEXIST:
logger.critical("Could not open control pipe at: %s", fifopath)
raise
self.path = fifopath
self.fh = open(fifopath, mode)
def __iter__(self): # pragma: noqa
return self
def __next__(self) -> str:
data = self.fh.read()
# This is a never ending iterator. We should exit the
# loop independently.
return data
def __del__(self) -> None:
self.fh.close()
def __enter__(self) -> T.TextIO:
return self.fh
def __exit__(self, *args) -> None:
self.fh.close()
def write(self, data) -> None:
logger.debug("%s ~ Writing: %s", self.path, data)
self.fh.write(data)
self.fh.flush()
class PLifo:
"""
Implements an on-disk persistent LIFO.
Casts everything to str.
"""
def __init__(self, path: str) -> None:
self.path = path
if not os.path.exists(path):
with open(path, "w") as lifo:
lifo.write("")
self.content = []
self.fh = None
def __enter__(self) -> T.TextIO:
with open(self.path, "r") as fr:
self.content = [line.strip("\n") for line in fr.readlines()]
self.fh = open(self.path, "w")
return self
def __exit__(self, *args) -> None:
for line in self.content:
self.fh.write("{}\n".format(line.strip("\n")))
self.fh.close()
self.fh = None
def push(self, value: T.Any) -> None:
"""
Pushes on the stack. It is an in-memory structure.
The variations are written on disk when exiting the context.
"""
if self.fh is None:
raise RuntimeError("Called out of context. Open the PLifo firts.")
self.content.append(str(value))
def pop(self) -> str:
"""
Pops on the stack. It is an in-memory structure.
The variations are written on disk when exiting the context.
"""
if self.fh is None:
raise RuntimeError("Called out of context. Open the PLifo firts.")
return self.content.pop()
@classmethod
def Push(cls, path: str, value: T.Any) -> None:
"""
Deploys the context and pushes onto the stack, then exits the
context.
"""
with cls(path) as _lifo:
_lifo.push(value)
@classmethod
def Pop(cls, path: str) -> str:
"""
Deploys the context and pops from the stack, then exits the
context.
"""
with cls(path) as _lifo:
_value = _lifo.pop()
return _value
def all(self) -> T.Iterable[str]:
"""
Returns
"""
if self.fh is None:
raise RuntimeError("Called out of context. Open the PLifo firts.")
return self.content
@classmethod
def All(cls, path: str) -> T.Iterable[str]:
"""
Deploys the context and returns the full list of elements,
then exits the context.
"""
with cls(path) as _lifo:
return _lifo.content
def __contains__(self, *args) -> bool:
return [el in self.content for el in args]
def __len__(self) -> int:
if self.fh is None:
raise RuntimeError("Called out of context. Open the PLifo firts.")
return len(self.content)
def __iter__(self) -> T.Iterable:
if self.fh is None:
raise RuntimeError("Called out of context. Open the PLifo firts.")
return self.content
def __next__(self) -> str:
if self.fh is None:
raise RuntimeError("Called out of context. Open the PLifo firts.")
return next(self.content)
def last(self) -> str:
if self.fh is None:
raise RuntimeError("Called out of context. Open the PLifo firts.")
return self.content[-1]

344
bot_z/zdaemon.py 100644
View File

@ -0,0 +1,344 @@
# -*- coding: utf-8 -*-
"""Daemon for local execution of commands."""
import functools
import json
import lockfile
import logging
import logging.handlers
import os
import queue
import signal
import sys
import threading
import time
import typing as T
from bot_z.operator import Operator
from bot_z.utils import Fifo, PLifo, cmd_marshal, cmd_unmarshal
import daemon
from selenium.webdriver.common.keys import Keys
daemon_format = logging.Formatter("%(levelname)s: [%(name)s] -> %(message)s")
console = logging.StreamHandler(stream=sys.stdout)
console.setFormatter(daemon_format)
console.addFilter(logging.Filter(__name__))
syslog = logging.handlers.SysLogHandler(address="/dev/log")
syslog.setFormatter(daemon_format)
logger = logging.getLogger(__name__)
logger.setLevel(os.environ.get("BOTZ_LOGLEVEL", logging.INFO))
logger.addHandler(console)
logger.addHandler(syslog)
logger.debug("Init at debug")
class StopDaemon(Exception):
"""Auxiliary exception to help stop the program in daemon mode."""
pass
def start_daemon(
name: str,
base_uri: str,
timeout: int = None,
proxy: T.Optional[T.Tuple[str, int]] = None,
headless: bool = True,
debug: bool = False,
) -> Operator:
"""
Spawns the Operator object.
"""
optional_args: T.Dict[str, T.Any] = {}
if timeout is not None:
optional_args["timeout"] = timeout
if proxy is not None:
optional_args["proxy"] = proxy
optional_args["headless"] = headless
optional_args["debug"] = debug
logger.info("Started Operator.")
logger.debug(
"Operator parameters: "
"{base_uri: %s, name: %s, timeout: %s, proxy: %s, headless: %s, debug: %s}",
base_uri,
name,
timeout,
proxy,
headless,
debug,
)
return Operator(base_uri=base_uri, name=name, **optional_args)
def operator_drop(op: Operator) -> None:
"""
Stops the Operator.
"""
logger.debug("Stopping Operator...")
logger.debug("Operator id: %s", id(op))
op.quit()
logger.info("Stopped Operator.")
def operator_reload(op: Operator) -> None:
"""
Reload the Operator using the same object.
"""
op_focus = op.switch_to.active_element
op_focus.send_keys(Keys.CONTROL + "Escape")
op.delete_all_cookies()
logger.info("Operator session cleaned.")
def operator_status(op: Operator, debug: bool = False) -> T.Dict[str, str]:
"""
Get the current operator status.
"""
status = {} # type: T.Dict[str, str]
status["is_logged_in"] = str(op.logged_in)
status["is_checked_in"] = str(op.checked_in)
if debug:
status["interal_state"] = str(dir(op))
logger.info("Status: %s", status)
return status
def operator_login(op: Operator, username: str, password: str, force: bool) -> None:
"""
Instructs the operator to perform login.
"""
logger.debug("Performing login...")
op.login(username, password, force)
logger.info("Login done.")
def operator_logout(op: Operator, force: bool) -> None:
"""
Instructs the operator to perform logout.
"""
logger.debug("Performing logout...")
op.logout(force)
logger.info("Logout done.")
def operator_checkin(op: Operator, force: bool) -> None:
"""
Instructs the operator to perform the check in.
"""
logger.debug("Performing check in...")
op.check_in(force)
logger.info("Check in done.")
def operator_checkout(op: Operator, force: bool) -> None:
"""
Instructs the operator to perform the check out.
"""
logger.debug("Performing check out...")
op.check_out(force)
logger.info("Check out done.")
def daemon_process(
fifo_path: str,
working_dir: str,
umask: int,
pidfile: lockfile.FileLock,
base_uri: str,
timeout: int = None,
proxy: T.Optional[T.Tuple[str, int]] = None,
headless: bool = True,
debug: bool = False,
foreground: bool = False,
) -> None:
"""
The daemon function.
"""
if debug:
logger.setLevel(logging.DEBUG)
lifo_path = os.path.join(working_dir, "botz_open_daemons.list")
if os.path.exists(lifo_path):
logger.warning("Lifo (%s) exists. Removing!", lifo_path)
os.remove(lifo_path)
starter = functools.partial(
start_daemon,
base_uri=base_uri,
timeout=timeout,
proxy=proxy,
headless=headless,
debug=debug,
)
def _stop_all() -> None:
logger.debug("SIGTERM...stopping all the clients...")
clients = PLifo.All(lifo_path)
with Fifo(fifo_path) as fifo:
for client in clients:
fifo.write(cmd_marshal(name=client, cmd="stop"))
def _reload_all() -> None:
logger.debug("SIGUSR1 received here.")
clients = PLifo.All(lifo_path)
with Fifo(fifo_path) as fifo:
for client in clients:
fifo.write(cmd_marshal(name=client, cmd="reload"))
def _list_all() -> None:
logger.debug("SIGUSR2 received here.")
clients = PLifo.All(lifo_path)
logger.info("Client list: %s", clients)
context = daemon.DaemonContext(
working_directory=working_dir, umask=umask, pidfile=pidfile
)
logger.debug("context defined")
context.signal_map = {
signal.SIGTERM: _stop_all,
signal.SIGHUP: "terminate",
signal.SIGUSR1: _reload_all,
signal.SIGUSR2: _list_all,
}
logger.debug("signal map defined")
cmd_map = {
"start": starter,
"stop": operator_drop,
"reload": operator_reload,
"status": operator_status,
"login": operator_login,
"logout": operator_logout,
"checkin": operator_checkin,
"checkout": operator_checkout,
}
logger.debug("command map defined")
if foreground:
logger.debug("Started in foreground")
try:
listen_commands(lifo_path, fifo_path, cmd_map)
except KeyboardInterrupt:
logger.info("Process killed!")
finally:
os.remove(lifo_path)
return
with context:
# This is necessary to trigger the daemon start.
time.sleep(1)
logger.info("Started in background")
try:
listen_commands(lifo_path, fifo_path, cmd_map)
except StopDaemon:
os.remove(lifo_path)
return
def listen_client(
name: str, chan: queue.Queue, cmd_map: T.Dict[str, T.Callable]
) -> None:
"""
The listen loop for an operator instance.
"""
op = cmd_map["start"](name)
logger.debug("Started operator id: %s", id(op))
while True:
cmd = chan.get()
if cmd["name"] != name:
log.warning("Command sent to the wrong instance: %s\n%s", name, cmd)
continue
if cmd["cmd"] == "start":
logger.error("Alredy started: %s", name)
continue
if cmd["cmd"] == "stop":
logger.debug("Received command: stop")
cmd_map["stop"](op)
return
elif cmd["cmd"] == "reload":
logger.debug("Received command: reload")
cmd_map["reload"](op)
elif cmd["cmd"] == "status":
logger.debug("Received command: status")
status = cmd_map["status"](op)
logger.debug("Status: %s", status)
with Fifo(cmd["rfifo_path"], "w") as rfifo:
rfifo.write(cmd_marshal(name=cmd["name"], cmd=status))
elif cmd["cmd"] == "login":
logger.debug("Received command: login")
cmd_map["login"](op, cmd["username"], cmd["password"], cmd["force"])
elif cmd["cmd"] == "logout":
logger.debug("Received command: logout")
cmd_map["logout"](op, cmd["force"])
elif cmd["cmd"] == "checkin":
logger.debug("Received command: checkin")
cmd_map["checkin"](op, cmd["force"])
elif cmd["cmd"] == "checkout":
logger.debug("Received command: checkout")
cmd_map["checkout"](op, cmd["force"])
def listen_commands(
lifopath: str, fifopath: str, cmd_map: T.Dict[str, T.Callable]
) -> None:
"""
The main loop to listen incoming commands.
"""
client_chans = {}
while True:
with Fifo(fifopath) as fifo:
for cmd_str in fifo:
logger.debug("Command received in main loop: %s", cmd_str)
cmd = cmd_unmarshal(cmd_str)
logger.debug("Cmd unmarshalled: %s", cmd)
if cmd["cmd"] == "start":
if cmd["name"] not in client_chans:
client_chans[cmd["name"]] = queue.Queue(1)
logger.debug('Queue created for "%s".', cmd["name"])
if cmd["name"] in PLifo.All(lifopath):
logger.warning("Name %s is yet used. Not proceeding.", name)
continue
logger.debug(
'"%s" being pushed onto "%s"...', cmd["name"], lifopath
)
PLifo.Push(lifopath, cmd["name"])
logger.debug("Client %s has been newly created.", cmd["name"])
chan = client_chans.get(cmd["name"], None)
logger.debug("Starting new client in a thread...")
client = threading.Thread(
name=cmd["name"],
target=functools.partial(
listen_client, name=cmd["name"], chan=chan, cmd_map=cmd_map
),
)
client.start()
logger.debug('Client "%s" started.', cmd["name"])
continue
if cmd["cmd"] == "stop-daemon":
stop_all(client_chans)
logger.info("Daemon clients stopped. Exiting...")
raise StopDaemon
logger.debug("Opening client: %s", cmd["name"])
chan = client_chans.get(cmd["name"], None)
if chan is None:
logger.warning(
'No client found with name "%s". Skipping.', cmd["name"]
)
continue
chan.put(cmd)
def stop_all(client_chans: T.Dict[str, queue.Queue]) -> None:
"""
Send the stop command to all the clients.
"""
for name, chan in client_chans.items():
logger.debug('Stopping "%s"...', name)
chan.put({"name": name, "cmd": "stop"})
logger.info("Stopped %s.", name)

View File

@ -20,7 +20,7 @@ import os
# directory, add these directories to sys.path here. If the directory is
# relative to the documentation root, use os.path.abspath to make it
# absolute, like shown here.
#sys.path.insert(0, os.path.abspath('.'))
# sys.path.insert(0, os.path.abspath('.'))
# Get the project root dir, which is the parent dir of this
cwd = os.getcwd()
@ -36,27 +36,26 @@ import bot_z
# -- General configuration ---------------------------------------------
# If your documentation needs a minimal Sphinx version, state it here.
#needs_sphinx = '1.0'
# needs_sphinx = '1.0'
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
extensions = ['sphinx.ext.autodoc', 'sphinx.ext.viewcode']
extensions = ["sphinx.ext.autodoc", "sphinx.ext.viewcode"]
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
templates_path = ["_templates"]
# The suffix of source filenames.
source_suffix = '.rst'
source_suffix = ".rst"
# The encoding of source files.
#source_encoding = 'utf-8-sig'
# source_encoding = 'utf-8-sig'
# The master toctree document.
master_doc = 'index'
master_doc = "index"
# General information about the project.
project = u'Bot_Z'
copyright = u"2019, Leonardo Barcaroli"
project = u"Bot_Z"
# The version info for the project you're documenting, acts as replacement
# for |version| and |release|, also used in various other places throughout
@ -69,126 +68,126 @@ release = bot_z.__version__
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
#language = None
# language = None
# There are two options for replacing |today|: either, you set today to
# some non-false value, then it is used:
#today = ''
# today = ''
# Else, today_fmt is used as the format for a strftime call.
#today_fmt = '%B %d, %Y'
# today_fmt = '%B %d, %Y'
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
exclude_patterns = ['_build']
exclude_patterns = ["_build"]
# The reST default role (used for this markup: `text`) to use for all
# documents.
#default_role = None
# default_role = None
# If true, '()' will be appended to :func: etc. cross-reference text.
#add_function_parentheses = True
# add_function_parentheses = True
# If true, the current module name will be prepended to all description
# unit titles (such as .. function::).
#add_module_names = True
# add_module_names = True
# If true, sectionauthor and moduleauthor directives will be shown in the
# output. They are ignored by default.
#show_authors = False
# show_authors = False
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
pygments_style = "sphinx"
# A list of ignored prefixes for module index sorting.
#modindex_common_prefix = []
# modindex_common_prefix = []
# If true, keep warnings as "system message" paragraphs in the built
# documents.
#keep_warnings = False
# keep_warnings = False
# -- Options for HTML output -------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
html_theme = 'default'
html_theme = "default"
# Theme options are theme-specific and customize the look and feel of a
# theme further. For a list of options available for each theme, see the
# documentation.
#html_theme_options = {}
# html_theme_options = {}
# Add any paths that contain custom themes here, relative to this directory.
#html_theme_path = []
# html_theme_path = []
# The name for this set of Sphinx documents. If None, it defaults to
# "<project> v<release> documentation".
#html_title = None
# html_title = None
# A shorter title for the navigation bar. Default is the same as
# html_title.
#html_short_title = None
# html_short_title = None
# The name of an image file (relative to this directory) to place at the
# top of the sidebar.
#html_logo = None
# html_logo = None
# The name of an image file (within the static path) to use as favicon
# of the docs. This file should be a Windows icon file (.ico) being
# 16x16 or 32x32 pixels large.
#html_favicon = None
# html_favicon = None
# Add any paths that contain custom static files (such as style sheets)
# here, relative to this directory. They are copied after the builtin
# static files, so a file named "default.css" will overwrite the builtin
# "default.css".
html_static_path = ['_static']
html_static_path = ["_static"]
# If not '', a 'Last updated on:' timestamp is inserted at every page
# bottom, using the given strftime format.
#html_last_updated_fmt = '%b %d, %Y'
# html_last_updated_fmt = '%b %d, %Y'
# If true, SmartyPants will be used to convert quotes and dashes to
# typographically correct entities.
#html_use_smartypants = True
# html_use_smartypants = True
# Custom sidebar templates, maps document names to template names.
#html_sidebars = {}
# html_sidebars = {}
# Additional templates that should be rendered to pages, maps page names
# to template names.
#html_additional_pages = {}
# html_additional_pages = {}
# If false, no module index is generated.
#html_domain_indices = True
# html_domain_indices = True
# If false, no index is generated.
#html_use_index = True
# html_use_index = True
# If true, the index is split into individual pages for each letter.
#html_split_index = False
# html_split_index = False
# If true, links to the reST sources are added to the pages.
#html_show_sourcelink = True
# html_show_sourcelink = True
# If true, "Created using Sphinx" is shown in the HTML footer.
# Default is True.
#html_show_sphinx = True
# html_show_sphinx = True
# If true, "(C) Copyright ..." is shown in the HTML footer.
# Default is True.
#html_show_copyright = True
# html_show_copyright = True
# If true, an OpenSearch description file will be output, and all pages
# will contain a <link> tag referring to it. The value of this option
# must be the base URL from which the finished HTML is served.
#html_use_opensearch = ''
# html_use_opensearch = ''
# This is the file name suffix for HTML files (e.g. ".xhtml").
#html_file_suffix = None
# html_file_suffix = None
# Output file base name for HTML help builder.
htmlhelp_basename = 'bot_zdoc'
htmlhelp_basename = "bot_zdoc"
# -- Options for LaTeX output ------------------------------------------
@ -196,10 +195,8 @@ htmlhelp_basename = 'bot_zdoc'
latex_elements = {
# The paper size ('letterpaper' or 'a4paper').
#'papersize': 'letterpaper',
# The font size ('10pt', '11pt' or '12pt').
#'pointsize': '10pt',
# Additional stuff for the LaTeX preamble.
#'preamble': '',
}
@ -208,44 +205,38 @@ latex_elements = {
# (source start file, target name, title, author, documentclass
# [howto/manual]).
latex_documents = [
('index', 'bot_z.tex',
u'Bot_Z Documentation',
u'Leonardo Barcaroli', 'manual'),
("index", "bot_z.tex", u"Bot_Z Documentation", u"Leonardo Barcaroli", "manual")
]
# The name of an image file (relative to this directory) to place at
# the top of the title page.
#latex_logo = None
# latex_logo = None
# For "manual" documents, if this is true, then toplevel headings
# are parts, not chapters.
#latex_use_parts = False
# latex_use_parts = False
# If true, show page references after internal links.
#latex_show_pagerefs = False
# latex_show_pagerefs = False
# If true, show URL addresses after external links.
#latex_show_urls = False
# latex_show_urls = False
# Documents to append as an appendix to all manuals.
#latex_appendices = []
# latex_appendices = []
# If false, no module index is generated.
#latex_domain_indices = True
# latex_domain_indices = True
# -- Options for manual page output ------------------------------------
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
('index', 'bot_z',
u'Bot_Z Documentation',
[u'Leonardo Barcaroli'], 1)
]
man_pages = [("index", "bot_z", u"Bot_Z Documentation", [u"Leonardo Barcaroli"], 1)]
# If true, show URL addresses after external links.
#man_show_urls = False
# man_show_urls = False
# -- Options for Texinfo output ----------------------------------------
@ -254,22 +245,25 @@ man_pages = [
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
('index', 'bot_z',
u'Bot_Z Documentation',
u'Leonardo Barcaroli',
'bot_z',
'One line description of project.',
'Miscellaneous'),
(
"index",
"bot_z",
u"Bot_Z Documentation",
u"Leonardo Barcaroli",
"bot_z",
"One line description of project.",
"Miscellaneous",
)
]
# Documents to append as an appendix to all manuals.
#texinfo_appendices = []
# texinfo_appendices = []
# If false, no module index is generated.
#texinfo_domain_indices = True
# texinfo_domain_indices = True
# How to display URL addresses: 'footnote', 'no', or 'inline'.
#texinfo_show_urls = 'footnote'
# texinfo_show_urls = 'footnote'
# If true, do not generate a @detailmenu in the "Top" node's menu.
#texinfo_no_detailmenu = False
# texinfo_no_detailmenu = False

34
entrypoint.sh 100755
View File

@ -0,0 +1,34 @@
#!/bin/bash
CONF="/tmp/botz.yaml"
if [ -z ${DEBUG} ]; then
_DEBUG="false"
else
_DEBUG="true"
fi
if [ -z ${BASE_URI} ]; then
echo "Missing BASE_URI environment variable."
exit 1
fi
cat > ${CONF} <<-EOF
base_uri: ${BASE_URI}
debug: ${_DEBUG}
log:
level: ${DEBUG_LEVEL:-INFO}
syslog: false
http:
bind_addr:
- "0.0.0.0"
port: 3003
EOF
echo "--------------------------------------------"
echo " STARTING WITH CONFIG:"
echo "--------------------------------------------"
cat ${CONF}
echo "--------------------------------------------"
z_app -c ${CONF}

View File

@ -1,15 +1,24 @@
[bumpversion]
current_version = 0.1.0
current_version = 1.1.3
commit = True
tag = True
sign_tags = True
[bumpversion:file:setup.py]
search = version='{current_version}'
replace = version='{new_version}'
search = VERSION = "{current_version}"
replace = VERSION = "{new_version}"
[bumpversion:file:bot_z/__init__.py]
search = __version__ = '{current_version}'
replace = __version__ = '{new_version}'
search = __version__ = "{current_version}"
replace = __version__ = "{new_version}"
[bumpversion:file:Dockerfile]
search = LABEL io.troubles.botz.version="{current_version}"
replace = LABEL io.troubles.botz.version="{new_version}"
[bumpversion:file:Makefile]
search = VERSION = {current_version}
replace = VERSION = {new_version}
[bdist_wheel]
universal = 0
@ -18,4 +27,4 @@ universal = 0
exclude = docs
[aliases]
# Define setup.py command aliases here

238
setup.py
View File

@ -4,42 +4,98 @@
"""The setup script."""
from collections import namedtuple
from distutils.dir_util import copy_tree, mkpath
import glob
from html.parser import HTMLParser
import json
import os
import pkg_resources
from setuptools import setup, find_packages # noqa
from setuptools.command.develop import develop # noqa
from setuptools.command.install import install # noqa
from setuptools.command.bdist_egg import bdist_egg # noqa
from pprint import pformat
from setuptools import setup, find_packages # type: ignore
from setuptools.command.develop import develop # type: ignore
from setuptools.command.install import install # type: ignore
from setuptools.command.bdist_egg import bdist_egg # type: ignore
import shutil
import subprocess
import sys
import tarfile
import typing as T
from urllib.error import HTTPError, URLError
from urllib.request import urlopen
from wheel.bdist_wheel import bdist_wheel # noqa
from wheel.bdist_wheel import bdist_wheel # type: ignore
import zipfile
GECKO_RELEASE_PATH = "https://github.com/mozilla/geckodriver"
PKG_NAME = 'bot_z'
VERSION = '0.1.0'
AUTHOR = 'blallo'
AUTHOR_EMAIL = 'blallo@autistici.org'
BIN_PATH = 'bin/geckodriver'
PKG_NAME = "bot_z"
VERSION = "1.1.3"
AUTHOR = "blallo"
AUTHOR_EMAIL = "blallo@autistici.org"
BIN_PATH = "bin/geckodriver"
ASSET_FILE_PATHS = [
"assets/*",
"assets/static/*",
"assets/static/css/*",
"assets/static/js/*",
]
with open('README.md') as readme_file:
with open("README.md") as readme_file:
readme = readme_file.read()
requirements = [
'Click>=6.0',
'selenium>=3.141.0',
"Click>=7.0",
"selenium>=3.141.0",
"lockfile>=0.12.2",
"python-daemon>=2.2.3",
"aiohttp>=3.5.4",
"aiodns>=2.0.0",
"cchardet",
"aiohttp-session==2.7.0",
"cryptography==2.7",
"PyYAML==5.1.2",
"passlib>=1.7.1, <2.0.0",
"bcrypt>=3.0.0",
]
setup_requirements = [
] # type: T.List[str]
setup_requirements = [] # type: T.List[str]
test_requirements = [
] # type: T.List[str]
test_requirements = [] # type: T.List[str]
def _find_javascript_pkgmgr():
pkgmgr_list = ("yarn", "npm")
for _pkgmgr in pkgmgr_list:
if shutil.which(_pkgmgr):
pkgmgr = _pkgmgr
break
if not pkgmgr:
raise RuntimeError(
"Missing javascript package manager. Allowed are: {}".format(pkgmgr_list)
)
return pkgmgr
def _find_built_assets(base_path: str):
asset_manifest = os.path.join(base_path, "build", "asset-manifest.json")
with open(asset_manifest) as f:
manifest = json.load(f)
return manifest.get("files")
def build_web():
pkgmgr = _find_javascript_pkgmgr()
webpath = "./bot.z_web" # TODO: find base path
script = f"cd {webpath} && {pkgmgr} build"
print("[BUILD_WEB] running script:\n\t{}".format(script))
subprocess.check_call(["bash", "-c", script])
assets = _find_built_assets(webpath)
print("[BUILD_WEB] built assets: {}".format(pformat(assets)))
assets_path = pkg_resources.resource_filename("api", "assets")
if not os.path.exists(assets_path):
mkpath(assets_path)
copy_tree(os.path.join(webpath, "build"), assets_path)
class GitTags(HTMLParser):
@ -48,7 +104,7 @@ class GitTags(HTMLParser):
def handle_starttag(self, tag, attrs):
dattrs = dict(attrs)
if 'commit-title' in dattrs.get('class', ''):
if "commit-title" in dattrs.get("class", ""):
self.take_next = 1
def handle_data(self, data):
@ -57,7 +113,7 @@ class GitTags(HTMLParser):
elif self.take_next == 1:
self.take_next = 2
elif self.take_next == 2:
self.tags.append(data.strip('\n').strip(' ').strip('\n'))
self.tags.append(data.strip("\n").strip(" ").strip("\n"))
self.take_next = 0
@ -83,9 +139,9 @@ def find_latest_version(url: str) -> str:
"""
Retrieves latest geckodriver tag.
"""
tag_page = retrieve_page('{}/tags'.format(url))
tag_page = retrieve_page("{}/tags".format(url))
gt = GitTags()
gt.feed(tag_page.decode('utf-8'))
gt.feed(tag_page.decode("utf-8"))
gt.tags.sort()
return gt.tags[-1]
@ -108,32 +164,50 @@ def ensure_local_folder() -> None:
pkg_resources.ensure_directory(bin_path)
PLATFORM_MAP = {
"win32": "win32",
"win64": "win64",
"linux32": "linux32",
"linux64": "linux64",
"darwin64": "macos",
"darwin32": "macos", # This is impossible (?)
"macos": "macos",
}
def _identify_platform():
s_platform = sys.platform
if s_platform == "darwin":
return "macos"
is_64bits = sys.maxsize > 2 ** 32
if "linux" in s_platform:
plat = "linux"
elif "win" in s_platform:
plat = "win"
if is_64bits:
platform = "{}64".format(plat)
else:
platform = "{}32".format(plat)
return platform
def assemble_driver_uri(
version: T.Optional[str]=None,
platform: T.Optional[str]=None
) -> str:
version: T.Optional[str] = None, platform: T.Optional[str] = None
) -> str:
"""
Selects the right geckodriver URI.
"""
# TODO: use pkg_resources.get_platform()
if not version:
version = find_latest_version(GECKO_RELEASE_PATH)
if not platform:
s_platform = sys.platform
is_64bits = sys.maxsize > 2**32
if is_64bits:
platform = '{}64'.format(s_platform)
else:
platform = '{}64'.format(s_platform)
if 'win' in platform:
ext = 'zip'
if platform is None:
platform = _identify_platform()
if "win" in platform:
ext = "zip"
else:
ext = 'tar.gz'
return '{base}/releases/download/{vers}/geckodriver-{vers}-{platform}.{ext}'.format(
base=GECKO_RELEASE_PATH,
vers=version,
platform=platform,
ext=ext
ext = "tar.gz"
return "{base}/releases/download/{vers}/geckodriver-{vers}-{platform}.{ext}".format(
base=GECKO_RELEASE_PATH, vers=version, platform=PLATFORM_MAP[platform], ext=ext
)
@ -141,18 +215,18 @@ def download_driver_bin(uri: str, path: str) -> None:
"""
Donwloads the geckodriver binary.
"""
name = uri.split('/')[-1]
name = uri.split("/")[-1]
filepath = os.path.join(path, name)
print("[DRIVER] downloading '{}' to {}".format(uri, filepath))
content = retrieve_page(uri)
try:
with open(filepath, 'wb') as f:
with open(filepath, "wb") as f:
f.write(content)
if name.endswith(".zip"):
with zipfile.ZipFile(filepath, 'r') as z:
with zipfile.ZipFile(filepath, "r") as z:
z.extractall(path)
elif name.endswith(".tar.gz"):
with tarfile.open(filepath, 'r') as r:
with tarfile.open(filepath, "r") as r:
r.extractall(path)
else:
raise RuntimeError("Unrecognized file extension: %s", name)
@ -160,18 +234,32 @@ def download_driver_bin(uri: str, path: str) -> None:
os.remove(filepath)
def preinstall(platform: T.Optional[str]=None) -> None:
def preinstall(platform: T.Text, build_iface: bool = True) -> None:
"""
Performs all the postintallation flow, donwloading in the
right place the geckodriver binary.
"""
# target_path = os.path.join(os.path.abspath(os.path.curdir), 'bot_z', 'bin')
target_path = pkg_resources.resource_filename('bot_z', 'bin')
target_path = pkg_resources.resource_filename(
"bot_z", os.path.join("bin", platform)
)
if not os.path.exists(target_path):
mkpath(target_path)
ensure_local_folder()
version = os.environ.get('BOTZ_GECKO_VERSION')
version = os.environ.get("BOTZ_GECKO_VERSION")
gecko_uri = assemble_driver_uri(version, platform)
print("[POSTINSTALL] gecko_uri: {}".format(gecko_uri))
download_driver_bin(gecko_uri, target_path)
if build_iface:
build_web()
PLATS = {
"win32": "win32",
"win-amd64": "win64",
"manylinux1-i686": "linux32",
"manylinux1-x86_64": "linux64",
"macosx": "macos",
}
def translate_platform_to_gecko_vers(plat: str) -> str:
@ -182,16 +270,9 @@ def translate_platform_to_gecko_vers(plat: str) -> str:
if plat is None:
return None
PLATS = {
"win32": "win32",
"win-amd64": "win64",
"manylinux1-i686": "linux32",
"manylinux1-x86_64": "linux64",
"macosx": "macos",
}
try:
return PLATS[plat]
except KeyError as e:
except KeyError:
print("Allowed platforms are: {!r}".format(list(PLATS.keys())))
raise
@ -199,20 +280,23 @@ def translate_platform_to_gecko_vers(plat: str) -> str:
# From: https://stackoverflow.com/a/36902139
class CustomDevelopCommand(develop):
"""Custom installation for development mode."""
def run(self):
print("POSTINSTALL")
preinstall()
platform = _identify_platform()
preinstall(platform, build_iface=False)
super().run()
class CustomInstallCommand(install):
"""Custom installation for installation mode."""
def run(self):
opts = self.distribution.get_cmdline_options()
platform = None
if 'bdist_wheel' in opts:
if "bdist_wheel" in opts:
platform = translate_platform_to_gecko_vers(
opts['bdist_wheel'].get('plat-name')
opts["bdist_wheel"].get("plat-name")
)
preinstall(platform)
super().run()
@ -224,13 +308,14 @@ class CustomBDistWheel(bdist_wheel):
Custom bdist_wheel command to ship the right binary
of geckodriver.
"""
def finalize_options(self):
super().finalize_options()
self.root_is_pure = False
def get_tag(self):
python, abi, plat = super().get_tag()
python, abi = 'py3', 'none'
python, abi = "py3", "none"
return python, abi, plat
@ -241,33 +326,28 @@ setup(
long_description=readme,
author=AUTHOR,
author_email=AUTHOR_EMAIL,
url='https://git.abbiamoundominio.org/blallo/BotZ',
packages=find_packages(include=['bot_z']),
url="https://git.abbiamoundominio.org/blallo/BotZ",
packages=find_packages(include=["bot_z", "api"]),
cmdclass={
'develop': CustomDevelopCommand,
'install': CustomInstallCommand,
'bdist_wheel': CustomBDistWheel,
"develop": CustomDevelopCommand,
"install": CustomInstallCommand,
"bdist_wheel": CustomBDistWheel,
},
entry_points={
'console_scripts': [
'bot_z=bot_z.cli:main'
]
},
package_data = {'bot_z': ['bin/geckodriver']},
include_package_data=True,
entry_points={"console_scripts": ["bot_z=bot_z.cli:cli", "z_app=api.app:cli"]},
package_data={"bot_z": ["bin/geckodriver"], "api": ASSET_FILE_PATHS},
install_requires=requirements,
license="GLWTS Public Licence",
zip_safe=False,
keywords='bot_z',
keywords="bot_z",
classifiers=[
'Development Status :: 2 - Pre-Alpha',
'Intended Audience :: Developers',
'License :: GLWTS Public Licence',
'Natural Language :: English',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.7',
"Development Status :: 2 - Pre-Alpha",
"Intended Audience :: Developers",
"License :: GLWTS Public Licence",
"Natural Language :: English",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.7",
],
test_suite='pytest',
test_suite="pytest",
tests_require=test_requirements,
setup_requires=setup_requirements,
)

View File

@ -28,7 +28,7 @@ class TestBot_z(unittest.TestCase):
runner = CliRunner()
result = runner.invoke(cli.main)
assert result.exit_code == 0
assert 'bot_z.cli.main' in result.output
help_result = runner.invoke(cli.main, ['--help'])
assert "bot_z.cli.main" in result.output
help_result = runner.invoke(cli.main, ["--help"])
assert help_result.exit_code == 0
assert '--help Show this message and exit.' in help_result.output
assert "--help Show this message and exit." in help_result.output