from __future__ import annotations from pywebpush import WebPushException, webpush from app.models import PushSubscription class PushService: def __init__(self, public_key: str, private_key: str, claims: dict): self.public_key = public_key self.private_key = private_key self.claims = claims def send_to_user(self, user, title: str, body: str, url: str = "/") -> int: if not self.public_key or not self.private_key: return 0 sent = 0 for sub in user.push_subscriptions: try: webpush( subscription_info={ "endpoint": sub.endpoint, "keys": {"p256dh": sub.p256dh_key, "auth": sub.auth_key}, }, data=f'{{"title":"{title}","body":"{body}","url":"{url}"}}', vapid_private_key=self.private_key, vapid_claims=self.claims, ) sent += 1 except WebPushException: continue return sent