98 lines
4.1 KiB
Python
98 lines
4.1 KiB
Python
from __future__ import annotations
|
|
|
|
from collections import defaultdict
|
|
from decimal import Decimal, ROUND_HALF_UP
|
|
|
|
from app.models import CostParticipant, EntryShareRule, to_decimal
|
|
|
|
|
|
class ShareCalculationService:
|
|
def calculate_entry_shares(self, monthly_entry_value) -> dict:
|
|
amount = to_decimal(monthly_entry_value.planned_amount)
|
|
rules = monthly_entry_value.entry.share_rules
|
|
if not rules:
|
|
return {
|
|
"total": amount,
|
|
"internal_total": amount,
|
|
"external_total": Decimal("0.00"),
|
|
"shares": [],
|
|
}
|
|
|
|
shares = []
|
|
total_assigned = Decimal("0.00")
|
|
equal_rules = [rule for rule in rules if rule.share_type == "equal"]
|
|
non_equal_rules = [rule for rule in rules if rule.share_type != "equal"]
|
|
|
|
for rule in non_equal_rules:
|
|
share_amount = self._amount_for_rule(amount, rule)
|
|
total_assigned += share_amount
|
|
shares.append(self._serialize_share(rule.participant, share_amount))
|
|
|
|
if equal_rules:
|
|
remaining = amount - total_assigned
|
|
unit = (remaining / Decimal(len(equal_rules))).quantize(
|
|
Decimal("0.01"), rounding=ROUND_HALF_UP
|
|
)
|
|
for index, rule in enumerate(equal_rules):
|
|
share_amount = unit
|
|
if index == len(equal_rules) - 1:
|
|
share_amount = amount - total_assigned - (unit * (len(equal_rules) - 1))
|
|
shares.append(self._serialize_share(rule.participant, share_amount))
|
|
|
|
internal_total = sum(
|
|
(item["amount"] for item in shares if not item["is_external"]), Decimal("0.00")
|
|
)
|
|
external_total = sum(
|
|
(item["amount"] for item in shares if item["is_external"]), Decimal("0.00")
|
|
)
|
|
return {
|
|
"total": amount,
|
|
"internal_total": internal_total,
|
|
"external_total": external_total,
|
|
"shares": shares,
|
|
}
|
|
|
|
def calculate_external_month_totals(self, month) -> list[dict]:
|
|
participant_totals: dict[int, dict] = defaultdict(
|
|
lambda: {
|
|
"participant_id": None,
|
|
"participant_name": "",
|
|
"participant_avatar_url": None,
|
|
"participant_avatar_initials": "",
|
|
"total": Decimal("0.00"),
|
|
"items": [],
|
|
}
|
|
)
|
|
for value in month.entry_values:
|
|
result = self.calculate_entry_shares(value)
|
|
for share in result["shares"]:
|
|
if not share["is_external"]:
|
|
continue
|
|
bucket = participant_totals[share["participant_id"]]
|
|
bucket["participant_id"] = share["participant_id"]
|
|
bucket["participant_name"] = share["participant_name"]
|
|
bucket["participant_avatar_url"] = share["participant_avatar_url"]
|
|
bucket["participant_avatar_initials"] = share["participant_avatar_initials"]
|
|
bucket["total"] += share["amount"]
|
|
bucket["items"].append({"entry_name": value.entry.name, "amount": share["amount"]})
|
|
return sorted(participant_totals.values(), key=lambda item: item["participant_name"])
|
|
|
|
def _amount_for_rule(self, total_amount: Decimal, rule: EntryShareRule) -> Decimal:
|
|
if rule.share_type == "percentage":
|
|
return (total_amount * to_decimal(rule.share_value) / Decimal("100")).quantize(
|
|
Decimal("0.01"), rounding=ROUND_HALF_UP
|
|
)
|
|
if rule.share_type == "fixed":
|
|
return to_decimal(rule.share_value)
|
|
return Decimal("0.00")
|
|
|
|
def _serialize_share(self, participant: CostParticipant, amount: Decimal) -> dict:
|
|
return {
|
|
"participant_id": participant.id,
|
|
"participant_name": participant.display_name,
|
|
"participant_avatar_url": participant.avatar_url,
|
|
"participant_avatar_initials": participant.avatar_initials,
|
|
"amount": amount.quantize(Decimal("0.01")),
|
|
"is_external": participant.is_external,
|
|
}
|