173 lines
6.0 KiB
Python
173 lines
6.0 KiB
Python
from __future__ import annotations
|
|
|
|
from itertools import combinations
|
|
|
|
from sqlalchemy import select
|
|
|
|
from app.extensions import db
|
|
from app.models import CostParticipant, User
|
|
|
|
|
|
GERMAN_MONTH_NAMES = {
|
|
1: "Januar",
|
|
2: "Februar",
|
|
3: "Maerz",
|
|
4: "April",
|
|
5: "Mai",
|
|
6: "Juni",
|
|
7: "Juli",
|
|
8: "August",
|
|
9: "September",
|
|
10: "Oktober",
|
|
11: "November",
|
|
12: "Dezember",
|
|
}
|
|
|
|
|
|
def active_users() -> list[User]:
|
|
users = db.session.scalars(
|
|
select(User).where(User.is_active.is_(True)).order_by(User.role.asc(), User.display_name.asc())
|
|
).all()
|
|
return list(users)
|
|
|
|
|
|
def personal_users() -> list[User]:
|
|
users = active_users()
|
|
non_admin = [user for user in users if not user.is_admin()]
|
|
admin_users = [user for user in users if user.is_admin()]
|
|
return (non_admin + admin_users)[:2]
|
|
|
|
|
|
def personal_account_names() -> dict[str, str]:
|
|
users = personal_users()
|
|
names = {
|
|
"persoenlich-flo": users[0].ui_name if len(users) > 0 else "Persoenlich 1",
|
|
"persoenlich-desi": users[1].ui_name if len(users) > 1 else "Persoenlich 2",
|
|
}
|
|
return names
|
|
|
|
|
|
def format_planning_month(label: str) -> str:
|
|
year, month = [int(part) for part in label.split("-", 1)]
|
|
return f"{GERMAN_MONTH_NAMES.get(month, label)} {year}"
|
|
|
|
|
|
def sync_user_participants() -> bool:
|
|
changed = False
|
|
users = db.session.scalars(select(User).order_by(User.id.asc())).all()
|
|
existing_by_user_id = {
|
|
participant.linked_user_id: participant
|
|
for participant in db.session.scalars(
|
|
select(CostParticipant).where(CostParticipant.is_app_user.is_(True))
|
|
).all()
|
|
if participant.linked_user_id is not None
|
|
}
|
|
|
|
for user in users:
|
|
participant = existing_by_user_id.get(user.id)
|
|
if participant is None:
|
|
participant = CostParticipant(
|
|
name=user.ui_name,
|
|
avatar_url=user.avatar_url,
|
|
is_app_user=True,
|
|
linked_user_id=user.id,
|
|
is_external=False,
|
|
is_active=user.is_active,
|
|
)
|
|
db.session.add(participant)
|
|
changed = True
|
|
continue
|
|
if participant.name != user.ui_name:
|
|
participant.name = user.ui_name
|
|
changed = True
|
|
if participant.avatar_url != user.avatar_url:
|
|
participant.avatar_url = user.avatar_url
|
|
changed = True
|
|
if participant.is_external:
|
|
participant.is_external = False
|
|
changed = True
|
|
if participant.is_app_user is not True:
|
|
participant.is_app_user = True
|
|
changed = True
|
|
if participant.is_active != user.is_active:
|
|
participant.is_active = user.is_active
|
|
changed = True
|
|
return changed
|
|
|
|
|
|
def encode_benefit_scope(selected_user_ids: list[int] | set[int], available_users: list[User]) -> str:
|
|
available_ids = [user.id for user in available_users]
|
|
selected_ids = [user_id for user_id in available_ids if user_id in set(selected_user_ids)]
|
|
if not selected_ids or selected_ids == available_ids:
|
|
return "all-users"
|
|
return "users:" + ",".join(str(user_id) for user_id in selected_ids)
|
|
|
|
|
|
def decode_benefit_scope(scope: str | None, available_users: list[User]) -> list[int]:
|
|
available_ids = [user.id for user in available_users]
|
|
if not available_ids:
|
|
return []
|
|
if scope in {None, "", "both", "all-users"}:
|
|
return available_ids
|
|
if scope in {"flo", "desi"}:
|
|
mapping = personal_account_names()
|
|
label = mapping["persoenlich-flo"] if scope == "flo" else mapping["persoenlich-desi"]
|
|
return [user.id for user in available_users if user.ui_name == label][:1] or available_ids
|
|
if scope.startswith("users:"):
|
|
parsed_ids = []
|
|
for raw_id in scope.removeprefix("users:").split(","):
|
|
raw_id = raw_id.strip()
|
|
if raw_id.isdigit():
|
|
parsed_ids.append(int(raw_id))
|
|
normalized = [user_id for user_id in available_ids if user_id in parsed_ids]
|
|
return normalized or available_ids
|
|
return available_ids
|
|
|
|
|
|
def benefit_scope_label(scope: str | None, available_users: list[User]) -> str:
|
|
selected_ids = decode_benefit_scope(scope, available_users)
|
|
users_by_id = {user.id: user for user in available_users}
|
|
names = [users_by_id[user_id].ui_name for user_id in selected_ids if user_id in users_by_id]
|
|
if not names:
|
|
return "Alle Nutzer"
|
|
if len(names) == len(available_users):
|
|
return "Alle Nutzer"
|
|
if len(names) == 1:
|
|
return names[0]
|
|
if len(names) == 2:
|
|
return " & ".join(names)
|
|
return ", ".join(names[:-1]) + f" & {names[-1]}"
|
|
|
|
|
|
def benefit_scope_options(available_users: list[User]) -> list[dict[str, str]]:
|
|
if not available_users:
|
|
return [{"value": "all-users", "label": "Alle Nutzer"}]
|
|
|
|
options: list[dict[str, str]] = []
|
|
available_ids = [user.id for user in available_users]
|
|
for size in range(len(available_users), 0, -1):
|
|
for user_combo in combinations(available_users, size):
|
|
user_ids = [user.id for user in user_combo]
|
|
value = encode_benefit_scope(user_ids, available_users)
|
|
if size == len(available_users):
|
|
label = "Alle Nutzer"
|
|
elif size == 1:
|
|
label = user_combo[0].ui_name
|
|
elif size == 2:
|
|
label = " & ".join(user.ui_name for user in user_combo)
|
|
else:
|
|
label = ", ".join(user.ui_name for user in user_combo[:-1]) + f" & {user_combo[-1].ui_name}"
|
|
options.append({"value": value, "label": label})
|
|
|
|
seen_values = set()
|
|
deduplicated = []
|
|
for option in options:
|
|
if option["value"] in seen_values:
|
|
continue
|
|
seen_values.add(option["value"])
|
|
deduplicated.append(option)
|
|
|
|
if "all-users" not in seen_values:
|
|
deduplicated.insert(0, {"value": encode_benefit_scope(available_ids, available_users), "label": "Alle Nutzer"})
|
|
return deduplicated
|