from __future__ import annotations import functools import secrets from flask import ( Blueprint, flash, g, redirect, render_template, request, session, url_for, ) from markupsafe import Markup from werkzeug.security import check_password_hash, generate_password_hash from .db import get_db, user_count auth_bp = Blueprint("auth", __name__, url_prefix="/auth") def login_required(view): @functools.wraps(view) def wrapped_view(**kwargs): if g.user is None: return redirect(url_for("auth.login")) return view(**kwargs) return wrapped_view def ensure_csrf_token() -> str: token = session.get("_csrf_token") if not token: token = session["_csrf_token"] = secrets.token_hex(24) return token @auth_bp.app_context_processor def inject_csrf_input(): return { "csrf_input": lambda: Markup( f'' ), "csrf_token_value": ensure_csrf_token(), } @auth_bp.before_app_request def load_logged_in_user(): user_id = session.get("user_id") if user_id is None: g.user = None else: g.user = get_db().execute( "SELECT * FROM users WHERE id = ?", (user_id,), ).fetchone() endpoint = request.endpoint or "" if user_count() == 0 and endpoint not in {"auth.setup", "static", "uploaded_file"}: return redirect(url_for("auth.setup")) if request.method == "POST" and endpoint != "static": token = session.get("_csrf_token") form_token = request.form.get("csrf_token") if not token or token != form_token: flash("Die Sitzung muss kurz neu geladen werden. Bitte versuche es noch einmal.", "error") return redirect(request.referrer or url_for("main.dashboard")) @auth_bp.route("/setup", methods=("GET", "POST")) def setup(): if user_count() > 0: return redirect(url_for("auth.login")) if request.method == "POST": username = request.form.get("username", "").strip().lower() display_name = request.form.get("display_name", "").strip() password = request.form.get("password", "") password_repeat = request.form.get("password_repeat", "") error = None if not username: error = "Bitte einen Benutzernamen eintragen." elif not password: error = "Bitte ein Passwort vergeben." elif password != password_repeat: error = "Die Passwörter stimmen nicht überein." if error is None: database = get_db() database.execute( """ INSERT INTO users (username, display_name, password_hash) VALUES (?, ?, ?) """, (username, display_name, generate_password_hash(password)), ) database.commit() flash("Der erste Haushalt-Zugang ist angelegt. Du kannst dich jetzt anmelden.", "success") return redirect(url_for("auth.login")) flash(error, "error") return render_template("auth/setup.html") @auth_bp.route("/login", methods=("GET", "POST")) def login(): if user_count() == 0: return redirect(url_for("auth.setup")) if request.method == "POST": username = request.form.get("username", "").strip().lower() password = request.form.get("password", "") remember_me = request.form.get("remember_me") == "1" database = get_db() user = database.execute( "SELECT * FROM users WHERE username = ?", (username,), ).fetchone() error = None if user is None or not check_password_hash(user["password_hash"], password): error = "Benutzername oder Passwort passen nicht zusammen." if error is None: session.clear() # Opt-in long-lived session so the shared household device stays low-friction. session.permanent = remember_me session["user_id"] = user["id"] ensure_csrf_token() return redirect(url_for("main.dashboard")) flash(error, "error") return render_template("auth/login.html") @auth_bp.post("/logout") def logout(): session.clear() flash("Du bist abgemeldet.", "info") return redirect(url_for("auth.login"))