Files
nouri-App/nouri/auth.py
T

310 lines
10 KiB
Python

from __future__ import annotations
import functools
import secrets
from flask import (
Blueprint,
flash,
g,
redirect,
render_template,
request,
session,
url_for,
)
from markupsafe import Markup
from werkzeug.security import check_password_hash, generate_password_hash
from .constants import ROLE_LABELS
from .db import active_admin_count, ensure_default_household, get_db, user_count
auth_bp = Blueprint("auth", __name__, url_prefix="/auth")
def login_required(view):
@functools.wraps(view)
def wrapped_view(**kwargs):
if g.user is None:
return redirect(url_for("auth.login"))
return view(**kwargs)
return wrapped_view
def admin_required(view):
@functools.wraps(view)
def wrapped_view(**kwargs):
if g.user is None:
return redirect(url_for("auth.login"))
if g.user["role"] != "admin":
flash("Dieser Bereich ist für Admins gedacht.", "error")
return redirect(url_for("main.dashboard"))
return view(**kwargs)
return wrapped_view
def ensure_csrf_token() -> str:
token = session.get("_csrf_token")
if not token:
token = session["_csrf_token"] = secrets.token_hex(24)
return token
def normalize_login_value(raw: str) -> str:
return raw.strip().lower()
def validate_identity_fields(database, username: str, email: str | None, current_user_id: int | None = None) -> str | None:
if not username:
return "Bitte einen Benutzernamen eintragen."
existing_user = database.execute(
"SELECT id FROM users WHERE LOWER(username) = LOWER(?)",
(username,),
).fetchone()
if existing_user and int(existing_user["id"]) != current_user_id:
return "Dieser Benutzername ist bereits vergeben."
if email:
existing_email = database.execute(
"SELECT id FROM users WHERE LOWER(email) = LOWER(?)",
(email,),
).fetchone()
if existing_email and int(existing_email["id"]) != current_user_id:
return "Diese E-Mail-Adresse wird bereits verwendet."
return None
@auth_bp.app_context_processor
def inject_csrf_input():
return {
"csrf_input": lambda: Markup(
f'<input type="hidden" name="csrf_token" value="{ensure_csrf_token()}">'
),
"csrf_token_value": ensure_csrf_token(),
}
@auth_bp.before_app_request
def load_logged_in_user():
user_id = session.get("user_id")
if user_id is None:
g.user = None
else:
g.user = get_db().execute(
"""
SELECT users.*,
households.name AS household_name
FROM users
LEFT JOIN households ON households.id = users.household_id
WHERE users.id = ?
""",
(user_id,),
).fetchone()
if g.user is not None and not g.user["is_active"]:
session.clear()
g.user = None
endpoint = request.endpoint or ""
if user_count() == 0 and endpoint not in {"auth.setup", "static", "uploaded_file"}:
return redirect(url_for("auth.setup"))
if request.method == "POST" and endpoint != "static":
token = session.get("_csrf_token")
form_token = request.form.get("csrf_token")
if not token or token != form_token:
flash("Die Sitzung muss kurz neu geladen werden. Bitte versuche es noch einmal.", "error")
return redirect(request.referrer or url_for("main.dashboard"))
@auth_bp.route("/setup", methods=("GET", "POST"))
def setup():
if user_count() > 0:
return redirect(url_for("auth.login"))
if request.method == "POST":
household_name = request.form.get("household_name", "").strip() or "Unser Haushalt"
username = normalize_login_value(request.form.get("username", ""))
email = normalize_login_value(request.form.get("email", "")) or None
display_name = request.form.get("display_name", "").strip()
password = request.form.get("password", "")
password_repeat = request.form.get("password_repeat", "")
database = get_db()
error = validate_identity_fields(database, username, email)
if error is None and not password:
error = "Bitte ein Passwort vergeben."
elif error is None and password != password_repeat:
error = "Die Passwörter stimmen nicht überein."
if error is None:
database.execute(
"INSERT INTO households (name) VALUES (?)",
(household_name,),
)
household_id = ensure_default_household(database)
database.execute(
"""
INSERT INTO users (household_id, username, email, display_name, role, password_hash)
VALUES (?, ?, ?, ?, 'admin', ?)
""",
(household_id, username, email, display_name, generate_password_hash(password)),
)
database.commit()
flash("Der erste Haushalt-Zugang ist angelegt. Du kannst dich jetzt anmelden.", "success")
return redirect(url_for("auth.login"))
flash(error, "error")
return render_template("auth/setup.html")
@auth_bp.route("/login", methods=("GET", "POST"))
def login():
if user_count() == 0:
return redirect(url_for("auth.setup"))
if request.method == "POST":
identity = normalize_login_value(request.form.get("username", ""))
password = request.form.get("password", "")
remember_me = request.form.get("remember_me") == "1"
database = get_db()
user = database.execute(
"""
SELECT users.*, households.name AS household_name
FROM users
LEFT JOIN households ON households.id = users.household_id
WHERE LOWER(users.username) = LOWER(?) OR LOWER(COALESCE(users.email, '')) = LOWER(?)
""",
(identity, identity),
).fetchone()
error = None
if user is None or not check_password_hash(user["password_hash"], password):
error = "Login oder Passwort passen nicht zusammen."
elif not user["is_active"]:
error = "Dieser Zugang ist derzeit nicht aktiv."
if error is None:
session.clear()
session.permanent = remember_me
session["user_id"] = user["id"]
ensure_csrf_token()
return redirect(url_for("main.dashboard"))
flash(error, "error")
return render_template("auth/login.html")
@auth_bp.route("/profile", methods=("GET", "POST"))
@login_required
def profile():
database = get_db()
if request.method == "POST":
username = normalize_login_value(request.form.get("username", ""))
email = normalize_login_value(request.form.get("email", "")) or None
display_name = request.form.get("display_name", "").strip()
error = validate_identity_fields(database, username, email, current_user_id=g.user["id"])
if error is None:
database.execute(
"""
UPDATE users
SET username = ?, email = ?, display_name = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""",
(username, email, display_name, g.user["id"]),
)
database.commit()
flash("Dein Profil wurde aktualisiert.", "success")
return redirect(url_for("auth.profile"))
flash(error, "error")
return render_template("auth/profile.html", role_labels=ROLE_LABELS)
@auth_bp.post("/password")
@login_required
def change_password():
current_password = request.form.get("current_password", "")
new_password = request.form.get("new_password", "")
new_password_repeat = request.form.get("new_password_repeat", "")
error = None
if not check_password_hash(g.user["password_hash"], current_password):
error = "Das aktuelle Passwort stimmt nicht."
elif not new_password:
error = "Bitte ein neues Passwort eintragen."
elif new_password != new_password_repeat:
error = "Die neuen Passwörter stimmen nicht überein."
if error is None:
get_db().execute(
"""
UPDATE users
SET password_hash = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""",
(generate_password_hash(new_password), g.user["id"]),
)
get_db().commit()
flash("Dein Passwort wurde geändert.", "success")
else:
flash(error, "error")
return redirect(url_for("auth.profile"))
@auth_bp.post("/logout")
def logout():
session.clear()
flash("Du bist abgemeldet.", "info")
return redirect(url_for("auth.login"))
def validate_admin_user_form(
database,
*,
username: str,
email: str | None,
role: str,
is_active: bool,
password: str,
password_repeat: str,
current_user_id: int | None = None,
) -> str | None:
error = validate_identity_fields(database, username, email, current_user_id=current_user_id)
if error:
return error
if role not in ROLE_LABELS:
return "Bitte eine gültige Rolle auswählen."
if current_user_id is None and not password:
return "Bitte ein Passwort vergeben."
if password and password != password_repeat:
return "Die Passwörter stimmen nicht überein."
if current_user_id == g.user["id"] and not is_active:
return "Du kannst deinen eigenen Zugang hier nicht deaktivieren."
return None
def can_remove_last_admin(target_user_id: int, new_role: str, is_active: bool) -> bool:
if g.user is None:
return False
if target_user_id != g.user["id"] and g.user["role"] == "admin":
target = get_db().execute("SELECT * FROM users WHERE id = ?", (target_user_id,)).fetchone()
if target is None:
return False
if target["role"] != "admin" or not target["is_active"]:
return False
if new_role == "admin" and is_active:
return False
return active_admin_count(g.user["household_id"]) <= 1
return False