Files
nouri-App/nouri/main.py
T
2026-04-12 10:36:13 +02:00

725 lines
22 KiB
Python

from __future__ import annotations
import os
import uuid
from collections import defaultdict
from datetime import date, datetime, timedelta
from pathlib import Path
from flask import (
Blueprint,
current_app,
flash,
g,
redirect,
render_template,
request,
url_for,
)
from werkzeug.utils import secure_filename
from .auth import login_required
from .constants import (
AVAILABILITY_LABELS,
CATEGORIES,
ITEM_KIND_LABELS,
ITEM_KIND_SINGULAR_LABELS,
)
from .db import get_db
main_bp = Blueprint("main", __name__)
ALLOWED_IMAGE_EXTENSIONS = {"png", "jpg", "jpeg", "gif", "webp"}
def get_dayparts() -> list:
return get_db().execute("SELECT * FROM dayparts ORDER BY sort_order").fetchall()
def parse_week_start(raw: str | None) -> date:
if raw:
try:
parsed = datetime.strptime(raw, "%Y-%m-%d").date()
return parsed - timedelta(days=parsed.weekday())
except ValueError:
pass
today = date.today()
return today - timedelta(days=today.weekday())
def allowed_file(filename: str) -> bool:
return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_IMAGE_EXTENSIONS
def save_photo(upload, current_filename: str | None = None) -> str | None:
if not upload or not upload.filename:
return current_filename
if not allowed_file(upload.filename):
raise ValueError("Bitte ein Bild als PNG, JPG, GIF oder WEBP hochladen.")
original_name = secure_filename(upload.filename)
extension = original_name.rsplit(".", 1)[1].lower()
filename = f"{uuid.uuid4().hex}.{extension}"
destination = Path(current_app.config["UPLOAD_FOLDER"]) / filename
upload.save(destination)
if current_filename:
old_path = Path(current_app.config["UPLOAD_FOLDER"]) / current_filename
if old_path.exists():
old_path.unlink()
return filename
def get_item(item_id: int):
item = get_db().execute(
"""
SELECT *
FROM items
WHERE id = ?
""",
(item_id,),
).fetchone()
if item is None:
raise ValueError("Der Eintrag wurde nicht gefunden.")
return item
def get_item_daypart_ids(item_id: int) -> list[int]:
rows = get_db().execute(
"SELECT daypart_id FROM item_dayparts WHERE item_id = ?",
(item_id,),
).fetchall()
return [row["daypart_id"] for row in rows]
def get_meal_component_ids(meal_id: int) -> list[int]:
rows = get_db().execute(
"SELECT food_item_id FROM meal_components WHERE meal_item_id = ?",
(meal_id,),
).fetchall()
return [row["food_item_id"] for row in rows]
def attach_dayparts(items: list) -> list[dict]:
if not items:
return []
database = get_db()
ids = [item["id"] for item in items]
placeholders = ",".join("?" for _ in ids)
rows = database.execute(
f"""
SELECT item_dayparts.item_id, dayparts.name
FROM item_dayparts
JOIN dayparts ON dayparts.id = item_dayparts.daypart_id
WHERE item_dayparts.item_id IN ({placeholders})
ORDER BY dayparts.sort_order
""",
ids,
).fetchall()
grouped = defaultdict(list)
for row in rows:
grouped[row["item_id"]].append(row["name"])
enriched = []
for item in items:
entry = dict(item)
entry["dayparts"] = grouped.get(item["id"], [])
enriched.append(entry)
return enriched
def attach_components(items: list[dict]) -> list[dict]:
meal_ids = [item["id"] for item in items if item["kind"] == "meal"]
if not meal_ids:
return items
placeholders = ",".join("?" for _ in meal_ids)
rows = get_db().execute(
f"""
SELECT meal_components.meal_item_id, items.name
FROM meal_components
JOIN items ON items.id = meal_components.food_item_id
WHERE meal_components.meal_item_id IN ({placeholders})
ORDER BY LOWER(items.name)
""",
meal_ids,
).fetchall()
grouped = defaultdict(list)
for row in rows:
grouped[row["meal_item_id"]].append(row["name"])
for item in items:
item["components"] = grouped.get(item["id"], [])
return items
def fetch_items(kind: str | None = None, availability: str | None = None, include_archived: bool = False):
database = get_db()
conditions = []
params = []
if kind:
conditions.append("kind = ?")
params.append(kind)
if availability:
conditions.append("availability_state = ?")
params.append(availability)
elif not include_archived:
conditions.append("availability_state != 'archived'")
where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
rows = database.execute(
f"""
SELECT items.*,
EXISTS(
SELECT 1
FROM shopping_entries
WHERE shopping_entries.item_id = items.id AND shopping_entries.is_checked = 0
) AS is_on_shopping_list
FROM items
{where}
ORDER BY LOWER(name)
"""
, params).fetchall()
return attach_components(attach_dayparts(rows))
def fetch_food_options():
return fetch_items(kind="food", include_archived=False)
def add_to_shopping_list(item_id: int, user_id: int) -> bool:
database = get_db()
existing = database.execute(
"""
SELECT id FROM shopping_entries
WHERE item_id = ? AND is_checked = 0
""",
(item_id,),
).fetchone()
if existing:
return False
database.execute(
"""
INSERT INTO shopping_entries (item_id, added_by)
VALUES (?, ?)
""",
(item_id, user_id),
)
database.commit()
return True
def sync_item_dayparts(item_id: int, daypart_ids: list[int]) -> None:
database = get_db()
database.execute("DELETE FROM item_dayparts WHERE item_id = ?", (item_id,))
for daypart_id in daypart_ids:
database.execute(
"INSERT INTO item_dayparts (item_id, daypart_id) VALUES (?, ?)",
(item_id, daypart_id),
)
def sync_meal_components(meal_id: int, food_ids: list[int]) -> None:
database = get_db()
database.execute("DELETE FROM meal_components WHERE meal_item_id = ?", (meal_id,))
for food_id in food_ids:
database.execute(
"""
INSERT INTO meal_components (meal_item_id, food_item_id)
VALUES (?, ?)
""",
(meal_id, food_id),
)
def fetch_shopping_entries():
rows = get_db().execute(
"""
SELECT shopping_entries.*,
items.name AS item_name,
items.kind AS item_kind,
items.photo_filename,
items.availability_state,
users.display_name,
users.username
FROM shopping_entries
JOIN items ON items.id = shopping_entries.item_id
LEFT JOIN users ON users.id = shopping_entries.added_by
WHERE shopping_entries.is_checked = 0
ORDER BY shopping_entries.added_at DESC
"""
).fetchall()
return rows
def fetch_archive_items():
return fetch_items(availability="archived", include_archived=True)
def planner_entries_for_week(week_start: date):
week_end = week_start + timedelta(days=6)
rows = get_db().execute(
"""
SELECT plan_entries.*,
items.name AS item_name,
items.kind AS item_kind,
items.photo_filename,
dayparts.name AS daypart_name,
dayparts.slug AS daypart_slug
FROM plan_entries
JOIN items ON items.id = plan_entries.item_id
JOIN dayparts ON dayparts.id = plan_entries.daypart_id
WHERE plan_date BETWEEN ? AND ?
ORDER BY plan_date, dayparts.sort_order, items.name
""",
(week_start.isoformat(), week_end.isoformat()),
).fetchall()
grouped = defaultdict(list)
for row in rows:
grouped[(row["plan_date"], row["daypart_id"])].append(row)
return grouped
@main_bp.get("/")
@login_required
def dashboard():
database = get_db()
today = date.today().isoformat()
home_count = database.execute(
"SELECT COUNT(*) AS count FROM items WHERE availability_state = 'home'"
).fetchone()["count"]
shopping_count = database.execute(
"SELECT COUNT(*) AS count FROM shopping_entries WHERE is_checked = 0"
).fetchone()["count"]
archive_count = database.execute(
"SELECT COUNT(*) AS count FROM items WHERE availability_state = 'archived'"
).fetchone()["count"]
today_entries = database.execute(
"""
SELECT plan_entries.id,
items.name AS item_name,
items.kind AS item_kind,
dayparts.name AS daypart_name
FROM plan_entries
JOIN items ON items.id = plan_entries.item_id
JOIN dayparts ON dayparts.id = plan_entries.daypart_id
WHERE plan_entries.plan_date = ?
ORDER BY dayparts.sort_order, items.name
""",
(today,),
).fetchall()
home_items = fetch_items(availability="home")
return render_template(
"dashboard.html",
home_count=home_count,
shopping_count=shopping_count,
archive_count=archive_count,
today_entries=today_entries,
home_items=home_items[:8],
today=today,
)
@main_bp.route("/items/<kind>")
@login_required
def item_list(kind: str):
if kind not in ITEM_KIND_LABELS:
return redirect(url_for("main.dashboard"))
items = fetch_items(kind=kind)
return render_template(
"items/list.html",
kind=kind,
items=items,
availability_labels=AVAILABILITY_LABELS,
)
@main_bp.route("/items/<kind>/new", methods=("GET", "POST"))
@login_required
def item_create(kind: str):
if kind not in ITEM_KIND_LABELS:
return redirect(url_for("main.dashboard"))
database = get_db()
dayparts = get_dayparts()
foods = fetch_food_options()
form_data = {
"name": "",
"category": "",
"note": "",
"daypart_ids": [],
"component_ids": [],
}
if request.method == "POST":
name = request.form.get("name", "").strip()
category = request.form.get("category", "").strip()
note = request.form.get("note", "").strip()
daypart_ids = [int(value) for value in request.form.getlist("daypart_ids")]
component_ids = [int(value) for value in request.form.getlist("component_ids")]
form_data.update(
{
"name": name,
"category": category,
"note": note,
"daypart_ids": daypart_ids,
"component_ids": component_ids,
}
)
error = None
if not name:
error = "Bitte einen Namen eintragen."
elif kind == "meal" and not component_ids:
error = "Bitte mindestens ein Lebensmittel fuer die Mahlzeitenidee waehlen."
photo_filename = None
if error is None:
try:
photo_filename = save_photo(request.files.get("photo"))
except ValueError as exc:
error = str(exc)
if error is None:
cursor = database.execute(
"""
INSERT INTO items (kind, name, category, note, photo_filename, created_by, updated_by)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(kind, name, category, note, photo_filename, g.user["id"], g.user["id"]),
)
item_id = cursor.lastrowid
sync_item_dayparts(item_id, daypart_ids)
if kind == "meal":
sync_meal_components(item_id, component_ids)
database.commit()
flash(f"{ITEM_KIND_SINGULAR_LABELS[kind]} wurde angelegt.", "success")
return redirect(url_for("main.item_list", kind=kind))
flash(error, "error")
return render_template(
"items/form.html",
kind=kind,
item=None,
dayparts=dayparts,
foods=foods,
categories=CATEGORIES,
form_data=form_data,
)
@main_bp.route("/items/<int:item_id>/edit", methods=("GET", "POST"))
@login_required
def item_edit(item_id: int):
database = get_db()
item = get_item(item_id)
kind = item["kind"]
dayparts = get_dayparts()
foods = fetch_food_options()
form_data = {
"name": item["name"],
"category": item["category"] or "",
"note": item["note"] or "",
"daypart_ids": get_item_daypart_ids(item_id),
"component_ids": get_meal_component_ids(item_id) if kind == "meal" else [],
}
if request.method == "POST":
name = request.form.get("name", "").strip()
category = request.form.get("category", "").strip()
note = request.form.get("note", "").strip()
daypart_ids = [int(value) for value in request.form.getlist("daypart_ids")]
component_ids = [int(value) for value in request.form.getlist("component_ids")]
form_data.update(
{
"name": name,
"category": category,
"note": note,
"daypart_ids": daypart_ids,
"component_ids": component_ids,
}
)
error = None
if not name:
error = "Bitte einen Namen eintragen."
elif kind == "meal" and not component_ids:
error = "Bitte mindestens ein Lebensmittel fuer die Mahlzeitenidee waehlen."
photo_filename = item["photo_filename"]
if error is None:
try:
photo_filename = save_photo(request.files.get("photo"), current_filename=item["photo_filename"])
except ValueError as exc:
error = str(exc)
if error is None:
database.execute(
"""
UPDATE items
SET name = ?, category = ?, note = ?, photo_filename = ?, updated_by = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""",
(name, category, note, photo_filename, g.user["id"], item_id),
)
sync_item_dayparts(item_id, daypart_ids)
if kind == "meal":
sync_meal_components(item_id, component_ids)
database.commit()
flash("Der Eintrag wurde aktualisiert.", "success")
return redirect(url_for("main.item_list", kind=kind))
flash(error, "error")
return render_template(
"items/form.html",
kind=kind,
item=item,
dayparts=dayparts,
foods=foods,
categories=CATEGORIES,
form_data=form_data,
)
@main_bp.post("/items/<int:item_id>/shopping")
@login_required
def item_add_to_shopping(item_id: int):
item = get_item(item_id)
added = add_to_shopping_list(item_id, g.user["id"])
if added:
flash(f"{item['name']} steht jetzt auf der Einkaufsliste.", "success")
else:
flash(f"{item['name']} ist bereits auf der Einkaufsliste.", "info")
return redirect(request.referrer or url_for("main.shopping_list"))
@main_bp.post("/items/<int:item_id>/set-home")
@login_required
def item_set_home(item_id: int):
item = get_item(item_id)
database = get_db()
database.execute(
"""
UPDATE items
SET availability_state = 'home', updated_by = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""",
(g.user["id"], item_id),
)
database.commit()
flash(f"{item['name']} ist jetzt unter Zuhause sichtbar.", "success")
return redirect(request.referrer or url_for("main.home_view"))
@main_bp.post("/items/<int:item_id>/archive")
@login_required
def item_archive(item_id: int):
item = get_item(item_id)
database = get_db()
database.execute(
"""
UPDATE items
SET availability_state = 'archived', updated_by = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""",
(g.user["id"], item_id),
)
database.commit()
flash(f"{item['name']} liegt jetzt im Archiv und bleibt spaeter leicht wiederfindbar.", "info")
return redirect(request.referrer or url_for("main.archive_view"))
@main_bp.post("/items/<int:item_id>/restore")
@login_required
def item_restore(item_id: int):
item = get_item(item_id)
database = get_db()
database.execute(
"""
UPDATE items
SET availability_state = 'idea', updated_by = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""",
(g.user["id"], item_id),
)
database.commit()
flash(f"{item['name']} ist wieder in der aktiven Liste.", "success")
return redirect(request.referrer or url_for("main.archive_view"))
@main_bp.route("/shopping", methods=("GET", "POST"))
@login_required
def shopping_list():
database = get_db()
if request.method == "POST":
selected_item_id = request.form.get("item_id", "").strip()
if not selected_item_id:
flash("Bitte zuerst etwas auswaehlen.", "error")
else:
item = get_item(int(selected_item_id))
added = add_to_shopping_list(item["id"], g.user["id"])
if added:
flash(f"{item['name']} wurde auf die Einkaufsliste gesetzt.", "success")
else:
flash(f"{item['name']} ist bereits auf der Einkaufsliste.", "info")
return redirect(url_for("main.shopping_list"))
entries = fetch_shopping_entries()
addable_items = database.execute(
"""
SELECT items.id, items.name, items.kind, items.availability_state
FROM items
WHERE items.availability_state != 'archived'
AND NOT EXISTS (
SELECT 1 FROM shopping_entries
WHERE shopping_entries.item_id = items.id AND shopping_entries.is_checked = 0
)
ORDER BY CASE items.availability_state WHEN 'home' THEN 0 ELSE 1 END, LOWER(items.name)
"""
).fetchall()
return render_template("shopping/list.html", entries=entries, addable_items=addable_items)
@main_bp.post("/shopping/<int:entry_id>/check")
@login_required
def shopping_check(entry_id: int):
database = get_db()
entry = database.execute(
"SELECT * FROM shopping_entries WHERE id = ?",
(entry_id,),
).fetchone()
if entry is None:
flash("Der Einkaufseintrag wurde nicht gefunden.", "error")
return redirect(url_for("main.shopping_list"))
item = get_item(entry["item_id"])
database.execute(
"""
UPDATE shopping_entries
SET is_checked = 1, checked_at = CURRENT_TIMESTAMP, checked_by = ?
WHERE id = ?
""",
(g.user["id"], entry_id),
)
database.execute(
"""
UPDATE items
SET availability_state = 'home', updated_by = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""",
(g.user["id"], item["id"]),
)
database.commit()
flash(f"{item['name']} ist jetzt als Zuhause vorhanden markiert.", "success")
return redirect(url_for("main.shopping_list"))
@main_bp.post("/shopping/<int:entry_id>/remove")
@login_required
def shopping_remove(entry_id: int):
database = get_db()
database.execute("DELETE FROM shopping_entries WHERE id = ?", (entry_id,))
database.commit()
flash("Der Eintrag wurde von der Einkaufsliste entfernt.", "info")
return redirect(url_for("main.shopping_list"))
@main_bp.get("/home")
@login_required
def home_view():
items = fetch_items(availability="home")
grouped = defaultdict(list)
for item in items:
key = item["dayparts"][0] if item["dayparts"] else "Ohne feste Tageszeit"
grouped[key].append(item)
return render_template("home/list.html", grouped=grouped)
@main_bp.get("/archive")
@login_required
def archive_view():
items = fetch_archive_items()
return render_template("archive/list.html", items=items)
@main_bp.route("/planner", methods=("GET", "POST"))
@login_required
def planner():
database = get_db()
week_start = parse_week_start(request.values.get("week"))
if request.method == "POST":
try:
selected_date = datetime.strptime(request.form.get("plan_date", ""), "%Y-%m-%d").date()
except ValueError:
selected_date = None
item_id = request.form.get("item_id", "").strip()
daypart_id = request.form.get("daypart_id", "").strip()
note = request.form.get("note", "").strip()
error = None
if selected_date is None:
error = "Bitte einen gueltigen Tag auswaehlen."
elif not item_id:
error = "Bitte etwas fuer den Plan waehlen."
elif not daypart_id:
error = "Bitte eine Tageszeit waehlen."
if error is None:
database.execute(
"""
INSERT INTO plan_entries (plan_date, daypart_id, item_id, note, created_by)
VALUES (?, ?, ?, ?, ?)
""",
(selected_date.isoformat(), int(daypart_id), int(item_id), note, g.user["id"]),
)
database.commit()
flash("Der Eintrag wurde in den Wochenplan gelegt.", "success")
else:
flash(error, "error")
return redirect(url_for("main.planner", week=week_start.isoformat()))
days = [week_start + timedelta(days=index) for index in range(7)]
dayparts = get_dayparts()
entries = planner_entries_for_week(week_start)
selectable_items = database.execute(
"""
SELECT id, name, kind, availability_state
FROM items
WHERE availability_state != 'archived'
ORDER BY CASE availability_state WHEN 'home' THEN 0 ELSE 1 END, LOWER(name)
"""
).fetchall()
return render_template(
"planner/week.html",
week_start=week_start,
prev_week=week_start - timedelta(days=7),
next_week=week_start + timedelta(days=7),
days=days,
dayparts=dayparts,
entries=entries,
selectable_items=selectable_items,
)
@main_bp.post("/planner/<int:entry_id>/remove")
@login_required
def planner_remove(entry_id: int):
database = get_db()
week = request.args.get("week")
database.execute("DELETE FROM plan_entries WHERE id = ?", (entry_id,))
database.commit()
flash("Der Planeintrag wurde entfernt.", "info")
return redirect(url_for("main.planner", week=week))