"""Dashboard, analytics, health-check and avatar routes of the ``main`` blueprint."""

from __future__ import annotations

from decimal import Decimal

from flask import Blueprint, current_app, g, render_template, send_from_directory
from flask_login import login_required

from app.models import (
    Account,
    Category,
    CommunityAccount,
    CostParticipant,
    InAppNotification,
    Month,
    to_decimal,
)
from app.utils.users import (
    active_users,
    benefit_scope_label,
    personal_account_names,
    personal_users,
    sync_user_participants,
)

main_bp = Blueprint("main", __name__)

# Decimal is immutable, so one shared zero is safe as a default / sum start.
_ZERO = Decimal("0.00")

# Account slugs that receive special treatment in the views below.
_SHARED_ACCOUNT_SLUG = "gemeinschaftskonto"
# Order matters: the first slug is paired with the first personal user,
# the second slug with the second one (see ``index``).
_PERSONAL_ACCOUNT_SLUGS = ("persoenlich-flo", "persoenlich-desi")
_SAVINGS_ACCOUNT_SLUGS = frozenset({"sparen", "urlaub", "freizeit"})


def _planned_amounts_by_entry(month) -> dict:
    """Map ``entry_id`` to the planned amount for *month*; empty if *month* is None."""
    if month is None:
        return {}
    return {item.entry_id: to_decimal(item.planned_amount) for item in month.entry_values}


def _allocations_by_slug(month) -> dict:
    """Map target-account slug to allocated amount for *month*; empty if *month* is None.

    Allocations without a target account are skipped.
    """
    if month is None:
        return {}
    return {
        item.target_account.slug: to_decimal(item.amount)
        for item in month.allocations
        if item.target_account
    }


def _shared_budget_category_query():
    """Return the (unordered) query for active categories of the shared account."""
    return Category.query.join(Account).filter(
        Category.is_active.is_(True),
        Account.slug == _SHARED_ACCOUNT_SLUG,
    )


def _planned_total(categories, entry_values) -> Decimal:
    """Sum the planned amounts of all active entries in *categories*.

    Entries without a value in *entry_values* count as zero.
    """
    return sum(
        (
            entry_values.get(entry.id, _ZERO)
            for category in categories
            for entry in category.entries
            if entry.is_active
        ),
        _ZERO,
    )


def _resolve_active_entry(value):
    """Return ``(entry, category, account)`` for an entry value, or ``None``.

    ``None`` is returned when any link of the chain is missing or inactive,
    so callers can skip the value with a single check.
    """
    entry = value.entry
    category = entry.category if entry else None
    account = category.account if category else None
    if entry is None or category is None or account is None:
        return None
    if not (entry.is_active and category.is_active and account.is_active):
        return None
    return entry, category, account


def _community_account_cards(month, previous_month):
    """Build one dashboard card per active community account.

    Each card compares the total for *month* against *previous_month*
    (which may be ``None``, in which case the previous total is zero).

    Accounts of type ``"personal"`` with a linked account slug take their
    totals from the month's allocations; every other account sums the planned
    amounts of the active entries in the shared-account categories assigned
    to it.

    Returns:
        A list of dicts with the keys ``community_account``,
        ``current_total``, ``previous_total``, ``delta``,
        ``assigned_budget_names``, ``needs_update`` and ``is_read_only``.
    """
    community_accounts = (
        CommunityAccount.query.filter_by(is_active=True)
        .order_by(CommunityAccount.sort_order.asc(), CommunityAccount.name.asc())
        .all()
    )
    current_entry_values = _planned_amounts_by_entry(month)
    previous_entry_values = _planned_amounts_by_entry(previous_month)
    current_allocations = _allocations_by_slug(month)
    previous_allocations = _allocations_by_slug(previous_month)
    budget_categories = _shared_budget_category_query().all()

    cards = []
    for community_account in community_accounts:
        is_personal = community_account.account_type == "personal"
        linked_slug = community_account.linked_account_slug
        if is_personal and linked_slug:
            current_total = current_allocations.get(linked_slug, _ZERO)
            previous_total = previous_allocations.get(linked_slug, _ZERO)
            assigned_budget_names = ["Persönliche Auszahlung"]
        else:
            assigned_categories = [
                category
                for category in budget_categories
                if category.community_account_id == community_account.id
            ]
            current_total = _planned_total(assigned_categories, current_entry_values)
            previous_total = _planned_total(assigned_categories, previous_entry_values)
            assigned_budget_names = [category.name for category in assigned_categories]

        delta = current_total - previous_total
        cards.append(
            {
                "community_account": community_account,
                "current_total": current_total,
                "previous_total": previous_total,
                "delta": delta,
                "assigned_budget_names": assigned_budget_names,
                "needs_update": delta != _ZERO,
                # Read-only depends on the account type alone, even when a
                # personal account has no linked slug.
                "is_read_only": is_personal,
            }
        )
    return cards


@main_bp.route("/")
@login_required
def index():
    """Render the dashboard for the current month (``g.current_month``)."""
    if sync_user_participants():
        current_app.logger.info("App-Nutzer wurden mit Split-Personen synchronisiert.")
        # Function-scope import kept from the original; presumably it avoids
        # a circular import at module load time -- confirm before hoisting.
        from app.extensions import db

        db.session.commit()

    month = g.current_month
    month_service = current_app.extensions["saldo.month_service"]
    summary = month_service.compute_summary(month)
    previous_month = month_service.previous_month(month.year, month.month)
    recent_months = Month.query.order_by(Month.year.desc(), Month.month.desc()).limit(6).all()
    notifications = (
        InAppNotification.query.filter_by(is_read=False)
        .order_by(InAppNotification.created_at.desc())
        .limit(5)
        .all()
    )

    community_account_cards = _community_account_cards(month, previous_month)
    shared_account_changes = [
        card
        for card in community_account_cards
        if not card["is_read_only"] and card["needs_update"]
    ]

    personal_allocations = {
        item.target_account.slug: to_decimal(item.amount)
        for item in month.allocations
        if item.target_account and item.target_account.slug in _PERSONAL_ACCOUNT_SLUGS
    }
    internal_participants = {
        participant.linked_user_id: participant
        for participant in CostParticipant.query.filter(
            CostParticipant.is_active.is_(True),
            CostParticipant.is_app_user.is_(True),
        ).all()
        if participant.linked_user_id is not None
    }
    personal_label_map = personal_account_names()
    personal_user_list = personal_users()
    # Pair each personal slug with the personal user at the same position;
    # a slug without a matching user maps to None.
    personal_user_map = {
        slug: personal_user_list[position] if position < len(personal_user_list) else None
        for position, slug in enumerate(_PERSONAL_ACCOUNT_SLUGS)
    }

    def _personal_payload(slug: str) -> dict:
        """Collect amount, display name and avatar data for one personal account."""
        user = personal_user_map.get(slug)
        participant = internal_participants.get(user.id) if user is not None else None
        return {
            "amount": personal_allocations.get(slug, _ZERO),
            "name": personal_label_map.get(slug, slug),
            # Participant data wins; the user record is the fallback.
            "avatar_url": getattr(participant, "avatar_url", None)
            or getattr(user, "avatar_url", None),
            "avatar_initials": getattr(participant, "avatar_initials", None)
            or getattr(user, "avatar_initials", "??"),
        }

    first_slug, second_slug = _PERSONAL_ACCOUNT_SLUGS
    return render_template(
        "main/index.html",
        month=month,
        summary=summary,
        recent_months=recent_months,
        notifications=notifications,
        community_account_cards=community_account_cards,
        shared_account_changes=shared_account_changes,
        personal_payouts={
            "first": _personal_payload(first_slug),
            "second": _personal_payload(second_slug),
        },
    )


@main_bp.route("/analytics")
@login_required
def analytics():
    """Render chart data for the current month plus a budget timeline.

    Planned amounts of the current month are aggregated per category, per
    account and per benefit scope. Personal-payout accounts and savings
    accounts are each merged into a single synthetic category. The timeline
    covers all stored months for the shared-account categories.
    """
    if sync_user_participants():
        # Function-scope import kept from the original; presumably it avoids
        # a circular import at module load time -- confirm before hoisting.
        from app.extensions import db

        db.session.commit()

    month = g.current_month
    summary = current_app.extensions["saldo.month_service"].compute_summary(month)
    available_users = active_users()

    category_totals: dict[str, dict] = {}
    account_totals: dict[str, Decimal] = {}
    benefit_totals: dict[str, Decimal] = {}
    entry_rows = []
    personal_label_map = personal_account_names()

    for value in month.entry_values:
        resolved = _resolve_active_entry(value)
        if resolved is None:
            continue
        entry, category, account = resolved
        amount = to_decimal(value.planned_amount)

        if account.slug in _PERSONAL_ACCOUNT_SLUGS:
            category_key = "personal-payout"
            category_label = "Persönliche Auszahlung"
            category_account = "Persönliche Auszahlung"
            detail_entry_label = personal_label_map.get(account.slug, account.name)
            account_label = "Persönliche Auszahlung"
        elif account.slug in _SAVINGS_ACCOUNT_SLUGS:
            category_key = "savings-targets"
            category_label = "Sparkonten"
            category_account = "Sparen & Verteilung"
            detail_entry_label = category.name
            account_label = account.name
        else:
            category_key = str(category.id)
            category_label = category.name
            category_account = account.name
            detail_entry_label = entry.name
            account_label = account.name

        category_bucket = category_totals.setdefault(
            category_key,
            {
                "id": category_key,
                "label": category_label,
                "account": category_account,
                "value": _ZERO,
                "entries": {},
            },
        )
        category_bucket["value"] += amount
        bucket_entries = category_bucket["entries"]
        bucket_entries[detail_entry_label] = bucket_entries.get(detail_entry_label, _ZERO) + amount

        account_totals[account_label] = account_totals.get(account_label, _ZERO) + amount

        benefit_label = benefit_scope_label(entry.benefit_scope, available_users)
        benefit_totals[benefit_label] = benefit_totals.get(benefit_label, _ZERO) + amount

        entry_rows.append(
            {
                "label": f"{category.name} · {entry.name}",
                "value": amount,
            }
        )

    def _by_amount_then_label(pair):
        """Sort key for ``(label, amount)`` pairs: largest amount first, then label."""
        return (-pair[1], pair[0])

    sorted_categories = sorted(
        category_totals.values(),
        key=lambda item: (-item["value"], item["account"], item["label"]),
    )

    category_entry_map = {}
    for item in sorted_categories:
        # Rank once and derive both parallel lists from the same ordering.
        ranked_entries = sorted(item["entries"].items(), key=_by_amount_then_label)
        category_entry_map[str(item["id"])] = {
            "label": item["label"],
            "account": item["account"],
            "labels": [label for label, _ in ranked_entries],
            "values": [float(entry_amount) for _, entry_amount in ranked_entries],
        }
    default_category_id = str(sorted_categories[0]["id"]) if sorted_categories else ""

    sorted_accounts = sorted(account_totals.items(), key=_by_amount_then_label)
    sorted_benefits = sorted(benefit_totals.items(), key=_by_amount_then_label)
    top_entries = sorted(entry_rows, key=lambda item: (-item["value"], item["label"]))[:10]

    historical_months = Month.query.order_by(Month.year.asc(), Month.month.asc()).all()
    budget_categories = (
        _shared_budget_category_query()
        .order_by(Category.sort_order.asc(), Category.name.asc())
        .all()
    )
    budget_timeline_rows = {
        category.id: {
            "label": category.name,
            "data": [],
        }
        for category in budget_categories
    }
    for historical_month in historical_months:
        month_totals = {category.id: _ZERO for category in budget_categories}
        for value in historical_month.entry_values:
            resolved = _resolve_active_entry(value)
            if resolved is None:
                continue
            _, category, _ = resolved
            if category.id not in month_totals:
                continue
            month_totals[category.id] += to_decimal(value.planned_amount)
        for category in budget_categories:
            budget_timeline_rows[category.id]["data"].append(float(month_totals[category.id]))

    # Drop categories that are zero in every month.
    budget_timeline_datasets = [
        dataset
        for dataset in budget_timeline_rows.values()
        if any(point != 0 for point in dataset["data"])
    ]

    return render_template(
        "main/analytics.html",
        month=month,
        summary=summary,
        category_labels=[item["label"] for item in sorted_categories],
        category_values=[float(item["value"]) for item in sorted_categories],
        category_keys=[str(item["id"]) for item in sorted_categories],
        category_entry_map=category_entry_map,
        default_category_id=default_category_id,
        benefit_labels=[label for label, _ in sorted_benefits],
        benefit_values=[float(total) for _, total in sorted_benefits],
        account_labels=[label for label, _ in sorted_accounts],
        account_values=[float(total) for _, total in sorted_accounts],
        top_entry_labels=[item["label"] for item in top_entries],
        top_entry_values=[float(item["value"]) for item in top_entries],
        budget_timeline_labels=[item.label for item in historical_months],
        budget_timeline_datasets=budget_timeline_datasets,
    )


@main_bp.route("/health")
def health():
    """Liveness probe: always answers ``{"status": "ok"}`` with HTTP 200."""
    return {"status": "ok"}, 200


# The rule must declare ``filename``; without a URL variable Flask calls the
# view with no arguments and every request fails with a TypeError.
# NOTE(review): the ``path`` converter (allows sub-directories) is an
# assumption -- switch to ``<filename>`` if avatars are stored flat.
@main_bp.route("/media/avatars/<path:filename>")
def uploaded_avatar(filename: str):
    """Serve *filename* from the directory configured as ``AVATAR_UPLOAD_DIR``."""
    return send_from_directory(current_app.config["AVATAR_UPLOAD_DIR"], filename)