first commit
This commit is contained in:
2
webapp/storage/__init__.py
Normal file
2
webapp/storage/__init__.py
Normal file
@@ -0,0 +1,2 @@
|
||||
from .orders import load_orders, add_order, mark_done
|
||||
__all__ = ["load_orders", "add_order", "mark_done"]
|
||||
BIN
webapp/storage/__pycache__/__init__.cpython-313.pyc
Normal file
BIN
webapp/storage/__pycache__/__init__.cpython-313.pyc
Normal file
Binary file not shown.
BIN
webapp/storage/__pycache__/orders.cpython-313.pyc
Normal file
BIN
webapp/storage/__pycache__/orders.cpython-313.pyc
Normal file
Binary file not shown.
BIN
webapp/storage/__pycache__/sql_store.cpython-313.pyc
Normal file
BIN
webapp/storage/__pycache__/sql_store.cpython-313.pyc
Normal file
Binary file not shown.
60
webapp/storage/orders.py
Normal file
60
webapp/storage/orders.py
Normal file
@@ -0,0 +1,60 @@
|
||||
from __future__ import annotations
|
||||
import os, json, time
|
||||
from typing import List, Dict, Any
|
||||
from flask import current_app
|
||||
|
||||
FILE_NAME = "orders.json"
|
||||
|
||||
def _file_path() -> str:
|
||||
return os.path.join(current_app.root_path, "data", FILE_NAME)
|
||||
|
||||
def load_orders() -> List[Dict[str, Any]]:
|
||||
p = _file_path()
|
||||
if not os.path.isfile(p): return []
|
||||
try:
|
||||
with open(p, "r", encoding="utf-8") as f:
|
||||
return json.load(f) or []
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
def _save_orders(orders: List[Dict[str, Any]]) -> None:
|
||||
p = _file_path()
|
||||
os.makedirs(os.path.dirname(p), exist_ok=True)
|
||||
with open(p, "w", encoding="utf-8") as f:
|
||||
json.dump(orders, f, ensure_ascii=False, indent=2)
|
||||
|
||||
def add_order(order: Dict[str, Any]) -> None:
|
||||
orders = load_orders()
|
||||
orders.append(order)
|
||||
_save_orders(orders)
|
||||
|
||||
def mark_done(order_id: str) -> bool:
|
||||
orders = load_orders()
|
||||
changed = False
|
||||
now = time.strftime("%Y-%m-%dT%H:%M:%S%z", time.gmtime())
|
||||
for o in orders:
|
||||
if o.get("id") == order_id and o.get("status") != "done":
|
||||
o["status"] = "done"
|
||||
o["done_at"] = now
|
||||
changed = True
|
||||
if changed: _save_orders(orders)
|
||||
return changed
|
||||
|
||||
def update_order(order_id: str, patch: Dict[str, Any]) -> bool:
|
||||
orders = load_orders()
|
||||
ok = False
|
||||
for o in orders:
|
||||
if o.get("id") == order_id:
|
||||
o.update(patch)
|
||||
ok = True
|
||||
break
|
||||
if ok: _save_orders(orders)
|
||||
return ok
|
||||
|
||||
def cache_costs(order_id: str, costs: Dict[str, Any]) -> bool:
|
||||
now = time.strftime("%Y-%m-%dT%H:%M:%S%z", time.gmtime())
|
||||
return update_order(order_id, {"cookbook": costs, "last_updated": now})
|
||||
|
||||
def cache_materials(order_id: str, mats: Dict[str, Any]) -> bool:
|
||||
now = time.strftime("%Y-%m-%dT%H:%M:%S%z", time.gmtime())
|
||||
return update_order(order_id, {"materials": mats, "last_updated": now})
|
||||
Reference in New Issue
Block a user