from __future__ import annotations from collections import defaultdict from datetime import date, datetime, timedelta from io import BytesIO from itertools import product from pathlib import Path import sqlite3 from flask import ( Blueprint, after_this_request, current_app, flash, g, jsonify, redirect, render_template, request, send_file, url_for, ) from .auth import admin_required, login_required from .backup import RESTORE_CONFIRMATION_TEXT, export_backup_archive, restore_backup_archive from .constants import ( AVAILABILITY_LABELS, BUILDER_LABELS, DEFAULT_CATEGORY_BUILDERS, DAY_TEMPLATE_NAME_SUGGESTIONS, DEFAULT_CATEGORIES, ENERGY_DENSITY_LABELS, ENERGY_DENSITY_OPTIONS, ITEM_KIND_LABELS, ITEM_KIND_SINGULAR_LABELS, ITEM_SET_NAME_SUGGESTIONS, NOTIFICATION_CHANNEL_OPTIONS, SUGGESTION_STYLE_LABELS, SUGGESTION_STYLE_OPTIONS, VISIBILITY_DESCRIPTIONS, VISIBILITY_LABELS, WEEKDAY_OPTIONS, WEEK_TEMPLATE_NAME_SUGGESTIONS, ) from .db import get_db from .images import ( allowed_image_file, save_photo_with_variants, upload_file_size_ok, ) from .push import push_is_configured, push_public_key, send_push_message main_bp = Blueprint("main", __name__) ACTIVE_STATE_OPTIONS = [ ("", "Alle aktiven"), ("home", "Zuhause"), ("idea", "Merkliste"), ] KIND_FILTER_OPTIONS = [ ("", "Alles"), ("food", "Lebensmittel"), ("meal", "Mahlzeitenideen"), ] VISIBILITY_FILTER_OPTIONS = [ ("", "Alles Sichtbare"), ("shared", "Gemeinsam"), ("personal", "Persönlich"), ] VISIBILITY_FORM_OPTIONS = [ ("shared", "Gemeinsam"), ("personal", "Persönlich"), ] TARGET_USER_OPTIONS_DEFAULT = "__all__" WEEKDAY_LABELS = ["Montag", "Dienstag", "Mittwoch", "Donnerstag", "Freitag", "Samstag", "Sonntag"] PRIMARY_DAYPART_SLUGS = {"breakfast", "lunch", "dinner"} SNACK_DAYPART_SLUGS = {"morning-snack", "afternoon-snack", "late-snack"} PDF_DAYPART_LABELS = { "morning-snack": "Snack am Vormittag", "afternoon-snack": "Snack am Nachmittag", "late-snack": "Später Snack", } @main_bp.before_app_request def refresh_due_context(): endpoint = request.endpoint or "" if getattr(g, "user", None) is None: return None if request.method == "GET" and endpoint.startswith("main."): try: activate_due_shopping_needs() except sqlite3.OperationalError: current_app.logger.warning("Due shopping needs could not be activated during this request.") return None def current_household_id() -> int: return int(g.user["household_id"]) def get_dayparts() -> list: return get_db().execute("SELECT * FROM dayparts ORDER BY sort_order").fetchall() def format_weekday(day_value: date) -> str: return WEEKDAY_LABELS[day_value.weekday()] def get_household_users(active_only: bool = True): query = """ SELECT id, username, display_name, role FROM users WHERE household_id = ? """ params: list[object] = [current_household_id()] if active_only: query += " AND is_active = 1" query += " ORDER BY LOWER(COALESCE(display_name, username))" return get_db().execute(query, params).fetchall() def get_target_user_options() -> list[dict]: options = [{"value": TARGET_USER_OPTIONS_DEFAULT, "label": "Für alle"}] for user in get_household_users(): options.append( { "value": str(user["id"]), "label": user["display_name"] or user["username"], } ) return options def get_category_options(include_inactive_selected: str | None = None) -> list[str]: rows = get_db().execute( """ SELECT name FROM household_categories WHERE household_id = ? AND is_active = 1 ORDER BY sort_order, LOWER(name) """, (current_household_id(),), ).fetchall() categories = [row["name"] for row in rows] if not categories: categories = DEFAULT_CATEGORIES[:] if include_inactive_selected and include_inactive_selected not in categories: categories.append(include_inactive_selected) return categories def get_category_builder_map() -> dict[str, str]: rows = get_db().execute( """ SELECT name, builder_key FROM household_categories WHERE household_id = ? """, (current_household_id(),), ).fetchall() builder_map = {row["name"]: (row["builder_key"] or "neutral") for row in rows} for name, builder_key in DEFAULT_CATEGORY_BUILDERS.items(): builder_map.setdefault(name, builder_key) return builder_map def get_household_settings() -> dict: row = get_db().execute( """ SELECT shopping_weekday, shopping_prep_days, shopping_reminder_time FROM households WHERE id = ? """, (current_household_id(),), ).fetchone() if row is None: return { "shopping_weekday": 5, "shopping_prep_days": 1, "shopping_reminder_time": "18:00", } return { "shopping_weekday": int(row["shopping_weekday"] or 5), "shopping_prep_days": int(row["shopping_prep_days"] or 1), "shopping_reminder_time": row["shopping_reminder_time"] or "18:00", } def default_user_settings() -> dict: suggestion_style = "balanced" return { "user_id": int(g.user["id"]), "reminders_enabled": True, "push_enabled": False, "notification_channel": "in_app", "suggestion_style": suggestion_style, "energy_preference": suggestion_style_energy_preference(suggestion_style), "remind_before_shopping": True, "remind_on_shopping_day": True, "show_missing_for_upcoming_week": True, "show_planned_not_shopped": True, "remind_tomorrow_if_sparse": True, "remind_week_if_sparse": True, "push_missing_breakfast": False, "push_missing_lunch": False, "push_missing_dinner": False, "push_small_snack": False, "suggest_home_for_today": True, "remind_small_snack": False, "remind_nuts": False, "show_meal_balancing": True, "suggest_templates": True, "suggest_patterns": True, } def ensure_user_settings_row(*, commit: bool = False) -> None: existing = get_db().execute( "SELECT 1 FROM user_settings WHERE user_id = ? LIMIT 1", (g.user["id"],), ).fetchone() if existing is not None: return get_db().execute( "INSERT INTO user_settings (user_id) VALUES (?)", (g.user["id"],), ) if commit: get_db().commit() def get_user_settings() -> dict: settings = default_user_settings() row = get_db().execute("SELECT * FROM user_settings WHERE user_id = ?", (g.user["id"],)).fetchone() if row is None: return settings settings.update(dict(row)) boolean_fields = { "reminders_enabled", "push_enabled", "remind_before_shopping", "remind_on_shopping_day", "show_missing_for_upcoming_week", "show_planned_not_shopped", "remind_tomorrow_if_sparse", "remind_week_if_sparse", "push_missing_breakfast", "push_missing_lunch", "push_missing_dinner", "push_small_snack", "suggest_home_for_today", "remind_small_snack", "remind_nuts", "show_meal_balancing", "suggest_templates", "suggest_patterns", } for field in boolean_fields: settings[field] = bool(settings.get(field)) settings["notification_channel"] = settings.get("notification_channel") or "in_app" settings["suggestion_style"] = normalize_suggestion_style(settings.get("suggestion_style"), "balanced") settings["energy_preference"] = suggestion_style_energy_preference(settings["suggestion_style"]) return settings def parse_checkbox(name: str, default: bool = False) -> int: return 1 if request.form.get(name, "1" if default else "0") == "1" else 0 def normalize_weekday(raw: str | None, default: int = 5) -> int: if raw and raw.isdigit(): value = int(raw) if 0 <= value <= 6: return value return default def normalize_notification_channel(raw: str | None, default: str = "in_app") -> str: allowed = {value for value, _label in NOTIFICATION_CHANNEL_OPTIONS} return raw if raw in allowed else default def normalize_suggestion_style(raw: str | None, default: str = "balanced") -> str: allowed = {value for value, _label in SUGGESTION_STYLE_OPTIONS} if raw == "easy" or raw == "snack": return "balanced" return raw if raw in allowed else default def normalize_energy_density(raw: str | None, default: str = "neutral") -> str: allowed = {value for value, _label in ENERGY_DENSITY_OPTIONS} return raw if raw in allowed else default def suggestion_style_energy_preference(style: str) -> str: if style == "fitness": return "low" return "neutral" def visible_clause(table_alias: str) -> str: return ( f"{table_alias}.household_id = ? " f"AND ({table_alias}.visibility = 'shared' OR {table_alias}.owner_user_id = ?)" ) def visible_params() -> list[int]: return [current_household_id(), int(g.user["id"])] def parse_week_start(raw: str | None) -> date: if raw: try: parsed = datetime.strptime(raw, "%Y-%m-%d").date() return parsed - timedelta(days=parsed.weekday()) except ValueError: pass today = date.today() return today - timedelta(days=today.weekday()) def parse_plan_date(raw: str | None, fallback: date | None = None) -> date: if raw: try: return datetime.strptime(raw, "%Y-%m-%d").date() except ValueError: pass return fallback or date.today() def normalize_visibility(raw: str | None, default: str = "shared") -> str: return raw if raw in VISIBILITY_LABELS else default def normalize_target_user_id(raw: str | None) -> int | None: if not raw or raw == TARGET_USER_OPTIONS_DEFAULT: return None if not raw.isdigit(): return None target_id = int(raw) allowed = {int(user["id"]) for user in get_household_users()} return target_id if target_id in allowed else None def save_photo(upload, current_filename: str | None = None) -> str | None: if not upload or not upload.filename: return current_filename if not allowed_image_file(upload.filename): raise ValueError("Bitte ein Bild als PNG, JPG, GIF oder WEBP hochladen.") if not upload_file_size_ok(upload, current_app.config["MAX_CONTENT_LENGTH"]): raise ValueError("Das Bild ist gerade zu groß. Ein etwas kleineres Foto hilft hier am besten.") return save_photo_with_variants(upload, current_app.config["UPLOAD_FOLDER"], current_filename=current_filename) def user_display_name(display_name: str | None, username: str | None) -> str: return display_name or username or "Haushalt" def describe_record(entry: dict) -> dict: owner_name = user_display_name(entry.get("owner_display_name"), entry.get("owner_username")) target_name = user_display_name(entry.get("target_display_name"), entry.get("target_username")) if entry.get("target_user_id") else None entry["owner_name"] = owner_name entry["target_name"] = target_name entry["energy_density"] = normalize_energy_density(entry.get("energy_density"), "neutral") entry["energy_density_label"] = ENERGY_DENSITY_LABELS.get(entry["energy_density"], ENERGY_DENSITY_LABELS["neutral"]) entry["is_personal"] = entry.get("visibility") == "personal" entry["is_shared"] = entry.get("visibility") == "shared" entry["is_mine"] = entry.get("owner_user_id") == g.user["id"] entry["visibility_label"] = VISIBILITY_LABELS.get(entry.get("visibility"), "Gemeinsam") entry["visibility_description"] = VISIBILITY_DESCRIPTIONS.get(entry.get("visibility"), "") entry["owner_label"] = "Von mir" if entry["is_mine"] else f"Von {owner_name}" if target_name: entry["for_label"] = f"Für {target_name}" elif entry["is_personal"]: entry["for_label"] = "Für mich" if entry["is_mine"] else f"Für {owner_name}" else: entry["for_label"] = "Für alle" entry["can_edit"] = entry["is_shared"] or entry["is_mine"] or g.user["role"] == "admin" return entry def describe_records(rows) -> list[dict]: return [describe_record(dict(row)) for row in rows] def describe_template_record(entry: dict) -> dict: owner_name = user_display_name(entry.get("owner_display_name"), entry.get("owner_username")) entry["owner_name"] = owner_name entry["is_mine"] = entry.get("owner_user_id") == g.user["id"] entry["visibility_label"] = VISIBILITY_LABELS.get(entry.get("visibility"), "Gemeinsam") entry["owner_label"] = "Von mir" if entry["is_mine"] else f"Von {owner_name}" entry["can_edit"] = entry.get("visibility") == "shared" or entry["is_mine"] or g.user["role"] == "admin" return entry def ensure_can_edit(entry: dict, error_message: str = "Diesen Eintrag kannst du gerade nicht bearbeiten.") -> None: if not (entry.get("can_edit") or g.user["role"] == "admin"): raise PermissionError(error_message) def get_item(item_id: int) -> dict: item = get_db().execute( f""" SELECT items.*, owner.display_name AS owner_display_name, owner.username AS owner_username, target.display_name AS target_display_name, target.username AS target_username, EXISTS( SELECT 1 FROM shopping_entries WHERE shopping_entries.item_id = items.id AND shopping_entries.is_checked = 0 ) AS is_on_shopping_list FROM items LEFT JOIN users AS owner ON owner.id = items.owner_user_id LEFT JOIN users AS target ON target.id = items.target_user_id WHERE items.id = ? AND {visible_clause('items')} """, [item_id, *visible_params()], ).fetchone() if item is None: raise ValueError("Der Eintrag wurde nicht gefunden.") return describe_record(dict(item)) def get_item_daypart_ids(item_id: int) -> list[int]: rows = get_db().execute( "SELECT daypart_id FROM item_dayparts WHERE item_id = ?", (item_id,), ).fetchall() return [int(row["daypart_id"]) for row in rows] def get_meal_component_ids(meal_id: int) -> list[int]: rows = get_db().execute( """ SELECT meal_components.food_item_id FROM meal_components JOIN items ON items.id = meal_components.food_item_id WHERE meal_components.meal_item_id = ? AND items.household_id = ? AND (items.visibility = 'shared' OR items.owner_user_id = ?) """, (meal_id, current_household_id(), g.user["id"]), ).fetchall() return [int(row["food_item_id"]) for row in rows] def attach_dayparts(items: list[dict]) -> list[dict]: if not items: return [] item_ids = [item["id"] for item in items] placeholders = ",".join("?" for _ in item_ids) rows = get_db().execute( f""" SELECT item_dayparts.item_id, dayparts.id, dayparts.slug, dayparts.name FROM item_dayparts JOIN dayparts ON dayparts.id = item_dayparts.daypart_id WHERE item_dayparts.item_id IN ({placeholders}) ORDER BY dayparts.sort_order """, item_ids, ).fetchall() grouped = defaultdict(list) for row in rows: grouped[row["item_id"]].append( {"id": row["id"], "slug": row["slug"], "name": row["name"]} ) for item in items: item["dayparts_meta"] = grouped.get(item["id"], []) item["dayparts"] = [daypart["name"] for daypart in item["dayparts_meta"]] item["primary_daypart_id"] = item["dayparts_meta"][0]["id"] if item["dayparts_meta"] else None return items def attach_components(items: list[dict]) -> list[dict]: meal_ids = [item["id"] for item in items if item["kind"] == "meal"] if not meal_ids: for item in items: item["components"] = [] item["component_ids"] = [] return items placeholders = ",".join("?" for _ in meal_ids) rows = get_db().execute( f""" SELECT meal_components.meal_item_id, meal_components.food_item_id, items.name FROM meal_components JOIN items ON items.id = meal_components.food_item_id WHERE meal_components.meal_item_id IN ({placeholders}) AND items.household_id = ? AND (items.visibility = 'shared' OR items.owner_user_id = ?) ORDER BY LOWER(items.name) """, [*meal_ids, current_household_id(), g.user["id"]], ).fetchall() grouped_names: dict[int, list[str]] = defaultdict(list) grouped_ids: dict[int, list[int]] = defaultdict(list) for row in rows: grouped_names[row["meal_item_id"]].append(row["name"]) grouped_ids[row["meal_item_id"]].append(int(row["food_item_id"])) for item in items: item["components"] = grouped_names.get(item["id"], []) item["component_ids"] = grouped_ids.get(item["id"], []) return items def attach_builder_keys(items: list[dict]) -> list[dict]: if not items: return [] category_builder_map = get_category_builder_map() meal_ids = [item["id"] for item in items if item["kind"] == "meal"] meal_builder_map: dict[int, set[str]] = defaultdict(set) if meal_ids: placeholders = ",".join("?" for _ in meal_ids) rows = get_db().execute( f""" SELECT meal_components.meal_item_id, component.category FROM meal_components JOIN items AS component ON component.id = meal_components.food_item_id WHERE meal_components.meal_item_id IN ({placeholders}) """, meal_ids, ).fetchall() for row in rows: builder_key = category_builder_map.get(row["category"] or "", "neutral") meal_builder_map[int(row["meal_item_id"])].add(builder_key) for item in items: builder_keys: list[str] if item["kind"] == "meal": builder_keys = sorted(meal_builder_map.get(item["id"], set())) if not builder_keys: builder_keys = [category_builder_map.get(item.get("category") or "", "neutral")] else: builder_keys = [category_builder_map.get(item.get("category") or "", "neutral")] item["builder_keys"] = builder_keys item["builder_labels"] = [BUILDER_LABELS.get(key, BUILDER_LABELS["neutral"]) for key in builder_keys] item["primary_builder_key"] = builder_keys[0] if builder_keys else "neutral" return items def decorate_items(rows) -> list[dict]: return attach_builder_keys(attach_components(attach_dayparts(describe_records(rows)))) def fetch_builder_keys_for_item_ids(item_ids: list[int]) -> dict[int, set[str]]: if not item_ids: return {} category_builder_map = get_category_builder_map() placeholders = ",".join("?" for _ in item_ids) rows = get_db().execute( f""" SELECT id, kind, category FROM items WHERE id IN ({placeholders}) """, item_ids, ).fetchall() builder_map: dict[int, set[str]] = {int(row["id"]): {category_builder_map.get(row["category"] or "", "neutral")} for row in rows} meal_ids = [int(row["id"]) for row in rows if row["kind"] == "meal"] if meal_ids: meal_placeholders = ",".join("?" for _ in meal_ids) component_rows = get_db().execute( f""" SELECT meal_components.meal_item_id, component.category FROM meal_components JOIN items AS component ON component.id = meal_components.food_item_id WHERE meal_components.meal_item_id IN ({meal_placeholders}) """, meal_ids, ).fetchall() for row in component_rows: builder_map.setdefault(int(row["meal_item_id"]), set()).add( category_builder_map.get(row["category"] or "", "neutral") ) return builder_map def score_suggestion_components(component_items: list[dict], daypart_slug: str, settings: dict) -> int: builder_keys = {key for item in component_items for key in item.get("builder_keys", ["neutral"])} energy_values = [normalize_energy_density(item.get("energy_density"), "neutral") for item in component_items] score = 0 style = settings.get("suggestion_style", "balanced") if style == "fitness": score += 9 if "protein" in builder_keys else 0 score += 4 if daypart_slug in {"lunch", "dinner"} and "veg" in builder_keys else 0 score += 2 if "carb" in builder_keys else 0 elif style == "protein": score += 8 if "protein" in builder_keys else 0 score += 3 if daypart_slug in {"lunch", "dinner"} and "veg" in builder_keys else 0 else: if daypart_slug in {"breakfast", "morning-snack", "afternoon-snack", "late-snack"}: score += 5 if "carb" in builder_keys else 0 score += 4 if builder_keys & {"dairy", "fruit", "nuts", "seeds"} else 0 else: score += 5 if "protein" in builder_keys else 0 score += 4 if "carb" in builder_keys else 0 score += 4 if "veg" in builder_keys else 0 energy_preference = settings.get("energy_preference", "neutral") if style == "fitness": score += energy_values.count("low") * 4 score -= energy_values.count("high") * 2 elif energy_preference == "high": score += energy_values.count("high") * 3 score -= energy_values.count("low") elif energy_preference == "low": score += energy_values.count("low") * 3 score -= energy_values.count("high") else: score += energy_values.count("neutral") return score def fetch_items( *, kind: str | None = None, availability: str | None = None, include_archived: bool = False, query: str | None = None, daypart_id: int | None = None, visibility: str | None = None, ): conditions = [visible_clause("items")] params = visible_params() if kind: conditions.append("items.kind = ?") params.append(kind) if availability: conditions.append("items.availability_state = ?") params.append(availability) elif not include_archived: conditions.append("items.availability_state != 'archived'") if query: conditions.append("LOWER(items.name) LIKE ?") params.append(f"%{query.lower()}%") if daypart_id: conditions.append( """ EXISTS ( SELECT 1 FROM item_dayparts WHERE item_dayparts.item_id = items.id AND item_dayparts.daypart_id = ? ) """ ) params.append(daypart_id) if visibility: conditions.append("items.visibility = ?") params.append(visibility) rows = get_db().execute( f""" SELECT items.*, owner.display_name AS owner_display_name, owner.username AS owner_username, target.display_name AS target_display_name, target.username AS target_username, EXISTS( SELECT 1 FROM shopping_entries WHERE shopping_entries.item_id = items.id AND shopping_entries.is_checked = 0 ) AS is_on_shopping_list FROM items LEFT JOIN users AS owner ON owner.id = items.owner_user_id LEFT JOIN users AS target ON target.id = items.target_user_id WHERE {' AND '.join(conditions)} ORDER BY CASE items.availability_state WHEN 'home' THEN 0 WHEN 'idea' THEN 1 ELSE 2 END, CASE items.visibility WHEN 'shared' THEN 0 ELSE 1 END, LOWER(items.name) """, params, ).fetchall() return decorate_items(rows) def fetch_food_options(query: str | None = None): return fetch_items(kind="food", include_archived=True, query=query) def group_items_by_availability(items: list[dict]) -> list[dict]: grouped = defaultdict(list) for item in items: grouped[item["availability_state"]].append(item) result = [] for state in ("home", "idea", "archived"): entries = grouped.get(state, []) if entries: result.append({"state": state, "title": AVAILABILITY_LABELS[state], "items": entries}) return result def extract_item_form_data(existing: dict | None = None) -> dict: form_data = existing or {} form_data.update( { "name": request.form.get("name", "").strip(), "category": request.form.get("category", "").strip(), "energy_density": normalize_energy_density(request.form.get("energy_density"), form_data.get("energy_density", "neutral")), "note": request.form.get("note", "").strip(), "visibility": normalize_visibility(request.form.get("visibility"), form_data.get("visibility", "shared")), "target_user_id": normalize_target_user_id(request.form.get("target_user_id")), "target_user_raw": request.form.get("target_user_id", TARGET_USER_OPTIONS_DEFAULT), "food_search": request.form.get("food_search", "").strip(), "daypart_ids": [int(value) for value in request.form.getlist("daypart_ids") if value.isdigit()], "component_ids": [int(value) for value in request.form.getlist("component_ids") if value.isdigit()], "quick_food_name": request.form.get("quick_food_name", "").strip(), "quick_food_category": request.form.get("quick_food_category", "").strip(), "quick_food_energy_density": normalize_energy_density(request.form.get("quick_food_energy_density"), form_data.get("quick_food_energy_density", "neutral")), "quick_food_note": request.form.get("quick_food_note", "").strip(), } ) return form_data def create_quick_food_from_form(form_data: dict) -> int: cursor = get_db().execute( """ INSERT INTO items ( household_id, owner_user_id, target_user_id, visibility, kind, name, category, energy_density, note, created_by, updated_by ) VALUES (?, ?, ?, ?, 'food', ?, ?, ?, ?, ?, ?) """, ( current_household_id(), g.user["id"], form_data["target_user_id"], form_data["visibility"], form_data["quick_food_name"], form_data["quick_food_category"], form_data["quick_food_energy_density"], form_data["quick_food_note"], g.user["id"], g.user["id"], ), ) food_id = int(cursor.lastrowid) sync_item_dayparts(food_id, form_data["daypart_ids"]) get_db().commit() return food_id def shopping_activation_date_for(needed_date: date) -> date: settings = get_household_settings() shopping_weekday = int(settings["shopping_weekday"]) prep_days = int(settings["shopping_prep_days"]) days_back = (needed_date.weekday() - shopping_weekday) % 7 shopping_date = needed_date - timedelta(days=days_back) return shopping_date - timedelta(days=prep_days) def should_activate_shopping_need(needed_date: date, today: date | None = None) -> bool: return (today or date.today()) >= shopping_activation_date_for(needed_date) def schedule_shopping_need( *, item_id: int, user_id: int, visibility: str, needed_for_date: str, needed_for_daypart_id: int | None, source_item_id: int | None = None, ) -> bool: activation_date = shopping_activation_date_for(parse_plan_date(needed_for_date)) existing = get_db().execute( """ SELECT id FROM shopping_needs WHERE household_id = ? AND item_id = ? AND COALESCE(source_item_id, 0) = COALESCE(?, 0) AND needed_for_date = ? AND COALESCE(needed_for_daypart_id, 0) = COALESCE(?, 0) AND visibility = ? AND is_activated = 0 LIMIT 1 """, ( current_household_id(), item_id, source_item_id, needed_for_date, needed_for_daypart_id, visibility, ), ).fetchone() if existing: get_db().execute( """ UPDATE shopping_needs SET activation_date = ?, owner_user_id = ? WHERE id = ? """, (activation_date.isoformat(), user_id, existing["id"]), ) else: get_db().execute( """ INSERT INTO shopping_needs ( household_id, owner_user_id, visibility, item_id, source_item_id, needed_for_date, needed_for_daypart_id, activation_date, created_by ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( current_household_id(), user_id, visibility, item_id, source_item_id, needed_for_date, needed_for_daypart_id, activation_date.isoformat(), g.user["id"], ), ) get_db().commit() return True def add_to_shopping_list( item_id: int, user_id: int, *, visibility_override: str | None = None, needed_for_date: str | None = None, needed_for_daypart_id: int | None = None, ) -> bool: item = get_item(item_id) existing = get_db().execute( """ SELECT id FROM shopping_entries WHERE item_id = ? AND is_checked = 0 """, (item_id,), ).fetchone() if existing: return False visibility = normalize_visibility(visibility_override, item["visibility"]) owner_user_id = user_id if visibility == "personal" else item["owner_user_id"] get_db().execute( """ INSERT INTO shopping_entries ( household_id, owner_user_id, visibility, item_id, added_by, needed_for_date, needed_for_daypart_id ) VALUES (?, ?, ?, ?, ?, ?, ?) """, ( current_household_id(), owner_user_id, visibility, item_id, user_id, needed_for_date, needed_for_daypart_id, ), ) get_db().commit() return True def fetch_meal_missing_components(meal_id: int) -> list[dict]: rows = get_db().execute( """ SELECT items.*, owner.display_name AS owner_display_name, owner.username AS owner_username, target.display_name AS target_display_name, target.username AS target_username, 0 AS is_on_shopping_list FROM meal_components JOIN items ON items.id = meal_components.food_item_id LEFT JOIN users AS owner ON owner.id = items.owner_user_id LEFT JOIN users AS target ON target.id = items.target_user_id WHERE meal_components.meal_item_id = ? AND items.household_id = ? AND (items.visibility = 'shared' OR items.owner_user_id = ?) AND items.availability_state != 'home' ORDER BY LOWER(items.name) """, (meal_id, current_household_id(), g.user["id"]), ).fetchall() return attach_builder_keys(attach_dayparts(describe_records(rows))) def ensure_item_or_missing_components_are_shopped( item_id: int, user_id: int, visibility: str, *, needed_for_date: str | None = None, needed_for_daypart_id: int | None = None, source_item_id: int | None = None, ) -> dict: item = get_item(item_id) if item["kind"] == "meal": missing_components = fetch_meal_missing_components(item_id) if missing_components: added_names = [] scheduled_names = [] for component in missing_components: if needed_for_date and not should_activate_shopping_need(parse_plan_date(needed_for_date)): schedule_shopping_need( item_id=component["id"], user_id=user_id, visibility=visibility, needed_for_date=needed_for_date, needed_for_daypart_id=needed_for_daypart_id, source_item_id=source_item_id or item_id, ) scheduled_names.append(component["name"]) else: added = add_to_shopping_list( component["id"], user_id, visibility_override=visibility, needed_for_date=needed_for_date, needed_for_daypart_id=needed_for_daypart_id, ) if added: added_names.append(component["name"]) return { "added": bool(added_names), "count": len(added_names), "names": added_names, "scheduled_count": len(scheduled_names), "scheduled_names": scheduled_names, "used_components": True, } return { "added": False, "count": 0, "names": [], "scheduled_count": 0, "scheduled_names": [], "used_components": True, } if item["availability_state"] == "home": return { "added": False, "count": 0, "names": [], "scheduled_count": 0, "scheduled_names": [], "used_components": False, } if needed_for_date and not should_activate_shopping_need(parse_plan_date(needed_for_date)): schedule_shopping_need( item_id=item_id, user_id=user_id, visibility=visibility, needed_for_date=needed_for_date, needed_for_daypart_id=needed_for_daypart_id, source_item_id=source_item_id, ) return { "added": False, "count": 0, "names": [], "scheduled_count": 1, "scheduled_names": [item["name"]], "used_components": False, } added = add_to_shopping_list( item_id, user_id, visibility_override=visibility, needed_for_date=needed_for_date, needed_for_daypart_id=needed_for_daypart_id, ) return { "added": added, "count": 1 if added else 0, "names": [item["name"]] if added else [], "scheduled_count": 0, "scheduled_names": [], "used_components": False, } def sync_item_dayparts(item_id: int, daypart_ids: list[int]) -> None: get_db().execute("DELETE FROM item_dayparts WHERE item_id = ?", (item_id,)) for daypart_id in daypart_ids: get_db().execute( "INSERT INTO item_dayparts (item_id, daypart_id) VALUES (?, ?)", (item_id, daypart_id), ) def sync_meal_components(meal_id: int, food_ids: list[int]) -> None: get_db().execute("DELETE FROM meal_components WHERE meal_item_id = ?", (meal_id,)) visible_foods = { row["id"] for row in get_db().execute( f""" SELECT items.id FROM items WHERE items.kind = 'food' AND {visible_clause('items')} """, visible_params(), ).fetchall() } for food_id in food_ids: if food_id in visible_foods: get_db().execute( "INSERT INTO meal_components (meal_item_id, food_item_id) VALUES (?, ?)", (meal_id, food_id), ) def fetch_shopping_entries(): rows = get_db().execute( f""" SELECT shopping_entries.*, items.name AS item_name, items.kind AS item_kind, items.photo_filename, items.availability_state, owner.display_name AS owner_display_name, owner.username AS owner_username, target.display_name AS target_display_name, target.username AS target_username, dayparts.name AS needed_daypart_name FROM shopping_entries JOIN items ON items.id = shopping_entries.item_id LEFT JOIN users AS owner ON owner.id = shopping_entries.owner_user_id LEFT JOIN users AS target ON target.id = items.target_user_id LEFT JOIN dayparts ON dayparts.id = shopping_entries.needed_for_daypart_id WHERE shopping_entries.is_checked = 0 AND {visible_clause('shopping_entries')} ORDER BY CASE shopping_entries.visibility WHEN 'shared' THEN 0 ELSE 1 END, shopping_entries.added_at DESC """, visible_params(), ).fetchall() entries = describe_records(rows) for entry in entries: if entry.get("needed_for_date"): try: parsed = datetime.strptime(entry["needed_for_date"], "%Y-%m-%d").date() entry["needed_for_label"] = parsed.strftime("%d.%m.%Y") except ValueError: entry["needed_for_label"] = entry["needed_for_date"] else: entry["needed_for_label"] = None return entries def activate_due_shopping_needs(today: date | None = None) -> int: today = today or date.today() rows = get_db().execute( """ SELECT shopping_needs.*, items.availability_state FROM shopping_needs JOIN items ON items.id = shopping_needs.item_id WHERE shopping_needs.household_id = ? AND shopping_needs.is_activated = 0 AND shopping_needs.activation_date <= ? ORDER BY shopping_needs.activation_date, shopping_needs.needed_for_date """, (current_household_id(), today.isoformat()), ).fetchall() activated = 0 for row in rows: if row["availability_state"] != "home": was_added = add_to_shopping_list( int(row["item_id"]), int(row["owner_user_id"] or g.user["id"]), visibility_override=row["visibility"], needed_for_date=row["needed_for_date"], needed_for_daypart_id=row["needed_for_daypart_id"], ) activated += 1 if was_added else 0 get_db().execute( "UPDATE shopping_needs SET is_activated = 1, activated_at = CURRENT_TIMESTAMP WHERE id = ?", (row["id"],), ) if rows: get_db().commit() return activated def fetch_upcoming_shopping_needs(limit: int | None = None) -> list[dict]: query = f""" SELECT shopping_needs.*, items.name AS item_name, items.kind AS item_kind, items.photo_filename, items.availability_state, owner.display_name AS owner_display_name, owner.username AS owner_username, target.display_name AS target_display_name, target.username AS target_username, dayparts.name AS needed_daypart_name FROM shopping_needs JOIN items ON items.id = shopping_needs.item_id LEFT JOIN users AS owner ON owner.id = shopping_needs.owner_user_id LEFT JOIN users AS target ON target.id = items.target_user_id LEFT JOIN dayparts ON dayparts.id = shopping_needs.needed_for_daypart_id WHERE shopping_needs.household_id = ? AND shopping_needs.is_activated = 0 AND (shopping_needs.visibility = 'shared' OR shopping_needs.owner_user_id = ?) AND items.availability_state != 'home' ORDER BY shopping_needs.activation_date, shopping_needs.needed_for_date, LOWER(items.name) """ params: list[object] = [current_household_id(), g.user["id"]] if limit: query += " LIMIT ?" params.append(limit) rows = get_db().execute(query, params).fetchall() entries = describe_records(rows) for entry in entries: try: entry["needed_for_label"] = datetime.strptime(entry["needed_for_date"], "%Y-%m-%d").date().strftime("%d.%m.%Y") entry["activation_label"] = datetime.strptime(entry["activation_date"], "%Y-%m-%d").date().strftime("%d.%m.%Y") except ValueError: entry["needed_for_label"] = entry["needed_for_date"] entry["activation_label"] = entry["activation_date"] return entries def fetch_plan_entries_for_range(start_date: date, end_date: date): rows = get_db().execute( f""" SELECT plan_entries.*, items.name AS item_name, items.kind AS item_kind, items.photo_filename, items.availability_state, dayparts.name AS daypart_name, dayparts.slug AS daypart_slug, dayparts.sort_order, owner.display_name AS owner_display_name, owner.username AS owner_username, target.display_name AS target_display_name, target.username AS target_username FROM plan_entries JOIN items ON items.id = plan_entries.item_id JOIN dayparts ON dayparts.id = plan_entries.daypart_id LEFT JOIN users AS owner ON owner.id = plan_entries.owner_user_id LEFT JOIN users AS target ON target.id = items.target_user_id WHERE plan_date BETWEEN ? AND ? AND {visible_clause('plan_entries')} ORDER BY plan_date, dayparts.sort_order, items.name """, [start_date.isoformat(), end_date.isoformat(), *visible_params()], ).fetchall() grouped = defaultdict(list) for row in describe_records(rows): grouped[(row["plan_date"], row["daypart_id"])].append(row) return grouped def fetch_day_plan_entries(selected_date: date): return fetch_plan_entries_for_range(selected_date, selected_date) def fetch_recent_plan_items(daypart_id: int, limit: int = 6): rows = get_db().execute( f""" SELECT DISTINCT items.id, items.household_id, items.owner_user_id, items.target_user_id, items.visibility, items.name, items.kind, items.category, items.note, items.photo_filename, items.availability_state, owner.display_name AS owner_display_name, owner.username AS owner_username, target.display_name AS target_display_name, target.username AS target_username FROM plan_entries JOIN items ON items.id = plan_entries.item_id LEFT JOIN users AS owner ON owner.id = items.owner_user_id LEFT JOIN users AS target ON target.id = items.target_user_id WHERE plan_entries.daypart_id = ? AND {visible_clause('items')} ORDER BY plan_entries.created_at DESC LIMIT ? """, [daypart_id, *visible_params(), limit * 3], ).fetchall() return decorate_items(rows) def fetch_plan_candidates(daypart_id: int, query: str | None = None): params = [daypart_id, *visible_params()] conditions = [visible_clause("items"), "items.availability_state != 'archived'"] if query: conditions.append("LOWER(items.name) LIKE ?") params.append(f"%{query.lower()}%") rows = get_db().execute( f""" SELECT items.*, owner.display_name AS owner_display_name, owner.username AS owner_username, target.display_name AS target_display_name, target.username AS target_username, EXISTS( SELECT 1 FROM item_dayparts WHERE item_dayparts.item_id = items.id AND item_dayparts.daypart_id = ? ) AS matches_daypart FROM items LEFT JOIN users AS owner ON owner.id = items.owner_user_id LEFT JOIN users AS target ON target.id = items.target_user_id WHERE {' AND '.join(conditions)} ORDER BY CASE items.availability_state WHEN 'home' THEN 0 WHEN 'idea' THEN 1 ELSE 2 END, matches_daypart DESC, CASE items.visibility WHEN 'shared' THEN 0 ELSE 1 END, LOWER(items.name) """, params, ).fetchall() return decorate_items(rows) def fetch_home_food_ids() -> set[int]: rows = get_db().execute( f""" SELECT id FROM items WHERE kind = 'food' AND availability_state = 'home' AND {visible_clause('items')} """, visible_params(), ).fetchall() return {int(row["id"]) for row in rows} def get_daypart_by_id(daypart_id: int): for daypart in get_dayparts(): if int(daypart["id"]) == int(daypart_id): return daypart return None def format_item_names(items: list[dict], limit: int = 3) -> str: return ", ".join(item["name"] for item in items[:limit]) def item_matches_daypart(item: dict, daypart_id: int | None) -> bool: if daypart_id is None: return True dayparts_meta = item.get("dayparts_meta") or [] if not dayparts_meta: return True return any(int(daypart["id"]) == int(daypart_id) for daypart in dayparts_meta) def normalized_component_signature(component_ids: list[int]) -> tuple[int, ...]: return tuple(sorted({int(component_id) for component_id in component_ids})) def generated_suggestion_key(component_ids: list[int]) -> str: signature = normalized_component_signature(component_ids) return "generated:" + "-".join(str(component_id) for component_id in signature) def fetch_hidden_generated_suggestion_keys() -> set[str]: rows = get_db().execute( """ SELECT suggestion_key FROM hidden_generated_suggestions WHERE user_id = ? """, (g.user["id"],), ).fetchall() return {row["suggestion_key"] for row in rows} def build_generated_meal_name(combo: list[dict], daypart_slug: str) -> str: names = [item["name"] for item in combo] if daypart_slug in {"breakfast", "morning-snack", "afternoon-snack", "late-snack"} and len(names) >= 2: return f"{names[0]} mit {', '.join(names[1:])}" if len(names) >= 2: return f"{names[0]} mit {', '.join(names[1:])}" return names[0] def build_dynamic_meal_suggestions(home_foods: list[dict], daypart_slug: str, limit: int) -> list[dict]: if daypart_slug in {"breakfast", "morning-snack", "afternoon-snack", "late-snack"}: target_patterns = [ { "slots": ({"carb"}, {"dairy", "protein"}, {"fruit", "nuts", "seeds"}), "reason": "Passt gut zu Frühstück oder Snack", }, { "slots": ({"carb"}, {"dairy", "protein"}), "reason": "Zuhause schnell kombinierbar", }, { "slots": ({"dairy", "protein"}, {"fruit", "nuts", "seeds"}), "reason": "Lässt sich gut als kleiner Snack vormerken", }, ] else: target_patterns = [ { "slots": ({"protein"}, {"carb"}, {"veg"}), "reason": "Zuhause als vollständige Mahlzeit möglich", }, { "slots": ({"protein"}, {"carb"}), "reason": "Lässt sich leicht ergänzen", }, { "slots": ({"protein"}, {"veg"}), "reason": "Zuhause schon gut kombinierbar", }, { "slots": ({"carb"}, {"veg"}), "reason": "Daraus kann schnell etwas Einfaches werden", }, ] suggestions: list[dict] = [] seen_signatures: set[tuple[int, ...]] = set() def slot_matches(food: dict, slot_keys: set[str]) -> bool: return bool(slot_keys & set(food.get("builder_keys", ["neutral"]))) for pattern in target_patterns: slot_candidates = [] for slot_keys in pattern["slots"]: matches = [food for food in home_foods if slot_matches(food, slot_keys)] if not matches: slot_candidates = [] break slot_candidates.append(matches) if not slot_candidates: continue for combo in product(*slot_candidates): signature = normalized_component_signature([item["id"] for item in combo]) if len(signature) != len(combo) or signature in seen_signatures: continue seen_signatures.add(signature) combo_items = list(combo) suggestions.append( { "title": build_generated_meal_name(combo_items, daypart_slug), "reason": pattern["reason"], "component_ids": [item["id"] for item in combo_items], "existing_item_id": None, "visibility": "shared", "daypart_id": None, "missing_component_ids": [], "missing_components": [], "needs_shopping": False, "is_generated": True, "suggestion_key": generated_suggestion_key([item["id"] for item in combo_items]), } ) if len(suggestions) >= limit * 3: break if len(suggestions) >= limit * 3: break return suggestions def build_home_recipe_suggestions(daypart_id: int | None = None, limit: int = 4) -> list[dict]: settings = get_user_settings() daypart_slug = (get_daypart_by_id(daypart_id)["slug"] if daypart_id and get_daypart_by_id(daypart_id) else "") hidden_keys = fetch_hidden_generated_suggestion_keys() home_foods = [ item for item in fetch_items(kind="food", availability="home") if item_matches_daypart(item, daypart_id) ] home_food_ids = {item["id"] for item in home_foods} home_food_map = {int(item["id"]): item for item in home_foods} visible_foods = [ item for item in fetch_items(kind="food", include_archived=False) if item_matches_daypart(item, daypart_id) ] visible_food_map = {int(item["id"]): item for item in visible_foods} suggestions: list[dict] = [] seen_signatures: set[tuple[int, ...]] = set() meals = [item for item in fetch_items(kind="meal") if item_matches_daypart(item, daypart_id)] for meal in meals: if not meal["component_ids"]: continue component_ids = [int(component_id) for component_id in meal["component_ids"]] if not all(component_id in visible_food_map for component_id in component_ids): continue signature = normalized_component_signature(component_ids) if signature in seen_signatures: continue component_items = [visible_food_map[component_id] for component_id in component_ids] available_items = [home_food_map[component_id] for component_id in component_ids if component_id in home_food_map] missing_items = [visible_food_map[component_id] for component_id in component_ids if component_id not in home_food_ids] if not available_items: continue if missing_items and len(missing_items) > 2: continue seen_signatures.add(signature) if missing_items: missing_names = [item["name"] for item in missing_items] suggestions.append( { "title": meal["name"], "reason": f"Es fehlt noch: {', '.join(missing_names)}", "component_ids": component_ids, "existing_item_id": meal["id"], "visibility": meal["visibility"], "daypart_id": daypart_id or meal.get("primary_daypart_id"), "missing_component_ids": [item["id"] for item in missing_items], "missing_components": missing_names, "needs_shopping": True, "is_generated": False, "suggestion_key": None, "score": score_suggestion_components(available_items, daypart_slug=daypart_slug, settings=settings) + 18 - (len(missing_items) * 4), } ) else: suggestions.append( { "title": meal["name"], "reason": "Zuhause vorhanden", "component_ids": component_ids, "existing_item_id": meal["id"], "visibility": meal["visibility"], "daypart_id": daypart_id or meal.get("primary_daypart_id"), "missing_component_ids": [], "missing_components": [], "needs_shopping": False, "is_generated": False, "suggestion_key": None, "score": score_suggestion_components(component_items, daypart_slug=daypart_slug, settings=settings) + 40, } ) for suggestion in build_dynamic_meal_suggestions(home_foods, daypart_slug, limit=limit * 2): signature = normalized_component_signature(suggestion["component_ids"]) if signature in seen_signatures or suggestion["suggestion_key"] in hidden_keys: continue seen_signatures.add(signature) component_items = [home_food_map[component_id] for component_id in suggestion["component_ids"] if component_id in home_food_map] suggestion["score"] = score_suggestion_components(component_items, daypart_slug=daypart_slug, settings=settings) suggestions.append(suggestion) deduped: list[dict] = [] seen = set() ranked_suggestions = sorted( suggestions, key=lambda suggestion: ( -int(suggestion.get("score", 0)), 0 if suggestion.get("existing_item_id") else 1, suggestion["title"].lower(), ), ) for suggestion in ranked_suggestions: if suggestion["title"] in seen: continue seen.add(suggestion["title"]) deduped.append(suggestion) if len(deduped) >= limit: break return deduped def build_balance_suggestion(daypart_id: int, item_ids: list[int]) -> dict | None: settings = get_user_settings() if not (settings.get("reminders_enabled") and settings.get("show_meal_balancing")): return None daypart = get_daypart_by_id(daypart_id) if not daypart or daypart["slug"] not in {"lunch", "dinner"}: return None builder_map = fetch_builder_keys_for_item_ids(item_ids) present = set() for keys in builder_map.values(): present.update(keys) target_order = ["protein", "carb", "veg"] missing = [key for key in target_order if key not in present] if not missing: return None first_missing = missing[0] home_matches = [ item for item in fetch_items(kind="food", availability="home", daypart_id=daypart_id) if first_missing in item.get("builder_keys", []) ] home_matches = sorted( home_matches, key=lambda item: -score_suggestion_components([item], daypart["slug"], settings), ) text_map = { "protein": "Dazu könnte noch eine Proteinquelle gut passen.", "carb": "Das lässt sich gut mit einer Kohlenhydratquelle ergänzen.", "veg": "Dazu könnte noch etwas Gemüse gut passen.", } return { "text": text_map.get(first_missing, "Dazu könnte noch etwas Kleines gut passen."), "items": home_matches[:3], } def build_daypart_suggestions(daypart_id: int) -> list[dict]: settings = get_user_settings() if not settings.get("suggest_home_for_today"): return [] suggestions = build_home_recipe_suggestions(daypart_id, limit=3) if suggestions: return suggestions archived_items = fetch_items(availability="archived", include_archived=True, daypart_id=daypart_id) return [ { "title": item["name"], "reason": "Für später vormerken", "component_ids": [], "existing_item_id": item["id"] if item["kind"] == "meal" else None, "visibility": item["visibility"], "daypart_id": daypart_id, "missing_component_ids": [], "missing_components": [], "needs_shopping": False, "is_generated": False, "suggestion_key": None, } for item in archived_items[:2] ] def build_dashboard_hints(today: date) -> list[str]: settings = get_user_settings() if not settings.get("reminders_enabled"): return [] hints: list[str] = [] tomorrow = today + timedelta(days=1) household_settings = get_household_settings() if settings.get("remind_tomorrow_if_sparse"): breakfast = get_db().execute( f""" SELECT COUNT(*) AS count FROM plan_entries JOIN dayparts ON dayparts.id = plan_entries.daypart_id WHERE plan_entries.plan_date = ? AND dayparts.slug = 'breakfast' AND {visible_clause('plan_entries')} """, [tomorrow.isoformat(), *visible_params()], ).fetchone() if int(breakfast["count"]) == 0: hints.append("Für morgen ist noch kein Frühstück eingeplant.") if settings.get("suggest_home_for_today"): dinner_home = get_db().execute( f""" SELECT COUNT(*) AS count FROM items JOIN item_dayparts ON item_dayparts.item_id = items.id JOIN dayparts ON dayparts.id = item_dayparts.daypart_id WHERE items.availability_state = 'home' AND dayparts.slug = 'dinner' AND {visible_clause('items')} """, visible_params(), ).fetchone() if int(dinner_home["count"]) > 0: hints.append("Zuhause ist bereits etwas da, das gut zu Abendessen passt.") if settings.get("remind_before_shopping") and (today + timedelta(days=1)).weekday() == household_settings["shopping_weekday"]: upcoming = fetch_upcoming_shopping_needs(limit=3) if upcoming: hints.append("Für den nächsten Einkauf sind schon ein paar Dinge vorgemerkt.") if settings.get("remind_nuts"): nut_items = [item for item in fetch_items(kind="food", availability="home") if {"nuts", "seeds"} & set(item.get("builder_keys", []))] if nut_items: hints.append("Heute schon an Nüsse gedacht?") if settings.get("suggest_templates"): old_template = get_db().execute( f""" SELECT name FROM day_templates WHERE {visible_clause('day_templates')} AND last_used_at IS NOT NULL AND DATE(last_used_at) <= DATE('now', '-21 day') ORDER BY last_used_at ASC LIMIT 1 """, visible_params(), ).fetchone() if old_template: hints.append(f"„{old_template['name']}“ könnte diese Woche wieder passen.") return hints[:4] def build_setup_checklist(today: date) -> list[dict]: total_items = int( get_db().execute( f"SELECT COUNT(*) AS count FROM items WHERE {visible_clause('items')}", visible_params(), ).fetchone()["count"] ) meal_count = int( get_db().execute( f"SELECT COUNT(*) AS count FROM items WHERE kind = 'meal' AND {visible_clause('items')}", visible_params(), ).fetchone()["count"] ) week_end = today + timedelta(days=6) plan_count = int( get_db().execute( f""" SELECT COUNT(*) AS count FROM plan_entries WHERE plan_date BETWEEN ? AND ? AND {visible_clause('plan_entries')} """, [today.isoformat(), week_end.isoformat(), *visible_params()], ).fetchone()["count"] ) template_count = int( get_db().execute( f"SELECT COUNT(*) AS count FROM day_templates WHERE {visible_clause('day_templates')}", visible_params(), ).fetchone()["count"] ) checklist = [] if total_items == 0: checklist.append( { "title": "Fang mit einem ersten Lebensmittel an", "text": "Ein kleines Frühstück, ein Snack oder etwas für zuhause reicht völlig für den Start.", "url": url_for("main.item_create", kind="food"), "label": "Lebensmittel anlegen", } ) if total_items > 0 and meal_count == 0: checklist.append( { "title": "Lege eine erste Mahlzeitenidee an", "text": "Einfach zwei oder drei vertraute Dinge zusammenklicken und für später merken.", "url": url_for("main.item_create", kind="meal"), "label": "Mahlzeit anlegen", } ) if plan_count == 0: checklist.append( { "title": "Plane einen ruhigen ersten Tag", "text": "Mit einem kleinen Eintrag für Frühstück oder Abendessen fühlt sich die Woche sofort greifbarer an.", "url": url_for("main.planner_day", date=today.isoformat()), "label": "Tag öffnen", } ) if total_items > 0 and template_count == 0: checklist.append( { "title": "Merke dir einen gelungenen Tag als Vorlage", "text": "So wird Wiederverwendung später noch leichter.", "url": url_for("main.template_library"), "label": "Vorlagen ansehen", } ) return checklist[:3] def build_day_hints(selected_date: date) -> list[str]: settings = get_user_settings() if not settings.get("reminders_enabled"): return [] hints: list[str] = [] if settings.get("remind_tomorrow_if_sparse"): breakfast_count = get_db().execute( f""" SELECT COUNT(*) AS count FROM plan_entries JOIN dayparts ON dayparts.id = plan_entries.daypart_id WHERE plan_entries.plan_date = ? AND dayparts.slug = 'breakfast' AND {visible_clause('plan_entries')} """, [selected_date.isoformat(), *visible_params()], ).fetchone() if int(breakfast_count["count"]) == 0: hints.append("Für diesen Tag ist noch kein Frühstück eingeplant.") if settings.get("remind_small_snack"): afternoon_options = get_db().execute( f""" SELECT COUNT(*) AS count FROM items JOIN item_dayparts ON item_dayparts.item_id = items.id JOIN dayparts ON dayparts.id = item_dayparts.daypart_id WHERE items.availability_state != 'archived' AND dayparts.slug = 'afternoon-snack' AND {visible_clause('items')} """, visible_params(), ).fetchone() if int(afternoon_options["count"]) < 2: hints.append("Für den Nachmittag wäre noch etwas Kleines möglich.") if settings.get("show_planned_not_shopped"): pending_for_day = fetch_upcoming_shopping_needs(limit=20) pending_for_day = [entry for entry in pending_for_day if entry["needed_for_date"] == selected_date.isoformat()] if pending_for_day: hints.append("Ein paar Dinge sind für diesen Tag schon vorgemerkt, aber noch nicht auf der Einkaufsliste.") return hints[:4] def build_week_hints(week_start: date) -> list[str]: settings = get_user_settings() if not settings.get("reminders_enabled"): return [] hints: list[str] = [] week_end = week_start + timedelta(days=6) if settings.get("remind_week_if_sparse"): breakfast_days = get_db().execute( f""" SELECT COUNT(DISTINCT plan_entries.plan_date) AS count FROM plan_entries JOIN dayparts ON dayparts.id = plan_entries.daypart_id WHERE plan_entries.plan_date BETWEEN ? AND ? AND dayparts.slug = 'breakfast' AND {visible_clause('plan_entries')} """, [week_start.isoformat(), week_end.isoformat(), *visible_params()], ).fetchone() missing_breakfasts = 7 - int(breakfast_days["count"]) if missing_breakfasts > 0: hints.append(f"Für diese Woche sind noch {missing_breakfasts} Tage ohne Frühstück eingeplant.") if settings.get("show_missing_for_upcoming_week"): due_entries = [entry for entry in fetch_upcoming_shopping_needs(limit=20) if week_start.isoformat() <= entry["needed_for_date"] <= week_end.isoformat()] if due_entries: hints.append("Für diese Woche fehlt noch etwas, das später zum Einkauf dazukommen kann.") if settings.get("remind_on_shopping_day") and week_start.weekday() <= get_household_settings()["shopping_weekday"] <= week_end.weekday(): hints.append("Der Einkaufstag liegt in dieser Woche schon bereit im Blick.") return hints[:4] def build_home_sections(items: list[dict], dayparts: list, selected_daypart_id: int | None): sections = [] if selected_daypart_id: selected_daypart = next((daypart for daypart in dayparts if daypart["id"] == selected_daypart_id), None) matching_items = [item for item in items if any(dp["id"] == selected_daypart_id for dp in item["dayparts_meta"])] sections.append( { "title": selected_daypart["name"] if selected_daypart else "Ausgewählte Tageszeit", "items": matching_items, } ) return sections for daypart in dayparts: matching_items = [item for item in items if any(dp["id"] == daypart["id"] for dp in item["dayparts_meta"])] sections.append({"title": daypart["name"], "items": matching_items}) anytime_items = [item for item in items if not item["dayparts_meta"]] if anytime_items: sections.append({"title": "Ohne feste Tageszeit", "items": anytime_items}) return sections def dedupe_items(items: list[dict], limit: int = 6) -> list[dict]: result = [] seen_ids = set() for item in items: if item["id"] in seen_ids: continue seen_ids.add(item["id"]) result.append(item) if len(result) >= limit: break return result def build_selected_quick_action( *, daypart_id: int, selected_item_id: int | None, selected_meal_name: str, selected_component_ids: list[int], candidates: list[dict], ) -> dict | None: if selected_item_id: selected_item = next((item for item in candidates if int(item["id"]) == int(selected_item_id)), None) if selected_item is None: try: selected_item = get_item(selected_item_id) except ValueError: selected_item = None if selected_item is not None: return { "type": "existing", "title": selected_item["name"], "subtitle": "Ausgewählt. Du kannst es jetzt direkt eintragen.", "item_id": int(selected_item["id"]), "visibility": selected_item["visibility"], "daypart_id": daypart_id, } if selected_meal_name and selected_component_ids: return { "type": "generated", "title": selected_meal_name, "subtitle": "Ausgewählt aus dem, was zuhause gut passt.", "component_ids": selected_component_ids, "visibility": "shared", "daypart_id": daypart_id, } return None def build_day_planner_sections( selected_date: date, selected_item_id: int | None, selected_daypart_id: int | None, selected_meal_name: str = "", selected_component_ids: list[int] | None = None, ): selected_component_ids = selected_component_ids or [] sections = [] day_entries = fetch_day_plan_entries(selected_date) for daypart in get_dayparts(): candidates = fetch_plan_candidates(daypart["id"]) entries = day_entries.get((selected_date.isoformat(), daypart["id"]), []) meal_candidates = dedupe_items( [item for item in candidates if item["kind"] == "meal" and item["availability_state"] == "home"] + [item for item in candidates if item["kind"] == "meal"], limit=6, ) food_candidates = dedupe_items( [item for item in candidates if item["kind"] == "food" and item["availability_state"] == "home"] + fetch_recent_plan_items(daypart["id"]) + [item for item in candidates if item["kind"] == "food"], limit=20, ) search_candidates = dedupe_items(meal_candidates + food_candidates, limit=24) entry_item_ids = [int(entry["item_id"]) for entry in entries] sections.append( { "daypart": daypart, "entries": entries, "candidates": candidates, "meal_candidates": meal_candidates, "food_candidates": food_candidates, "search_candidates": search_candidates, "recipe_suggestions": build_home_recipe_suggestions(int(daypart["id"]), limit=3), "suggestions": build_daypart_suggestions(daypart["id"]), "balance_suggestion": build_balance_suggestion(int(daypart["id"]), entry_item_ids), "selected_item_id": selected_item_id if selected_daypart_id == daypart["id"] else None, "selected_quick_action": build_selected_quick_action( daypart_id=int(daypart["id"]), selected_item_id=selected_item_id if selected_daypart_id == daypart["id"] else None, selected_meal_name=selected_meal_name if selected_daypart_id == daypart["id"] else "", selected_component_ids=selected_component_ids if selected_daypart_id == daypart["id"] else [], candidates=candidates, ), "is_open": selected_daypart_id == daypart["id"], "is_primary_daypart": daypart["slug"] in PRIMARY_DAYPART_SLUGS, "is_snack_daypart": daypart["slug"] in SNACK_DAYPART_SLUGS, "visible_by_default": daypart["slug"] in PRIMARY_DAYPART_SLUGS or bool(entries) or selected_daypart_id == daypart["id"], "summary_items": [entry["item_name"] for entry in entries][:2], "default_visibility": "shared", } ) return sections def build_template_day_sections(selected_map: dict[int, list[int]] | None = None): selected_map = selected_map or {} sections = [] for daypart in get_dayparts(): candidates = fetch_plan_candidates(int(daypart["id"])) quick_items = dedupe_items( [item for item in candidates if item["availability_state"] == "home"] + candidates, limit=10, ) quick_ids = {item["id"] for item in quick_items} sections.append( { "daypart": daypart, "candidates": candidates, "quick_items": quick_items, "list_items": [item for item in candidates if item["id"] not in quick_ids], "selected_ids": selected_map.get(int(daypart["id"]), []), } ) return sections def fetch_week_cards(week_start: date): days = [week_start + timedelta(days=index) for index in range(7)] week_end = week_start + timedelta(days=6) grouped_entries = fetch_plan_entries_for_range(week_start, week_start + timedelta(days=6)) picker_map = {} for daypart in get_dayparts(): candidates = fetch_plan_candidates(int(daypart["id"])) meal_candidates = dedupe_items( [item for item in candidates if item["kind"] == "meal" and item["availability_state"] == "home"] + [item for item in candidates if item["kind"] == "meal"], limit=4, ) picker_map[int(daypart["id"])] = { "meal_candidates": meal_candidates, "recipe_suggestions": build_home_recipe_suggestions(int(daypart["id"]), limit=3), } cards = [] for current_day in days: filled_dayparts = [] planned_count = 0 preview_items = [] slots = [] for daypart in get_dayparts(): slot_entries = grouped_entries.get((current_day.isoformat(), daypart["id"]), []) is_snack_daypart = daypart["slug"] in SNACK_DAYPART_SLUGS visible_by_default = (not is_snack_daypart) or bool(slot_entries) slots.append( { "daypart": dict(daypart), "entries": slot_entries, "copy_allowed": bool(slot_entries) and current_day < week_end, "picker": picker_map.get(int(daypart["id"]), {"meal_candidates": [], "recipe_suggestions": []}), "is_snack_daypart": is_snack_daypart, "visible_by_default": visible_by_default, } ) if slot_entries: filled_dayparts.append({"id": daypart["id"], "name": daypart["name"], "count": len(slot_entries)}) planned_count += len(slot_entries) preview_items.extend(entry["item_name"] for entry in slot_entries[:2]) hidden_snack_slots = [ {"id": int(slot["daypart"]["id"]), "name": slot["daypart"]["name"]} for slot in slots if slot["is_snack_daypart"] and not slot["visible_by_default"] ] cards.append( { "date": current_day, "filled_dayparts": filled_dayparts, "planned_count": planned_count, "preview_items": preview_items[:4], "slots": slots, "hidden_snack_slots": hidden_snack_slots, } ) return cards def format_week_pdf_entry(entry: dict) -> str: return entry["item_name"] def normalize_pdf_export_mode(raw: str | None) -> str: return "household" if raw == "household" else "mine" def fetch_plan_entries_for_range_export(start_date: date, end_date: date, *, mode: str): params: list[object] = [start_date.isoformat(), end_date.isoformat()] if mode == "household": where_clause = "plan_entries.household_id = ?" params.append(current_household_id()) else: where_clause = visible_clause("plan_entries") params.extend(visible_params()) rows = get_db().execute( f""" SELECT plan_entries.*, items.name AS item_name, items.kind AS item_kind, items.photo_filename, items.availability_state, dayparts.name AS daypart_name, dayparts.slug AS daypart_slug, dayparts.sort_order, owner.display_name AS owner_display_name, owner.username AS owner_username, target.display_name AS target_display_name, target.username AS target_username FROM plan_entries JOIN items ON items.id = plan_entries.item_id JOIN dayparts ON dayparts.id = plan_entries.daypart_id LEFT JOIN users AS owner ON owner.id = plan_entries.owner_user_id LEFT JOIN users AS target ON target.id = items.target_user_id WHERE plan_date BETWEEN ? AND ? AND {where_clause} ORDER BY plan_date, dayparts.sort_order, items.name """, params, ).fetchall() grouped = defaultdict(list) for row in describe_records(rows): grouped[(row["plan_date"], row["daypart_id"])].append(row) return grouped def format_week_pdf_entry(entry: dict, *, mode: str) -> str: label = format_pdf_cell_label(entry["item_name"]) if mode == "household": if entry.get("target_name"): return f"{label} (Für {entry['target_name']})" if entry.get("is_personal"): return f"{label} (Für {entry['owner_name']})" return f"{label} (Für alle)" if entry.get("is_shared"): return f"{label} (gemeinsam)" return label def format_pdf_cell_label(label: str) -> str: cleaned = " ".join((label or "").split()) if not cleaned: return "" if " - " in cleaned: return cleaned.replace(" - ", "\n") if ", " in cleaned and len(cleaned) > 20: return cleaned.replace(", ", ",\n") if "-" in cleaned: return "-\n".join(part for part in cleaned.split("-") if part) return cleaned def build_week_pdf_rows(week_start: date, *, mode: str) -> tuple[list, list[list[str]]]: days = [week_start + timedelta(days=index) for index in range(7)] grouped_entries = fetch_plan_entries_for_range_export(week_start, week_start + timedelta(days=6), mode=mode) rows: list[list[str]] = [] dayparts = get_dayparts() visible_dayparts = [] for daypart in dayparts: row_cells: list[str] = [] has_content = False row_label = PDF_DAYPART_LABELS.get(daypart["slug"], daypart["name"]) row = [daypart["name"]] for current_day in days: entries = grouped_entries.get((current_day.isoformat(), daypart["id"]), []) cell_value = "\n".join(format_week_pdf_entry(entry, mode=mode) for entry in entries) row_cells.append(cell_value) has_content = has_content or bool(cell_value.strip()) if daypart["slug"] in PRIMARY_DAYPART_SLUGS or has_content: visible_dayparts.append([row_label, *row_cells]) return days, visible_dayparts def build_week_plan_pdf(week_start: date, *, mode: str = "mine") -> bytes: try: from fpdf import FPDF from fpdf.fonts import FontFace except ImportError as exc: # pragma: no cover - depends on optional package in local env raise RuntimeError("Für den PDF-Export fehlt noch die Abhängigkeit aus der requirements.txt.") from exc mode = normalize_pdf_export_mode(mode) week_end = week_start + timedelta(days=6) week_number = week_start.isocalendar().week days, rows = build_week_pdf_rows(week_start, mode=mode) plan_label = "Mein Essensplan" if mode == "mine" else "Unser Essensplan" pdf = FPDF(orientation="L", unit="mm", format="A4") pdf.set_auto_page_break(auto=True, margin=14) pdf.set_margins(left=14, top=14, right=14) pdf.add_page() pdf.set_title(f"{plan_label} KW {week_number:02d}") pdf.set_author("Nouri") pdf.set_creator("Nouri") pdf.set_font("Helvetica", "B", 18) pdf.cell(0, 9, f"{plan_label} vom {week_start.strftime('%d.%m.%Y')} bis {week_end.strftime('%d.%m.%Y')}", new_x="LMARGIN", new_y="NEXT") pdf.set_font("Helvetica", "", 11) pdf.set_text_color(82, 82, 82) pdf.cell(0, 6, f"KW {week_number:02d}", new_x="LMARGIN", new_y="NEXT") pdf.ln(3) pdf.set_text_color(20, 20, 20) pdf.set_font("Helvetica", "", 10) headings = [" "] + [f"{format_weekday(day)}\n{day.strftime('%d.%m.%Y')}" for day in days] first_column_width = 34 remaining_width = pdf.w - pdf.l_margin - pdf.r_margin - first_column_width day_column_width = remaining_width / 7 column_widths = (first_column_width, *([day_column_width] * 7)) header_style = FontFace(emphasis="B", fill_color=(240, 240, 240)) first_column_style = FontFace(emphasis="B", fill_color=(248, 248, 248)) body_style = FontFace(fill_color=(255, 255, 255)) with pdf.table( borders_layout="SINGLE_TOP_LINE", cell_fill_color=(255, 255, 255), cell_fill_mode="ROWS", col_widths=column_widths, gutter_height=0, gutter_width=0, headings_style=header_style, line_height=5.5, text_align=("LEFT", "LEFT", "LEFT", "LEFT", "LEFT", "LEFT", "LEFT", "LEFT"), width=pdf.w - pdf.l_margin - pdf.r_margin, ) as table: header_row = table.row() for heading in headings: header_row.cell(heading, padding=(2.8, 2.5, 2.8, 2.5), v_align="M") for row in rows: table_row = table.row() table_row.cell(row[0], style=first_column_style, padding=(2.5, 2.8, 2.5, 2.8), v_align="M") for value in row[1:]: table_row.cell(value or " ", style=body_style, padding=(2.5, 2.8, 2.5, 2.8), v_align="TOP") return bytes(pdf.output()) def count_visible_items(availability_state: str) -> int: row = get_db().execute( f"SELECT COUNT(*) AS count FROM items WHERE availability_state = ? AND {visible_clause('items')}", [availability_state, *visible_params()], ).fetchone() return int(row["count"]) def fetch_day_templates(query: str | None = None, visibility: str | None = None) -> list[dict]: conditions = [visible_clause("day_templates")] params = visible_params() if query: conditions.append("LOWER(day_templates.name) LIKE ?") params.append(f"%{query.lower()}%") if visibility: conditions.append("day_templates.visibility = ?") params.append(visibility) rows = get_db().execute( f""" SELECT day_templates.*, owner.display_name AS owner_display_name, owner.username AS owner_username, ( SELECT COUNT(*) FROM day_template_entries WHERE day_template_entries.day_template_id = day_templates.id ) AS entry_count FROM day_templates LEFT JOIN users AS owner ON owner.id = day_templates.owner_user_id WHERE {' AND '.join(conditions)} ORDER BY LOWER(day_templates.name) """, params, ).fetchall() return [describe_template_record(dict(row)) for row in rows] def get_day_template(template_id: int) -> dict: row = get_db().execute( f""" SELECT day_templates.*, owner.display_name AS owner_display_name, owner.username AS owner_username FROM day_templates LEFT JOIN users AS owner ON owner.id = day_templates.owner_user_id WHERE day_templates.id = ? AND {visible_clause('day_templates')} """, [template_id, *visible_params()], ).fetchone() if row is None: raise ValueError("Die Tagesvorlage wurde nicht gefunden.") return describe_template_record(dict(row)) def get_day_template_selected_map(template_id: int) -> dict[int, list[int]]: rows = get_db().execute( """ SELECT daypart_id, item_id FROM day_template_entries WHERE day_template_id = ? ORDER BY sort_order, id """, (template_id,), ).fetchall() grouped: dict[int, list[int]] = defaultdict(list) for row in rows: grouped[int(row["daypart_id"])].append(int(row["item_id"])) return grouped def sync_day_template_entries(template_id: int, selected_map: dict[int, list[int]]) -> None: get_db().execute("DELETE FROM day_template_entries WHERE day_template_id = ?", (template_id,)) for daypart in get_dayparts(): for sort_order, item_id in enumerate(selected_map.get(int(daypart["id"]), []), start=10): get_db().execute( """ INSERT INTO day_template_entries (day_template_id, daypart_id, item_id, sort_order) VALUES (?, ?, ?, ?) """, (template_id, daypart["id"], item_id, sort_order), ) def day_template_form_data(template: dict | None = None, source_date: date | None = None) -> dict: selected_map: dict[int, list[int]] = defaultdict(list) if template: selected_map.update(get_day_template_selected_map(template["id"])) elif source_date: for (plan_date_key, daypart_id), entries in fetch_day_plan_entries(source_date).items(): if plan_date_key == source_date.isoformat(): selected_map[int(daypart_id)] = [int(entry["item_id"]) for entry in entries] form_data: dict[str, object] = { "name": template["name"] if template else "", "description": template["description"] if template else "", "visibility": template["visibility"] if template else "shared", "selected_map": {key: value[:] for key, value in selected_map.items()}, } return form_data def extract_day_template_form_data(existing: dict | None = None) -> dict: form_data = existing or {} selected_map: dict[int, list[int]] = {} for daypart in get_dayparts(): selected_map[int(daypart["id"])] = [ int(value) for value in request.form.getlist(f"daypart_{daypart['id']}_item_ids") if value.isdigit() ] form_data.update( { "name": request.form.get("name", "").strip(), "description": request.form.get("description", "").strip(), "visibility": normalize_visibility(request.form.get("visibility"), form_data.get("visibility", "shared")), "selected_map": selected_map, } ) return form_data def apply_day_template(template_id: int, selected_date: date) -> int: template = get_day_template(template_id) entries_map = get_day_template_selected_map(template_id) inserted = 0 for daypart_id, item_ids in entries_map.items(): for item_id in item_ids: existing = get_db().execute( """ SELECT id FROM plan_entries WHERE plan_date = ? AND daypart_id = ? AND item_id = ? AND visibility = ? """, (selected_date.isoformat(), daypart_id, item_id, template["visibility"]), ).fetchone() if existing: continue get_db().execute( """ INSERT INTO plan_entries (household_id, owner_user_id, visibility, plan_date, daypart_id, item_id, created_by) VALUES (?, ?, ?, ?, ?, ?, ?) """, ( current_household_id(), g.user["id"], template["visibility"], selected_date.isoformat(), daypart_id, item_id, g.user["id"], ), ) insert_result = ensure_item_or_missing_components_are_shopped( item_id, g.user["id"], template["visibility"], needed_for_date=selected_date.isoformat(), needed_for_daypart_id=daypart_id, source_item_id=item_id, ) inserted += 1 get_db().commit() get_db().execute( "UPDATE day_templates SET last_used_at = CURRENT_TIMESTAMP WHERE id = ?", (template_id,), ) get_db().commit() return inserted def create_day_template_snapshot(name: str, description: str, visibility: str, source_date: date) -> int | None: selected_map = day_template_form_data(source_date=source_date)["selected_map"] if not any(selected_map.values()): return None cursor = get_db().execute( """ INSERT INTO day_templates (household_id, owner_user_id, visibility, name, description) VALUES (?, ?, ?, ?, ?) """, (current_household_id(), g.user["id"], visibility, name, description), ) template_id = int(cursor.lastrowid) sync_day_template_entries(template_id, selected_map) return template_id def fetch_week_templates(query: str | None = None, visibility: str | None = None) -> list[dict]: conditions = [visible_clause("week_templates")] params = visible_params() if query: conditions.append("LOWER(week_templates.name) LIKE ?") params.append(f"%{query.lower()}%") if visibility: conditions.append("week_templates.visibility = ?") params.append(visibility) rows = get_db().execute( f""" SELECT week_templates.*, owner.display_name AS owner_display_name, owner.username AS owner_username, ( SELECT COUNT(*) FROM week_template_days WHERE week_template_days.week_template_id = week_templates.id ) AS day_count FROM week_templates LEFT JOIN users AS owner ON owner.id = week_templates.owner_user_id WHERE {' AND '.join(conditions)} ORDER BY LOWER(week_templates.name) """, params, ).fetchall() return [describe_template_record(dict(row)) for row in rows] def get_week_template(template_id: int) -> dict: row = get_db().execute( f""" SELECT week_templates.*, owner.display_name AS owner_display_name, owner.username AS owner_username FROM week_templates LEFT JOIN users AS owner ON owner.id = week_templates.owner_user_id WHERE week_templates.id = ? AND {visible_clause('week_templates')} """, [template_id, *visible_params()], ).fetchone() if row is None: raise ValueError("Die Wochenvorlage wurde nicht gefunden.") return describe_template_record(dict(row)) def get_week_template_selected_map(template_id: int) -> dict[int, int]: rows = get_db().execute( """ SELECT weekday_index, day_template_id FROM week_template_days WHERE week_template_id = ? ORDER BY weekday_index """, (template_id,), ).fetchall() return {int(row["weekday_index"]): int(row["day_template_id"]) for row in rows} def week_template_form_data(template: dict | None = None, source_week: date | None = None) -> dict: selected_map = get_week_template_selected_map(template["id"]) if template else {} source_days = {} if source_week: for weekday_index in range(7): source_days[weekday_index] = fetch_day_plan_entries(source_week + timedelta(days=weekday_index)) return { "name": template["name"] if template else "", "description": template["description"] if template else "", "visibility": template["visibility"] if template else "shared", "selected_map": selected_map, "source_week": source_week.isoformat() if source_week else "", "copy_from_source": { index: bool(source_days.get(index)) for index in range(7) }, } def extract_week_template_form_data(existing: dict | None = None) -> dict: form_data = existing or {} selected_map: dict[int, int | None] = {} copy_from_source: dict[int, bool] = {} for weekday_index in range(7): raw_value = request.form.get(f"weekday_{weekday_index}_day_template_id", "").strip() selected_map[weekday_index] = int(raw_value) if raw_value.isdigit() else None copy_from_source[weekday_index] = request.form.get(f"weekday_{weekday_index}_copy_source") == "1" form_data.update( { "name": request.form.get("name", "").strip(), "description": request.form.get("description", "").strip(), "visibility": normalize_visibility(request.form.get("visibility"), form_data.get("visibility", "shared")), "selected_map": selected_map, "source_week": request.form.get("source_week", "").strip(), "copy_from_source": copy_from_source, } ) return form_data def sync_week_template_days(template_id: int, selected_map: dict[int, int | None]) -> None: get_db().execute("DELETE FROM week_template_days WHERE week_template_id = ?", (template_id,)) for weekday_index, day_template_id in selected_map.items(): if day_template_id: get_db().execute( """ INSERT INTO week_template_days (week_template_id, weekday_index, day_template_id) VALUES (?, ?, ?) """, (template_id, weekday_index, day_template_id), ) def apply_week_template(template_id: int, week_start: date) -> int: selected_map = get_week_template_selected_map(template_id) inserted = 0 for weekday_index, day_template_id in selected_map.items(): inserted += apply_day_template(day_template_id, week_start + timedelta(days=weekday_index)) get_db().execute( "UPDATE week_templates SET last_used_at = CURRENT_TIMESTAMP WHERE id = ?", (template_id,), ) get_db().commit() return inserted def fetch_item_sets(query: str | None = None, visibility: str | None = None) -> list[dict]: conditions = [visible_clause("item_sets")] params = visible_params() if query: conditions.append("LOWER(item_sets.name) LIKE ?") params.append(f"%{query.lower()}%") if visibility: conditions.append("item_sets.visibility = ?") params.append(visibility) rows = get_db().execute( f""" SELECT item_sets.*, owner.display_name AS owner_display_name, owner.username AS owner_username, ( SELECT COUNT(*) FROM item_set_items WHERE item_set_items.item_set_id = item_sets.id ) AS item_count FROM item_sets LEFT JOIN users AS owner ON owner.id = item_sets.owner_user_id WHERE {' AND '.join(conditions)} ORDER BY LOWER(item_sets.name) """, params, ).fetchall() return [describe_template_record(dict(row)) for row in rows] def get_item_set(set_id: int) -> dict: row = get_db().execute( f""" SELECT item_sets.*, owner.display_name AS owner_display_name, owner.username AS owner_username FROM item_sets LEFT JOIN users AS owner ON owner.id = item_sets.owner_user_id WHERE item_sets.id = ? AND {visible_clause('item_sets')} """, [set_id, *visible_params()], ).fetchone() if row is None: raise ValueError("Das Paket wurde nicht gefunden.") return describe_template_record(dict(row)) def get_item_set_selected_ids(set_id: int) -> list[int]: rows = get_db().execute( """ SELECT item_id FROM item_set_items WHERE item_set_id = ? ORDER BY sort_order, id """, (set_id,), ).fetchall() return [int(row["item_id"]) for row in rows] def sync_item_set_items(set_id: int, item_ids: list[int]) -> None: get_db().execute("DELETE FROM item_set_items WHERE item_set_id = ?", (set_id,)) for sort_order, item_id in enumerate(item_ids, start=10): get_db().execute( """ INSERT OR IGNORE INTO item_set_items (item_set_id, item_id, sort_order) VALUES (?, ?, ?) """, (set_id, item_id, sort_order), ) def extract_item_set_form_data(existing: dict | None = None) -> dict: form_data = existing or {} form_data.update( { "name": request.form.get("name", "").strip(), "description": request.form.get("description", "").strip(), "visibility": normalize_visibility(request.form.get("visibility"), form_data.get("visibility", "shared")), "item_ids": [int(value) for value in request.form.getlist("item_ids") if value.isdigit()], "item_search": request.form.get("item_search", "").strip(), } ) return form_data def apply_item_set_to_shopping(set_id: int) -> dict: item_set = get_item_set(set_id) selected_ids = get_item_set_selected_ids(set_id) added_names: list[str] = [] for item_id in selected_ids: result = ensure_item_or_missing_components_are_shopped( item_id, g.user["id"], item_set["visibility"], ) added_names.extend(result["names"]) get_db().execute( "UPDATE item_sets SET last_used_at = CURRENT_TIMESTAMP WHERE id = ?", (set_id,), ) get_db().commit() return {"count": len(added_names), "names": added_names} def render_item_form(kind: str, *, item: dict | None, form_data: dict): food_search = form_data.get("food_search") or None foods = fetch_food_options(query=food_search if kind == "meal" else None) return render_template( "items/form.html", kind=kind, item=item, dayparts=get_dayparts(), food_groups=group_items_by_availability(foods), categories=get_category_options( form_data.get("category") or form_data.get("quick_food_category") ), form_data=form_data, energy_density_options=ENERGY_DENSITY_OPTIONS, visibility_options=VISIBILITY_FORM_OPTIONS, target_user_options=get_target_user_options(), ) def create_or_get_generated_meal( *, name: str, component_ids: list[int], daypart_id: int, visibility: str, ) -> int: normalized_ids = normalized_component_signature(component_ids) existing_meals = [ item for item in fetch_items(kind="meal") if normalized_component_signature(item.get("component_ids", [])) == normalized_ids ] if existing_meals: meal_id = int(existing_meals[0]["id"]) current_dayparts = get_item_daypart_ids(meal_id) if daypart_id not in current_dayparts: sync_item_dayparts(meal_id, current_dayparts + [daypart_id]) get_db().execute( """ UPDATE items SET updated_by = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? """, (g.user["id"], meal_id), ) get_db().commit() return meal_id existing = get_db().execute( f""" SELECT items.id FROM items WHERE items.kind = 'meal' AND LOWER(items.name) = LOWER(?) AND {visible_clause('items')} ORDER BY items.id LIMIT 1 """, [name, *visible_params()], ).fetchone() if existing: meal_id = int(existing["id"]) sync_meal_components(meal_id, list(normalized_ids)) current_dayparts = get_item_daypart_ids(meal_id) if daypart_id not in current_dayparts: sync_item_dayparts(meal_id, current_dayparts + [daypart_id]) get_db().execute( """ UPDATE items SET updated_by = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? """, (g.user["id"], meal_id), ) get_db().commit() return meal_id cursor = get_db().execute( """ INSERT INTO items ( household_id, owner_user_id, visibility, kind, name, category, created_by, updated_by ) VALUES (?, ?, ?, 'meal', ?, ?, ?, ?) """, ( current_household_id(), g.user["id"], visibility, name, "Kleines Essen" if get_daypart_by_id(daypart_id)["slug"] in {"breakfast", "morning-snack", "afternoon-snack", "late-snack"} else "Warmes", g.user["id"], g.user["id"], ), ) meal_id = int(cursor.lastrowid) sync_item_dayparts(meal_id, [daypart_id]) sync_meal_components(meal_id, list(normalized_ids)) get_db().commit() return meal_id def insert_plan_entry(*, item_id: int, daypart_id: int, plan_date: date, visibility: str, note: str = "") -> dict: get_db().execute( """ INSERT INTO plan_entries (household_id, owner_user_id, visibility, plan_date, daypart_id, item_id, note, created_by) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( current_household_id(), g.user["id"], visibility, plan_date.isoformat(), daypart_id, item_id, note, g.user["id"], ), ) get_db().commit() return ensure_item_or_missing_components_are_shopped( item_id, g.user["id"], visibility, needed_for_date=plan_date.isoformat(), needed_for_daypart_id=daypart_id, source_item_id=item_id, ) def update_plan_entry(entry_id: int, *, visibility: str, note: str) -> None: get_db().execute( """ UPDATE plan_entries SET visibility = ?, owner_user_id = CASE WHEN ? = 'personal' THEN ? ELSE owner_user_id END, note = ? WHERE id = ? """, (visibility, visibility, g.user["id"], note, entry_id), ) get_db().commit() def planner_template_options(): return fetch_day_templates() @main_bp.get("/") @login_required def dashboard(): today = date.today() home_count = count_visible_items("home") archive_count = count_visible_items("archived") shopping_count = get_db().execute( f"SELECT COUNT(*) AS count FROM shopping_entries WHERE is_checked = 0 AND {visible_clause('shopping_entries')}", visible_params(), ).fetchone()["count"] today_entries = [] for entries in fetch_day_plan_entries(today).values(): today_entries.extend(entries) today_entries.sort(key=lambda entry: (entry["sort_order"], entry["item_name"].lower())) week_cards = fetch_week_cards(today - timedelta(days=today.weekday())) home_items = fetch_items(availability="home") return render_template( "dashboard.html", home_count=home_count, shopping_count=shopping_count, archive_count=archive_count, today_entries=today_entries, home_items=home_items[:8], today=today, week_cards=week_cards[:3], dashboard_hints=build_dashboard_hints(today), recipe_suggestions=build_home_recipe_suggestions(limit=4), upcoming_entries=fetch_upcoming_shopping_needs(limit=4), day_templates=fetch_day_templates()[:3], week_templates=fetch_week_templates()[:3], setup_checklist=build_setup_checklist(today), ) @main_bp.get("/templates") @login_required def template_library(): query = request.args.get("q", "").strip() selected_visibility = request.args.get("visibility", "").strip() visibility = selected_visibility or None day_templates = fetch_day_templates(query=query or None, visibility=visibility) week_templates = fetch_week_templates(query=query or None, visibility=visibility) item_sets = fetch_item_sets(query=query or None, visibility=visibility) template_hints = build_dashboard_hints(date.today()) return render_template( "library/index.html", query=query, selected_visibility=selected_visibility, visibility_options=VISIBILITY_FILTER_OPTIONS, day_templates=day_templates, week_templates=week_templates, item_sets=item_sets, template_hints=template_hints, ) @main_bp.post("/suggestions/hide") @login_required def suggestion_hide(): component_ids = [int(value) for value in request.form.getlist("component_ids") if value.isdigit()] if not component_ids: flash("Diese Kombination konnte gerade nicht ausgeblendet werden.", "error") return redirect(request.referrer or url_for("main.dashboard")) get_db().execute( """ INSERT OR IGNORE INTO hidden_generated_suggestions (user_id, suggestion_key) VALUES (?, ?) """, (g.user["id"], generated_suggestion_key(component_ids)), ) get_db().commit() flash("Diese generierte Mahlzeit wird dir künftig nicht mehr vorgeschlagen.", "info") return redirect(request.referrer or url_for("main.dashboard")) @main_bp.route("/templates/day/new", methods=("GET", "POST")) @login_required def day_template_create(): source_date = parse_plan_date(request.values.get("source_date"), fallback=None) if request.values.get("source_date") else None form_data = day_template_form_data(source_date=source_date) if request.method == "POST": form_data = extract_day_template_form_data(form_data) if not form_data["name"]: flash("Bitte einen Namen für die Tagesvorlage eintragen.", "error") else: cursor = get_db().execute( """ INSERT INTO day_templates (household_id, owner_user_id, visibility, name, description) VALUES (?, ?, ?, ?, ?) """, ( current_household_id(), g.user["id"], form_data["visibility"], form_data["name"], form_data["description"], ), ) template_id = int(cursor.lastrowid) sync_day_template_entries(template_id, form_data["selected_map"]) get_db().commit() flash("Die Tagesvorlage wurde gespeichert.", "success") return redirect(url_for("main.template_library")) return render_template( "library/day_form.html", template=None, form_data=form_data, dayparts=get_dayparts(), daypart_sections=build_template_day_sections(form_data["selected_map"]), visibility_options=VISIBILITY_FORM_OPTIONS, name_suggestions=DAY_TEMPLATE_NAME_SUGGESTIONS, source_date=source_date, ) @main_bp.route("/templates/day//edit", methods=("GET", "POST")) @login_required def day_template_edit(template_id: int): try: template = get_day_template(template_id) ensure_can_edit(template, "Diese Tagesvorlage kannst du gerade nicht bearbeiten.") except (ValueError, PermissionError) as exc: flash(str(exc), "error") return redirect(url_for("main.template_library")) form_data = day_template_form_data(template=template) if request.method == "POST": form_data = extract_day_template_form_data(form_data) if not form_data["name"]: flash("Bitte einen Namen für die Tagesvorlage eintragen.", "error") else: get_db().execute( """ UPDATE day_templates SET name = ?, description = ?, visibility = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? """, ( form_data["name"], form_data["description"], form_data["visibility"], template_id, ), ) sync_day_template_entries(template_id, form_data["selected_map"]) get_db().commit() flash("Die Tagesvorlage wurde aktualisiert.", "success") return redirect(url_for("main.template_library")) return render_template( "library/day_form.html", template=template, form_data=form_data, dayparts=get_dayparts(), daypart_sections=build_template_day_sections(form_data["selected_map"]), visibility_options=VISIBILITY_FORM_OPTIONS, name_suggestions=DAY_TEMPLATE_NAME_SUGGESTIONS, source_date=None, ) @main_bp.post("/templates/day//apply") @login_required def day_template_apply(template_id: int): selected_date = parse_plan_date(request.form.get("target_date")) inserted = apply_day_template(template_id, selected_date) flash(f"Die Tagesvorlage wurde angewendet und hat {inserted} Einträge ergänzt.", "success") return redirect(url_for("main.planner_day", date=selected_date.isoformat())) @main_bp.route("/templates/week/new", methods=("GET", "POST")) @login_required def week_template_create(): source_week_raw = request.values.get("source_week", "").strip() source_week = parse_week_start(source_week_raw) if source_week_raw else None form_data = week_template_form_data(source_week=source_week) if request.method == "POST": form_data = extract_week_template_form_data(form_data) if not form_data["name"]: flash("Bitte einen Namen für die Wochenvorlage eintragen.", "error") else: cursor = get_db().execute( """ INSERT INTO week_templates (household_id, owner_user_id, visibility, name, description) VALUES (?, ?, ?, ?, ?) """, ( current_household_id(), g.user["id"], form_data["visibility"], form_data["name"], form_data["description"], ), ) template_id = int(cursor.lastrowid) selected_map = dict(form_data["selected_map"]) if form_data["source_week"]: source_start = parse_week_start(form_data["source_week"]) for weekday_index in range(7): if selected_map.get(weekday_index): continue if form_data["copy_from_source"].get(weekday_index): snapshot_date = source_start + timedelta(days=weekday_index) snapshot_name = f"{form_data['name']} · {WEEKDAY_LABELS[weekday_index]}" snapshot_id = create_day_template_snapshot( snapshot_name, f"Automatisch aus der Woche {source_start.strftime('%d.%m.%Y')} übernommen.", form_data["visibility"], snapshot_date, ) if snapshot_id: selected_map[weekday_index] = snapshot_id sync_week_template_days(template_id, selected_map) get_db().commit() flash("Die Wochenvorlage wurde gespeichert.", "success") return redirect(url_for("main.template_library")) return render_template( "library/week_form.html", template=None, form_data=form_data, visibility_options=VISIBILITY_FORM_OPTIONS, name_suggestions=WEEK_TEMPLATE_NAME_SUGGESTIONS, day_templates=fetch_day_templates(), weekday_labels=WEEKDAY_LABELS, source_week=source_week, ) @main_bp.route("/templates/week//edit", methods=("GET", "POST")) @login_required def week_template_edit(template_id: int): try: template = get_week_template(template_id) ensure_can_edit(template, "Diese Wochenvorlage kannst du gerade nicht bearbeiten.") except (ValueError, PermissionError) as exc: flash(str(exc), "error") return redirect(url_for("main.template_library")) form_data = week_template_form_data(template=template) if request.method == "POST": form_data = extract_week_template_form_data(form_data) if not form_data["name"]: flash("Bitte einen Namen für die Wochenvorlage eintragen.", "error") else: get_db().execute( """ UPDATE week_templates SET name = ?, description = ?, visibility = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? """, ( form_data["name"], form_data["description"], form_data["visibility"], template_id, ), ) sync_week_template_days(template_id, form_data["selected_map"]) get_db().commit() flash("Die Wochenvorlage wurde aktualisiert.", "success") return redirect(url_for("main.template_library")) return render_template( "library/week_form.html", template=template, form_data=form_data, visibility_options=VISIBILITY_FORM_OPTIONS, name_suggestions=WEEK_TEMPLATE_NAME_SUGGESTIONS, day_templates=fetch_day_templates(), weekday_labels=WEEKDAY_LABELS, source_week=None, ) @main_bp.post("/templates/week//apply") @login_required def week_template_apply(template_id: int): week_start = parse_week_start(request.form.get("target_week")) inserted = apply_week_template(template_id, week_start) flash(f"Die Wochenvorlage wurde angewendet und hat {inserted} Einträge ergänzt.", "success") return redirect(url_for("main.planner", week=week_start.isoformat())) @main_bp.route("/templates/set/new", methods=("GET", "POST")) @login_required def item_set_create(): form_data = { "name": "", "description": "", "visibility": "shared", "item_ids": [], "item_search": "", } if request.method == "POST": form_data = extract_item_set_form_data(form_data) if not form_data["name"]: flash("Bitte einen Namen für das Paket eintragen.", "error") else: cursor = get_db().execute( """ INSERT INTO item_sets (household_id, owner_user_id, visibility, name, description) VALUES (?, ?, ?, ?, ?) """, ( current_household_id(), g.user["id"], form_data["visibility"], form_data["name"], form_data["description"], ), ) set_id = int(cursor.lastrowid) sync_item_set_items(set_id, form_data["item_ids"]) get_db().commit() flash("Das Paket wurde gespeichert.", "success") return redirect(url_for("main.template_library")) items = fetch_items(include_archived=False, query=form_data["item_search"] or None) return render_template( "library/set_form.html", item_set=None, form_data=form_data, visibility_options=VISIBILITY_FORM_OPTIONS, name_suggestions=ITEM_SET_NAME_SUGGESTIONS, item_groups=group_items_by_availability(items), ) @main_bp.route("/templates/set//edit", methods=("GET", "POST")) @login_required def item_set_edit(set_id: int): try: item_set = get_item_set(set_id) ensure_can_edit(item_set, "Dieses Paket kannst du gerade nicht bearbeiten.") except (ValueError, PermissionError) as exc: flash(str(exc), "error") return redirect(url_for("main.template_library")) form_data = { "name": item_set["name"], "description": item_set["description"] or "", "visibility": item_set["visibility"], "item_ids": get_item_set_selected_ids(set_id), "item_search": "", } if request.method == "POST": form_data = extract_item_set_form_data(form_data) if not form_data["name"]: flash("Bitte einen Namen für das Paket eintragen.", "error") else: get_db().execute( """ UPDATE item_sets SET name = ?, description = ?, visibility = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? """, ( form_data["name"], form_data["description"], form_data["visibility"], set_id, ), ) sync_item_set_items(set_id, form_data["item_ids"]) get_db().commit() flash("Das Paket wurde aktualisiert.", "success") return redirect(url_for("main.template_library")) items = fetch_items(include_archived=False, query=form_data["item_search"] or None) return render_template( "library/set_form.html", item_set=item_set, form_data=form_data, visibility_options=VISIBILITY_FORM_OPTIONS, name_suggestions=ITEM_SET_NAME_SUGGESTIONS, item_groups=group_items_by_availability(items), ) @main_bp.post("/templates/set//apply") @login_required def item_set_apply(set_id: int): result = apply_item_set_to_shopping(set_id) if result["count"]: flash(f"Das Paket wurde auf die Einkaufsliste übernommen: {', '.join(result['names'][:4])}.", "success") else: flash("Das Paket ist bereits vollständig auf der Einkaufsliste oder zuhause vorhanden.", "info") return redirect(url_for("main.shopping_list")) @main_bp.route("/settings", methods=("GET", "POST")) @login_required def settings_view(): if request.method == "POST": form_name = request.form.get("form_name", "").strip() if form_name == "household": shopping_weekday = normalize_weekday(request.form.get("shopping_weekday"), 5) raw_prep_days = request.form.get("shopping_prep_days", "1").strip() shopping_prep_days = max(0, min(7, int(raw_prep_days))) if raw_prep_days.isdigit() else 1 shopping_reminder_time = request.form.get("shopping_reminder_time", "18:00").strip() or "18:00" get_db().execute( """ UPDATE households SET shopping_weekday = ?, shopping_prep_days = ?, shopping_reminder_time = ? WHERE id = ? """, (shopping_weekday, shopping_prep_days, shopping_reminder_time, current_household_id()), ) get_db().commit() flash("Die Einkaufsrhythmus-Einstellungen wurden gespeichert.", "success") elif form_name == "reminders": ensure_user_settings_row() suggestion_style = normalize_suggestion_style(request.form.get("suggestion_style"), "balanced") get_db().execute( """ UPDATE user_settings SET reminders_enabled = ?, push_enabled = ?, notification_channel = ?, suggestion_style = ?, energy_preference = ?, remind_before_shopping = ?, remind_on_shopping_day = ?, show_missing_for_upcoming_week = ?, show_planned_not_shopped = ?, remind_tomorrow_if_sparse = ?, remind_week_if_sparse = ?, push_missing_breakfast = ?, push_missing_lunch = ?, push_missing_dinner = ?, push_small_snack = ?, suggest_home_for_today = ?, remind_small_snack = ?, remind_nuts = ?, show_meal_balancing = ?, suggest_templates = ?, suggest_patterns = ?, updated_at = CURRENT_TIMESTAMP WHERE user_id = ? """, ( parse_checkbox("reminders_enabled", True), parse_checkbox("push_enabled", False), normalize_notification_channel(request.form.get("notification_channel"), "in_app"), suggestion_style, suggestion_style_energy_preference(suggestion_style), parse_checkbox("remind_before_shopping", True), parse_checkbox("remind_on_shopping_day", True), parse_checkbox("show_missing_for_upcoming_week", True), parse_checkbox("show_planned_not_shopped", True), parse_checkbox("remind_tomorrow_if_sparse", True), parse_checkbox("remind_week_if_sparse", True), parse_checkbox("push_missing_breakfast", False), parse_checkbox("push_missing_lunch", False), parse_checkbox("push_missing_dinner", False), parse_checkbox("push_small_snack", False), parse_checkbox("suggest_home_for_today", True), parse_checkbox("remind_small_snack", False), parse_checkbox("remind_nuts", False), parse_checkbox("show_meal_balancing", True), parse_checkbox("suggest_templates", True), parse_checkbox("suggest_patterns", True), g.user["id"], ), ) get_db().commit() flash("Deine Erinnerungen und Hinweise wurden gespeichert.", "success") elif form_name == "push_test": subscription = get_db().execute( """ SELECT endpoint, p256dh, auth FROM push_subscriptions WHERE user_id = ? AND is_active = 1 ORDER BY updated_at DESC LIMIT 1 """, (g.user["id"],), ).fetchone() if subscription is None: flash("Bitte aktiviere zuerst Push im Browser oder auf dem Home-Bildschirm.", "error") else: ok, error = send_push_message( { "endpoint": subscription["endpoint"], "keys": {"p256dh": subscription["p256dh"], "auth": subscription["auth"]}, }, title="Nouri", body="Push ist bereit. Erinnerungen können später darüber kommen.", url=url_for("main.settings_view", _external=True), ) if ok: get_db().execute( "UPDATE push_subscriptions SET last_test_at = CURRENT_TIMESTAMP WHERE endpoint = ?", (subscription["endpoint"],), ) get_db().commit() flash("Die Test-Mitteilung wurde versendet.", "success") else: flash(error or "Die Test-Mitteilung konnte gerade nicht gesendet werden.", "error") return redirect(url_for("main.settings_view")) household_settings = get_household_settings() user_settings = get_user_settings() push_subscription = get_db().execute( """ SELECT COUNT(*) AS count FROM push_subscriptions WHERE user_id = ? AND is_active = 1 """, (g.user["id"],), ).fetchone() return render_template( "settings.html", household_settings=household_settings, user_settings=user_settings, push_subscription_count=int(push_subscription["count"]), push_ready=push_is_configured(), push_public_key_value=push_public_key(), restore_confirmation_text=RESTORE_CONFIRMATION_TEXT, ) @main_bp.get("/settings/backup/export") @login_required @admin_required def backup_export(): archive_path, download_name = export_backup_archive( get_db(), current_app.config["UPLOAD_FOLDER"], current_app.config["APP_VERSION"], ) @after_this_request def cleanup_backup(response): Path(archive_path).unlink(missing_ok=True) return response return send_file( archive_path, as_attachment=True, download_name=download_name, mimetype="application/zip", max_age=0, ) @main_bp.post("/settings/backup/restore") @login_required @admin_required def backup_restore(): confirmation = request.form.get("restore_confirmation", "").strip().upper() backup_file = request.files.get("backup_file") if confirmation != RESTORE_CONFIRMATION_TEXT: flash("Bitte die Bestätigung genau eintragen, bevor das Backup wiederhergestellt wird.", "error") return redirect(url_for("main.settings_view")) if not backup_file or not backup_file.filename: flash("Bitte zuerst eine Backup-Datei auswählen.", "error") return redirect(url_for("main.settings_view")) try: metadata = restore_backup_archive(get_db(), current_app.config["UPLOAD_FOLDER"], backup_file) get_db().commit() except Exception as exc: get_db().rollback() flash(str(exc) or "Das Backup konnte gerade nicht wiederhergestellt werden.", "error") return redirect(url_for("main.settings_view")) version_label = metadata.get("app_version") or "einer älteren Version" flash(f"Das Backup aus {version_label} wurde wiederhergestellt.", "success") return redirect(url_for("main.settings_view")) @main_bp.post("/push/subscribe") @login_required def push_subscribe(): if not push_is_configured(): return jsonify({"ok": False, "error": "Push ist noch nicht konfiguriert."}), 400 endpoint = request.form.get("endpoint", "").strip() p256dh = request.form.get("p256dh", "").strip() auth_key = request.form.get("auth", "").strip() if not endpoint or not p256dh or not auth_key: return jsonify({"ok": False, "error": "Unvollständige Push-Daten."}), 400 get_db().execute( """ INSERT INTO push_subscriptions (user_id, endpoint, p256dh, auth, user_agent, is_active) VALUES (?, ?, ?, ?, ?, 1) ON CONFLICT(endpoint) DO UPDATE SET user_id = excluded.user_id, p256dh = excluded.p256dh, auth = excluded.auth, user_agent = excluded.user_agent, is_active = 1, updated_at = CURRENT_TIMESTAMP """, (g.user["id"], endpoint, p256dh, auth_key, request.headers.get("User-Agent", "")), ) ensure_user_settings_row() get_db().execute( "UPDATE user_settings SET push_enabled = 1, updated_at = CURRENT_TIMESTAMP WHERE user_id = ?", (g.user["id"],), ) get_db().commit() return jsonify({"ok": True}) @main_bp.post("/push/unsubscribe") @login_required def push_unsubscribe(): endpoint = request.form.get("endpoint", "").strip() if endpoint: get_db().execute( "UPDATE push_subscriptions SET is_active = 0, updated_at = CURRENT_TIMESTAMP WHERE endpoint = ? AND user_id = ?", (endpoint, g.user["id"]), ) else: get_db().execute( "UPDATE push_subscriptions SET is_active = 0, updated_at = CURRENT_TIMESTAMP WHERE user_id = ?", (g.user["id"],), ) get_db().commit() return jsonify({"ok": True}) @main_bp.route("/items/") @login_required def item_list(kind: str): if kind not in ITEM_KIND_LABELS: return redirect(url_for("main.dashboard")) query = request.args.get("q", "").strip() state = request.args.get("state", "").strip() scope = request.args.get("visibility", "").strip() raw_daypart_id = request.args.get("daypart_id", "").strip() daypart_id = int(raw_daypart_id) if raw_daypart_id.isdigit() else None items = fetch_items( kind=kind, availability=state or None, query=query or None, daypart_id=daypart_id, visibility=scope or None, ) return render_template( "items/list.html", kind=kind, items=items, availability_labels=AVAILABILITY_LABELS, query=query, selected_state=state, selected_visibility=scope, selected_daypart_id=daypart_id, dayparts=get_dayparts(), state_options=ACTIVE_STATE_OPTIONS, visibility_options=VISIBILITY_FILTER_OPTIONS, today=date.today(), ) @main_bp.route("/items//new", methods=("GET", "POST")) @login_required def item_create(kind: str): if kind not in ITEM_KIND_LABELS: return redirect(url_for("main.dashboard")) form_data = { "name": request.args.get("name", "").strip(), "category": "", "energy_density": "neutral", "note": "", "visibility": "shared", "target_user_id": None, "target_user_raw": TARGET_USER_OPTIONS_DEFAULT, "food_search": "", "daypart_ids": [], "component_ids": [int(value) for value in request.args.getlist("component_ids") if value.isdigit()], "quick_food_name": "", "quick_food_category": "", "quick_food_energy_density": "neutral", "quick_food_note": "", } if request.method == "POST": form_action = request.form.get("form_action", "save_item") form_data = extract_item_form_data(form_data) if kind == "meal" and form_action == "filter_foods": return render_item_form(kind, item=None, form_data=form_data) if kind == "meal" and form_action == "quick_add_food": if not form_data["quick_food_name"]: flash("Bitte einen Namen für das neue Lebensmittel eintragen.", "error") else: new_food_id = create_quick_food_from_form(form_data) if new_food_id not in form_data["component_ids"]: form_data["component_ids"].append(new_food_id) form_data["quick_food_name"] = "" form_data["quick_food_category"] = "" form_data["quick_food_note"] = "" flash("Das neue Lebensmittel wurde angelegt und direkt zur Mahlzeitenidee hinzugefügt.", "success") return render_item_form(kind, item=None, form_data=form_data) error = None if not form_data["name"]: error = "Bitte einen Namen eintragen." photo_filename = None if error is None: try: photo_filename = save_photo(request.files.get("photo")) except ValueError as exc: error = str(exc) if error is None: cursor = get_db().execute( """ INSERT INTO items ( household_id, owner_user_id, target_user_id, visibility, kind, name, category, energy_density, note, photo_filename, created_by, updated_by ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( current_household_id(), g.user["id"], form_data["target_user_id"], form_data["visibility"], kind, form_data["name"], form_data["category"], form_data["energy_density"], form_data["note"], photo_filename, g.user["id"], g.user["id"], ), ) item_id = int(cursor.lastrowid) sync_item_dayparts(item_id, form_data["daypart_ids"]) if kind == "meal": sync_meal_components(item_id, form_data["component_ids"]) get_db().commit() flash(f"{ITEM_KIND_SINGULAR_LABELS[kind]} wurde angelegt.", "success") return redirect(url_for("main.item_list", kind=kind)) flash(error, "error") return render_item_form(kind, item=None, form_data=form_data) @main_bp.route("/items//edit", methods=("GET", "POST")) @login_required def item_edit(item_id: int): try: item = get_item(item_id) ensure_can_edit(item) except (ValueError, PermissionError) as exc: flash(str(exc), "error") return redirect(url_for("main.dashboard")) form_data = { "name": item["name"], "category": item["category"] or "", "energy_density": item.get("energy_density") or "neutral", "note": item["note"] or "", "visibility": item["visibility"], "target_user_id": item["target_user_id"], "target_user_raw": str(item["target_user_id"]) if item["target_user_id"] else TARGET_USER_OPTIONS_DEFAULT, "food_search": "", "daypart_ids": get_item_daypart_ids(item_id), "component_ids": get_meal_component_ids(item_id) if item["kind"] == "meal" else [], "quick_food_name": "", "quick_food_category": "", "quick_food_energy_density": "neutral", "quick_food_note": "", } if request.method == "POST": form_action = request.form.get("form_action", "save_item") form_data = extract_item_form_data(form_data) if item["kind"] == "meal" and form_action == "filter_foods": return render_item_form(item["kind"], item=item, form_data=form_data) if item["kind"] == "meal" and form_action == "quick_add_food": if not form_data["quick_food_name"]: flash("Bitte einen Namen für das neue Lebensmittel eintragen.", "error") else: new_food_id = create_quick_food_from_form(form_data) if new_food_id not in form_data["component_ids"]: form_data["component_ids"].append(new_food_id) form_data["quick_food_name"] = "" form_data["quick_food_category"] = "" form_data["quick_food_note"] = "" flash("Das neue Lebensmittel wurde angelegt und direkt zur Mahlzeitenidee hinzugefügt.", "success") return render_item_form(item["kind"], item=item, form_data=form_data) error = None if not form_data["name"]: error = "Bitte einen Namen eintragen." photo_filename = item["photo_filename"] if error is None: try: photo_filename = save_photo(request.files.get("photo"), current_filename=item["photo_filename"]) except ValueError as exc: error = str(exc) if error is None: get_db().execute( """ UPDATE items SET name = ?, category = ?, energy_density = ?, note = ?, visibility = ?, target_user_id = ?, photo_filename = ?, updated_by = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? """, ( form_data["name"], form_data["category"], form_data["energy_density"], form_data["note"], form_data["visibility"], form_data["target_user_id"], photo_filename, g.user["id"], item_id, ), ) sync_item_dayparts(item_id, form_data["daypart_ids"]) if item["kind"] == "meal": sync_meal_components(item_id, form_data["component_ids"]) get_db().commit() flash("Der Eintrag wurde aktualisiert.", "success") return redirect(url_for("main.item_list", kind=item["kind"])) flash(error, "error") return render_item_form(item["kind"], item=item, form_data=form_data) @main_bp.post("/items//shopping") @login_required def item_add_to_shopping(item_id: int): try: item = get_item(item_id) except ValueError as exc: flash(str(exc), "error") return redirect(request.referrer or url_for("main.shopping_list")) result = ensure_item_or_missing_components_are_shopped( item_id, g.user["id"], item["visibility"], ) if result["count"]: if result["used_components"]: flash(f"Für „{item['name']}“ wurden fehlende Lebensmittel auf die Einkaufsliste gesetzt.", "success") else: flash(f"{item['name']} steht jetzt auf der Einkaufsliste.", "success") elif result["scheduled_count"]: flash(f"Für „{item['name']}“ sind fehlende Lebensmittel für einen späteren Einkauf vorgemerkt.", "info") else: flash(f"Für „{item['name']}“ ist gerade nichts zusätzlich nötig.", "info") return redirect(request.referrer or url_for("main.shopping_list")) @main_bp.post("/items//set-home") @login_required def item_set_home(item_id: int): try: item = get_item(item_id) ensure_can_edit(item) except (ValueError, PermissionError) as exc: flash(str(exc), "error") return redirect(request.referrer or url_for("main.home_view")) get_db().execute( "UPDATE items SET availability_state = 'home', updated_by = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?", (g.user["id"], item_id), ) get_db().commit() flash(f"{item['name']} ist jetzt unter Zuhause sichtbar.", "success") return redirect(request.referrer or url_for("main.home_view")) @main_bp.post("/items//archive") @login_required def item_archive(item_id: int): try: item = get_item(item_id) ensure_can_edit(item) except (ValueError, PermissionError) as exc: flash(str(exc), "error") return redirect(request.referrer or url_for("main.archive_view")) get_db().execute( "UPDATE items SET availability_state = 'archived', updated_by = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?", (g.user["id"], item_id), ) get_db().commit() flash(f"{item['name']} liegt jetzt im Archiv und bleibt später leicht wiederfindbar.", "info") return redirect(request.referrer or url_for("main.archive_view")) @main_bp.post("/items//restore") @login_required def item_restore(item_id: int): try: item = get_item(item_id) ensure_can_edit(item) except (ValueError, PermissionError) as exc: flash(str(exc), "error") return redirect(request.referrer or url_for("main.archive_view")) get_db().execute( "UPDATE items SET availability_state = 'idea', updated_by = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?", (g.user["id"], item_id), ) get_db().commit() flash(f"{item['name']} ist wieder in der aktiven Liste.", "success") return redirect(request.referrer or url_for("main.archive_view")) @main_bp.route("/shopping", methods=("GET", "POST")) @login_required def shopping_list(): if request.method == "POST": selected_item_id = request.form.get("item_id", "").strip() if not selected_item_id.isdigit(): flash("Bitte zuerst etwas auswählen.", "error") else: try: item = get_item(int(selected_item_id)) result = ensure_item_or_missing_components_are_shopped( item["id"], g.user["id"], item["visibility"], ) if result["count"]: flash(f"Die Einkaufsliste wurde ergänzt: {', '.join(result['names'][:4])}.", "success") elif result["scheduled_count"]: flash("Ein paar Dinge sind für einen späteren Einkauf vorgemerkt.", "info") else: flash("Dafür ist gerade nichts zusätzlich nötig.", "info") except ValueError as exc: flash(str(exc), "error") return redirect(url_for("main.shopping_list")) entries = fetch_shopping_entries() upcoming_entries = fetch_upcoming_shopping_needs() addable_items = fetch_items(include_archived=False) addable_items = [item for item in addable_items if not item["is_on_shopping_list"]] household_settings = get_household_settings() shopping_weekday_label = dict(WEEKDAY_OPTIONS).get(household_settings["shopping_weekday"], "gesetzt") return render_template( "shopping/list.html", entries=entries, upcoming_entries=upcoming_entries, addable_items=addable_items, household_settings=household_settings, shopping_weekday_label=shopping_weekday_label, ) @main_bp.post("/shopping//check") @login_required def shopping_check(entry_id: int): entry = get_db().execute( f""" SELECT shopping_entries.*, owner.display_name AS owner_display_name, owner.username AS owner_username FROM shopping_entries LEFT JOIN users AS owner ON owner.id = shopping_entries.owner_user_id WHERE shopping_entries.id = ? AND {visible_clause('shopping_entries')} """, [entry_id, *visible_params()], ).fetchone() if entry is None: flash("Der Einkaufseintrag wurde nicht gefunden.", "error") return redirect(url_for("main.shopping_list")) entry_dict = describe_record(dict(entry)) try: ensure_can_edit(entry_dict, "Diesen Einkaufseintrag kannst du gerade nicht ändern.") item = get_item(entry["item_id"]) except (ValueError, PermissionError) as exc: flash(str(exc), "error") return redirect(url_for("main.shopping_list")) get_db().execute( "UPDATE shopping_entries SET is_checked = 1, checked_at = CURRENT_TIMESTAMP, checked_by = ? WHERE id = ?", (g.user["id"], entry_id), ) get_db().execute( "UPDATE items SET availability_state = 'home', updated_by = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?", (g.user["id"], item["id"]), ) get_db().commit() flash(f"{item['name']} ist jetzt als Zuhause vorhanden markiert.", "success") return redirect(url_for("main.shopping_list")) @main_bp.post("/shopping//remove") @login_required def shopping_remove(entry_id: int): entry = get_db().execute( f""" SELECT shopping_entries.*, owner.display_name AS owner_display_name, owner.username AS owner_username FROM shopping_entries LEFT JOIN users AS owner ON owner.id = shopping_entries.owner_user_id WHERE shopping_entries.id = ? AND {visible_clause('shopping_entries')} """, [entry_id, *visible_params()], ).fetchone() if entry is None: flash("Der Eintrag wurde nicht gefunden.", "error") return redirect(url_for("main.shopping_list")) try: ensure_can_edit(describe_record(dict(entry)), "Diesen Einkaufseintrag kannst du gerade nicht entfernen.") except PermissionError as exc: flash(str(exc), "error") return redirect(url_for("main.shopping_list")) get_db().execute("DELETE FROM shopping_entries WHERE id = ?", (entry_id,)) get_db().commit() flash("Der Eintrag wurde von der Einkaufsliste entfernt.", "info") return redirect(url_for("main.shopping_list")) @main_bp.get("/home") @login_required def home_view(): query = request.args.get("q", "").strip() scope = request.args.get("visibility", "").strip() raw_daypart_id = request.args.get("daypart_id", "").strip() daypart_id = int(raw_daypart_id) if raw_daypart_id.isdigit() else None dayparts = get_dayparts() items = fetch_items( availability="home", query=query or None, daypart_id=daypart_id, visibility=scope or None, ) return render_template( "home/list.html", sections=build_home_sections(items, dayparts, daypart_id), recipe_suggestions=build_home_recipe_suggestions(daypart_id, limit=4), query=query, dayparts=dayparts, selected_daypart_id=daypart_id, selected_visibility=scope, visibility_options=VISIBILITY_FILTER_OPTIONS, today=date.today(), ) @main_bp.get("/archive") @login_required def archive_view(): query = request.args.get("q", "").strip() selected_kind = request.args.get("kind", "").strip() selected_visibility = request.args.get("visibility", "").strip() kind = selected_kind if selected_kind in ITEM_KIND_LABELS else None items = fetch_items( kind=kind, availability="archived", include_archived=True, query=query or None, visibility=selected_visibility or None, ) return render_template( "archive/list.html", items=items, query=query, selected_kind=selected_kind, selected_visibility=selected_visibility, kind_options=KIND_FILTER_OPTIONS, visibility_options=VISIBILITY_FILTER_OPTIONS, today=date.today(), ) @main_bp.get("/planner") @login_required def planner(): week_start = parse_week_start(request.args.get("week")) return render_template( "planner/week.html", week_start=week_start, week_end=week_start + timedelta(days=6), prev_week=week_start - timedelta(days=7), next_week=week_start + timedelta(days=7), week_cards=fetch_week_cards(week_start), today=date.today(), week_templates=fetch_week_templates()[:6], week_hints=build_week_hints(week_start), upcoming_entries=fetch_upcoming_shopping_needs(limit=8), household_settings=get_household_settings(), ) @main_bp.get("/planner/export.pdf") @login_required def planner_export_pdf(): week_start = parse_week_start(request.args.get("week")) mode = normalize_pdf_export_mode(request.args.get("mode")) try: pdf_bytes = build_week_plan_pdf(week_start, mode=mode) except RuntimeError as exc: flash(str(exc), "error") return redirect(url_for("main.planner", week=week_start.isoformat())) week_number = week_start.isocalendar().week prefix = "mein-essensplan" if mode == "mine" else "unser-essensplan" filename = f"{prefix}-kw-{week_number:02d}-{week_start.year}.pdf" return send_file( BytesIO(pdf_bytes), mimetype="application/pdf", as_attachment=True, download_name=filename, ) @main_bp.route("/planner/day", methods=("GET", "POST")) @login_required def planner_day(): selected_date = parse_plan_date(request.values.get("date")) if request.method == "POST": item_id_raw = request.form.get("item_id", "").strip() daypart_id_raw = request.form.get("daypart_id", "").strip() note = request.form.get("note", "").strip() selected_date = parse_plan_date(request.form.get("plan_date")) visibility = normalize_visibility(request.form.get("visibility"), "shared") if not item_id_raw.isdigit(): flash("Bitte etwas für den Tagesplan auswählen.", "error") elif not daypart_id_raw.isdigit(): flash("Bitte eine Tageszeit auswählen.", "error") else: item_id = int(item_id_raw) daypart_id = int(daypart_id_raw) try: get_item(item_id) shopping_result = insert_plan_entry( item_id=item_id, daypart_id=daypart_id, plan_date=selected_date, visibility=visibility, note=note, ) if shopping_result["count"]: flash("Fehlende Lebensmittel wurden für den nächsten Einkauf ergänzt.", "info") elif shopping_result["scheduled_count"]: flash("Fehlende Lebensmittel wurden für einen späteren Einkauf vorgemerkt.", "info") flash("Der Eintrag wurde in den Tagesplan gelegt.", "success") return redirect( f"{url_for('main.planner_day', date=selected_date.isoformat(), daypart_id=daypart_id)}#daypart-{daypart_id}" ) except ValueError as exc: flash(str(exc), "error") selected_item_raw = request.args.get("item_id", "").strip() selected_daypart_raw = request.args.get("daypart_id", "").strip() selected_meal_name = request.args.get("meal_name", "").strip() selected_components_raw = request.args.get("component_ids", "").strip() selected_item_id = int(selected_item_raw) if selected_item_raw.isdigit() else None selected_daypart_id = int(selected_daypart_raw) if selected_daypart_raw.isdigit() else None selected_component_ids = [int(value) for value in selected_components_raw.split(",") if value.isdigit()] return render_template( "planner/day.html", selected_date=selected_date, previous_day=selected_date - timedelta(days=1), next_day=selected_date + timedelta(days=1), sections=build_day_planner_sections( selected_date, selected_item_id, selected_daypart_id, selected_meal_name=selected_meal_name, selected_component_ids=selected_component_ids, ), today=date.today(), visibility_options=VISIBILITY_FORM_OPTIONS, day_templates=fetch_day_templates()[:6], day_hints=build_day_hints(selected_date), ) @main_bp.post("/planner/day/generated-meal") @login_required def planner_generated_meal(): selected_date = parse_plan_date(request.form.get("plan_date")) daypart_raw = request.form.get("daypart_id", "").strip() meal_name = request.form.get("meal_name", "").strip() component_ids = [int(value) for value in request.form.getlist("component_ids") if value.isdigit()] visibility = normalize_visibility(request.form.get("visibility"), "shared") if not daypart_raw.isdigit() or not meal_name or not component_ids: flash("Die vorgeschlagene Kombination konnte gerade nicht übernommen werden.", "error") return redirect(url_for("main.planner_day", date=selected_date.isoformat())) daypart_id = int(daypart_raw) meal_id = create_or_get_generated_meal( name=meal_name, component_ids=component_ids, daypart_id=daypart_id, visibility=visibility, ) shopping_result = insert_plan_entry( item_id=meal_id, daypart_id=daypart_id, plan_date=selected_date, visibility=visibility, ) if shopping_result["count"]: flash("Die Kombination wurde eingeplant und fehlende Lebensmittel wurden ergänzt.", "info") elif shopping_result["scheduled_count"]: flash("Die Kombination wurde eingeplant. Fehlende Lebensmittel sind für später vorgemerkt.", "info") flash("Die Kombination wurde als Mahlzeitenidee übernommen.", "success") return redirect(f"{url_for('main.planner_day', date=selected_date.isoformat(), daypart_id=daypart_id)}#daypart-{daypart_id}") @main_bp.post("/planner//update") @login_required def planner_update(entry_id: int): selected_date = parse_plan_date(request.form.get("plan_date")) entry = get_db().execute( f""" SELECT plan_entries.*, owner.display_name AS owner_display_name, owner.username AS owner_username FROM plan_entries LEFT JOIN users AS owner ON owner.id = plan_entries.owner_user_id WHERE plan_entries.id = ? AND {visible_clause('plan_entries')} """, [entry_id, *visible_params()], ).fetchone() if entry is None: flash("Der Planeintrag wurde nicht gefunden.", "error") return redirect(url_for("main.planner_day", date=selected_date.isoformat())) try: ensure_can_edit(describe_record(dict(entry)), "Diesen Planeintrag kannst du gerade nicht bearbeiten.") except PermissionError as exc: flash(str(exc), "error") return redirect(url_for("main.planner_day", date=selected_date.isoformat())) visibility = normalize_visibility(request.form.get("visibility"), entry["visibility"]) note = request.form.get("note", "").strip() update_plan_entry(entry_id, visibility=visibility, note=note) flash("Der Planeintrag wurde angepasst.", "success") return redirect(url_for("main.planner_day", date=selected_date.isoformat(), daypart_id=entry["daypart_id"])) @main_bp.post("/planner//remove") @login_required def planner_remove(entry_id: int): selected_date = request.args.get("date", "") entry = get_db().execute( f""" SELECT plan_entries.*, owner.display_name AS owner_display_name, owner.username AS owner_username FROM plan_entries LEFT JOIN users AS owner ON owner.id = plan_entries.owner_user_id WHERE plan_entries.id = ? AND {visible_clause('plan_entries')} """, [entry_id, *visible_params()], ).fetchone() if entry is None: flash("Der Planeintrag wurde nicht gefunden.", "error") else: try: ensure_can_edit(describe_record(dict(entry)), "Diesen Planeintrag kannst du gerade nicht entfernen.") get_db().execute("DELETE FROM plan_entries WHERE id = ?", (entry_id,)) get_db().commit() flash("Der Planeintrag wurde entfernt.", "info") except PermissionError as exc: flash(str(exc), "error") if selected_date: return redirect(url_for("main.planner_day", date=selected_date)) return redirect(url_for("main.planner")) @main_bp.post("/planner//move") @login_required def planner_move(entry_id: int): target_date = parse_plan_date(request.form.get("target_date")) target_daypart_raw = request.form.get("target_daypart_id", "").strip() if not target_daypart_raw.isdigit(): return jsonify({"ok": False, "error": "Ungültige Tageszeit"}), 400 entry = get_db().execute( f""" SELECT plan_entries.*, owner.display_name AS owner_display_name, owner.username AS owner_username FROM plan_entries LEFT JOIN users AS owner ON owner.id = plan_entries.owner_user_id WHERE plan_entries.id = ? AND {visible_clause('plan_entries')} """, [entry_id, *visible_params()], ).fetchone() if entry is None: return jsonify({"ok": False, "error": "Eintrag nicht gefunden"}), 404 try: ensure_can_edit(describe_record(dict(entry)), "Diesen Planeintrag kannst du gerade nicht verschieben.") except PermissionError as exc: return jsonify({"ok": False, "error": str(exc)}), 403 target_daypart_id = int(target_daypart_raw) get_db().execute( "UPDATE plan_entries SET plan_date = ?, daypart_id = ? WHERE id = ?", (target_date.isoformat(), target_daypart_id, entry_id), ) get_db().commit() shopping_result = ensure_item_or_missing_components_are_shopped( entry["item_id"], g.user["id"], entry["visibility"], needed_for_date=target_date.isoformat(), needed_for_daypart_id=target_daypart_id, source_item_id=entry["item_id"], ) if shopping_result["count"]: flash("Fehlende Lebensmittel wurden nach dem Verschieben ergänzt.", "info") elif shopping_result["scheduled_count"]: flash("Fehlende Lebensmittel sind für den passenden Einkauf vorgemerkt.", "info") return jsonify( { "ok": True, "added_to_shopping": bool(shopping_result["count"]), "redirect_url": url_for("main.planner", week=parse_week_start(target_date.isoformat()).isoformat()), } ) @main_bp.post("/planner/slot/copy-forward") @login_required def planner_slot_copy_forward(): source_date = parse_plan_date(request.form.get("source_date")) target_date = source_date + timedelta(days=1) daypart_raw = request.form.get("daypart_id", "").strip() if not daypart_raw.isdigit(): flash("Die Tageszeit konnte nicht erkannt werden.", "error") return redirect(url_for("main.planner", week=parse_week_start(source_date.isoformat()).isoformat())) daypart_id = int(daypart_raw) entries = get_db().execute( f""" SELECT plan_entries.*, owner.display_name AS owner_display_name, owner.username AS owner_username FROM plan_entries LEFT JOIN users AS owner ON owner.id = plan_entries.owner_user_id WHERE plan_entries.plan_date = ? AND plan_entries.daypart_id = ? AND {visible_clause('plan_entries')} ORDER BY plan_entries.id """, [source_date.isoformat(), daypart_id, *visible_params()], ).fetchall() copied_count = 0 shopping_added = 0 shopping_scheduled = 0 for raw_entry in entries: entry = describe_record(dict(raw_entry)) try: ensure_can_edit(entry, "Diesen Planeintrag kannst du gerade nicht kopieren.") except PermissionError: continue duplicate = get_db().execute( """ SELECT id FROM plan_entries WHERE household_id = ? AND plan_date = ? AND daypart_id = ? AND item_id = ? AND visibility = ? AND COALESCE(note, '') = COALESCE(?, '') LIMIT 1 """, ( current_household_id(), target_date.isoformat(), daypart_id, entry["item_id"], entry["visibility"], entry.get("note", ""), ), ).fetchone() if duplicate: continue shopping_result = insert_plan_entry( item_id=entry["item_id"], daypart_id=daypart_id, plan_date=target_date, visibility=entry["visibility"], note=entry.get("note", "") or "", ) copied_count += 1 shopping_added += int(shopping_result["count"]) shopping_scheduled += int(shopping_result["scheduled_count"]) if copied_count == 0: flash("Für diese Tageszeit gab es nichts Neues zum Kopieren.", "info") else: if shopping_added: flash("Fehlende Lebensmittel wurden für den passenden Einkauf ergänzt.", "info") elif shopping_scheduled: flash("Fehlende Lebensmittel wurden für später vorgemerkt.", "info") flash(f"{copied_count} Eintrag{' wurde' if copied_count == 1 else 'e wurden'} zum nächsten Tag kopiert.", "success") return redirect(url_for("main.planner", week=parse_week_start(source_date.isoformat()).isoformat()))