Files
putzliga/app/services/notifications.py

232 lines
7.4 KiB
Python

from __future__ import annotations
import json
from dataclasses import dataclass
from urllib.parse import urljoin
from flask import current_app
from pywebpush import WebPushException, webpush
from sqlalchemy import or_
from ..extensions import db
from ..models import NotificationLog, PushSubscription, TaskInstance, User, UserBadge
from .monthly import archive_months_missing_up_to_previous, get_snapshot_rows
from .dates import local_now, previous_month
@dataclass
class NotificationResult:
sent: int = 0
skipped: int = 0
failed: int = 0
def push_enabled() -> bool:
return bool(current_app.config["VAPID_PUBLIC_KEY"] and current_app.config["VAPID_PRIVATE_KEY"])
def _absolute_url(path: str) -> str:
base = current_app.config["APP_BASE_URL"].rstrip("/") + "/"
return urljoin(base, path.lstrip("/"))
def _notification_exists(user_id: int, notification_type: str, payload: dict) -> bool:
payload_value = json.dumps(payload, sort_keys=True)
return (
NotificationLog.query.filter_by(user_id=user_id, type=notification_type, payload=payload_value).first()
is not None
)
def _log_notification(user_id: int, notification_type: str, payload: dict) -> None:
db.session.add(
NotificationLog(user_id=user_id, type=notification_type, payload=json.dumps(payload, sort_keys=True))
)
def _send_subscription(subscription: PushSubscription, payload: dict) -> bool:
try:
webpush(
subscription_info={
"endpoint": subscription.endpoint,
"keys": {"p256dh": subscription.p256dh, "auth": subscription.auth},
},
data=json.dumps(payload),
vapid_private_key=current_app.config["VAPID_PRIVATE_KEY"],
vapid_claims={"sub": current_app.config["VAPID_CLAIMS_SUBJECT"]},
)
return True
except WebPushException:
return False
def _subscriptions_for_user(user_id: int) -> list[PushSubscription]:
return PushSubscription.query.filter_by(user_id=user_id).all()
def _send_payload_to_user(user: User, notification_type: str, marker: dict, payload: dict) -> NotificationResult:
result = NotificationResult()
subscriptions = _subscriptions_for_user(user.id)
if not subscriptions:
result.skipped += 1
return result
if _notification_exists(user.id, notification_type, marker):
result.skipped += 1
return result
sent_any = False
for subscription in subscriptions:
if _send_subscription(subscription, payload):
sent_any = True
result.sent += 1
else:
result.failed += 1
if sent_any:
_log_notification(user.id, notification_type, marker)
db.session.commit()
elif result.failed == 0:
result.skipped += 1
return result
def _merge_results(base: NotificationResult, extra: NotificationResult) -> NotificationResult:
base.sent += extra.sent
base.skipped += extra.skipped
base.failed += extra.failed
return base
def send_due_notifications() -> NotificationResult:
result = NotificationResult()
if not push_enabled():
result.skipped += 1
return result
now = local_now()
if now.hour < 9:
result.skipped += 1
return result
today = now.date()
relevant_tasks = TaskInstance.query.filter(
TaskInstance.completed_at.is_(None),
TaskInstance.due_date == today,
or_(
TaskInstance.assigned_user_id.isnot(None),
TaskInstance.assigned_user_secondary_id.isnot(None),
),
).all()
tasks_by_user: dict[int, list[TaskInstance]] = {}
for task in relevant_tasks:
for assigned_user in task.assigned_users:
tasks_by_user.setdefault(assigned_user.id, []).append(task)
for user in User.query.order_by(User.name.asc()).all():
if not user.notification_task_due_enabled:
result.skipped += 1
continue
personal_tasks = tasks_by_user.get(user.id, [])
if not personal_tasks:
result.skipped += 1
continue
task_count = len(personal_tasks)
payload = {
"title": "Putzliga für heute",
"body": (
"Heute wartet 1 offene Aufgabe auf dich. Zeit zum Punkte sammeln."
if task_count == 1
else f"Heute warten {task_count} offene Aufgaben auf dich. Zeit zum Punkte sammeln."
),
"icon": _absolute_url("/static/images/pwa-icon-192.png"),
"badge": _absolute_url("/static/images/pwa-badge.png"),
"url": _absolute_url("/my-tasks"),
"tag": f"due-{today.isoformat()}-{user.id}",
}
marker = {"date": today.isoformat()}
_merge_results(result, _send_payload_to_user(user, "task_due_daily", marker, payload))
return result
def send_monthly_winner_notifications() -> NotificationResult:
result = NotificationResult()
if not push_enabled():
result.skipped += 1
return result
now = local_now()
if not (now.day == 1 and now.hour >= 9):
result.skipped += 1
return result
archive_months_missing_up_to_previous()
target_year, target_month = previous_month(now.year, now.month)
rows = get_snapshot_rows(target_year, target_month)
if not rows:
result.skipped += 1
return result
winners = [row.user.name for row in rows if row.rank == 1]
winner_text = ", ".join(winners)
users = User.query.order_by(User.name.asc()).all()
marker = {"year": target_year, "month": target_month}
for user in users:
if not user.notification_monthly_winner_enabled:
result.skipped += 1
continue
payload = {
"title": "Der Haushalts-Champion des letzten Monats steht fest",
"body": f"{winner_text} führt den letzten Monat an. Schau ins Scoreboard.",
"icon": _absolute_url("/static/images/pwa-icon-192.png"),
"badge": _absolute_url("/static/images/pwa-badge.png"),
"url": _absolute_url(f"/scoreboard?archive={target_year}-{target_month:02d}"),
"tag": f"winner-{target_year}-{target_month}",
}
_merge_results(result, _send_payload_to_user(user, "monthly_winner", marker, payload))
return result
def send_badge_notifications_for_awards(awards: list[UserBadge]) -> NotificationResult:
result = NotificationResult()
if not push_enabled():
result.skipped += len(awards) or 1
return result
for award in awards:
user = award.user
definition = award.badge_definition
if not user or not definition:
result.skipped += 1
continue
if not user.notification_badge_enabled:
result.skipped += 1
continue
marker = {"user_badge_id": award.id}
payload = {
"title": "Neues Badge freigeschaltet",
"body": f"{definition.name}: {definition.description}",
"icon": _absolute_url("/static/images/pwa-icon-192.png"),
"badge": _absolute_url("/static/images/pwa-badge.png"),
"url": _absolute_url("/settings"),
"tag": f"badge-{award.id}",
}
_merge_results(result, _send_payload_to_user(user, "badge_award", marker, payload))
return result
def run_scheduled_notifications() -> dict[str, NotificationResult]:
return {
"daily_due": send_due_notifications(),
"monthly_winner": send_monthly_winner_notifications(),
}