from __future__ import annotations import json from collections import defaultdict from datetime import date, datetime, time, timedelta from sqlalchemy import and_ from ..extensions import db from ..models import BadgeDefinition, MonthlyScoreSnapshot, TaskInstance, User, UserBadge from .dates import month_bounds, previous_month def _max_day_streak(days: set[date]) -> int: if not days: return 0 ordered = sorted(days) best = streak = 1 for previous, current in zip(ordered, ordered[1:]): if current == previous + timedelta(days=1): streak += 1 else: streak = 1 best = max(best, streak) return best def award_badge(user: User, badge_key: str, *, awarded_at: datetime | None = None, context: dict | None = None) -> bool: definition = BadgeDefinition.query.filter_by(key=badge_key, active=True).first() if not definition: return False existing = UserBadge.query.filter_by(user_id=user.id, badge_definition_id=definition.id).first() if existing: return False db.session.add( UserBadge( user_id=user.id, badge_definition_id=definition.id, awarded_at=awarded_at or datetime.utcnow(), context=json.dumps(context, sort_keys=True) if context else None, ) ) return True def badge_awards_for_month(user_id: int, year: int, month: int) -> list[UserBadge]: start, end = month_bounds(year, month) return ( UserBadge.query.filter( UserBadge.user_id == user_id, UserBadge.awarded_at >= start, UserBadge.awarded_at < end, ) .join(BadgeDefinition) .filter(BadgeDefinition.active.is_(True)) .order_by(UserBadge.awarded_at.asc()) .all() ) def earned_badges_for_user(user_id: int) -> list[UserBadge]: return ( UserBadge.query.filter_by(user_id=user_id) .join(BadgeDefinition) .order_by(UserBadge.awarded_at.desc()) .all() ) def _completion_metrics(user: User) -> dict[str, int]: tasks = ( TaskInstance.query.filter( TaskInstance.completed_by_user_id == user.id, TaskInstance.completed_at.isnot(None), ) .order_by(TaskInstance.completed_at.asc()) .all() ) completion_days: set[date] = set() metrics = defaultdict(int) max_points = 0 for task in tasks: completion_day = task.completed_at.date() completion_days.add(completion_day) metrics["first_task_completed"] += 1 metrics["total_tasks_completed"] += 1 if completion_day <= task.due_date: metrics["on_time_tasks_completed"] += 1 if completion_day <= task.due_date - timedelta(days=1): metrics["early_tasks_completed"] += 1 if task.assigned_user_id and task.assigned_user_id != user.id: metrics["foreign_tasks_completed"] += 1 max_points = max(max_points, task.points_awarded) metrics["streak_days"] = _max_day_streak(completion_days) metrics["high_point_task"] = max_points return metrics def evaluate_task_badges(user: User) -> list[str]: definitions = BadgeDefinition.query.filter_by(active=True).all() metrics = _completion_metrics(user) unlocked: list[str] = [] for definition in definitions: metric_value = metrics.get(definition.trigger_type) if metric_value is None: continue if metric_value >= definition.threshold and award_badge(user, definition.key): unlocked.append(definition.name) if unlocked: db.session.commit() return unlocked def _month_end_award_time(year: int, month: int) -> datetime: _, end = month_bounds(year, month) return end - timedelta(seconds=1) def _user_had_clean_month(user_id: int, year: int, month: int) -> bool: start_date = date(year, month, 1) end_date = (month_bounds(year, month)[1] - timedelta(days=1)).date() tasks = TaskInstance.query.filter( TaskInstance.assigned_user_id == user_id, TaskInstance.due_date >= start_date, TaskInstance.due_date <= end_date, ).all() if not tasks: return False for task in tasks: if not task.completed_at: return False if task.completed_at.date() > task.due_date: return False return True def _winner_user_ids(year: int, month: int) -> set[int]: rows = MonthlyScoreSnapshot.query.filter_by(year=year, month=month, rank=1).all() return {row.user_id for row in rows} def evaluate_monthly_badges(year: int, month: int) -> list[str]: award_time = _month_end_award_time(year, month) unlocked: list[str] = [] winners = _winner_user_ids(year, month) previous_year, previous_month_value = previous_month(year, month) previous_winners = _winner_user_ids(previous_year, previous_month_value) for user in User.query.order_by(User.id.asc()).all(): if user.id in winners: if award_badge(user, "monthly_champion", awarded_at=award_time, context={"year": year, "month": month}): unlocked.append(f"{user.name}: Monatssieger") if user.id in previous_winners: if award_badge(user, "title_defender", awarded_at=award_time, context={"year": year, "month": month}): unlocked.append(f"{user.name}: Titelverteidiger") elif MonthlyScoreSnapshot.query.filter_by(year=previous_year, month=previous_month_value, user_id=user.id).first(): if award_badge(user, "comeback_kid", awarded_at=award_time, context={"year": year, "month": month}): unlocked.append(f"{user.name}: Comeback Kid") if _user_had_clean_month(user.id, year, month): if award_badge(user, "white_glove", awarded_at=award_time, context={"year": year, "month": month}): unlocked.append(f"{user.name}: Weiße Weste") if unlocked: db.session.commit() return unlocked def sync_existing_badges() -> None: for user in User.query.order_by(User.id.asc()).all(): evaluate_task_badges(user) archived_months = ( db.session.query(MonthlyScoreSnapshot.year, MonthlyScoreSnapshot.month) .group_by(MonthlyScoreSnapshot.year, MonthlyScoreSnapshot.month) .order_by(MonthlyScoreSnapshot.year.asc(), MonthlyScoreSnapshot.month.asc()) .all() ) for row in archived_months: evaluate_monthly_badges(row.year, row.month)