Initial commit

master
valerio 2017-09-30 01:36:10 +02:00
commit c0be47764e
9 changed files with 777 additions and 0 deletions

102
.gitignore vendored 100644
View File

@ -0,0 +1,102 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
.hypothesis/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# dotenv
.env
# virtualenv
.venv
venv/
ENV/
# Spyder project settings
.spyderproject
# Rope project settings
.ropeproject
# macao-pos files
conf/
pos.db
pos.log*
*.swp
.idea

View File

@ -0,0 +1,39 @@
import os.path
from configparser import ConfigParser
import yaml
APP_NAME = 'pos'
CONFIG_PATHS = ['conf/', '~/.config/pos', '/usr/local/etc/pos', '/etc/pos']
CONFIG_FILES = {
'core': 'core.ini',
'logging': 'logging.yaml'
}
def get_default_path():
for p in CONFIG_PATHS:
if all([
os.path.isfile(os.path.join(p, f))
for f in CONFIG_FILES.values()
]):
return p
else:
raise Exception('Unable to find a configuration folder.')
class Config:
def __init__(self):
self.basedir = get_default_path()
self.path_core = os.path.join(self.basedir, CONFIG_FILES['core'])
self.path_logging = os.path.join(self.basedir, CONFIG_FILES['logging'])
self.core = ConfigParser()
self.logging = None
self.read()
def read(self):
self.core.read(self.path_core)
self.logging = yaml.load(open(self.path_logging, 'r'))

View File

@ -0,0 +1,156 @@
from datetime import datetime
from contextlib import contextmanager
import sqlalchemy as sa
import sqlalchemy.ext.declarative
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import Table, Column, ForeignKey
from sqlalchemy import Integer, String, Boolean, DateTime
from sqlalchemy.orm import relationship
from sqlalchemy_utils import force_auto_coercion
from sqlalchemy_utils.types.password import PasswordType
from autogestionale.logging import get_logger
# The database URL must follow RFC 1738 in the form
# dialect+driver://username:password@host:port/database
ENGINE_GENERIC = "{engine}://{user}:{password}@{host}:{port}/{database}"\
"?charset=utf8"
ENGINE_SQLITE = "sqlite:///{path}"
ENGINE_SQLITE_MEMORY = "sqlite://"
PASSWORD_SCHEMES = ['pbkdf2_sha512']
Base = sqlalchemy.ext.declarative.declarative_base()
log = get_logger('database')
force_auto_coercion()
class Database:
"""
Handle database operations."
"""
Session = None
engine = None
def __init__(self, **kwargs):
"""
Initialize database connection.
:param engine: The SQLAlchemy database backend in the form
dialect+driver where dialect is the name of a SQLAlchemy
dialect (sqlite, mysql, postgresql, oracle or mssql) and
driver is the name of the DBAPI in all lowercase
letters. If driver is not specified the default DBAPI
will be imported if available.
:param path: Only for SQLite. Path to database. If not specified the
database will be kept in memory (should be used only for
testing).
:param host:
:param port:
:param database:
:param user:
:param password:
"""
if kwargs['engine'] == 'sqlite':
if 'path' in kwargs:
url = ENGINE_SQLITE.format(path=kwargs['path'])
else:
url = ENGINE_SQLITE_MEMORY
else:
url = ENGINE_GENERIC.format(**kwargs)
self.engine = sa.create_engine(url)
self.Session = sa.orm.sessionmaker(
bind=self.engine,
expire_on_commit=False
)
Base.metadata.create_all(self.engine)
@contextmanager
def get_session(self):
session = self.Session()
try:
yield session
except SQLAlchemyError as e:
log.critical("Error performing transaction:")
log.critical(e)
session.rollback()
else:
session.commit()
finally:
session.close()
class User(Base):
__tablename__ = 'users'
uid = Column(Integer, primary_key=True)
username = Column(String, nullable=False, unique=True)
password = Column(PasswordType(schemes=PASSWORD_SCHEMES), nullable=False)
is_active = Column(Boolean, nullable=False, server_default='1')
is_authenticated = Column(Boolean, nullable=False, server_default='0')
created_at = Column(DateTime, nullable=False, default=datetime.now)
def get_id(self):
return u'{}'.format(self.uid)
class Event(Base):
__tablename__ = 'events'
uid = Column(Integer, primary_key=True)
name = Column(String, nullable=False)
starts_at = Column(DateTime, nullable=False, default=datetime.now)
ends_at = Column(DateTime)
created_at = Column(DateTime, nullable=False, default=datetime.now)
transactions = relationship('Transaction', lazy='joined')
class ProductCategory(Base):
__tablename__ = 'product_categories'
uid = Column(Integer, primary_key=True)
name = Column(String, nullable=False)
sort = Column(Integer, nullable=False, server_default='0')
created_at = Column(DateTime, nullable=False, default=datetime.now)
products = relationship('Product', lazy='joined')
class Product(Base):
__tablename__ = 'products'
uid = Column(Integer, primary_key=True)
name = Column(String, nullable=False)
price = Column(Integer, nullable=False)
sort = Column(Integer, nullable=False, server_default='0')
category_uid = Column(Integer, ForeignKey('product_categories.uid'),
nullable=False)
is_active = Column(Boolean, nullable=False, server_default='1')
created_at = Column(DateTime, nullable=False, default=datetime.now)
category = relationship('ProductCategory', lazy='joined')
class Transaction(Base):
__tablename__ = 'transactions'
uid = Column(Integer, primary_key=True)
event_uid = Column(Integer, ForeignKey('events.uid'), nullable=False)
created_at = Column(DateTime, nullable=False, default=datetime.now)
event = relationship('Event', lazy='joined')
orders = relationship('Order', lazy='joined')
class Order(Base):
__tablename__ = 'orders'
uid = Column(Integer, primary_key=True)
product_uid = Column(Integer, ForeignKey('products.uid'), nullable=False)
quantity = Column(Integer, nullable=False)
transaction_uid = Column(Integer, ForeignKey('transactions.uid'), nullable=False)
product = relationship('Product', lazy='joined')
transaction = relationship('Transaction', lazy='joined')

View File

@ -0,0 +1,15 @@
import logging
import logging.config
from pos.config import APP_NAME
root = logging.getLogger(APP_NAME)
def init_logging(config):
logging.config.dictConfig(config)
def get_logger(name):
return root.getChild(name)

View File

@ -0,0 +1,17 @@
from aiohttp.web import json_response
async def playlist_add(request):
uuid = request.match_info['uuid']
try:
request.app['playlist'].put(uuid)
except DuplicateTrackError:
return json_response({
'err': 'duplicate',
'msg': 'The track is already present in the playlist.'
}, status=400)
else:
return json_response({}, status=200)

View File

@ -0,0 +1,9 @@
def setup_routes(app):
app.router.add_route('GET','/', info)
app.router.add_route('GET', '/login', info)
app.router.add_route('POST', '/login', info)
async def info(request):
return json_response({
'this is': 'working'
})

396
cli.py 100755
View File

@ -0,0 +1,396 @@
#!/usr/bin/env python3
import click
from tabulate import tabulate
from datetime import datetime
from pos.config import Config
from pos.logging import init_logging, get_logger
from pos.database import Database, User, Event, ProductCategory, Product,\
Transaction
config = Config()
conf_db = config.core['DATABASE']
init_logging(config.logging)
log = get_logger('cli')
db = Database(**conf_db)
def get_total(transaction):
return sum(o.product.price * o.quantity
for o in transaction.orders)
def get_income(event):
if event.transactions:
return sum(get_total(t) for t in event.transactions)
else:
return 0
@click.group()
def cli():
pass
@cli.command('initdb')
def initdb():
with db.get_session() as session:
categories = session.query(ProductCategory).count()
if not categories:
session.add(ProductCategory(name='Default'))
@cli.group('user')
def user():
pass
def tabulate_users(users):
tab = [["UID", "Username", "Enabled", "Created at"]]
for u in users:
tab.append([u.uid, u.username, u.is_active, u.created_at])
return tabulate(tab, headers='firstrow')
@user.command('add')
@click.argument('username')
@click.argument('password')
def user_add(username, password):
user = User(username=username, password=password)
with db.get_session() as session:
session.add(user)
print("User successfully added.")
@user.command('list')
def user_list():
with db.get_session() as session:
users = session.query(User).all()
if users:
print(tabulate_users(users))
else:
print("No users found.")
@user.command('set')
@click.option('-p', '--password')
@click.argument('user_uid', type=click.INT)
def user_set(user_uid, password):
with db.get_session() as session:
user = session.query(User).get(user_uid)
if not user:
print("No user found with id #{}.".format(user_uid))
return
if password:
user.password = password
with db.get_session() as session:
session.add(user)
print("User successfully edited.")
@cli.group('event')
def event():
pass
def tabulate_events(events):
tab = [["UID", "Name", "Starts at", "Ends at", "Income", "Created at"]]
for e in events:
tab.append([e.uid, e.name, e.starts_at,
e.ends_at, get_income(e), e.created_at])
return tabulate(tab, headers='firstrow')
def get_overlapping_events(session, starts_at, ends_at):
events = session.query(Event)
if ends_at is None:
events = events.filter(Event.starts_at <= starts_at)
else:
events = events.filter(Event.ends_at >= starts_at)\
.filter(Event.starts_at <= ends_at)
return events.all()
@event.command('add')
@click.argument('name')
@click.argument('starts_at')
@click.argument('ends_at', required=False)
def event_add(name, starts_at, ends_at):
starts_at = datetime.strptime(starts_at, "%Y-%m-%d %H:%M")
ends_at = (datetime.strptime(ends_at, "%Y-%m-%d %H:%M")
if ends_at else None)
if ends_at and starts_at >= ends_at:
print("Could now add event: specified start date ({}) "
"is past the end date ({})."
.format(starts_at.strftime("%Y-%m-%d %H:%M"),
ends_at.strftime("%Y-%m-%d %H:%M")))
return
with db.get_session() as session:
events = get_overlapping_events(session, starts_at, ends_at)
if events:
print("Could not add event: another event is overlapping the date "
"range you have specified.")
print(tabulate_events(events))
return
with db.get_session() as session:
event = Event(name=name, starts_at=starts_at, ends_at=ends_at)
session.add(event)
session.flush()
print("Event successfully added.")
print(tabulate_events([event]))
@event.command('list')
def event_list():
with db.get_session() as session:
events = session.query(Event).all()
if events:
print(tabulate_events(events))
else:
print("No events found.")
@event.command('set')
@click.option('-n', '--name')
@click.option('-s', '--start')
@click.option('-e', '--end')
@click.argument('event_uid')
def event_set(event_uid, name, start, end):
with db.get_session() as session:
event = session.query(Event).get(event_uid)
if not event:
print("No event found with id #{}.".format(event_uid))
return
if name:
event.name = name
if start:
starts_at = datetime.strptime(start, "%Y-%m-%d %H:%M")
if starts_at >= event.ends_at:
print("Could not edit event #{}: specified start date ({}) "
"is past the end date ({})"
.format(event.uid,
starts_at.strftime("%Y-%m-%d %H:%M"),
event.ends_at.strftime("%Y-%m-%d %H:%M")))
return
event.starts_at = starts_at
if end:
if end == 'none':
event.ends_at = None
elif end == 'now':
event.ends_at = datetime.now()
else:
ends_at = datetime.strptime(end, "%Y-%m-%d %H:%M")
if ends_at <= event.starts_at:
print("Could not edit event #{}: specified end date ({}) "
"is before the start date ({})"
.format(event.uid,
ends_at.strftime("%Y-%m-%d %H:%M"),
event.starts_at.strftime("%Y-%m-%d %H:%M")))
return
event.ends_at = datetime.strptime(end, "%Y-%m-%d %H:%M")
if event.starts_at and event.ends_at:
with db.get_session() as session:
events = get_overlapping_events(session,
event.starts_at, event.ends_at)
if events:
print("Could not edit event: another event is overlapping the "
"date range you have specified.")
print(tabulate_events(events))
return
if any([name, start, end]):
with db.get_session() as session:
session.add(event)
session.flush()
print("Event successfully edited.")
print(tabulate_events([event]))
@cli.group('category')
def category():
pass
@category.command('add')
@click.option('-s', '--sort', type=click.INT)
@click.argument('name')
def category_add(name, sort):
category = ProductCategory(name=name)
if sort:
category.sort = sort
with db.get_session() as session:
session.add(category)
print("Category successfully added.")
def tabulate_categories(categories):
tab = [["UID", "Name", "Sort", "Created at"]]
for c in categories:
tab.append([c.uid, c.name, c.sort, c.created_at])
return tabulate(tab, headers='firstrow')
@category.command('list')
@click.option('-s', '--sort', is_flag=True)
def category_list(sort):
with db.get_session() as session:
categories = session.query(ProductCategory)
if sort:
categories = categories.order_by(ProductCategory.sort.asc())
categories = categories.all()
if categories:
print(tabulate_categories(categories))
else:
print("No categories found.")
@cli.group('product')
def product():
pass
def tabulate_products(products):
tab = [["UID", "Name", "Price", "Sort", "Category",
"Enabled", "Created at"]]
for p in products:
tab.append([p.uid, p.name, p.price, p.sort, p.category.name,
p.is_active, p.created_at])
return tabulate(tab, headers='firstrow')
@product.command('add')
@click.argument('name')
@click.argument('price', type=float)
@click.option('-s', '--sort', type=click.INT)
@click.option('-c', '--category', type=click.INT)
def product_add(name, price, sort, category):
price = int(price * 100)
product = Product(name=name, price=price)
if sort:
product.sort = sort
if category:
product.category_uid = category
else:
product.category_uid = 1
with db.get_session() as session:
session.add(product)
print("Product successfully added.")
@product.command('list')
@click.option('-s', '--sort', is_flag=True)
def product_list(sort):
with db.get_session() as session:
products = session.query(Product)
if sort:
products = products.order_by(Product.sort.asc())
products = products.all()
if products:
print(tabulate_products(products))
else:
print("No products found.")
@product.command('set')
@click.option('-n', '--name')
@click.option('-p', '--price', type=click.INT)
@click.option('-s', '--sort', type=click.INT)
@click.option('-c', '--category', type=click.INT)
@click.argument('product_uid')
def product_set(product_uid, name, price, sort, category):
with db.get_session() as session:
product = session.query(Product).get(product_uid)
if not product:
print("No product found with id #{}.".format(product_uid))
return
if name:
product.name = name
if price:
product.price = price
if sort:
product.sort = sort
if category:
product.category_uid = category
if any([name, price, sort, category]):
with db.get_session() as session:
session.add(product)
print("Event successfully edited.")
print(tabulate_products([product]))
@cli.group('transaction')
def transaction():
pass
def tabulate_orders(orders):
tab = [["Product", "Price", "Quantity", "Total"]]
for o in orders:
if o.quantity > 0:
tab.append([o.product.name, o.product.price, o.quantity,
o.product.price * o.quantity])
return tabulate(tab, headers='firstrow')
def print_transactions(transactions):
for t in transactions:
print("Listing transaction #{} ({}):".format(t.uid, t.created_at))
print(tabulate_orders(t.orders))
print("Total:", get_total(t))
print()
@transaction.command('list')
def transaction_list():
with db.get_session() as session:
transactions = session.query(Transaction).all()
if transactions:
print_transactions(transactions)
else:
print("No transactions found.")
if __name__ == '__main__':
cli()

10
requirements.txt 100644
View File

@ -0,0 +1,10 @@
aiohttp
click>=6.0
SQLAlchemy>=1.1.0
sqlalchemy_utils>=0.32.00
pymysql
babel
passlib
tabulate
PyYAML
python-escpos

33
web.py 100755
View File

@ -0,0 +1,33 @@
#! /usr/bin/env python3
from pos.config import Config
from pos.logging import init_logging, get_logger
from pos.database import Database
from pos.routes import setup_routes
import asyncio
from aiohttp import web
log = get_logger('web')
def setup_app(loop, config):
app = web.Application(loop=loop)
app['config'] = config
setup_routes(app)
return app
if __name__ == '__main__':
config = Config()
init_logging(config.logging)
conf_db = config.core['DATABASE']
db = Database(**conf_db)
loop = asyncio.get_event_loop()
app = setup_app(loop, config)
web.run_app(app,
host=config.core.get('GENERAL', 'Address'),
port=config.core.getint('GENERAL', 'Port'))