#!/usr/bin/env python3
"""Command-line administration tool for the point-of-sale database.

Exposes four click command groups -- ``user``, ``event``, ``product`` and
``transaction`` -- that list, create and edit the corresponding records.
"""

import click
from tabulate import tabulate
from datetime import datetime

from pos.config import Config
from pos.logging import init_logging, get_logger
from pos.database import Database, User, Event, Product, Transaction

# Format accepted for every date/time given on the command line and used
# when echoing dates back in messages.
DATE_FORMAT = "%Y-%m-%d %H:%M"

config = Config()
conf_db = config.core['DATABASE']

init_logging(config.logging)
log = get_logger('cli')

db = Database(**conf_db)


def _parse_datetime(value):
    """Parse a command-line date string formatted as ``DATE_FORMAT``.

    Raises:
        click.BadParameter: if ``value`` does not match ``DATE_FORMAT``, so
            the user gets a usage error instead of a traceback.
    """
    try:
        return datetime.strptime(value, DATE_FORMAT)
    except ValueError:
        raise click.BadParameter(
            "invalid date '{}': expected format is 'YYYY-MM-DD HH:MM'."
            .format(value))


def get_total(transaction):
    """Return the total amount of a transaction (sum of price * quantity)."""
    return sum(o.product.price * o.quantity for o in transaction.orders)


def get_income(event):
    """Return the summed totals of all transactions attached to an event."""
    return sum(get_total(t) for t in event.transactions)


@click.group()
def cli():
    """Administration tool for the point-of-sale database."""


@cli.group('user')
def user():
    """Manage users."""


def tabulate_users(users):
    """Render an iterable of users as a text table."""
    tab = [["UID", "Username", "Enabled", "Created at"]]
    tab.extend([u.uid, u.username, u.is_active, u.created_at] for u in users)
    return tabulate(tab, headers='firstrow')


@user.command('add')
@click.argument('username')
@click.argument('password')
def user_add(username, password):
    """Create a new user."""
    user = User(username=username, password=password)

    with db.get_session() as session:
        session.add(user)

    print("User successfully added.")


@user.command('list')
def user_list():
    """List all users."""
    with db.get_session() as session:
        users = session.query(User).all()

        if users:
            print(tabulate_users(users))
        else:
            print("No users found.")


@user.command('set')
@click.option('-p', '--password')
@click.argument('user_uid', type=click.INT)
def user_set(user_uid, password):
    """Edit an existing user."""
    with db.get_session() as session:
        user = session.query(User).get(user_uid)

    if not user:
        print("No user found with id #{}.".format(user_uid))
        return

    if password:
        user.password = password

    with db.get_session() as session:
        session.add(user)

    print("User successfully edited.")


@cli.group('event')
def event():
    """Manage events."""


def tabulate_events(events):
    """Render an iterable of events as a text table.

    The "Income" column is computed with ``get_income``, which walks each
    event's transactions and orders.
    """
    tab = [["UID", "Name", "Starts at", "Ends at", "Income", "Created at"]]
    tab.extend([e.uid, e.name, e.starts_at, e.ends_at, get_income(e),
                e.created_at] for e in events)
    return tabulate(tab, headers='firstrow')


def get_overlapping_events(session, starts_at, ends_at, exclude_uid=None):
    """Return the stored events whose time range intersects the given one.

    An ``ends_at`` of ``None`` -- on the candidate range or on a stored
    event -- is treated as "open-ended" (no upper bound).

    Args:
        session: database session used to run the query.
        starts_at: start of the candidate range.
        ends_at: end of the candidate range, or ``None`` if open-ended.
        exclude_uid: optional uid of an event to leave out of the result,
            used when editing so that an event never conflicts with itself.

    Returns:
        A list of overlapping ``Event`` objects (empty if there are none).
    """
    events = session.query(Event)

    if exclude_uid is not None:
        events = events.filter(Event.uid != exclude_uid)

    # A stored event can only overlap if it has not already finished when
    # the candidate range starts; open-ended stored events never finish.
    events = events.filter(
        Event.ends_at.is_(None) | (Event.ends_at >= starts_at))

    # When the candidate range is bounded, the stored event must also have
    # started before the candidate range ends.
    if ends_at is not None:
        events = events.filter(Event.starts_at <= ends_at)

    return events.all()


@event.command('add')
@click.argument('name')
@click.argument('starts_at')
@click.argument('ends_at', required=False)
def event_add(name, starts_at, ends_at):
    """Create a new event.

    STARTS_AT and the optional ENDS_AT are formatted as 'YYYY-MM-DD HH:MM'.
    The event is refused if it overlaps an existing one.
    """
    starts_at = _parse_datetime(starts_at)
    ends_at = _parse_datetime(ends_at) if ends_at else None

    if ends_at and starts_at >= ends_at:
        print("Could not add event: specified start date ({}) "
              "is past the end date ({})."
              .format(starts_at.strftime(DATE_FORMAT),
                      ends_at.strftime(DATE_FORMAT)))
        return

    with db.get_session() as session:
        events = get_overlapping_events(session, starts_at, ends_at)

        if events:
            print("Could not add event: another event is overlapping the date "
                  "range you have specified.")
            print(tabulate_events(events))
            return

    with db.get_session() as session:
        event = Event(name=name, starts_at=starts_at, ends_at=ends_at)
        session.add(event)
        # Flush so database-generated fields are available for display.
        session.flush()

        print("Event successfully added.")
        print(tabulate_events([event]))


@event.command('list')
def event_list():
    """List all events."""
    with db.get_session() as session:
        events = session.query(Event).all()

        if events:
            print(tabulate_events(events))
        else:
            print("No events found.")


@event.command('set')
@click.option('-n', '--name')
@click.option('-s', '--start')
@click.option('-e', '--end')
@click.argument('event_uid')
def event_set(event_uid, name, start, end):
    """Edit an existing event.

    --start takes a date formatted as 'YYYY-MM-DD HH:MM'. --end takes the
    same format, or 'none' to make the event open-ended, or 'now' to end it
    at the current time.
    """
    with db.get_session() as session:
        event = session.query(Event).get(event_uid)

    if not event:
        print("No event found with id #{}.".format(event_uid))
        return

    # Work out the resulting range first and validate it as a whole, so
    # that changing both bounds at once is checked against the new values
    # rather than the stale ones.
    starts_at = event.starts_at
    ends_at = event.ends_at

    if start:
        starts_at = _parse_datetime(start)

    if end == 'none':
        ends_at = None
    elif end == 'now':
        ends_at = datetime.now()
    elif end:
        ends_at = _parse_datetime(end)

    dates_changed = bool(start or end)

    if dates_changed and starts_at is not None:
        # An open-ended event (ends_at is None) cannot have inverted bounds.
        if ends_at is not None and starts_at >= ends_at:
            if end:
                print("Could not edit event #{}: specified end date ({}) "
                      "is before the start date ({})"
                      .format(event.uid,
                              ends_at.strftime(DATE_FORMAT),
                              starts_at.strftime(DATE_FORMAT)))
            else:
                print("Could not edit event #{}: specified start date ({}) "
                      "is past the end date ({})"
                      .format(event.uid,
                              starts_at.strftime(DATE_FORMAT),
                              ends_at.strftime(DATE_FORMAT)))
            return

        with db.get_session() as session:
            # Exclude the event being edited: it would otherwise always
            # overlap with its own stored range.
            events = get_overlapping_events(session, starts_at, ends_at,
                                            exclude_uid=event.uid)

            if events:
                print("Could not edit event: another event is overlapping "
                      "the date range you have specified.")
                print(tabulate_events(events))
                return

    if name:
        event.name = name

    if dates_changed:
        event.starts_at = starts_at
        event.ends_at = ends_at

    if any([name, start, end]):
        with db.get_session() as session:
            session.add(event)
            session.flush()

            print("Event successfully edited.")
            print(tabulate_events([event]))


@cli.group('product')
def product():
    """Manage products."""


def tabulate_products(products):
    """Render an iterable of products as a text table."""
    tab = [["UID", "Name", "Price", "Sort", "Enabled", "Created at"]]
    tab.extend([p.uid, p.name, p.price, p.sort, p.is_active, p.created_at]
               for p in products)
    return tabulate(tab, headers='firstrow')


@product.command('add')
@click.argument('name')
@click.argument('price')
@click.option('-s', '--sort', type=click.INT)
def product_add(name, price, sort):
    """Create a new product."""
    product = Product(name=name, price=price)

    # Compare against None so that an explicit sort value of 0 is honoured.
    if sort is not None:
        product.sort = sort

    with db.get_session() as session:
        session.add(product)

    print("Product successfully added.")


@product.command('list')
@click.option('-s', '--sort', is_flag=True)
def product_list(sort):
    """List all products, optionally ordered by their sort value."""
    with db.get_session() as session:
        products = session.query(Product)

        if sort:
            products = products.order_by(Product.sort.asc())

        products = products.all()

        if products:
            print(tabulate_products(products))
        else:
            print("No products found.")


@product.command('set')
@click.option('-n', '--name')
@click.option('-p', '--price', type=click.INT)
@click.option('-s', '--sort', type=click.INT)
@click.argument('product_uid')
def product_set(product_uid, name, price, sort):
    """Edit an existing product."""
    with db.get_session() as session:
        product = session.query(Product).get(product_uid)

    if not product:
        print("No product found with id #{}.".format(product_uid))
        return

    if name:
        product.name = name

    # Compare against None so that a price or sort value of 0 can be set.
    if price is not None:
        product.price = price

    if sort is not None:
        product.sort = sort

    if name or price is not None or sort is not None:
        with db.get_session() as session:
            session.add(product)

            print("Product successfully edited.")
            print(tabulate_products([product]))


@cli.group('transaction')
def transaction():
    """Inspect transactions."""


def tabulate_orders(orders):
    """Render the orders of a transaction as a text table.

    Orders whose quantity is not positive are left out.
    """
    tab = [["Product", "Price", "Quantity", "Total"]]
    tab.extend([o.product.name, o.product.price, o.quantity,
                o.product.price * o.quantity]
               for o in orders if o.quantity > 0)
    return tabulate(tab, headers='firstrow')


def print_transactions(transactions):
    """Print each transaction with its order table and total amount."""
    for t in transactions:
        print("Listing transaction #{} ({}):".format(t.uid, t.created_at))
        print(tabulate_orders(t.orders))
        print("Total:", get_total(t))
        print()


@transaction.command('list')
def transaction_list():
    """List all transactions with their orders."""
    with db.get_session() as session:
        transactions = session.query(Transaction).all()

        if transactions:
            print_transactions(transactions)
        else:
            print("No transactions found.")


if __name__ == '__main__':
    cli()