Model/backend/app/db/functions/portfolio_functions.py
Khalim Conn-Kowlessar af5dbe325d feat(modelling): cut plan→recommendation readers onto plan_id
Rewrite the three structurally-identical m2m-join readers
(portfolio_functions.aggregate_portfolio_recommendations,
Outputs.get_recommendations_from_db, export get_recommendations) to join
PlanModel directly via recommendation.plan_id, dropping the plan_recommendations
join and its now-unused import. The writers set plan_id (prior slice), so the
rows resolve. test_export pins the export reader through the cut (its fixtures
now set recommendation.plan_id). A portfolio_functions DB characterization test
lands with the scenario consolidation (which provides the full-parity scenario
table the aggregation writes to).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-03 21:09:43 +00:00

60 lines
2.1 KiB
Python

from sqlalchemy import func
from backend.app.db.models.recommendations import (
PlanModel,
Recommendation,
ScenarioModel,
)
def aggregate_portfolio_recommendations(
    session,
    portfolio_id: int,
    scenario_id: int,
    total_valuation_increase: float,
    labour_days: float,
    aggregated_data: dict,
):
    """Sum the default recommendations of a portfolio scenario and store the totals.

    The totals are taken over every ``Recommendation`` flagged ``default``
    whose plan (joined through ``Recommendation.plan_id``) belongs to the
    given portfolio and scenario. The result is written onto the
    ``ScenarioModel`` row with id ``scenario_id`` and flushed.

    Args:
        session: SQLAlchemy session used for both the read and the write.
        portfolio_id: Portfolio whose plans are included in the totals.
        scenario_id: Scenario whose plans are included; also the id of the
            ``ScenarioModel`` row that receives the results.
        total_valuation_increase: Stored as ``property_valuation_increase``.
        labour_days: Stored as ``labour_days``.
        aggregated_data: Extra attribute name -> value pairs set on the
            scenario. A key that matches one of the computed totals replaces
            the computed value.

    Raises:
        Whatever ``Query.one()`` raises when the scenario lookup does not
        match exactly one row.
    """
    # (scenario attribute, recommendation column) pairs; each column is summed
    # and the sum is labelled with the scenario attribute it will be stored in.
    summed_columns = (
        ("cost", Recommendation.estimated_cost),
        ("total_work_hours", Recommendation.total_work_hours),
        ("energy_savings", Recommendation.kwh_savings),
        ("co2_equivalent_savings", Recommendation.co2_equivalent_savings),
        ("energy_cost_savings", Recommendation.energy_cost_savings),
    )
    sum_expressions = [
        func.sum(column).label(label) for label, column in summed_columns
    ]

    totals_row = (
        session.query(*sum_expressions)
        .join(PlanModel, PlanModel.id == Recommendation.plan_id)
        .filter(
            PlanModel.portfolio_id == portfolio_id,
            PlanModel.scenario_id == scenario_id,
            # SQLAlchemy column comparison, not a Python truthiness test.
            Recommendation.default == True,  # noqa: E712
        )
        .one()
    )

    # A missing (None) sum falls back to 0.
    computed_totals = {
        label: getattr(totals_row, label) or 0 for label, _ in summed_columns
    }
    # Contingency and funding are in the aggregated data; its entries are
    # applied after the computed totals, so they win on a key clash.
    scenario_fields = {**computed_totals, **aggregated_data}

    # This data needs to be stored against the scenario, not the portfolio.
    scenario = session.query(ScenarioModel).filter_by(id=scenario_id).one()
    for field_name, field_value in scenario_fields.items():
        setattr(scenario, field_name, field_value)

    # Set last, so these two override same-named keys in aggregated_data.
    scenario.property_valuation_increase = total_valuation_increase
    scenario.labour_days = labour_days

    # NOTE(review): the scenario was loaded through this same session, so the
    # merge looks redundant; it is kept to leave the call sequence unchanged.
    session.merge(scenario)
    session.flush()