diff --git a/datatypes/epc/domain/mapper.py b/datatypes/epc/domain/mapper.py index b3fc944f..159c0f92 100644 --- a/datatypes/epc/domain/mapper.py +++ b/datatypes/epc/domain/mapper.py @@ -5,6 +5,7 @@ from decimal import ROUND_HALF_UP, Decimal from typing import Any, Dict, Final, List, Optional, Sequence, TypeVar, Union, cast from datatypes.epc.schema.helpers import from_dict +from datatypes.epc.domain.epc import Epc from datatypes.epc.domain.epc_property_data import ( BASEMENT_WALL_CONSTRUCTION_CODE, Addendum, @@ -2268,61 +2269,53 @@ class EpcPropertyDataMapper: if schema == "RdSAP-Schema-21.0.1": from datatypes.epc.schema.rdsap_schema_21_0_1 import RdSapSchema21_0_1 - return _clear_basement_flag_when_system_built( - EpcPropertyDataMapper.from_rdsap_schema_21_0_1( - from_dict(RdSapSchema21_0_1, data) - ) + mapped = EpcPropertyDataMapper.from_rdsap_schema_21_0_1( + from_dict(RdSapSchema21_0_1, data) ) - if schema == "RdSAP-Schema-21.0.0": + elif schema == "RdSAP-Schema-21.0.0": from datatypes.epc.schema.rdsap_schema_21_0_0 import RdSapSchema21_0_0 - return _clear_basement_flag_when_system_built( - EpcPropertyDataMapper.from_rdsap_schema_21_0_0( - from_dict(RdSapSchema21_0_0, data) - ) + mapped = EpcPropertyDataMapper.from_rdsap_schema_21_0_0( + from_dict(RdSapSchema21_0_0, data) ) - if schema == "RdSAP-Schema-20.0.0": + elif schema == "RdSAP-Schema-20.0.0": from datatypes.epc.schema.rdsap_schema_20_0_0 import RdSapSchema20_0_0 - return _clear_basement_flag_when_system_built( - EpcPropertyDataMapper.from_rdsap_schema_20_0_0( - from_dict(RdSapSchema20_0_0, data) - ) + mapped = EpcPropertyDataMapper.from_rdsap_schema_20_0_0( + from_dict(RdSapSchema20_0_0, data) ) - if schema == "RdSAP-Schema-19.0": + elif schema == "RdSAP-Schema-19.0": from datatypes.epc.schema.rdsap_schema_19_0 import RdSapSchema19_0 - return _clear_basement_flag_when_system_built( - EpcPropertyDataMapper.from_rdsap_schema_19_0( - from_dict(RdSapSchema19_0, data) - ) + mapped = EpcPropertyDataMapper.from_rdsap_schema_19_0( + from_dict(RdSapSchema19_0, data) ) - if schema == "RdSAP-Schema-18.0": + elif schema == "RdSAP-Schema-18.0": from datatypes.epc.schema.rdsap_schema_18_0 import RdSapSchema18_0 - return _clear_basement_flag_when_system_built( - EpcPropertyDataMapper.from_rdsap_schema_18_0( - from_dict(RdSapSchema18_0, data) - ) + mapped = EpcPropertyDataMapper.from_rdsap_schema_18_0( + from_dict(RdSapSchema18_0, data) ) - if schema == "RdSAP-Schema-17.1": + elif schema == "RdSAP-Schema-17.1": from datatypes.epc.schema.rdsap_schema_17_1 import RdSapSchema17_1 - return _clear_basement_flag_when_system_built( - EpcPropertyDataMapper.from_rdsap_schema_17_1( - from_dict(RdSapSchema17_1, data) - ) + mapped = EpcPropertyDataMapper.from_rdsap_schema_17_1( + from_dict(RdSapSchema17_1, data) ) - if schema == "RdSAP-Schema-17.0": + elif schema == "RdSAP-Schema-17.0": from datatypes.epc.schema.rdsap_schema_17_0 import RdSapSchema17_0 - return _clear_basement_flag_when_system_built( - EpcPropertyDataMapper.from_rdsap_schema_17_0( - from_dict(RdSapSchema17_0, data) - ) + mapped = EpcPropertyDataMapper.from_rdsap_schema_17_0( + from_dict(RdSapSchema17_0, data) ) + else: + raise ValueError(f"Unsupported EPC schema: {schema!r}") - raise ValueError(f"Unsupported EPC schema: {schema!r}") + return _clear_basement_flag_when_system_built( + _with_renewable_heat_incentive( + _with_recorded_performance(mapped, data), data + ) + ) # --------------------------------------------------------------------------- @@ -2330,6 +2323,85 @@ class EpcPropertyDataMapper: # --------------------------------------------------------------------------- +def _with_recorded_performance( + epc: EpcPropertyData, data: Dict[str, Any] +) -> EpcPropertyData: + """Overlay the recorded current-performance scalars from the raw API payload. + + The current SAP rating, EPC band, Primary Energy Intensity + (``energy_consumption_current``) and CO2 are top-level fields on every RdSAP + schema response, but only a couple of the per-schema mappers copy them + through (and none map the band). Baseline's Lodged Performance reads all four + off the EPC, so map them here, once, for every schema version. An absent key + leaves the mapped value untouched. + """ + band = data.get("current_energy_efficiency_band") + co2 = data.get("co2_emissions_current") + consumption = data.get("energy_consumption_current") + rating = data.get("energy_rating_current") + return replace( + epc, + current_energy_efficiency_band=( + Epc(band) if band is not None else epc.current_energy_efficiency_band + ), + co2_emissions_current=( + float(co2) if co2 is not None else epc.co2_emissions_current + ), + energy_consumption_current=( + int(consumption) + if consumption is not None + else epc.energy_consumption_current + ), + energy_rating_current=( + int(rating) if rating is not None else epc.energy_rating_current + ), + ) + + +def _with_renewable_heat_incentive( + epc: EpcPropertyData, data: Dict[str, Any] +) -> EpcPropertyData: + """Gap-fill the RHI block (baseline space/water-heating kWh) from the raw + payload. + + The ``renewable_heat_incentive`` object is present on every schema response, + but only the 21.x mappers copy it through; Baseline reads + ``space_heating_kwh`` / ``water_heating_kwh`` off it. Only fills when a mapper + left it unset, and only when the block carries both required kWh figures — + otherwise the EPC is returned untouched. + """ + if epc.renewable_heat_incentive is not None: + return epc + rhi = data.get("renewable_heat_incentive") + if not isinstance(rhi, dict): + return epc + rhi_obj = cast(Dict[str, Any], rhi) + space = rhi_obj.get("space_heating_existing_dwelling") + water = rhi_obj.get("water_heating") + if space is None or water is None: + return epc + return replace( + epc, + renewable_heat_incentive=RenewableHeatIncentive( + space_heating_kwh=float(space), + water_heating_kwh=float(water), + impact_of_loft_insulation_kwh=_optional_float( + rhi_obj.get("impact_of_loft_insulation") + ), + impact_of_cavity_insulation_kwh=_optional_float( + rhi_obj.get("impact_of_cavity_insulation") + ), + impact_of_solid_wall_insulation_kwh=_optional_float( + rhi_obj.get("impact_of_solid_wall_insulation") + ), + ), + ) + + +def _optional_float(value: Any) -> Optional[float]: + return float(value) if value is not None else None + + def _clear_basement_flag_when_system_built( epc: EpcPropertyData, ) -> EpcPropertyData: diff --git a/repositories/epc/epc_postgres_repository.py b/repositories/epc/epc_postgres_repository.py index 8e38c32b..faa86323 100644 --- a/repositories/epc/epc_postgres_repository.py +++ b/repositories/epc/epc_postgres_repository.py @@ -1,7 +1,7 @@ from __future__ import annotations from collections.abc import Sequence -from datetime import date +from datetime import date, datetime from typing import Optional, Protocol, TypeVar from sqlmodel import Session, col, delete, select @@ -57,6 +57,24 @@ def _require(value: Optional[_T], field: str) -> _T: return value +def _as_date(value: object) -> date: + """Normalise an ``epc_property`` date column value to a ``date``. + + The FE-owned date columns (``inspection_date`` / ``completion_date`` / + ``registration_date``) are Postgres ``timestamp``s even though the SQLModel + mirror types them ``str`` (it stores the writer's ``isoformat()`` string). + So a read hands back a ``datetime``, while a value still in flight may be + the ISO string — accept both. + """ + if isinstance(value, datetime): + return value.date() + if isinstance(value, date): + return value + if isinstance(value, str): + return date.fromisoformat(value) + raise TypeError(f"unexpected inspection_date value: {value!r}") + + class _HasEpcPropertyId(Protocol): epc_property_id: int @@ -425,7 +443,7 @@ class EpcPostgresRepository(EpcRepository): return EpcPropertyData( dwelling_type=p.dwelling_type, - inspection_date=date.fromisoformat(p.inspection_date), + inspection_date=_as_date(p.inspection_date), tenure=p.tenure, transaction_type=p.transaction_type, address_line_1=_require(p.address_line_1, "address_line_1"), @@ -480,12 +498,10 @@ class EpcPostgresRepository(EpcRepository): pressure_test=p.pressure_test, language_code=p.language_code, completion_date=( - date.fromisoformat(p.completion_date) if p.completion_date else None + _as_date(p.completion_date) if p.completion_date else None ), registration_date=( - date.fromisoformat(p.registration_date) - if p.registration_date - else None + _as_date(p.registration_date) if p.registration_date else None ), measurement_type=p.measurement_type, conservatory_type=p.conservatory_type,