Files
nouri-App/nouri/main.py
T

1397 lines
48 KiB
Python

from __future__ import annotations
import uuid
from collections import defaultdict
from datetime import date, datetime, timedelta
from pathlib import Path
from flask import (
Blueprint,
current_app,
flash,
g,
jsonify,
redirect,
render_template,
request,
url_for,
)
from werkzeug.utils import secure_filename
from .auth import login_required
from .constants import (
AVAILABILITY_LABELS,
CATEGORIES,
ITEM_KIND_LABELS,
ITEM_KIND_SINGULAR_LABELS,
VISIBILITY_DESCRIPTIONS,
VISIBILITY_LABELS,
)
from .db import get_db
# Blueprint that the view functions in this module attach their routes to.
main_bp = Blueprint("main", __name__)
# Lower-case file extensions accepted by allowed_file() for photo uploads.
ALLOWED_IMAGE_EXTENSIONS = {"png", "jpg", "jpeg", "gif", "webp"}
# (value, label) choices handed to the item list template as ``state_options``.
# The item list view turns an empty value into "no availability filter".
ACTIVE_STATE_OPTIONS = [
("", "Alle aktiven"),
("home", "Zuhause"),
("idea", "Merkliste"),
]
# (value, label) choices handed to the archive template as ``kind_options``.
KIND_FILTER_OPTIONS = [
("", "Alles"),
("food", "Lebensmittel"),
("meal", "Mahlzeitenideen"),
]
# (value, label) choices handed to list templates as ``visibility_options``.
VISIBILITY_FILTER_OPTIONS = [
("", "Alles Sichtbare"),
("shared", "Für alle"),
("personal", "Persönlich"),
]
# (value, label) choices handed to the item form template; unlike the filter
# options above there is no empty entry.
VISIBILITY_FORM_OPTIONS = [
("shared", "Für alle"),
("personal", "Persönlich"),
]
def get_dayparts() -> list:
    """Return every row of the ``dayparts`` table, ordered by ``sort_order``."""
    cursor = get_db().execute("SELECT * FROM dayparts ORDER BY sort_order")
    return cursor.fetchall()
def parse_week_start(raw: str | None) -> date:
    """Return the Monday of the week that contains ``raw``.

    ``raw`` is expected in ``YYYY-MM-DD`` form. When it is empty or cannot be
    parsed, the Monday of the current week is returned instead.
    """
    anchor = None
    if raw:
        try:
            anchor = datetime.strptime(raw, "%Y-%m-%d").date()
        except ValueError:
            anchor = None
    if anchor is None:
        anchor = date.today()
    # weekday() is 0 for Monday, so subtracting it lands on the week's Monday.
    return anchor - timedelta(days=anchor.weekday())
def parse_plan_date(raw: str | None, fallback: date | None = None) -> date:
    """Parse ``raw`` as a ``YYYY-MM-DD`` date.

    Returns ``fallback`` (or today when no fallback is given) if ``raw`` is
    empty or does not match the expected format.
    """
    if not raw:
        return fallback or date.today()
    try:
        moment = datetime.strptime(raw, "%Y-%m-%d")
    except ValueError:
        return fallback or date.today()
    return moment.date()
def normalize_visibility(raw: str | None, default: str = "shared") -> str:
    """Return ``raw`` if it is a key of ``VISIBILITY_LABELS``, else ``default``."""
    return raw if raw in VISIBILITY_LABELS else default
def current_household_id() -> int:
    """Return the ``household_id`` of the user stored on ``g``, as an int."""
    household_id = g.user["household_id"]
    return int(household_id)
def visible_clause(table_alias: str) -> str:
    """Build the SQL visibility condition for ``table_alias``.

    The fragment contains two ``?`` placeholders, in this order: the household
    id and the owner user id (see ``visible_params``). A row matches when it
    belongs to the household and is either shared or owned by that user.
    """
    same_household = f"{table_alias}.household_id = ?"
    shared_or_own = f"({table_alias}.visibility = 'shared' OR {table_alias}.owner_user_id = ?)"
    return f"{same_household} AND {shared_or_own}"
def visible_params() -> list[int]:
    """Return the bind values for ``visible_clause``: household id, then user id."""
    household_id = current_household_id()
    user_id = int(g.user["id"])
    return [household_id, user_id]
def user_display_name(display_name: str | None, username: str | None) -> str:
    """Return the first non-empty of display name and username, else "Haushalt"."""
    for candidate in (display_name, username):
        if candidate:
            return candidate
    return "Haushalt"
def describe_record(entry: dict) -> dict:
    """Add display and permission fields to ``entry`` in place and return it.

    Reads ``visibility``, ``owner_user_id``, ``owner_display_name`` and
    ``owner_username`` from the entry and compares the owner with the user on
    ``g``. ``can_edit`` is true for shared entries, the user's own entries,
    and for users whose role is ``admin``.
    """
    visibility = entry.get("visibility")
    owner_name = user_display_name(entry.get("owner_display_name"), entry.get("owner_username"))
    shared = visibility == "shared"
    mine = entry.get("owner_user_id") == g.user["id"]
    entry.update(
        owner_name=owner_name,
        is_personal=visibility == "personal",
        is_shared=shared,
        is_mine=mine,
        visibility_label=VISIBILITY_LABELS.get(visibility, "Für alle"),
        visibility_description=VISIBILITY_DESCRIPTIONS.get(visibility, ""),
        owner_label="Von mir" if mine else f"Von {owner_name}",
        context_label="Gemeinsam" if shared else "Persönlich",
        can_edit=shared or mine or g.user["role"] == "admin",
    )
    return entry
def describe_records(rows) -> list[dict]:
    """Convert each row to a dict and run it through ``describe_record``."""
    described = []
    for row in rows:
        described.append(describe_record(dict(row)))
    return described
def ensure_can_edit(entry: dict, error_message: str = "Diesen Eintrag kannst du gerade nicht bearbeiten.") -> None:
    """Raise ``PermissionError`` unless the entry is editable or the user is admin.

    Raises:
        PermissionError: with ``error_message`` when ``entry["can_edit"]`` is
            falsy and the role of the user on ``g`` is not ``admin``.
    """
    if entry.get("can_edit"):
        return
    if g.user["role"] == "admin":
        return
    raise PermissionError(error_message)
def allowed_file(filename: str) -> bool:
    """Tell whether ``filename`` ends in one of ``ALLOWED_IMAGE_EXTENSIONS``.

    The text after the last dot is compared case-insensitively; a name without
    any dot is rejected.
    """
    _stem, dot, extension = filename.rpartition(".")
    return bool(dot) and extension.lower() in ALLOWED_IMAGE_EXTENSIONS
def save_photo(upload, current_filename: str | None = None) -> str | None:
    """Store an uploaded photo under a random name and drop the previous one.

    Args:
        upload: The uploaded file object (needs ``filename`` and ``save``);
            may be ``None`` or have an empty filename when nothing was sent.
        current_filename: Name of the photo currently stored for the item.

    Returns:
        The new stored filename, or ``current_filename`` unchanged when no
        upload was provided.

    Raises:
        ValueError: if the upload's extension is not an allowed image type.
    """
    if not upload or not upload.filename:
        return current_filename
    if not allowed_file(upload.filename):
        raise ValueError("Bitte ein Bild als PNG, JPG, GIF oder WEBP hochladen.")
    # Read the extension from the raw name that allowed_file() just validated.
    # Sanitising the name first can strip the dot (e.g. ".png" or a non-ASCII
    # stem), which made the former ``rsplit(".", 1)[1]`` raise IndexError.
    # The value is safe to embed: it is a member of ALLOWED_IMAGE_EXTENSIONS.
    extension = upload.filename.rsplit(".", 1)[1].lower()
    upload_dir = Path(current_app.config["UPLOAD_FOLDER"])
    filename = f"{uuid.uuid4().hex}.{extension}"
    upload.save(upload_dir / filename)
    if current_filename:
        # missing_ok avoids the exists()/unlink() race of the old check.
        (upload_dir / current_filename).unlink(missing_ok=True)
    return filename
def get_item(item_id: int) -> dict:
    """Load one item visible to the current user and annotate it.

    The row carries the owner's display name and username plus an
    ``is_on_shopping_list`` flag (true when an unchecked shopping entry
    exists), and is passed through ``describe_record`` before being returned.

    Raises:
        ValueError: if no visible item has the given id.
    """
    lookup_sql = f"""
        SELECT items.*,
               owner.display_name AS owner_display_name,
               owner.username AS owner_username,
               EXISTS(
                   SELECT 1
                   FROM shopping_entries
                   WHERE shopping_entries.item_id = items.id AND shopping_entries.is_checked = 0
               ) AS is_on_shopping_list
        FROM items
        LEFT JOIN users AS owner ON owner.id = items.owner_user_id
        WHERE items.id = ? AND {visible_clause('items')}
    """
    row = get_db().execute(lookup_sql, [item_id, *visible_params()]).fetchone()
    if row is None:
        raise ValueError("Der Eintrag wurde nicht gefunden.")
    return describe_record(dict(row))
def get_item_daypart_ids(item_id: int) -> list[int]:
    """Return the daypart ids linked to ``item_id`` in ``item_dayparts``."""
    cursor = get_db().execute(
        "SELECT daypart_id FROM item_dayparts WHERE item_id = ?",
        (item_id,),
    )
    daypart_ids = []
    for row in cursor.fetchall():
        daypart_ids.append(int(row["daypart_id"]))
    return daypart_ids
def get_meal_component_ids(meal_id: int) -> list[int]:
    """Return the ids of a meal's component items that the current user may see.

    Components are filtered to the user's household and to items that are
    shared or owned by the user.
    """
    component_sql = """
        SELECT meal_components.food_item_id
        FROM meal_components
        JOIN items ON items.id = meal_components.food_item_id
        WHERE meal_components.meal_item_id = ?
          AND items.household_id = ?
          AND (items.visibility = 'shared' OR items.owner_user_id = ?)
    """
    database = get_db()
    bindings = (meal_id, current_household_id(), g.user["id"])
    component_ids = []
    for row in database.execute(component_sql, bindings).fetchall():
        component_ids.append(int(row["food_item_id"]))
    return component_ids
def attach_dayparts(items: list[dict]) -> list[dict]:
    """Attach daypart information to each item dict (in place).

    Every item receives ``dayparts_meta`` (list of id/slug/name dicts ordered
    by the daypart sort order), ``dayparts`` (just the names) and
    ``primary_daypart_id`` (id of the first daypart, or ``None``).

    Returns:
        A new list holding the same, now enriched, item dicts; an empty list
        when ``items`` is empty.
    """
    if not items:
        return []
    ids = [item["id"] for item in items]
    marks = ",".join("?" * len(ids))
    rows = get_db().execute(
        f"""
        SELECT item_dayparts.item_id, dayparts.id, dayparts.slug, dayparts.name
        FROM item_dayparts
        JOIN dayparts ON dayparts.id = item_dayparts.daypart_id
        WHERE item_dayparts.item_id IN ({marks})
        ORDER BY dayparts.sort_order
        """,
        ids,
    ).fetchall()
    by_item = defaultdict(list)
    for row in rows:
        meta = {"id": row["id"], "slug": row["slug"], "name": row["name"]}
        by_item[row["item_id"]].append(meta)
    for item in items:
        metas = by_item.get(item["id"], [])
        item["dayparts_meta"] = metas
        item["dayparts"] = [meta["name"] for meta in metas]
        item["primary_daypart_id"] = metas[0]["id"] if metas else None
    return list(items)
def attach_components(items: list[dict]) -> list[dict]:
    """Set ``components`` on every item dict (in place) and return ``items``.

    For items of kind ``meal`` the value is the list of component item names
    visible to the current user, sorted case-insensitively; all other items
    get an empty list. No query is issued when there are no meals.
    """
    meal_ids = [item["id"] for item in items if item["kind"] == "meal"]
    names_by_meal = defaultdict(list)
    if meal_ids:
        marks = ",".join("?" * len(meal_ids))
        rows = get_db().execute(
            f"""
            SELECT meal_components.meal_item_id, items.name
            FROM meal_components
            JOIN items ON items.id = meal_components.food_item_id
            WHERE meal_components.meal_item_id IN ({marks})
              AND items.household_id = ?
              AND (items.visibility = 'shared' OR items.owner_user_id = ?)
            ORDER BY LOWER(items.name)
            """,
            [*meal_ids, current_household_id(), g.user["id"]],
        ).fetchall()
        for row in rows:
            names_by_meal[row["meal_item_id"]].append(row["name"])
    for item in items:
        item["components"] = names_by_meal.get(item["id"], [])
    return items
def fetch_items(
    *,
    kind: str | None = None,
    availability: str | None = None,
    include_archived: bool = False,
    query: str | None = None,
    daypart_id: int | None = None,
    visibility: str | None = None,
):
    """Return the visible items matching the given filters, fully annotated.

    Args:
        kind: Restrict to this item kind.
        availability: Restrict to this availability state. When omitted,
            archived items are excluded unless ``include_archived`` is true.
        include_archived: Only consulted when ``availability`` is not given.
        query: Case-insensitive substring to look for in the item name.
        daypart_id: Restrict to items linked to this daypart.
        visibility: Restrict to this visibility value.

    Returns:
        Item dicts ordered by availability (home, idea, rest), then shared
        before personal, then name; each has passed through
        ``describe_records``, ``attach_dayparts`` and ``attach_components``.
    """
    where = [visible_clause("items")]
    bindings = visible_params()

    def require(clause, *values):
        # Keeps each SQL condition and its bind values in lockstep.
        where.append(clause)
        bindings.extend(values)

    if kind:
        require("items.kind = ?", kind)
    if availability:
        require("items.availability_state = ?", availability)
    elif not include_archived:
        require("items.availability_state != 'archived'")
    if query:
        require("LOWER(items.name) LIKE ?", f"%{query.lower()}%")
    if daypart_id:
        require(
            """
            EXISTS (
                SELECT 1
                FROM item_dayparts
                WHERE item_dayparts.item_id = items.id
                  AND item_dayparts.daypart_id = ?
            )
            """,
            daypart_id,
        )
    if visibility:
        require("items.visibility = ?", visibility)

    select_sql = f"""
        SELECT items.*,
               owner.display_name AS owner_display_name,
               owner.username AS owner_username,
               EXISTS(
                   SELECT 1
                   FROM shopping_entries
                   WHERE shopping_entries.item_id = items.id AND shopping_entries.is_checked = 0
               ) AS is_on_shopping_list
        FROM items
        LEFT JOIN users AS owner ON owner.id = items.owner_user_id
        WHERE {' AND '.join(where)}
        ORDER BY
            CASE items.availability_state WHEN 'home' THEN 0 WHEN 'idea' THEN 1 ELSE 2 END,
            CASE items.visibility WHEN 'shared' THEN 0 ELSE 1 END,
            LOWER(items.name)
    """
    rows = get_db().execute(select_sql, bindings).fetchall()
    described = describe_records(rows)
    return attach_components(attach_dayparts(described))
def fetch_food_options():
    """Return all visible items of kind ``food``, archived ones included."""
    food_items = fetch_items(kind="food", include_archived=True)
    return food_items
def group_items_by_availability(items: list[dict]) -> list[dict]:
    """Group items into home / idea / archived sections, in that order.

    Returns:
        One dict per non-empty state with ``state``, ``title`` (from
        ``AVAILABILITY_LABELS``) and ``items``. Items in any other state are
        left out.
    """
    buckets: dict = {}
    for item in items:
        buckets.setdefault(item["availability_state"], []).append(item)
    return [
        {
            "state": state,
            "title": AVAILABILITY_LABELS[state],
            "items": buckets[state],
        }
        for state in ("home", "idea", "archived")
        if state in buckets
    ]
def extract_item_form_data(existing: dict | None = None) -> dict:
    """Build the item form state from the submitted request form.

    Args:
        existing: Current form state. Its ``visibility`` is the fallback when
            the submitted value is not a known visibility. The dict itself is
            left untouched; a copy is updated and returned.

    Returns:
        A dict with the stripped text fields, the normalised visibility and
        the ``daypart_ids`` / ``component_ids`` lists parsed as ints.
    """
    form = request.form

    def text(field: str) -> str:
        return form.get(field, "").strip()

    def int_list(field: str) -> list[int]:
        # str.isdigit() alone also accepts characters such as "²" that int()
        # rejects with ValueError, so restrict parsing to ASCII digits.
        return [int(value) for value in form.getlist(field) if value.isascii() and value.isdigit()]

    # Copy so the caller's dict is not mutated as a side effect.
    form_data = dict(existing) if existing else {}
    form_data.update(
        {
            "name": text("name"),
            "category": text("category"),
            "note": text("note"),
            "visibility": normalize_visibility(form.get("visibility"), form_data.get("visibility", "shared")),
            "daypart_ids": int_list("daypart_ids"),
            "component_ids": int_list("component_ids"),
            "quick_food_name": text("quick_food_name"),
            "quick_food_category": text("quick_food_category"),
            "quick_food_note": text("quick_food_note"),
        }
    )
    return form_data
def create_quick_food_from_form(form_data: dict) -> int:
    """Insert a food item from the ``quick_food_*`` form fields and commit.

    The new item belongs to the current user and household, takes the form's
    visibility, and is linked to the form's ``daypart_ids``.

    Returns:
        The id of the newly inserted item.
    """
    user_id = g.user["id"]
    insert_sql = """
        INSERT INTO items (
            household_id, owner_user_id, visibility, kind, name, category, note, created_by, updated_by
        )
        VALUES (?, ?, ?, 'food', ?, ?, ?, ?, ?)
    """
    database = get_db()
    values = (
        current_household_id(),
        user_id,
        form_data["visibility"],
        form_data["quick_food_name"],
        form_data["quick_food_category"],
        form_data["quick_food_note"],
        user_id,
        user_id,
    )
    food_id = int(database.execute(insert_sql, values).lastrowid)
    sync_item_dayparts(food_id, form_data["daypart_ids"])
    database.commit()
    return food_id
def add_to_shopping_list(item_id: int, user_id: int, visibility_override: str | None = None) -> bool:
    """Put an item on the shopping list unless an unchecked entry already exists.

    Args:
        item_id: Item to add; must be visible to the current user.
        user_id: Recorded as ``added_by`` and, for personal entries, as owner.
        visibility_override: Visibility for the entry; the item's own
            visibility is used when this is not a known value.

    Returns:
        ``True`` when an entry was inserted and committed, ``False`` when the
        item already had an unchecked entry.

    Raises:
        ValueError: propagated from ``get_item`` when the item is not found.
    """
    database = get_db()
    item = get_item(item_id)
    open_entry = database.execute(
        """
        SELECT id FROM shopping_entries
        WHERE item_id = ? AND is_checked = 0
        """,
        (item_id,),
    ).fetchone()
    if open_entry:
        return False

    visibility = normalize_visibility(visibility_override, item["visibility"])
    # Personal entries belong to whoever adds them; shared ones keep the
    # item's owner.
    if visibility == "personal":
        owner_user_id = user_id
    else:
        owner_user_id = item["owner_user_id"]
    insert_sql = """
        INSERT INTO shopping_entries (household_id, owner_user_id, visibility, item_id, added_by)
        VALUES (?, ?, ?, ?, ?)
    """
    database.execute(insert_sql, (current_household_id(), owner_user_id, visibility, item_id, user_id))
    database.commit()
    return True
def ensure_planned_item_is_shopped(item_id: int, user_id: int, visibility: str) -> bool:
    """Add a planned item to the shopping list unless it is already at home.

    Returns:
        ``False`` when the item's availability state is ``home``; otherwise
        the result of ``add_to_shopping_list``.
    """
    already_home = get_item(item_id)["availability_state"] == "home"
    if already_home:
        return False
    return add_to_shopping_list(item_id, user_id, visibility_override=visibility)
def sync_item_dayparts(item_id: int, daypart_ids: list[int]) -> None:
    """Replace the item's daypart links with ``daypart_ids``.

    Does not commit; the caller is expected to commit the transaction.
    """
    database = get_db()
    database.execute("DELETE FROM item_dayparts WHERE item_id = ?", (item_id,))
    insert_sql = "INSERT INTO item_dayparts (item_id, daypart_id) VALUES (?, ?)"
    links = ((item_id, daypart_id) for daypart_id in daypart_ids)
    for link in links:
        database.execute(insert_sql, link)
def sync_meal_components(meal_id: int, food_ids: list[int]) -> None:
    """Replace the meal's components that the current user can see.

    Only component links pointing at items visible to the current user are
    removed and rebuilt. Links to items the user cannot see (for example
    another member's personal food) are kept: they never appear in the
    submitted form, so deleting them would silently lose data whenever
    someone else edits the meal.

    Args:
        meal_id: Id of the meal item whose components are synced.
        food_ids: Ids requested as components. Ids that are not visible food
            items are ignored; duplicates are inserted once.

    Does not commit; the caller is expected to commit the transaction.
    """
    database = get_db()
    database.execute(
        f"""
        DELETE FROM meal_components
        WHERE meal_item_id = ?
          AND food_item_id IN (
              SELECT items.id
              FROM items
              WHERE {visible_clause('items')}
          )
        """,
        [meal_id, *visible_params()],
    )
    visible_foods = {
        row["id"]
        for row in database.execute(
            f"""
            SELECT items.id
            FROM items
            WHERE items.kind = 'food' AND {visible_clause('items')}
            """,
            visible_params(),
        ).fetchall()
    }
    # dict.fromkeys() drops duplicate ids while keeping the submitted order.
    for food_id in dict.fromkeys(food_ids):
        if food_id not in visible_foods:
            continue
        database.execute(
            """
            INSERT INTO meal_components (meal_item_id, food_item_id)
            VALUES (?, ?)
            """,
            (meal_id, food_id),
        )
def fetch_shopping_entries():
    """Return the unchecked shopping entries visible to the current user.

    Each entry is joined with its item (name, kind, photo, availability) and
    with the owner and adding user, ordered shared-first and then newest
    first, and passed through ``describe_records``.
    """
    entries_sql = f"""
        SELECT shopping_entries.*,
               items.name AS item_name,
               items.kind AS item_kind,
               items.photo_filename,
               items.availability_state,
               owner.display_name AS owner_display_name,
               owner.username AS owner_username,
               added_by_user.display_name AS added_by_display_name,
               added_by_user.username AS added_by_username
        FROM shopping_entries
        JOIN items ON items.id = shopping_entries.item_id
        LEFT JOIN users AS owner ON owner.id = shopping_entries.owner_user_id
        LEFT JOIN users AS added_by_user ON added_by_user.id = shopping_entries.added_by
        WHERE shopping_entries.is_checked = 0 AND {visible_clause('shopping_entries')}
        ORDER BY
            CASE shopping_entries.visibility WHEN 'shared' THEN 0 ELSE 1 END,
            shopping_entries.added_at DESC
    """
    cursor = get_db().execute(entries_sql, visible_params())
    return describe_records(cursor.fetchall())
def fetch_plan_entries_for_range(start_date: date, end_date: date):
    """Return visible plan entries between two dates, grouped by slot.

    Returns:
        A ``defaultdict(list)`` keyed by ``(plan_date, daypart_id)`` as stored
        on the rows; values are the described entries ordered by date,
        daypart sort order and item name.
    """
    range_sql = f"""
        SELECT plan_entries.*,
               items.name AS item_name,
               items.kind AS item_kind,
               items.photo_filename,
               items.availability_state,
               dayparts.name AS daypart_name,
               dayparts.slug AS daypart_slug,
               dayparts.sort_order,
               owner.display_name AS owner_display_name,
               owner.username AS owner_username
        FROM plan_entries
        JOIN items ON items.id = plan_entries.item_id
        JOIN dayparts ON dayparts.id = plan_entries.daypart_id
        LEFT JOIN users AS owner ON owner.id = plan_entries.owner_user_id
        WHERE plan_date BETWEEN ? AND ? AND {visible_clause('plan_entries')}
        ORDER BY plan_date, dayparts.sort_order, items.name
    """
    bindings = [start_date.isoformat(), end_date.isoformat(), *visible_params()]
    rows = get_db().execute(range_sql, bindings).fetchall()
    by_slot = defaultdict(list)
    for entry in describe_records(rows):
        slot = (entry["plan_date"], entry["daypart_id"])
        by_slot[slot].append(entry)
    return by_slot
def fetch_day_plan_entries(selected_date: date):
    """Return the grouped plan entries for a single day."""
    return fetch_plan_entries_for_range(start_date=selected_date, end_date=selected_date)
def fetch_recent_plan_items(daypart_id: int, limit: int = 6):
    """Return visible items recently planned for ``daypart_id``.

    The query is capped at ``limit * 3`` rows, so the result can be longer
    than ``limit``. Items come back described and with dayparts and
    components attached.
    """
    recent_sql = f"""
        SELECT DISTINCT items.id,
               items.household_id,
               items.owner_user_id,
               items.visibility,
               items.name,
               items.kind,
               items.category,
               items.note,
               items.photo_filename,
               items.availability_state,
               owner.display_name AS owner_display_name,
               owner.username AS owner_username
        FROM plan_entries
        JOIN items ON items.id = plan_entries.item_id
        LEFT JOIN users AS owner ON owner.id = items.owner_user_id
        WHERE plan_entries.daypart_id = ? AND {visible_clause('items')}
        ORDER BY plan_entries.created_at DESC
        LIMIT ?
    """
    row_cap = limit * 3
    rows = get_db().execute(recent_sql, [daypart_id, *visible_params(), row_cap]).fetchall()
    described = describe_records(rows)
    return attach_components(attach_dayparts(described))
def fetch_plan_candidates(daypart_id: int, query: str | None = None):
    """Return non-archived visible items as planning candidates for a daypart.

    Each row carries ``matches_daypart`` (whether the item is linked to
    ``daypart_id``). Results are ordered by availability (home, idea, rest),
    matching items first, shared before personal, then name. ``query``
    optionally filters by a case-insensitive name substring.
    """
    # The first placeholder sits in the SELECT list (matches_daypart), so the
    # daypart id must precede the WHERE-clause values.
    bindings = [daypart_id, *visible_params()]
    filters = [visible_clause("items"), "items.availability_state != 'archived'"]
    if query:
        filters.append("LOWER(items.name) LIKE ?")
        bindings.append(f"%{query.lower()}%")
    candidates_sql = f"""
        SELECT items.*,
               owner.display_name AS owner_display_name,
               owner.username AS owner_username,
               EXISTS(
                   SELECT 1
                   FROM item_dayparts
                   WHERE item_dayparts.item_id = items.id AND item_dayparts.daypart_id = ?
               ) AS matches_daypart
        FROM items
        LEFT JOIN users AS owner ON owner.id = items.owner_user_id
        WHERE {' AND '.join(filters)}
        ORDER BY
            CASE items.availability_state WHEN 'home' THEN 0 WHEN 'idea' THEN 1 ELSE 2 END,
            matches_daypart DESC,
            CASE items.visibility WHEN 'shared' THEN 0 ELSE 1 END,
            LOWER(items.name)
    """
    rows = get_db().execute(candidates_sql, bindings).fetchall()
    described = describe_records(rows)
    return attach_components(attach_dayparts(described))
def build_home_sections(items: list[dict], dayparts: list, selected_daypart_id: int | None):
    """Arrange items into display sections by daypart.

    With a selected daypart, a single section for that daypart is returned
    (with a generic title and slug when the id is unknown). Otherwise there is
    one section per daypart, followed by an "anytime" section when some items
    have no daypart at all.

    Returns:
        A list of dicts with ``title``, ``items`` and ``slug``.
    """

    def items_for(daypart_id):
        return [
            item
            for item in items
            if any(meta["id"] == daypart_id for meta in item["dayparts_meta"])
        ]

    if selected_daypart_id:
        chosen = None
        for daypart in dayparts:
            if daypart["id"] == selected_daypart_id:
                chosen = daypart
                break
        if chosen:
            title, slug = chosen["name"], chosen["slug"]
        else:
            title, slug = "Ausgewählte Tageszeit", "selected"
        return [{"title": title, "items": items_for(selected_daypart_id), "slug": slug}]

    sections = [
        {"title": daypart["name"], "items": items_for(daypart["id"]), "slug": daypart["slug"]}
        for daypart in dayparts
    ]
    unscheduled = [item for item in items if not item["dayparts_meta"]]
    if unscheduled:
        sections.append({"title": "Ohne feste Tageszeit", "items": unscheduled, "slug": "anytime"})
    return sections
def dedupe_items(items: list[dict], limit: int = 6) -> list[dict]:
    """Return the first ``limit`` items with distinct ``id``, in input order.

    Args:
        items: Item dicts; later items repeating an earlier ``id`` are skipped.
        limit: Maximum number of items to return. Zero or a negative value
            yields an empty list (previously one item slipped through because
            the limit was only checked after appending).
    """
    if limit <= 0:
        return []
    seen_ids = set()
    result = []
    for item in items:
        item_id = item["id"]
        if item_id in seen_ids:
            continue
        seen_ids.add(item_id)
        result.append(item)
        if len(result) >= limit:
            break
    return result
def build_day_planner_sections(selected_date: date, selected_item_id: int | None, selected_daypart_id: int | None):
    """Build one planner section per daypart for ``selected_date``.

    Each section holds the daypart, its planned entries, all candidates, up
    to six de-duplicated quick picks (home items, then recently planned, then
    daypart matches), the names of the first two planned items, and flags
    marking the daypart that is currently selected.
    """
    dayparts = get_dayparts()
    day_entries = fetch_day_plan_entries(selected_date)
    date_key = selected_date.isoformat()
    sections = []
    for daypart in dayparts:
        daypart_id = daypart["id"]
        is_selected = selected_daypart_id == daypart_id
        candidates = fetch_plan_candidates(daypart_id)
        at_home = [item for item in candidates if item["availability_state"] == "home"]
        suited = [
            item
            for item in candidates
            if any(meta["id"] == daypart_id for meta in item["dayparts_meta"])
        ]
        recent = fetch_recent_plan_items(daypart_id)
        planned = day_entries.get((date_key, daypart_id), [])
        section = {
            "daypart": daypart,
            "entries": planned,
            "candidates": candidates,
            "quick_items": dedupe_items(at_home + recent + suited, limit=6),
            "selected_item_id": selected_item_id if is_selected else None,
            "is_open": is_selected,
            "summary_items": [entry["item_name"] for entry in planned][:2],
            "default_visibility": "shared",
        }
        sections.append(section)
    return sections
def fetch_week_cards(week_start: date):
    """Return seven day cards, starting at ``week_start``.

    Each card has the ``date``, a ``slots`` list (one per daypart with its
    entries), ``filled_dayparts`` (id, name, count for non-empty slots),
    ``planned_count`` and up to four ``preview_items`` (at most two item
    names per filled slot).
    """
    dayparts = get_dayparts()
    grouped_entries = fetch_plan_entries_for_range(week_start, week_start + timedelta(days=6))
    cards = []
    for offset in range(7):
        current_day = week_start + timedelta(days=offset)
        day_key = current_day.isoformat()
        slots = [
            {
                "daypart": dict(daypart),
                "entries": grouped_entries.get((day_key, daypart["id"]), []),
            }
            for daypart in dayparts
        ]
        filled_slots = [slot for slot in slots if slot["entries"]]
        filled_dayparts = [
            {
                "id": slot["daypart"]["id"],
                "name": slot["daypart"]["name"],
                "count": len(slot["entries"]),
            }
            for slot in filled_slots
        ]
        preview_items = [
            entry["item_name"]
            for slot in filled_slots
            for entry in slot["entries"][:2]
        ]
        cards.append(
            {
                "date": current_day,
                "filled_dayparts": filled_dayparts,
                "planned_count": sum(len(slot["entries"]) for slot in filled_slots),
                "preview_items": preview_items[:4],
                "slots": slots,
            }
        )
    return cards
def count_visible_items(availability_state: str) -> int:
    """Count the visible items that are in ``availability_state``."""
    count_sql = f"SELECT COUNT(*) AS count FROM items WHERE availability_state = ? AND {visible_clause('items')}"
    bindings = [availability_state, *visible_params()]
    result = get_db().execute(count_sql, bindings).fetchone()
    return int(result["count"])
@main_bp.get("/")
@login_required
def dashboard():
    """Render the dashboard with counts, today's plan and a few previews.

    Passes the home / archive / open-shopping counts, today's plan entries
    sorted by daypart order and item name, the first eight home items and
    the first three day cards of the current week to the template.
    """
    today = date.today()
    home_count = count_visible_items("home")
    archive_count = count_visible_items("archived")
    shopping_row = get_db().execute(
        f"SELECT COUNT(*) AS count FROM shopping_entries WHERE is_checked = 0 AND {visible_clause('shopping_entries')}",
        visible_params(),
    ).fetchone()
    today_entries = sorted(
        (entry for slot in fetch_day_plan_entries(today).values() for entry in slot),
        key=lambda entry: (entry["sort_order"], entry["item_name"].lower()),
    )
    monday = today - timedelta(days=today.weekday())
    week_cards = fetch_week_cards(monday)
    home_items = fetch_items(availability="home")
    context = {
        "home_count": home_count,
        "shopping_count": shopping_row["count"],
        "archive_count": archive_count,
        "today_entries": today_entries,
        "home_items": home_items[:8],
        "today": today,
        "week_cards": week_cards[:3],
    }
    return render_template("dashboard.html", **context)
@main_bp.get("/more")
@login_required
def more_view():
    """Render the static "more" page."""
    template_name = "more.html"
    return render_template(template_name)
@main_bp.route("/items/<kind>")
@login_required
def item_list(kind: str):
    """List the visible items of one kind, filtered by the query string.

    Recognised query arguments: ``q`` (name search), ``state``
    (availability), ``visibility`` and ``daypart_id``. An unknown ``kind``
    redirects to the dashboard.
    """
    if kind not in ITEM_KIND_LABELS:
        return redirect(url_for("main.dashboard"))
    query = request.args.get("q", "").strip()
    state = request.args.get("state", "").strip()
    scope = request.args.get("visibility", "").strip()
    raw_daypart_id = request.args.get("daypart_id", "").strip()
    # str.isdigit() alone also accepts characters such as "²" that int()
    # rejects with ValueError, so only ASCII digits are parsed.
    if raw_daypart_id.isascii() and raw_daypart_id.isdigit():
        daypart_id = int(raw_daypart_id)
    else:
        daypart_id = None
    items = fetch_items(
        kind=kind,
        availability=state or None,
        query=query or None,
        daypart_id=daypart_id,
        visibility=scope or None,
    )
    return render_template(
        "items/list.html",
        kind=kind,
        items=items,
        availability_labels=AVAILABILITY_LABELS,
        query=query,
        selected_state=state,
        selected_visibility=scope,
        selected_daypart_id=daypart_id,
        dayparts=get_dayparts(),
        state_options=ACTIVE_STATE_OPTIONS,
        visibility_options=VISIBILITY_FILTER_OPTIONS,
        today=date.today(),
    )
@main_bp.route("/items/<kind>/new", methods=("GET", "POST"))
@login_required
def item_create(kind: str):
    """Show and process the form for creating an item of ``kind``.

    GET renders an empty form. POST either performs a "quick add" of a food
    (meals only, ``form_action == "quick_add_food"``) and re-renders the
    form, or validates the input, stores the optional photo, inserts the
    item with its dayparts (and components for meals) and redirects to the
    item list. An unknown ``kind`` redirects to the dashboard.
    """
    if kind not in ITEM_KIND_LABELS:
        return redirect(url_for("main.dashboard"))
    database = get_db()
    dayparts = get_dayparts()
    food_groups = group_items_by_availability(fetch_food_options())
    form_data = {
        "name": "",
        "category": "",
        "note": "",
        "visibility": "shared",
        "daypart_ids": [],
        "component_ids": [],
        "quick_food_name": "",
        "quick_food_category": "",
        "quick_food_note": "",
    }

    def render_form():
        # Reads food_groups / form_data at call time, so later reassignments
        # in the enclosing function are picked up.
        return render_template(
            "items/form.html",
            kind=kind,
            item=None,
            dayparts=dayparts,
            food_groups=food_groups,
            categories=CATEGORIES,
            form_data=form_data,
            visibility_options=VISIBILITY_FORM_OPTIONS,
        )

    if request.method != "POST":
        return render_form()

    form_action = request.form.get("form_action", "save_item")
    form_data = extract_item_form_data(form_data)
    name = form_data["name"]

    if kind == "meal" and form_action == "quick_add_food":
        if form_data["quick_food_name"]:
            new_food_id = create_quick_food_from_form(form_data)
            if new_food_id not in form_data["component_ids"]:
                form_data["component_ids"].append(new_food_id)
            form_data["component_ids"] = sorted(form_data["component_ids"])
            for field in ("quick_food_name", "quick_food_category", "quick_food_note"):
                form_data[field] = ""
            # Reload the options so the new food shows up in the form.
            food_groups = group_items_by_availability(fetch_food_options())
            flash("Das neue Lebensmittel wurde angelegt und direkt zur Mahlzeitenidee hinzugefügt.", "success")
        else:
            flash("Bitte einen Namen für das neue Lebensmittel eintragen.", "error")
        return render_form()

    error = None if name else "Bitte einen Namen eintragen."
    photo_filename = None
    if error is None:
        try:
            photo_filename = save_photo(request.files.get("photo"))
        except ValueError as exc:
            error = str(exc)
    if error is not None:
        flash(error, "error")
        return render_form()

    user_id = g.user["id"]
    cursor = database.execute(
        """
        INSERT INTO items (
            household_id, owner_user_id, visibility, kind, name, category, note, photo_filename, created_by, updated_by
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (
            current_household_id(),
            user_id,
            form_data["visibility"],
            kind,
            name,
            form_data["category"],
            form_data["note"],
            photo_filename,
            user_id,
            user_id,
        ),
    )
    item_id = int(cursor.lastrowid)
    sync_item_dayparts(item_id, form_data["daypart_ids"])
    if kind == "meal":
        sync_meal_components(item_id, form_data["component_ids"])
    database.commit()
    flash(f"{ITEM_KIND_SINGULAR_LABELS[kind]} wurde angelegt.", "success")
    return redirect(url_for("main.item_list", kind=kind))
@main_bp.route("/items/<int:item_id>/edit", methods=("GET", "POST"))
@login_required
def item_edit(item_id: int):
    """Show and process the edit form for an existing item.

    Redirects to the dashboard with an error flash when the item is not
    found or may not be edited. GET renders the form pre-filled from the
    item. POST either performs a "quick add" of a food (meals only) and
    re-renders the form, or validates the input, stores an optional new
    photo, updates the item with its dayparts (and components for meals)
    and redirects to the item list.
    """
    database = get_db()
    try:
        item = get_item(item_id)
        ensure_can_edit(item)
    except (ValueError, PermissionError) as exc:
        flash(str(exc), "error")
        return redirect(url_for("main.dashboard"))

    kind = item["kind"]
    dayparts = get_dayparts()
    food_groups = group_items_by_availability(fetch_food_options())
    form_data = {
        "name": item["name"],
        "category": item["category"] or "",
        "note": item["note"] or "",
        "visibility": item["visibility"],
        "daypart_ids": get_item_daypart_ids(item_id),
        "component_ids": get_meal_component_ids(item_id) if kind == "meal" else [],
        "quick_food_name": "",
        "quick_food_category": "",
        "quick_food_note": "",
    }

    def render_form():
        # Reads food_groups / form_data at call time, so later reassignments
        # in the enclosing function are picked up.
        return render_template(
            "items/form.html",
            kind=kind,
            item=item,
            dayparts=dayparts,
            food_groups=food_groups,
            categories=CATEGORIES,
            form_data=form_data,
            visibility_options=VISIBILITY_FORM_OPTIONS,
        )

    if request.method != "POST":
        return render_form()

    form_action = request.form.get("form_action", "save_item")
    form_data = extract_item_form_data(form_data)
    name = form_data["name"]

    if kind == "meal" and form_action == "quick_add_food":
        if form_data["quick_food_name"]:
            new_food_id = create_quick_food_from_form(form_data)
            if new_food_id not in form_data["component_ids"]:
                form_data["component_ids"].append(new_food_id)
            form_data["component_ids"] = sorted(form_data["component_ids"])
            for field in ("quick_food_name", "quick_food_category", "quick_food_note"):
                form_data[field] = ""
            # Reload the options so the new food shows up in the form.
            food_groups = group_items_by_availability(fetch_food_options())
            flash("Das neue Lebensmittel wurde angelegt und direkt zur Mahlzeitenidee hinzugefügt.", "success")
        else:
            flash("Bitte einen Namen für das neue Lebensmittel eintragen.", "error")
        return render_form()

    error = None if name else "Bitte einen Namen eintragen."
    photo_filename = item["photo_filename"]
    if error is None:
        try:
            photo_filename = save_photo(request.files.get("photo"), current_filename=item["photo_filename"])
        except ValueError as exc:
            error = str(exc)
    if error is not None:
        flash(error, "error")
        return render_form()

    database.execute(
        """
        UPDATE items
        SET name = ?,
            category = ?,
            note = ?,
            visibility = ?,
            photo_filename = ?,
            updated_by = ?,
            updated_at = CURRENT_TIMESTAMP
        WHERE id = ?
        """,
        (
            name,
            form_data["category"],
            form_data["note"],
            form_data["visibility"],
            photo_filename,
            g.user["id"],
            item_id,
        ),
    )
    sync_item_dayparts(item_id, form_data["daypart_ids"])
    if kind == "meal":
        sync_meal_components(item_id, form_data["component_ids"])
    database.commit()
    flash("Der Eintrag wurde aktualisiert.", "success")
    return redirect(url_for("main.item_list", kind=kind))
@main_bp.post("/items/<int:item_id>/shopping")
@login_required
def item_add_to_shopping(item_id: int):
    """Put an item on the shopping list and return to the referring page.

    Falls back to the shopping list when the request has no referrer. Flashes
    an error when the item is not found, otherwise whether it was added or
    was already on the list.
    """
    back_to = request.referrer or url_for("main.shopping_list")
    try:
        item = get_item(item_id)
    except ValueError as exc:
        flash(str(exc), "error")
        return redirect(back_to)
    if add_to_shopping_list(item_id, g.user["id"], visibility_override=item["visibility"]):
        flash(f"{item['name']} steht jetzt auf der Einkaufsliste.", "success")
    else:
        flash(f"{item['name']} ist bereits auf der Einkaufsliste.", "info")
    return redirect(back_to)
@main_bp.post("/items/<int:item_id>/set-home")
@login_required
def item_set_home(item_id: int):
    """Set an item's availability state to ``home`` and go back.

    Redirects to the referrer (or the home view) in every case; flashes an
    error instead of updating when the item is missing or not editable.
    """
    back_to = request.referrer or url_for("main.home_view")
    try:
        item = get_item(item_id)
        ensure_can_edit(item)
    except (ValueError, PermissionError) as exc:
        flash(str(exc), "error")
        return redirect(back_to)
    update_sql = """
        UPDATE items
        SET availability_state = 'home', updated_by = ?, updated_at = CURRENT_TIMESTAMP
        WHERE id = ?
    """
    get_db().execute(update_sql, (g.user["id"], item_id))
    get_db().commit()
    flash(f"{item['name']} ist jetzt unter Zuhause sichtbar.", "success")
    return redirect(back_to)
@main_bp.post("/items/<int:item_id>/archive")
@login_required
def item_archive(item_id: int):
    """Set an item's availability state to ``archived`` and go back.

    Redirects to the referrer (or the archive view) in every case; flashes an
    error instead of updating when the item is missing or not editable.
    """
    back_to = request.referrer or url_for("main.archive_view")
    try:
        item = get_item(item_id)
        ensure_can_edit(item)
    except (ValueError, PermissionError) as exc:
        flash(str(exc), "error")
        return redirect(back_to)
    update_sql = """
        UPDATE items
        SET availability_state = 'archived', updated_by = ?, updated_at = CURRENT_TIMESTAMP
        WHERE id = ?
    """
    get_db().execute(update_sql, (g.user["id"], item_id))
    get_db().commit()
    flash(f"{item['name']} liegt jetzt im Archiv und bleibt später leicht wiederfindbar.", "info")
    return redirect(back_to)
@main_bp.post("/items/<int:item_id>/restore")
@login_required
def item_restore(item_id: int):
    """Set an item's availability state back to ``idea`` and go back.

    Redirects to the referrer (or the archive view) in every case; flashes an
    error instead of updating when the item is missing or not editable.
    """
    back_to = request.referrer or url_for("main.archive_view")
    try:
        item = get_item(item_id)
        ensure_can_edit(item)
    except (ValueError, PermissionError) as exc:
        flash(str(exc), "error")
        return redirect(back_to)
    update_sql = """
        UPDATE items
        SET availability_state = 'idea', updated_by = ?, updated_at = CURRENT_TIMESTAMP
        WHERE id = ?
    """
    get_db().execute(update_sql, (g.user["id"], item_id))
    get_db().commit()
    flash(f"{item['name']} ist wieder in der aktiven Liste.", "success")
    return redirect(back_to)
@main_bp.route("/shopping", methods=("GET", "POST"))
@login_required
def shopping_list():
    """Show the shopping list; on POST add the submitted item to it.

    POST expects ``item_id`` in the form, flashes the outcome and redirects
    back to this view. GET renders the open entries together with the
    non-archived items that are not yet on the list.
    """
    if request.method == "POST":
        selected_item_id = request.form.get("item_id", "").strip()
        # str.isdigit() alone also accepts characters such as "²" that int()
        # rejects with ValueError; an empty string fails isdigit() as well.
        if not (selected_item_id.isascii() and selected_item_id.isdigit()):
            flash("Bitte zuerst etwas auswählen.", "error")
        else:
            try:
                item = get_item(int(selected_item_id))
                added = add_to_shopping_list(item["id"], g.user["id"], visibility_override=item["visibility"])
                if added:
                    flash(f"{item['name']} wurde auf die Einkaufsliste gesetzt.", "success")
                else:
                    flash(f"{item['name']} ist bereits auf der Einkaufsliste.", "info")
            except ValueError as exc:
                flash(str(exc), "error")
        return redirect(url_for("main.shopping_list"))
    entries = fetch_shopping_entries()
    addable_items = [
        item
        for item in fetch_items(include_archived=False)
        if not item["is_on_shopping_list"]
    ]
    return render_template("shopping/list.html", entries=entries, addable_items=addable_items)
@main_bp.post("/shopping/<int:entry_id>/check")
@login_required
def shopping_check(entry_id: int):
    """Mark a shopping entry as checked and its item as being at home.

    Always redirects to the shopping list. Flashes an error and changes
    nothing when the entry is not visible, may not be edited, or its item
    cannot be loaded.
    """
    list_url = url_for("main.shopping_list")
    lookup_sql = f"""
        SELECT shopping_entries.*,
               owner.display_name AS owner_display_name,
               owner.username AS owner_username
        FROM shopping_entries
        LEFT JOIN users AS owner ON owner.id = shopping_entries.owner_user_id
        WHERE shopping_entries.id = ? AND {visible_clause('shopping_entries')}
    """
    entry = get_db().execute(lookup_sql, [entry_id, *visible_params()]).fetchone()
    if entry is None:
        flash("Der Einkaufseintrag wurde nicht gefunden.", "error")
        return redirect(list_url)
    described = describe_record(dict(entry))
    try:
        ensure_can_edit(described, "Diesen Einkaufseintrag kannst du gerade nicht ändern.")
        item = get_item(entry["item_id"])
    except (ValueError, PermissionError) as exc:
        flash(str(exc), "error")
        return redirect(list_url)

    user_id = g.user["id"]
    database = get_db()
    database.execute(
        """
        UPDATE shopping_entries
        SET is_checked = 1, checked_at = CURRENT_TIMESTAMP, checked_by = ?
        WHERE id = ?
        """,
        (user_id, entry_id),
    )
    database.execute(
        """
        UPDATE items
        SET availability_state = 'home', updated_by = ?, updated_at = CURRENT_TIMESTAMP
        WHERE id = ?
        """,
        (user_id, item["id"]),
    )
    database.commit()
    flash(f"{item['name']} ist jetzt als Zuhause vorhanden markiert.", "success")
    return redirect(list_url)
@main_bp.post("/shopping/<int:entry_id>/remove")
@login_required
def shopping_remove(entry_id: int):
    """Delete a shopping entry and return to the shopping list.

    Flashes an error and deletes nothing when the entry is not visible to
    the current user or may not be edited.
    """
    list_url = url_for("main.shopping_list")
    lookup_sql = f"""
        SELECT shopping_entries.*,
               owner.display_name AS owner_display_name,
               owner.username AS owner_username
        FROM shopping_entries
        LEFT JOIN users AS owner ON owner.id = shopping_entries.owner_user_id
        WHERE shopping_entries.id = ? AND {visible_clause('shopping_entries')}
    """
    entry = get_db().execute(lookup_sql, [entry_id, *visible_params()]).fetchone()
    if entry is None:
        flash("Der Eintrag wurde nicht gefunden.", "error")
        return redirect(list_url)
    described = describe_record(dict(entry))
    try:
        ensure_can_edit(described, "Diesen Einkaufseintrag kannst du gerade nicht entfernen.")
    except PermissionError as exc:
        flash(str(exc), "error")
        return redirect(list_url)
    get_db().execute("DELETE FROM shopping_entries WHERE id = ?", (entry_id,))
    get_db().commit()
    flash("Der Eintrag wurde von der Einkaufsliste entfernt.", "info")
    return redirect(list_url)
@main_bp.get("/home")
@login_required
def home_view():
    """Render the list of items whose availability is ``home``.

    Query parameters:
        q: optional search text (surrounding whitespace is stripped).
        visibility: optional visibility scope, forwarded to ``fetch_items``.
        daypart_id: optional daypart filter; any value that is not a plain
            decimal number is ignored instead of raising.
    """
    query = request.args.get("q", "").strip()
    scope = request.args.get("visibility", "").strip()
    raw_daypart_id = request.args.get("daypart_id", "").strip()
    # isdecimal(), not isdigit(): isdigit() is also true for characters such
    # as "²" that int() rejects with ValueError, which would turn a malformed
    # query string into an unhandled server error.
    daypart_id = int(raw_daypart_id) if raw_daypart_id.isdecimal() else None
    dayparts = get_dayparts()
    items = fetch_items(
        availability="home",
        query=query or None,
        daypart_id=daypart_id,
        visibility=scope or None,
    )
    sections = build_home_sections(items, dayparts, daypart_id)
    return render_template(
        "home/list.html",
        sections=sections,
        query=query,
        dayparts=dayparts,
        selected_daypart_id=daypart_id,
        selected_visibility=scope,
        visibility_options=VISIBILITY_FILTER_OPTIONS,
        today=date.today(),
    )
@main_bp.get("/archive")
@login_required
def archive_view():
    """Render the archive page listing archived items.

    Reads the optional ``q``, ``kind`` and ``visibility`` query parameters.
    ``kind`` is applied as a filter only when it is a key of
    ``ITEM_KIND_LABELS``; the raw value is still handed to the template so
    the filter form can show what was submitted.
    """
    args = request.args
    search_text = args.get("q", "").strip()
    kind_choice = args.get("kind", "").strip()
    visibility_choice = args.get("visibility", "").strip()

    if kind_choice in ITEM_KIND_LABELS:
        kind_filter = kind_choice
    else:
        kind_filter = None

    archived_items = fetch_items(
        kind=kind_filter,
        availability="archived",
        include_archived=True,
        query=search_text or None,
        visibility=visibility_choice or None,
    )

    context = {
        "items": archived_items,
        "query": search_text,
        "selected_kind": kind_choice,
        "selected_visibility": visibility_choice,
        "kind_options": KIND_FILTER_OPTIONS,
        "visibility_options": VISIBILITY_FILTER_OPTIONS,
        "today": date.today(),
    }
    return render_template("archive/list.html", **context)
@main_bp.get("/planner")
@login_required
def planner():
    """Render the weekly planner for the week named by the ``week`` query parameter.

    ``parse_week_start`` normalises the parameter to the Monday of that week
    and falls back to the current week when it is missing or unparsable.
    """
    monday = parse_week_start(request.args.get("week"))
    one_week = timedelta(days=7)
    context = {
        "week_start": monday,
        "week_end": monday + timedelta(days=6),
        "prev_week": monday - one_week,
        "next_week": monday + one_week,
        "week_cards": fetch_week_cards(monday),
        "today": date.today(),
    }
    return render_template("planner/week.html", **context)
@main_bp.route("/planner/day", methods=("GET", "POST"))
@login_required
def planner_day():
    """Show the planner for a single day and, on POST, add an item to it.

    GET renders the day given by the ``date`` parameter (today when missing
    or invalid); the optional ``item_id`` / ``daypart_id`` query parameters
    are forwarded to ``build_day_planner_sections``.

    POST reads ``item_id``, ``daypart_id``, ``plan_date``, ``note`` and
    ``visibility`` from the form. On success a ``plan_entries`` row is
    inserted and the client is redirected to the day view, anchored at the
    chosen daypart. On a validation error the message is flashed and the day
    view is rendered again.
    """
    selected_date = parse_plan_date(request.values.get("date"))
    if request.method == "POST":
        item_id_raw = request.form.get("item_id", "").strip()
        daypart_id_raw = request.form.get("daypart_id", "").strip()
        note = request.form.get("note", "").strip()
        selected_date = parse_plan_date(request.form.get("plan_date"))
        visibility = normalize_visibility(request.form.get("visibility"), "shared")
        error = None
        # isdecimal(), not isdigit(): isdigit() is also true for characters
        # such as "²" that int() rejects with ValueError. isdecimal() is False
        # for the empty string, so no separate emptiness check is needed.
        if not item_id_raw.isdecimal():
            error = "Bitte etwas für den Tagesplan auswählen."
        elif not daypart_id_raw.isdecimal():
            error = "Bitte eine Tageszeit auswählen."
        if error is None:
            item_id = int(item_id_raw)
            daypart_id = int(daypart_id_raw)
            try:
                # Called only for its check; the returned item is not needed.
                get_item(item_id)
            except ValueError as exc:
                error = str(exc)
        if error is None:
            database = get_db()
            database.execute(
                """
                INSERT INTO plan_entries (household_id, owner_user_id, visibility, plan_date, daypart_id, item_id, note, created_by)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    current_household_id(),
                    g.user["id"],
                    visibility,
                    selected_date.isoformat(),
                    daypart_id,
                    item_id,
                    note,
                    g.user["id"],
                ),
            )
            database.commit()
            if ensure_planned_item_is_shopped(item_id, g.user["id"], visibility):
                flash("Der Eintrag ist noch nicht zuhause und wurde zusätzlich auf die Einkaufsliste gesetzt.", "info")
            flash("Der Eintrag wurde in den Tagesplan gelegt.", "success")
            return redirect(
                f"{url_for('main.planner_day', date=selected_date.isoformat(), daypart_id=daypart_id)}#daypart-{daypart_id}"
            )
        # Validation failed: report it and fall through to re-render the page.
        flash(error, "error")
    selected_item_raw = request.args.get("item_id", "").strip()
    selected_daypart_raw = request.args.get("daypart_id", "").strip()
    # Same isdecimal() guard as above so a malformed query string is ignored
    # rather than crashing int().
    selected_item_id = int(selected_item_raw) if selected_item_raw.isdecimal() else None
    selected_daypart_id = int(selected_daypart_raw) if selected_daypart_raw.isdecimal() else None
    sections = build_day_planner_sections(selected_date, selected_item_id, selected_daypart_id)
    return render_template(
        "planner/day.html",
        selected_date=selected_date,
        previous_day=selected_date - timedelta(days=1),
        next_day=selected_date + timedelta(days=1),
        sections=sections,
        today=date.today(),
        visibility_options=VISIBILITY_FORM_OPTIONS,
    )
@main_bp.post("/planner/<int:entry_id>/remove")
@login_required
def planner_remove(entry_id: int):
    """Delete a plan entry, then return to the day view or the week planner.

    The entry is loaded through ``visible_clause``. A missing entry, or a
    ``PermissionError`` from ``ensure_can_edit``, flashes an error and
    nothing is deleted. When the request carries a non-empty ``date`` query
    parameter the redirect goes to that day's planner, otherwise to the
    week overview.
    """
    return_date = request.args.get("date", "")
    database = get_db()
    row = database.execute(
        f"""
        SELECT plan_entries.*,
        owner.display_name AS owner_display_name,
        owner.username AS owner_username
        FROM plan_entries
        LEFT JOIN users AS owner ON owner.id = plan_entries.owner_user_id
        WHERE plan_entries.id = ? AND {visible_clause('plan_entries')}
        """,
        [entry_id, *visible_params()],
    ).fetchone()

    if row is not None:
        try:
            ensure_can_edit(describe_record(dict(row)), "Diesen Planeintrag kannst du gerade nicht entfernen.")
            database.execute("DELETE FROM plan_entries WHERE id = ?", (entry_id,))
            database.commit()
            flash("Der Planeintrag wurde entfernt.", "info")
        except PermissionError as denied:
            flash(str(denied), "error")
    else:
        flash("Der Planeintrag wurde nicht gefunden.", "error")

    if not return_date:
        return redirect(url_for("main.planner"))
    return redirect(url_for("main.planner_day", date=return_date))
@main_bp.post("/planner/<int:entry_id>/move")
@login_required
def planner_move(entry_id: int):
    """Move a plan entry to another date and daypart; responds with JSON.

    Form fields:
        target_date: ISO date; ``parse_plan_date`` falls back to today when
            it is missing or invalid.
        target_daypart_id: decimal daypart id (required).

    Responses:
        400 when ``target_daypart_id`` is not a decimal number.
        404 when the entry does not exist or is not visible to the caller.
        403 when ``ensure_can_edit`` raises ``PermissionError``.
        200 with ``ok``, ``added_to_shopping`` and ``redirect_url`` (the week
            planner for the target date) on success.
    """
    target_date = parse_plan_date(request.form.get("target_date"))
    target_daypart_raw = request.form.get("target_daypart_id", "").strip()
    # isdecimal(), not isdigit(): isdigit() is also true for characters such
    # as "²" that the int() call below rejects with ValueError, which would
    # surface as a server error instead of this 400 response.
    if not target_daypart_raw.isdecimal():
        return jsonify({"ok": False, "error": "Ungültige Tageszeit"}), 400
    database = get_db()
    entry = database.execute(
        f"""
        SELECT plan_entries.*,
        owner.display_name AS owner_display_name,
        owner.username AS owner_username
        FROM plan_entries
        LEFT JOIN users AS owner ON owner.id = plan_entries.owner_user_id
        WHERE plan_entries.id = ? AND {visible_clause('plan_entries')}
        """,
        [entry_id, *visible_params()],
    ).fetchone()
    if entry is None:
        return jsonify({"ok": False, "error": "Eintrag nicht gefunden"}), 404
    entry_dict = describe_record(dict(entry))
    try:
        ensure_can_edit(entry_dict, "Diesen Planeintrag kannst du gerade nicht verschieben.")
    except PermissionError as exc:
        return jsonify({"ok": False, "error": str(exc)}), 403
    target_daypart_id = int(target_daypart_raw)
    database.execute(
        """
        UPDATE plan_entries
        SET plan_date = ?, daypart_id = ?
        WHERE id = ?
        """,
        (target_date.isoformat(), target_daypart_id, entry_id),
    )
    database.commit()
    was_added_to_shopping = ensure_planned_item_is_shopped(entry["item_id"], g.user["id"], entry["visibility"])
    if was_added_to_shopping:
        flash("Der verschobene Eintrag ist noch nicht zuhause und wurde auf die Einkaufsliste gesetzt.", "info")
    return jsonify(
        {
            "ok": True,
            "added_to_shopping": was_added_to_shopping,
            "redirect_url": url_for("main.planner", week=parse_week_start(target_date.isoformat()).isoformat()),
        }
    )