from __future__ import annotations import json from dataclasses import dataclass from urllib.parse import urljoin from flask import current_app from pywebpush import WebPushException, webpush from sqlalchemy import or_ from ..extensions import db from ..models import NotificationLog, PushSubscription, TaskInstance, User, UserBadge from .monthly import archive_months_missing_up_to_previous, get_snapshot_rows from .dates import local_now, previous_month @dataclass class NotificationResult: sent: int = 0 skipped: int = 0 failed: int = 0 def push_enabled() -> bool: return bool(current_app.config["VAPID_PUBLIC_KEY"] and current_app.config["VAPID_PRIVATE_KEY"]) def _absolute_url(path: str) -> str: base = current_app.config["APP_BASE_URL"].rstrip("/") + "/" return urljoin(base, path.lstrip("/")) def _notification_exists(user_id: int, notification_type: str, payload: dict) -> bool: payload_value = json.dumps(payload, sort_keys=True) return ( NotificationLog.query.filter_by(user_id=user_id, type=notification_type, payload=payload_value).first() is not None ) def _log_notification(user_id: int, notification_type: str, payload: dict) -> None: db.session.add( NotificationLog(user_id=user_id, type=notification_type, payload=json.dumps(payload, sort_keys=True)) ) def _send_subscription(subscription: PushSubscription, payload: dict) -> bool: try: webpush( subscription_info={ "endpoint": subscription.endpoint, "keys": {"p256dh": subscription.p256dh, "auth": subscription.auth}, }, data=json.dumps(payload), vapid_private_key=current_app.config["VAPID_PRIVATE_KEY"], vapid_claims={"sub": current_app.config["VAPID_CLAIMS_SUBJECT"]}, ) return True except WebPushException: return False def _subscriptions_for_user(user_id: int) -> list[PushSubscription]: return PushSubscription.query.filter_by(user_id=user_id).all() def _send_payload_to_user(user: User, notification_type: str, marker: dict, payload: dict) -> NotificationResult: result = NotificationResult() subscriptions = _subscriptions_for_user(user.id) if not subscriptions: result.skipped += 1 return result if _notification_exists(user.id, notification_type, marker): result.skipped += 1 return result sent_any = False for subscription in subscriptions: if _send_subscription(subscription, payload): sent_any = True result.sent += 1 else: result.failed += 1 if sent_any: _log_notification(user.id, notification_type, marker) db.session.commit() elif result.failed == 0: result.skipped += 1 return result def _merge_results(base: NotificationResult, extra: NotificationResult) -> NotificationResult: base.sent += extra.sent base.skipped += extra.skipped base.failed += extra.failed return base def send_due_notifications() -> NotificationResult: result = NotificationResult() if not push_enabled(): result.skipped += 1 return result now = local_now() if now.hour < 9: result.skipped += 1 return result today = now.date() relevant_tasks = TaskInstance.query.filter( TaskInstance.completed_at.is_(None), TaskInstance.due_date == today, or_( TaskInstance.assigned_user_id.isnot(None), TaskInstance.assigned_user_secondary_id.isnot(None), ), ).all() tasks_by_user: dict[int, list[TaskInstance]] = {} for task in relevant_tasks: for assigned_user in task.assigned_users: tasks_by_user.setdefault(assigned_user.id, []).append(task) for user in User.query.order_by(User.name.asc()).all(): if not user.notification_task_due_enabled: result.skipped += 1 continue personal_tasks = tasks_by_user.get(user.id, []) if not personal_tasks: result.skipped += 1 continue task_count = len(personal_tasks) payload = { "title": "Putzliga für heute", "body": ( "Heute wartet 1 offene Aufgabe auf dich. Zeit zum Punkte sammeln." if task_count == 1 else f"Heute warten {task_count} offene Aufgaben auf dich. Zeit zum Punkte sammeln." ), "icon": _absolute_url("/static/images/pwa-icon-192.png"), "badge": _absolute_url("/static/images/pwa-badge.png"), "url": _absolute_url("/my-tasks"), "tag": f"due-{today.isoformat()}-{user.id}", } marker = {"date": today.isoformat()} _merge_results(result, _send_payload_to_user(user, "task_due_daily", marker, payload)) return result def send_monthly_winner_notifications() -> NotificationResult: result = NotificationResult() if not push_enabled(): result.skipped += 1 return result now = local_now() if not (now.day == 1 and now.hour >= 9): result.skipped += 1 return result archive_months_missing_up_to_previous() target_year, target_month = previous_month(now.year, now.month) rows = get_snapshot_rows(target_year, target_month) if not rows: result.skipped += 1 return result winners = [row.user.name for row in rows if row.rank == 1] winner_text = ", ".join(winners) users = User.query.order_by(User.name.asc()).all() marker = {"year": target_year, "month": target_month} for user in users: if not user.notification_monthly_winner_enabled: result.skipped += 1 continue payload = { "title": "Der Haushalts-Champion des letzten Monats steht fest", "body": f"{winner_text} führt den letzten Monat an. Schau ins Scoreboard.", "icon": _absolute_url("/static/images/pwa-icon-192.png"), "badge": _absolute_url("/static/images/pwa-badge.png"), "url": _absolute_url(f"/scoreboard?archive={target_year}-{target_month:02d}"), "tag": f"winner-{target_year}-{target_month}", } _merge_results(result, _send_payload_to_user(user, "monthly_winner", marker, payload)) return result def send_badge_notifications_for_awards(awards: list[UserBadge]) -> NotificationResult: result = NotificationResult() if not push_enabled(): result.skipped += len(awards) or 1 return result for award in awards: user = award.user definition = award.badge_definition if not user or not definition: result.skipped += 1 continue if not user.notification_badge_enabled: result.skipped += 1 continue marker = {"user_badge_id": award.id} payload = { "title": "Neues Badge freigeschaltet", "body": f"{definition.name}: {definition.description}", "icon": _absolute_url("/static/images/pwa-icon-192.png"), "badge": _absolute_url("/static/images/pwa-badge.png"), "url": _absolute_url("/settings"), "tag": f"badge-{award.id}", } _merge_results(result, _send_payload_to_user(user, "badge_award", marker, payload)) return result def run_scheduled_notifications() -> dict[str, NotificationResult]: return { "daily_due": send_due_notifications(), "monthly_winner": send_monthly_winner_notifications(), }