181 lines
6.0 KiB
Python
181 lines
6.0 KiB
Python
from __future__ import annotations
|
|
|
|
from flask import Blueprint, flash, g, redirect, render_template, request, url_for
|
|
from werkzeug.security import generate_password_hash
|
|
|
|
from .auth import admin_required, can_remove_last_admin, validate_admin_user_form
|
|
from .constants import ROLE_LABELS
|
|
from .db import get_db
|
|
|
|
|
|
admin_bp = Blueprint("admin", __name__, url_prefix="/admin")
|
|
|
|
|
|
def get_household_user(user_id: int):
|
|
user = get_db().execute(
|
|
"""
|
|
SELECT users.*, households.name AS household_name
|
|
FROM users
|
|
LEFT JOIN households ON households.id = users.household_id
|
|
WHERE users.id = ? AND users.household_id = ?
|
|
""",
|
|
(user_id, g.user["household_id"]),
|
|
).fetchone()
|
|
if user is None:
|
|
raise ValueError("Der Nutzer wurde nicht gefunden.")
|
|
return user
|
|
|
|
|
|
@admin_bp.get("/users")
|
|
@admin_required
|
|
def user_list():
|
|
users = get_db().execute(
|
|
"""
|
|
SELECT *
|
|
FROM users
|
|
WHERE household_id = ?
|
|
ORDER BY is_active DESC, LOWER(COALESCE(display_name, username))
|
|
""",
|
|
(g.user["household_id"],),
|
|
).fetchall()
|
|
return render_template("admin/users_list.html", users=users, role_labels=ROLE_LABELS)
|
|
|
|
|
|
@admin_bp.route("/users/new", methods=("GET", "POST"))
|
|
@admin_required
|
|
def user_create():
|
|
form_data = {
|
|
"display_name": "",
|
|
"username": "",
|
|
"email": "",
|
|
"role": "member",
|
|
"is_active": True,
|
|
}
|
|
|
|
if request.method == "POST":
|
|
database = get_db()
|
|
form_data = {
|
|
"display_name": request.form.get("display_name", "").strip(),
|
|
"username": request.form.get("username", "").strip().lower(),
|
|
"email": request.form.get("email", "").strip().lower(),
|
|
"role": request.form.get("role", "member").strip(),
|
|
"is_active": request.form.get("is_active", "1") == "1",
|
|
}
|
|
password = request.form.get("password", "")
|
|
password_repeat = request.form.get("password_repeat", "")
|
|
|
|
error = validate_admin_user_form(
|
|
database,
|
|
username=form_data["username"],
|
|
email=form_data["email"] or None,
|
|
role=form_data["role"],
|
|
is_active=form_data["is_active"],
|
|
password=password,
|
|
password_repeat=password_repeat,
|
|
)
|
|
|
|
if error is None:
|
|
database.execute(
|
|
"""
|
|
INSERT INTO users (household_id, username, email, display_name, role, is_active, password_hash)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
|
""",
|
|
(
|
|
g.user["household_id"],
|
|
form_data["username"],
|
|
form_data["email"] or None,
|
|
form_data["display_name"],
|
|
form_data["role"],
|
|
1 if form_data["is_active"] else 0,
|
|
generate_password_hash(password),
|
|
),
|
|
)
|
|
database.commit()
|
|
flash("Der Nutzer wurde angelegt.", "success")
|
|
return redirect(url_for("admin.user_list"))
|
|
|
|
flash(error, "error")
|
|
|
|
return render_template("admin/user_form.html", user=None, form_data=form_data, role_labels=ROLE_LABELS)
|
|
|
|
|
|
@admin_bp.route("/users/<int:user_id>/edit", methods=("GET", "POST"))
|
|
@admin_required
|
|
def user_edit(user_id: int):
|
|
try:
|
|
user = get_household_user(user_id)
|
|
except ValueError as exc:
|
|
flash(str(exc), "error")
|
|
return redirect(url_for("admin.user_list"))
|
|
|
|
form_data = {
|
|
"display_name": user["display_name"] or "",
|
|
"username": user["username"],
|
|
"email": user["email"] or "",
|
|
"role": user["role"],
|
|
"is_active": bool(user["is_active"]),
|
|
}
|
|
|
|
if request.method == "POST":
|
|
database = get_db()
|
|
form_data = {
|
|
"display_name": request.form.get("display_name", "").strip(),
|
|
"username": request.form.get("username", "").strip().lower(),
|
|
"email": request.form.get("email", "").strip().lower(),
|
|
"role": request.form.get("role", "member").strip(),
|
|
"is_active": request.form.get("is_active", "0") == "1",
|
|
}
|
|
password = request.form.get("password", "")
|
|
password_repeat = request.form.get("password_repeat", "")
|
|
|
|
error = validate_admin_user_form(
|
|
database,
|
|
username=form_data["username"],
|
|
email=form_data["email"] or None,
|
|
role=form_data["role"],
|
|
is_active=form_data["is_active"],
|
|
password=password,
|
|
password_repeat=password_repeat,
|
|
current_user_id=user_id,
|
|
)
|
|
if error is None and can_remove_last_admin(user_id, form_data["role"], form_data["is_active"]):
|
|
error = "Mindestens ein aktiver Admin sollte im Haushalt bleiben."
|
|
|
|
if error is None:
|
|
database.execute(
|
|
"""
|
|
UPDATE users
|
|
SET username = ?,
|
|
email = ?,
|
|
display_name = ?,
|
|
role = ?,
|
|
is_active = ?,
|
|
updated_at = CURRENT_TIMESTAMP
|
|
WHERE id = ?
|
|
""",
|
|
(
|
|
form_data["username"],
|
|
form_data["email"] or None,
|
|
form_data["display_name"],
|
|
form_data["role"],
|
|
1 if form_data["is_active"] else 0,
|
|
user_id,
|
|
),
|
|
)
|
|
if password:
|
|
database.execute(
|
|
"""
|
|
UPDATE users
|
|
SET password_hash = ?, updated_at = CURRENT_TIMESTAMP
|
|
WHERE id = ?
|
|
""",
|
|
(generate_password_hash(password), user_id),
|
|
)
|
|
database.commit()
|
|
flash("Der Nutzer wurde aktualisiert.", "success")
|
|
return redirect(url_for("admin.user_list"))
|
|
|
|
flash(error, "error")
|
|
|
|
return render_template("admin/user_form.html", user=user, form_data=form_data, role_labels=ROLE_LABELS)
|