"""Monthly score computation and archival of per-user score snapshots."""

from __future__ import annotations

from collections import defaultdict
from datetime import datetime

from sqlalchemy import extract, select

from ..extensions import db
from ..models import BadgeDefinition, MonthlyScoreSnapshot, TaskInstance, User
from .badges import compute_badge_awards
from .dates import local_now, month_bounds, next_month, previous_month


def _build_ranking(rows: list[dict]) -> list[dict]:
    """Sort *rows* in place and number them with a 1-based ``"rank"`` key.

    Ordering is: highest ``total_points`` first, then highest
    ``completed_tasks_count``, then user name (case-insensitive, ascending).
    Every row gets a distinct rank, even when rows tie on points and count.

    Returns the same list object that was passed in.
    """
    rows.sort(
        key=lambda row: (
            -row["total_points"],
            -row["completed_tasks_count"],
            row["user"].name.lower(),
        )
    )
    for index, row in enumerate(rows, start=1):
        row["rank"] = index
    return rows


def compute_monthly_scores(year: int, month: int) -> list[dict]:
    """Compute the ranked score table for every user for the given month.

    Tasks count towards the month when ``completed_at`` lies in the
    half-open interval ``[start, end)`` returned by ``month_bounds``.
    Users without completed tasks still get a row, so the result has one
    entry per user.

    Each row is a dict with the keys ``user``, ``base_points``,
    ``bonus_points``, ``total_points``, ``completed_tasks_count``,
    ``badges`` and ``rank``.
    """
    start, end = month_bounds(year, month)
    users = User.query.order_by(User.name.asc()).all()
    badges = BadgeDefinition.query.filter_by(active=True).all()
    completed_tasks = TaskInstance.query.filter(
        TaskInstance.completed_at.isnot(None),
        TaskInstance.completed_at >= start,
        TaskInstance.completed_at < end,
    ).all()

    # Group the month's tasks by the user who completed them; tasks with no
    # recorded completer are not credited to anyone.
    tasks_by_user: dict[int, list[TaskInstance]] = defaultdict(list)
    for task in completed_tasks:
        if task.completed_by_user_id:
            tasks_by_user[task.completed_by_user_id].append(task)

    rows = []
    for user in users:
        # .get() rather than indexing so the defaultdict is not populated
        # with empty lists for users that completed nothing.
        personal_tasks = tasks_by_user.get(user.id, [])
        base_points = sum(task.points_awarded for task in personal_tasks)
        awards = compute_badge_awards(badges, personal_tasks)
        bonus_points = sum(award["bonus_points"] for award in awards)
        rows.append(
            {
                "user": user,
                "base_points": base_points,
                "bonus_points": bonus_points,
                "total_points": base_points + bonus_points,
                "completed_tasks_count": len(personal_tasks),
                "badges": awards,
            }
        )
    return _build_ranking(rows)


def _snapshot_exists(year: int, month: int) -> bool:
    """Return True when at least one snapshot row is stored for the month."""
    return bool(MonthlyScoreSnapshot.query.filter_by(year=year, month=month).count())


def _store_month_snapshot(year: int, month: int) -> None:
    """Compute the month's scores, persist one snapshot per user, and commit."""
    db.session.add_all(
        [
            MonthlyScoreSnapshot(
                year=year,
                month=month,
                user_id=row["user"].id,
                total_points=row["total_points"],
                completed_tasks_count=row["completed_tasks_count"],
                rank=row["rank"],
            )
            for row in compute_monthly_scores(year, month)
        ]
    )
    db.session.commit()


def ensure_monthly_snapshots(reference: datetime | None = None) -> None:
    """Archive the month preceding *reference* unless it is already archived.

    *reference* defaults to the current local time (made naive); only its
    year and month are used. Nothing is written when any snapshot row for
    the target month already exists.
    """
    now = reference or local_now().replace(tzinfo=None)
    target_year, target_month = previous_month(now.year, now.month)
    if _snapshot_exists(target_year, target_month):
        return
    _store_month_snapshot(target_year, target_month)


def archive_months_missing_up_to_previous() -> None:
    """Archive every month not yet archived, up to and including last month.

    Starts at the month after the most recent stored snapshot, or at the
    previous month itself when no snapshot exists yet, and walks forward
    one month at a time. Months that already have snapshot rows are skipped.
    """
    now = local_now()
    previous_year, previous_month_value = previous_month(now.year, now.month)
    latest = MonthlyScoreSnapshot.query.order_by(
        MonthlyScoreSnapshot.year.desc(),
        MonthlyScoreSnapshot.month.desc(),
    ).first()
    if latest:
        year, month = next_month(latest.year, latest.month)
    else:
        year, month = previous_year, previous_month_value

    # Tuple comparison orders (year, month) pairs chronologically.
    while (year, month) <= (previous_year, previous_month_value):
        if not _snapshot_exists(year, month):
            _store_month_snapshot(year, month)
        year, month = next_month(year, month)


def get_archived_months(limit: int = 12) -> list[tuple[int, int]]:
    """Return up to *limit* distinct archived ``(year, month)`` pairs, newest first."""
    rows = (
        db.session.query(MonthlyScoreSnapshot.year, MonthlyScoreSnapshot.month)
        .group_by(MonthlyScoreSnapshot.year, MonthlyScoreSnapshot.month)
        .order_by(MonthlyScoreSnapshot.year.desc(), MonthlyScoreSnapshot.month.desc())
        .limit(limit)
        .all()
    )
    return [(row.year, row.month) for row in rows]


def get_snapshot_rows(year: int, month: int) -> list[MonthlyScoreSnapshot]:
    """Return the stored snapshots for the month, ordered by rank then points."""
    return (
        MonthlyScoreSnapshot.query.filter_by(year=year, month=month)
        .order_by(MonthlyScoreSnapshot.rank.asc(), MonthlyScoreSnapshot.total_points.desc())
        .all()
    )