# Source file: nouri-App/nouri/main.py
# (Python, 4118 lines, 154 KiB)

from __future__ import annotations
from collections import defaultdict
from datetime import date, datetime, timedelta
from io import BytesIO
from itertools import product
from pathlib import Path
import sqlite3
from flask import (
Blueprint,
after_this_request,
current_app,
flash,
g,
jsonify,
redirect,
render_template,
request,
send_file,
url_for,
)
from .auth import admin_required, login_required
from .backup import RESTORE_CONFIRMATION_TEXT, export_backup_archive, restore_backup_archive
from .constants import (
AVAILABILITY_LABELS,
BUILDER_LABELS,
DEFAULT_CATEGORY_BUILDERS,
DAY_TEMPLATE_NAME_SUGGESTIONS,
DEFAULT_CATEGORIES,
ENERGY_DENSITY_LABELS,
ENERGY_DENSITY_OPTIONS,
ITEM_KIND_LABELS,
ITEM_KIND_SINGULAR_LABELS,
ITEM_SET_NAME_SUGGESTIONS,
NOTIFICATION_CHANNEL_OPTIONS,
SUGGESTION_STYLE_LABELS,
SUGGESTION_STYLE_OPTIONS,
VISIBILITY_DESCRIPTIONS,
VISIBILITY_LABELS,
WEEKDAY_OPTIONS,
WEEK_TEMPLATE_NAME_SUGGESTIONS,
)
from .db import get_db
from .images import (
allowed_image_file,
save_photo_with_variants,
upload_file_size_ok,
)
from .push import push_is_configured, push_public_key, send_push_message
# Blueprint that the hooks and view helpers in this module are registered on.
main_bp = Blueprint("main", __name__)
# (value, label) pairs for the availability filter; the empty value is labelled "all active".
ACTIVE_STATE_OPTIONS = [
("", "Alle aktiven"),
("home", "Zuhause"),
("idea", "Merkliste"),
]
# (value, label) pairs for the item-kind filter (food vs. meal idea).
KIND_FILTER_OPTIONS = [
("", "Alles"),
("food", "Lebensmittel"),
("meal", "Mahlzeitenideen"),
]
# (value, label) pairs for the visibility filter; the empty value is labelled "everything visible".
VISIBILITY_FILTER_OPTIONS = [
("", "Alles Sichtbare"),
("shared", "Gemeinsam"),
("personal", "Persönlich"),
]
# (value, label) pairs for choosing a visibility in forms (no "all" entry here).
VISIBILITY_FORM_OPTIONS = [
("shared", "Gemeinsam"),
("personal", "Persönlich"),
]
# Sentinel form value meaning "no specific target user" (labelled "Für alle" in get_target_user_options).
TARGET_USER_OPTIONS_DEFAULT = "__all__"
# Weekday names indexed by date.weekday(), i.e. Monday is index 0.
WEEKDAY_LABELS = ["Montag", "Dienstag", "Mittwoch", "Donnerstag", "Freitag", "Samstag", "Sonntag"]
# Daypart slugs of the three main meals.
PRIMARY_DAYPART_SLUGS = {"breakfast", "lunch", "dinner"}
# Daypart slugs of the snack slots.
SNACK_DAYPART_SLUGS = {"morning-snack", "afternoon-snack", "late-snack"}
# Labels for the snack dayparts; presumably used by a PDF export further down the file (not visible here).
PDF_DAYPART_LABELS = {
"morning-snack": "Snack am Vormittag",
"afternoon-snack": "Snack am Nachmittag",
"late-snack": "Später Snack",
}
@main_bp.before_app_request
def refresh_due_context():
    """Activate due shopping needs before GET requests to ``main.*`` endpoints.

    Does nothing when no user is attached to ``g``. A
    ``sqlite3.OperationalError`` raised during activation is logged as a
    warning and otherwise ignored. Always returns ``None``.
    """
    if getattr(g, "user", None) is None:
        return None

    endpoint_name = request.endpoint or ""
    is_main_get = request.method == "GET" and endpoint_name.startswith("main.")
    if not is_main_get:
        return None

    try:
        activate_due_shopping_needs()
    except sqlite3.OperationalError:
        current_app.logger.warning("Due shopping needs could not be activated during this request.")
    return None
def current_household_id() -> int:
    """Return the ``household_id`` of the user stored on ``g``, as an int."""
    household_id = g.user["household_id"]
    return int(household_id)
def get_dayparts() -> list:
    """Return all rows of the ``dayparts`` table ordered by ``sort_order``."""
    cursor = get_db().execute("SELECT * FROM dayparts ORDER BY sort_order")
    return cursor.fetchall()
def format_weekday(day_value: date) -> str:
    """Return the ``WEEKDAY_LABELS`` entry for the weekday of ``day_value``."""
    weekday_index = day_value.weekday()
    return WEEKDAY_LABELS[weekday_index]
def get_household_users(active_only: bool = True):
    """Return the users of the current household, sorted by display name.

    Args:
        active_only: When true, only rows with ``is_active = 1`` are returned.

    Returns:
        Rows with ``id``, ``username``, ``display_name`` and ``role``, ordered
        case-insensitively by display name (falling back to username).
    """
    sql_parts = [
        """
SELECT id, username, display_name, role
FROM users
WHERE household_id = ?
"""
    ]
    bound_values: list[object] = [current_household_id()]
    if active_only:
        sql_parts.append(" AND is_active = 1")
    sql_parts.append(" ORDER BY LOWER(COALESCE(display_name, username))")
    return get_db().execute("".join(sql_parts), bound_values).fetchall()
def get_target_user_options() -> list[dict]:
    """Return ``{"value", "label"}`` options for choosing a target user.

    The first option is the "for all" sentinel; it is followed by one option
    per active household user, labelled with the display name or username.
    """
    everyone_option = {"value": TARGET_USER_OPTIONS_DEFAULT, "label": "Für alle"}
    member_options = [
        {"value": str(member["id"]), "label": member["display_name"] or member["username"]}
        for member in get_household_users()
    ]
    return [everyone_option, *member_options]
def get_category_options(include_inactive_selected: str | None = None) -> list[str]:
    """Return the active category names of the current household.

    Falls back to a copy of ``DEFAULT_CATEGORIES`` when the household has no
    active categories.

    Args:
        include_inactive_selected: A category name that is appended to the
            result when it is truthy and not already contained in it.
    """
    cursor = get_db().execute(
        """
SELECT name
FROM household_categories
WHERE household_id = ? AND is_active = 1
ORDER BY sort_order, LOWER(name)
""",
        (current_household_id(),),
    )
    names = [record["name"] for record in cursor.fetchall()] or DEFAULT_CATEGORIES[:]
    needs_extra = bool(include_inactive_selected) and include_inactive_selected not in names
    if needs_extra:
        names.append(include_inactive_selected)
    return names
def get_category_builder_map() -> dict[str, str]:
    """Map category names to builder keys for the current household.

    Household rows take precedence (a missing ``builder_key`` becomes
    ``"neutral"``); entries from ``DEFAULT_CATEGORY_BUILDERS`` are only added
    for names the household does not define.
    """
    household_rows = get_db().execute(
        """
SELECT name, builder_key
FROM household_categories
WHERE household_id = ?
""",
        (current_household_id(),),
    ).fetchall()

    mapping: dict[str, str] = {}
    for record in household_rows:
        mapping[record["name"]] = record["builder_key"] or "neutral"
    for category_name, default_key in DEFAULT_CATEGORY_BUILDERS.items():
        if category_name not in mapping:
            mapping[category_name] = default_key
    return mapping
def get_household_settings() -> dict:
    """Return the shopping settings of the current household.

    Returns:
        A dict with ``shopping_weekday`` (int, compared against
        ``date.weekday()`` elsewhere in this module, so 0 is Monday),
        ``shopping_prep_days`` (int) and ``shopping_reminder_time`` (str).
        Defaults are weekday 5, one prep day and ``"18:00"``; they are used
        when the household row or a single value is missing.
    """
    default_weekday = 5
    row = get_db().execute(
        """
SELECT shopping_weekday, shopping_prep_days, shopping_reminder_time
FROM households
WHERE id = ?
""",
        (current_household_id(),),
    ).fetchone()
    if row is None:
        return {
            "shopping_weekday": default_weekday,
            "shopping_prep_days": 1,
            "shopping_reminder_time": "18:00",
        }

    stored_weekday = row["shopping_weekday"]
    # 0 is a valid weekday (Monday), so only a missing value may fall back to
    # the default; a plain ``or 5`` would silently turn Monday into Saturday.
    if stored_weekday is None or stored_weekday == "":
        shopping_weekday = default_weekday
    else:
        shopping_weekday = int(stored_weekday)

    return {
        "shopping_weekday": shopping_weekday,
        # NOTE(review): a stored 0 still becomes 1 here, as before. Confirm
        # whether "0 prep days" is meant to be a selectable value.
        "shopping_prep_days": int(row["shopping_prep_days"] or 1),
        "shopping_reminder_time": row["shopping_reminder_time"] or "18:00",
    }
def default_user_settings() -> dict:
    """Return the default settings dict for the user stored on ``g``.

    The suggestion style defaults to ``"balanced"`` and the energy preference
    is derived from it.
    """
    style = "balanced"
    settings = {
        "user_id": int(g.user["id"]),
        "reminders_enabled": True,
        "push_enabled": False,
        "notification_channel": "in_app",
        "suggestion_style": style,
        "energy_preference": suggestion_style_energy_preference(style),
    }
    # Boolean feature flags, kept in the same key order as before.
    flag_defaults = (
        ("remind_before_shopping", True),
        ("remind_on_shopping_day", True),
        ("show_missing_for_upcoming_week", True),
        ("show_planned_not_shopped", True),
        ("remind_tomorrow_if_sparse", True),
        ("remind_week_if_sparse", True),
        ("push_missing_breakfast", False),
        ("push_missing_lunch", False),
        ("push_missing_dinner", False),
        ("push_small_snack", False),
        ("suggest_home_for_today", True),
        ("remind_small_snack", False),
        ("remind_nuts", False),
        ("show_meal_balancing", True),
        ("suggest_templates", True),
        ("suggest_patterns", True),
    )
    settings.update(flag_defaults)
    return settings
def ensure_user_settings_row(*, commit: bool = False) -> None:
    """Insert a ``user_settings`` row for the current user if none exists.

    Args:
        commit: When true, commit after inserting. Nothing is committed when
            the row already exists.
    """
    db = get_db()
    user_id = g.user["id"]
    already_present = db.execute(
        "SELECT 1 FROM user_settings WHERE user_id = ? LIMIT 1",
        (user_id,),
    ).fetchone()
    if already_present is None:
        db.execute(
            "INSERT INTO user_settings (user_id) VALUES (?)",
            (user_id,),
        )
        if commit:
            db.commit()
def get_user_settings() -> dict:
    """Return the current user's settings merged over the defaults.

    Without a stored row the defaults are returned unchanged. Otherwise the
    stored columns overwrite the defaults, flag fields are coerced to bool,
    the notification channel falls back to ``"in_app"``, the suggestion style
    is normalized and the energy preference is derived from that style.
    """
    merged = default_user_settings()
    stored = get_db().execute("SELECT * FROM user_settings WHERE user_id = ?", (g.user["id"],)).fetchone()
    if stored is None:
        return merged

    merged.update(dict(stored))
    flag_names = (
        "reminders_enabled",
        "push_enabled",
        "remind_before_shopping",
        "remind_on_shopping_day",
        "show_missing_for_upcoming_week",
        "show_planned_not_shopped",
        "remind_tomorrow_if_sparse",
        "remind_week_if_sparse",
        "push_missing_breakfast",
        "push_missing_lunch",
        "push_missing_dinner",
        "push_small_snack",
        "suggest_home_for_today",
        "remind_small_snack",
        "remind_nuts",
        "show_meal_balancing",
        "suggest_templates",
        "suggest_patterns",
    )
    for flag_name in flag_names:
        merged[flag_name] = bool(merged.get(flag_name))

    channel = merged.get("notification_channel")
    merged["notification_channel"] = channel if channel else "in_app"
    style = normalize_suggestion_style(merged.get("suggestion_style"), "balanced")
    merged["suggestion_style"] = style
    merged["energy_preference"] = suggestion_style_energy_preference(style)
    return merged
def parse_checkbox(name: str, default: bool = False) -> int:
    """Return 1 if form field ``name`` equals ``"1"``, otherwise 0.

    When the field is absent from the form, ``default`` decides the result.
    """
    fallback = "1" if default else "0"
    submitted = request.form.get(name, fallback)
    return int(submitted == "1")
def normalize_weekday(raw: str | None, default: int = 5) -> int:
    """Parse a weekday index from form input.

    Args:
        raw: Submitted value; expected to be a digit string from 0 to 6.
        default: Value returned for missing, malformed or out-of-range input.

    Returns:
        The parsed weekday (0-6) or ``default``.
    """
    if not raw or not raw.isdigit():
        return default
    try:
        value = int(raw)
    except ValueError:
        # str.isdigit() accepts characters such as "²" that int() rejects.
        return default
    return value if 0 <= value <= 6 else default
def normalize_notification_channel(raw: str | None, default: str = "in_app") -> str:
    """Return ``raw`` if it is a value in ``NOTIFICATION_CHANNEL_OPTIONS``, else ``default``."""
    known_channels = frozenset(value for value, _label in NOTIFICATION_CHANNEL_OPTIONS)
    if raw in known_channels:
        return raw
    return default
def normalize_suggestion_style(raw: str | None, default: str = "balanced") -> str:
    """Normalize a suggestion style value.

    The values ``"easy"`` and ``"snack"`` are always mapped to ``"balanced"``.
    Any other value is returned as-is when it appears in
    ``SUGGESTION_STYLE_OPTIONS`` and replaced by ``default`` otherwise.
    """
    known_styles = frozenset(value for value, _label in SUGGESTION_STYLE_OPTIONS)
    if raw in ("easy", "snack"):
        return "balanced"
    if raw in known_styles:
        return raw
    return default
def normalize_energy_density(raw: str | None, default: str = "neutral") -> str:
    """Return ``raw`` if it is a value in ``ENERGY_DENSITY_OPTIONS``, else ``default``."""
    known_densities = frozenset(value for value, _label in ENERGY_DENSITY_OPTIONS)
    if raw in known_densities:
        return raw
    return default
def suggestion_style_energy_preference(style: str) -> str:
    """Return ``"low"`` for the ``"fitness"`` style and ``"neutral"`` for every other style."""
    return "low" if style == "fitness" else "neutral"
def visible_clause(table_alias: str) -> str:
    """Return the SQL condition that limits ``table_alias`` to visible rows.

    The fragment has two ``?`` placeholders: the household id, then the id of
    the owning user (see ``visible_params``). ``table_alias`` is interpolated
    verbatim, so it must never come from user input.
    """
    household_filter = f"{table_alias}.household_id = ? "
    visibility_filter = f"AND ({table_alias}.visibility = 'shared' OR {table_alias}.owner_user_id = ?)"
    return household_filter + visibility_filter
def visible_params() -> list[int]:
    """Return the parameters for ``visible_clause``: household id, then current user id."""
    household_id = current_household_id()
    user_id = int(g.user["id"])
    return [household_id, user_id]
def parse_week_start(raw: str | None) -> date:
    """Return the Monday of the week containing ``raw``.

    Args:
        raw: A ``YYYY-MM-DD`` date string. Missing or unparsable input falls
            back to today's date.
    """
    reference = None
    if raw:
        try:
            reference = datetime.strptime(raw, "%Y-%m-%d").date()
        except ValueError:
            reference = None
    if reference is None:
        reference = date.today()
    return reference - timedelta(days=reference.weekday())
def parse_plan_date(raw: str | None, fallback: date | None = None) -> date:
    """Parse a ``YYYY-MM-DD`` string into a date.

    Returns ``fallback`` (or today's date when no fallback is given) if
    ``raw`` is missing or cannot be parsed.
    """
    parsed = None
    if raw:
        try:
            parsed = datetime.strptime(raw, "%Y-%m-%d").date()
        except ValueError:
            parsed = None
    if parsed is not None:
        return parsed
    if fallback:
        return fallback
    return date.today()
def normalize_visibility(raw: str | None, default: str = "shared") -> str:
    """Return ``raw`` if it is a key of ``VISIBILITY_LABELS``, else ``default``."""
    if raw in VISIBILITY_LABELS:
        return raw
    return default
def normalize_target_user_id(raw: str | None) -> int | None:
    """Resolve a submitted target-user value to a household user id.

    Args:
        raw: Form value; either the "for all" sentinel or a user id as a
            digit string.

    Returns:
        The user id as an int if it belongs to an active user of the current
        household. ``None`` for empty input, the sentinel, malformed input or
        an unknown id.
    """
    if not raw or raw == TARGET_USER_OPTIONS_DEFAULT:
        return None
    if not raw.isdigit():
        return None
    try:
        target_id = int(raw)
    except ValueError:
        # str.isdigit() accepts characters such as "²" that int() rejects.
        return None
    household_user_ids = {int(user["id"]) for user in get_household_users()}
    return target_id if target_id in household_user_ids else None
def save_photo(upload, current_filename: str | None = None) -> str | None:
    """Validate and store an uploaded photo.

    Args:
        upload: The uploaded file object, or a falsy value when nothing was
            uploaded.
        current_filename: Filename that is already stored, if any.

    Returns:
        ``current_filename`` unchanged when there is no upload (or it has no
        filename); otherwise the result of ``save_photo_with_variants``.

    Raises:
        ValueError: If the file type is not allowed or the file is too large.
    """
    has_upload = bool(upload) and bool(upload.filename)
    if not has_upload:
        return current_filename

    if not allowed_image_file(upload.filename):
        raise ValueError("Bitte ein Bild als PNG, JPG, GIF oder WEBP hochladen.")

    max_bytes = current_app.config["MAX_CONTENT_LENGTH"]
    if not upload_file_size_ok(upload, max_bytes):
        raise ValueError("Das Bild ist gerade zu groß. Ein etwas kleineres Foto hilft hier am besten.")

    upload_folder = current_app.config["UPLOAD_FOLDER"]
    return save_photo_with_variants(upload, upload_folder, current_filename=current_filename)
def user_display_name(display_name: str | None, username: str | None) -> str:
    """Return the first non-empty value of display name and username, or ``"Haushalt"``."""
    for candidate in (display_name, username):
        if candidate:
            return candidate
    return "Haushalt"
def describe_record(entry: dict) -> dict:
    """Add display and permission fields to a record dict, in place.

    Adds owner/target names, the normalized energy density with its label,
    visibility flags and labels, the "from"/"for" labels and ``can_edit``
    (true for shared records, the user's own records and admins).

    Returns:
        The same ``entry`` dict, mutated.
    """
    owner_name = user_display_name(entry.get("owner_display_name"), entry.get("owner_username"))
    if entry.get("target_user_id"):
        target_name = user_display_name(entry.get("target_display_name"), entry.get("target_username"))
    else:
        target_name = None

    energy_density = normalize_energy_density(entry.get("energy_density"), "neutral")
    energy_label = ENERGY_DENSITY_LABELS.get(energy_density, ENERGY_DENSITY_LABELS["neutral"])
    visibility = entry.get("visibility")
    is_personal = visibility == "personal"
    is_shared = visibility == "shared"
    is_mine = entry.get("owner_user_id") == g.user["id"]

    if target_name:
        for_label = f"Für {target_name}"
    elif is_personal:
        for_label = "Für mich" if is_mine else f"Für {owner_name}"
    else:
        for_label = "Für alle"

    entry.update(
        {
            "owner_name": owner_name,
            "target_name": target_name,
            "energy_density": energy_density,
            "energy_density_label": energy_label,
            "is_personal": is_personal,
            "is_shared": is_shared,
            "is_mine": is_mine,
            "visibility_label": VISIBILITY_LABELS.get(visibility, "Gemeinsam"),
            "visibility_description": VISIBILITY_DESCRIPTIONS.get(visibility, ""),
            "owner_label": "Von mir" if is_mine else f"Von {owner_name}",
            "for_label": for_label,
            "can_edit": is_shared or is_mine or g.user["role"] == "admin",
        }
    )
    return entry
def describe_records(rows) -> list[dict]:
    """Convert each row to a dict and pass it through ``describe_record``."""
    described: list[dict] = []
    for row in rows:
        described.append(describe_record(dict(row)))
    return described
def describe_template_record(entry: dict) -> dict:
    """Add owner, visibility and permission fields to a template dict, in place.

    ``can_edit`` is true for shared templates, the user's own templates and
    admins.

    Returns:
        The same ``entry`` dict, mutated.
    """
    owner_name = user_display_name(entry.get("owner_display_name"), entry.get("owner_username"))
    is_mine = entry.get("owner_user_id") == g.user["id"]
    visibility = entry.get("visibility")
    entry.update(
        {
            "owner_name": owner_name,
            "is_mine": is_mine,
            "visibility_label": VISIBILITY_LABELS.get(visibility, "Gemeinsam"),
            "owner_label": "Von mir" if is_mine else f"Von {owner_name}",
            "can_edit": visibility == "shared" or is_mine or g.user["role"] == "admin",
        }
    )
    return entry
def ensure_can_edit(entry: dict, error_message: str = "Diesen Eintrag kannst du gerade nicht bearbeiten.") -> None:
    """Raise unless the current user may edit ``entry``.

    Editing is allowed when ``entry["can_edit"]`` is truthy or the current
    user has the ``"admin"`` role.

    Raises:
        PermissionError: With ``error_message`` when editing is not allowed.
    """
    if entry.get("can_edit"):
        return
    if g.user["role"] == "admin":
        return
    raise PermissionError(error_message)
def get_item(item_id: int) -> dict:
    """Load one item that is visible to the current user.

    The result includes owner/target names, an ``is_on_shopping_list`` flag
    (an unchecked shopping entry exists) and the fields added by
    ``describe_record``.

    Raises:
        ValueError: If no visible item with this id exists.
    """
    sql = f"""
SELECT items.*,
owner.display_name AS owner_display_name,
owner.username AS owner_username,
target.display_name AS target_display_name,
target.username AS target_username,
EXISTS(
SELECT 1
FROM shopping_entries
WHERE shopping_entries.item_id = items.id AND shopping_entries.is_checked = 0
) AS is_on_shopping_list
FROM items
LEFT JOIN users AS owner ON owner.id = items.owner_user_id
LEFT JOIN users AS target ON target.id = items.target_user_id
WHERE items.id = ? AND {visible_clause('items')}
"""
    bound_values = [item_id, *visible_params()]
    row = get_db().execute(sql, bound_values).fetchone()
    if row is None:
        raise ValueError("Der Eintrag wurde nicht gefunden.")
    return describe_record(dict(row))
def get_item_daypart_ids(item_id: int) -> list[int]:
    """Return the daypart ids linked to ``item_id`` in ``item_dayparts``."""
    cursor = get_db().execute(
        "SELECT daypart_id FROM item_dayparts WHERE item_id = ?",
        (item_id,),
    )
    daypart_ids: list[int] = []
    for record in cursor.fetchall():
        daypart_ids.append(int(record["daypart_id"]))
    return daypart_ids
def get_meal_component_ids(meal_id: int) -> list[int]:
    """Return the ids of a meal's component foods that the current user can see."""
    cursor = get_db().execute(
        """
SELECT meal_components.food_item_id
FROM meal_components
JOIN items ON items.id = meal_components.food_item_id
WHERE meal_components.meal_item_id = ?
AND items.household_id = ?
AND (items.visibility = 'shared' OR items.owner_user_id = ?)
""",
        (meal_id, current_household_id(), g.user["id"]),
    )
    component_ids: list[int] = []
    for record in cursor.fetchall():
        component_ids.append(int(record["food_item_id"]))
    return component_ids
def attach_dayparts(items: list[dict]) -> list[dict]:
    """Attach daypart information to each item dict, in place.

    Every item gets ``dayparts_meta`` (dicts with ``id``, ``slug``, ``name``
    ordered by ``sort_order``), ``dayparts`` (the names only) and
    ``primary_daypart_id`` (id of the first daypart, or ``None``).

    Returns:
        The same list, or a new empty list when ``items`` is empty.
    """
    if not items:
        return []

    ids = [entry["id"] for entry in items]
    placeholders = ",".join(["?"] * len(ids))
    daypart_rows = get_db().execute(
        f"""
SELECT item_dayparts.item_id, dayparts.id, dayparts.slug, dayparts.name
FROM item_dayparts
JOIN dayparts ON dayparts.id = item_dayparts.daypart_id
WHERE item_dayparts.item_id IN ({placeholders})
ORDER BY dayparts.sort_order
""",
        ids,
    ).fetchall()

    by_item: dict = {}
    for record in daypart_rows:
        meta = {"id": record["id"], "slug": record["slug"], "name": record["name"]}
        by_item.setdefault(record["item_id"], []).append(meta)

    for entry in items:
        meta_list = by_item.get(entry["id"], [])
        entry["dayparts_meta"] = meta_list
        entry["dayparts"] = [meta["name"] for meta in meta_list]
        entry["primary_daypart_id"] = meta_list[0]["id"] if meta_list else None
    return items
def attach_components(items: list[dict]) -> list[dict]:
    """Attach component names and ids to each item dict, in place.

    Items of kind ``"meal"`` get the names (``components``) and ids
    (``component_ids``) of their component foods that are visible to the
    current user, ordered by name. All other items get empty lists.

    Returns:
        The same ``items`` list.
    """
    meal_ids = [entry["id"] for entry in items if entry["kind"] == "meal"]

    if meal_ids:
        placeholders = ",".join(["?"] * len(meal_ids))
        component_rows = get_db().execute(
            f"""
SELECT meal_components.meal_item_id,
meal_components.food_item_id,
items.name
FROM meal_components
JOIN items ON items.id = meal_components.food_item_id
WHERE meal_components.meal_item_id IN ({placeholders})
AND items.household_id = ?
AND (items.visibility = 'shared' OR items.owner_user_id = ?)
ORDER BY LOWER(items.name)
""",
            [*meal_ids, current_household_id(), g.user["id"]],
        ).fetchall()

        names_by_meal: dict[int, list[str]] = {}
        ids_by_meal: dict[int, list[int]] = {}
        for record in component_rows:
            meal_key = record["meal_item_id"]
            names_by_meal.setdefault(meal_key, []).append(record["name"])
            ids_by_meal.setdefault(meal_key, []).append(int(record["food_item_id"]))

        for entry in items:
            entry["components"] = names_by_meal.get(entry["id"], [])
            entry["component_ids"] = ids_by_meal.get(entry["id"], [])
        return items

    # No meals in the list: nothing to look up.
    for entry in items:
        entry["components"] = []
        entry["component_ids"] = []
    return items
def attach_builder_keys(items: list[dict]) -> list[dict]:
    """Attach builder keys and labels to each item dict, in place.

    A meal gets the sorted builder keys of its components' categories; a
    meal without components and every non-meal item gets the builder key of
    its own category. Unknown categories map to ``"neutral"``. Sets
    ``builder_keys``, ``builder_labels`` and ``primary_builder_key``.

    Returns:
        The same list, or a new empty list when ``items`` is empty.
    """
    if not items:
        return []

    builder_by_category = get_category_builder_map()
    meal_ids = [entry["id"] for entry in items if entry["kind"] == "meal"]
    keys_by_meal: dict[int, set[str]] = {}
    if meal_ids:
        placeholders = ",".join(["?"] * len(meal_ids))
        # NOTE: unlike attach_components, this lookup has no visibility filter.
        component_rows = get_db().execute(
            f"""
SELECT meal_components.meal_item_id,
component.category
FROM meal_components
JOIN items AS component ON component.id = meal_components.food_item_id
WHERE meal_components.meal_item_id IN ({placeholders})
""",
            meal_ids,
        ).fetchall()
        for record in component_rows:
            component_key = builder_by_category.get(record["category"] or "", "neutral")
            keys_by_meal.setdefault(int(record["meal_item_id"]), set()).add(component_key)

    for entry in items:
        keys: list[str] = sorted(keys_by_meal.get(entry["id"], set())) if entry["kind"] == "meal" else []
        if not keys:
            keys = [builder_by_category.get(entry.get("category") or "", "neutral")]
        entry["builder_keys"] = keys
        entry["builder_labels"] = [BUILDER_LABELS.get(key, BUILDER_LABELS["neutral"]) for key in keys]
        entry["primary_builder_key"] = keys[0] if keys else "neutral"
    return items
def decorate_items(rows) -> list[dict]:
    """Turn item rows into described dicts with dayparts, components and builder keys."""
    described = describe_records(rows)
    with_dayparts = attach_dayparts(described)
    with_components = attach_components(with_dayparts)
    return attach_builder_keys(with_components)
def fetch_builder_keys_for_item_ids(item_ids: list[int]) -> dict[int, set[str]]:
    """Return the builder keys per item id.

    Each item found starts with the builder key of its own category; meals
    additionally collect the builder keys of their components' categories.
    Unknown categories map to ``"neutral"``. Ids without a matching item are
    absent from the result.
    """
    if not item_ids:
        return {}

    builder_by_category = get_category_builder_map()
    placeholders = ",".join(["?"] * len(item_ids))
    item_rows = get_db().execute(
        f"""
SELECT id, kind, category
FROM items
WHERE id IN ({placeholders})
""",
        item_ids,
    ).fetchall()

    keys_by_item: dict[int, set[str]] = {}
    meal_ids: list[int] = []
    for record in item_rows:
        record_id = int(record["id"])
        keys_by_item[record_id] = {builder_by_category.get(record["category"] or "", "neutral")}
        if record["kind"] == "meal":
            meal_ids.append(record_id)

    if not meal_ids:
        return keys_by_item

    meal_placeholders = ",".join(["?"] * len(meal_ids))
    component_rows = get_db().execute(
        f"""
SELECT meal_components.meal_item_id, component.category
FROM meal_components
JOIN items AS component ON component.id = meal_components.food_item_id
WHERE meal_components.meal_item_id IN ({meal_placeholders})
""",
        meal_ids,
    ).fetchall()
    for record in component_rows:
        component_key = builder_by_category.get(record["category"] or "", "neutral")
        keys_by_item.setdefault(int(record["meal_item_id"]), set()).add(component_key)
    return keys_by_item
def score_suggestion_components(component_items: list[dict], daypart_slug: str, settings: dict) -> int:
    """Score a combination of items for a daypart under the user's settings.

    Points are awarded for builder keys according to the suggestion style
    (``"fitness"``, ``"protein"`` or anything else, which is scored per
    daypart), then adjusted by how many components have a low, high or
    neutral energy density.

    Args:
        component_items: Item dicts; ``builder_keys`` defaults to
            ``["neutral"]`` and ``energy_density`` is normalized.
        daypart_slug: Slug of the daypart the suggestion is for.
        settings: Read for ``suggestion_style`` (default ``"balanced"``) and
            ``energy_preference`` (default ``"neutral"``).

    Returns:
        The integer score; higher is a better match.
    """
    keys = set()
    for component in component_items:
        keys.update(component.get("builder_keys", ["neutral"]))
    densities = [normalize_energy_density(component.get("energy_density"), "neutral") for component in component_items]

    style = settings.get("suggestion_style", "balanced")
    is_main_meal = daypart_slug in {"lunch", "dinner"}

    # Each bonus is (points, earned?).
    if style == "fitness":
        bonuses = [
            (9, "protein" in keys),
            (4, is_main_meal and "veg" in keys),
            (2, "carb" in keys),
        ]
    elif style == "protein":
        bonuses = [
            (8, "protein" in keys),
            (3, is_main_meal and "veg" in keys),
        ]
    elif daypart_slug in {"breakfast", "morning-snack", "afternoon-snack", "late-snack"}:
        bonuses = [
            (5, "carb" in keys),
            (4, bool(keys & {"dairy", "fruit", "nuts", "seeds"})),
        ]
    else:
        bonuses = [
            (5, "protein" in keys),
            (4, "carb" in keys),
            (4, "veg" in keys),
        ]
    total = sum(points for points, earned in bonuses if earned)

    low_count = densities.count("low")
    high_count = densities.count("high")
    energy_preference = settings.get("energy_preference", "neutral")
    if style == "fitness":
        total += low_count * 4 - high_count * 2
    elif energy_preference == "high":
        total += high_count * 3 - low_count
    elif energy_preference == "low":
        total += low_count * 3 - high_count
    else:
        total += densities.count("neutral")
    return total
def fetch_items(
    *,
    kind: str | None = None,
    availability: str | None = None,
    include_archived: bool = False,
    query: str | None = None,
    daypart_id: int | None = None,
    visibility: str | None = None,
):
    """Fetch and decorate the items visible to the current user.

    Args:
        kind: Restrict to this item kind.
        availability: Restrict to this availability state. When omitted,
            archived items are excluded unless ``include_archived`` is set.
        include_archived: Only relevant when ``availability`` is not given.
        query: Substring matched against the lower-cased item name.
        daypart_id: Restrict to items linked to this daypart.
        visibility: Restrict to this visibility.

    Returns:
        Decorated item dicts ordered by availability state (home, idea,
        rest), then shared before personal, then by name.
    """
    where_parts = [visible_clause("items")]
    bound_values = visible_params()

    if kind:
        where_parts.append("items.kind = ?")
        bound_values.append(kind)

    if availability:
        where_parts.append("items.availability_state = ?")
        bound_values.append(availability)
    elif not include_archived:
        where_parts.append("items.availability_state != 'archived'")

    if query:
        # NOTE: "%" and "_" inside the search text act as LIKE wildcards.
        where_parts.append("LOWER(items.name) LIKE ?")
        bound_values.append("%" + query.lower() + "%")

    if daypart_id:
        where_parts.append(
            """
EXISTS (
SELECT 1
FROM item_dayparts
WHERE item_dayparts.item_id = items.id
AND item_dayparts.daypart_id = ?
)
"""
        )
        bound_values.append(daypart_id)

    if visibility:
        where_parts.append("items.visibility = ?")
        bound_values.append(visibility)

    where_sql = " AND ".join(where_parts)
    sql = f"""
SELECT items.*,
owner.display_name AS owner_display_name,
owner.username AS owner_username,
target.display_name AS target_display_name,
target.username AS target_username,
EXISTS(
SELECT 1
FROM shopping_entries
WHERE shopping_entries.item_id = items.id AND shopping_entries.is_checked = 0
) AS is_on_shopping_list
FROM items
LEFT JOIN users AS owner ON owner.id = items.owner_user_id
LEFT JOIN users AS target ON target.id = items.target_user_id
WHERE {where_sql}
ORDER BY
CASE items.availability_state WHEN 'home' THEN 0 WHEN 'idea' THEN 1 ELSE 2 END,
CASE items.visibility WHEN 'shared' THEN 0 ELSE 1 END,
LOWER(items.name)
"""
    item_rows = get_db().execute(sql, bound_values).fetchall()
    return decorate_items(item_rows)
def fetch_food_options(query: str | None = None):
    """Return all visible food items, archived ones included, optionally filtered by name."""
    filters = {"kind": "food", "include_archived": True, "query": query}
    return fetch_items(**filters)
def group_items_by_availability(items: list[dict]) -> list[dict]:
    """Group items by availability state in the order home, idea, archived.

    Returns:
        One dict per non-empty state with ``state``, ``title`` (from
        ``AVAILABILITY_LABELS``) and ``items``. Items in any other state are
        dropped.
    """
    buckets: dict[str, list[dict]] = {}
    for entry in items:
        buckets.setdefault(entry["availability_state"], []).append(entry)

    return [
        {"state": state, "title": AVAILABILITY_LABELS[state], "items": buckets[state]}
        for state in ("home", "idea", "archived")
        if buckets.get(state)
    ]
def extract_item_form_data(existing: dict | None = None) -> dict:
    """Read the item form fields from ``request.form``.

    Args:
        existing: Optional dict to update. It is modified in place and
            returned; its ``energy_density``, ``visibility`` and
            ``quick_food_energy_density`` values serve as fallbacks when the
            submitted values are invalid.

    Returns:
        The form data dict. Text fields are stripped; ``daypart_ids`` and
        ``component_ids`` contain only the submitted values that parse as
        non-negative integers.
    """

    def _posted_ids(field_name: str) -> list[int]:
        """Return the integer values submitted for a multi-value field."""
        parsed: list[int] = []
        for value in request.form.getlist(field_name):
            if not value.isdigit():
                continue
            try:
                parsed.append(int(value))
            except ValueError:
                # str.isdigit() accepts characters such as "²" that int()
                # rejects; skip them like any other malformed value.
                continue
        return parsed

    form_data = existing or {}
    form_data.update(
        {
            "name": request.form.get("name", "").strip(),
            "category": request.form.get("category", "").strip(),
            "energy_density": normalize_energy_density(request.form.get("energy_density"), form_data.get("energy_density", "neutral")),
            "note": request.form.get("note", "").strip(),
            "visibility": normalize_visibility(request.form.get("visibility"), form_data.get("visibility", "shared")),
            "target_user_id": normalize_target_user_id(request.form.get("target_user_id")),
            "target_user_raw": request.form.get("target_user_id", TARGET_USER_OPTIONS_DEFAULT),
            "food_search": request.form.get("food_search", "").strip(),
            "daypart_ids": _posted_ids("daypart_ids"),
            "component_ids": _posted_ids("component_ids"),
            "quick_food_name": request.form.get("quick_food_name", "").strip(),
            "quick_food_category": request.form.get("quick_food_category", "").strip(),
            "quick_food_energy_density": normalize_energy_density(request.form.get("quick_food_energy_density"), form_data.get("quick_food_energy_density", "neutral")),
            "quick_food_note": request.form.get("quick_food_note", "").strip(),
        }
    )
    return form_data
def create_quick_food_from_form(form_data: dict) -> int:
    """Create a food item from the quick-food form fields and commit.

    The new item is owned by the current user, takes its target user and
    visibility from ``form_data``, and is linked to
    ``form_data["daypart_ids"]``.

    Returns:
        The id of the new item.
    """
    db = get_db()
    acting_user_id = g.user["id"]
    insert_values = (
        current_household_id(),
        acting_user_id,
        form_data["target_user_id"],
        form_data["visibility"],
        form_data["quick_food_name"],
        form_data["quick_food_category"],
        form_data["quick_food_energy_density"],
        form_data["quick_food_note"],
        acting_user_id,
        acting_user_id,
    )
    cursor = db.execute(
        """
INSERT INTO items (
household_id, owner_user_id, target_user_id, visibility, kind, name, category, energy_density, note, created_by, updated_by
)
VALUES (?, ?, ?, ?, 'food', ?, ?, ?, ?, ?, ?)
""",
        insert_values,
    )
    new_food_id = int(cursor.lastrowid)
    sync_item_dayparts(new_food_id, form_data["daypart_ids"])
    db.commit()
    return new_food_id
def shopping_activation_date_for(needed_date: date) -> date:
    """Return the date on which a need for ``needed_date`` should activate.

    This is the household's shopping weekday on or before ``needed_date``,
    moved back by the household's prep days.
    """
    household = get_household_settings()
    shopping_weekday = int(household["shopping_weekday"])
    prep_days = int(household["shopping_prep_days"])

    # Days since the most recent shopping weekday (0 if it is that day).
    offset = (needed_date.weekday() - shopping_weekday) % 7
    shopping_day = needed_date - timedelta(days=offset)
    return shopping_day - timedelta(days=prep_days)
def should_activate_shopping_need(needed_date: date, today: date | None = None) -> bool:
    """Return whether the activation date for ``needed_date`` has been reached.

    Args:
        needed_date: Date the item is needed for.
        today: Reference date; defaults to the current date.
    """
    reference_day = today or date.today()
    activation_day = shopping_activation_date_for(needed_date)
    return reference_day >= activation_day
def schedule_shopping_need(
    *,
    item_id: int,
    user_id: int,
    visibility: str,
    needed_for_date: str,
    needed_for_daypart_id: int | None,
    source_item_id: int | None = None,
) -> bool:
    """Create or refresh a pending (not yet activated) shopping need and commit.

    If a pending need already exists for the same household, item, source
    item, date, daypart and visibility, only its activation date and owner
    are updated; otherwise a new row is inserted.

    Returns:
        Always ``True``.
    """
    db = get_db()
    activation_iso = shopping_activation_date_for(parse_plan_date(needed_for_date)).isoformat()
    household_id = current_household_id()

    pending = db.execute(
        """
SELECT id
FROM shopping_needs
WHERE household_id = ?
AND item_id = ?
AND COALESCE(source_item_id, 0) = COALESCE(?, 0)
AND needed_for_date = ?
AND COALESCE(needed_for_daypart_id, 0) = COALESCE(?, 0)
AND visibility = ?
AND is_activated = 0
LIMIT 1
""",
        (
            household_id,
            item_id,
            source_item_id,
            needed_for_date,
            needed_for_daypart_id,
            visibility,
        ),
    ).fetchone()

    if not pending:
        db.execute(
            """
INSERT INTO shopping_needs (
household_id,
owner_user_id,
visibility,
item_id,
source_item_id,
needed_for_date,
needed_for_daypart_id,
activation_date,
created_by
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
            (
                household_id,
                user_id,
                visibility,
                item_id,
                source_item_id,
                needed_for_date,
                needed_for_daypart_id,
                activation_iso,
                g.user["id"],
            ),
        )
    else:
        db.execute(
            """
UPDATE shopping_needs
SET activation_date = ?,
owner_user_id = ?
WHERE id = ?
""",
            (activation_iso, user_id, pending["id"]),
        )
    db.commit()
    return True
def add_to_shopping_list(
    item_id: int,
    user_id: int,
    *,
    visibility_override: str | None = None,
    needed_for_date: str | None = None,
    needed_for_daypart_id: int | None = None,
) -> bool:
    """Put an item on the shopping list unless it already has an open entry.

    Args:
        item_id: Item to add; it must be visible to the current user.
        user_id: User recorded as ``added_by``, and as owner for personal
            entries.
        visibility_override: Visibility of the entry; falls back to the
            item's visibility when missing or invalid.
        needed_for_date: Optional date the item is needed for.
        needed_for_daypart_id: Optional daypart the item is needed for.

    Returns:
        ``True`` if an entry was inserted and committed, ``False`` if an
        unchecked entry for the item already existed.

    Raises:
        ValueError: From ``get_item`` when the item is not visible.
    """
    item = get_item(item_id)
    db = get_db()
    open_entry = db.execute(
        """
SELECT id FROM shopping_entries
WHERE item_id = ? AND is_checked = 0
""",
        (item_id,),
    ).fetchone()
    if open_entry:
        return False

    entry_visibility = normalize_visibility(visibility_override, item["visibility"])
    if entry_visibility == "personal":
        entry_owner_id = user_id
    else:
        entry_owner_id = item["owner_user_id"]

    db.execute(
        """
INSERT INTO shopping_entries (
household_id, owner_user_id, visibility, item_id, added_by, needed_for_date, needed_for_daypart_id
)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
        (
            current_household_id(),
            entry_owner_id,
            entry_visibility,
            item_id,
            user_id,
            needed_for_date,
            needed_for_daypart_id,
        ),
    )
    db.commit()
    return True
def fetch_meal_missing_components(meal_id: int) -> list[dict]:
    """Return a meal's visible component foods that are not in state ``home``.

    The rows are described and get dayparts and builder keys attached;
    ``is_on_shopping_list`` is fixed to 0 by the query. Sorted by name.
    """
    sql = """
SELECT items.*,
owner.display_name AS owner_display_name,
owner.username AS owner_username,
target.display_name AS target_display_name,
target.username AS target_username,
0 AS is_on_shopping_list
FROM meal_components
JOIN items ON items.id = meal_components.food_item_id
LEFT JOIN users AS owner ON owner.id = items.owner_user_id
LEFT JOIN users AS target ON target.id = items.target_user_id
WHERE meal_components.meal_item_id = ?
AND items.household_id = ?
AND (items.visibility = 'shared' OR items.owner_user_id = ?)
AND items.availability_state != 'home'
ORDER BY LOWER(items.name)
"""
    bound_values = (meal_id, current_household_id(), g.user["id"])
    component_rows = get_db().execute(sql, bound_values).fetchall()
    described = describe_records(component_rows)
    return attach_builder_keys(attach_dayparts(described))
def _shopping_outcome(names: list, scheduled_names: list, used_components: bool) -> dict:
    """Build the result dict of ensure_item_or_missing_components_are_shopped."""
    return {
        "added": bool(names),
        "count": len(names),
        "names": names,
        "scheduled_count": len(scheduled_names),
        "scheduled_names": scheduled_names,
        "used_components": used_components,
    }


def ensure_item_or_missing_components_are_shopped(
    item_id: int,
    user_id: int,
    visibility: str,
    *,
    needed_for_date: str | None = None,
    needed_for_daypart_id: int | None = None,
    source_item_id: int | None = None,
) -> dict:
    """Make sure an item, or a meal's missing components, gets shopped.

    For a meal, every component not in state ``home`` is handled; for any
    other item, the item itself is handled unless it is in state ``home``.
    "Handled" means the entry is scheduled as a pending shopping need when
    ``needed_for_date`` is given and its activation date has not been
    reached, and added to the shopping list directly otherwise.

    Args:
        item_id: The food or meal item.
        user_id: User on whose behalf entries are created.
        visibility: Visibility for the created entries.
        needed_for_date: Optional ``YYYY-MM-DD`` date the item is needed for.
        needed_for_daypart_id: Optional daypart the item is needed for.
        source_item_id: Item recorded as the source of scheduled needs; for
            meals it defaults to the meal itself.

    Returns:
        A dict with ``added``, ``count``, ``names`` (entries put on the list),
        ``scheduled_count``, ``scheduled_names`` (entries scheduled for
        later) and ``used_components`` (true when the item is a meal).

    Raises:
        ValueError: From ``get_item`` when an item is not visible.
    """
    item = get_item(item_id)

    if item["kind"] == "meal":
        added_names: list = []
        scheduled_names: list = []
        missing_components = fetch_meal_missing_components(item_id)
        if missing_components:
            # The decision depends only on the date and the household
            # settings, so it is made once instead of once per component.
            defer = bool(needed_for_date) and not should_activate_shopping_need(parse_plan_date(needed_for_date))
            for component in missing_components:
                if defer:
                    schedule_shopping_need(
                        item_id=component["id"],
                        user_id=user_id,
                        visibility=visibility,
                        needed_for_date=needed_for_date,
                        needed_for_daypart_id=needed_for_daypart_id,
                        source_item_id=source_item_id or item_id,
                    )
                    scheduled_names.append(component["name"])
                    continue
                was_added = add_to_shopping_list(
                    component["id"],
                    user_id,
                    visibility_override=visibility,
                    needed_for_date=needed_for_date,
                    needed_for_daypart_id=needed_for_daypart_id,
                )
                if was_added:
                    added_names.append(component["name"])
        return _shopping_outcome(added_names, scheduled_names, True)

    if item["availability_state"] == "home":
        # Already at home: nothing to buy.
        return _shopping_outcome([], [], False)

    if needed_for_date and not should_activate_shopping_need(parse_plan_date(needed_for_date)):
        schedule_shopping_need(
            item_id=item_id,
            user_id=user_id,
            visibility=visibility,
            needed_for_date=needed_for_date,
            needed_for_daypart_id=needed_for_daypart_id,
            source_item_id=source_item_id,
        )
        return _shopping_outcome([], [item["name"]], False)

    added = add_to_shopping_list(
        item_id,
        user_id,
        visibility_override=visibility,
        needed_for_date=needed_for_date,
        needed_for_daypart_id=needed_for_daypart_id,
    )
    return _shopping_outcome([item["name"]] if added else [], [], False)
def sync_item_dayparts(item_id: int, daypart_ids: list[int]) -> None:
    """Replace the daypart links of ``item_id`` with ``daypart_ids``.

    Does not commit; the caller is responsible for that.
    """
    db = get_db()
    db.execute("DELETE FROM item_dayparts WHERE item_id = ?", (item_id,))
    for linked_daypart_id in daypart_ids:
        link = (item_id, linked_daypart_id)
        db.execute(
            "INSERT INTO item_dayparts (item_id, daypart_id) VALUES (?, ?)",
            link,
        )
def sync_meal_components(meal_id: int, food_ids: list[int]) -> None:
    """Replace the components of ``meal_id`` with the given food items.

    Ids that do not refer to a food item visible to the current user are
    silently skipped. Does not commit; the caller is responsible for that.
    """
    db = get_db()
    db.execute("DELETE FROM meal_components WHERE meal_item_id = ?", (meal_id,))

    food_rows = db.execute(
        f"""
SELECT items.id
FROM items
WHERE items.kind = 'food' AND {visible_clause('items')}
""",
        visible_params(),
    ).fetchall()
    allowed_food_ids = {record["id"] for record in food_rows}

    for candidate_id in food_ids:
        if candidate_id not in allowed_food_ids:
            continue
        db.execute(
            "INSERT INTO meal_components (meal_item_id, food_item_id) VALUES (?, ?)",
            (meal_id, candidate_id),
        )
def fetch_shopping_entries():
    """Return the open shopping entries visible to the current user.

    Shared entries come first, then newest first. Each described entry also
    gets ``needed_for_label``: the needed-for date as ``DD.MM.YYYY``, the raw
    value if it cannot be parsed, or ``None`` when no date is set.
    """
    sql = f"""
SELECT shopping_entries.*,
items.name AS item_name,
items.kind AS item_kind,
items.photo_filename,
items.availability_state,
owner.display_name AS owner_display_name,
owner.username AS owner_username,
target.display_name AS target_display_name,
target.username AS target_username,
dayparts.name AS needed_daypart_name
FROM shopping_entries
JOIN items ON items.id = shopping_entries.item_id
LEFT JOIN users AS owner ON owner.id = shopping_entries.owner_user_id
LEFT JOIN users AS target ON target.id = items.target_user_id
LEFT JOIN dayparts ON dayparts.id = shopping_entries.needed_for_daypart_id
WHERE shopping_entries.is_checked = 0 AND {visible_clause('shopping_entries')}
ORDER BY
CASE shopping_entries.visibility WHEN 'shared' THEN 0 ELSE 1 END,
shopping_entries.added_at DESC
"""
    entry_rows = get_db().execute(sql, visible_params()).fetchall()
    described = describe_records(entry_rows)
    for record in described:
        raw_date = record.get("needed_for_date")
        if not raw_date:
            record["needed_for_label"] = None
            continue
        try:
            needed_on = datetime.strptime(record["needed_for_date"], "%Y-%m-%d").date()
            label = needed_on.strftime("%d.%m.%Y")
        except ValueError:
            label = record["needed_for_date"]
        record["needed_for_label"] = label
    return described
def activate_due_shopping_needs(today: date | None = None) -> int:
    """Move the household's due shopping needs onto the shopping list.

    A pending need is due when its activation date is on or before ``today``.
    Needs whose item is in state ``home`` are marked activated without
    creating an entry. A need whose item the current user cannot see is left
    pending, so it can be activated on a request by a user who can see it.

    Args:
        today: Reference date; defaults to the current date.

    Returns:
        The number of shopping entries that were actually created.
    """
    today = today or date.today()
    rows = get_db().execute(
        """
SELECT shopping_needs.*,
items.availability_state
FROM shopping_needs
JOIN items ON items.id = shopping_needs.item_id
WHERE shopping_needs.household_id = ?
AND shopping_needs.is_activated = 0
AND shopping_needs.activation_date <= ?
ORDER BY shopping_needs.activation_date, shopping_needs.needed_for_date
""",
        (current_household_id(), today.isoformat()),
    ).fetchall()
    activated = 0
    for row in rows:
        if row["availability_state"] != "home":
            try:
                was_added = add_to_shopping_list(
                    int(row["item_id"]),
                    int(row["owner_user_id"] or g.user["id"]),
                    visibility_override=row["visibility"],
                    needed_for_date=row["needed_for_date"],
                    needed_for_daypart_id=row["needed_for_daypart_id"],
                )
            except ValueError:
                # get_item() raises ValueError when the item is not visible
                # to the current user (e.g. another member's personal item).
                # This function runs from the before-request hook, which only
                # handles sqlite3.OperationalError, so letting the error
                # escape would fail every page load. Leave the need pending.
                continue
            activated += 1 if was_added else 0
        get_db().execute(
            "UPDATE shopping_needs SET is_activated = 1, activated_at = CURRENT_TIMESTAMP WHERE id = ?",
            (row["id"],),
        )
    if rows:
        get_db().commit()
    return activated
def _shopping_need_date_label(raw_value):
    """Format an ISO ``YYYY-MM-DD`` string as ``DD.MM.YYYY``.

    Returns ``raw_value`` unchanged when it is not a parsable date string
    (including ``None``), so callers never crash on odd stored values.
    """
    try:
        return datetime.strptime(raw_value, "%Y-%m-%d").date().strftime("%d.%m.%Y")
    except (TypeError, ValueError):
        return raw_value


def fetch_upcoming_shopping_needs(limit: int | None = None) -> list[dict]:
    """Return pending shopping needs visible to the current user.

    Selects not-yet-activated needs of the current household that are either
    shared or owned by the logged-in user and whose item is not in the
    ``home`` state, ordered by activation date, needed-for date and item name.

    Args:
        limit: Optional maximum number of rows; a falsy value (``None`` or
            ``0``) means no limit.

    Returns:
        The rows passed through ``describe_records``, each extended with
        ``needed_for_label`` and ``activation_label``. Each label is the
        ``DD.MM.YYYY`` form of its date, or the raw stored value when that
        date cannot be parsed.
    """
    query = """
        SELECT shopping_needs.*,
        items.name AS item_name,
        items.kind AS item_kind,
        items.photo_filename,
        items.availability_state,
        owner.display_name AS owner_display_name,
        owner.username AS owner_username,
        target.display_name AS target_display_name,
        target.username AS target_username,
        dayparts.name AS needed_daypart_name
        FROM shopping_needs
        JOIN items ON items.id = shopping_needs.item_id
        LEFT JOIN users AS owner ON owner.id = shopping_needs.owner_user_id
        LEFT JOIN users AS target ON target.id = items.target_user_id
        LEFT JOIN dayparts ON dayparts.id = shopping_needs.needed_for_daypart_id
        WHERE shopping_needs.household_id = ?
        AND shopping_needs.is_activated = 0
        AND (shopping_needs.visibility = 'shared' OR shopping_needs.owner_user_id = ?)
        AND items.availability_state != 'home'
        ORDER BY shopping_needs.activation_date, shopping_needs.needed_for_date, LOWER(items.name)
        """
    params: list[object] = [current_household_id(), g.user["id"]]
    if limit:
        query += " LIMIT ?"
        params.append(limit)
    rows = get_db().execute(query, params).fetchall()
    entries = describe_records(rows)
    for entry in entries:
        # Each date is labelled independently so one bad value does not
        # discard the formatted label of the other.
        entry["needed_for_label"] = _shopping_need_date_label(entry["needed_for_date"])
        entry["activation_label"] = _shopping_need_date_label(entry["activation_date"])
    return entries
def fetch_plan_entries_for_range(start_date: date, end_date: date):
    """Load visible plan entries between two dates (inclusive), grouped by slot.

    Returns a ``defaultdict(list)`` keyed by ``(plan_date, daypart_id)`` whose
    values are the matching records as produced by ``describe_records``,
    in the order of plan date, daypart sort order and item name.
    """
    sql = f"""
        SELECT plan_entries.*,
        items.name AS item_name,
        items.kind AS item_kind,
        items.photo_filename,
        items.availability_state,
        dayparts.name AS daypart_name,
        dayparts.slug AS daypart_slug,
        dayparts.sort_order,
        owner.display_name AS owner_display_name,
        owner.username AS owner_username,
        target.display_name AS target_display_name,
        target.username AS target_username
        FROM plan_entries
        JOIN items ON items.id = plan_entries.item_id
        JOIN dayparts ON dayparts.id = plan_entries.daypart_id
        LEFT JOIN users AS owner ON owner.id = plan_entries.owner_user_id
        LEFT JOIN users AS target ON target.id = items.target_user_id
        WHERE plan_date BETWEEN ? AND ? AND {visible_clause('plan_entries')}
        ORDER BY plan_date, dayparts.sort_order, items.name
        """
    bind_values = [start_date.isoformat(), end_date.isoformat(), *visible_params()]
    fetched = get_db().execute(sql, bind_values).fetchall()

    entries_by_slot = defaultdict(list)
    for record in describe_records(fetched):
        slot_key = (record["plan_date"], record["daypart_id"])
        entries_by_slot[slot_key].append(record)
    return entries_by_slot
def fetch_day_plan_entries(selected_date: date):
    """Return the grouped plan entries for a single day.

    Delegates to ``fetch_plan_entries_for_range`` with the same date as both
    start and end of the range.
    """
    return fetch_plan_entries_for_range(start_date=selected_date, end_date=selected_date)
def fetch_recent_plan_items(daypart_id: int, limit: int = 6):
    """Return visible items that were planned for the given daypart, newest first.

    The query selects distinct item rows ordered by the plan entry's
    ``created_at`` and is capped at ``limit * 3`` rows; the result is passed
    through ``decorate_items``.
    """
    row_cap = limit * 3
    sql = f"""
        SELECT DISTINCT items.id,
        items.household_id,
        items.owner_user_id,
        items.target_user_id,
        items.visibility,
        items.name,
        items.kind,
        items.category,
        items.note,
        items.photo_filename,
        items.availability_state,
        owner.display_name AS owner_display_name,
        owner.username AS owner_username,
        target.display_name AS target_display_name,
        target.username AS target_username
        FROM plan_entries
        JOIN items ON items.id = plan_entries.item_id
        LEFT JOIN users AS owner ON owner.id = items.owner_user_id
        LEFT JOIN users AS target ON target.id = items.target_user_id
        WHERE plan_entries.daypart_id = ? AND {visible_clause('items')}
        ORDER BY plan_entries.created_at DESC
        LIMIT ?
        """
    bind_values = [daypart_id, *visible_params(), row_cap]
    recent_rows = get_db().execute(sql, bind_values).fetchall()
    return decorate_items(recent_rows)
def fetch_plan_candidates(daypart_id: int, query: str | None = None):
    """Return visible, non-archived items that can be planned for a daypart.

    Each row carries a ``matches_daypart`` flag telling whether the item is
    linked to ``daypart_id`` via ``item_dayparts``. When ``query`` is given,
    only items whose lower-cased name contains the lower-cased query are
    returned. Rows are ordered by availability (home, idea, other), daypart
    match, visibility (shared first) and name, then passed through
    ``decorate_items``.
    """
    # Placeholder order: the EXISTS sub-select comes first in the statement,
    # followed by the visibility clause and the optional name filter.
    bind_values = [daypart_id, *visible_params()]
    filters = [visible_clause("items"), "items.availability_state != 'archived'"]
    if query:
        filters.append("LOWER(items.name) LIKE ?")
        bind_values.append(f"%{query.lower()}%")
    where_sql = " AND ".join(filters)

    sql = f"""
        SELECT items.*,
        owner.display_name AS owner_display_name,
        owner.username AS owner_username,
        target.display_name AS target_display_name,
        target.username AS target_username,
        EXISTS(
        SELECT 1
        FROM item_dayparts
        WHERE item_dayparts.item_id = items.id AND item_dayparts.daypart_id = ?
        ) AS matches_daypart
        FROM items
        LEFT JOIN users AS owner ON owner.id = items.owner_user_id
        LEFT JOIN users AS target ON target.id = items.target_user_id
        WHERE {where_sql}
        ORDER BY
        CASE items.availability_state WHEN 'home' THEN 0 WHEN 'idea' THEN 1 ELSE 2 END,
        matches_daypart DESC,
        CASE items.visibility WHEN 'shared' THEN 0 ELSE 1 END,
        LOWER(items.name)
        """
    candidate_rows = get_db().execute(sql, bind_values).fetchall()
    return decorate_items(candidate_rows)
def fetch_home_food_ids() -> set[int]:
    """Return the ids of all visible ``food`` items in the ``home`` state."""
    sql = f"""
        SELECT id
        FROM items
        WHERE kind = 'food' AND availability_state = 'home' AND {visible_clause('items')}
        """
    food_ids: set[int] = set()
    for record in get_db().execute(sql, visible_params()).fetchall():
        food_ids.add(int(record["id"]))
    return food_ids
def get_daypart_by_id(daypart_id: int):
    """Return the first daypart from ``get_dayparts()`` whose id equals ``daypart_id``.

    Both ids are compared as integers. Returns ``None`` when nothing matches.
    """
    return next(
        (candidate for candidate in get_dayparts() if int(candidate["id"]) == int(daypart_id)),
        None,
    )
def format_item_names(items: list[dict], limit: int = 3) -> str:
    """Join the ``name`` values of the first ``limit`` items with ``", "``."""
    shown_names = [entry["name"] for entry in items[:limit]]
    return ", ".join(shown_names)
def item_matches_daypart(item: dict, daypart_id: int | None) -> bool:
    """Tell whether an item may be used for the given daypart.

    Returns ``True`` when no daypart is requested, when the item has no
    ``dayparts_meta`` entries, or when one of those entries has the requested
    id (compared as integers); otherwise ``False``.
    """
    if daypart_id is None:
        return True
    assigned_dayparts = item.get("dayparts_meta") or []
    if not assigned_dayparts:
        # Items without any daypart assignment fit everywhere.
        return True
    for assigned in assigned_dayparts:
        if int(assigned["id"]) == int(daypart_id):
            return True
    return False
def normalized_component_signature(component_ids: list[int]) -> tuple[int, ...]:
    """Return the component ids as a sorted tuple of unique integers."""
    unique_ids = set(map(int, component_ids))
    return tuple(sorted(unique_ids))
def generated_suggestion_key(component_ids: list[int]) -> str:
    """Build the key ``generated:<id>-<id>-...`` from the normalized component ids."""
    id_parts = map(str, normalized_component_signature(component_ids))
    return "generated:" + "-".join(id_parts)
def fetch_hidden_generated_suggestion_keys() -> set[str]:
    """Return the suggestion keys stored in ``hidden_generated_suggestions`` for the logged-in user."""
    sql = """
        SELECT suggestion_key
        FROM hidden_generated_suggestions
        WHERE user_id = ?
        """
    hidden_rows = get_db().execute(sql, (g.user["id"],)).fetchall()
    hidden_keys: set[str] = set()
    for record in hidden_rows:
        hidden_keys.add(record["suggestion_key"])
    return hidden_keys
def build_generated_meal_name(combo: list[dict], daypart_slug: str) -> str:
    """Build a display name for a generated combination of items.

    Args:
        combo: Items of the combination; each must provide a ``name``.
        daypart_slug: Kept for interface compatibility. The wording is the
            same for every daypart, so it does not influence the result.

    Returns:
        ``"<first> mit <second>, <third>, ..."`` for two or more items, the
        single name for one item, and an empty string for an empty combo.
    """
    names = [item["name"] for item in combo]
    if not names:
        # Previously an empty combo raised IndexError.
        return ""
    if len(names) == 1:
        return names[0]
    return f"{names[0]} mit {', '.join(names[1:])}"
def build_dynamic_meal_suggestions(home_foods: list[dict], daypart_slug: str, limit: int) -> list[dict]:
    """Generate meal suggestions by combining foods along builder-key patterns.

    A pattern is a sequence of slots; each slot is a set of builder keys. A
    food fits a slot when its ``builder_keys`` (default ``["neutral"]``)
    intersect the slot's keys. For every pattern whose slots can all be
    filled, each combination of distinct foods that has not been produced yet
    becomes one suggestion dict. Generation stops once ``limit * 3``
    suggestions exist.
    """
    light_dayparts = {"breakfast", "morning-snack", "afternoon-snack", "late-snack"}
    if daypart_slug in light_dayparts:
        target_patterns = [
            {
                "slots": ({"carb"}, {"dairy", "protein"}, {"fruit", "nuts", "seeds"}),
                "reason": "Passt gut zu Frühstück oder Snack",
            },
            {
                "slots": ({"carb"}, {"dairy", "protein"}),
                "reason": "Zuhause schnell kombinierbar",
            },
            {
                "slots": ({"dairy", "protein"}, {"fruit", "nuts", "seeds"}),
                "reason": "Lässt sich gut als kleiner Snack vormerken",
            },
        ]
    else:
        target_patterns = [
            {
                "slots": ({"protein"}, {"carb"}, {"veg"}),
                "reason": "Zuhause als vollständige Mahlzeit möglich",
            },
            {
                "slots": ({"protein"}, {"carb"}),
                "reason": "Lässt sich leicht ergänzen",
            },
            {
                "slots": ({"protein"}, {"veg"}),
                "reason": "Zuhause schon gut kombinierbar",
            },
            {
                "slots": ({"carb"}, {"veg"}),
                "reason": "Daraus kann schnell etwas Einfaches werden",
            },
        ]

    max_suggestions = limit * 3
    collected: list[dict] = []
    used_signatures: set[tuple[int, ...]] = set()

    def foods_for_slot(slot_keys: set[str]) -> list[dict]:
        return [
            food
            for food in home_foods
            if slot_keys & set(food.get("builder_keys", ["neutral"]))
        ]

    for pattern in target_patterns:
        slot_candidates = [foods_for_slot(slot_keys) for slot_keys in pattern["slots"]]
        # A pattern is only usable when every slot has at least one food.
        if not slot_candidates or not all(slot_candidates):
            continue
        for combo in product(*slot_candidates):
            component_ids = [item["id"] for item in combo]
            signature = normalized_component_signature(component_ids)
            # Skip combos that reuse a food in two slots or were already produced.
            if len(signature) != len(combo) or signature in used_signatures:
                continue
            used_signatures.add(signature)
            collected.append(
                {
                    "title": build_generated_meal_name(list(combo), daypart_slug),
                    "reason": pattern["reason"],
                    "component_ids": component_ids,
                    "existing_item_id": None,
                    "visibility": "shared",
                    "daypart_id": None,
                    "missing_component_ids": [],
                    "missing_components": [],
                    "needs_shopping": False,
                    "is_generated": True,
                    "suggestion_key": generated_suggestion_key(component_ids),
                }
            )
            if len(collected) >= max_suggestions:
                break
        if len(collected) >= max_suggestions:
            break
    return collected
def build_home_recipe_suggestions(daypart_id: int | None = None, limit: int = 4) -> list[dict]:
    """Suggest meals that can (almost) be made from foods that are at home.

    Two sources are combined:

    * stored meals whose components are all visible foods, with at least one
      component at home and at most two components missing, and
    * generated combinations from ``build_dynamic_meal_suggestions`` that the
      user has not hidden.

    Suggestions are ranked by score (descending), stored meals before
    generated ones, then title; duplicates by title are dropped.

    Args:
        daypart_id: Optional daypart used to filter foods and meals.
        limit: Maximum number of suggestions returned.

    Returns:
        At most ``limit`` suggestion dicts.
    """
    settings = get_user_settings()
    # Look the daypart up once instead of once for the check and once for the slug.
    daypart = get_daypart_by_id(daypart_id) if daypart_id else None
    daypart_slug = daypart["slug"] if daypart else ""
    hidden_keys = fetch_hidden_generated_suggestion_keys()

    home_foods = [
        item
        for item in fetch_items(kind="food", availability="home")
        if item_matches_daypart(item, daypart_id)
    ]
    home_food_map = {int(item["id"]): item for item in home_foods}
    visible_foods = [
        item
        for item in fetch_items(kind="food", include_archived=False)
        if item_matches_daypart(item, daypart_id)
    ]
    visible_food_map = {int(item["id"]): item for item in visible_foods}

    suggestions: list[dict] = []
    seen_signatures: set[tuple[int, ...]] = set()

    meals = [item for item in fetch_items(kind="meal") if item_matches_daypart(item, daypart_id)]
    for meal in meals:
        if not meal["component_ids"]:
            continue
        component_ids = [int(component_id) for component_id in meal["component_ids"]]
        if not all(component_id in visible_food_map for component_id in component_ids):
            continue
        signature = normalized_component_signature(component_ids)
        if signature in seen_signatures:
            continue
        component_items = [visible_food_map[component_id] for component_id in component_ids]
        # "Available" and "missing" are both decided against the int-keyed
        # home map so a component can never land in both lists.
        available_items = [home_food_map[component_id] for component_id in component_ids if component_id in home_food_map]
        missing_items = [visible_food_map[component_id] for component_id in component_ids if component_id not in home_food_map]
        if not available_items or len(missing_items) > 2:
            continue
        seen_signatures.add(signature)

        missing_names = [item["name"] for item in missing_items]
        if missing_items:
            reason = f"Es fehlt noch: {', '.join(missing_names)}"
            score = (
                score_suggestion_components(available_items, daypart_slug=daypart_slug, settings=settings)
                + 18
                - (len(missing_items) * 4)
            )
        else:
            reason = "Zuhause vorhanden"
            score = score_suggestion_components(component_items, daypart_slug=daypart_slug, settings=settings) + 40
        suggestions.append(
            {
                "title": meal["name"],
                "reason": reason,
                "component_ids": component_ids,
                "existing_item_id": meal["id"],
                "visibility": meal["visibility"],
                "daypart_id": daypart_id or meal.get("primary_daypart_id"),
                "missing_component_ids": [item["id"] for item in missing_items],
                "missing_components": missing_names,
                "needs_shopping": bool(missing_items),
                "is_generated": False,
                "suggestion_key": None,
                "score": score,
            }
        )

    for suggestion in build_dynamic_meal_suggestions(home_foods, daypart_slug, limit=limit * 2):
        signature = normalized_component_signature(suggestion["component_ids"])
        if signature in seen_signatures or suggestion["suggestion_key"] in hidden_keys:
            continue
        seen_signatures.add(signature)
        component_items = [
            home_food_map[int(component_id)]
            for component_id in suggestion["component_ids"]
            if int(component_id) in home_food_map
        ]
        suggestion["score"] = score_suggestion_components(component_items, daypart_slug=daypart_slug, settings=settings)
        suggestions.append(suggestion)

    ranked_suggestions = sorted(
        suggestions,
        key=lambda suggestion: (
            -int(suggestion.get("score", 0)),
            0 if suggestion.get("existing_item_id") else 1,
            suggestion["title"].lower(),
        ),
    )

    deduped: list[dict] = []
    seen_titles = set()
    for suggestion in ranked_suggestions:
        if suggestion["title"] in seen_titles:
            continue
        seen_titles.add(suggestion["title"])
        deduped.append(suggestion)
        if len(deduped) >= limit:
            break
    return deduped
def build_balance_suggestion(daypart_id: int, item_ids: list[int]) -> dict | None:
    """Suggest a complement for a lunch or dinner that lacks a builder key.

    Returns ``None`` unless both ``reminders_enabled`` and
    ``show_meal_balancing`` are set, the daypart is lunch or dinner, and at
    least one of protein, carb or veg is absent from the builder keys of
    ``item_ids``. Otherwise returns a dict with a hint ``text`` for the first
    missing key and up to three home foods carrying that key, best score first.
    """
    settings = get_user_settings()
    balancing_enabled = settings.get("reminders_enabled") and settings.get("show_meal_balancing")
    if not balancing_enabled:
        return None

    daypart = get_daypart_by_id(daypart_id)
    if not daypart:
        return None
    if daypart["slug"] not in {"lunch", "dinner"}:
        return None

    present_keys = set()
    for builder_keys in fetch_builder_keys_for_item_ids(item_ids).values():
        present_keys.update(builder_keys)

    missing_keys = [key for key in ["protein", "carb", "veg"] if key not in present_keys]
    if not missing_keys:
        return None
    wanted_key = missing_keys[0]

    matching_home_foods = []
    for food in fetch_items(kind="food", availability="home", daypart_id=daypart_id):
        if wanted_key in food.get("builder_keys", []):
            matching_home_foods.append(food)
    matching_home_foods.sort(
        key=lambda food: -score_suggestion_components([food], daypart["slug"], settings),
    )

    hint_texts = {
        "protein": "Dazu könnte noch eine Proteinquelle gut passen.",
        "carb": "Das lässt sich gut mit einer Kohlenhydratquelle ergänzen.",
        "veg": "Dazu könnte noch etwas Gemüse gut passen.",
    }
    return {
        "text": hint_texts.get(wanted_key, "Dazu könnte noch etwas Kleines gut passen."),
        "items": matching_home_foods[:3],
    }
def build_daypart_suggestions(daypart_id: int) -> list[dict]:
    """Return up to three suggestions for a daypart.

    Yields nothing when the user setting ``suggest_home_for_today`` is off.
    Prefers the results of ``build_home_recipe_suggestions``; when those are
    empty, falls back to at most two archived items for the daypart.
    """
    if not get_user_settings().get("suggest_home_for_today"):
        return []

    recipe_suggestions = build_home_recipe_suggestions(daypart_id, limit=3)
    if recipe_suggestions:
        return recipe_suggestions

    fallback: list[dict] = []
    archived_items = fetch_items(availability="archived", include_archived=True, daypart_id=daypart_id)
    for archived in archived_items[:2]:
        fallback.append(
            {
                "title": archived["name"],
                "reason": "Für später vormerken",
                "component_ids": [],
                # Only meals can be referenced as an existing item.
                "existing_item_id": archived["id"] if archived["kind"] == "meal" else None,
                "visibility": archived["visibility"],
                "daypart_id": daypart_id,
                "missing_component_ids": [],
                "missing_components": [],
                "needs_shopping": False,
                "is_generated": False,
                "suggestion_key": None,
            }
        )
    return fallback
def build_dashboard_hints(today: date) -> list[str]:
    """Collect up to four short reminder texts for the dashboard.

    Returns an empty list when the user setting ``reminders_enabled`` is off.
    Each further hint is gated by its own user setting:

    * ``remind_tomorrow_if_sparse``: no visible breakfast planned for tomorrow.
    * ``suggest_home_for_today``: a visible home item is linked to dinner.
    * ``remind_before_shopping``: tomorrow is the household's shopping weekday
      and there are upcoming shopping needs.
    * ``remind_nuts``: a home food carries the ``nuts`` or ``seeds`` builder key.
    * ``suggest_templates``: a visible day template was last used 21+ days ago.
    """
    settings = get_user_settings()
    if not settings.get("reminders_enabled"):
        return []
    hints: list[str] = []
    tomorrow = today + timedelta(days=1)
    household_settings = get_household_settings()

    if settings.get("remind_tomorrow_if_sparse"):
        breakfast = get_db().execute(
            f"""
            SELECT COUNT(*) AS count
            FROM plan_entries
            JOIN dayparts ON dayparts.id = plan_entries.daypart_id
            WHERE plan_entries.plan_date = ? AND dayparts.slug = 'breakfast' AND {visible_clause('plan_entries')}
            """,
            [tomorrow.isoformat(), *visible_params()],
        ).fetchone()
        if int(breakfast["count"]) == 0:
            hints.append("Für morgen ist noch kein Frühstück eingeplant.")

    if settings.get("suggest_home_for_today"):
        dinner_home = get_db().execute(
            f"""
            SELECT COUNT(*) AS count
            FROM items
            JOIN item_dayparts ON item_dayparts.item_id = items.id
            JOIN dayparts ON dayparts.id = item_dayparts.daypart_id
            WHERE items.availability_state = 'home'
            AND dayparts.slug = 'dinner'
            AND {visible_clause('items')}
            """,
            visible_params(),
        ).fetchone()
        if int(dinner_home["count"]) > 0:
            hints.append("Zuhause ist bereits etwas da, das gut zu Abendessen passt.")

    # Reuse ``tomorrow`` instead of recomputing today + 1 day.
    if settings.get("remind_before_shopping") and tomorrow.weekday() == household_settings["shopping_weekday"]:
        upcoming = fetch_upcoming_shopping_needs(limit=3)
        if upcoming:
            hints.append("Für den nächsten Einkauf sind schon ein paar Dinge vorgemerkt.")

    if settings.get("remind_nuts"):
        nut_items = [
            item
            for item in fetch_items(kind="food", availability="home")
            if {"nuts", "seeds"} & set(item.get("builder_keys", []))
        ]
        if nut_items:
            hints.append("Heute schon an Nüsse gedacht?")

    if settings.get("suggest_templates"):
        old_template = get_db().execute(
            f"""
            SELECT name
            FROM day_templates
            WHERE {visible_clause('day_templates')}
            AND last_used_at IS NOT NULL
            AND DATE(last_used_at) <= DATE('now', '-21 day')
            ORDER BY last_used_at ASC
            LIMIT 1
            """,
            visible_params(),
        ).fetchone()
        if old_template:
            # The template name is wrapped in a matching pair of German
            # quotation marks; the opening mark was missing before.
            hints.append(f"„{old_template['name']}“ könnte diese Woche wieder passen.")

    return hints[:4]
def build_setup_checklist(today: date) -> list[dict]:
    """Return up to three onboarding steps based on what is still missing.

    Counts visible items, visible meals, visible plan entries for the seven
    days starting at ``today`` and visible day templates, and proposes a step
    (title, text, url, label) for each gap.
    """

    def count_rows(sql: str, bind_values) -> int:
        return int(get_db().execute(sql, bind_values).fetchone()["count"])

    total_items = count_rows(
        f"SELECT COUNT(*) AS count FROM items WHERE {visible_clause('items')}",
        visible_params(),
    )
    meal_count = count_rows(
        f"SELECT COUNT(*) AS count FROM items WHERE kind = 'meal' AND {visible_clause('items')}",
        visible_params(),
    )
    week_end = today + timedelta(days=6)
    plan_count = count_rows(
        f"""
        SELECT COUNT(*) AS count
        FROM plan_entries
        WHERE plan_date BETWEEN ? AND ? AND {visible_clause('plan_entries')}
        """,
        [today.isoformat(), week_end.isoformat(), *visible_params()],
    )
    template_count = count_rows(
        f"SELECT COUNT(*) AS count FROM day_templates WHERE {visible_clause('day_templates')}",
        visible_params(),
    )

    has_items = total_items > 0
    steps: list[dict] = []
    if not has_items:
        steps.append(
            {
                "title": "Fang mit einem ersten Lebensmittel an",
                "text": "Ein kleines Frühstück, ein Snack oder etwas für zuhause reicht völlig für den Start.",
                "url": url_for("main.item_create", kind="food"),
                "label": "Lebensmittel anlegen",
            }
        )
    if has_items and meal_count == 0:
        steps.append(
            {
                "title": "Lege eine erste Mahlzeitenidee an",
                "text": "Einfach zwei oder drei vertraute Dinge zusammenklicken und für später merken.",
                "url": url_for("main.item_create", kind="meal"),
                "label": "Mahlzeit anlegen",
            }
        )
    if plan_count == 0:
        steps.append(
            {
                "title": "Plane einen ruhigen ersten Tag",
                "text": "Mit einem kleinen Eintrag für Frühstück oder Abendessen fühlt sich die Woche sofort greifbarer an.",
                "url": url_for("main.planner_day", date=today.isoformat()),
                "label": "Tag öffnen",
            }
        )
    if has_items and template_count == 0:
        steps.append(
            {
                "title": "Merke dir einen gelungenen Tag als Vorlage",
                "text": "So wird Wiederverwendung später noch leichter.",
                "url": url_for("main.template_library"),
                "label": "Vorlagen ansehen",
            }
        )
    return steps[:3]
def build_day_hints(selected_date: date) -> list[str]:
    """Collect up to four reminder texts for the day view.

    Returns an empty list when ``reminders_enabled`` is off. The individual
    hints are gated by ``remind_tomorrow_if_sparse`` (no breakfast planned for
    the day), ``remind_small_snack`` (fewer than two non-archived items linked
    to the afternoon snack) and ``show_planned_not_shopped`` (pending shopping
    needs for this exact date).
    """
    settings = get_user_settings()
    if not settings.get("reminders_enabled"):
        return []

    day_key = selected_date.isoformat()
    messages: list[str] = []

    if settings.get("remind_tomorrow_if_sparse"):
        breakfast_row = get_db().execute(
            f"""
            SELECT COUNT(*) AS count
            FROM plan_entries
            JOIN dayparts ON dayparts.id = plan_entries.daypart_id
            WHERE plan_entries.plan_date = ? AND dayparts.slug = 'breakfast' AND {visible_clause('plan_entries')}
            """,
            [day_key, *visible_params()],
        ).fetchone()
        if int(breakfast_row["count"]) == 0:
            messages.append("Für diesen Tag ist noch kein Frühstück eingeplant.")

    if settings.get("remind_small_snack"):
        snack_row = get_db().execute(
            f"""
            SELECT COUNT(*) AS count
            FROM items
            JOIN item_dayparts ON item_dayparts.item_id = items.id
            JOIN dayparts ON dayparts.id = item_dayparts.daypart_id
            WHERE items.availability_state != 'archived'
            AND dayparts.slug = 'afternoon-snack'
            AND {visible_clause('items')}
            """,
            visible_params(),
        ).fetchone()
        if int(snack_row["count"]) < 2:
            messages.append("Für den Nachmittag wäre noch etwas Kleines möglich.")

    if settings.get("show_planned_not_shopped"):
        # NOTE(review): only the first 20 upcoming needs are inspected before
        # filtering by date — confirm that this cap is intended.
        pending_today = [
            need
            for need in fetch_upcoming_shopping_needs(limit=20)
            if need["needed_for_date"] == day_key
        ]
        if pending_today:
            messages.append("Ein paar Dinge sind für diesen Tag schon vorgemerkt, aber noch nicht auf der Einkaufsliste.")

    return messages[:4]
def build_week_hints(week_start: date) -> list[str]:
    """Collect up to four reminder texts for the seven days starting at ``week_start``.

    Returns an empty list when ``reminders_enabled`` is off. The individual
    hints are gated by ``remind_week_if_sparse`` (days without a planned
    breakfast), ``show_missing_for_upcoming_week`` (pending shopping needs
    dated within the week) and ``remind_on_shopping_day``.
    """
    settings = get_user_settings()
    if not settings.get("reminders_enabled"):
        return []

    week_end = week_start + timedelta(days=6)
    first_day = week_start.isoformat()
    last_day = week_end.isoformat()
    messages: list[str] = []

    if settings.get("remind_week_if_sparse"):
        breakfast_row = get_db().execute(
            f"""
            SELECT COUNT(DISTINCT plan_entries.plan_date) AS count
            FROM plan_entries
            JOIN dayparts ON dayparts.id = plan_entries.daypart_id
            WHERE plan_entries.plan_date BETWEEN ? AND ?
            AND dayparts.slug = 'breakfast'
            AND {visible_clause('plan_entries')}
            """,
            [first_day, last_day, *visible_params()],
        ).fetchone()
        days_without_breakfast = 7 - int(breakfast_row["count"])
        if days_without_breakfast > 0:
            messages.append(f"Für diese Woche sind noch {days_without_breakfast} Tage ohne Frühstück eingeplant.")

    if settings.get("show_missing_for_upcoming_week"):
        needs_this_week = [
            need
            for need in fetch_upcoming_shopping_needs(limit=20)
            if first_day <= need["needed_for_date"] <= last_day
        ]
        if needs_this_week:
            messages.append("Für diese Woche fehlt noch etwas, das später zum Einkauf dazukommen kann.")

    if settings.get("remind_on_shopping_day"):
        # NOTE(review): this compares weekday numbers of the first and last
        # day; it only spans all weekdays when week_start is a Monday — confirm.
        if week_start.weekday() <= get_household_settings()["shopping_weekday"] <= week_end.weekday():
            messages.append("Der Einkaufstag liegt in dieser Woche schon bereit im Blick.")

    return messages[:4]
def build_home_sections(items: list[dict], dayparts: list, selected_daypart_id: int | None):
    """Group items into titled sections by daypart.

    With a selected daypart, a single section with the items linked to it is
    returned (titled with the daypart's name, or a generic title when the id
    is unknown). Otherwise one section per daypart is returned, followed by a
    section for items without any daypart when such items exist.
    """

    def items_linked_to(daypart_id):
        return [
            item
            for item in items
            if any(meta["id"] == daypart_id for meta in item["dayparts_meta"])
        ]

    if selected_daypart_id:
        title = "Ausgewählte Tageszeit"
        for daypart in dayparts:
            if daypart["id"] == selected_daypart_id:
                # A falsy match keeps the generic title.
                if daypart:
                    title = daypart["name"]
                break
        return [{"title": title, "items": items_linked_to(selected_daypart_id)}]

    sections = [
        {"title": daypart["name"], "items": items_linked_to(daypart["id"])}
        for daypart in dayparts
    ]
    unassigned = [item for item in items if not item["dayparts_meta"]]
    if unassigned:
        sections.append({"title": "Ohne feste Tageszeit", "items": unassigned})
    return sections
def dedupe_items(items: list[dict], limit: int = 6) -> list[dict]:
    """Return the items in order, keeping only the first occurrence of each ``id``.

    Collection stops as soon as the result holds ``limit`` items; the check
    happens after an item has been added.
    """
    unique: list[dict] = []
    known_ids = set()
    for candidate in items:
        candidate_id = candidate["id"]
        if candidate_id not in known_ids:
            known_ids.add(candidate_id)
            unique.append(candidate)
            if len(unique) >= limit:
                return unique
    return unique
def build_selected_quick_action(
    *,
    daypart_id: int,
    selected_item_id: int | None,
    selected_meal_name: str,
    selected_component_ids: list[int],
    candidates: list[dict],
) -> dict | None:
    """Describe the quick action for the current selection, if there is one.

    A selected item id is resolved from ``candidates`` first and via
    ``get_item`` otherwise (a ``ValueError`` from ``get_item`` counts as "not
    found"); a resolved item yields an ``existing`` action. Failing that, a
    meal name together with component ids yields a ``generated`` action.
    Returns ``None`` when neither applies.
    """
    if selected_item_id:
        chosen = None
        for candidate in candidates:
            if int(candidate["id"]) == int(selected_item_id):
                chosen = candidate
                break
        if chosen is None:
            try:
                chosen = get_item(selected_item_id)
            except ValueError:
                chosen = None
        if chosen is not None:
            return {
                "type": "existing",
                "title": chosen["name"],
                "subtitle": "Ausgewählt. Du kannst es jetzt direkt eintragen.",
                "item_id": int(chosen["id"]),
                "visibility": chosen["visibility"],
                "daypart_id": daypart_id,
            }

    if not (selected_meal_name and selected_component_ids):
        return None
    return {
        "type": "generated",
        "title": selected_meal_name,
        "subtitle": "Ausgewählt aus dem, was zuhause gut passt.",
        "component_ids": selected_component_ids,
        "visibility": "shared",
        "daypart_id": daypart_id,
    }
def build_day_planner_sections(
    selected_date: date,
    selected_item_id: int | None,
    selected_daypart_id: int | None,
    selected_meal_name: str = "",
    selected_component_ids: list[int] | None = None,
):
    """Build one planner section per daypart for the day view.

    Each section bundles the day's entries for that daypart, the plannable
    candidates (split into meal, food and search lists), suggestions, and the
    selection state. The selection arguments are only applied to the section
    whose daypart id equals ``selected_daypart_id``.
    """
    chosen_component_ids = selected_component_ids or []
    day_key = selected_date.isoformat()
    day_entries = fetch_day_plan_entries(selected_date)

    sections = []
    for daypart in get_dayparts():
        raw_daypart_id = daypart["id"]
        is_selected = selected_daypart_id == raw_daypart_id
        is_primary = daypart["slug"] in PRIMARY_DAYPART_SLUGS

        candidates = fetch_plan_candidates(raw_daypart_id)
        entries = day_entries.get((day_key, raw_daypart_id), [])

        # Home items are listed first; dedupe_items drops the repeats.
        meals = [item for item in candidates if item["kind"] == "meal"]
        foods = [item for item in candidates if item["kind"] == "food"]
        meal_candidates = dedupe_items(
            [item for item in meals if item["availability_state"] == "home"] + meals,
            limit=6,
        )
        food_candidates = dedupe_items(
            [item for item in foods if item["availability_state"] == "home"]
            + fetch_recent_plan_items(raw_daypart_id)
            + foods,
            limit=20,
        )
        search_candidates = dedupe_items(meal_candidates + food_candidates, limit=24)
        entry_item_ids = [int(entry["item_id"]) for entry in entries]
        daypart_id = int(raw_daypart_id)

        sections.append(
            {
                "daypart": daypart,
                "entries": entries,
                "candidates": candidates,
                "meal_candidates": meal_candidates,
                "food_candidates": food_candidates,
                "search_candidates": search_candidates,
                "recipe_suggestions": build_home_recipe_suggestions(daypart_id, limit=3),
                "suggestions": build_daypart_suggestions(raw_daypart_id),
                "balance_suggestion": build_balance_suggestion(daypart_id, entry_item_ids),
                "selected_item_id": selected_item_id if is_selected else None,
                "selected_quick_action": build_selected_quick_action(
                    daypart_id=daypart_id,
                    selected_item_id=selected_item_id if is_selected else None,
                    selected_meal_name=selected_meal_name if is_selected else "",
                    selected_component_ids=chosen_component_ids if is_selected else [],
                    candidates=candidates,
                ),
                "is_open": is_selected,
                "is_primary_daypart": is_primary,
                "is_snack_daypart": daypart["slug"] in SNACK_DAYPART_SLUGS,
                "visible_by_default": is_primary or bool(entries) or is_selected,
                "summary_items": [entry["item_name"] for entry in entries][:2],
                "default_visibility": "shared",
            }
        )
    return sections
def build_template_day_sections(selected_map: dict[int, list[int]] | None = None):
    """Build one template-editor section per daypart.

    Each section holds the daypart, all plannable candidates, up to ten
    ``quick_items`` (home items first), the remaining candidates as
    ``list_items`` and the ids preselected for that daypart in
    ``selected_map``.
    """
    preselected = selected_map or {}
    sections = []
    for daypart in get_dayparts():
        daypart_id = int(daypart["id"])
        candidates = fetch_plan_candidates(daypart_id)
        home_first = [item for item in candidates if item["availability_state"] == "home"]
        quick_items = dedupe_items(home_first + candidates, limit=10)
        quick_ids = {item["id"] for item in quick_items}
        remaining_items = [item for item in candidates if item["id"] not in quick_ids]
        sections.append(
            {
                "daypart": daypart,
                "candidates": candidates,
                "quick_items": quick_items,
                "list_items": remaining_items,
                "selected_ids": preselected.get(daypart_id, []),
            }
        )
    return sections
def fetch_week_cards(week_start: date):
    """Build one card per day for the seven days starting at ``week_start``.

    Each card contains the date, one slot per daypart (entries, picker data,
    copy/visibility flags), the dayparts that have entries, the total number
    of planned entries, up to four preview item names and the snack slots
    that are hidden by default.
    """
    days = [week_start + timedelta(days=index) for index in range(7)]
    week_end = days[-1]
    # The daypart list does not depend on the day, so it is read once instead
    # of once per day.
    dayparts = list(get_dayparts())
    grouped_entries = fetch_plan_entries_for_range(week_start, week_end)

    picker_map = {}
    for daypart in dayparts:
        candidates = fetch_plan_candidates(int(daypart["id"]))
        meal_candidates = dedupe_items(
            [item for item in candidates if item["kind"] == "meal" and item["availability_state"] == "home"]
            + [item for item in candidates if item["kind"] == "meal"],
            limit=4,
        )
        picker_map[int(daypart["id"])] = {
            "meal_candidates": meal_candidates,
            "recipe_suggestions": build_home_recipe_suggestions(int(daypart["id"]), limit=3),
        }

    cards = []
    for current_day in days:
        filled_dayparts = []
        planned_count = 0
        preview_items = []
        slots = []
        for daypart in dayparts:
            slot_entries = grouped_entries.get((current_day.isoformat(), daypart["id"]), [])
            is_snack_daypart = daypart["slug"] in SNACK_DAYPART_SLUGS
            # Snack slots only show up by default once they have entries.
            visible_by_default = (not is_snack_daypart) or bool(slot_entries)
            slots.append(
                {
                    "daypart": dict(daypart),
                    "entries": slot_entries,
                    # Copying is not offered on the last day of the week.
                    "copy_allowed": bool(slot_entries) and current_day < week_end,
                    "picker": picker_map.get(int(daypart["id"]), {"meal_candidates": [], "recipe_suggestions": []}),
                    "is_snack_daypart": is_snack_daypart,
                    "visible_by_default": visible_by_default,
                }
            )
            if slot_entries:
                filled_dayparts.append({"id": daypart["id"], "name": daypart["name"], "count": len(slot_entries)})
                planned_count += len(slot_entries)
                preview_items.extend(entry["item_name"] for entry in slot_entries[:2])
        hidden_snack_slots = [
            {"id": int(slot["daypart"]["id"]), "name": slot["daypart"]["name"]}
            for slot in slots
            if slot["is_snack_daypart"] and not slot["visible_by_default"]
        ]
        cards.append(
            {
                "date": current_day,
                "filled_dayparts": filled_dayparts,
                "planned_count": planned_count,
                "preview_items": preview_items[:4],
                "slots": slots,
                "hidden_snack_slots": hidden_snack_slots,
            }
        )
    return cards
def format_week_pdf_entry(entry: dict) -> str:
    """Return the entry's item name as its PDF cell text.

    NOTE(review): a second ``format_week_pdf_entry`` with a keyword-only
    ``mode`` parameter is defined further down in this module and rebinds the
    name, so this definition appears to be unreachable after import.
    """
    item_name = entry["item_name"]
    return item_name
def normalize_pdf_export_mode(raw: str | None) -> str:
    """Map a raw mode value to ``"household"`` or, for anything else, ``"mine"``."""
    if raw == "household":
        return "household"
    return "mine"
def fetch_plan_entries_for_range_export(start_date: date, end_date: date, *, mode: str):
    """Load plan entries between two dates (inclusive) for the PDF export.

    In ``"household"`` mode every entry of the current household is selected;
    in any other mode the regular visibility clause applies. Returns a
    ``defaultdict(list)`` keyed by ``(plan_date, daypart_id)`` holding the
    records produced by ``describe_records``.
    """
    bind_values: list[object] = [start_date.isoformat(), end_date.isoformat()]
    if mode == "household":
        scope_sql = "plan_entries.household_id = ?"
        bind_values.append(current_household_id())
    else:
        scope_sql = visible_clause("plan_entries")
        bind_values.extend(visible_params())

    sql = f"""
        SELECT plan_entries.*,
        items.name AS item_name,
        items.kind AS item_kind,
        items.photo_filename,
        items.availability_state,
        dayparts.name AS daypart_name,
        dayparts.slug AS daypart_slug,
        dayparts.sort_order,
        owner.display_name AS owner_display_name,
        owner.username AS owner_username,
        target.display_name AS target_display_name,
        target.username AS target_username
        FROM plan_entries
        JOIN items ON items.id = plan_entries.item_id
        JOIN dayparts ON dayparts.id = plan_entries.daypart_id
        LEFT JOIN users AS owner ON owner.id = plan_entries.owner_user_id
        LEFT JOIN users AS target ON target.id = items.target_user_id
        WHERE plan_date BETWEEN ? AND ? AND {scope_sql}
        ORDER BY plan_date, dayparts.sort_order, items.name
        """
    fetched = get_db().execute(sql, bind_values).fetchall()

    entries_by_slot = defaultdict(list)
    for record in describe_records(fetched):
        slot_key = (record["plan_date"], record["daypart_id"])
        entries_by_slot[slot_key].append(record)
    return entries_by_slot
def format_week_pdf_entry(entry: dict, *, mode: str) -> str:
    """Return the PDF cell text for one plan entry.

    In ``"household"`` mode the item name is followed by who it is for: the
    target name if set, otherwise the owner for personal entries, otherwise
    everyone. In any other mode only shared entries get a suffix.
    """
    item_name = entry["item_name"]
    if mode != "household":
        if entry.get("is_shared"):
            return f"{item_name} (gemeinsam)"
        return item_name

    if entry.get("target_name"):
        audience = entry["target_name"]
    elif entry.get("is_personal"):
        audience = entry["owner_name"]
    else:
        return f"{item_name} (Für alle)"
    return f"{item_name} (Für {audience})"
def build_week_pdf_rows(week_start: date, *, mode: str) -> tuple[list, list[list[str]]]:
    """Build the table rows for the weekly plan PDF.

    Args:
        week_start: First day of the seven-day range.
        mode: Export mode, forwarded to the entry query and the entry formatter.

    Returns:
        A tuple ``(days, rows)``. ``days`` are the seven dates. Each row is
        ``[label, cell_day1, ..., cell_day7]`` where a cell holds the day's
        formatted entries joined by newlines. Rows for primary dayparts are
        always included; other dayparts only when at least one cell has text.
    """
    days = [week_start + timedelta(days=index) for index in range(7)]
    grouped_entries = fetch_plan_entries_for_range_export(week_start, week_start + timedelta(days=6), mode=mode)
    visible_dayparts: list[list[str]] = []
    for daypart in get_dayparts():
        row_cells: list[str] = []
        has_content = False
        # Some dayparts use a dedicated PDF label; others keep their name.
        row_label = PDF_DAYPART_LABELS.get(daypart["slug"], daypart["name"])
        for current_day in days:
            entries = grouped_entries.get((current_day.isoformat(), daypart["id"]), [])
            cell_value = "\n".join(format_week_pdf_entry(entry, mode=mode) for entry in entries)
            row_cells.append(cell_value)
            has_content = has_content or bool(cell_value.strip())
        if daypart["slug"] in PRIMARY_DAYPART_SLUGS or has_content:
            visible_dayparts.append([row_label, *row_cells])
    return days, visible_dayparts
def build_week_plan_pdf(week_start: date, *, mode: str = "mine") -> bytes:
    """Render one week of the meal plan as a landscape A4 PDF.

    The document consists of a title, the calendar week and a table with one
    label column plus one column per weekday. The table rows come from
    ``build_week_pdf_rows``.

    Args:
        week_start: First day of the exported week.
        mode: Export mode; it is passed through ``normalize_pdf_export_mode``
            before use. The normalized value ``"mine"`` selects the personal
            title, every other value the household title.

    Returns:
        The PDF document as bytes.

    Raises:
        RuntimeError: If the ``fpdf`` package cannot be imported.
    """
    try:
        from fpdf import FPDF
        from fpdf.fonts import FontFace
    except ImportError as exc:  # pragma: no cover - depends on optional package in local env
        raise RuntimeError("Für den PDF-Export fehlt noch die Abhängigkeit aus der requirements.txt.") from exc

    mode = normalize_pdf_export_mode(mode)
    week_end = week_start + timedelta(days=6)
    week_number = week_start.isocalendar().week
    days, rows = build_week_pdf_rows(week_start, mode=mode)
    if mode == "mine":
        plan_label = "Mein Essensplan"
    else:
        plan_label = "Unser Essensplan"

    # Page setup.
    pdf = FPDF(orientation="L", unit="mm", format="A4")
    pdf.set_auto_page_break(auto=True, margin=14)
    pdf.set_margins(left=14, top=14, right=14)
    pdf.add_page()
    pdf.set_title(f"{plan_label} KW {week_number:02d}")
    pdf.set_author("Nouri")
    pdf.set_creator("Nouri")

    # Title line followed by the calendar week in a muted colour.
    title_text = f"{plan_label} vom {week_start.strftime('%d.%m.%Y')} bis {week_end.strftime('%d.%m.%Y')}"
    pdf.set_font("Helvetica", "B", 18)
    pdf.cell(0, 9, title_text, new_x="LMARGIN", new_y="NEXT")
    pdf.set_font("Helvetica", "", 11)
    pdf.set_text_color(82, 82, 82)
    pdf.cell(0, 6, f"KW {week_number:02d}", new_x="LMARGIN", new_y="NEXT")
    pdf.ln(3)
    pdf.set_text_color(20, 20, 20)

    # Column layout: fixed label column, the rest split evenly over 7 days.
    headings = [" "]
    headings.extend(f"{format_weekday(day)}\n{day.strftime('%d.%m.%Y')}" for day in days)
    label_column_width = 34
    table_width = pdf.w - pdf.l_margin - pdf.r_margin
    day_column_width = (table_width - label_column_width) / 7
    column_widths = (label_column_width, *([day_column_width] * 7))

    header_style = FontFace(emphasis="B", fill_color=(240, 240, 240))
    label_style = FontFace(emphasis="B", fill_color=(248, 248, 248))
    body_style = FontFace(fill_color=(255, 255, 255))
    header_padding = (2.8, 2.5, 2.8, 2.5)
    body_padding = (2.5, 2.8, 2.5, 2.8)

    with pdf.table(
        borders_layout="SINGLE_TOP_LINE",
        cell_fill_color=(255, 255, 255),
        cell_fill_mode="ROWS",
        col_widths=column_widths,
        gutter_height=0,
        gutter_width=0,
        headings_style=header_style,
        line_height=5.5,
        text_align=("LEFT",) * 8,
        width=table_width,
    ) as table:
        header_row = table.row()
        for heading in headings:
            header_row.cell(heading, padding=header_padding, v_align="M")
        for label, *day_cells in rows:
            table_row = table.row()
            table_row.cell(label, style=label_style, padding=body_padding, v_align="M")
            for cell_text in day_cells:
                # Empty cells get a single space as placeholder text.
                table_row.cell(cell_text or " ", style=body_style, padding=body_padding, v_align="TOP")

    return bytes(pdf.output())
def count_visible_items(availability_state: str) -> int:
    """Count the items in the given availability state that pass the visibility filter.

    Args:
        availability_state: Value matched against ``items.availability_state``.

    Returns:
        Number of matching rows.
    """
    sql = f"SELECT COUNT(*) AS count FROM items WHERE availability_state = ? AND {visible_clause('items')}"
    bindings = [availability_state, *visible_params()]
    result = get_db().execute(sql, bindings).fetchone()
    return int(result["count"])
def fetch_day_templates(query: str | None = None, visibility: str | None = None) -> list[dict]:
    """List day templates that pass the visibility filter, ordered by name.

    Args:
        query: Optional case-insensitive substring matched against the name.
        visibility: Optional exact match on the ``visibility`` column.

    Returns:
        Template records (including ``entry_count`` and owner columns) passed
        through ``describe_template_record``.
    """
    filters = [visible_clause("day_templates")]
    bindings = visible_params()
    if query:
        filters.append("LOWER(day_templates.name) LIKE ?")
        bindings.append(f"%{query.lower()}%")
    if visibility:
        filters.append("day_templates.visibility = ?")
        bindings.append(visibility)

    where_sql = " AND ".join(filters)
    records = get_db().execute(
        f"""
        SELECT day_templates.*,
        owner.display_name AS owner_display_name,
        owner.username AS owner_username,
        (
        SELECT COUNT(*)
        FROM day_template_entries
        WHERE day_template_entries.day_template_id = day_templates.id
        ) AS entry_count
        FROM day_templates
        LEFT JOIN users AS owner ON owner.id = day_templates.owner_user_id
        WHERE {where_sql}
        ORDER BY LOWER(day_templates.name)
        """,
        bindings,
    ).fetchall()

    templates = []
    for record in records:
        templates.append(describe_template_record(dict(record)))
    return templates
def get_day_template(template_id: int) -> dict:
    """Load a single day template that passes the visibility filter.

    Args:
        template_id: Primary key of the day template.

    Returns:
        The template record passed through ``describe_template_record``.

    Raises:
        ValueError: If no matching row exists.
    """
    sql = f"""
        SELECT day_templates.*,
        owner.display_name AS owner_display_name,
        owner.username AS owner_username
        FROM day_templates
        LEFT JOIN users AS owner ON owner.id = day_templates.owner_user_id
        WHERE day_templates.id = ? AND {visible_clause('day_templates')}
        """
    record = get_db().execute(sql, [template_id, *visible_params()]).fetchone()
    if record is not None:
        return describe_template_record(dict(record))
    raise ValueError("Die Tagesvorlage wurde nicht gefunden.")
def get_day_template_selected_map(template_id: int) -> dict[int, list[int]]:
    """Return the item ids of a day template grouped by daypart id.

    Args:
        template_id: Primary key of the day template.

    Returns:
        A ``defaultdict(list)`` mapping daypart id to item ids, in the order
        given by ``sort_order`` and then ``id``.
    """
    records = get_db().execute(
        """
        SELECT daypart_id, item_id
        FROM day_template_entries
        WHERE day_template_id = ?
        ORDER BY sort_order, id
        """,
        (template_id,),
    ).fetchall()

    items_by_daypart: dict[int, list[int]] = defaultdict(list)
    for record in records:
        daypart_id = int(record["daypart_id"])
        items_by_daypart[daypart_id].append(int(record["item_id"]))
    return items_by_daypart
def sync_day_template_entries(template_id: int, selected_map: dict[int, list[int]]) -> None:
    """Replace all entries of a day template with the given selection.

    Existing entries are deleted first. Only dayparts returned by
    ``get_dayparts`` are written; ``sort_order`` starts at 10 per daypart.
    The caller is responsible for committing.

    Args:
        template_id: Primary key of the day template.
        selected_map: Item ids keyed by daypart id.
    """
    db = get_db()
    db.execute("DELETE FROM day_template_entries WHERE day_template_id = ?", (template_id,))

    insert_sql = """
        INSERT INTO day_template_entries (day_template_id, daypart_id, item_id, sort_order)
        VALUES (?, ?, ?, ?)
        """
    for daypart in get_dayparts():
        chosen_items = selected_map.get(int(daypart["id"]), [])
        for position, item_id in enumerate(chosen_items, start=10):
            db.execute(insert_sql, (template_id, daypart["id"], item_id, position))
def day_template_form_data(template: dict | None = None, source_date: date | None = None) -> dict:
    """Build the initial form state for the day template form.

    Args:
        template: Existing template to edit. When given, its stored entries
            prefill the selection and ``source_date`` is ignored.
        source_date: Day whose plan entries prefill the selection when no
            template is given.

    Returns:
        Dict with ``name``, ``description``, ``visibility`` and
        ``selected_map`` (item ids keyed by daypart id). ``description`` is
        always a string, matching how ``item_set_edit`` normalizes a missing
        description to ``""``.
    """
    selected_map: dict[int, list[int]] = defaultdict(list)
    if template:
        selected_map.update(get_day_template_selected_map(template["id"]))
    elif source_date:
        source_key = source_date.isoformat()
        for (plan_date_key, daypart_id), entries in fetch_day_plan_entries(source_date).items():
            if plan_date_key == source_key:
                selected_map[int(daypart_id)] = [int(entry["item_id"]) for entry in entries]

    return {
        "name": template["name"] if template else "",
        # A missing description must not reach the form as None.
        "description": (template["description"] or "") if template else "",
        "visibility": template["visibility"] if template else "shared",
        # Plain dict with copied lists so later edits do not alias the source.
        "selected_map": {daypart_id: list(item_ids) for daypart_id, item_ids in selected_map.items()},
    }
def extract_day_template_form_data(existing: dict | None = None) -> dict:
    """Read the day template form from the current request.

    Args:
        existing: Form state to update. It is modified in place and returned;
            a new dict is used when it is ``None`` or empty.

    Returns:
        The form state with ``name``, ``description``, ``visibility`` and
        ``selected_map`` (item ids keyed by daypart id) taken from
        ``request.form``.
    """
    form_data = existing or {}

    selected_map: dict[int, list[int]] = {}
    for daypart in get_dayparts():
        field_name = f"daypart_{daypart['id']}_item_ids"
        # isdecimal() rather than isdigit(): isdigit() also accepts characters
        # such as "²" that int() cannot parse, which would raise ValueError.
        selected_map[int(daypart["id"])] = [
            int(value) for value in request.form.getlist(field_name) if value.isdecimal()
        ]

    form_data.update(
        {
            "name": request.form.get("name", "").strip(),
            "description": request.form.get("description", "").strip(),
            "visibility": normalize_visibility(request.form.get("visibility"), form_data.get("visibility", "shared")),
            "selected_map": selected_map,
        }
    )
    return form_data
def apply_day_template(template_id: int, selected_date: date) -> int:
    """Copy the entries of a day template into the plan of one day.

    For every template item that is not yet planned for the same date,
    daypart and visibility, a plan entry is inserted and
    ``ensure_item_or_missing_components_are_shopped`` is called for it.
    Afterwards the template's ``last_used_at`` is updated and everything is
    committed in a single transaction.

    Args:
        template_id: Primary key of the day template.
        selected_date: Day the template is applied to.

    Returns:
        Number of plan entries that were inserted.

    Raises:
        ValueError: If ``get_day_template`` does not find the template.
    """
    template = get_day_template(template_id)
    entries_map = get_day_template_selected_map(template_id)

    # Loop-invariant values, resolved once.
    db = get_db()
    visibility = template["visibility"]
    plan_date = selected_date.isoformat()
    user_id = g.user["id"]
    household_id = current_household_id()

    inserted = 0
    for daypart_id, item_ids in entries_map.items():
        for item_id in item_ids:
            # NOTE(review): this duplicate check is not restricted to the
            # current household or owner - confirm whether that is intended.
            already_planned = db.execute(
                """
                SELECT id
                FROM plan_entries
                WHERE plan_date = ? AND daypart_id = ? AND item_id = ? AND visibility = ?
                """,
                (plan_date, daypart_id, item_id, visibility),
            ).fetchone()
            if already_planned:
                continue
            db.execute(
                """
                INSERT INTO plan_entries (household_id, owner_user_id, visibility, plan_date, daypart_id, item_id, created_by)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                """,
                (household_id, user_id, visibility, plan_date, daypart_id, item_id, user_id),
            )
            ensure_item_or_missing_components_are_shopped(
                item_id,
                user_id,
                visibility,
                needed_for_date=plan_date,
                needed_for_daypart_id=daypart_id,
                source_item_id=item_id,
            )
            inserted += 1

    db.execute(
        "UPDATE day_templates SET last_used_at = CURRENT_TIMESTAMP WHERE id = ?",
        (template_id,),
    )
    db.commit()
    return inserted
def create_day_template_snapshot(name: str, description: str, visibility: str, source_date: date) -> int | None:
    """Create a day template from the plan entries of ``source_date``.

    The caller is responsible for committing.

    Args:
        name: Name of the new template.
        description: Description of the new template.
        visibility: Visibility stored on the new template.
        source_date: Day whose planned items are copied.

    Returns:
        The id of the new template, or ``None`` when the source day has no
        planned items.
    """
    snapshot_map = day_template_form_data(source_date=source_date)["selected_map"]
    has_items = any(snapshot_map.values())
    if not has_items:
        return None

    insert_sql = """
        INSERT INTO day_templates (household_id, owner_user_id, visibility, name, description)
        VALUES (?, ?, ?, ?, ?)
        """
    bindings = (current_household_id(), g.user["id"], visibility, name, description)
    new_id = int(get_db().execute(insert_sql, bindings).lastrowid)
    sync_day_template_entries(new_id, snapshot_map)
    return new_id
def fetch_week_templates(query: str | None = None, visibility: str | None = None) -> list[dict]:
    """List week templates that pass the visibility filter, ordered by name.

    Args:
        query: Optional case-insensitive substring matched against the name.
        visibility: Optional exact match on the ``visibility`` column.

    Returns:
        Template records (including ``day_count`` and owner columns) passed
        through ``describe_template_record``.
    """
    filters = [visible_clause("week_templates")]
    bindings = visible_params()
    if query:
        filters.append("LOWER(week_templates.name) LIKE ?")
        bindings.append(f"%{query.lower()}%")
    if visibility:
        filters.append("week_templates.visibility = ?")
        bindings.append(visibility)

    where_sql = " AND ".join(filters)
    records = get_db().execute(
        f"""
        SELECT week_templates.*,
        owner.display_name AS owner_display_name,
        owner.username AS owner_username,
        (
        SELECT COUNT(*)
        FROM week_template_days
        WHERE week_template_days.week_template_id = week_templates.id
        ) AS day_count
        FROM week_templates
        LEFT JOIN users AS owner ON owner.id = week_templates.owner_user_id
        WHERE {where_sql}
        ORDER BY LOWER(week_templates.name)
        """,
        bindings,
    ).fetchall()

    templates = []
    for record in records:
        templates.append(describe_template_record(dict(record)))
    return templates
def get_week_template(template_id: int) -> dict:
    """Load a single week template that passes the visibility filter.

    Args:
        template_id: Primary key of the week template.

    Returns:
        The template record passed through ``describe_template_record``.

    Raises:
        ValueError: If no matching row exists.
    """
    sql = f"""
        SELECT week_templates.*,
        owner.display_name AS owner_display_name,
        owner.username AS owner_username
        FROM week_templates
        LEFT JOIN users AS owner ON owner.id = week_templates.owner_user_id
        WHERE week_templates.id = ? AND {visible_clause('week_templates')}
        """
    record = get_db().execute(sql, [template_id, *visible_params()]).fetchone()
    if record is not None:
        return describe_template_record(dict(record))
    raise ValueError("Die Wochenvorlage wurde nicht gefunden.")
def get_week_template_selected_map(template_id: int) -> dict[int, int]:
    """Return the day template assigned to each weekday of a week template.

    Args:
        template_id: Primary key of the week template.

    Returns:
        Mapping of weekday index to day template id, in ascending weekday
        order.
    """
    records = get_db().execute(
        """
        SELECT weekday_index, day_template_id
        FROM week_template_days
        WHERE week_template_id = ?
        ORDER BY weekday_index
        """,
        (template_id,),
    ).fetchall()

    assignments: dict[int, int] = {}
    for record in records:
        assignments[int(record["weekday_index"])] = int(record["day_template_id"])
    return assignments
def week_template_form_data(template: dict | None = None, source_week: date | None = None) -> dict:
    """Build the initial form state for the week template form.

    Args:
        template: Existing template to edit; its weekday assignments prefill
            ``selected_map``.
        source_week: First day of a planned week that may be copied. For each
            of its seven days ``copy_from_source`` records whether plan
            entries exist.

    Returns:
        Dict with ``name``, ``description``, ``visibility``, ``selected_map``,
        ``source_week`` (ISO string or ``""``) and ``copy_from_source``
        (weekday index -> bool). ``description`` is always a string, matching
        how ``item_set_edit`` normalizes a missing description to ``""``.
    """
    selected_map = get_week_template_selected_map(template["id"]) if template else {}

    copy_from_source = {weekday_index: False for weekday_index in range(7)}
    if source_week:
        for weekday_index in range(7):
            day_entries = fetch_day_plan_entries(source_week + timedelta(days=weekday_index))
            copy_from_source[weekday_index] = bool(day_entries)

    return {
        "name": template["name"] if template else "",
        # A missing description must not reach the form as None.
        "description": (template["description"] or "") if template else "",
        "visibility": template["visibility"] if template else "shared",
        "selected_map": selected_map,
        "source_week": source_week.isoformat() if source_week else "",
        "copy_from_source": copy_from_source,
    }
def extract_week_template_form_data(existing: dict | None = None) -> dict:
    """Read the week template form from the current request.

    Args:
        existing: Form state to update. It is modified in place and returned;
            a new dict is used when it is ``None`` or empty.

    Returns:
        The form state with ``name``, ``description``, ``visibility``,
        ``selected_map`` (weekday index -> day template id or ``None``),
        ``source_week`` and ``copy_from_source`` (weekday index -> bool)
        taken from ``request.form``.
    """
    form_data = existing or {}

    selected_map: dict[int, int | None] = {}
    copy_from_source: dict[int, bool] = {}
    for weekday_index in range(7):
        raw_value = request.form.get(f"weekday_{weekday_index}_day_template_id", "").strip()
        # isdecimal() rather than isdigit(): isdigit() also accepts characters
        # such as "²" that int() cannot parse, which would raise ValueError.
        selected_map[weekday_index] = int(raw_value) if raw_value.isdecimal() else None
        copy_from_source[weekday_index] = request.form.get(f"weekday_{weekday_index}_copy_source") == "1"

    form_data.update(
        {
            "name": request.form.get("name", "").strip(),
            "description": request.form.get("description", "").strip(),
            "visibility": normalize_visibility(request.form.get("visibility"), form_data.get("visibility", "shared")),
            "selected_map": selected_map,
            "source_week": request.form.get("source_week", "").strip(),
            "copy_from_source": copy_from_source,
        }
    )
    return form_data
def sync_week_template_days(template_id: int, selected_map: dict[int, int | None]) -> None:
    """Replace the weekday assignments of a week template.

    Existing assignments are deleted first; weekdays without a day template
    (falsy value) are not written. The caller is responsible for committing.

    Args:
        template_id: Primary key of the week template.
        selected_map: Day template id (or ``None``) keyed by weekday index.
    """
    db = get_db()
    db.execute("DELETE FROM week_template_days WHERE week_template_id = ?", (template_id,))

    insert_sql = """
        INSERT INTO week_template_days (week_template_id, weekday_index, day_template_id)
        VALUES (?, ?, ?)
        """
    for weekday_index, day_template_id in selected_map.items():
        if not day_template_id:
            continue
        db.execute(insert_sql, (template_id, weekday_index, day_template_id))
def apply_week_template(template_id: int, week_start: date) -> int:
    """Apply every day template of a week template to the week at ``week_start``.

    The week template is loaded through ``get_week_template`` first so that a
    template the current user cannot see is rejected, the same way
    ``apply_day_template`` and ``apply_item_set_to_shopping`` load their
    record before acting on it.

    Args:
        template_id: Primary key of the week template.
        week_start: First day of the target week; weekday index ``n`` is
            applied to ``week_start + n`` days.

    Returns:
        Total number of plan entries inserted across all days.

    Raises:
        ValueError: If the week template or one of its day templates is not
            found by the respective ``get_*`` lookup.
    """
    get_week_template(template_id)

    inserted = 0
    for weekday_index, day_template_id in get_week_template_selected_map(template_id).items():
        target_day = week_start + timedelta(days=weekday_index)
        inserted += apply_day_template(day_template_id, target_day)

    db = get_db()
    db.execute(
        "UPDATE week_templates SET last_used_at = CURRENT_TIMESTAMP WHERE id = ?",
        (template_id,),
    )
    db.commit()
    return inserted
def fetch_item_sets(query: str | None = None, visibility: str | None = None) -> list[dict]:
    """List item sets that pass the visibility filter, ordered by name.

    Args:
        query: Optional case-insensitive substring matched against the name.
        visibility: Optional exact match on the ``visibility`` column.

    Returns:
        Item set records (including ``item_count`` and owner columns) passed
        through ``describe_template_record``.
    """
    filters = [visible_clause("item_sets")]
    bindings = visible_params()
    if query:
        filters.append("LOWER(item_sets.name) LIKE ?")
        bindings.append(f"%{query.lower()}%")
    if visibility:
        filters.append("item_sets.visibility = ?")
        bindings.append(visibility)

    where_sql = " AND ".join(filters)
    records = get_db().execute(
        f"""
        SELECT item_sets.*,
        owner.display_name AS owner_display_name,
        owner.username AS owner_username,
        (
        SELECT COUNT(*)
        FROM item_set_items
        WHERE item_set_items.item_set_id = item_sets.id
        ) AS item_count
        FROM item_sets
        LEFT JOIN users AS owner ON owner.id = item_sets.owner_user_id
        WHERE {where_sql}
        ORDER BY LOWER(item_sets.name)
        """,
        bindings,
    ).fetchall()

    item_sets = []
    for record in records:
        item_sets.append(describe_template_record(dict(record)))
    return item_sets
def get_item_set(set_id: int) -> dict:
    """Load a single item set that passes the visibility filter.

    Args:
        set_id: Primary key of the item set.

    Returns:
        The item set record passed through ``describe_template_record``.

    Raises:
        ValueError: If no matching row exists.
    """
    sql = f"""
        SELECT item_sets.*,
        owner.display_name AS owner_display_name,
        owner.username AS owner_username
        FROM item_sets
        LEFT JOIN users AS owner ON owner.id = item_sets.owner_user_id
        WHERE item_sets.id = ? AND {visible_clause('item_sets')}
        """
    record = get_db().execute(sql, [set_id, *visible_params()]).fetchone()
    if record is not None:
        return describe_template_record(dict(record))
    raise ValueError("Das Paket wurde nicht gefunden.")
def get_item_set_selected_ids(set_id: int) -> list[int]:
    """Return the item ids of an item set ordered by ``sort_order`` and ``id``.

    Args:
        set_id: Primary key of the item set.
    """
    records = get_db().execute(
        """
        SELECT item_id
        FROM item_set_items
        WHERE item_set_id = ?
        ORDER BY sort_order, id
        """,
        (set_id,),
    ).fetchall()

    item_ids: list[int] = []
    for record in records:
        item_ids.append(int(record["item_id"]))
    return item_ids
def sync_item_set_items(set_id: int, item_ids: list[int]) -> None:
    """Replace the members of an item set with ``item_ids``.

    Existing members are deleted first; ``sort_order`` starts at 10 and
    inserts use ``INSERT OR IGNORE``. The caller is responsible for
    committing.

    Args:
        set_id: Primary key of the item set.
        item_ids: Item ids in the desired order.
    """
    db = get_db()
    db.execute("DELETE FROM item_set_items WHERE item_set_id = ?", (set_id,))

    insert_sql = """
        INSERT OR IGNORE INTO item_set_items (item_set_id, item_id, sort_order)
        VALUES (?, ?, ?)
        """
    for position, item_id in enumerate(item_ids, start=10):
        db.execute(insert_sql, (set_id, item_id, position))
def extract_item_set_form_data(existing: dict | None = None) -> dict:
    """Read the item set form from the current request.

    Args:
        existing: Form state to update. It is modified in place and returned;
            a new dict is used when it is ``None`` or empty.

    Returns:
        The form state with ``name``, ``description``, ``visibility``,
        ``item_ids`` and ``item_search`` taken from ``request.form``.
    """
    form_data = existing or {}

    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as "²" that int() cannot parse, which would raise ValueError.
    item_ids = [int(value) for value in request.form.getlist("item_ids") if value.isdecimal()]

    form_data.update(
        {
            "name": request.form.get("name", "").strip(),
            "description": request.form.get("description", "").strip(),
            "visibility": normalize_visibility(request.form.get("visibility"), form_data.get("visibility", "shared")),
            "item_ids": item_ids,
            "item_search": request.form.get("item_search", "").strip(),
        }
    )
    return form_data
def apply_item_set_to_shopping(set_id: int) -> dict:
    """Hand every item of an item set to the shopping helper.

    Calls ``ensure_item_or_missing_components_are_shopped`` for each item,
    updates the set's ``last_used_at`` and commits.

    Args:
        set_id: Primary key of the item set.

    Returns:
        Dict with ``names`` (all names reported by the helper) and ``count``
        (their number).

    Raises:
        ValueError: If ``get_item_set`` does not find the set.
    """
    item_set = get_item_set(set_id)
    set_visibility = item_set["visibility"]

    added_names: list[str] = []
    for item_id in get_item_set_selected_ids(set_id):
        outcome = ensure_item_or_missing_components_are_shopped(
            item_id,
            g.user["id"],
            set_visibility,
        )
        added_names.extend(outcome["names"])

    db = get_db()
    db.execute(
        "UPDATE item_sets SET last_used_at = CURRENT_TIMESTAMP WHERE id = ?",
        (set_id,),
    )
    db.commit()
    return {"count": len(added_names), "names": added_names}
def render_item_form(kind: str, *, item: dict | None, form_data: dict):
    """Render ``items/form.html`` for creating or editing an item.

    Args:
        kind: Item kind; the food search term is only applied for ``"meal"``.
        item: Item being edited, or ``None`` for a new one.
        form_data: Current form state passed to the template.

    Returns:
        The rendered template response.
    """
    if kind == "meal":
        food_query = form_data.get("food_search") or None
    else:
        food_query = None
    food_options = fetch_food_options(query=food_query)
    preferred_category = form_data.get("category") or form_data.get("quick_food_category")

    context = {
        "kind": kind,
        "item": item,
        "dayparts": get_dayparts(),
        "food_groups": group_items_by_availability(food_options),
        "categories": get_category_options(preferred_category),
        "form_data": form_data,
        "energy_density_options": ENERGY_DENSITY_OPTIONS,
        "visibility_options": VISIBILITY_FORM_OPTIONS,
        "target_user_options": get_target_user_options(),
    }
    return render_template("items/form.html", **context)
def create_or_get_generated_meal(
    *,
    name: str,
    component_ids: list[int],
    daypart_id: int,
    visibility: str,
) -> int:
    """Return the id of a meal for the given components, creating it if needed.

    Lookup order:
    1. A meal returned by ``fetch_items(kind="meal")`` whose normalized
       component signature equals the requested one.
    2. A meal matching the visibility filter with the same name
       (case-insensitive); its components are replaced by the requested ones.
    3. Otherwise a new meal is inserted.

    A reused meal additionally gets ``daypart_id`` attached when missing and
    its ``updated_by``/``updated_at`` refreshed. Every path commits.

    Args:
        name: Name used for the name lookup and for a newly created meal.
        component_ids: Ids of the components making up the meal.
        daypart_id: Daypart the meal is generated for.
        visibility: Visibility stored on a newly created meal.

    Returns:
        The id of the reused or newly created meal.
    """

    def _reuse_meal(meal_id: int) -> int:
        # Attach the daypart if missing, mark the meal as touched, commit.
        assigned_dayparts = get_item_daypart_ids(meal_id)
        if daypart_id not in assigned_dayparts:
            sync_item_dayparts(meal_id, assigned_dayparts + [daypart_id])
        get_db().execute(
            """
            UPDATE items
            SET updated_by = ?, updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (g.user["id"], meal_id),
        )
        get_db().commit()
        return meal_id

    signature = normalized_component_signature(component_ids)

    # 1) Same set of components.
    for meal in fetch_items(kind="meal"):
        if normalized_component_signature(meal.get("component_ids", [])) == signature:
            return _reuse_meal(int(meal["id"]))

    # 2) Same name: take over the requested components.
    same_name = get_db().execute(
        f"""
        SELECT items.id
        FROM items
        WHERE items.kind = 'meal' AND LOWER(items.name) = LOWER(?) AND {visible_clause('items')}
        ORDER BY items.id
        LIMIT 1
        """,
        [name, *visible_params()],
    ).fetchone()
    if same_name:
        meal_id = int(same_name["id"])
        sync_meal_components(meal_id, list(signature))
        return _reuse_meal(meal_id)

    # 3) Nothing reusable: create a new meal.
    user_id = g.user["id"]
    daypart_slug = get_daypart_by_id(daypart_id)["slug"]
    if daypart_slug in {"breakfast", "morning-snack", "afternoon-snack", "late-snack"}:
        category = "Kleines Essen"
    else:
        category = "Warmes"
    cursor = get_db().execute(
        """
        INSERT INTO items (
        household_id, owner_user_id, visibility, kind, name, category, created_by, updated_by
        )
        VALUES (?, ?, ?, 'meal', ?, ?, ?, ?)
        """,
        (current_household_id(), user_id, visibility, name, category, user_id, user_id),
    )
    new_meal_id = int(cursor.lastrowid)
    sync_item_dayparts(new_meal_id, [daypart_id])
    sync_meal_components(new_meal_id, list(signature))
    get_db().commit()
    return new_meal_id
def insert_plan_entry(*, item_id: int, daypart_id: int, plan_date: date, visibility: str, note: str = "") -> dict:
    """Insert a plan entry for the current user, commit, and trigger shopping.

    Args:
        item_id: Item to plan.
        daypart_id: Daypart the item is planned for.
        plan_date: Day the item is planned for.
        visibility: Visibility stored on the plan entry.
        note: Optional note stored on the plan entry.

    Returns:
        The result of ``ensure_item_or_missing_components_are_shopped`` for
        the planned item.
    """
    user_id = g.user["id"]
    iso_date = plan_date.isoformat()
    bindings = (
        current_household_id(),
        user_id,
        visibility,
        iso_date,
        daypart_id,
        item_id,
        note,
        user_id,
    )

    db = get_db()
    db.execute(
        """
        INSERT INTO plan_entries (household_id, owner_user_id, visibility, plan_date, daypart_id, item_id, note, created_by)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """,
        bindings,
    )
    db.commit()

    shopping_result = ensure_item_or_missing_components_are_shopped(
        item_id,
        user_id,
        visibility,
        needed_for_date=iso_date,
        needed_for_daypart_id=daypart_id,
        source_item_id=item_id,
    )
    return shopping_result
def update_plan_entry(entry_id: int, *, visibility: str, note: str) -> None:
    """Update visibility and note of a plan entry and commit.

    When the new visibility is ``'personal'`` the current user becomes the
    owner of the entry; otherwise the owner is left unchanged.

    Args:
        entry_id: Primary key of the plan entry.
        visibility: New visibility value.
        note: New note text.
    """
    update_sql = """
        UPDATE plan_entries
        SET visibility = ?,
        owner_user_id = CASE
        WHEN ? = 'personal' THEN ?
        ELSE owner_user_id
        END,
        note = ?
        WHERE id = ?
        """
    # visibility is bound twice: once for the column, once for the CASE test.
    bindings = (visibility, visibility, g.user["id"], note, entry_id)

    db = get_db()
    db.execute(update_sql, bindings)
    db.commit()
def planner_template_options():
    """Return the unfiltered result of ``fetch_day_templates``."""
    day_templates = fetch_day_templates()
    return day_templates
@main_bp.get("/")
@login_required
def dashboard():
    """Render the dashboard with counts, today's plan and suggestions."""

    def _entry_order(entry):
        # Order today's entries by sort order, then by item name.
        return (entry["sort_order"], entry["item_name"].lower())

    today = date.today()
    week_monday = today - timedelta(days=today.weekday())

    home_count = count_visible_items("home")
    archive_count = count_visible_items("archived")
    shopping_row = get_db().execute(
        f"SELECT COUNT(*) AS count FROM shopping_entries WHERE is_checked = 0 AND {visible_clause('shopping_entries')}",
        visible_params(),
    ).fetchone()
    shopping_count = shopping_row["count"]

    today_entries = [
        entry
        for daypart_entries in fetch_day_plan_entries(today).values()
        for entry in daypart_entries
    ]
    today_entries.sort(key=_entry_order)

    week_cards = fetch_week_cards(week_monday)
    home_items = fetch_items(availability="home")

    return render_template(
        "dashboard.html",
        home_count=home_count,
        shopping_count=shopping_count,
        archive_count=archive_count,
        today_entries=today_entries,
        home_items=home_items[:8],
        today=today,
        week_cards=week_cards[:3],
        dashboard_hints=build_dashboard_hints(today),
        recipe_suggestions=build_home_recipe_suggestions(limit=4),
        upcoming_entries=fetch_upcoming_shopping_needs(limit=4),
        day_templates=fetch_day_templates()[:3],
        week_templates=fetch_week_templates()[:3],
        setup_checklist=build_setup_checklist(today),
    )
@main_bp.get("/templates")
@login_required
def template_library():
    """Render the library of day templates, week templates and item sets.

    The ``q`` and ``visibility`` query arguments filter all three lists.
    """
    query = request.args.get("q", "").strip()
    selected_visibility = request.args.get("visibility", "").strip()

    # Empty strings mean "no filter" for the fetch helpers.
    filters = {
        "query": query or None,
        "visibility": selected_visibility or None,
    }
    day_templates = fetch_day_templates(**filters)
    week_templates = fetch_week_templates(**filters)
    item_sets = fetch_item_sets(**filters)
    template_hints = build_dashboard_hints(date.today())

    return render_template(
        "library/index.html",
        query=query,
        selected_visibility=selected_visibility,
        visibility_options=VISIBILITY_FILTER_OPTIONS,
        day_templates=day_templates,
        week_templates=week_templates,
        item_sets=item_sets,
        template_hints=template_hints,
    )
@main_bp.post("/suggestions/hide")
@login_required
def suggestion_hide():
    """Hide a generated meal suggestion for the current user.

    Reads ``component_ids`` from the form, stores the key produced by
    ``generated_suggestion_key`` and redirects back to the referrer (or the
    dashboard when there is none).
    """
    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as "²" that int() cannot parse, which would raise ValueError.
    component_ids = [int(value) for value in request.form.getlist("component_ids") if value.isdecimal()]
    if not component_ids:
        flash("Diese Kombination konnte gerade nicht ausgeblendet werden.", "error")
        return redirect(request.referrer or url_for("main.dashboard"))

    db = get_db()
    db.execute(
        """
        INSERT OR IGNORE INTO hidden_generated_suggestions (user_id, suggestion_key)
        VALUES (?, ?)
        """,
        (g.user["id"], generated_suggestion_key(component_ids)),
    )
    db.commit()
    flash("Diese generierte Mahlzeit wird dir künftig nicht mehr vorgeschlagen.", "info")
    return redirect(request.referrer or url_for("main.dashboard"))
@main_bp.route("/templates/day/new", methods=("GET", "POST"))
@login_required
def day_template_create():
    """Show and process the form for creating a day template.

    An optional ``source_date`` request value prefills the selection from
    that day's plan. A valid POST stores the template and redirects to the
    template library.
    """
    raw_source_date = request.values.get("source_date")
    source_date = parse_plan_date(raw_source_date, fallback=None) if raw_source_date else None
    form_data = day_template_form_data(source_date=source_date)

    if request.method == "POST":
        form_data = extract_day_template_form_data(form_data)
        if form_data["name"]:
            db = get_db()
            cursor = db.execute(
                """
                INSERT INTO day_templates (household_id, owner_user_id, visibility, name, description)
                VALUES (?, ?, ?, ?, ?)
                """,
                (
                    current_household_id(),
                    g.user["id"],
                    form_data["visibility"],
                    form_data["name"],
                    form_data["description"],
                ),
            )
            new_template_id = int(cursor.lastrowid)
            sync_day_template_entries(new_template_id, form_data["selected_map"])
            db.commit()
            flash("Die Tagesvorlage wurde gespeichert.", "success")
            return redirect(url_for("main.template_library"))
        flash("Bitte einen Namen für die Tagesvorlage eintragen.", "error")

    return render_template(
        "library/day_form.html",
        template=None,
        form_data=form_data,
        dayparts=get_dayparts(),
        daypart_sections=build_template_day_sections(form_data["selected_map"]),
        visibility_options=VISIBILITY_FORM_OPTIONS,
        name_suggestions=DAY_TEMPLATE_NAME_SUGGESTIONS,
        source_date=source_date,
    )
@main_bp.route("/templates/day/<int:template_id>/edit", methods=("GET", "POST"))
@login_required
def day_template_edit(template_id: int):
    """Show and process the form for editing a day template.

    A template that is not found or may not be edited leads to an error
    flash and a redirect to the template library.
    """
    try:
        template = get_day_template(template_id)
        ensure_can_edit(template, "Diese Tagesvorlage kannst du gerade nicht bearbeiten.")
    except (ValueError, PermissionError) as exc:
        flash(str(exc), "error")
        return redirect(url_for("main.template_library"))

    form_data = day_template_form_data(template=template)

    if request.method == "POST":
        form_data = extract_day_template_form_data(form_data)
        if form_data["name"]:
            db = get_db()
            db.execute(
                """
                UPDATE day_templates
                SET name = ?, description = ?, visibility = ?, updated_at = CURRENT_TIMESTAMP
                WHERE id = ?
                """,
                (
                    form_data["name"],
                    form_data["description"],
                    form_data["visibility"],
                    template_id,
                ),
            )
            sync_day_template_entries(template_id, form_data["selected_map"])
            db.commit()
            flash("Die Tagesvorlage wurde aktualisiert.", "success")
            return redirect(url_for("main.template_library"))
        flash("Bitte einen Namen für die Tagesvorlage eintragen.", "error")

    return render_template(
        "library/day_form.html",
        template=template,
        form_data=form_data,
        dayparts=get_dayparts(),
        daypart_sections=build_template_day_sections(form_data["selected_map"]),
        visibility_options=VISIBILITY_FORM_OPTIONS,
        name_suggestions=DAY_TEMPLATE_NAME_SUGGESTIONS,
        source_date=None,
    )
@main_bp.post("/templates/day/<int:template_id>/apply")
@login_required
def day_template_apply(template_id: int):
    """Apply a day template to the date given in the ``target_date`` form field.

    On success the user is redirected to the planner for that day. When the
    template lookup raises ``ValueError`` the message is flashed and the user
    is sent back to the template library, matching the edit routes.
    """
    selected_date = parse_plan_date(request.form.get("target_date"))
    try:
        inserted = apply_day_template(template_id, selected_date)
    except ValueError as exc:
        flash(str(exc), "error")
        return redirect(url_for("main.template_library"))
    flash(f"Die Tagesvorlage wurde angewendet und hat {inserted} Einträge ergänzt.", "success")
    return redirect(url_for("main.planner_day", date=selected_date.isoformat()))
@main_bp.route("/templates/week/new", methods=("GET", "POST"))
@login_required
def week_template_create():
    """Show and process the form for creating a week template.

    An optional ``source_week`` request value offers copying planned days.
    On a valid POST, weekdays without an explicitly chosen day template but
    with the copy flag set get a day template snapshot created from the
    source week.
    """
    raw_source_week = request.values.get("source_week", "").strip()
    source_week = None
    if raw_source_week:
        source_week = parse_week_start(raw_source_week)
    form_data = week_template_form_data(source_week=source_week)

    if request.method == "POST":
        form_data = extract_week_template_form_data(form_data)
        if form_data["name"]:
            db = get_db()
            cursor = db.execute(
                """
                INSERT INTO week_templates (household_id, owner_user_id, visibility, name, description)
                VALUES (?, ?, ?, ?, ?)
                """,
                (
                    current_household_id(),
                    g.user["id"],
                    form_data["visibility"],
                    form_data["name"],
                    form_data["description"],
                ),
            )
            new_template_id = int(cursor.lastrowid)

            day_assignments = dict(form_data["selected_map"])
            if form_data["source_week"]:
                source_start = parse_week_start(form_data["source_week"])
                for weekday_index in range(7):
                    # An explicitly chosen day template always wins.
                    if day_assignments.get(weekday_index):
                        continue
                    if not form_data["copy_from_source"].get(weekday_index):
                        continue
                    snapshot_id = create_day_template_snapshot(
                        f"{form_data['name']} · {WEEKDAY_LABELS[weekday_index]}",
                        f"Automatisch aus der Woche {source_start.strftime('%d.%m.%Y')} übernommen.",
                        form_data["visibility"],
                        source_start + timedelta(days=weekday_index),
                    )
                    if snapshot_id:
                        day_assignments[weekday_index] = snapshot_id

            sync_week_template_days(new_template_id, day_assignments)
            db.commit()
            flash("Die Wochenvorlage wurde gespeichert.", "success")
            return redirect(url_for("main.template_library"))
        flash("Bitte einen Namen für die Wochenvorlage eintragen.", "error")

    return render_template(
        "library/week_form.html",
        template=None,
        form_data=form_data,
        visibility_options=VISIBILITY_FORM_OPTIONS,
        name_suggestions=WEEK_TEMPLATE_NAME_SUGGESTIONS,
        day_templates=fetch_day_templates(),
        weekday_labels=WEEKDAY_LABELS,
        source_week=source_week,
    )
@main_bp.route("/templates/week/<int:template_id>/edit", methods=("GET", "POST"))
@login_required
def week_template_edit(template_id: int):
    """Show and process the form for editing a week template.

    A template that is not found or may not be edited leads to an error
    flash and a redirect to the template library.
    """
    try:
        template = get_week_template(template_id)
        ensure_can_edit(template, "Diese Wochenvorlage kannst du gerade nicht bearbeiten.")
    except (ValueError, PermissionError) as exc:
        flash(str(exc), "error")
        return redirect(url_for("main.template_library"))

    form_data = week_template_form_data(template=template)

    if request.method == "POST":
        form_data = extract_week_template_form_data(form_data)
        if form_data["name"]:
            db = get_db()
            db.execute(
                """
                UPDATE week_templates
                SET name = ?, description = ?, visibility = ?, updated_at = CURRENT_TIMESTAMP
                WHERE id = ?
                """,
                (
                    form_data["name"],
                    form_data["description"],
                    form_data["visibility"],
                    template_id,
                ),
            )
            sync_week_template_days(template_id, form_data["selected_map"])
            db.commit()
            flash("Die Wochenvorlage wurde aktualisiert.", "success")
            return redirect(url_for("main.template_library"))
        flash("Bitte einen Namen für die Wochenvorlage eintragen.", "error")

    return render_template(
        "library/week_form.html",
        template=template,
        form_data=form_data,
        visibility_options=VISIBILITY_FORM_OPTIONS,
        name_suggestions=WEEK_TEMPLATE_NAME_SUGGESTIONS,
        day_templates=fetch_day_templates(),
        weekday_labels=WEEKDAY_LABELS,
        source_week=None,
    )
@main_bp.post("/templates/week/<int:template_id>/apply")
@login_required
def week_template_apply(template_id: int):
    """Apply a week template to the week given in the ``target_week`` form field.

    On success the user is redirected to the planner for that week. When a
    template lookup inside ``apply_week_template`` raises ``ValueError`` the
    message is flashed and the user is sent back to the template library,
    matching the edit routes.
    """
    week_start = parse_week_start(request.form.get("target_week"))
    try:
        inserted = apply_week_template(template_id, week_start)
    except ValueError as exc:
        flash(str(exc), "error")
        return redirect(url_for("main.template_library"))
    flash(f"Die Wochenvorlage wurde angewendet und hat {inserted} Einträge ergänzt.", "success")
    return redirect(url_for("main.planner", week=week_start.isoformat()))
@main_bp.route("/templates/set/new", methods=("GET", "POST"))
@login_required
def item_set_create():
    """Show and process the form for creating an item set.

    A valid POST stores the set with its items and redirects to the template
    library; otherwise the form is rendered with the non-archived items
    matching the current search term.
    """
    form_data = {
        "name": "",
        "description": "",
        "visibility": "shared",
        "item_ids": [],
        "item_search": "",
    }

    if request.method == "POST":
        form_data = extract_item_set_form_data(form_data)
        if form_data["name"]:
            db = get_db()
            cursor = db.execute(
                """
                INSERT INTO item_sets (household_id, owner_user_id, visibility, name, description)
                VALUES (?, ?, ?, ?, ?)
                """,
                (
                    current_household_id(),
                    g.user["id"],
                    form_data["visibility"],
                    form_data["name"],
                    form_data["description"],
                ),
            )
            new_set_id = int(cursor.lastrowid)
            sync_item_set_items(new_set_id, form_data["item_ids"])
            db.commit()
            flash("Das Paket wurde gespeichert.", "success")
            return redirect(url_for("main.template_library"))
        flash("Bitte einen Namen für das Paket eintragen.", "error")

    search_term = form_data["item_search"] or None
    selectable_items = fetch_items(include_archived=False, query=search_term)
    return render_template(
        "library/set_form.html",
        item_set=None,
        form_data=form_data,
        visibility_options=VISIBILITY_FORM_OPTIONS,
        name_suggestions=ITEM_SET_NAME_SUGGESTIONS,
        item_groups=group_items_by_availability(selectable_items),
    )
@main_bp.route("/templates/set/<int:set_id>/edit", methods=("GET", "POST"))
@login_required
def item_set_edit(set_id: int):
    """Show and process the form for editing an item set.

    A set that is not found or may not be edited leads to an error flash and
    a redirect to the template library.
    """
    try:
        item_set = get_item_set(set_id)
        ensure_can_edit(item_set, "Dieses Paket kannst du gerade nicht bearbeiten.")
    except (ValueError, PermissionError) as exc:
        flash(str(exc), "error")
        return redirect(url_for("main.template_library"))

    form_data = {
        "name": item_set["name"],
        "description": item_set["description"] or "",
        "visibility": item_set["visibility"],
        "item_ids": get_item_set_selected_ids(set_id),
        "item_search": "",
    }

    if request.method == "POST":
        form_data = extract_item_set_form_data(form_data)
        if form_data["name"]:
            db = get_db()
            db.execute(
                """
                UPDATE item_sets
                SET name = ?, description = ?, visibility = ?, updated_at = CURRENT_TIMESTAMP
                WHERE id = ?
                """,
                (
                    form_data["name"],
                    form_data["description"],
                    form_data["visibility"],
                    set_id,
                ),
            )
            sync_item_set_items(set_id, form_data["item_ids"])
            db.commit()
            flash("Das Paket wurde aktualisiert.", "success")
            return redirect(url_for("main.template_library"))
        flash("Bitte einen Namen für das Paket eintragen.", "error")

    search_term = form_data["item_search"] or None
    selectable_items = fetch_items(include_archived=False, query=search_term)
    return render_template(
        "library/set_form.html",
        item_set=item_set,
        form_data=form_data,
        visibility_options=VISIBILITY_FORM_OPTIONS,
        name_suggestions=ITEM_SET_NAME_SUGGESTIONS,
        item_groups=group_items_by_availability(selectable_items),
    )
@main_bp.post("/templates/set/<int:set_id>/apply")
@login_required
def item_set_apply(set_id: int):
    """Put the items of an item set on the shopping list.

    Flashes the first four added names on success, or an info message when
    nothing was added, and redirects to the shopping list. When the set
    lookup raises ``ValueError`` the message is flashed and the user is sent
    back to the template library, matching the edit routes.
    """
    try:
        result = apply_item_set_to_shopping(set_id)
    except ValueError as exc:
        flash(str(exc), "error")
        return redirect(url_for("main.template_library"))

    if result["count"]:
        flash(f"Das Paket wurde auf die Einkaufsliste übernommen: {', '.join(result['names'][:4])}.", "success")
    else:
        flash("Das Paket ist bereits vollständig auf der Einkaufsliste oder zuhause vorhanden.", "info")
    return redirect(url_for("main.shopping_list"))
@main_bp.route("/settings", methods=("GET", "POST"))
@login_required
def settings_view():
    """Show the settings page and dispatch its POST forms.

    The hidden ``form_name`` field selects which form was submitted
    (``household``, ``reminders`` or ``push_test``). Every POST, including one
    with an unknown ``form_name``, ends with a redirect back to this page.
    """
    if request.method == "POST":
        form_name = request.form.get("form_name", "").strip()
        if form_name == "household":
            _settings_save_household()
        elif form_name == "reminders":
            _settings_save_reminders()
        elif form_name == "push_test":
            _settings_send_push_test()
        return redirect(url_for("main.settings_view"))

    household_settings = get_household_settings()
    user_settings = get_user_settings()
    push_subscription = get_db().execute(
        """
        SELECT COUNT(*) AS count
        FROM push_subscriptions
        WHERE user_id = ? AND is_active = 1
        """,
        (g.user["id"],),
    ).fetchone()
    return render_template(
        "settings.html",
        household_settings=household_settings,
        user_settings=user_settings,
        push_subscription_count=int(push_subscription["count"]),
        push_ready=push_is_configured(),
        push_public_key_value=push_public_key(),
        restore_confirmation_text=RESTORE_CONFIRMATION_TEXT,
    )


def _settings_save_household() -> None:
    """Store the household's shopping rhythm from the submitted form."""
    shopping_weekday = normalize_weekday(request.form.get("shopping_weekday"), 5)
    raw_prep_days = request.form.get("shopping_prep_days", "1").strip()
    # isdecimal() rather than isdigit(): isdigit() also accepts characters such
    # as "²" that int() rejects, which would turn bad input into a 500.
    if raw_prep_days.isdecimal():
        shopping_prep_days = max(0, min(7, int(raw_prep_days)))
    else:
        shopping_prep_days = 1
    shopping_reminder_time = request.form.get("shopping_reminder_time", "18:00").strip() or "18:00"
    db = get_db()
    db.execute(
        """
        UPDATE households
        SET shopping_weekday = ?, shopping_prep_days = ?, shopping_reminder_time = ?
        WHERE id = ?
        """,
        (shopping_weekday, shopping_prep_days, shopping_reminder_time, current_household_id()),
    )
    db.commit()
    flash("Die Einkaufsrhythmus-Einstellungen wurden gespeichert.", "success")


def _settings_save_reminders() -> None:
    """Store the current user's reminder and suggestion preferences."""
    ensure_user_settings_row()
    suggestion_style = normalize_suggestion_style(request.form.get("suggestion_style"), "balanced")
    db = get_db()
    db.execute(
        """
        UPDATE user_settings
        SET reminders_enabled = ?,
            push_enabled = ?,
            notification_channel = ?,
            suggestion_style = ?,
            energy_preference = ?,
            remind_before_shopping = ?,
            remind_on_shopping_day = ?,
            show_missing_for_upcoming_week = ?,
            show_planned_not_shopped = ?,
            remind_tomorrow_if_sparse = ?,
            remind_week_if_sparse = ?,
            push_missing_breakfast = ?,
            push_missing_lunch = ?,
            push_missing_dinner = ?,
            push_small_snack = ?,
            suggest_home_for_today = ?,
            remind_small_snack = ?,
            remind_nuts = ?,
            show_meal_balancing = ?,
            suggest_templates = ?,
            suggest_patterns = ?,
            updated_at = CURRENT_TIMESTAMP
        WHERE user_id = ?
        """,
        (
            # Order must match the SET clause above.
            parse_checkbox("reminders_enabled", True),
            parse_checkbox("push_enabled", False),
            normalize_notification_channel(request.form.get("notification_channel"), "in_app"),
            suggestion_style,
            # The energy preference is derived from the chosen style, not submitted.
            suggestion_style_energy_preference(suggestion_style),
            parse_checkbox("remind_before_shopping", True),
            parse_checkbox("remind_on_shopping_day", True),
            parse_checkbox("show_missing_for_upcoming_week", True),
            parse_checkbox("show_planned_not_shopped", True),
            parse_checkbox("remind_tomorrow_if_sparse", True),
            parse_checkbox("remind_week_if_sparse", True),
            parse_checkbox("push_missing_breakfast", False),
            parse_checkbox("push_missing_lunch", False),
            parse_checkbox("push_missing_dinner", False),
            parse_checkbox("push_small_snack", False),
            parse_checkbox("suggest_home_for_today", True),
            parse_checkbox("remind_small_snack", False),
            parse_checkbox("remind_nuts", False),
            parse_checkbox("show_meal_balancing", True),
            parse_checkbox("suggest_templates", True),
            parse_checkbox("suggest_patterns", True),
            g.user["id"],
        ),
    )
    db.commit()
    flash("Deine Erinnerungen und Hinweise wurden gespeichert.", "success")


def _settings_send_push_test() -> None:
    """Send a test push to the user's most recently updated active subscription."""
    db = get_db()
    subscription = db.execute(
        """
        SELECT endpoint, p256dh, auth
        FROM push_subscriptions
        WHERE user_id = ? AND is_active = 1
        ORDER BY updated_at DESC
        LIMIT 1
        """,
        (g.user["id"],),
    ).fetchone()
    if subscription is None:
        flash("Bitte aktiviere zuerst Push im Browser oder auf dem Home-Bildschirm.", "error")
        return

    ok, error = send_push_message(
        {
            "endpoint": subscription["endpoint"],
            "keys": {"p256dh": subscription["p256dh"], "auth": subscription["auth"]},
        },
        title="Nouri",
        body="Push ist bereit. Erinnerungen können später darüber kommen.",
        url=url_for("main.settings_view", _external=True),
    )
    if not ok:
        flash(error or "Die Test-Mitteilung konnte gerade nicht gesendet werden.", "error")
        return

    db.execute(
        "UPDATE push_subscriptions SET last_test_at = CURRENT_TIMESTAMP WHERE endpoint = ?",
        (subscription["endpoint"],),
    )
    db.commit()
    flash("Die Test-Mitteilung wurde versendet.", "success")
@main_bp.get("/settings/backup/export")
@login_required
@admin_required
def backup_export():
    """Build a backup archive and send it as a ZIP download.

    An ``after_this_request`` hook removes the archive file again once the
    response has been created.
    """
    database = get_db()
    app_config = current_app.config
    archive_path, download_name = export_backup_archive(
        database,
        app_config["UPLOAD_FOLDER"],
        app_config["APP_VERSION"],
    )

    @after_this_request
    def cleanup_backup(response):
        # missing_ok: a file that is already gone must not break the response.
        Path(archive_path).unlink(missing_ok=True)
        return response

    return send_file(
        archive_path,
        as_attachment=True,
        download_name=download_name,
        mimetype="application/zip",
        max_age=0,
    )
@main_bp.post("/settings/backup/restore")
@login_required
@admin_required
def backup_restore():
    """Restore the application data from an uploaded backup archive.

    The restore only runs when the typed confirmation (compared after
    upper-casing) equals ``RESTORE_CONFIRMATION_TEXT`` and a file was uploaded.
    Any failure rolls the database back, is written to the application log and
    is reported to the user via a flash message.
    """
    confirmation = request.form.get("restore_confirmation", "").strip().upper()
    backup_file = request.files.get("backup_file")
    if confirmation != RESTORE_CONFIRMATION_TEXT:
        flash("Bitte die Bestätigung genau eintragen, bevor das Backup wiederhergestellt wird.", "error")
        return redirect(url_for("main.settings_view"))
    if not backup_file or not backup_file.filename:
        flash("Bitte zuerst eine Backup-Datei auswählen.", "error")
        return redirect(url_for("main.settings_view"))
    try:
        metadata = restore_backup_archive(get_db(), current_app.config["UPLOAD_FOLDER"], backup_file)
        get_db().commit()
    except Exception as exc:
        # Deliberately broad: a failed restore must never leave a half-applied
        # state. Log the traceback first so the cause is not lost; the user
        # only sees the short message.
        current_app.logger.exception("Backup restore failed")
        get_db().rollback()
        flash(str(exc) or "Das Backup konnte gerade nicht wiederhergestellt werden.", "error")
        return redirect(url_for("main.settings_view"))
    version_label = metadata.get("app_version") or "einer älteren Version"
    flash(f"Das Backup aus {version_label} wurde wiederhergestellt.", "success")
    return redirect(url_for("main.settings_view"))
@main_bp.post("/push/subscribe")
@login_required
def push_subscribe():
    """Register or refresh a push subscription for the current user.

    Responds with JSON. Returns 400 when push is not configured or when any of
    ``endpoint``, ``p256dh`` or ``auth`` is missing. An existing row with the
    same endpoint is updated and reactivated instead of duplicated, and the
    user's ``push_enabled`` setting is switched on.
    """
    if not push_is_configured():
        return jsonify({"ok": False, "error": "Push ist noch nicht konfiguriert."}), 400

    submitted = request.form
    endpoint = submitted.get("endpoint", "").strip()
    p256dh = submitted.get("p256dh", "").strip()
    auth_key = submitted.get("auth", "").strip()
    if not (endpoint and p256dh and auth_key):
        return jsonify({"ok": False, "error": "Unvollständige Push-Daten."}), 400

    user_id = g.user["id"]
    user_agent = request.headers.get("User-Agent", "")
    db = get_db()
    db.execute(
        """
        INSERT INTO push_subscriptions (user_id, endpoint, p256dh, auth, user_agent, is_active)
        VALUES (?, ?, ?, ?, ?, 1)
        ON CONFLICT(endpoint) DO UPDATE SET
            user_id = excluded.user_id,
            p256dh = excluded.p256dh,
            auth = excluded.auth,
            user_agent = excluded.user_agent,
            is_active = 1,
            updated_at = CURRENT_TIMESTAMP
        """,
        (user_id, endpoint, p256dh, auth_key, user_agent),
    )
    ensure_user_settings_row()
    db.execute(
        "UPDATE user_settings SET push_enabled = 1, updated_at = CURRENT_TIMESTAMP WHERE user_id = ?",
        (user_id,),
    )
    db.commit()
    return jsonify({"ok": True})
@main_bp.post("/push/unsubscribe")
@login_required
def push_unsubscribe():
    """Deactivate push subscriptions of the current user.

    With an ``endpoint`` in the form only that subscription is deactivated;
    without one, all of the user's subscriptions are.
    """
    endpoint = request.form.get("endpoint", "").strip()
    user_id = g.user["id"]
    if endpoint:
        statement = "UPDATE push_subscriptions SET is_active = 0, updated_at = CURRENT_TIMESTAMP WHERE endpoint = ? AND user_id = ?"
        parameters = (endpoint, user_id)
    else:
        statement = "UPDATE push_subscriptions SET is_active = 0, updated_at = CURRENT_TIMESTAMP WHERE user_id = ?"
        parameters = (user_id,)
    db = get_db()
    db.execute(statement, parameters)
    db.commit()
    return jsonify({"ok": True})
@main_bp.route("/items/<kind>")
@login_required
def item_list(kind: str):
    """List items of one kind, filtered by the query-string parameters.

    Unknown kinds redirect to the dashboard. Supported filters are ``q``
    (search text), ``state`` (availability), ``visibility`` and ``daypart_id``;
    empty filters are passed to ``fetch_items`` as ``None``.
    """
    if kind not in ITEM_KIND_LABELS:
        return redirect(url_for("main.dashboard"))
    query = request.args.get("q", "").strip()
    state = request.args.get("state", "").strip()
    scope = request.args.get("visibility", "").strip()
    raw_daypart_id = request.args.get("daypart_id", "").strip()
    # isdecimal() rather than isdigit(): isdigit() also accepts characters such
    # as "²" that int() rejects, which would turn a bad URL into a 500.
    daypart_id = int(raw_daypart_id) if raw_daypart_id.isdecimal() else None
    items = fetch_items(
        kind=kind,
        availability=state or None,
        query=query or None,
        daypart_id=daypart_id,
        visibility=scope or None,
    )
    return render_template(
        "items/list.html",
        kind=kind,
        items=items,
        availability_labels=AVAILABILITY_LABELS,
        query=query,
        selected_state=state,
        selected_visibility=scope,
        selected_daypart_id=daypart_id,
        dayparts=get_dayparts(),
        state_options=ACTIVE_STATE_OPTIONS,
        visibility_options=VISIBILITY_FILTER_OPTIONS,
        today=date.today(),
    )
@main_bp.route("/items/<kind>/new", methods=("GET", "POST"))
@login_required
def item_create(kind: str):
    """Create a new item of the given kind.

    GET renders an empty form; ``name`` and ``component_ids`` may be prefilled
    from the query string. POST distinguishes three ``form_action`` values:

    * ``filter_foods`` (meals only): re-render the form with the submitted data.
    * ``quick_add_food`` (meals only): create a food on the fly, add it to the
      selected components and re-render the form.
    * anything else: validate, store the photo, insert the item with its
      dayparts (and components for meals) and redirect to the item list.
    """
    if kind not in ITEM_KIND_LABELS:
        return redirect(url_for("main.dashboard"))
    form_data = {
        "name": request.args.get("name", "").strip(),
        "category": "",
        "energy_density": "neutral",
        "note": "",
        "visibility": "shared",
        "target_user_id": None,
        "target_user_raw": TARGET_USER_OPTIONS_DEFAULT,
        "food_search": "",
        "daypart_ids": [],
        # isdecimal() rather than isdigit(): isdigit() also accepts characters
        # such as "²" that int() rejects, which would turn a bad URL into a 500.
        "component_ids": [
            int(value) for value in request.args.getlist("component_ids") if value.isdecimal()
        ],
        "quick_food_name": "",
        "quick_food_category": "",
        "quick_food_energy_density": "neutral",
        "quick_food_note": "",
    }
    if request.method == "POST":
        form_action = request.form.get("form_action", "save_item")
        form_data = extract_item_form_data(form_data)
        if kind == "meal" and form_action == "filter_foods":
            return render_item_form(kind, item=None, form_data=form_data)
        if kind == "meal" and form_action == "quick_add_food":
            if not form_data["quick_food_name"]:
                flash("Bitte einen Namen für das neue Lebensmittel eintragen.", "error")
            else:
                new_food_id = create_quick_food_from_form(form_data)
                if new_food_id not in form_data["component_ids"]:
                    form_data["component_ids"].append(new_food_id)
                # Clear the quick-add inputs so the next food starts empty.
                form_data["quick_food_name"] = ""
                form_data["quick_food_category"] = ""
                form_data["quick_food_note"] = ""
                flash("Das neue Lebensmittel wurde angelegt und direkt zur Mahlzeitenidee hinzugefügt.", "success")
            return render_item_form(kind, item=None, form_data=form_data)

        error = None
        if not form_data["name"]:
            error = "Bitte einen Namen eintragen."
        photo_filename = None
        if error is None:
            # The photo is only stored once the rest of the form is valid.
            try:
                photo_filename = save_photo(request.files.get("photo"))
            except ValueError as exc:
                error = str(exc)
        if error is None:
            cursor = get_db().execute(
                """
                INSERT INTO items (
                    household_id, owner_user_id, target_user_id, visibility, kind, name, category, energy_density, note, photo_filename, created_by, updated_by
                )
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    current_household_id(),
                    g.user["id"],
                    form_data["target_user_id"],
                    form_data["visibility"],
                    kind,
                    form_data["name"],
                    form_data["category"],
                    form_data["energy_density"],
                    form_data["note"],
                    photo_filename,
                    g.user["id"],
                    g.user["id"],
                ),
            )
            item_id = int(cursor.lastrowid)
            sync_item_dayparts(item_id, form_data["daypart_ids"])
            if kind == "meal":
                sync_meal_components(item_id, form_data["component_ids"])
            get_db().commit()
            flash(f"{ITEM_KIND_SINGULAR_LABELS[kind]} wurde angelegt.", "success")
            return redirect(url_for("main.item_list", kind=kind))
        flash(error, "error")
    return render_item_form(kind, item=None, form_data=form_data)
@main_bp.route("/items/<int:item_id>/edit", methods=("GET", "POST"))
@login_required
def item_edit(item_id: int):
    """Edit an existing item.

    GET renders the form pre-filled from the stored item. POST handles the
    meal-only helper actions ``filter_foods`` and ``quick_add_food`` by
    re-rendering the form; any other action validates the input, stores a
    photo if one was uploaded, updates the item with its dayparts (and
    components for meals) and redirects to the matching item list.
    """
    try:
        item = get_item(item_id)
        ensure_can_edit(item)
    except (ValueError, PermissionError) as exc:
        flash(str(exc), "error")
        return redirect(url_for("main.dashboard"))

    item_kind = item["kind"]
    is_meal = item_kind == "meal"
    stored_target = item["target_user_id"]
    form_data = {
        "name": item["name"],
        "category": item["category"] or "",
        "energy_density": item.get("energy_density") or "neutral",
        "note": item["note"] or "",
        "visibility": item["visibility"],
        "target_user_id": stored_target,
        "target_user_raw": str(stored_target) if stored_target else TARGET_USER_OPTIONS_DEFAULT,
        "food_search": "",
        "daypart_ids": get_item_daypart_ids(item_id),
        "component_ids": get_meal_component_ids(item_id) if is_meal else [],
        "quick_food_name": "",
        "quick_food_category": "",
        "quick_food_energy_density": "neutral",
        "quick_food_note": "",
    }

    if request.method != "POST":
        return render_item_form(item_kind, item=item, form_data=form_data)

    form_action = request.form.get("form_action", "save_item")
    form_data = extract_item_form_data(form_data)

    # Meal-only helper actions never save the item; they only refresh the form.
    if is_meal and form_action in ("filter_foods", "quick_add_food"):
        if form_action == "quick_add_food":
            if form_data["quick_food_name"]:
                new_food_id = create_quick_food_from_form(form_data)
                if new_food_id not in form_data["component_ids"]:
                    form_data["component_ids"].append(new_food_id)
                for cleared_field in ("quick_food_name", "quick_food_category", "quick_food_note"):
                    form_data[cleared_field] = ""
                flash("Das neue Lebensmittel wurde angelegt und direkt zur Mahlzeitenidee hinzugefügt.", "success")
            else:
                flash("Bitte einen Namen für das neue Lebensmittel eintragen.", "error")
        return render_item_form(item_kind, item=item, form_data=form_data)

    problem = None if form_data["name"] else "Bitte einen Namen eintragen."
    photo_filename = item["photo_filename"]
    if problem is None:
        try:
            photo_filename = save_photo(request.files.get("photo"), current_filename=item["photo_filename"])
        except ValueError as exc:
            problem = str(exc)

    if problem is not None:
        flash(problem, "error")
        return render_item_form(item_kind, item=item, form_data=form_data)

    db = get_db()
    db.execute(
        """
        UPDATE items
        SET name = ?,
            category = ?,
            energy_density = ?,
            note = ?,
            visibility = ?,
            target_user_id = ?,
            photo_filename = ?,
            updated_by = ?,
            updated_at = CURRENT_TIMESTAMP
        WHERE id = ?
        """,
        (
            form_data["name"],
            form_data["category"],
            form_data["energy_density"],
            form_data["note"],
            form_data["visibility"],
            form_data["target_user_id"],
            photo_filename,
            g.user["id"],
            item_id,
        ),
    )
    sync_item_dayparts(item_id, form_data["daypart_ids"])
    if is_meal:
        sync_meal_components(item_id, form_data["component_ids"])
    db.commit()
    flash("Der Eintrag wurde aktualisiert.", "success")
    return redirect(url_for("main.item_list", kind=item_kind))
@main_bp.post("/items/<int:item_id>/shopping")
@login_required
def item_add_to_shopping(item_id: int):
    """Put an item, or its missing components, on the shopping list.

    Flashes one message describing what happened and redirects back to the
    referring page, falling back to the shopping list.
    """
    try:
        item = get_item(item_id)
    except ValueError as exc:
        flash(str(exc), "error")
        return redirect(request.referrer or url_for("main.shopping_list"))

    outcome = ensure_item_or_missing_components_are_shopped(
        item_id,
        g.user["id"],
        item["visibility"],
    )
    item_name = item["name"]
    if outcome["count"]:
        category = "success"
        if outcome["used_components"]:
            message = f"Für „{item_name}“ wurden fehlende Lebensmittel auf die Einkaufsliste gesetzt."
        else:
            message = f"{item_name} steht jetzt auf der Einkaufsliste."
    else:
        category = "info"
        if outcome["scheduled_count"]:
            message = f"Für „{item_name}“ sind fehlende Lebensmittel für einen späteren Einkauf vorgemerkt."
        else:
            message = f"Für „{item_name}“ ist gerade nichts zusätzlich nötig."
    flash(message, category)
    return redirect(request.referrer or url_for("main.shopping_list"))
@main_bp.post("/items/<int:item_id>/set-home")
@login_required
def item_set_home(item_id: int):
    """Set an item's availability state to ``home``.

    Redirects back to the referring page, falling back to the home view.
    """
    return_url = request.referrer or url_for("main.home_view")
    try:
        item = get_item(item_id)
        ensure_can_edit(item)
    except (ValueError, PermissionError) as exc:
        flash(str(exc), "error")
        return redirect(return_url)

    db = get_db()
    db.execute(
        "UPDATE items SET availability_state = 'home', updated_by = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?",
        (g.user["id"], item_id),
    )
    db.commit()
    flash(f"{item['name']} ist jetzt unter Zuhause sichtbar.", "success")
    return redirect(return_url)
@main_bp.post("/items/<int:item_id>/archive")
@login_required
def item_archive(item_id: int):
    """Set an item's availability state to ``archived``.

    Redirects back to the referring page, falling back to the archive view.
    """
    return_url = request.referrer or url_for("main.archive_view")
    try:
        item = get_item(item_id)
        ensure_can_edit(item)
    except (ValueError, PermissionError) as exc:
        flash(str(exc), "error")
        return redirect(return_url)

    db = get_db()
    db.execute(
        "UPDATE items SET availability_state = 'archived', updated_by = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?",
        (g.user["id"], item_id),
    )
    db.commit()
    flash(f"{item['name']} liegt jetzt im Archiv und bleibt später leicht wiederfindbar.", "info")
    return redirect(return_url)
@main_bp.post("/items/<int:item_id>/restore")
@login_required
def item_restore(item_id: int):
    """Set an item's availability state back to ``idea``.

    Redirects back to the referring page, falling back to the archive view.
    """
    return_url = request.referrer or url_for("main.archive_view")
    try:
        item = get_item(item_id)
        ensure_can_edit(item)
    except (ValueError, PermissionError) as exc:
        flash(str(exc), "error")
        return redirect(return_url)

    db = get_db()
    db.execute(
        "UPDATE items SET availability_state = 'idea', updated_by = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?",
        (g.user["id"], item_id),
    )
    db.commit()
    flash(f"{item['name']} ist wieder in der aktiven Liste.", "success")
    return redirect(return_url)
@main_bp.route("/shopping", methods=("GET", "POST"))
@login_required
def shopping_list():
    """Show the shopping list and handle adding an item to it.

    POST expects ``item_id``; the item, or its missing components, is put on
    the list, a message is flashed and the user is redirected back here. GET
    renders the current entries, upcoming needs and the items that can still
    be added.
    """
    if request.method == "POST":
        selected_item_id = request.form.get("item_id", "").strip()
        # isdecimal() rather than isdigit(): isdigit() also accepts characters
        # such as "²" that int() rejects, which would turn bad input into a 500.
        if not selected_item_id.isdecimal():
            flash("Bitte zuerst etwas auswählen.", "error")
        else:
            try:
                item = get_item(int(selected_item_id))
                result = ensure_item_or_missing_components_are_shopped(
                    item["id"],
                    g.user["id"],
                    item["visibility"],
                )
                if result["count"]:
                    flash(f"Die Einkaufsliste wurde ergänzt: {', '.join(result['names'][:4])}.", "success")
                elif result["scheduled_count"]:
                    flash("Ein paar Dinge sind für einen späteren Einkauf vorgemerkt.", "info")
                else:
                    flash("Dafür ist gerade nichts zusätzlich nötig.", "info")
            except ValueError as exc:
                flash(str(exc), "error")
        return redirect(url_for("main.shopping_list"))

    entries = fetch_shopping_entries()
    upcoming_entries = fetch_upcoming_shopping_needs()
    # Offer only items that are not on the list yet.
    addable_items = [
        item for item in fetch_items(include_archived=False) if not item["is_on_shopping_list"]
    ]
    household_settings = get_household_settings()
    shopping_weekday_label = dict(WEEKDAY_OPTIONS).get(household_settings["shopping_weekday"], "gesetzt")
    return render_template(
        "shopping/list.html",
        entries=entries,
        upcoming_entries=upcoming_entries,
        addable_items=addable_items,
        household_settings=household_settings,
        shopping_weekday_label=shopping_weekday_label,
    )
@main_bp.post("/shopping/<int:entry_id>/check")
@login_required
def shopping_check(entry_id: int):
    """Check off a shopping entry and set its item's availability to ``home``."""
    db = get_db()
    entry_row = db.execute(
        f"""
        SELECT shopping_entries.*,
               owner.display_name AS owner_display_name,
               owner.username AS owner_username
        FROM shopping_entries
        LEFT JOIN users AS owner ON owner.id = shopping_entries.owner_user_id
        WHERE shopping_entries.id = ? AND {visible_clause('shopping_entries')}
        """,
        [entry_id, *visible_params()],
    ).fetchone()
    if entry_row is None:
        flash("Der Einkaufseintrag wurde nicht gefunden.", "error")
        return redirect(url_for("main.shopping_list"))

    described_entry = describe_record(dict(entry_row))
    try:
        ensure_can_edit(described_entry, "Diesen Einkaufseintrag kannst du gerade nicht ändern.")
        item = get_item(entry_row["item_id"])
    except (ValueError, PermissionError) as exc:
        flash(str(exc), "error")
        return redirect(url_for("main.shopping_list"))

    acting_user_id = g.user["id"]
    db.execute(
        "UPDATE shopping_entries SET is_checked = 1, checked_at = CURRENT_TIMESTAMP, checked_by = ? WHERE id = ?",
        (acting_user_id, entry_id),
    )
    db.execute(
        "UPDATE items SET availability_state = 'home', updated_by = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?",
        (acting_user_id, item["id"]),
    )
    db.commit()
    flash(f"{item['name']} ist jetzt als Zuhause vorhanden markiert.", "success")
    return redirect(url_for("main.shopping_list"))
@main_bp.post("/shopping/<int:entry_id>/remove")
@login_required
def shopping_remove(entry_id: int):
    """Delete a shopping entry the current user can see and edit."""
    db = get_db()
    entry_row = db.execute(
        f"""
        SELECT shopping_entries.*,
               owner.display_name AS owner_display_name,
               owner.username AS owner_username
        FROM shopping_entries
        LEFT JOIN users AS owner ON owner.id = shopping_entries.owner_user_id
        WHERE shopping_entries.id = ? AND {visible_clause('shopping_entries')}
        """,
        [entry_id, *visible_params()],
    ).fetchone()
    if entry_row is None:
        flash("Der Eintrag wurde nicht gefunden.", "error")
        return redirect(url_for("main.shopping_list"))

    try:
        ensure_can_edit(
            describe_record(dict(entry_row)),
            "Diesen Einkaufseintrag kannst du gerade nicht entfernen.",
        )
    except PermissionError as exc:
        flash(str(exc), "error")
        return redirect(url_for("main.shopping_list"))

    db.execute("DELETE FROM shopping_entries WHERE id = ?", (entry_id,))
    db.commit()
    flash("Der Eintrag wurde von der Einkaufsliste entfernt.", "info")
    return redirect(url_for("main.shopping_list"))
@main_bp.get("/home")
@login_required
def home_view():
    """Show the items that are currently at home.

    Supported query-string filters are ``q`` (search text), ``visibility`` and
    ``daypart_id``; empty filters are passed to ``fetch_items`` as ``None``.
    """
    query = request.args.get("q", "").strip()
    scope = request.args.get("visibility", "").strip()
    raw_daypart_id = request.args.get("daypart_id", "").strip()
    # isdecimal() rather than isdigit(): isdigit() also accepts characters such
    # as "²" that int() rejects, which would turn a bad URL into a 500.
    daypart_id = int(raw_daypart_id) if raw_daypart_id.isdecimal() else None
    dayparts = get_dayparts()
    items = fetch_items(
        availability="home",
        query=query or None,
        daypart_id=daypart_id,
        visibility=scope or None,
    )
    return render_template(
        "home/list.html",
        sections=build_home_sections(items, dayparts, daypart_id),
        recipe_suggestions=build_home_recipe_suggestions(daypart_id, limit=4),
        query=query,
        dayparts=dayparts,
        selected_daypart_id=daypart_id,
        selected_visibility=scope,
        visibility_options=VISIBILITY_FILTER_OPTIONS,
        today=date.today(),
    )
@main_bp.get("/archive")
@login_required
def archive_view():
    """Show archived items, optionally filtered by search text, kind and visibility.

    A ``kind`` value that is not a key of ``ITEM_KIND_LABELS`` is not used for
    filtering but is still passed to the template as ``selected_kind``.
    """
    args = request.args
    query = args.get("q", "").strip()
    selected_kind = args.get("kind", "").strip()
    selected_visibility = args.get("visibility", "").strip()

    kind_filter = None
    if selected_kind in ITEM_KIND_LABELS:
        kind_filter = selected_kind

    archived_items = fetch_items(
        kind=kind_filter,
        availability="archived",
        include_archived=True,
        query=query or None,
        visibility=selected_visibility or None,
    )
    return render_template(
        "archive/list.html",
        items=archived_items,
        query=query,
        selected_kind=selected_kind,
        selected_visibility=selected_visibility,
        kind_options=KIND_FILTER_OPTIONS,
        visibility_options=VISIBILITY_FILTER_OPTIONS,
        today=date.today(),
    )
@main_bp.get("/planner")
@login_required
def planner():
    """Render the week planner for the week given by the ``week`` query parameter."""
    week_start = parse_week_start(request.args.get("week"))
    one_week = timedelta(days=7)
    template_context = {
        "week_start": week_start,
        "week_end": week_start + timedelta(days=6),
        "prev_week": week_start - one_week,
        "next_week": week_start + one_week,
        "week_cards": fetch_week_cards(week_start),
        "today": date.today(),
        # Only the first six templates are offered on this page.
        "week_templates": fetch_week_templates()[:6],
        "week_hints": build_week_hints(week_start),
        "upcoming_entries": fetch_upcoming_shopping_needs(limit=8),
        "household_settings": get_household_settings(),
    }
    return render_template("planner/week.html", **template_context)
@main_bp.get("/planner/export.pdf")
@login_required
def planner_export_pdf():
    """Send the week plan as a PDF download.

    The week comes from the ``week`` query parameter and the export mode from
    ``mode``. If PDF generation raises ``RuntimeError`` the message is flashed
    and the user is sent back to the planner for that week.
    """
    week_start = parse_week_start(request.args.get("week"))
    mode = normalize_pdf_export_mode(request.args.get("mode"))
    try:
        pdf_bytes = build_week_plan_pdf(week_start, mode=mode)
    except RuntimeError as exc:
        flash(str(exc), "error")
        return redirect(url_for("main.planner", week=week_start.isoformat()))
    # Use the ISO year that belongs to the ISO week number. Around New Year the
    # calendar year differs: the week starting 2024-12-30 is week 1 of 2025, so
    # pairing the week number with week_start.year would name it "kw-01-2024".
    iso_calendar = week_start.isocalendar()
    prefix = "mein-essensplan" if mode == "mine" else "unser-essensplan"
    filename = f"{prefix}-kw-{iso_calendar.week:02d}-{iso_calendar.year}.pdf"
    return send_file(
        BytesIO(pdf_bytes),
        mimetype="application/pdf",
        as_attachment=True,
        download_name=filename,
    )
@main_bp.route("/planner/day", methods=("GET", "POST"))
@login_required
def planner_day():
    """Show the day planner and handle adding an entry to a day.

    POST expects ``item_id``, ``daypart_id``, ``plan_date`` and optionally
    ``note`` and ``visibility``. On success it redirects to the same day,
    anchored at the daypart; on a validation error it flashes a message and
    falls through to render the page. The rendered page can be preselected via
    the query parameters ``item_id``, ``daypart_id``, ``meal_name`` and
    ``component_ids`` (comma-separated).
    """
    selected_date = parse_plan_date(request.values.get("date"))
    if request.method == "POST":
        item_id_raw = request.form.get("item_id", "").strip()
        daypart_id_raw = request.form.get("daypart_id", "").strip()
        note = request.form.get("note", "").strip()
        selected_date = parse_plan_date(request.form.get("plan_date"))
        visibility = normalize_visibility(request.form.get("visibility"), "shared")
        # isdecimal() rather than isdigit() throughout: isdigit() also accepts
        # characters such as "²" that int() rejects, which would raise an
        # uncaught ValueError and answer with a 500.
        if not item_id_raw.isdecimal():
            flash("Bitte etwas für den Tagesplan auswählen.", "error")
        elif not daypart_id_raw.isdecimal():
            flash("Bitte eine Tageszeit auswählen.", "error")
        else:
            item_id = int(item_id_raw)
            daypart_id = int(daypart_id_raw)
            try:
                # Called for its ValueError when the item cannot be loaded.
                get_item(item_id)
                shopping_result = insert_plan_entry(
                    item_id=item_id,
                    daypart_id=daypart_id,
                    plan_date=selected_date,
                    visibility=visibility,
                    note=note,
                )
                if shopping_result["count"]:
                    flash("Fehlende Lebensmittel wurden für den nächsten Einkauf ergänzt.", "info")
                elif shopping_result["scheduled_count"]:
                    flash("Fehlende Lebensmittel wurden für einen späteren Einkauf vorgemerkt.", "info")
                flash("Der Eintrag wurde in den Tagesplan gelegt.", "success")
                return redirect(
                    f"{url_for('main.planner_day', date=selected_date.isoformat(), daypart_id=daypart_id)}#daypart-{daypart_id}"
                )
            except ValueError as exc:
                flash(str(exc), "error")

    selected_item_raw = request.args.get("item_id", "").strip()
    selected_daypart_raw = request.args.get("daypart_id", "").strip()
    selected_meal_name = request.args.get("meal_name", "").strip()
    selected_components_raw = request.args.get("component_ids", "").strip()
    selected_item_id = int(selected_item_raw) if selected_item_raw.isdecimal() else None
    selected_daypart_id = int(selected_daypart_raw) if selected_daypart_raw.isdecimal() else None
    selected_component_ids = [
        int(value) for value in selected_components_raw.split(",") if value.isdecimal()
    ]
    return render_template(
        "planner/day.html",
        selected_date=selected_date,
        previous_day=selected_date - timedelta(days=1),
        next_day=selected_date + timedelta(days=1),
        sections=build_day_planner_sections(
            selected_date,
            selected_item_id,
            selected_daypart_id,
            selected_meal_name=selected_meal_name,
            selected_component_ids=selected_component_ids,
        ),
        today=date.today(),
        visibility_options=VISIBILITY_FORM_OPTIONS,
        day_templates=fetch_day_templates()[:6],
        day_hints=build_day_hints(selected_date),
    )
@main_bp.post("/planner/day/generated-meal")
@login_required
def planner_generated_meal():
    """Adopt a suggested food combination as a meal and plan it for a day.

    Expects ``plan_date``, ``daypart_id``, ``meal_name`` and one or more
    ``component_ids`` in the form. Incomplete data flashes an error and
    redirects to the day planner; otherwise the meal is created or reused,
    planned, and the user is redirected to the day anchored at the daypart.
    """
    selected_date = parse_plan_date(request.form.get("plan_date"))
    daypart_raw = request.form.get("daypart_id", "").strip()
    meal_name = request.form.get("meal_name", "").strip()
    # isdecimal() rather than isdigit(): isdigit() also accepts characters such
    # as "²" that int() rejects, which would turn bad input into a 500.
    component_ids = [
        int(value) for value in request.form.getlist("component_ids") if value.isdecimal()
    ]
    visibility = normalize_visibility(request.form.get("visibility"), "shared")
    if not daypart_raw.isdecimal() or not meal_name or not component_ids:
        flash("Die vorgeschlagene Kombination konnte gerade nicht übernommen werden.", "error")
        return redirect(url_for("main.planner_day", date=selected_date.isoformat()))
    daypart_id = int(daypart_raw)
    meal_id = create_or_get_generated_meal(
        name=meal_name,
        component_ids=component_ids,
        daypart_id=daypart_id,
        visibility=visibility,
    )
    shopping_result = insert_plan_entry(
        item_id=meal_id,
        daypart_id=daypart_id,
        plan_date=selected_date,
        visibility=visibility,
    )
    if shopping_result["count"]:
        flash("Die Kombination wurde eingeplant und fehlende Lebensmittel wurden ergänzt.", "info")
    elif shopping_result["scheduled_count"]:
        flash("Die Kombination wurde eingeplant. Fehlende Lebensmittel sind für später vorgemerkt.", "info")
    flash("Die Kombination wurde als Mahlzeitenidee übernommen.", "success")
    return redirect(
        f"{url_for('main.planner_day', date=selected_date.isoformat(), daypart_id=daypart_id)}#daypart-{daypart_id}"
    )
@main_bp.post("/planner/<int:entry_id>/update")
@login_required
def planner_update(entry_id: int):
    """Change the visibility and note of a plan entry.

    Redirects to the day planner for the submitted ``plan_date``; on success
    the entry's daypart is added to the redirect URL.
    """
    selected_date = parse_plan_date(request.form.get("plan_date"))
    day_iso = selected_date.isoformat()

    entry_row = get_db().execute(
        f"""
        SELECT plan_entries.*,
               owner.display_name AS owner_display_name,
               owner.username AS owner_username
        FROM plan_entries
        LEFT JOIN users AS owner ON owner.id = plan_entries.owner_user_id
        WHERE plan_entries.id = ? AND {visible_clause('plan_entries')}
        """,
        [entry_id, *visible_params()],
    ).fetchone()
    if entry_row is None:
        flash("Der Planeintrag wurde nicht gefunden.", "error")
        return redirect(url_for("main.planner_day", date=day_iso))

    try:
        ensure_can_edit(
            describe_record(dict(entry_row)),
            "Diesen Planeintrag kannst du gerade nicht bearbeiten.",
        )
    except PermissionError as exc:
        flash(str(exc), "error")
        return redirect(url_for("main.planner_day", date=day_iso))

    # The entry's stored visibility is the fallback passed to normalize_visibility.
    new_visibility = normalize_visibility(request.form.get("visibility"), entry_row["visibility"])
    new_note = request.form.get("note", "").strip()
    update_plan_entry(entry_id, visibility=new_visibility, note=new_note)
    flash("Der Planeintrag wurde angepasst.", "success")
    return redirect(url_for("main.planner_day", date=day_iso, daypart_id=entry_row["daypart_id"]))
@main_bp.post("/planner/<int:entry_id>/remove")
@login_required
def planner_remove(entry_id: int):
    """Delete a plan entry the current user can see and edit.

    Redirects to the day planner when a ``date`` query parameter is present,
    otherwise to the week planner.
    """
    selected_date = request.args.get("date", "")
    db = get_db()
    entry_row = db.execute(
        f"""
        SELECT plan_entries.*,
               owner.display_name AS owner_display_name,
               owner.username AS owner_username
        FROM plan_entries
        LEFT JOIN users AS owner ON owner.id = plan_entries.owner_user_id
        WHERE plan_entries.id = ? AND {visible_clause('plan_entries')}
        """,
        [entry_id, *visible_params()],
    ).fetchone()

    if entry_row is not None:
        try:
            ensure_can_edit(
                describe_record(dict(entry_row)),
                "Diesen Planeintrag kannst du gerade nicht entfernen.",
            )
            db.execute("DELETE FROM plan_entries WHERE id = ?", (entry_id,))
            db.commit()
            flash("Der Planeintrag wurde entfernt.", "info")
        except PermissionError as exc:
            flash(str(exc), "error")
    else:
        flash("Der Planeintrag wurde nicht gefunden.", "error")

    if not selected_date:
        return redirect(url_for("main.planner"))
    return redirect(url_for("main.planner_day", date=selected_date))
@main_bp.post("/planner/<int:entry_id>/move")
@login_required
def planner_move(entry_id: int):
    """Move a plan entry to another date and daypart; responds with JSON.

    Returns 400 for an invalid ``target_daypart_id``, 404 when the entry is not
    visible to the user and 403 when it may not be edited. On success the entry
    is moved, shopping needs for the new slot are ensured and the response
    carries the planner URL of the target week as ``redirect_url``.
    """
    target_date = parse_plan_date(request.form.get("target_date"))
    target_daypart_raw = request.form.get("target_daypart_id", "").strip()
    # isdecimal() rather than isdigit(): isdigit() also accepts characters such
    # as "²" that int() rejects, which would answer with a 500 instead of 400.
    if not target_daypart_raw.isdecimal():
        return jsonify({"ok": False, "error": "Ungültige Tageszeit"}), 400
    entry = get_db().execute(
        f"""
        SELECT plan_entries.*,
               owner.display_name AS owner_display_name,
               owner.username AS owner_username
        FROM plan_entries
        LEFT JOIN users AS owner ON owner.id = plan_entries.owner_user_id
        WHERE plan_entries.id = ? AND {visible_clause('plan_entries')}
        """,
        [entry_id, *visible_params()],
    ).fetchone()
    if entry is None:
        return jsonify({"ok": False, "error": "Eintrag nicht gefunden"}), 404
    try:
        ensure_can_edit(describe_record(dict(entry)), "Diesen Planeintrag kannst du gerade nicht verschieben.")
    except PermissionError as exc:
        return jsonify({"ok": False, "error": str(exc)}), 403
    target_daypart_id = int(target_daypart_raw)
    get_db().execute(
        "UPDATE plan_entries SET plan_date = ?, daypart_id = ? WHERE id = ?",
        (target_date.isoformat(), target_daypart_id, entry_id),
    )
    get_db().commit()
    shopping_result = ensure_item_or_missing_components_are_shopped(
        entry["item_id"],
        g.user["id"],
        entry["visibility"],
        needed_for_date=target_date.isoformat(),
        needed_for_daypart_id=target_daypart_id,
        source_item_id=entry["item_id"],
    )
    # These are flashed even though the response is JSON; the response also
    # carries a redirect_url for the caller.
    if shopping_result["count"]:
        flash("Fehlende Lebensmittel wurden nach dem Verschieben ergänzt.", "info")
    elif shopping_result["scheduled_count"]:
        flash("Fehlende Lebensmittel sind für den passenden Einkauf vorgemerkt.", "info")
    return jsonify(
        {
            "ok": True,
            "added_to_shopping": bool(shopping_result["count"]),
            "redirect_url": url_for("main.planner", week=parse_week_start(target_date.isoformat()).isoformat()),
        }
    )
@main_bp.post("/planner/slot/copy-forward")
@login_required
def planner_slot_copy_forward():
    """Copy the editable entries of one planner slot to the following day.

    Reads ``source_date`` and ``daypart_id`` from the submitted form. Every
    visible entry in that slot which the current user may edit is inserted for
    ``source_date + 1 day`` in the same daypart, unless an entry with the same
    item, visibility and note already exists there for the current household.

    Returns:
        A redirect to the planner view for the week derived from
        ``source_date``. The outcome is reported through flashed messages.
    """
    source_date = parse_plan_date(request.form.get("source_date"))
    target_date = source_date + timedelta(days=1)
    # Every exit path returns to the same planner week.
    planner_url = url_for("main.planner", week=parse_week_start(source_date.isoformat()).isoformat())

    daypart_raw = request.form.get("daypart_id", "").strip()
    # isdecimal() instead of isdigit(): isdigit() also accepts characters such
    # as superscript digits ("²") that int() rejects, which would raise an
    # unhandled ValueError instead of showing this error message.
    if not daypart_raw.isdecimal():
        flash("Die Tageszeit konnte nicht erkannt werden.", "error")
        return redirect(planner_url)
    daypart_id = int(daypart_raw)

    entries = get_db().execute(
        f"""
        SELECT plan_entries.*,
               owner.display_name AS owner_display_name,
               owner.username AS owner_username
        FROM plan_entries
        LEFT JOIN users AS owner ON owner.id = plan_entries.owner_user_id
        WHERE plan_entries.plan_date = ? AND plan_entries.daypart_id = ? AND {visible_clause('plan_entries')}
        ORDER BY plan_entries.id
        """,
        [source_date.isoformat(), daypart_id, *visible_params()],
    ).fetchall()

    copied_count = 0
    shopping_added = 0
    shopping_scheduled = 0
    for raw_entry in entries:
        entry = describe_record(dict(raw_entry))
        try:
            ensure_can_edit(entry, "Diesen Planeintrag kannst du gerade nicht kopieren.")
        except PermissionError:
            # Entries the user may not edit are skipped on purpose so the
            # rest of the slot can still be copied.
            continue

        # NOTE(review): the duplicate check does not compare owner_user_id, so
        # an identical entry of any owner in the target slot suppresses the
        # copy. Confirm this is intended for personal entries.
        duplicate = get_db().execute(
            """
            SELECT id
            FROM plan_entries
            WHERE household_id = ?
              AND plan_date = ?
              AND daypart_id = ?
              AND item_id = ?
              AND visibility = ?
              AND COALESCE(note, '') = COALESCE(?, '')
            LIMIT 1
            """,
            (
                current_household_id(),
                target_date.isoformat(),
                daypart_id,
                entry["item_id"],
                entry["visibility"],
                entry.get("note", ""),
            ),
        ).fetchone()
        if duplicate:
            continue

        shopping_result = insert_plan_entry(
            item_id=entry["item_id"],
            daypart_id=daypart_id,
            plan_date=target_date,
            visibility=entry["visibility"],
            note=entry.get("note", "") or "",
        )
        copied_count += 1
        shopping_added += int(shopping_result["count"])
        shopping_scheduled += int(shopping_result["scheduled_count"])

    if copied_count == 0:
        flash("Für diese Tageszeit gab es nichts Neues zum Kopieren.", "info")
    else:
        if shopping_added:
            flash("Fehlende Lebensmittel wurden für den passenden Einkauf ergänzt.", "info")
        elif shopping_scheduled:
            flash("Fehlende Lebensmittel wurden für später vorgemerkt.", "info")
        # The plural of "Eintrag" is "Einträge" (with umlaut); appending a
        # plain "e" produced the non-word "Eintrage".
        entry_phrase = "Eintrag wurde" if copied_count == 1 else "Einträge wurden"
        flash(f"{copied_count} {entry_phrase} zum nächsten Tag kopiert.", "success")
    return redirect(planner_url)