migrate to table based tinydb

NOTE to migrate database install tinydb & run migrate.py

- added new functions to filter data with db queries
- in the storage module
This commit is contained in:
2025-08-27 20:39:04 +02:00
parent 0339da1668
commit 36f7f6d278
9 changed files with 2073 additions and 2335 deletions

View File

@@ -1,55 +1,117 @@
from __future__ import annotations
import os, json, time
from typing import List, Dict, Any
from flask import current_app
from flask import current_app, g
from tinydb import TinyDB, Query
from tinydb.storages import Storage
FILE_NAME = "orders.json"
def _file_path() -> str:
return os.path.join(current_app.root_path, "data", FILE_NAME)
PRODUCTION = 'dev'
class PrettyJSONStorage(Storage):
"""Custom TinyDB storage that pretty-prints JSON."""
def __init__(self, path):
self.path = path
def read(self):
try:
with open(self.path, "r", encoding="utf-8") as f:
return json.load(f)
except FileNotFoundError:
return None
def write(self, data):
with open(self.path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def get_db() -> TinyDB:
"""
Return a persistent TinyDB instance per Flask app context.
Stored in `g` to avoid reopening the file multiple times.
"""
if "orders_db" not in g:
db_path = os.path.join(current_app.root_path, "data", FILE_NAME)
if PRODUCTION == "dev":
g.orders_db = TinyDB(db_path, storage=PrettyJSONStorage )
else:
g.orders_db = TinyDB(db_path)
return g.orders_db
def get_orders_table():
"""Return the orders table."""
return get_db().table("orders")
def load_orders() -> List[Dict[str, Any]]:
p = _file_path()
if not os.path.isfile(p): return []
try:
with open(p, "r", encoding="utf-8") as f:
return json.load(f) or []
except Exception:
return []
"""Return all orders as a list of dicts."""
table = get_orders_table()
return table.all()
def _save_orders(orders: List[Dict[str, Any]]) -> None:
p = _file_path()
os.makedirs(os.path.dirname(p), exist_ok=True)
with open(p, "w", encoding="utf-8") as f:
json.dump(orders, f, ensure_ascii=False, indent=2)
def get_order_by_id(order_id: str) -> Dict[str, Any] | None:
"""Return a single order by its ID, or None if not found."""
table = get_orders_table()
Order = Query()
result = table.get(Order.id == order_id)
return result
def load_archived() -> List[Dict[str, Any]]:
"""Return all archived orders, sorted by archived/done/created date descending."""
table = get_orders_table()
Order = Query()
archived = table.search(Order.status == "archived")
def sort_key(o: Dict[str, Any]) -> str:
return o.get("archived_at") or o.get("done_at") or o.get("created_at") or ""
archived.sort(key=sort_key, reverse=True)
return archived
def load_pending() -> List[Dict[str, Any]]:
"""Return all orders with status 'open', sorted by created_at descending."""
table = get_orders_table()
Order = Query()
pending = table.search(Order.status == "open")
pending.sort(key=lambda o: o.get("created_at", ""), reverse=True)
return pending
def load_done() -> List[Dict[str, Any]]:
"""Return all orders with status 'done', sorted by done_at descending."""
table = get_orders_table()
Order = Query()
done = table.search(Order.status == "done")
done.sort(key=lambda o: o.get("done_at") or "", reverse=True)
return done
def add_order(order: Dict[str, Any]) -> None:
orders = load_orders()
orders.append(order)
_save_orders(orders)
table = get_orders_table()
table.insert(order)
def mark_done(order_id: str) -> bool:
orders = load_orders()
changed = False
table = get_orders_table()
Order = Query()
now = time.strftime("%Y-%m-%dT%H:%M:%S%z", time.gmtime())
for o in orders:
if o.get("id") == order_id and o.get("status") != "done":
o["status"] = "done"
o["done_at"] = now
changed = True
if changed: _save_orders(orders)
return changed
updated = table.update({"status": "done", "done_at": now}, Order.id == order_id)
return bool(updated)
def mark_archived(order_id: str) -> bool:
"""Mark an order as archived and set archived_at timestamp."""
now = time.strftime("%Y-%m-%dT%H:%M:%S%z", time.gmtime())
table = get_orders_table()
Order = Query()
updated = table.update({"status": "archived", "archived_at": now}, Order.id == order_id)
return bool(updated)
def update_order(order_id: str, patch: Dict[str, Any]) -> bool:
orders = load_orders()
ok = False
for o in orders:
if o.get("id") == order_id:
o.update(patch)
ok = True
break
if ok: _save_orders(orders)
return ok
table = get_orders_table()
Order = Query()
updated = table.update(patch, Order.id == order_id)
return bool(updated)
def cache_costs(order_id: str, costs: Dict[str, Any]) -> bool:
now = time.strftime("%Y-%m-%dT%H:%M:%S%z", time.gmtime())
@@ -57,4 +119,4 @@ def cache_costs(order_id: str, costs: Dict[str, Any]) -> bool:
def cache_materials(order_id: str, mats: Dict[str, Any]) -> bool:
now = time.strftime("%Y-%m-%dT%H:%M:%S%z", time.gmtime())
return update_order(order_id, {"materials": mats, "last_updated": now})
return update_order(order_id, {"materials": mats, "last_updated": now})