from __future__ import annotations from collections import defaultdict from datetime import datetime from ..extensions import db from ..models import MonthlyScoreSnapshot, TaskInstance, User from .badges import badge_awards_for_month, evaluate_monthly_badges from .dates import local_now, month_bounds, next_month, previous_month def _build_ranking(rows: list[dict]) -> list[dict]: rows.sort(key=lambda row: (-row["total_points"], -row["completed_tasks_count"], row["user"].name.lower())) for index, row in enumerate(rows, start=1): row["rank"] = index return rows def compute_monthly_scores(year: int, month: int) -> list[dict]: start, end = month_bounds(year, month) users = User.query.order_by(User.name.asc()).all() completed_tasks = TaskInstance.query.filter( TaskInstance.completed_at.isnot(None), TaskInstance.completed_at >= start, TaskInstance.completed_at < end, ).all() tasks_by_user: dict[int, list[TaskInstance]] = defaultdict(list) for task in completed_tasks: if task.completed_by_user_id: tasks_by_user[task.completed_by_user_id].append(task) rows = [] for user in users: personal_tasks = tasks_by_user.get(user.id, []) base_points = sum(task.points_awarded for task in personal_tasks) earned_badges = badge_awards_for_month(user.id, year, month) bonus_points = sum(badge.badge_definition.bonus_points for badge in earned_badges if badge.badge_definition.active) rows.append( { "user": user, "base_points": base_points, "bonus_points": bonus_points, "total_points": base_points + bonus_points, "completed_tasks_count": len(personal_tasks), "badges": earned_badges, } ) return _build_ranking(rows) def ensure_monthly_snapshots(reference: datetime | None = None) -> None: now = reference or local_now().replace(tzinfo=None) target_year, target_month = previous_month(now.year, now.month) if MonthlyScoreSnapshot.query.filter_by(year=target_year, month=target_month).count(): return snapshot_rows = compute_monthly_scores(target_year, target_month) for row in snapshot_rows: db.session.add( MonthlyScoreSnapshot( year=target_year, month=target_month, user_id=row["user"].id, total_points=row["total_points"], completed_tasks_count=row["completed_tasks_count"], rank=row["rank"], ) ) db.session.commit() def archive_months_missing_up_to_previous() -> None: now = local_now() previous_year, previous_month_value = previous_month(now.year, now.month) latest = ( MonthlyScoreSnapshot.query.order_by(MonthlyScoreSnapshot.year.desc(), MonthlyScoreSnapshot.month.desc()).first() ) if latest: year, month = next_month(latest.year, latest.month) else: year, month = previous_year, previous_month_value while (year, month) <= (previous_year, previous_month_value): if not MonthlyScoreSnapshot.query.filter_by(year=year, month=month).count(): rows = compute_monthly_scores(year, month) for row in rows: db.session.add( MonthlyScoreSnapshot( year=year, month=month, user_id=row["user"].id, total_points=row["total_points"], completed_tasks_count=row["completed_tasks_count"], rank=row["rank"], ) ) db.session.commit() evaluate_monthly_badges(year, month) year, month = next_month(year, month) def get_archived_months(limit: int = 12) -> list[tuple[int, int]]: rows = ( db.session.query(MonthlyScoreSnapshot.year, MonthlyScoreSnapshot.month) .group_by(MonthlyScoreSnapshot.year, MonthlyScoreSnapshot.month) .order_by(MonthlyScoreSnapshot.year.desc(), MonthlyScoreSnapshot.month.desc()) .limit(limit) .all() ) return [(row.year, row.month) for row in rows] def get_snapshot_rows(year: int, month: int) -> list[MonthlyScoreSnapshot]: return ( MonthlyScoreSnapshot.query.filter_by(year=year, month=month) .order_by(MonthlyScoreSnapshot.rank.asc(), MonthlyScoreSnapshot.total_points.desc()) .all() )