Model/orchestration/modelling_orchestrator.py
Khalim Conn-Kowlessar c7e2aa3755 feat(modelling): ModellingOrchestrator persists a Plan end-to-end (#1157)
Slice 4b — closes the #1157 tracer. ModellingOrchestrator.run(property_ids,
scenario_ids, portfolio_id) now does real work in one Unit of Work,
committed once (ADR-0011/0012/0016/0017):

  read Property (effective EPC) + Scenario via repos → recommend_cavity_wall
  → select its Option → PackageScorer.score (role-2 package total) +
  marginal_impacts (role-3 attribution) → build Plan/PlanMeasure →
  uow.plan.save → commit.

- AraFirstRunPipeline / ModellingStage thread portfolio_id from the trigger
  body (one source of truth); handler builds the real orchestrator
  (unit_of_work + Sap10Calculator), dropping the Scenario/Materials stubs.
- ScenarioRepository.get_many promoted to @abstractmethod now the bare-stub
  instantiations are gone.
- New ara_first_run-style integration test: a property with an uninsulated
  cavity wall yields a persisted Plan + one cavity_wall_insulation Plan
  Measure (priced from the Product, figures present, linked by plan_id).
  Numeric SAP correctness is pinned separately in test_elmhurst_cascade_pins.
- Existing pipeline integration test updated: seeds scenario 7 and runs the
  real Modelling stage (its already-insulated sample wall yields an empty
  package — no crash).

121 pass across repositories/modelling/orchestration/app; pyright strict clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-03 12:08:32 +00:00

105 lines
4.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
from collections.abc import Callable
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from domain.modelling.package_scorer import PackageScorer, Score
from domain.modelling.plan import Plan, PlanMeasure
from domain.modelling.recommendation import MeasureOption, Recommendation
from domain.modelling.scenario import Scenario
from domain.modelling.scoring import MeasureImpact, marginal_impacts
from domain.modelling.simulation import EpcSimulation
from domain.modelling.wall_recommendation import recommend_cavity_wall
from domain.sap10_calculator.calculator import SapCalculator
from repositories.product.product_repository import ProductRepository
from repositories.unit_of_work import UnitOfWork
class ModellingOrchestrator:
"""Stage 3 — scores each baselined Property against its Scenarios into Plans
and persists them (CONTEXT.md: Modelling; ADR-0011 / ADR-0012 / ADR-0016 /
ADR-0017).
Runs the whole batch in **one** Unit of Work and commits once: for each
(Property × Scenario) it reads the Property's Effective EPC and the Scenario
through repos, generates the candidate Recommendation, selects its Option
into a trivial Optimised Package, scores the package (role 2) and attributes
each measure (role-3 marginal cascade), and persists a **Plan** with its
**Plan Measures**. The optimiser, exclusions, and multi-measure generators
land in later slices; this is the single-measure tracer.
Reads only through repos and threads only IDs (`property_ids`,
`scenario_ids`, `portfolio_id`) — never an in-memory hand-off from Baseline
(ADR-0011). The injected `SapCalculator` is the scoring engine seam.
"""
def __init__(
self,
*,
unit_of_work: Callable[[], UnitOfWork],
calculator: SapCalculator,
) -> None:
self._unit_of_work = unit_of_work
self._calculator = calculator
def run(
self, property_ids: list[int], scenario_ids: list[int], portfolio_id: int
) -> None:
scorer = PackageScorer(self._calculator)
with self._unit_of_work() as uow:
properties = uow.property.get_many(property_ids)
scenarios: list[Scenario] = uow.scenario.get_many(scenario_ids)
for property_id, prop in zip(property_ids, properties, strict=True):
effective_epc: EpcPropertyData = prop.effective_epc
for scenario in scenarios:
plan = self._plan_for(scorer, effective_epc, uow.product)
uow.plan.save(
plan,
property_id=property_id,
scenario_id=scenario.id,
portfolio_id=portfolio_id,
is_default=scenario.is_default,
)
uow.commit()
def _plan_for(
self,
scorer: PackageScorer,
effective_epc: EpcPropertyData,
products: ProductRepository,
) -> Plan:
"""Generate → select → score → attribute the single-measure package for
one Property + Scenario, and assemble its Plan."""
recommendation: Recommendation | None = recommend_cavity_wall(
effective_epc, products
)
selected: list[MeasureOption] = (
[recommendation.options[0]] if recommendation is not None else []
)
overlays: list[EpcSimulation] = [option.overlay for option in selected]
baseline: Score = scorer.score(effective_epc, [])
post_retrofit: Score = scorer.score(effective_epc, overlays)
impacts: list[MeasureImpact] = marginal_impacts(
scorer, effective_epc, overlays
)
measures: tuple[PlanMeasure, ...] = tuple(
_plan_measure(option, impact)
for option, impact in zip(selected, impacts, strict=True)
)
return Plan(
measures=measures, baseline=baseline, post_retrofit=post_retrofit
)
def _plan_measure(option: MeasureOption, impact: MeasureImpact) -> PlanMeasure:
if option.cost is None:
raise ValueError(
f"measure option {option.measure_type!r} has no cost; cannot persist"
)
return PlanMeasure(
measure_type=option.measure_type,
description=option.description,
cost=option.cost,
impact=impact,
)