from __future__ import annotations import uuid from collections import defaultdict from datetime import date, datetime, timedelta from pathlib import Path from flask import ( Blueprint, current_app, flash, g, jsonify, redirect, render_template, request, url_for, ) from werkzeug.utils import secure_filename from .auth import login_required from .constants import ( AVAILABILITY_LABELS, CATEGORIES, ITEM_KIND_LABELS, ITEM_KIND_SINGULAR_LABELS, VISIBILITY_DESCRIPTIONS, VISIBILITY_LABELS, ) from .db import get_db main_bp = Blueprint("main", __name__) ALLOWED_IMAGE_EXTENSIONS = {"png", "jpg", "jpeg", "gif", "webp"} ACTIVE_STATE_OPTIONS = [ ("", "Alle aktiven"), ("home", "Zuhause"), ("idea", "Merkliste"), ] KIND_FILTER_OPTIONS = [ ("", "Alles"), ("food", "Lebensmittel"), ("meal", "Mahlzeitenideen"), ] VISIBILITY_FILTER_OPTIONS = [ ("", "Alles Sichtbare"), ("shared", "Für alle"), ("personal", "Persönlich"), ] VISIBILITY_FORM_OPTIONS = [ ("shared", "Für alle"), ("personal", "Persönlich"), ] def get_dayparts() -> list: return get_db().execute("SELECT * FROM dayparts ORDER BY sort_order").fetchall() def parse_week_start(raw: str | None) -> date: if raw: try: parsed = datetime.strptime(raw, "%Y-%m-%d").date() return parsed - timedelta(days=parsed.weekday()) except ValueError: pass today = date.today() return today - timedelta(days=today.weekday()) def parse_plan_date(raw: str | None, fallback: date | None = None) -> date: if raw: try: return datetime.strptime(raw, "%Y-%m-%d").date() except ValueError: pass return fallback or date.today() def normalize_visibility(raw: str | None, default: str = "shared") -> str: if raw in VISIBILITY_LABELS: return raw return default def current_household_id() -> int: return int(g.user["household_id"]) def visible_clause(table_alias: str) -> str: return ( f"{table_alias}.household_id = ? " f"AND ({table_alias}.visibility = 'shared' OR {table_alias}.owner_user_id = ?)" ) def visible_params() -> list[int]: return [current_household_id(), int(g.user["id"])] def user_display_name(display_name: str | None, username: str | None) -> str: return display_name or username or "Haushalt" def describe_record(entry: dict) -> dict: owner_name = user_display_name(entry.get("owner_display_name"), entry.get("owner_username")) entry["owner_name"] = owner_name entry["is_personal"] = entry.get("visibility") == "personal" entry["is_shared"] = entry.get("visibility") == "shared" entry["is_mine"] = entry.get("owner_user_id") == g.user["id"] entry["visibility_label"] = VISIBILITY_LABELS.get(entry.get("visibility"), "Für alle") entry["visibility_description"] = VISIBILITY_DESCRIPTIONS.get(entry.get("visibility"), "") entry["owner_label"] = "Von mir" if entry["is_mine"] else f"Von {owner_name}" entry["context_label"] = "Gemeinsam" if entry["is_shared"] else "Persönlich" entry["can_edit"] = entry["is_shared"] or entry["is_mine"] or g.user["role"] == "admin" return entry def describe_records(rows) -> list[dict]: return [describe_record(dict(row)) for row in rows] def ensure_can_edit(entry: dict, error_message: str = "Diesen Eintrag kannst du gerade nicht bearbeiten.") -> None: if not (entry.get("can_edit") or g.user["role"] == "admin"): raise PermissionError(error_message) def allowed_file(filename: str) -> bool: return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_IMAGE_EXTENSIONS def save_photo(upload, current_filename: str | None = None) -> str | None: if not upload or not upload.filename: return current_filename if not allowed_file(upload.filename): raise ValueError("Bitte ein Bild als PNG, JPG, GIF oder WEBP hochladen.") original_name = secure_filename(upload.filename) extension = original_name.rsplit(".", 1)[1].lower() filename = f"{uuid.uuid4().hex}.{extension}" destination = Path(current_app.config["UPLOAD_FOLDER"]) / filename upload.save(destination) if current_filename: old_path = Path(current_app.config["UPLOAD_FOLDER"]) / current_filename if old_path.exists(): old_path.unlink() return filename def get_item(item_id: int) -> dict: item = get_db().execute( f""" SELECT items.*, owner.display_name AS owner_display_name, owner.username AS owner_username, EXISTS( SELECT 1 FROM shopping_entries WHERE shopping_entries.item_id = items.id AND shopping_entries.is_checked = 0 ) AS is_on_shopping_list FROM items LEFT JOIN users AS owner ON owner.id = items.owner_user_id WHERE items.id = ? AND {visible_clause('items')} """, [item_id, *visible_params()], ).fetchone() if item is None: raise ValueError("Der Eintrag wurde nicht gefunden.") return describe_record(dict(item)) def get_item_daypart_ids(item_id: int) -> list[int]: rows = get_db().execute( "SELECT daypart_id FROM item_dayparts WHERE item_id = ?", (item_id,), ).fetchall() return [int(row["daypart_id"]) for row in rows] def get_meal_component_ids(meal_id: int) -> list[int]: rows = get_db().execute( """ SELECT meal_components.food_item_id FROM meal_components JOIN items ON items.id = meal_components.food_item_id WHERE meal_components.meal_item_id = ? AND items.household_id = ? AND (items.visibility = 'shared' OR items.owner_user_id = ?) """, (meal_id, current_household_id(), g.user["id"]), ).fetchall() return [int(row["food_item_id"]) for row in rows] def attach_dayparts(items: list[dict]) -> list[dict]: if not items: return [] item_ids = [item["id"] for item in items] placeholders = ",".join("?" for _ in item_ids) rows = get_db().execute( f""" SELECT item_dayparts.item_id, dayparts.id, dayparts.slug, dayparts.name FROM item_dayparts JOIN dayparts ON dayparts.id = item_dayparts.daypart_id WHERE item_dayparts.item_id IN ({placeholders}) ORDER BY dayparts.sort_order """, item_ids, ).fetchall() grouped = defaultdict(list) for row in rows: grouped[row["item_id"]].append( { "id": row["id"], "slug": row["slug"], "name": row["name"], } ) enriched = [] for item in items: item["dayparts_meta"] = grouped.get(item["id"], []) item["dayparts"] = [daypart["name"] for daypart in item["dayparts_meta"]] item["primary_daypart_id"] = item["dayparts_meta"][0]["id"] if item["dayparts_meta"] else None enriched.append(item) return enriched def attach_components(items: list[dict]) -> list[dict]: meal_ids = [item["id"] for item in items if item["kind"] == "meal"] if not meal_ids: for item in items: item["components"] = [] return items placeholders = ",".join("?" for _ in meal_ids) rows = get_db().execute( f""" SELECT meal_components.meal_item_id, items.name FROM meal_components JOIN items ON items.id = meal_components.food_item_id WHERE meal_components.meal_item_id IN ({placeholders}) AND items.household_id = ? AND (items.visibility = 'shared' OR items.owner_user_id = ?) ORDER BY LOWER(items.name) """, [*meal_ids, current_household_id(), g.user["id"]], ).fetchall() grouped = defaultdict(list) for row in rows: grouped[row["meal_item_id"]].append(row["name"]) for item in items: item["components"] = grouped.get(item["id"], []) return items def fetch_items( *, kind: str | None = None, availability: str | None = None, include_archived: bool = False, query: str | None = None, daypart_id: int | None = None, visibility: str | None = None, ): database = get_db() conditions = [visible_clause("items")] params = visible_params() if kind: conditions.append("items.kind = ?") params.append(kind) if availability: conditions.append("items.availability_state = ?") params.append(availability) elif not include_archived: conditions.append("items.availability_state != 'archived'") if query: conditions.append("LOWER(items.name) LIKE ?") params.append(f"%{query.lower()}%") if daypart_id: conditions.append( """ EXISTS ( SELECT 1 FROM item_dayparts WHERE item_dayparts.item_id = items.id AND item_dayparts.daypart_id = ? ) """ ) params.append(daypart_id) if visibility: conditions.append("items.visibility = ?") params.append(visibility) rows = database.execute( f""" SELECT items.*, owner.display_name AS owner_display_name, owner.username AS owner_username, EXISTS( SELECT 1 FROM shopping_entries WHERE shopping_entries.item_id = items.id AND shopping_entries.is_checked = 0 ) AS is_on_shopping_list FROM items LEFT JOIN users AS owner ON owner.id = items.owner_user_id WHERE {' AND '.join(conditions)} ORDER BY CASE items.availability_state WHEN 'home' THEN 0 WHEN 'idea' THEN 1 ELSE 2 END, CASE items.visibility WHEN 'shared' THEN 0 ELSE 1 END, LOWER(items.name) """, params, ).fetchall() return attach_components(attach_dayparts(describe_records(rows))) def fetch_food_options(): return fetch_items(kind="food", include_archived=True) def group_items_by_availability(items: list[dict]) -> list[dict]: grouped = defaultdict(list) for item in items: grouped[item["availability_state"]].append(item) ordered_states = ["home", "idea", "archived"] result = [] for state in ordered_states: entries = grouped.get(state, []) if entries: result.append( { "state": state, "title": AVAILABILITY_LABELS[state], "items": entries, } ) return result def extract_item_form_data(existing: dict | None = None) -> dict: form_data = existing or {} form_data.update( { "name": request.form.get("name", "").strip(), "category": request.form.get("category", "").strip(), "note": request.form.get("note", "").strip(), "visibility": normalize_visibility(request.form.get("visibility"), form_data.get("visibility", "shared")), "daypart_ids": [int(value) for value in request.form.getlist("daypart_ids") if value.isdigit()], "component_ids": [int(value) for value in request.form.getlist("component_ids") if value.isdigit()], "quick_food_name": request.form.get("quick_food_name", "").strip(), "quick_food_category": request.form.get("quick_food_category", "").strip(), "quick_food_note": request.form.get("quick_food_note", "").strip(), } ) return form_data def create_quick_food_from_form(form_data: dict) -> int: database = get_db() cursor = database.execute( """ INSERT INTO items ( household_id, owner_user_id, visibility, kind, name, category, note, created_by, updated_by ) VALUES (?, ?, ?, 'food', ?, ?, ?, ?, ?) """, ( current_household_id(), g.user["id"], form_data["visibility"], form_data["quick_food_name"], form_data["quick_food_category"], form_data["quick_food_note"], g.user["id"], g.user["id"], ), ) food_id = int(cursor.lastrowid) sync_item_dayparts(food_id, form_data["daypart_ids"]) database.commit() return food_id def add_to_shopping_list(item_id: int, user_id: int, visibility_override: str | None = None) -> bool: database = get_db() item = get_item(item_id) existing = database.execute( """ SELECT id FROM shopping_entries WHERE item_id = ? AND is_checked = 0 """, (item_id,), ).fetchone() if existing: return False visibility = normalize_visibility(visibility_override, item["visibility"]) owner_user_id = user_id if visibility == "personal" else item["owner_user_id"] database.execute( """ INSERT INTO shopping_entries (household_id, owner_user_id, visibility, item_id, added_by) VALUES (?, ?, ?, ?, ?) """, (current_household_id(), owner_user_id, visibility, item_id, user_id), ) database.commit() return True def ensure_planned_item_is_shopped(item_id: int, user_id: int, visibility: str) -> bool: item = get_item(item_id) if item["availability_state"] == "home": return False return add_to_shopping_list(item_id, user_id, visibility_override=visibility) def sync_item_dayparts(item_id: int, daypart_ids: list[int]) -> None: database = get_db() database.execute("DELETE FROM item_dayparts WHERE item_id = ?", (item_id,)) for daypart_id in daypart_ids: database.execute( "INSERT INTO item_dayparts (item_id, daypart_id) VALUES (?, ?)", (item_id, daypart_id), ) def sync_meal_components(meal_id: int, food_ids: list[int]) -> None: database = get_db() database.execute("DELETE FROM meal_components WHERE meal_item_id = ?", (meal_id,)) visible_foods = { row["id"] for row in database.execute( f""" SELECT items.id FROM items WHERE items.kind = 'food' AND {visible_clause('items')} """, visible_params(), ).fetchall() } for food_id in food_ids: if food_id not in visible_foods: continue database.execute( """ INSERT INTO meal_components (meal_item_id, food_item_id) VALUES (?, ?) """, (meal_id, food_id), ) def fetch_shopping_entries(): rows = get_db().execute( f""" SELECT shopping_entries.*, items.name AS item_name, items.kind AS item_kind, items.photo_filename, items.availability_state, owner.display_name AS owner_display_name, owner.username AS owner_username, added_by_user.display_name AS added_by_display_name, added_by_user.username AS added_by_username FROM shopping_entries JOIN items ON items.id = shopping_entries.item_id LEFT JOIN users AS owner ON owner.id = shopping_entries.owner_user_id LEFT JOIN users AS added_by_user ON added_by_user.id = shopping_entries.added_by WHERE shopping_entries.is_checked = 0 AND {visible_clause('shopping_entries')} ORDER BY CASE shopping_entries.visibility WHEN 'shared' THEN 0 ELSE 1 END, shopping_entries.added_at DESC """, visible_params(), ).fetchall() return describe_records(rows) def fetch_plan_entries_for_range(start_date: date, end_date: date): rows = get_db().execute( f""" SELECT plan_entries.*, items.name AS item_name, items.kind AS item_kind, items.photo_filename, items.availability_state, dayparts.name AS daypart_name, dayparts.slug AS daypart_slug, dayparts.sort_order, owner.display_name AS owner_display_name, owner.username AS owner_username FROM plan_entries JOIN items ON items.id = plan_entries.item_id JOIN dayparts ON dayparts.id = plan_entries.daypart_id LEFT JOIN users AS owner ON owner.id = plan_entries.owner_user_id WHERE plan_date BETWEEN ? AND ? AND {visible_clause('plan_entries')} ORDER BY plan_date, dayparts.sort_order, items.name """, [start_date.isoformat(), end_date.isoformat(), *visible_params()], ).fetchall() grouped = defaultdict(list) for row in describe_records(rows): grouped[(row["plan_date"], row["daypart_id"])].append(row) return grouped def fetch_day_plan_entries(selected_date: date): return fetch_plan_entries_for_range(selected_date, selected_date) def fetch_recent_plan_items(daypart_id: int, limit: int = 6): rows = get_db().execute( f""" SELECT DISTINCT items.id, items.household_id, items.owner_user_id, items.visibility, items.name, items.kind, items.category, items.note, items.photo_filename, items.availability_state, owner.display_name AS owner_display_name, owner.username AS owner_username FROM plan_entries JOIN items ON items.id = plan_entries.item_id LEFT JOIN users AS owner ON owner.id = items.owner_user_id WHERE plan_entries.daypart_id = ? AND {visible_clause('items')} ORDER BY plan_entries.created_at DESC LIMIT ? """, [daypart_id, *visible_params(), limit * 3], ).fetchall() return attach_components(attach_dayparts(describe_records(rows))) def fetch_plan_candidates(daypart_id: int, query: str | None = None): params = [daypart_id, *visible_params()] conditions = [visible_clause("items"), "items.availability_state != 'archived'"] if query: conditions.append("LOWER(items.name) LIKE ?") params.append(f"%{query.lower()}%") rows = get_db().execute( f""" SELECT items.*, owner.display_name AS owner_display_name, owner.username AS owner_username, EXISTS( SELECT 1 FROM item_dayparts WHERE item_dayparts.item_id = items.id AND item_dayparts.daypart_id = ? ) AS matches_daypart FROM items LEFT JOIN users AS owner ON owner.id = items.owner_user_id WHERE {' AND '.join(conditions)} ORDER BY CASE items.availability_state WHEN 'home' THEN 0 WHEN 'idea' THEN 1 ELSE 2 END, matches_daypart DESC, CASE items.visibility WHEN 'shared' THEN 0 ELSE 1 END, LOWER(items.name) """, params, ).fetchall() return attach_components(attach_dayparts(describe_records(rows))) def build_home_sections(items: list[dict], dayparts: list, selected_daypart_id: int | None): sections = [] if selected_daypart_id: selected_daypart = next((daypart for daypart in dayparts if daypart["id"] == selected_daypart_id), None) matching_items = [item for item in items if any(dp["id"] == selected_daypart_id for dp in item["dayparts_meta"])] sections.append( { "title": selected_daypart["name"] if selected_daypart else "Ausgewählte Tageszeit", "items": matching_items, "slug": selected_daypart["slug"] if selected_daypart else "selected", } ) return sections for daypart in dayparts: matching_items = [item for item in items if any(dp["id"] == daypart["id"] for dp in item["dayparts_meta"])] sections.append( { "title": daypart["name"], "items": matching_items, "slug": daypart["slug"], } ) anytime_items = [item for item in items if not item["dayparts_meta"]] if anytime_items: sections.append( { "title": "Ohne feste Tageszeit", "items": anytime_items, "slug": "anytime", } ) return sections def dedupe_items(items: list[dict], limit: int = 6) -> list[dict]: seen_ids = set() result = [] for item in items: if item["id"] in seen_ids: continue seen_ids.add(item["id"]) result.append(item) if len(result) >= limit: break return result def build_day_planner_sections(selected_date: date, selected_item_id: int | None, selected_daypart_id: int | None): dayparts = get_dayparts() day_entries = fetch_day_plan_entries(selected_date) sections = [] for daypart in dayparts: candidates = fetch_plan_candidates(daypart["id"]) home_candidates = [item for item in candidates if item["availability_state"] == "home"] matching_candidates = [ item for item in candidates if any(meta["id"] == daypart["id"] for meta in item["dayparts_meta"]) ] recent_candidates = fetch_recent_plan_items(daypart["id"]) quick_items = dedupe_items(home_candidates + recent_candidates + matching_candidates, limit=6) sections.append( { "daypart": daypart, "entries": day_entries.get((selected_date.isoformat(), daypart["id"]), []), "candidates": candidates, "quick_items": quick_items, "selected_item_id": selected_item_id if selected_daypart_id == daypart["id"] else None, "is_open": selected_daypart_id == daypart["id"], "summary_items": [entry["item_name"] for entry in day_entries.get((selected_date.isoformat(), daypart["id"]), [])][:2], "default_visibility": "shared", } ) return sections def fetch_week_cards(week_start: date): days = [week_start + timedelta(days=index) for index in range(7)] dayparts = get_dayparts() grouped_entries = fetch_plan_entries_for_range(week_start, week_start + timedelta(days=6)) cards = [] for current_day in days: filled_dayparts = [] planned_count = 0 preview_items = [] slots = [] for daypart in dayparts: slot_entries = grouped_entries.get((current_day.isoformat(), daypart["id"]), []) slots.append({"daypart": dict(daypart), "entries": slot_entries}) if slot_entries: filled_dayparts.append( { "id": daypart["id"], "name": daypart["name"], "count": len(slot_entries), } ) planned_count += len(slot_entries) preview_items.extend(entry["item_name"] for entry in slot_entries[:2]) cards.append( { "date": current_day, "filled_dayparts": filled_dayparts, "planned_count": planned_count, "preview_items": preview_items[:4], "slots": slots, } ) return cards def count_visible_items(availability_state: str) -> int: row = get_db().execute( f"SELECT COUNT(*) AS count FROM items WHERE availability_state = ? AND {visible_clause('items')}", [availability_state, *visible_params()], ).fetchone() return int(row["count"]) @main_bp.get("/") @login_required def dashboard(): today = date.today() home_count = count_visible_items("home") archive_count = count_visible_items("archived") shopping_count = get_db().execute( f"SELECT COUNT(*) AS count FROM shopping_entries WHERE is_checked = 0 AND {visible_clause('shopping_entries')}", visible_params(), ).fetchone()["count"] today_entries = [] for entries in fetch_day_plan_entries(today).values(): today_entries.extend(entries) today_entries.sort(key=lambda entry: (entry["sort_order"], entry["item_name"].lower())) week_cards = fetch_week_cards(today - timedelta(days=today.weekday())) home_items = fetch_items(availability="home") return render_template( "dashboard.html", home_count=home_count, shopping_count=shopping_count, archive_count=archive_count, today_entries=today_entries, home_items=home_items[:8], today=today, week_cards=week_cards[:3], ) @main_bp.get("/more") @login_required def more_view(): return render_template("more.html") @main_bp.route("/items/") @login_required def item_list(kind: str): if kind not in ITEM_KIND_LABELS: return redirect(url_for("main.dashboard")) query = request.args.get("q", "").strip() state = request.args.get("state", "").strip() scope = request.args.get("visibility", "").strip() raw_daypart_id = request.args.get("daypart_id", "").strip() daypart_id = int(raw_daypart_id) if raw_daypart_id.isdigit() else None items = fetch_items( kind=kind, availability=state or None, query=query or None, daypart_id=daypart_id, visibility=scope or None, ) return render_template( "items/list.html", kind=kind, items=items, availability_labels=AVAILABILITY_LABELS, query=query, selected_state=state, selected_visibility=scope, selected_daypart_id=daypart_id, dayparts=get_dayparts(), state_options=ACTIVE_STATE_OPTIONS, visibility_options=VISIBILITY_FILTER_OPTIONS, today=date.today(), ) @main_bp.route("/items//new", methods=("GET", "POST")) @login_required def item_create(kind: str): if kind not in ITEM_KIND_LABELS: return redirect(url_for("main.dashboard")) database = get_db() dayparts = get_dayparts() foods = fetch_food_options() food_groups = group_items_by_availability(foods) form_data = { "name": "", "category": "", "note": "", "visibility": "shared", "daypart_ids": [], "component_ids": [], "quick_food_name": "", "quick_food_category": "", "quick_food_note": "", } if request.method == "POST": form_action = request.form.get("form_action", "save_item") form_data = extract_item_form_data(form_data) name = form_data["name"] if kind == "meal" and form_action == "quick_add_food": if not form_data["quick_food_name"]: flash("Bitte einen Namen für das neue Lebensmittel eintragen.", "error") else: new_food_id = create_quick_food_from_form(form_data) if new_food_id not in form_data["component_ids"]: form_data["component_ids"].append(new_food_id) form_data["component_ids"] = sorted(form_data["component_ids"]) form_data["quick_food_name"] = "" form_data["quick_food_category"] = "" form_data["quick_food_note"] = "" foods = fetch_food_options() food_groups = group_items_by_availability(foods) flash("Das neue Lebensmittel wurde angelegt und direkt zur Mahlzeitenidee hinzugefügt.", "success") return render_template( "items/form.html", kind=kind, item=None, dayparts=dayparts, food_groups=food_groups, categories=CATEGORIES, form_data=form_data, visibility_options=VISIBILITY_FORM_OPTIONS, ) error = None if not name: error = "Bitte einen Namen eintragen." photo_filename = None if error is None: try: photo_filename = save_photo(request.files.get("photo")) except ValueError as exc: error = str(exc) if error is None: cursor = database.execute( """ INSERT INTO items ( household_id, owner_user_id, visibility, kind, name, category, note, photo_filename, created_by, updated_by ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( current_household_id(), g.user["id"], form_data["visibility"], kind, name, form_data["category"], form_data["note"], photo_filename, g.user["id"], g.user["id"], ), ) item_id = int(cursor.lastrowid) sync_item_dayparts(item_id, form_data["daypart_ids"]) if kind == "meal": sync_meal_components(item_id, form_data["component_ids"]) database.commit() flash(f"{ITEM_KIND_SINGULAR_LABELS[kind]} wurde angelegt.", "success") return redirect(url_for("main.item_list", kind=kind)) flash(error, "error") return render_template( "items/form.html", kind=kind, item=None, dayparts=dayparts, food_groups=food_groups, categories=CATEGORIES, form_data=form_data, visibility_options=VISIBILITY_FORM_OPTIONS, ) @main_bp.route("/items//edit", methods=("GET", "POST")) @login_required def item_edit(item_id: int): database = get_db() try: item = get_item(item_id) ensure_can_edit(item) except (ValueError, PermissionError) as exc: flash(str(exc), "error") return redirect(url_for("main.dashboard")) kind = item["kind"] dayparts = get_dayparts() foods = fetch_food_options() food_groups = group_items_by_availability(foods) form_data = { "name": item["name"], "category": item["category"] or "", "note": item["note"] or "", "visibility": item["visibility"], "daypart_ids": get_item_daypart_ids(item_id), "component_ids": get_meal_component_ids(item_id) if kind == "meal" else [], "quick_food_name": "", "quick_food_category": "", "quick_food_note": "", } if request.method == "POST": form_action = request.form.get("form_action", "save_item") form_data = extract_item_form_data(form_data) name = form_data["name"] if kind == "meal" and form_action == "quick_add_food": if not form_data["quick_food_name"]: flash("Bitte einen Namen für das neue Lebensmittel eintragen.", "error") else: new_food_id = create_quick_food_from_form(form_data) if new_food_id not in form_data["component_ids"]: form_data["component_ids"].append(new_food_id) form_data["component_ids"] = sorted(form_data["component_ids"]) form_data["quick_food_name"] = "" form_data["quick_food_category"] = "" form_data["quick_food_note"] = "" foods = fetch_food_options() food_groups = group_items_by_availability(foods) flash("Das neue Lebensmittel wurde angelegt und direkt zur Mahlzeitenidee hinzugefügt.", "success") return render_template( "items/form.html", kind=kind, item=item, dayparts=dayparts, food_groups=food_groups, categories=CATEGORIES, form_data=form_data, visibility_options=VISIBILITY_FORM_OPTIONS, ) error = None if not name: error = "Bitte einen Namen eintragen." photo_filename = item["photo_filename"] if error is None: try: photo_filename = save_photo(request.files.get("photo"), current_filename=item["photo_filename"]) except ValueError as exc: error = str(exc) if error is None: database.execute( """ UPDATE items SET name = ?, category = ?, note = ?, visibility = ?, photo_filename = ?, updated_by = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? """, ( name, form_data["category"], form_data["note"], form_data["visibility"], photo_filename, g.user["id"], item_id, ), ) sync_item_dayparts(item_id, form_data["daypart_ids"]) if kind == "meal": sync_meal_components(item_id, form_data["component_ids"]) database.commit() flash("Der Eintrag wurde aktualisiert.", "success") return redirect(url_for("main.item_list", kind=kind)) flash(error, "error") return render_template( "items/form.html", kind=kind, item=item, dayparts=dayparts, food_groups=food_groups, categories=CATEGORIES, form_data=form_data, visibility_options=VISIBILITY_FORM_OPTIONS, ) @main_bp.post("/items//shopping") @login_required def item_add_to_shopping(item_id: int): try: item = get_item(item_id) except ValueError as exc: flash(str(exc), "error") return redirect(request.referrer or url_for("main.shopping_list")) added = add_to_shopping_list(item_id, g.user["id"], visibility_override=item["visibility"]) if added: flash(f"{item['name']} steht jetzt auf der Einkaufsliste.", "success") else: flash(f"{item['name']} ist bereits auf der Einkaufsliste.", "info") return redirect(request.referrer or url_for("main.shopping_list")) @main_bp.post("/items//set-home") @login_required def item_set_home(item_id: int): try: item = get_item(item_id) ensure_can_edit(item) except (ValueError, PermissionError) as exc: flash(str(exc), "error") return redirect(request.referrer or url_for("main.home_view")) get_db().execute( """ UPDATE items SET availability_state = 'home', updated_by = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? """, (g.user["id"], item_id), ) get_db().commit() flash(f"{item['name']} ist jetzt unter Zuhause sichtbar.", "success") return redirect(request.referrer or url_for("main.home_view")) @main_bp.post("/items//archive") @login_required def item_archive(item_id: int): try: item = get_item(item_id) ensure_can_edit(item) except (ValueError, PermissionError) as exc: flash(str(exc), "error") return redirect(request.referrer or url_for("main.archive_view")) get_db().execute( """ UPDATE items SET availability_state = 'archived', updated_by = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? """, (g.user["id"], item_id), ) get_db().commit() flash(f"{item['name']} liegt jetzt im Archiv und bleibt später leicht wiederfindbar.", "info") return redirect(request.referrer or url_for("main.archive_view")) @main_bp.post("/items//restore") @login_required def item_restore(item_id: int): try: item = get_item(item_id) ensure_can_edit(item) except (ValueError, PermissionError) as exc: flash(str(exc), "error") return redirect(request.referrer or url_for("main.archive_view")) get_db().execute( """ UPDATE items SET availability_state = 'idea', updated_by = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? """, (g.user["id"], item_id), ) get_db().commit() flash(f"{item['name']} ist wieder in der aktiven Liste.", "success") return redirect(request.referrer or url_for("main.archive_view")) @main_bp.route("/shopping", methods=("GET", "POST")) @login_required def shopping_list(): database = get_db() if request.method == "POST": selected_item_id = request.form.get("item_id", "").strip() if not selected_item_id or not selected_item_id.isdigit(): flash("Bitte zuerst etwas auswählen.", "error") else: try: item = get_item(int(selected_item_id)) added = add_to_shopping_list(item["id"], g.user["id"], visibility_override=item["visibility"]) if added: flash(f"{item['name']} wurde auf die Einkaufsliste gesetzt.", "success") else: flash(f"{item['name']} ist bereits auf der Einkaufsliste.", "info") except ValueError as exc: flash(str(exc), "error") return redirect(url_for("main.shopping_list")) entries = fetch_shopping_entries() addable_items = fetch_items(include_archived=False) addable_items = [item for item in addable_items if not item["is_on_shopping_list"]] return render_template("shopping/list.html", entries=entries, addable_items=addable_items) @main_bp.post("/shopping//check") @login_required def shopping_check(entry_id: int): entry = get_db().execute( f""" SELECT shopping_entries.*, owner.display_name AS owner_display_name, owner.username AS owner_username FROM shopping_entries LEFT JOIN users AS owner ON owner.id = shopping_entries.owner_user_id WHERE shopping_entries.id = ? AND {visible_clause('shopping_entries')} """, [entry_id, *visible_params()], ).fetchone() if entry is None: flash("Der Einkaufseintrag wurde nicht gefunden.", "error") return redirect(url_for("main.shopping_list")) entry_dict = describe_record(dict(entry)) try: ensure_can_edit(entry_dict, "Diesen Einkaufseintrag kannst du gerade nicht ändern.") item = get_item(entry["item_id"]) except (ValueError, PermissionError) as exc: flash(str(exc), "error") return redirect(url_for("main.shopping_list")) database = get_db() database.execute( """ UPDATE shopping_entries SET is_checked = 1, checked_at = CURRENT_TIMESTAMP, checked_by = ? WHERE id = ? """, (g.user["id"], entry_id), ) database.execute( """ UPDATE items SET availability_state = 'home', updated_by = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? """, (g.user["id"], item["id"]), ) database.commit() flash(f"{item['name']} ist jetzt als Zuhause vorhanden markiert.", "success") return redirect(url_for("main.shopping_list")) @main_bp.post("/shopping//remove") @login_required def shopping_remove(entry_id: int): entry = get_db().execute( f""" SELECT shopping_entries.*, owner.display_name AS owner_display_name, owner.username AS owner_username FROM shopping_entries LEFT JOIN users AS owner ON owner.id = shopping_entries.owner_user_id WHERE shopping_entries.id = ? AND {visible_clause('shopping_entries')} """, [entry_id, *visible_params()], ).fetchone() if entry is None: flash("Der Eintrag wurde nicht gefunden.", "error") return redirect(url_for("main.shopping_list")) entry_dict = describe_record(dict(entry)) try: ensure_can_edit(entry_dict, "Diesen Einkaufseintrag kannst du gerade nicht entfernen.") except PermissionError as exc: flash(str(exc), "error") return redirect(url_for("main.shopping_list")) get_db().execute("DELETE FROM shopping_entries WHERE id = ?", (entry_id,)) get_db().commit() flash("Der Eintrag wurde von der Einkaufsliste entfernt.", "info") return redirect(url_for("main.shopping_list")) @main_bp.get("/home") @login_required def home_view(): query = request.args.get("q", "").strip() scope = request.args.get("visibility", "").strip() raw_daypart_id = request.args.get("daypart_id", "").strip() daypart_id = int(raw_daypart_id) if raw_daypart_id.isdigit() else None dayparts = get_dayparts() items = fetch_items( availability="home", query=query or None, daypart_id=daypart_id, visibility=scope or None, ) sections = build_home_sections(items, dayparts, daypart_id) return render_template( "home/list.html", sections=sections, query=query, dayparts=dayparts, selected_daypart_id=daypart_id, selected_visibility=scope, visibility_options=VISIBILITY_FILTER_OPTIONS, today=date.today(), ) @main_bp.get("/archive") @login_required def archive_view(): query = request.args.get("q", "").strip() selected_kind = request.args.get("kind", "").strip() selected_visibility = request.args.get("visibility", "").strip() kind = selected_kind if selected_kind in ITEM_KIND_LABELS else None items = fetch_items( kind=kind, availability="archived", include_archived=True, query=query or None, visibility=selected_visibility or None, ) return render_template( "archive/list.html", items=items, query=query, selected_kind=selected_kind, selected_visibility=selected_visibility, kind_options=KIND_FILTER_OPTIONS, visibility_options=VISIBILITY_FILTER_OPTIONS, today=date.today(), ) @main_bp.get("/planner") @login_required def planner(): week_start = parse_week_start(request.args.get("week")) return render_template( "planner/week.html", week_start=week_start, week_end=week_start + timedelta(days=6), prev_week=week_start - timedelta(days=7), next_week=week_start + timedelta(days=7), week_cards=fetch_week_cards(week_start), today=date.today(), ) @main_bp.route("/planner/day", methods=("GET", "POST")) @login_required def planner_day(): selected_date = parse_plan_date(request.values.get("date")) if request.method == "POST": item_id_raw = request.form.get("item_id", "").strip() daypart_id_raw = request.form.get("daypart_id", "").strip() note = request.form.get("note", "").strip() selected_date = parse_plan_date(request.form.get("plan_date")) visibility = normalize_visibility(request.form.get("visibility"), "shared") error = None if not item_id_raw or not item_id_raw.isdigit(): error = "Bitte etwas für den Tagesplan auswählen." elif not daypart_id_raw or not daypart_id_raw.isdigit(): error = "Bitte eine Tageszeit auswählen." if error is None: item_id = int(item_id_raw) daypart_id = int(daypart_id_raw) try: item = get_item(item_id) except ValueError as exc: error = str(exc) if error is None: get_db().execute( """ INSERT INTO plan_entries (household_id, owner_user_id, visibility, plan_date, daypart_id, item_id, note, created_by) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( current_household_id(), g.user["id"], visibility, selected_date.isoformat(), daypart_id, item_id, note, g.user["id"], ), ) get_db().commit() if ensure_planned_item_is_shopped(item_id, g.user["id"], visibility): flash("Der Eintrag ist noch nicht zuhause und wurde zusätzlich auf die Einkaufsliste gesetzt.", "info") flash("Der Eintrag wurde in den Tagesplan gelegt.", "success") return redirect( f"{url_for('main.planner_day', date=selected_date.isoformat(), daypart_id=daypart_id)}#daypart-{daypart_id}" ) flash(error, "error") selected_item_raw = request.args.get("item_id", "").strip() selected_daypart_raw = request.args.get("daypart_id", "").strip() selected_item_id = int(selected_item_raw) if selected_item_raw.isdigit() else None selected_daypart_id = int(selected_daypart_raw) if selected_daypart_raw.isdigit() else None sections = build_day_planner_sections(selected_date, selected_item_id, selected_daypart_id) return render_template( "planner/day.html", selected_date=selected_date, previous_day=selected_date - timedelta(days=1), next_day=selected_date + timedelta(days=1), sections=sections, today=date.today(), visibility_options=VISIBILITY_FORM_OPTIONS, ) @main_bp.post("/planner//remove") @login_required def planner_remove(entry_id: int): selected_date = request.args.get("date", "") entry = get_db().execute( f""" SELECT plan_entries.*, owner.display_name AS owner_display_name, owner.username AS owner_username FROM plan_entries LEFT JOIN users AS owner ON owner.id = plan_entries.owner_user_id WHERE plan_entries.id = ? AND {visible_clause('plan_entries')} """, [entry_id, *visible_params()], ).fetchone() if entry is None: flash("Der Planeintrag wurde nicht gefunden.", "error") else: try: ensure_can_edit(describe_record(dict(entry)), "Diesen Planeintrag kannst du gerade nicht entfernen.") get_db().execute("DELETE FROM plan_entries WHERE id = ?", (entry_id,)) get_db().commit() flash("Der Planeintrag wurde entfernt.", "info") except PermissionError as exc: flash(str(exc), "error") if selected_date: return redirect(url_for("main.planner_day", date=selected_date)) return redirect(url_for("main.planner")) @main_bp.post("/planner//move") @login_required def planner_move(entry_id: int): target_date = parse_plan_date(request.form.get("target_date")) target_daypart_raw = request.form.get("target_daypart_id", "").strip() if not target_daypart_raw.isdigit(): return jsonify({"ok": False, "error": "Ungültige Tageszeit"}), 400 database = get_db() entry = database.execute( f""" SELECT plan_entries.*, owner.display_name AS owner_display_name, owner.username AS owner_username FROM plan_entries LEFT JOIN users AS owner ON owner.id = plan_entries.owner_user_id WHERE plan_entries.id = ? AND {visible_clause('plan_entries')} """, [entry_id, *visible_params()], ).fetchone() if entry is None: return jsonify({"ok": False, "error": "Eintrag nicht gefunden"}), 404 entry_dict = describe_record(dict(entry)) try: ensure_can_edit(entry_dict, "Diesen Planeintrag kannst du gerade nicht verschieben.") except PermissionError as exc: return jsonify({"ok": False, "error": str(exc)}), 403 target_daypart_id = int(target_daypart_raw) database.execute( """ UPDATE plan_entries SET plan_date = ?, daypart_id = ? WHERE id = ? """, (target_date.isoformat(), target_daypart_id, entry_id), ) database.commit() was_added_to_shopping = ensure_planned_item_is_shopped(entry["item_id"], g.user["id"], entry["visibility"]) if was_added_to_shopping: flash("Der verschobene Eintrag ist noch nicht zuhause und wurde auf die Einkaufsliste gesetzt.", "info") return jsonify( { "ok": True, "added_to_shopping": was_added_to_shopping, "redirect_url": url_for("main.planner", week=parse_week_start(target_date.isoformat()).isoformat()), } )