from __future__ import annotations import functools import secrets from flask import ( Blueprint, flash, g, redirect, render_template, request, session, url_for, ) from markupsafe import Markup from werkzeug.security import check_password_hash, generate_password_hash from .constants import ROLE_LABELS from .db import active_admin_count, ensure_default_household, get_db, user_count auth_bp = Blueprint("auth", __name__, url_prefix="/auth") def login_required(view): @functools.wraps(view) def wrapped_view(**kwargs): if g.user is None: return redirect(url_for("auth.login")) return view(**kwargs) return wrapped_view def admin_required(view): @functools.wraps(view) def wrapped_view(**kwargs): if g.user is None: return redirect(url_for("auth.login")) if g.user["role"] != "admin": flash("Dieser Bereich ist für Admins gedacht.", "error") return redirect(url_for("main.dashboard")) return view(**kwargs) return wrapped_view def ensure_csrf_token() -> str: token = session.get("_csrf_token") if not token: token = session["_csrf_token"] = secrets.token_hex(24) return token def normalize_login_value(raw: str) -> str: return raw.strip().lower() def validate_identity_fields(database, username: str, email: str | None, current_user_id: int | None = None) -> str | None: if not username: return "Bitte einen Benutzernamen eintragen." existing_user = database.execute( "SELECT id FROM users WHERE LOWER(username) = LOWER(?)", (username,), ).fetchone() if existing_user and int(existing_user["id"]) != current_user_id: return "Dieser Benutzername ist bereits vergeben." if email: existing_email = database.execute( "SELECT id FROM users WHERE LOWER(email) = LOWER(?)", (email,), ).fetchone() if existing_email and int(existing_email["id"]) != current_user_id: return "Diese E-Mail-Adresse wird bereits verwendet." return None @auth_bp.app_context_processor def inject_csrf_input(): return { "csrf_input": lambda: Markup( f'' ), "csrf_token_value": ensure_csrf_token(), } @auth_bp.before_app_request def load_logged_in_user(): user_id = session.get("user_id") if user_id is None: g.user = None else: g.user = get_db().execute( """ SELECT users.*, households.name AS household_name FROM users LEFT JOIN households ON households.id = users.household_id WHERE users.id = ? """, (user_id,), ).fetchone() if g.user is not None and not g.user["is_active"]: session.clear() g.user = None endpoint = request.endpoint or "" if user_count() == 0 and endpoint not in {"auth.setup", "static", "uploaded_file"}: return redirect(url_for("auth.setup")) if request.method == "POST" and endpoint != "static": token = session.get("_csrf_token") form_token = request.form.get("csrf_token") if not token or token != form_token: flash("Die Sitzung muss kurz neu geladen werden. Bitte versuche es noch einmal.", "error") return redirect(request.referrer or url_for("main.dashboard")) @auth_bp.route("/setup", methods=("GET", "POST")) def setup(): if user_count() > 0: return redirect(url_for("auth.login")) if request.method == "POST": household_name = request.form.get("household_name", "").strip() or "Unser Haushalt" username = normalize_login_value(request.form.get("username", "")) email = normalize_login_value(request.form.get("email", "")) or None display_name = request.form.get("display_name", "").strip() password = request.form.get("password", "") password_repeat = request.form.get("password_repeat", "") database = get_db() error = validate_identity_fields(database, username, email) if error is None and not password: error = "Bitte ein Passwort vergeben." elif error is None and password != password_repeat: error = "Die Passwörter stimmen nicht überein." if error is None: database.execute( "INSERT INTO households (name) VALUES (?)", (household_name,), ) household_id = ensure_default_household(database) database.execute( """ INSERT INTO users (household_id, username, email, display_name, role, password_hash) VALUES (?, ?, ?, ?, 'admin', ?) """, (household_id, username, email, display_name, generate_password_hash(password)), ) database.commit() flash("Der erste Haushalt-Zugang ist angelegt. Du kannst dich jetzt anmelden.", "success") return redirect(url_for("auth.login")) flash(error, "error") return render_template("auth/setup.html") @auth_bp.route("/login", methods=("GET", "POST")) def login(): if user_count() == 0: return redirect(url_for("auth.setup")) if request.method == "POST": identity = normalize_login_value(request.form.get("username", "")) password = request.form.get("password", "") remember_me = request.form.get("remember_me") == "1" database = get_db() user = database.execute( """ SELECT users.*, households.name AS household_name FROM users LEFT JOIN households ON households.id = users.household_id WHERE LOWER(users.username) = LOWER(?) OR LOWER(COALESCE(users.email, '')) = LOWER(?) """, (identity, identity), ).fetchone() error = None if user is None or not check_password_hash(user["password_hash"], password): error = "Login oder Passwort passen nicht zusammen." elif not user["is_active"]: error = "Dieser Zugang ist derzeit nicht aktiv." if error is None: session.clear() session.permanent = remember_me session["user_id"] = user["id"] ensure_csrf_token() return redirect(url_for("main.dashboard")) flash(error, "error") return render_template("auth/login.html") @auth_bp.route("/profile", methods=("GET", "POST")) @login_required def profile(): database = get_db() if request.method == "POST": username = normalize_login_value(request.form.get("username", "")) email = normalize_login_value(request.form.get("email", "")) or None display_name = request.form.get("display_name", "").strip() error = validate_identity_fields(database, username, email, current_user_id=g.user["id"]) if error is None: database.execute( """ UPDATE users SET username = ?, email = ?, display_name = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? """, (username, email, display_name, g.user["id"]), ) database.commit() flash("Dein Profil wurde aktualisiert.", "success") return redirect(url_for("auth.profile")) flash(error, "error") return render_template("auth/profile.html", role_labels=ROLE_LABELS) @auth_bp.post("/password") @login_required def change_password(): current_password = request.form.get("current_password", "") new_password = request.form.get("new_password", "") new_password_repeat = request.form.get("new_password_repeat", "") error = None if not check_password_hash(g.user["password_hash"], current_password): error = "Das aktuelle Passwort stimmt nicht." elif not new_password: error = "Bitte ein neues Passwort eintragen." elif new_password != new_password_repeat: error = "Die neuen Passwörter stimmen nicht überein." if error is None: get_db().execute( """ UPDATE users SET password_hash = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? """, (generate_password_hash(new_password), g.user["id"]), ) get_db().commit() flash("Dein Passwort wurde geändert.", "success") else: flash(error, "error") return redirect(url_for("auth.profile")) @auth_bp.post("/logout") def logout(): session.clear() flash("Du bist abgemeldet.", "info") return redirect(url_for("auth.login")) def validate_admin_user_form( database, *, username: str, email: str | None, role: str, is_active: bool, password: str, password_repeat: str, current_user_id: int | None = None, ) -> str | None: error = validate_identity_fields(database, username, email, current_user_id=current_user_id) if error: return error if role not in ROLE_LABELS: return "Bitte eine gültige Rolle auswählen." if current_user_id is None and not password: return "Bitte ein Passwort vergeben." if password and password != password_repeat: return "Die Passwörter stimmen nicht überein." if current_user_id == g.user["id"] and not is_active: return "Du kannst deinen eigenen Zugang hier nicht deaktivieren." return None def can_remove_last_admin(target_user_id: int, new_role: str, is_active: bool) -> bool: if g.user is None: return False if target_user_id != g.user["id"] and g.user["role"] == "admin": target = get_db().execute("SELECT * FROM users WHERE id = ?", (target_user_id,)).fetchone() if target is None: return False if target["role"] != "admin" or not target["is_active"]: return False if new_role == "admin" and is_active: return False return active_admin_count(g.user["household_id"]) <= 1 return False