feat(modelling): mark a Property as run via has_recommendations + updated_at

The new pipeline left no per-Property record of a run (the old engine set
property.has_recommendations and populated property_details_epc). Restore the
marker: PropertyRepository.mark_modelled sets has_recommendations (true when the
Plan carries measures, mirroring the old engine) and bumps updated_at, so a
first-run under the new process is identifiable as updated_at >= 2026-06-01.

ModellingOrchestrator marks each Property after its Scenarios (true if any
Scenario yielded a measure); run_modelling_e2e's --persist path marks it too
(its compute runs on in-memory fakes, so the DB UoW sets it directly). Adds the
has_recommendations/updated_at columns to the PropertyRow mirror.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Khalim Conn-Kowlessar 2026-06-16 23:34:33 +00:00
parent 4a13fc8b0f
commit 694cdd9c23
8 changed files with 83 additions and 1 deletions

View file

@ -1,5 +1,6 @@
from __future__ import annotations
from datetime import datetime
from typing import ClassVar, Optional
from sqlalchemy import Column
@ -48,3 +49,10 @@ class PropertyRow(SQLModel, table=True):
user_inputted_address: Optional[str] = Field(default=None)
user_inputted_postcode: Optional[str] = Field(default=None)
lexiscore: Optional[float] = Field(default=None)
# FE-owned columns the modelling pipeline now WRITES to record a run: the old
# engine set `has_recommendations` (engine.py); we mirror that, and bump
# `updated_at` so a run is datable (a first-run under the new process is
# `updated_at >= 2026-06-01`, the cutoff the old pipeline predates).
has_recommendations: Optional[bool] = Field(default=None)
updated_at: Optional[datetime] = Field(default=None)

View file

@ -126,6 +126,7 @@ class ModellingOrchestrator:
solar_potential: Optional[SolarPotential] = _solar_potential_for(
uow.solar, prop.identity.uprn
)
has_recommendations = False
for scenario in scenarios:
plan = self._plan_for(
scorer,
@ -145,6 +146,13 @@ class ModellingOrchestrator:
portfolio_id=portfolio_id,
is_default=scenario.is_default,
)
has_recommendations = has_recommendations or bool(plan.measures)
# Record the run on the Property: the old engine's per-Property
# `has_recommendations` marker (true if any Scenario yielded a
# measure), with `updated_at` bumped so the run is datable.
uow.property.mark_modelled(
property_id, has_recommendations=has_recommendations
)
uow.commit()
def _plan_for(

View file

@ -2,8 +2,9 @@ from __future__ import annotations
from typing import Optional, cast
from sqlalchemy import Table
from sqlalchemy import Table, func
from sqlalchemy import select as sa_select
from sqlalchemy import update as sa_update
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlmodel import Session, col, select
@ -116,6 +117,17 @@ class PropertyPostgresRepository(PropertyRepository):
return {}
return self._spatial_repo.get_for_uprns(uprns)
def mark_modelled(self, property_id: int, *, has_recommendations: bool) -> None:
# The old engine set `has_recommendations` per Property; we mirror it and
# bump `updated_at` (DB clock) so a new-process run is datable against the
# 2026-06-01 cutoff. Does not commit — the Unit of Work owns the txn.
stmt = (
sa_update(self._table)
.where(self._table.c.id == property_id)
.values(has_recommendations=has_recommendations, updated_at=func.now())
)
self._session.execute(stmt) # pyright: ignore[reportDeprecated]
def insert_all(self, rows: list[PropertyIdentityInsert]) -> int:
if not rows:
return 0

View file

@ -47,6 +47,15 @@ class PropertyRepository(ABC):
input ids."""
...
@abstractmethod
def mark_modelled(self, property_id: int, *, has_recommendations: bool) -> None:
"""Record that a Property has been run through the modelling pipeline:
set ``has_recommendations`` (the old engine's per-Property marker — true
when the Plan carries measures) and bump ``updated_at`` so the run is
datable (a first-run under the new process is ``updated_at >=
2026-06-01``). Idempotent re-running overwrites the same row."""
...
@abstractmethod
def insert_all(self, rows: list[PropertyIdentityInsert]) -> int:
"""Bulk-insert identity rows, skipping any whose ``(portfolio_id, uprn)``

View file

@ -334,6 +334,12 @@ def _persist(
portfolio_id=portfolio_id,
is_default=scenario.is_default,
)
# Mark the Property as run under the new process (old engine's
# `has_recommendations` marker + a bumped `updated_at`); the modelling
# compute above runs on in-memory fakes, so this DB UoW must set it.
uow.property.mark_modelled(
property_id, has_recommendations=bool(plan.measures)
)
uow.commit()

View file

@ -63,6 +63,11 @@ class FakePropertyRepo(PropertyRepository):
def get_many(self, property_ids: list[int]) -> Properties:
return Properties([self._hydrate(property_id) for property_id in property_ids])
def mark_modelled(self, property_id: int, *, has_recommendations: bool) -> None:
# Record the marker so tests can assert the pipeline set it.
self.modelled: dict[int, bool] = getattr(self, "modelled", {})
self.modelled[property_id] = has_recommendations
def insert_all(self, rows: list[PropertyIdentityInsert]) -> int:
self.inserted: list[PropertyIdentityInsert] = list(rows)
return len(rows)

View file

@ -50,6 +50,11 @@ class FakePropertyRepository(PropertyRepository):
def get_many(self, property_ids: list[int]) -> Properties: # pragma: no cover
raise NotImplementedError
def mark_modelled( # pragma: no cover
self, property_id: int, *, has_recommendations: bool
) -> None:
raise NotImplementedError
class FakeStatusWriter(BulkUploadStatusWriter):
def __init__(self) -> None:

View file

@ -111,3 +111,32 @@ def test_get_many_defaults_to_unrestricted_when_uprn_has_no_spatial_row(
# Assert — an uncovered UPRN means unrestricted, not blocked (per legacy
# `empty_spatial_df`; ADR-0020).
assert properties.items[0].planning_restrictions == PlanningRestrictions()
def test_mark_modelled_sets_has_recommendations_and_bumps_updated_at(
db_engine: Engine,
) -> None:
# Arrange — a freshly-inserted property with no run recorded yet.
with Session(db_engine) as session:
row = PropertyRow(portfolio_id=7, uprn=12345)
session.add(row)
session.commit()
property_id = row.id
assert property_id is not None
assert row.has_recommendations is None
assert row.updated_at is None
# Act — record a run that produced recommendations.
with Session(db_engine) as session:
PropertyPostgresRepository(session).mark_modelled(
property_id, has_recommendations=True
)
session.commit()
# Assert — the marker is set and updated_at is stamped, so the run is datable
# against the 2026-06-01 new-process cutoff.
with Session(db_engine) as session:
refreshed = session.get(PropertyRow, property_id)
assert refreshed is not None
assert refreshed.has_recommendations is True
assert refreshed.updated_at is not None