232 lines
7.4 KiB
Python
232 lines
7.4 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from dataclasses import dataclass
|
|
from urllib.parse import urljoin
|
|
|
|
from flask import current_app
|
|
from pywebpush import WebPushException, webpush
|
|
from sqlalchemy import or_
|
|
|
|
from ..extensions import db
|
|
from ..models import NotificationLog, PushSubscription, TaskInstance, User, UserBadge
|
|
from .monthly import archive_months_missing_up_to_previous, get_snapshot_rows
|
|
from .dates import local_now, previous_month
|
|
|
|
|
|
@dataclass
|
|
class NotificationResult:
|
|
sent: int = 0
|
|
skipped: int = 0
|
|
failed: int = 0
|
|
|
|
|
|
def push_enabled() -> bool:
|
|
return bool(current_app.config["VAPID_PUBLIC_KEY"] and current_app.config["VAPID_PRIVATE_KEY"])
|
|
|
|
|
|
def _absolute_url(path: str) -> str:
|
|
base = current_app.config["APP_BASE_URL"].rstrip("/") + "/"
|
|
return urljoin(base, path.lstrip("/"))
|
|
|
|
|
|
def _notification_exists(user_id: int, notification_type: str, payload: dict) -> bool:
|
|
payload_value = json.dumps(payload, sort_keys=True)
|
|
return (
|
|
NotificationLog.query.filter_by(user_id=user_id, type=notification_type, payload=payload_value).first()
|
|
is not None
|
|
)
|
|
|
|
|
|
def _log_notification(user_id: int, notification_type: str, payload: dict) -> None:
|
|
db.session.add(
|
|
NotificationLog(user_id=user_id, type=notification_type, payload=json.dumps(payload, sort_keys=True))
|
|
)
|
|
|
|
|
|
def _send_subscription(subscription: PushSubscription, payload: dict) -> bool:
|
|
try:
|
|
webpush(
|
|
subscription_info={
|
|
"endpoint": subscription.endpoint,
|
|
"keys": {"p256dh": subscription.p256dh, "auth": subscription.auth},
|
|
},
|
|
data=json.dumps(payload),
|
|
vapid_private_key=current_app.config["VAPID_PRIVATE_KEY"],
|
|
vapid_claims={"sub": current_app.config["VAPID_CLAIMS_SUBJECT"]},
|
|
)
|
|
return True
|
|
except WebPushException:
|
|
return False
|
|
|
|
|
|
def _subscriptions_for_user(user_id: int) -> list[PushSubscription]:
|
|
return PushSubscription.query.filter_by(user_id=user_id).all()
|
|
|
|
|
|
def _send_payload_to_user(user: User, notification_type: str, marker: dict, payload: dict) -> NotificationResult:
|
|
result = NotificationResult()
|
|
subscriptions = _subscriptions_for_user(user.id)
|
|
if not subscriptions:
|
|
result.skipped += 1
|
|
return result
|
|
|
|
if _notification_exists(user.id, notification_type, marker):
|
|
result.skipped += 1
|
|
return result
|
|
|
|
sent_any = False
|
|
for subscription in subscriptions:
|
|
if _send_subscription(subscription, payload):
|
|
sent_any = True
|
|
result.sent += 1
|
|
else:
|
|
result.failed += 1
|
|
|
|
if sent_any:
|
|
_log_notification(user.id, notification_type, marker)
|
|
db.session.commit()
|
|
elif result.failed == 0:
|
|
result.skipped += 1
|
|
|
|
return result
|
|
|
|
|
|
def _merge_results(base: NotificationResult, extra: NotificationResult) -> NotificationResult:
|
|
base.sent += extra.sent
|
|
base.skipped += extra.skipped
|
|
base.failed += extra.failed
|
|
return base
|
|
|
|
|
|
def send_due_notifications() -> NotificationResult:
|
|
result = NotificationResult()
|
|
if not push_enabled():
|
|
result.skipped += 1
|
|
return result
|
|
|
|
now = local_now()
|
|
if now.hour < 9:
|
|
result.skipped += 1
|
|
return result
|
|
|
|
today = now.date()
|
|
relevant_tasks = TaskInstance.query.filter(
|
|
TaskInstance.completed_at.is_(None),
|
|
TaskInstance.due_date == today,
|
|
or_(
|
|
TaskInstance.assigned_user_id.isnot(None),
|
|
TaskInstance.assigned_user_secondary_id.isnot(None),
|
|
),
|
|
).all()
|
|
|
|
tasks_by_user: dict[int, list[TaskInstance]] = {}
|
|
for task in relevant_tasks:
|
|
for assigned_user in task.assigned_users:
|
|
tasks_by_user.setdefault(assigned_user.id, []).append(task)
|
|
|
|
for user in User.query.order_by(User.name.asc()).all():
|
|
if not user.notification_task_due_enabled:
|
|
result.skipped += 1
|
|
continue
|
|
|
|
personal_tasks = tasks_by_user.get(user.id, [])
|
|
if not personal_tasks:
|
|
result.skipped += 1
|
|
continue
|
|
|
|
task_count = len(personal_tasks)
|
|
payload = {
|
|
"title": "Putzliga für heute",
|
|
"body": (
|
|
"Heute wartet 1 offene Aufgabe auf dich. Zeit zum Punkte sammeln."
|
|
if task_count == 1
|
|
else f"Heute warten {task_count} offene Aufgaben auf dich. Zeit zum Punkte sammeln."
|
|
),
|
|
"icon": _absolute_url("/static/images/pwa-icon-192.png"),
|
|
"badge": _absolute_url("/static/images/pwa-badge.png"),
|
|
"url": _absolute_url("/my-tasks"),
|
|
"tag": f"due-{today.isoformat()}-{user.id}",
|
|
}
|
|
marker = {"date": today.isoformat()}
|
|
_merge_results(result, _send_payload_to_user(user, "task_due_daily", marker, payload))
|
|
return result
|
|
|
|
|
|
def send_monthly_winner_notifications() -> NotificationResult:
|
|
result = NotificationResult()
|
|
if not push_enabled():
|
|
result.skipped += 1
|
|
return result
|
|
|
|
now = local_now()
|
|
if not (now.day == 1 and now.hour >= 9):
|
|
result.skipped += 1
|
|
return result
|
|
|
|
archive_months_missing_up_to_previous()
|
|
target_year, target_month = previous_month(now.year, now.month)
|
|
rows = get_snapshot_rows(target_year, target_month)
|
|
if not rows:
|
|
result.skipped += 1
|
|
return result
|
|
|
|
winners = [row.user.name for row in rows if row.rank == 1]
|
|
winner_text = ", ".join(winners)
|
|
users = User.query.order_by(User.name.asc()).all()
|
|
marker = {"year": target_year, "month": target_month}
|
|
|
|
for user in users:
|
|
if not user.notification_monthly_winner_enabled:
|
|
result.skipped += 1
|
|
continue
|
|
|
|
payload = {
|
|
"title": "Der Haushalts-Champion des letzten Monats steht fest",
|
|
"body": f"{winner_text} führt den letzten Monat an. Schau ins Scoreboard.",
|
|
"icon": _absolute_url("/static/images/pwa-icon-192.png"),
|
|
"badge": _absolute_url("/static/images/pwa-badge.png"),
|
|
"url": _absolute_url(f"/scoreboard?archive={target_year}-{target_month:02d}"),
|
|
"tag": f"winner-{target_year}-{target_month}",
|
|
}
|
|
_merge_results(result, _send_payload_to_user(user, "monthly_winner", marker, payload))
|
|
|
|
return result
|
|
|
|
|
|
def send_badge_notifications_for_awards(awards: list[UserBadge]) -> NotificationResult:
|
|
result = NotificationResult()
|
|
if not push_enabled():
|
|
result.skipped += len(awards) or 1
|
|
return result
|
|
|
|
for award in awards:
|
|
user = award.user
|
|
definition = award.badge_definition
|
|
if not user or not definition:
|
|
result.skipped += 1
|
|
continue
|
|
if not user.notification_badge_enabled:
|
|
result.skipped += 1
|
|
continue
|
|
|
|
marker = {"user_badge_id": award.id}
|
|
payload = {
|
|
"title": "Neues Badge freigeschaltet",
|
|
"body": f"{definition.name}: {definition.description}",
|
|
"icon": _absolute_url("/static/images/pwa-icon-192.png"),
|
|
"badge": _absolute_url("/static/images/pwa-badge.png"),
|
|
"url": _absolute_url("/settings"),
|
|
"tag": f"badge-{award.id}",
|
|
}
|
|
_merge_results(result, _send_payload_to_user(user, "badge_award", marker, payload))
|
|
|
|
return result
|
|
|
|
|
|
def run_scheduled_notifications() -> dict[str, NotificationResult]:
|
|
return {
|
|
"daily_due": send_due_notifications(),
|
|
"monthly_winner": send_monthly_winner_notifications(),
|
|
}
|