324 lines
10 KiB
Python
324 lines
10 KiB
Python
from __future__ import annotations
|
|
|
|
import functools
|
|
import secrets
|
|
|
|
from flask import (
|
|
Blueprint,
|
|
flash,
|
|
g,
|
|
redirect,
|
|
render_template,
|
|
request,
|
|
session,
|
|
url_for,
|
|
)
|
|
from markupsafe import Markup
|
|
from werkzeug.security import check_password_hash, generate_password_hash
|
|
|
|
from .constants import ROLE_LABELS
|
|
from .db import active_admin_count, ensure_default_household, get_db, user_count
|
|
|
|
|
|
# Blueprint named "auth"; every route registered on it is served under /auth.
auth_bp = Blueprint("auth", __name__, url_prefix="/auth")
|
|
|
|
|
|
def login_required(view):
    """Decorate *view* so that it only runs for a logged-in user.

    When ``g.user`` is ``None`` the caller is redirected to ``auth.login``
    instead of reaching the view.
    """

    @functools.wraps(view)
    def guarded(**view_args):
        # Authenticated request: hand over to the real view untouched.
        if g.user is not None:
            return view(**view_args)
        return redirect(url_for("auth.login"))

    return guarded
|
|
|
|
|
|
def admin_required(view):
    """Decorate *view* so that only logged-in users with the admin role reach it.

    Anonymous visitors are redirected to ``auth.login``. Logged-in users whose
    role is not ``"admin"`` get an error flash message and are redirected to
    ``main.dashboard``.
    """

    @functools.wraps(view)
    def guarded(**view_args):
        current = g.user
        if current is None:
            return redirect(url_for("auth.login"))
        if current["role"] == "admin":
            return view(**view_args)
        flash("Dieser Bereich ist für Admins gedacht.", "error")
        return redirect(url_for("main.dashboard"))

    return guarded
|
|
|
|
|
|
def ensure_csrf_token() -> str:
    """Return the session's CSRF token, creating and storing one if missing.

    The token lives under the ``"_csrf_token"`` session key and is generated
    with ``secrets.token_hex(24)`` the first time it is needed.
    """
    existing = session.get("_csrf_token")
    if existing:
        return existing
    fresh = secrets.token_hex(24)
    session["_csrf_token"] = fresh
    return fresh
|
|
|
|
|
|
def normalize_login_value(raw: str) -> str:
    """Return *raw* with surrounding whitespace removed and lower-cased."""
    trimmed = raw.strip()
    return trimmed.lower()
|
|
|
|
|
|
def validate_password_strength(password: str | None) -> str | None:
    """Check the minimum password length.

    Returns the user-facing error message when *password* is shorter than
    10 characters (``None`` or an empty value counts as length 0), otherwise
    ``None``.
    """
    candidate = password or ""
    long_enough = len(candidate) >= 10
    return None if long_enough else "Bitte ein etwas längeres Passwort wählen."
|
|
|
|
|
|
def validate_identity_fields(database, username: str, email: str | None, current_user_id: int | None = None) -> str | None:
    """Validate that *username* is present and that username/email are unused.

    Args:
        database: Connection whose ``execute(...).fetchone()`` yields rows
            addressable by column name.
        username: Required login name; compared case-insensitively via SQL
            ``LOWER``.
        email: Optional e-mail address; only checked when truthy.
        current_user_id: Id of the user being edited. A match on that id is
            not treated as a conflict.

    Returns:
        A user-facing error message, or ``None`` when everything is fine.
    """
    if not username:
        return "Bitte einen Benutzernamen eintragen."

    # (value, lookup query, message shown when somebody else owns the value)
    uniqueness_checks = (
        (
            username,
            "SELECT id FROM users WHERE LOWER(username) = LOWER(?)",
            "Dieser Benutzername ist bereits vergeben.",
        ),
        (
            email,
            "SELECT id FROM users WHERE LOWER(email) = LOWER(?)",
            "Diese E-Mail-Adresse wird bereits verwendet.",
        ),
    )
    for value, query, message in uniqueness_checks:
        if not value:
            # Only the optional e-mail can be empty here; nothing to look up.
            continue
        owner = database.execute(query, (value,)).fetchone()
        if owner and int(owner["id"]) != current_user_id:
            return message

    return None
|
|
|
|
|
|
@auth_bp.app_context_processor
def inject_csrf_input():
    """Provide CSRF helpers to the template context.

    Returns a mapping with:

    * ``csrf_input`` - a callable that renders the hidden ``csrf_token``
      form field as ``Markup``.
    * ``csrf_token_value`` - the current token string.
    """

    def render_hidden_field():
        # The token is looked up at call time, exactly when the template
        # renders the field.
        return Markup(
            f'<input type="hidden" name="csrf_token" value="{ensure_csrf_token()}">'
        )

    token = ensure_csrf_token()
    return {"csrf_input": render_hidden_field, "csrf_token_value": token}
|
|
|
|
|
|
@auth_bp.before_app_request
def load_logged_in_user():
    """Populate ``g.user`` and enforce the first-run and CSRF gates.

    Registered with ``before_app_request``. Steps, in order:

    1. Load the user referenced by ``session["user_id"]`` (joined with the
       household name) into ``g.user``. If that user is not active, the
       session is cleared and ``g.user`` becomes ``None``.
    2. While ``user_count()`` is 0, redirect every endpoint except
       ``auth.setup``, ``static`` and ``uploaded_file`` to ``auth.setup``.
    3. For POST requests (except to ``static``), require the ``csrf_token``
       form field to match the session token; otherwise flash an error and
       redirect to the referrer (or ``main.dashboard``).

    Returns:
        A redirect response when the request must not continue, else ``None``.
    """
    user_id = session.get("user_id")
    if user_id is None:
        g.user = None
    else:
        g.user = get_db().execute(
            """
            SELECT users.*,
                   households.name AS household_name
            FROM users
            LEFT JOIN households ON households.id = users.household_id
            WHERE users.id = ?
            """,
            (user_id,),
        ).fetchone()
        if g.user is not None and not g.user["is_active"]:
            # Deactivated accounts lose their session immediately.
            session.clear()
            g.user = None

    endpoint = request.endpoint or ""
    if user_count() == 0 and endpoint not in {"auth.setup", "static", "uploaded_file"}:
        return redirect(url_for("auth.setup"))

    if request.method == "POST" and endpoint != "static":
        token = session.get("_csrf_token")
        form_token = request.form.get("csrf_token") or ""
        # Constant-time comparison so response timing does not reveal how much
        # of a guessed token is correct. Both sides are encoded to bytes
        # because compare_digest rejects non-ASCII ``str`` input, and the form
        # value is attacker-controlled.
        tokens_match = bool(token) and secrets.compare_digest(
            str(token).encode("utf-8"), form_token.encode("utf-8")
        )
        if not tokens_match:
            flash("Die Sitzung muss kurz neu geladen werden. Bitte versuche es noch einmal.", "error")
            return redirect(request.referrer or url_for("main.dashboard"))

    return None
|
|
|
|
|
|
@auth_bp.route("/setup", methods=("GET", "POST"))
def setup():
    """First-run view: create the initial household and its admin account.

    Redirects to ``auth.login`` as soon as at least one user exists. On GET
    the setup form is rendered. On POST the submitted identity and password
    are validated; on success the household and an ``admin`` user are
    inserted and the visitor is sent to the login page, otherwise the error
    is flashed and the form is rendered again.
    """
    if user_count() > 0:
        return redirect(url_for("auth.login"))

    if request.method != "POST":
        return render_template("auth/setup.html")

    form = request.form
    household_name = form.get("household_name", "").strip() or "Unser Haushalt"
    username = normalize_login_value(form.get("username", ""))
    email = normalize_login_value(form.get("email", "")) or None
    display_name = form.get("display_name", "").strip()
    password = form.get("password", "")
    password_repeat = form.get("password_repeat", "")
    database = get_db()

    # Identity problems take precedence; password checks only run afterwards.
    error = validate_identity_fields(database, username, email)
    if error is None:
        if not password:
            error = "Bitte ein Passwort vergeben."
        elif password != password_repeat:
            error = "Die Passwörter stimmen nicht überein."
        else:
            error = validate_password_strength(password)

    if error is not None:
        flash(error, "error")
        return render_template("auth/setup.html")

    database.execute(
        "INSERT INTO households (name) VALUES (?)",
        (household_name,),
    )
    # NOTE(review): the id comes from ensure_default_household(), not from the
    # INSERT above - presumably it returns the row just created; confirm that
    # no second household can end up being used here.
    household_id = ensure_default_household(database)
    database.execute(
        """
        INSERT INTO users (household_id, username, email, display_name, role, password_hash)
        VALUES (?, ?, ?, ?, 'admin', ?)
        """,
        (household_id, username, email, display_name, generate_password_hash(password)),
    )
    database.commit()
    flash("Der erste Haushalt-Zugang ist angelegt. Du kannst dich jetzt anmelden.", "success")
    return redirect(url_for("auth.login"))
|
|
|
|
|
|
@auth_bp.route("/login", methods=("GET", "POST"))
def login():
    """Log a user in by username or e-mail address.

    Redirects to ``auth.setup`` while no user exists. On POST the submitted
    login value is normalized and looked up case-insensitively against both
    username and e-mail. On success the session is reset, ``user_id`` is
    stored, a fresh CSRF token is created and the visitor is redirected to
    ``main.dashboard``. On failure the error is flashed and the form is
    rendered again.
    """
    if user_count() == 0:
        return redirect(url_for("auth.setup"))

    if request.method == "POST":
        identity = normalize_login_value(request.form.get("username", ""))
        password = request.form.get("password", "")
        remember_me = request.form.get("remember_me") == "1"
        database = get_db()

        # An empty login value must never match a row. Without this guard the
        # COALESCE(users.email, '') comparison below equals '' for every user
        # that has no e-mail address, so a blank login would select one of
        # those accounts.
        user = None
        if identity:
            user = database.execute(
                """
                SELECT users.*, households.name AS household_name
                FROM users
                LEFT JOIN households ON households.id = users.household_id
                WHERE LOWER(users.username) = LOWER(?) OR LOWER(COALESCE(users.email, '')) = LOWER(?)
                """,
                (identity, identity),
            ).fetchone()

        error = None
        if user is None or not check_password_hash(user["password_hash"], password):
            # Same message for unknown login and wrong password.
            error = "Login oder Passwort passen nicht zusammen."
        elif not user["is_active"]:
            error = "Dieser Zugang ist derzeit nicht aktiv."

        if error is None:
            # Drop everything from the pre-login session before storing the id.
            session.clear()
            session.permanent = remember_me
            session["user_id"] = user["id"]
            ensure_csrf_token()
            return redirect(url_for("main.dashboard"))

        flash(error, "error")

    return render_template("auth/login.html")
|
|
|
|
|
|
@auth_bp.route("/profile", methods=("GET", "POST"))
@login_required
def profile():
    """Show the logged-in user's profile and save changes to it.

    On POST, username, e-mail and display name are validated (the user's own
    id is excluded from the uniqueness checks) and written to the ``users``
    row. Success redirects back to the profile; a validation error is
    flashed and the page is rendered again.
    """
    database = get_db()

    if request.method != "POST":
        return render_template("auth/profile.html", role_labels=ROLE_LABELS)

    form = request.form
    username = normalize_login_value(form.get("username", ""))
    email = normalize_login_value(form.get("email", "")) or None
    display_name = form.get("display_name", "").strip()

    error = validate_identity_fields(database, username, email, current_user_id=g.user["id"])
    if error is not None:
        flash(error, "error")
        return render_template("auth/profile.html", role_labels=ROLE_LABELS)

    database.execute(
        """
        UPDATE users
        SET username = ?, email = ?, display_name = ?, updated_at = CURRENT_TIMESTAMP
        WHERE id = ?
        """,
        (username, email, display_name, g.user["id"]),
    )
    database.commit()
    flash("Dein Profil wurde aktualisiert.", "success")
    return redirect(url_for("auth.profile"))
|
|
|
|
|
|
@auth_bp.post("/password")
@login_required
def change_password():
    """Change the logged-in user's password.

    The current password must verify against the stored hash, and the new
    password must be non-empty, match its repetition and pass
    ``validate_password_strength``. The outcome is flashed and the visitor
    is always redirected to ``auth.profile``.
    """
    form = request.form
    current_password = form.get("current_password", "")
    new_password = form.get("new_password", "")
    new_password_repeat = form.get("new_password_repeat", "")

    # First failing check wins; the strength check runs last.
    if not check_password_hash(g.user["password_hash"], current_password):
        error = "Das aktuelle Passwort stimmt nicht."
    elif not new_password:
        error = "Bitte ein neues Passwort eintragen."
    elif new_password != new_password_repeat:
        error = "Die neuen Passwörter stimmen nicht überein."
    else:
        error = validate_password_strength(new_password)

    profile_url = url_for("auth.profile")
    if error is not None:
        flash(error, "error")
        return redirect(profile_url)

    database = get_db()
    database.execute(
        """
        UPDATE users
        SET password_hash = ?, updated_at = CURRENT_TIMESTAMP
        WHERE id = ?
        """,
        (generate_password_hash(new_password), g.user["id"]),
    )
    database.commit()
    flash("Dein Passwort wurde geändert.", "success")
    return redirect(profile_url)
|
|
|
|
|
|
@auth_bp.post("/logout")
def logout():
    """Clear the session and send the visitor to the login page."""
    session.clear()
    # Flashed after the clear so the message is not wiped with the session.
    flash("Du bist abgemeldet.", "info")
    login_url = url_for("auth.login")
    return redirect(login_url)
|
|
|
|
|
|
def validate_admin_user_form(
    database,
    *,
    username: str,
    email: str | None,
    role: str,
    is_active: bool,
    password: str,
    password_repeat: str,
    current_user_id: int | None = None,
) -> str | None:
    """Validate the admin form for creating or editing a user.

    Args:
        database: Connection passed through to ``validate_identity_fields``.
        username: Login name to check.
        email: Optional e-mail address to check.
        role: Must be a key of ``ROLE_LABELS``.
        is_active: Requested active flag.
        password: New password; required only when creating a user.
        password_repeat: Must equal *password* when a password is given.
        current_user_id: Id of the user being edited, ``None`` when creating.

    Returns:
        The first user-facing error message found, or ``None`` if valid.
    """
    identity_problem = validate_identity_fields(
        database, username, email, current_user_id=current_user_id
    )
    if identity_problem:
        return identity_problem

    if role not in ROLE_LABELS:
        return "Bitte eine gültige Rolle auswählen."

    if password:
        if password != password_repeat:
            return "Die Passwörter stimmen nicht überein."
        weakness = validate_password_strength(password)
        if weakness:
            return weakness
    elif current_user_id is None:
        # New accounts need a password; edits may leave it blank.
        return "Bitte ein Passwort vergeben."

    editing_self = current_user_id == g.user["id"]
    if editing_self and not is_active:
        return "Du kannst deinen eigenen Zugang hier nicht deaktivieren."

    return None
|
|
|
|
|
|
def can_remove_last_admin(target_user_id: int, new_role: str, is_active: bool) -> bool:
    """Report whether an edit would take admin status from a last active admin.

    Returns ``True`` only when all of the following hold:

    * a user is logged in, has the ``"admin"`` role and is editing a
      *different* user,
    * that target user exists and is currently an active admin,
    * the edit removes the admin role or the active flag, and
    * ``active_admin_count`` for the acting user's household is at most 1.

    Every other case returns ``False``.

    NOTE(review): edits to one's own account always return ``False`` here,
    and the name does not say whether ``True`` means "allowed" or "blocked" -
    confirm against the callers.
    """
    actor = g.user
    if actor is None:
        return False
    if target_user_id == actor["id"] or actor["role"] != "admin":
        return False

    target = get_db().execute("SELECT * FROM users WHERE id = ?", (target_user_id,)).fetchone()
    if target is None:
        return False

    currently_active_admin = target["role"] == "admin" and target["is_active"]
    if not currently_active_admin:
        return False

    stays_active_admin = new_role == "admin" and is_active
    if stays_active_admin:
        return False

    return active_admin_count(actor["household_id"]) <= 1
|