You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

396 lines
10 KiB

#!/usr/bin/env python3
import click
from tabulate import tabulate
from datetime import datetime
from pos.config import Config
from pos.logging import init_logging, get_logger
from pos.database import Database, User, Event, ProductCategory, Product,\
Transaction
config = Config()
conf_db = config.core['DATABASE']
init_logging(config.logging)
log = get_logger('cli')
db = Database(**conf_db)
def get_total(transaction):
return sum(o.product.price * o.quantity
for o in transaction.orders)
def get_income(event):
if event.transactions:
return sum(get_total(t) for t in event.transactions)
else:
return 0
@click.group()
def cli():
pass
@cli.command('initdb')
def initdb():
with db.get_session() as session:
categories = session.query(ProductCategory).count()
if not categories:
session.add(ProductCategory(name='Default'))
@cli.group('user')
def user():
pass
def tabulate_users(users):
tab = [["UID", "Username", "Enabled", "Created at"]]
for u in users:
tab.append([u.uid, u.username, u.is_active, u.created_at])
return tabulate(tab, headers='firstrow')
@user.command('add')
@click.argument('username')
@click.argument('password')
def user_add(username, password):
user = User(username=username, password=password)
with db.get_session() as session:
session.add(user)
print("User successfully added.")
@user.command('list')
def user_list():
with db.get_session() as session:
users = session.query(User).all()
if users:
print(tabulate_users(users))
else:
print("No users found.")
@user.command('set')
@click.option('-p', '--password')
@click.argument('user_uid', type=click.INT)
def user_set(user_uid, password):
with db.get_session() as session:
user = session.query(User).get(user_uid)
if not user:
print("No user found with id #{}.".format(user_uid))
return
if password:
user.password = password
with db.get_session() as session:
session.add(user)
print("User successfully edited.")
@cli.group('event')
def event():
pass
def tabulate_events(events):
tab = [["UID", "Name", "Starts at", "Ends at", "Income", "Created at"]]
for e in events:
tab.append([e.uid, e.name, e.starts_at,
e.ends_at, get_income(e), e.created_at])
return tabulate(tab, headers='firstrow')
def get_overlapping_events(session, starts_at, ends_at):
events = session.query(Event)
if ends_at is None:
events = events.filter(Event.starts_at <= starts_at)
else:
events = events.filter(Event.ends_at >= starts_at)\
.filter(Event.starts_at <= ends_at)
return events.all()
@event.command('add')
@click.argument('name')
@click.argument('starts_at')
@click.argument('ends_at', required=False)
def event_add(name, starts_at, ends_at):
starts_at = datetime.strptime(starts_at, "%Y-%m-%d %H:%M")
ends_at = (datetime.strptime(ends_at, "%Y-%m-%d %H:%M")
if ends_at else None)
if ends_at and starts_at >= ends_at:
print("Could now add event: specified start date ({}) "
"is past the end date ({})."
.format(starts_at.strftime("%Y-%m-%d %H:%M"),
ends_at.strftime("%Y-%m-%d %H:%M")))
return
with db.get_session() as session:
events = get_overlapping_events(session, starts_at, ends_at)
if events:
print("Could not add event: another event is overlapping the date "
"range you have specified.")
print(tabulate_events(events))
return
with db.get_session() as session:
event = Event(name=name, starts_at=starts_at, ends_at=ends_at)
session.add(event)
session.flush()
print("Event successfully added.")
print(tabulate_events([event]))
@event.command('list')
def event_list():
with db.get_session() as session:
events = session.query(Event).all()
if events:
print(tabulate_events(events))
else:
print("No events found.")
@event.command('set')
@click.option('-n', '--name')
@click.option('-s', '--start')
@click.option('-e', '--end')
@click.argument('event_uid')
def event_set(event_uid, name, start, end):
with db.get_session() as session:
event = session.query(Event).get(event_uid)
if not event:
print("No event found with id #{}.".format(event_uid))
return
if name:
event.name = name
if start:
starts_at = datetime.strptime(start, "%Y-%m-%d %H:%M")
if starts_at >= event.ends_at:
print("Could not edit event #{}: specified start date ({}) "
"is past the end date ({})"
.format(event.uid,
starts_at.strftime("%Y-%m-%d %H:%M"),
event.ends_at.strftime("%Y-%m-%d %H:%M")))
return
event.starts_at = starts_at
if end:
if end == 'none':
event.ends_at = None
elif end == 'now':
event.ends_at = datetime.now()
else:
ends_at = datetime.strptime(end, "%Y-%m-%d %H:%M")
if ends_at <= event.starts_at:
print("Could not edit event #{}: specified end date ({}) "
"is before the start date ({})"
.format(event.uid,
ends_at.strftime("%Y-%m-%d %H:%M"),
event.starts_at.strftime("%Y-%m-%d %H:%M")))
return
event.ends_at = datetime.strptime(end, "%Y-%m-%d %H:%M")
if event.starts_at and event.ends_at:
with db.get_session() as session:
events = get_overlapping_events(session,
event.starts_at, event.ends_at)
if events:
print("Could not edit event: another event is overlapping the "
"date range you have specified.")
print(tabulate_events(events))
return
if any([name, start, end]):
with db.get_session() as session:
session.add(event)
session.flush()
print("Event successfully edited.")
print(tabulate_events([event]))
@cli.group('category')
def category():
pass
@category.command('add')
@click.option('-s', '--sort', type=click.INT)
@click.argument('name')
def category_add(name, sort):
category = ProductCategory(name=name)
if sort:
category.sort = sort
with db.get_session() as session:
session.add(category)
print("Category successfully added.")
def tabulate_categories(categories):
tab = [["UID", "Name", "Sort", "Created at"]]
for c in categories:
tab.append([c.uid, c.name, c.sort, c.created_at])
return tabulate(tab, headers='firstrow')
@category.command('list')
@click.option('-s', '--sort', is_flag=True)
def category_list(sort):
with db.get_session() as session:
categories = session.query(ProductCategory)
if sort:
categories = categories.order_by(ProductCategory.sort.asc())
categories = categories.all()
if categories:
print(tabulate_categories(categories))
else:
print("No categories found.")
@cli.group('product')
def product():
pass
def tabulate_products(products):
tab = [["UID", "Name", "Price", "Sort", "Category",
"Enabled", "Created at"]]
for p in products:
tab.append([p.uid, p.name, p.price, p.sort, p.category.name,
p.is_active, p.created_at])
return tabulate(tab, headers='firstrow')
@product.command('add')
@click.argument('name')
@click.argument('price', type=float)
@click.option('-s', '--sort', type=click.INT)
@click.option('-c', '--category', type=click.INT)
def product_add(name, price, sort, category):
price = int(price * 100)
product = Product(name=name, price=price)
if sort:
product.sort = sort
if category:
product.category_uid = category
else:
product.category_uid = 1
with db.get_session() as session:
session.add(product)
print("Product successfully added.")
@product.command('list')
@click.option('-s', '--sort', is_flag=True)
def product_list(sort):
with db.get_session() as session:
products = session.query(Product)
if sort:
products = products.order_by(Product.sort.asc())
products = products.all()
if products:
print(tabulate_products(products))
else:
print("No products found.")
@product.command('set')
@click.option('-n', '--name')
@click.option('-p', '--price', type=click.INT)
@click.option('-s', '--sort', type=click.INT)
@click.option('-c', '--category', type=click.INT)
@click.argument('product_uid')
def product_set(product_uid, name, price, sort, category):
with db.get_session() as session:
product = session.query(Product).get(product_uid)
if not product:
print("No product found with id #{}.".format(product_uid))
return
if name:
product.name = name
if price:
product.price = price
if sort:
product.sort = sort
if category:
product.category_uid = category
if any([name, price, sort, category]):
with db.get_session() as session:
session.add(product)
print("Event successfully edited.")
print(tabulate_products([product]))
@cli.group('transaction')
def transaction():
pass
def tabulate_orders(orders):
tab = [["Product", "Price", "Quantity", "Total"]]
for o in orders:
if o.quantity > 0:
tab.append([o.product.name, o.product.price, o.quantity,
o.product.price * o.quantity])
return tabulate(tab, headers='firstrow')
def print_transactions(transactions):
for t in transactions:
print("Listing transaction #{} ({}):".format(t.uid, t.created_at))
print(tabulate_orders(t.orders))
print("Total:", get_total(t))
print()
@transaction.command('list')
def transaction_list():
with db.get_session() as session:
transactions = session.query(Transaction).all()
if transactions:
print_transactions(transactions)
else:
print("No transactions found.")
if __name__ == '__main__':
cli()