got backend working with eco plan data for one property

This commit is contained in:
Khalim Conn-Kowlessar 2025-10-30 18:41:55 +00:00
parent 9c5d68f55f
commit 23eb26527c
15 changed files with 638 additions and 159 deletions

2
.idea/Model.iml generated
View file

@ -7,7 +7,7 @@
<sourceFolder url="file://$MODULE_DIR$/open_uprn" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/recommendations" isTestSource="false" />
</content>
<orderEntry type="jdk" jdkName="AssetList" jdkType="Python SDK" />
<orderEntry type="jdk" jdkName="Fastapi-backend" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

2
.idea/misc.xml generated
View file

@ -3,7 +3,7 @@
<component name="Black">
<option name="sdkName" value="Python 3.10 (backend)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="AssetList" project-jdk-type="Python SDK" />
<component name="ProjectRootManager" version="2" project-jdk-name="Fastapi-backend" project-jdk-type="Python SDK" />
<component name="PyCharmProfessionalAdvertiser">
<option name="shown" value="true" />
</component>

View file

@ -65,6 +65,7 @@ class Property:
# Surplus information, that can be provided as optional inputs, by a customer
n_bathrooms = None
n_bedrooms = None
landlord_property_id = None # unique reference for the property as recognised by the landlord
building_id = None # Used to group properties together into a single building
# Contains the solar panel optimisation results from the Google Solar API
@ -265,8 +266,9 @@ class Property:
"number_of_floors": number_of_floors,
"insulation_floor_area": insulation_floor_area,
"insulation_wall_area": insulation_wall_area,
"building_id": kwargs.get("building_id", None),
"floor_area": floor_area
"building_id": kwargs.get("building_id", kwargs.get("landlord_block_reference", None)),
"floor_area": floor_area,
"landlord_property_id": kwargs.get("landlord_property_id"),
}
def parse_kwargs(self, kwargs):

View file

@ -0,0 +1,214 @@
import re
from dataclasses import dataclass, asdict
from typing import Optional, Dict, Any, Type, TypeVar
from sqlalchemy.orm import Session
from datetime import timezone
from enum import Enum
from datetime import datetime, timedelta
import math
import pytz
import enum
from backend.app.db.models.inspections import (
InspectionModel,
InspectionArchetype,
InspectionArchetype2,
InspectionsWallConstruction,
InspectionsWallInsulation,
InspectionsInsulationMaterial,
InspectionBorescoped,
InspectionsRoofOrientation,
InspectionsTileHung,
InspectionsRendered,
InspectionsCladding,
InspectionsAccessIssues,
)
from sqlalchemy.dialects.postgresql import insert
NON_INTRUSIVE_PREFIX = "non-intrusives:"
@dataclass
class InspectionData:
archetype: Optional[InspectionArchetype] = None
archetype_2: Optional[InspectionArchetype2] = None
wall_construction: Optional[InspectionsWallConstruction] = None
insulation: Optional[InspectionsWallInsulation] = None
insulation_material: Optional[InspectionsInsulationMaterial] = None
borescoped: Optional[InspectionBorescoped] = None
roof_orientation: Optional[InspectionsRoofOrientation] = None
tile_hung: Optional[InspectionsTileHung] = None
rendered: Optional[InspectionsRendered] = None
cladding: Optional[InspectionsCladding] = None
access_issues: Optional[InspectionsAccessIssues] = None
date: Optional[datetime] = None # Reflects the date when the survey was actually conducted
notes: Optional[str] = None
surveyor_name: Optional[str] = None
def _clean_string(value: Any) -> Optional[str]:
"""Normalize strings for enum matching, tolerant of NaN/None."""
if value is None:
return None
if isinstance(value, float) and math.isnan(value):
return None
if not isinstance(value, str):
return None
v = (
value.strip()
.lower()
.replace("", '"')
.replace("", '"')
.replace("", "'")
)
return re.sub(r"\s+", " ", v)
E = TypeVar("E", bound=Enum)
def _match_enum(value: Any, enum_cls: Type[E]) -> Optional[E]:
"""Case-insensitive fuzzy matching for enums, tolerant of NaN/None."""
v = _clean_string(value)
if not v:
return None
for e in enum_cls:
if v == e.value.lower():
return e
for e in enum_cls:
if v in e.value.lower() or e.value.lower() in v:
return e
return None
def _lower_key_dict(d: dict) -> dict:
"""Convert all keys to lowercase for case-insensitive lookup."""
return {str(k).lower(): v for k, v in d.items() if isinstance(k, str)}
def extract_inspection_data(config: Dict[str, Any]) -> Optional[InspectionData]:
"""Extract and map inspection data from a config row."""
config_lower = _lower_key_dict(config)
non_intrusive_fields = {
k: v for k, v in config_lower.items()
if k.startswith(NON_INTRUSIVE_PREFIX)
}
if not non_intrusive_fields:
return None
data = InspectionData()
data.archetype = _match_enum(
config_lower.get("non-intrusives: archetype"), InspectionArchetype
)
data.archetype_2 = _match_enum(
config_lower.get("non-intrusives: archetype 2"), InspectionArchetype2
)
data.wall_construction = _match_enum(
config_lower.get("non-intrusives: construction"), InspectionsWallConstruction
)
data.insulation = _match_enum(
config_lower.get("non-intrusives: insulated"), InspectionsWallInsulation
)
data.insulation_material = _match_enum(
config_lower.get("non-intrusives: material"), InspectionsInsulationMaterial
)
data.borescoped = _match_enum(
config_lower.get("non-intrusives: boroscoped?"), InspectionBorescoped
)
data.roof_orientation = _match_enum(
config_lower.get("non-intrusives: roof orientation"), InspectionsRoofOrientation
)
data.tile_hung = _match_enum(
config_lower.get("non-intrusives: tile hung"), InspectionsTileHung
)
data.rendered = _match_enum(
config_lower.get("non-intrusives: rendered"), InspectionsRendered
)
data.cladding = _match_enum(
config_lower.get("non-intrusives: cladding"), InspectionsCladding
)
data.access_issues = _match_enum(
config_lower.get("non-intrusives: access issues"), InspectionsAccessIssues
)
data.date = config_lower.get("non-intrusives: date")
data.notes = config_lower.get("non-intrusives: further surveyor notes")
# convert surveyor name to title case if present
data.surveyor_name = config_lower.get("non-intrusives: name of surveyor").title() if config_lower.get(
"non-intrusives: name of surveyor") else None
return data
def bulk_upsert_inspections_pg(session: Session, inspections_map):
"""
Bulk insert/update inspection records:
- 'created_at' = actual survey date
- 'uploaded_at' = time of upload or update
- If an inspection exists for the same property on the same date overwrite
- Otherwise insert a new record
"""
if not inspections_map:
return
now = datetime.now(pytz.utc)
for property_id, data in inspections_map.items():
# Extract survey date from the data
record = asdict(data)
survey_date = getattr(data, "survey_date", None) or record.get("survey_date")
if not survey_date:
continue # skip if no survey date available
# Convert to UTC datetime if needed
if hasattr(survey_date, "to_pydatetime"):
survey_date = survey_date.to_pydatetime()
if survey_date.tzinfo is None:
survey_date = survey_date.replace(tzinfo=pytz.utc)
record["property_id"] = property_id
record["created_at"] = survey_date
record["uploaded_at"] = now
# Normalize enums and NaNs
for key, value in record.items():
if isinstance(value, enum.Enum):
record[key] = value.value
elif isinstance(value, float) and math.isnan(value):
record[key] = None
# Find existing inspection *for same property on same day*
start_of_day = survey_date.replace(hour=0, minute=0, second=0, microsecond=0)
end_of_day = start_of_day + timedelta(days=1)
existing_inspection = (
session.query(InspectionModel)
.filter(
InspectionModel.property_id == property_id,
InspectionModel.created_at >= start_of_day,
InspectionModel.created_at < end_of_day,
)
.first()
)
if existing_inspection:
# Overwrite existing record (same survey day)
for field, value in record.items():
setattr(existing_inspection, field, value)
existing_inspection.uploaded_at = now
else:
# Create new inspection for new day
new_inspection = InspectionModel(**record)
session.add(new_inspection)
session.flush()

View file

@ -12,7 +12,7 @@ from sqlalchemy.orm.exc import NoResultFound
def create_property(session: Session, portfolio_id: int, address: str, postcode: str, uprn: str,
energy_assessment: dict) -> (int, bool):
energy_assessment: dict, landlord_property_id: str | None = None) -> (int, bool):
"""
This function will create a record for the property in the database if it does not exist.
If it does exist, it will just update the updated_at field.
@ -20,6 +20,9 @@ def create_property(session: Session, portfolio_id: int, address: str, postcode:
:param portfolio_id: The ID of the portfolio the property belongs to
:param address: The address of the property
:param postcode: The postcode of the property
:param uprn: The UPRN of the property
:param energy_assessment: The energy assessment data for the property
:param landlord_property_id: The landlord property ID if available
:return: The ID of the property and a boolean indicating whether it was created or not
"""
@ -49,6 +52,7 @@ def create_property(session: Session, portfolio_id: int, address: str, postcode:
postcode=postcode,
portfolio_id=portfolio_id,
uprn=uprn,
landlord_property_id=landlord_property_id,
creation_status=PropertyCreationStatus.LOADING,
status=status,
has_pre_condition_report=False,
@ -63,6 +67,30 @@ def create_property(session: Session, portfolio_id: int, address: str, postcode:
return new_property.id, True
def ensure_property_exists(session, body, epc_searcher, energy_assessment, landlord_property_id=None):
"""
Wrapper funtion which checks if a property is new and will return the roperty type if not
:param session:
:param body:
:param epc_searcher:
:param energy_assessment:
:param landlord_property_id:
:return:
"""
property_id, is_new = create_property(
session=session,
portfolio_id=body.portfolio_id,
address=epc_searcher.address_clean,
postcode=epc_searcher.postcode_clean,
uprn=epc_searcher.uprn,
energy_assessment=energy_assessment,
landlord_property_id=str(landlord_property_id) if landlord_property_id is not None else None
)
if not is_new and not body.multi_plan:
return None, False
return property_id, is_new
def create_property_targets(
session: Session, property_id: int, portfolio_id: int, epc_target=None, heat_demand_target=None
):

View file

@ -10,6 +10,7 @@ from sqlalchemy import (
ForeignKey,
)
from sqlalchemy.ext.declarative import declarative_base
from backend.app.db.models.portfolio import PropertyModel
Base = declarative_base()
@ -138,19 +139,117 @@ class InspectionModel(Base):
__tablename__ = "inspections"
id = Column(BigInteger, primary_key=True, autoincrement=True)
property_id = Column(BigInteger, ForeignKey("property.id"), nullable=False)
property_id = Column(BigInteger, ForeignKey(PropertyModel.id), nullable=False)
archetype = Column(Enum(InspectionArchetype), nullable=True)
archetype_2 = Column(Enum(InspectionArchetype2), nullable=True)
wall_construction = Column(Enum(InspectionsWallConstruction), nullable=True)
insulation = Column(Enum(InspectionsWallInsulation), nullable=True)
insulation_material = Column(Enum(InspectionsInsulationMaterial), nullable=True)
borescoped = Column(Enum(InspectionBorescoped), nullable=True)
roof_orientation = Column(Enum(InspectionsRoofOrientation), nullable=True)
tile_hung = Column(Enum(InspectionsTileHung), nullable=True)
rendered = Column(Enum(InspectionsRendered), nullable=True)
cladding = Column(Enum(InspectionsCladding), nullable=True)
access_issues = Column(Enum(InspectionsAccessIssues), nullable=True)
archetype = Column(
Enum(
InspectionArchetype,
name="inspection_archetype",
values_callable=lambda e: [m.value for m in e],
create_type=False,
),
nullable=True,
)
archetype_2 = Column(
Enum(
InspectionArchetype2,
name="inspection_archetype_2",
values_callable=lambda e: [m.value for m in e],
create_type=False,
),
nullable=True,
)
wall_construction = Column(
Enum(
InspectionsWallConstruction,
name="inspections_wall_construction",
values_callable=lambda e: [m.value for m in e],
create_type=False,
),
nullable=True,
)
insulation = Column(
Enum(
InspectionsWallInsulation,
name="inspections_wall_insulation",
values_callable=lambda e: [m.value for m in e],
create_type=False,
),
nullable=True,
)
insulation_material = Column(
Enum(
InspectionsInsulationMaterial,
name="inspections_insulation_material",
values_callable=lambda e: [m.value for m in e],
create_type=False,
),
nullable=True,
)
borescoped = Column(
Enum(
InspectionBorescoped,
name="inspection_borescoped",
values_callable=lambda e: [m.value for m in e],
create_type=False,
),
nullable=True,
)
roof_orientation = Column(
Enum(
InspectionsRoofOrientation,
name="inspections_roof_orientation",
values_callable=lambda e: [m.value for m in e],
create_type=False,
),
nullable=True,
)
tile_hung = Column(
Enum(
InspectionsTileHung,
name="inspections_tile_hung",
values_callable=lambda e: [m.value for m in e],
create_type=False,
),
nullable=True,
)
rendered = Column(
Enum(
InspectionsRendered,
name="inspections_rendered",
values_callable=lambda e: [m.value for m in e],
create_type=False,
),
nullable=True,
)
cladding = Column(
Enum(
InspectionsCladding,
name="inspections_cladding",
values_callable=lambda e: [m.value for m in e],
create_type=False,
),
nullable=True,
)
access_issues = Column(
Enum(
InspectionsAccessIssues,
name="inspections_access_issues",
values_callable=lambda e: [m.value for m in e],
create_type=False,
),
nullable=True,
)
notes = Column(Text)
surveyor_name = Column(Text)

View file

@ -86,6 +86,7 @@ class PropertyModel(Base):
portfolio_id = Column(Integer, ForeignKey('portfolio.id'), nullable=False)
creation_status = Column(Enum(PropertyCreationStatus), nullable=False)
uprn = Column(Integer)
landlord_property_id = Column(Text)
building_reference_number = Column(Integer)
status = Column(Enum(PortfolioStatus, values_callable=lambda x: [e.value for e in x]), nullable=False)
address = Column(Text)

View file

@ -4,6 +4,7 @@ from sqlalchemy.sql import func
from backend.app.db.models.portfolio import Portfolio, PropertyModel
from backend.app.db.models.materials import Material
from datatypes.enums import QuantityUnits
import enum
Base = declarative_base()
@ -47,6 +48,14 @@ class RecommendationMaterials(Base):
estimated_cost = Column(Float, nullable=False)
class PlanTypeEnum(enum.Enum):
SOLAR_ECO4 = "solar_eco4"
SOLAR_HHRSH_ECO4 = "solar_hhrsh_eco4"
EMPTY_CAVITY_ECO = "empty_cavity_eco"
PARTIAL_CAVITY_ECO = "partial_cavity_eco"
EXTRACTION_ECO = "extraction_eco"
class Plan(Base):
__tablename__ = 'plan'
@ -60,6 +69,15 @@ class Plan(Base):
valuation_increase_lower_bound = Column(Float)
valuation_increase_upper_bound = Column(Float)
valuation_increase_average = Column(Float)
plan_type = Column(
Enum(
PlanTypeEnum,
name="plan_type",
values_callable=lambda e: [m.value for m in e],
create_type=False,
),
nullable=True,
)
class PlanRecommendations(Base):

View file

@ -0,0 +1,10 @@
from dataclasses import dataclass
from typing import Any, Optional
@dataclass
class PropertyRequestData:
patch: dict
already_installed: dict
non_invasive_recommendations: dict
valuation: Optional[float]

View file

@ -55,7 +55,7 @@ MEASURE_MAP = {
VALID_GOALS = ["Increasing EPC", "Energy Savings", "Reducing CO2 emissions"]
VALID_HOUSING_TYPES = ["Social", "Private"]
VALID_EVENT_TYPES = ["remote_assessment"]
VALID_EVENT_TYPES = ["remote_assessment", "eco_project"]
# Define the validation function for inclusions/exclusions
@ -113,7 +113,7 @@ class PlanTriggerRequest(BaseModel):
# When performing a remote assessment, if this has been set, it will allow the engine to
# pull data from the find my epc website, to utilise as part of a remote assessment
event_type: Optional[Literal["remote_assessment"]] = None
event_type: Optional[Literal["remote_assessment", "eco_project"]] = None
# If true, before optimising the engine will select a slightly larger package, to account for the SAP 10 causing
# scores to drop by a few points

View file

@ -1,7 +1,8 @@
from utils.s3 import read_from_s3
from backend.app.config import get_settings
import msgpack
from utils.s3 import read_from_s3
from backend.app.config import get_settings
from backend.app.plan.data_classes import PropertyRequestData
from typing import Any
def get_cleaned():
@ -21,3 +22,169 @@ def get_cleaned():
cleaned = msgpack.unpackb(cleaned, raw=False)
return cleaned
def patch_epc(patch, epc_records):
"""
This utility function is useful to patch the epc data if we have data from the customer
:return:
"""
for patch_variable, patch_value in patch.items():
if patch_variable in ["address", "postcode"]:
continue
if patch_value == "":
continue
if patch_variable in epc_records["original_epc"]:
epc_records["original_epc"][patch_variable] = patch_value
return epc_records
def extract_property_request_data(
config, patches, already_installed, non_invasive_recommendations, valuation_data, uprn
):
patch_has_uprn = "uprn" in patches[0] if patches else True
if patch_has_uprn:
patch = next((
x for x in patches if str(x["uprn"]) == str(config["uprn"])
), {})
else:
patch = next((
x for x in patches if (x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
), {})
property_already_installed = next((
x for x in already_installed if
(x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
), {})
# Because we have some non-invasive recommendations that match on address and postcode, but not UPRN
# we need to check existence of uprn
has_uprn = "uprn" in non_invasive_recommendations[0] if non_invasive_recommendations else False
if has_uprn:
has_uprn = non_invasive_recommendations[0]["uprn"] not in ["", None]
if has_uprn:
property_non_invasive_recommendations = next((
x for x in non_invasive_recommendations if
(str(x["uprn"]) == str(uprn))
), {})
# We patch the non-invasive recs that are ['cavity_extract_and_refill']
else:
property_non_invasive_recommendations = next((
x for x in non_invasive_recommendations if
(x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
), {})
if isinstance(property_non_invasive_recommendations.get("recommendations"), str):
property_non_invasive_recommendations["recommendations"] = ast.literal_eval(
property_non_invasive_recommendations["recommendations"]
)
transformed = []
for rec in property_non_invasive_recommendations["recommendations"]:
if isinstance(rec, str):
transformed.append({"type": rec, })
else:
transformed.append(rec)
property_non_invasive_recommendations["recommendations"] = transformed
# Check if the valuation data has uprn
valuation_has_uprn = "uprn" in valuation_data[0] if valuation_data else False
if valuation_has_uprn:
valuation_has_uprn = valuation_data[0]["uprn"] not in ["", None]
if valuation_has_uprn:
property_valuation = next((
float(x["valuation"]) for x in valuation_data if
(str(x["uprn"]) == str(uprn))
), None)
else:
property_valuation = next((
float(x["valuation"]) for x in valuation_data if
(x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
), None)
# Return data class to give a structured format
return PropertyRequestData(
patch=patch,
already_installed=property_already_installed,
non_invasive_recommendations=property_non_invasive_recommendations,
valuation=property_valuation
)
def parse_eco_packages(config: dict[str, Any]) -> tuple[list[str], int, str] | tuple[None, None, None]:
solar_identification = config.get("solar_reason", None)
cavity_identification = config.get("cavity_reason", None)
if not solar_identification and not cavity_identification:
return None, None, None
# We map the categories to the desired measures and upgrade targets
# We note that the categories are placeholder until we move the standardised asset list
identification_map = {
"Solar Eligible": {
"measures": ["solar_pv", "loft_insulation", "mechanical_ventilation"],
"target_sap": 86, # High B
"plan_type": "solar_eco4"
},
"Solar Eligible, Solid Wall Uninsulated, EPC E or Below": {
"measures": ["solar_pv", "loft_insulation", "mechanical_ventilation"],
"target_sap": 86, # High B
"plan_type": "solar_eco4"
},
"Solar Eligible, Needs Heating Upgrade": {
"measures": ["solar_pv", "loft_insulation", "high_heat_retention_storage_heater"],
"target_sap": 86, # High B
"plan_type": "solar_hhrsh_eco4"
},
"Non-Intrusive Data Shows Empty Cavity": {
"measures": ["cavity_wall_insulation", "mechanical_ventilation"],
"target_sap": 69, # Low C
"plan_type": "empty_cavity_eco"
},
'Non-Intrusive Data Shows Empty Cavity, built after 2002': {
"measures": ["cavity_wall_insulation", "mechanical_ventilation"],
"target_sap": 69, # Low C
"plan_type": "empty_cavity_eco"
},
"EPC Shows Empty Cavity, inspections show retro drilled": {
# EPC Indicates it's empty, so we simulate a fill
"measures": ["cavity_wall_insulation", "mechanical_ventilation"],
"target_sap": 69, # Low C
"plan_type": "extraction_eco"
},
"EPC Shows Empty Cavity, inspections show filled at build": {
# EPC Indicates it's empty, so we simulate a fill
"measures": ["cavity_wall_insulation", "mechanical_ventilation"],
"target_sap": 69, # Low C
"plan_type": "extraction_eco"
},
"EPC Shows Empty Cavity": {
# EPC Indicates it's empty, so we simulate a fill
"measures": ["cavity_wall_insulation", "mechanical_ventilation"],
"target_sap": 69, # Low C
"plan_type": "empty_cavity_eco"
}
}
# Always prioritise solar
if solar_identification:
_key = solar_identification.split(":")[0]
else:
_key = cavity_identification.split(":")[0]
mapped = identification_map[_key]
return mapped["measures"], mapped["target_sap"], mapped["plan_type"]
def handle_error(session, msg, status=500):
# When the pipeline fails, handles error process
logger.error(msg, exc_info=True)
session.rollback()
return Response(status_code=status, content=msg)

View file

@ -17,8 +17,8 @@ from backend.app.db.connection import db_engine
from backend.app.db.functions.materials_functions import get_materials
from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations
from backend.app.db.functions.property_functions import (
create_property, create_property_details_epc, create_property_targets, update_property_data,
update_or_create_property_spatial_details
create_property_details_epc, create_property_targets, update_property_data,
update_or_create_property_spatial_details, ensure_property_exists
)
from backend.app.db.functions.recommendations_functions import (
create_plan, upload_recommendations, create_scenario
@ -27,9 +27,14 @@ from backend.app.db.functions.funding_functions import upload_funding
from backend.app.db.functions.energy_assessment_functions import get_latest_assessment_by_uprn
from backend.app.db.models.portfolio import rating_lookup
from backend.app.plan.schemas import PlanTriggerRequest, WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES
from backend.app.plan.utils import get_cleaned
from backend.app.plan.utils import (
get_cleaned, patch_epc, extract_property_request_data, parse_eco_packages, handle_error
)
from backend.app.utils import sap_to_epc
import backend.app.assumptions as assumptions
from backend.app.db.functions.inspections_functions import (
extract_inspection_data, bulk_upsert_inspections_pg
)
from backend.ml_models.api import ModelApi
from backend.Property import Property
@ -57,25 +62,6 @@ BATCH_SIZE = 5
SCORING_BATCH_SIZE = 100
def patch_epc(patch, epc_records):
"""
This utility function is useful to patch the epc data if we have data from the customer
:return:
"""
for patch_variable, patch_value in patch.items():
if patch_variable in ["address", "postcode"]:
continue
if patch_value == "":
continue
if patch_variable in epc_records["original_epc"]:
epc_records["original_epc"][patch_variable] = patch_value
return epc_records
def extract_portfolio_aggregation_data(
input_properties, total_valuation_increase, recommendations, new_epc_bands, property_value_increase_ranges
):
@ -349,75 +335,6 @@ def get_request_property_data(body: PlanTriggerRequest):
return patches, already_installed, non_invasive_recommendations, valuation_data
def extract_property_request_data(
config, patches, already_installed, non_invasive_recommendations, valuation_data, uprn
):
patch_has_uprn = "uprn" in patches[0] if patches else True
if patch_has_uprn:
patch = next((
x for x in patches if str(x["uprn"]) == str(config["uprn"])
), {})
else:
patch = next((
x for x in patches if (x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
), {})
property_already_installed = next((
x for x in already_installed if
(x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
), {})
# Because we have some non-invasive recommendations that match on address and postcode, but not UPRN
# we need to check existence of uprn
has_uprn = "uprn" in non_invasive_recommendations[0] if non_invasive_recommendations else False
if has_uprn:
has_uprn = non_invasive_recommendations[0]["uprn"] not in ["", None]
if has_uprn:
property_non_invasive_recommendations = next((
x for x in non_invasive_recommendations if
(str(x["uprn"]) == str(uprn))
), {})
# We patch the non-invasive recs that are ['cavity_extract_and_refill']
else:
property_non_invasive_recommendations = next((
x for x in non_invasive_recommendations if
(x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
), {})
if isinstance(property_non_invasive_recommendations.get("recommendations"), str):
property_non_invasive_recommendations["recommendations"] = ast.literal_eval(
property_non_invasive_recommendations["recommendations"]
)
transformed = []
for rec in property_non_invasive_recommendations["recommendations"]:
if isinstance(rec, str):
transformed.append({"type": rec, })
else:
transformed.append(rec)
property_non_invasive_recommendations["recommendations"] = transformed
# Check if the valuation data has uprn
valuation_has_uprn = "uprn" in valuation_data[0] if valuation_data else False
if valuation_has_uprn:
valuation_has_uprn = valuation_data[0]["uprn"] not in ["", None]
if valuation_has_uprn:
property_valution = next((
float(x["valuation"]) for x in valuation_data if
(str(x["uprn"]) == str(uprn))
), None)
else:
property_valution = next((
float(x["valuation"]) for x in valuation_data if
(x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
), None)
return patch, property_already_installed, property_non_invasive_recommendations, property_valution
def get_funding_data():
"""
This function retrieves the eco project scores matrix and the warm homes local grant funding data
@ -564,7 +481,7 @@ async def model_engine(body: PlanTriggerRequest):
bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet",
)
input_properties = []
input_properties, inspections_map, eco_packages = [], {}, {}
for config in tqdm(plan_input):
# We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
@ -601,15 +518,12 @@ async def model_engine(body: PlanTriggerRequest):
# We check for an energy assessment we have performed on this property:
energy_assessment = get_latest_assessment_by_uprn(session, uprn if uprn is not None else epc_searcher.uprn)
# Create a record in db
property_id, is_new = create_property(
session=session,
portfolio_id=body.portfolio_id,
address=epc_searcher.address_clean,
postcode=epc_searcher.postcode_clean,
uprn=epc_searcher.uprn,
energy_assessment=energy_assessment
property_id, is_new = ensure_property_exists(
session, body, epc_searcher, energy_assessment, landlord_property_id=config.get("landlord_property_id")
)
if not property_id:
continue
if not is_new and not body.multi_plan:
continue
@ -636,16 +550,17 @@ async def model_engine(body: PlanTriggerRequest):
epc_searcher, energy_assessment
)
patch, property_already_installed, property_non_invasive_recommendations, property_valuation = (
extract_property_request_data(
config=config,
patches=patches,
already_installed=already_installed,
non_invasive_recommendations=non_invasive_recommendations,
valuation_data=valuation_data,
uprn=epc_searcher.uprn,
)
req_data = extract_property_request_data(
config=config,
patches=patches,
already_installed=already_installed,
non_invasive_recommendations=non_invasive_recommendations,
valuation_data=valuation_data,
uprn=epc_searcher.uprn,
)
# Pull this out as it may get overwritten
property_non_invasive_recommendations = req_data.non_invasive_recommendations
patch = req_data.patch
# if we have a remote assment data type, we pull the additional data and include it
if (body.event_type == "remote_assessment") and not (epc_searcher.newest_epc.get("estimated")):
@ -679,17 +594,31 @@ async def model_engine(body: PlanTriggerRequest):
address=epc_searcher.address_clean,
postcode=epc_searcher.postcode_clean,
epc_record=prepared_epc,
already_installed=property_already_installed,
property_valuation=property_valuation,
already_installed=req_data.already_installed,
property_valuation=req_data.valuation,
non_invasive_recommendations=property_non_invasive_recommendations,
energy_assessment=energy_assessment,
**Property.extract_kwargs(config), # TODO: Depraecate this
)
)
# If we have an ECO project, we parse the cavity/solar reasons
eco_packages[property_id] = parse_eco_packages(config)
# Final step - extract inspections data, if we have it
property_inspections = extract_inspection_data(config)
if property_inspections:
inspections_map[property_id] = property_inspections
if not input_properties:
return Response(status_code=204)
# We check if we have inspections data and store it in the database if so. We'll update or create
# aginst each property if
if inspections_map:
logger.info("Inserting inspections data")
bulk_upsert_inspections_pg(session, inspections_map)
# Set up model api and warm up the lambdas
model_api = ModelApi(
portfolio_id=body.portfolio_id,
@ -766,11 +695,20 @@ async def model_engine(body: PlanTriggerRequest):
recommendations_scoring_data = []
representative_recommendations = {}
for p in tqdm(input_properties):
# We set the ECO package data, if we have it
property_eco_package = eco_packages.get(p.id, (None, None, None))
if property_eco_package[0] is not None:
inclusions = property_eco_package[0]
exclusions = []
else:
inclusions = body.inclusions
exclusions = body.exclusions
recommender = Recommendations(
property_instance=p,
materials=materials,
exclusions=body.exclusions,
inclusions=body.inclusions,
exclusions=exclusions,
inclusions=inclusions,
default_u_values=body.default_u_values
)
property_recommendations, property_representative_recommendations = recommender.recommend()
@ -788,7 +726,6 @@ async def model_engine(body: PlanTriggerRequest):
recommendations_scoring_data.extend(p.recommendations_scoring_data)
# TODO: Make sure that number_habitable_rooms has been dropped
logger.info("Preparing data for scoring in sap change api")
recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data)
@ -878,16 +815,16 @@ async def model_engine(body: PlanTriggerRequest):
fixed_gain = optimiser_functions.calculate_fixed_gain(
property_required_measures, recommendations, p, needs_ventilation
)
gain = optimiser_functions.calculate_gain(body=body, p=p, fixed_gain=fixed_gain)
gain = optimiser_functions.calculate_gain(body=body, p=p, fixed_gain=fixed_gain, eco_packages=eco_packages)
funding = Funding(
tenure=body.housing_type,
project_scores_matrix=project_scores_matrix,
partial_project_scores_matrix=partial_project_scores_matrix,
whlg_eligible_postcodes=whlg_eligible_postcodes,
eco4_social_cavity_abs_rate=12.5,
eco4_social_cavity_abs_rate=13,
eco4_social_solid_abs_rate=17,
eco4_private_cavity_abs_rate=12.5,
eco4_private_cavity_abs_rate=13,
eco4_private_solid_abs_rate=17,
gbis_social_cavity_abs_rate=21,
gbis_social_solid_abs_rate=25,
@ -1025,8 +962,8 @@ async def model_engine(body: PlanTriggerRequest):
funding.check_funding(
measures=solution,
starting_sap=p.data["current-energy-efficiency"],
ending_sap=p.data["current-energy-efficiency"] + sum([x["gain"] for x in solution]),
starting_sap=int(p.data["current-energy-efficiency"]),
ending_sap=int(p.data["current-energy-efficiency"]) + sum([x["gain"] for x in solution]),
floor_area=p.floor_area,
mainheat_description=p.main_heating["clean_description"],
heating_control_description=p.main_heating_controls["clean_description"],
@ -1193,6 +1130,7 @@ async def model_engine(body: PlanTriggerRequest):
"valuation_increase_average": (
valuations["average_increased_value"] - valuations["current_value"]
),
"plan_type": eco_packages.get(p.id, (None, None, None))[2]
})
upload_recommendations(
@ -1212,7 +1150,7 @@ async def model_engine(body: PlanTriggerRequest):
except Exception as e:
# Rollback the session if an error occurs
session.rollback()
print("Failed i = %s" % str(i))
logger.warning("Failed i = %s" % str(i))
logger.error(f"An error occurred during batch starting at index {i}: {e}")
logger.error(f"property is uprn {p.uprn} id {p.id} address {p.address}")
@ -1251,21 +1189,13 @@ async def model_engine(body: PlanTriggerRequest):
session.commit()
except IntegrityError:
logger.error("Database integrity error occurred", exc_info=True)
session.rollback()
return Response(status_code=500, content="Database integrity error.")
return handle_error(session, "Database integrity error.", 500)
except OperationalError:
logger.error("Database operational error occurred", exc_info=True)
session.rollback()
return Response(status_code=500, content="Database operational error.")
return handle_error(session, "Database operational error.", 500)
except ValueError:
logger.error("Value error - possibly due to malformed data", exc_info=True)
session.rollback()
return Response(status_code=400, content="Bad request: malformed data.")
return handle_error(session, "Bad request: malformed data.", 400)
except Exception as e: # General exception handling
logger.error(f"An error occurred: {e}")
session.rollback()
return Response(status_code=500, content="An unexpected error occurred.")
return handle_error(session, "An unexpected error occurred.", 500)
finally:
session.close()

View file

@ -380,7 +380,7 @@ class EPCRecord:
df.columns = [x.upper().replace("-", "_") for x in df.columns]
if replace_empty_string:
df = df.replace("", np.nan)
df = df.replace("", np.nan).infer_objects(copy=False)
return df

View file

@ -416,7 +416,9 @@ def optimise_with_funding_paths(p, input_measures, housing_type, funding: Fundin
"total_gain": total_gain,
"path": path_spec,
"scheme": scheme,
"is_eligible": _is_eligible_funding_package(scheme, p.data["current-energy-efficiency"], total_gain),
"is_eligible": _is_eligible_funding_package(
scheme, int(p.data["current-energy-efficiency"]), total_gain
),
"unfunded_items": unfunded_picked,
})
@ -432,7 +434,7 @@ def optimise_with_funding_paths(p, input_measures, housing_type, funding: Fundin
# logger.info("We have some packages that are fundable but do not meet the target gain")
# We now can calculate the project ABS, which subtracts from the cost, but this is only relevant for ECO4
solutions["starting_sap"] = p.data["current-energy-efficiency"]
solutions["starting_sap"] = int(p.data["current-energy-efficiency"])
solutions["floor_area"] = p.floor_area
solutions["ending_sap"] = solutions["starting_sap"] + solutions["total_gain"]
solutions["starting_band"] = solutions["starting_sap"].apply(funding.get_sap_band)

View file

@ -176,7 +176,8 @@ def calculate_fixed_gain(property_required_measures, recommendations, p, needs_v
return fixed_gain
def calculate_gain(body: PlanTriggerRequest, p: Property, fixed_gain: float) -> float | None:
def calculate_gain(body: PlanTriggerRequest, p: Property, fixed_gain: float,
eco_packages: None | dict = None) -> float | None:
"""
Calculates the target gain value for optimisation based on the goal.
@ -193,6 +194,7 @@ def calculate_gain(body: PlanTriggerRequest, p: Property, fixed_gain: float) ->
Property object with EPC data (must have p.data["current-energy-efficiency"]).
fixed_gain : float
Total fixed gain from required measures (returned by calculate_fixed_gain).
eco_packages : dict, optional
Returns
-------
@ -201,8 +203,14 @@ def calculate_gain(body: PlanTriggerRequest, p: Property, fixed_gain: float) ->
"""
if body.goal == "Increasing EPC":
current_sap = int(p.data["current-energy-efficiency"])
target_sap = (
eco_packages.get(p.id)[1] if eco_packages.get(p.id)[1] is not None
else epc_to_sap_lower_bound(body.goal_value)
)
gain = CostOptimiser.calculate_sap_gain_with_slack(
epc_to_sap_lower_bound(body.goal_value) - current_sap
target_sap - current_sap
) - fixed_gain
if body.simulate_sap_10:
gain += 3