397 lines
10 KiB
Python
Executable File
397 lines
10 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import click
|
|
from tabulate import tabulate
|
|
from datetime import datetime
|
|
|
|
from pos.config import Config
|
|
from pos.logging import init_logging, get_logger
|
|
from pos.database import Database, User, Event, ProductCategory, Product,\
|
|
Transaction
|
|
|
|
# Build the configuration object and pull the database settings out of its
# ``core`` section.
config = Config()
conf_db = config.core['DATABASE']

# Logging is initialised from the configuration before the CLI logger is
# requested.
init_logging(config.logging)
log = get_logger('cli')

# Shared database handle used by every command below; the DATABASE settings
# are expanded into keyword arguments.
db = Database(**conf_db)
|
|
|
|
|
|
def get_total(transaction):
    """Return the summed value of every order in ``transaction``.

    Each order contributes ``order.product.price * order.quantity``; a
    transaction without orders totals ``0``.
    """
    line_totals = (order.product.price * order.quantity
                   for order in transaction.orders)
    return sum(line_totals)
|
|
|
|
|
|
def get_income(event):
    """Return the combined total of all transactions attached to ``event``.

    The result is ``0`` when ``event.transactions`` is empty or otherwise
    falsy; each transaction is valued with ``get_total``.
    """
    if not event.transactions:
        return 0
    return sum(get_total(txn) for txn in event.transactions)
|
|
|
|
|
|
@click.group()
def cli():
    """Manage the database from the command line.

    Provides the initdb command and the user, event, category, product
    and transaction sub-groups.
    """
    # The docstring above doubles as the text click shows for --help.
|
|
|
|
|
|
@cli.command('initdb')
def initdb():
    """Create the 'Default' product category if no category exists yet."""
    with db.get_session() as session:
        # Seed only when the category table is empty so existing categories
        # are left untouched.
        if session.query(ProductCategory).count() == 0:
            session.add(ProductCategory(name='Default'))
|
|
|
|
|
|
@cli.group('user')
def user():
    """Add, list and edit users."""
    # Pure click group: the sub-commands are registered on it below.
|
|
|
|
|
|
def tabulate_users(users):
    """Render ``users`` as a text table.

    One row per user with its uid, username, active flag and creation
    date, preceded by a header row.
    """
    header = ["UID", "Username", "Enabled", "Created at"]
    rows = [[account.uid, account.username, account.is_active,
             account.created_at]
            for account in users]
    return tabulate([header] + rows, headers='firstrow')
|
|
|
|
|
|
@user.command('add')
@click.argument('username')
@click.argument('password')
def user_add(username, password):
    """Create a new user USERNAME with the given PASSWORD."""
    # NOTE(review): the password is taken as a plain command-line argument,
    # which typically ends up in shell history -- consider prompting instead.
    # The local is not called ``user`` so it does not shadow the click group.
    new_user = User(username=username, password=password)

    with db.get_session() as session:
        session.add(new_user)

    # Report success only after the session block has exited without raising.
    # NOTE(review): presumably get_session() commits on exit -- confirm.
    print("User successfully added.")
|
|
|
|
|
|
@user.command('list')
def user_list():
    """List every user, or report that none exist."""
    with db.get_session() as session:
        users = session.query(User).all()

        # Rendered while the session block is still active.
        # NOTE(review): assumed harmless for a read-only listing -- confirm
        # against get_session().
        if users:
            print(tabulate_users(users))
        else:
            print("No users found.")
|
|
|
|
|
|
@user.command('set')
@click.option('-p', '--password')
@click.argument('user_uid', type=click.INT)
def user_set(user_uid, password):
    """Edit the user USER_UID; -p/--password sets a new password."""
    with db.get_session() as session:
        account = session.query(User).get(user_uid)

    if not account:
        print("No user found with id #{}.".format(user_uid))
        return

    if not password:
        # No change was requested: skip the save and the success message,
        # as the event and product 'set' commands do.
        return

    account.password = password

    # The user was loaded in an earlier session, so it is added to a fresh
    # one to persist the change.
    with db.get_session() as session:
        session.add(account)

    print("User successfully edited.")
|
|
|
|
|
|
@cli.group('event')
def event():
    """Add, list and edit events."""
    # Pure click group: the sub-commands are registered on it below.
|
|
|
|
|
|
def tabulate_events(events):
    """Render ``events`` as a text table.

    One row per event with its uid, name, start and end dates, income (as
    computed by ``get_income``) and creation date, preceded by a header row.
    """
    header = ["UID", "Name", "Starts at", "Ends at", "Income", "Created at"]
    rows = [[ev.uid, ev.name, ev.starts_at,
             ev.ends_at, get_income(ev), ev.created_at]
            for ev in events]
    return tabulate([header] + rows, headers='firstrow')
|
|
|
|
|
|
def get_overlapping_events(session, starts_at, ends_at):
    """Return the events considered to overlap the given date range.

    With an end date, events are matched when they end on or after
    ``starts_at`` and start on or before ``ends_at``.  Without one
    (``ends_at is None``), events starting on or before ``starts_at`` are
    matched.
    """
    query = session.query(Event)

    if ends_at is not None:
        # NOTE(review): rows whose ends_at is NULL presumably never satisfy
        # this comparison -- confirm open-ended events are meant to be
        # ignored here.
        query = query.filter(Event.ends_at >= starts_at)
        query = query.filter(Event.starts_at <= ends_at)
    else:
        query = query.filter(Event.starts_at <= starts_at)

    return query.all()
|
|
|
|
|
|
@event.command('add')
@click.argument('name')
@click.argument('starts_at')
@click.argument('ends_at', required=False)
def event_add(name, starts_at, ends_at):
    """Create the event NAME from STARTS_AT to the optional ENDS_AT.

    Dates use the "YYYY-MM-DD HH:MM" format.  The event is refused when the
    start is not before the end, or when it overlaps an existing event.
    """
    date_format = "%Y-%m-%d %H:%M"

    starts_at = datetime.strptime(starts_at, date_format)
    ends_at = datetime.strptime(ends_at, date_format) if ends_at else None

    if ends_at and starts_at >= ends_at:
        # Message previously read "Could now add event" (typo).
        print("Could not add event: specified start date ({}) "
              "is past the end date ({})."
              .format(starts_at.strftime(date_format),
                      ends_at.strftime(date_format)))
        return

    with db.get_session() as session:
        clashes = get_overlapping_events(session, starts_at, ends_at)
        if clashes:
            print("Could not add event: another event is overlapping the date "
                  "range you have specified.")
            print(tabulate_events(clashes))
            return

    with db.get_session() as session:
        # Not named ``event`` so the click group of that name is not shadowed.
        new_event = Event(name=name, starts_at=starts_at, ends_at=ends_at)
        session.add(new_event)
        # Flush before printing so the insert is sent to the database first.
        # NOTE(review): presumably this also populates generated columns such
        # as uid -- confirm.
        session.flush()
        print("Event successfully added.")
        print(tabulate_events([new_event]))
|
|
|
|
|
|
@event.command('list')
def event_list():
    """List every event with its income, or report that none exist."""
    with db.get_session() as session:
        events = session.query(Event).all()

        # Rendered while the session block is still active, since the table
        # reads each event's transactions.
        # NOTE(review): assumed harmless for a read-only listing -- confirm
        # against get_session().
        if events:
            print(tabulate_events(events))
        else:
            print("No events found.")
|
|
|
|
|
|
@event.command('set')
@click.option('-n', '--name')
@click.option('-s', '--start')
@click.option('-e', '--end')
@click.argument('event_uid')
def event_set(event_uid, name, start, end):
    """Edit the name, start date or end date of the event EVENT_UID.

    Dates use the "YYYY-MM-DD HH:MM" format.  --end also accepts "none"
    (remove the end date) and "now" (end the event at the current time).
    The edit is refused when the resulting start is not before the resulting
    end, or when the new date range overlaps another event.
    """
    date_format = "%Y-%m-%d %H:%M"

    with db.get_session() as session:
        target = session.query(Event).get(event_uid)

    if not target:
        print("No event found with id #{}.".format(event_uid))
        return

    if not any([name, start, end]):
        # Nothing was requested, so there is nothing to validate or save.
        return

    # Work out the resulting date range first, so that --start and --end are
    # validated against each other rather than against the stored values.
    if start:
        new_start = datetime.strptime(start, date_format)
    else:
        new_start = target.starts_at

    if end == 'none':
        new_end = None
    elif end == 'now':
        new_end = datetime.now()
    elif end:
        new_end = datetime.strptime(end, date_format)
    else:
        new_end = target.ends_at

    dates_changed = bool(start or end)
    # Open-ended events (no end date) have nothing to compare against; the
    # previous code raised TypeError when comparing a datetime with None.
    has_full_range = new_start is not None and new_end is not None

    if dates_changed and has_full_range and new_start >= new_end:
        if end:
            print("Could not edit event #{}: specified end date ({}) "
                  "is before the start date ({})"
                  .format(target.uid,
                          new_end.strftime(date_format),
                          new_start.strftime(date_format)))
        else:
            print("Could not edit event #{}: specified start date ({}) "
                  "is past the end date ({})"
                  .format(target.uid,
                          new_start.strftime(date_format),
                          new_end.strftime(date_format)))
        return

    # Only a changed date range can introduce a new overlap; a rename alone
    # must not be blocked.
    if dates_changed and has_full_range:
        with db.get_session() as session:
            # The stored version of the edited event overlaps its own new
            # range, so it is excluded from the clash list.
            clashes = [other for other
                       in get_overlapping_events(session, new_start, new_end)
                       if other.uid != target.uid]
            if clashes:
                print("Could not edit event: another event is overlapping the "
                      "date range you have specified.")
                print(tabulate_events(clashes))
                return

    # Apply the changes only after every check has passed.
    if name:
        target.name = name
    if start:
        target.starts_at = new_start
    if end:
        target.ends_at = new_end

    with db.get_session() as session:
        session.add(target)
        session.flush()
        print("Event successfully edited.")
        print(tabulate_events([target]))
|
|
|
|
|
|
@cli.group('category')
def category():
    """Add and list product categories."""
    # Pure click group: the sub-commands are registered on it below.
|
|
|
|
|
|
@category.command('add')
@click.option('-s', '--sort', type=click.INT)
@click.argument('name')
def category_add(name, sort):
    """Create the product category NAME; -s/--sort sets its sort value."""
    # Not named ``category`` so the click group of that name is not shadowed.
    new_category = ProductCategory(name=name)

    # Compare against None so that an explicit ``--sort 0`` is honoured
    # instead of being silently dropped.
    if sort is not None:
        new_category.sort = sort

    with db.get_session() as session:
        session.add(new_category)

    # Report success only after the session block has exited without raising.
    # NOTE(review): presumably get_session() commits on exit -- confirm.
    print("Category successfully added.")
|
|
|
|
|
|
def tabulate_categories(categories):
    """Render ``categories`` as a text table.

    One row per category with its uid, name, sort value and creation date,
    preceded by a header row.
    """
    header = ["UID", "Name", "Sort", "Created at"]
    rows = [[cat.uid, cat.name, cat.sort, cat.created_at]
            for cat in categories]
    return tabulate([header] + rows, headers='firstrow')
|
|
|
|
|
|
@category.command('list')
@click.option('-s', '--sort', is_flag=True)
def category_list(sort):
    """List product categories; -s/--sort orders them by sort value."""
    with db.get_session() as session:
        query = session.query(ProductCategory)

        if sort:
            query = query.order_by(ProductCategory.sort.asc())

        categories = query.all()

        # Rendered while the session block is still active.
        # NOTE(review): assumed harmless for a read-only listing -- confirm
        # against get_session().
        if categories:
            print(tabulate_categories(categories))
        else:
            print("No categories found.")
|
|
|
|
|
|
@cli.group('product')
def product():
    """Add, list and edit products."""
    # Pure click group: the sub-commands are registered on it below.
|
|
|
|
|
|
def tabulate_products(products):
    """Render ``products`` as a text table.

    One row per product with its uid, name, price, sort value, category
    name, active flag and creation date, preceded by a header row.
    """
    header = ["UID", "Name", "Price", "Sort", "Category",
              "Enabled", "Created at"]
    rows = [[item.uid, item.name, item.price, item.sort, item.category.name,
             item.is_active, item.created_at]
            for item in products]
    return tabulate([header] + rows, headers='firstrow')
|
|
|
|
|
|
@product.command('add')
@click.argument('name')
@click.argument('price', type=float)
@click.option('-s', '--sort', type=click.INT)
@click.option('-c', '--category', type=click.INT)
def product_add(name, price, sort, category):
    """Create the product NAME at PRICE.

    PRICE is stored multiplied by 100 as an integer.  -s/--sort sets the
    sort value and -c/--category the category uid (1 when omitted).
    """
    # Round instead of truncating: 19.99 * 100 is 1998.9999999999998 in
    # binary floating point, which int() alone would turn into 1998.
    stored_price = int(round(price * 100))

    # Not named ``product`` so the click group of that name is not shadowed.
    new_product = Product(name=name, price=stored_price)

    # Compare against None so that an explicit ``--sort 0`` is honoured.
    if sort is not None:
        new_product.sort = sort

    # Fall back to category uid 1 when no category was requested.
    # NOTE(review): presumably the 'Default' category made by initdb -- confirm.
    new_product.category_uid = category if category else 1

    with db.get_session() as session:
        session.add(new_product)

    # Report success only after the session block has exited without raising.
    # NOTE(review): presumably get_session() commits on exit -- confirm.
    print("Product successfully added.")
|
|
|
|
|
|
@product.command('list')
@click.option('-s', '--sort', is_flag=True)
def product_list(sort):
    """List products; -s/--sort orders them by sort value."""
    with db.get_session() as session:
        query = session.query(Product)

        if sort:
            query = query.order_by(Product.sort.asc())

        products = query.all()

        # Rendered while the session block is still active, since the table
        # reads each product's category.
        # NOTE(review): assumed harmless for a read-only listing -- confirm
        # against get_session().
        if products:
            print(tabulate_products(products))
        else:
            print("No products found.")
|
|
|
|
|
|
@product.command('set')
@click.option('-n', '--name')
@click.option('-p', '--price', type=click.INT)
@click.option('-s', '--sort', type=click.INT)
@click.option('-c', '--category', type=click.INT)
@click.argument('product_uid')
def product_set(product_uid, name, price, sort, category):
    """Edit the name, price, sort value or category of product PRODUCT_UID."""
    with db.get_session() as session:
        target = session.query(Product).get(product_uid)

    if not target:
        print("No product found with id #{}.".format(product_uid))
        return

    # Collect the requested changes.  Integer options are compared against
    # None so that an explicit 0 is not mistaken for "option not given".
    # NOTE(review): --price is stored as given, whereas 'product add'
    # multiplies PRICE by 100 -- confirm the intended unit.
    changes = {}
    if name:
        changes['name'] = name
    if price is not None:
        changes['price'] = price
    if sort is not None:
        changes['sort'] = sort
    if category:
        changes['category_uid'] = category

    if not changes:
        return

    for attribute, value in changes.items():
        setattr(target, attribute, value)

    with db.get_session() as session:
        session.add(target)
        # Flush before reporting, as the event 'set' command does, so a
        # failing update surfaces before the success message.
        session.flush()
        # Message previously read "Event successfully edited."
        print("Product successfully edited.")
        print(tabulate_products([target]))
|
|
|
|
|
|
@cli.group('transaction')
def transaction():
    """List transactions."""
    # Pure click group: the sub-commands are registered on it below.
|
|
|
|
|
|
def tabulate_orders(orders):
    """Render the orders with a positive quantity as a text table.

    Each row shows the product name, unit price, quantity and the line
    total (price times quantity), preceded by a header row.  Orders whose
    quantity is zero or negative are left out.
    """
    header = ["Product", "Price", "Quantity", "Total"]
    rows = [[line.product.name, line.product.price, line.quantity,
             line.product.price * line.quantity]
            for line in orders
            if line.quantity > 0]
    return tabulate([header] + rows, headers='firstrow')
|
|
|
|
|
|
def print_transactions(transactions):
    """Print each transaction as a heading, its order table and its total.

    A blank line is printed after every transaction.
    """
    for txn in transactions:
        heading = "Listing transaction #{} ({}):".format(txn.uid,
                                                         txn.created_at)
        print(heading)
        print(tabulate_orders(txn.orders))
        print("Total:", get_total(txn))
        print()
|
|
|
|
|
|
@transaction.command('list')
def transaction_list():
    """List every transaction with its orders and total."""
    with db.get_session() as session:
        transactions = session.query(Transaction).all()

        # Rendered while the session block is still active, since the output
        # reads each transaction's orders and their products.
        # NOTE(review): assumed harmless for a read-only listing -- confirm
        # against get_session().
        if transactions:
            print_transactions(transactions)
        else:
            print("No transactions found.")
|
|
|
|
|
|
# Run the root click group only when the file is executed as a script.
if __name__ == "__main__":
    cli()
|