149 lines
4.3 KiB
Python
149 lines
4.3 KiB
Python
from __future__ import annotations
|
|
|
|
import functools
|
|
import secrets
|
|
|
|
from flask import (
|
|
Blueprint,
|
|
flash,
|
|
g,
|
|
redirect,
|
|
render_template,
|
|
request,
|
|
session,
|
|
url_for,
|
|
)
|
|
from markupsafe import Markup
|
|
from werkzeug.security import check_password_hash, generate_password_hash
|
|
|
|
from .db import get_db, user_count
|
|
|
|
|
|
auth_bp = Blueprint("auth", __name__, url_prefix="/auth")
|
|
|
|
|
|
def login_required(view):
|
|
@functools.wraps(view)
|
|
def wrapped_view(**kwargs):
|
|
if g.user is None:
|
|
return redirect(url_for("auth.login"))
|
|
return view(**kwargs)
|
|
|
|
return wrapped_view
|
|
|
|
|
|
def ensure_csrf_token() -> str:
|
|
token = session.get("_csrf_token")
|
|
if not token:
|
|
token = session["_csrf_token"] = secrets.token_hex(24)
|
|
return token
|
|
|
|
|
|
@auth_bp.app_context_processor
|
|
def inject_csrf_input():
|
|
return {
|
|
"csrf_input": lambda: Markup(
|
|
f'<input type="hidden" name="csrf_token" value="{ensure_csrf_token()}">'
|
|
),
|
|
"csrf_token_value": ensure_csrf_token(),
|
|
}
|
|
|
|
|
|
@auth_bp.before_app_request
|
|
def load_logged_in_user():
|
|
user_id = session.get("user_id")
|
|
if user_id is None:
|
|
g.user = None
|
|
else:
|
|
g.user = get_db().execute(
|
|
"SELECT * FROM users WHERE id = ?",
|
|
(user_id,),
|
|
).fetchone()
|
|
|
|
endpoint = request.endpoint or ""
|
|
if user_count() == 0 and endpoint not in {"auth.setup", "static", "uploaded_file"}:
|
|
return redirect(url_for("auth.setup"))
|
|
|
|
if request.method == "POST" and endpoint != "static":
|
|
token = session.get("_csrf_token")
|
|
form_token = request.form.get("csrf_token")
|
|
if not token or token != form_token:
|
|
flash("Die Sitzung muss kurz neu geladen werden. Bitte versuche es noch einmal.", "error")
|
|
return redirect(request.referrer or url_for("main.dashboard"))
|
|
|
|
|
|
@auth_bp.route("/setup", methods=("GET", "POST"))
|
|
def setup():
|
|
if user_count() > 0:
|
|
return redirect(url_for("auth.login"))
|
|
|
|
if request.method == "POST":
|
|
username = request.form.get("username", "").strip().lower()
|
|
display_name = request.form.get("display_name", "").strip()
|
|
password = request.form.get("password", "")
|
|
password_repeat = request.form.get("password_repeat", "")
|
|
|
|
error = None
|
|
if not username:
|
|
error = "Bitte einen Benutzernamen eintragen."
|
|
elif not password:
|
|
error = "Bitte ein Passwort vergeben."
|
|
elif password != password_repeat:
|
|
error = "Die Passwörter stimmen nicht überein."
|
|
|
|
if error is None:
|
|
database = get_db()
|
|
database.execute(
|
|
"""
|
|
INSERT INTO users (username, display_name, password_hash)
|
|
VALUES (?, ?, ?)
|
|
""",
|
|
(username, display_name, generate_password_hash(password)),
|
|
)
|
|
database.commit()
|
|
flash("Der erste Haushalt-Zugang ist angelegt. Du kannst dich jetzt anmelden.", "success")
|
|
return redirect(url_for("auth.login"))
|
|
|
|
flash(error, "error")
|
|
|
|
return render_template("auth/setup.html")
|
|
|
|
|
|
@auth_bp.route("/login", methods=("GET", "POST"))
|
|
def login():
|
|
if user_count() == 0:
|
|
return redirect(url_for("auth.setup"))
|
|
|
|
if request.method == "POST":
|
|
username = request.form.get("username", "").strip().lower()
|
|
password = request.form.get("password", "")
|
|
remember_me = request.form.get("remember_me") == "1"
|
|
database = get_db()
|
|
user = database.execute(
|
|
"SELECT * FROM users WHERE username = ?",
|
|
(username,),
|
|
).fetchone()
|
|
|
|
error = None
|
|
if user is None or not check_password_hash(user["password_hash"], password):
|
|
error = "Benutzername oder Passwort passen nicht zusammen."
|
|
|
|
if error is None:
|
|
session.clear()
|
|
# Opt-in long-lived session so the shared household device stays low-friction.
|
|
session.permanent = remember_me
|
|
session["user_id"] = user["id"]
|
|
ensure_csrf_token()
|
|
return redirect(url_for("main.dashboard"))
|
|
|
|
flash(error, "error")
|
|
|
|
return render_template("auth/login.html")
|
|
|
|
|
|
@auth_bp.post("/logout")
|
|
def logout():
|
|
session.clear()
|
|
flash("Du bist abgemeldet.", "info")
|
|
return redirect(url_for("auth.login"))
|