Merge pull request #624 from Hestia-Homes/eco-eligiblity-bug

Removing funding code and added requirements for insulation before ASHP
This commit is contained in:
KhalimCK 2026-01-01 13:20:21 +08:00 committed by GitHub
commit 8b88fad8d4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 1657 additions and 618 deletions

View file

@ -997,7 +997,15 @@ class AssetList:
# Keep a record of duplicates
self.duplicated_addresses = self.standardised_asset_list[
self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated()
][[self.DOMNA_PROPERTY_ID, self.address1_colname, self.postcode_colname]].copy()
][[self.DOMNA_PROPERTY_ID, self.full_address_colname, self.address1_colname, self.postcode_colname]].copy()
df = self.standardised_asset_list[
self.standardised_asset_list[self.DOMNA_PROPERTY_ID].isin(
self.duplicated_addresses[self.DOMNA_PROPERTY_ID])
][[self.landlord_property_id, self.DOMNA_PROPERTY_ID, self.full_address_colname, self.address1_colname,
self.postcode_colname]].copy()
df = df.sort_values(by=[self.DOMNA_PROPERTY_ID])
self.standardised_asset_list = self.standardised_asset_list[
~self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated()

View file

@ -59,6 +59,39 @@ def app():
Property UPRN
"""
data_folder = ("/Users/khalimconn-kowlessar/Documents/hestia/Warmfront/SCIS")
data_filename = "SCIS_Historic_Deemed_Combined_Workings.xlsx"
sheet_name = "SCIS"
postcode_column = 'POSTCODE'
address1_column = "NO"
address1_method = None
fulladdress_column = None
address_cols_to_concat = ["NO", "Street / Block Name", "Town/Area"]
missing_postcodes_method = None
landlord_year_built = None
landlord_os_uprn = None
landlord_property_type = "PROPERTY TYPE As per table emailed"
landlord_built_form = "PROPERTY TYPE As per table emailed"
landlord_wall_construction = None
landlord_roof_construction = None
landlord_heating_system = None
landlord_existing_pv = None
landlord_property_id = "Row ID"
landlord_sap = None
outcomes_filename = None
outcomes_sheetname = None
outcomes_postcode = None
outcomes_houseno = None
outcomes_id = None
outcomes_address = None
master_filepaths = []
master_id_colnames = []
master_to_asset_list_filepath = None
phase = False
ecosurv_landlords = None
asset_list_header = 0
landlord_block_reference = None
# Peabody data for cleaning
data_folder = ("/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting "
"Project/data_validation")

View file

@ -464,6 +464,60 @@ BUILT_FORM_MAPPINGS = {
'EnclosedEndTerrace': 'enclosed end-terrace',
'EndTerrace': 'end-terrace',
'SemiDetached': 'semi-detached',
'MidTerrace': 'mid-terrace'
'MidTerrace': 'mid-terrace',
'1st FLOOR FLAT': 'mid-floor',
'END TERRACE HOUSE': 'end-terrace',
'BUNGALOW-END TERRACE': 'end-terrace',
'BUNGALOW END TERRACE': 'end-terrace',
'END-TERRACE': 'end-terrace',
'SEMI DETACHED': 'semi-detached',
'Mid flat Ground Floor': 'ground floor',
'MID TERRACED': 'mid-terrace',
'Mid Terrace bungalow': 'mid-terrace',
'BUNGLAOW SEMI DETACHED': 'detached',
'Bungalow ENd Terrace': 'end-terrace',
'Bungalow Semi detached': 'detached',
'BUNGALOW - SEMI DETACHED': 'detached',
'Bungalow mid terrace': 'mid-terrace',
'BUNGALOW - MID TERRACED': 'mid-terrace',
'BUNGALOW - MID TERRACE': 'mid-terrace',
'Bungalow end terrace': 'end-terrace',
'BUNGALOW SEMI-DETACHED': 'detached',
'MID TERR': 'mid-terrace',
'Bungalow - mid terrace': 'mid-terrace',
'MID-TERRACE': 'mid-terrace',
'Bunagalow Semi Detached': 'semi-detached',
'SEMI DETACHED BUNGALOW': 'semi-detached',
'MID TERRACE HOUSE': 'mid-terrace',
'END - TERRACE': 'end-terrace',
'BUNGALOW-SEMI DETACHED': 'semi-detached',
'Semi-Detached': 'semi-detached',
'End-Terrace house': 'end-terrace',
'BUNGALOW MID TERRACE': 'mid-terrace',
'SEMI DETACHED HOUSE': 'semi-detached',
'BUNGALOW SEMI DETACHED': 'detached',
'MID - TERRACE': 'mid-terrace',
'3 EXT WALL FLAT': 'end-terrace',
'3 Ext wall flat': 'end-terrace',
'3 EX WALL FLAT': 'end-terrace',
'2 ext wall flats': 'mid-terrace',
'2 EXT WALLS': 'mid-terrace',
'3.EXT.WALL FLAT': 'end-terrace',
'FLAT 3 WALLS': 'end-terrace',
'2 Ext Wall flat': 'mid-terrace',
'DETATCHED HOUSE': 'detached',
'3 EXT. WALL FLAT': 'end-terrace',
'3 ext wall flat': 'end-terrace',
'3 EXT WALLS': 'end-terrace',
'3 EXT WALL - NOW 2 EXT': 'unknown',
'3 EXT-WALL FLAT': 'end-terrace',
'FLAT 2 WALLS': 'mid-terrace',
'3 EX WALL MAISONETTE': 'end-terrace',
'3 Ext Wall Flat': 'end-terrace',
'Semi Bungalow': 'semi-detached',
'2 EXT WALL FLAT': 'mid-terrace',
'2.EXT.WALL FLAT': 'mid-terrace',
'2 EXT. WALL FLAT': 'mid-terrace',
}

View file

@ -362,6 +362,71 @@ PROPERTY_MAPPING = {
'Maisonette: Semi Detached: Mid Floor': 'maisonette',
'Maisonette: Detached: Mid Floor': 'maisonette',
'House: EnclosedMidTerrace': 'house'
'House: EnclosedMidTerrace': 'house',
'3 EXT WALL FLAT': 'flat',
'1st FLOOR FLAT': 'flat',
'3 Ext wall flat': 'flat',
'3 EX WALL FLAT': 'flat',
'END TERRACE HOUSE': 'house',
'BUNGALOW-END TERRACE': 'bungalow',
'BUNGALOW END TERRACE': 'bungalow',
'2 ext wall flats': 'flat',
'Mid flat Ground Floor': 'flat',
'3.EXT.WALL FLAT': 'flat',
'FLAT 3 WALLS': 'flat',
'Mid Terrace bungalow': 'bungalow',
'Bungalow ENd Terrace': 'bungalow',
'2 Ext Wall flat': 'flat',
'DETATCHED HOUSE': 'house',
'Bungalow Semi detached': 'bungalow',
'BUNGALOW - SEMI DETACHED': 'bungalow',
'Bungalow mid terrace': 'bungalow',
'BUNGALOW - MID TERRACED': 'bungalow',
'BUNGALOW - MID TERRACE': 'bungalow',
'Bungalow end terrace': 'bungalow',
'3 EXT. WALL FLAT': 'flat',
'3 ext wall flat': 'flat',
'BUNGALOW SEMI-DETACHED': 'bungalow',
'3 EXT-WALL FLAT': 'flat',
'Bungalow - mid terrace': 'bungalow',
'SEMI DETACHED BUNGALOW': 'bungalow',
'FLAT 2 WALLS': 'flat',
'MID TERRACE HOUSE': 'house',
'3 EX WALL MAISONETTE': 'maisonette',
'BUNGALOW-SEMI DETACHED': 'bungalow',
'3 Ext Wall Flat': 'flat',
'Semi Bungalow': 'bungalow',
'End-Terrace house': 'house',
'BUNGALOW MID TERRACE': 'bungalow',
'Mid-terrace house': 'house',
'SEMI DETACHED HOUSE': 'house',
'Semi-detached house': 'house',
'2 EXT WALL FLAT': 'flat',
'2.EXT.WALL FLAT': 'flat',
'BUNGALOW SEMI DETACHED': 'bungalow',
'2 EXT. WALL FLAT': 'flat',
'END-TERRACE': 'unknown',
'SEMI DETACHED': 'unknown',
'2 EXT WALLS': 'unknown',
'MID TERRACED': 'unknown',
'BUNGLAOW SEMI DETACHED': 'bungalow',
'END TERRACE': 'unknown',
'3 EXT WALLS': 'unknown',
'Mid Terrace': 'unknown',
'3 EXT WALL - NOW 2 EXT': 'unknown',
'MID TERR': 'unknown',
'DETACHED': 'unknown',
'MID-TERRACE': 'unknown',
'Bunagalow Semi Detached': 'bungalow',
'End-terrace': 'unknown',
'END - TERRACE': 'unknown',
'SEMI-DETACHED': 'unknown',
'Semi-Detached': 'unknown',
'MID TERRACE': 'unknown',
'End Terrace': 'unknown',
'Detached': 'unknown',
'Mid-terrace': 'unknown',
'MID - TERRACE': 'unknown'
}

View file

@ -23,6 +23,7 @@ from recommendations.recommendation_utils import (
from backend.ml_models.AnnualBillSavings import AnnualBillSavings
from backend.app.utils import sap_to_epc
import backend.app.assumptions as assumptions
from backend.app.db.models.portfolio import rating_lookup
ENVIRONMENT = os.environ.get("ENVIRONMENT", "dev")
DATA_BUCKET = os.environ.get(
@ -80,6 +81,7 @@ class Property:
postcode,
address,
epc_record,
uprn=None, # Pass as an optional input
property_valuation=None,
already_installed=None,
non_invasive_recommendations=None,
@ -120,7 +122,7 @@ class Property:
self.valuation = property_valuation
self.uprn = epc_record.get("uprn")
self.uprn = uprn if uprn is not None else epc_record.get("uprn")
self.uprn_source = self.data.get("uprn-source")
self.full_sap_epc = epc_record.get("full_sap_epc")
@ -828,7 +830,7 @@ class Property:
return property_data
@classmethod
def _prepare_rating_field(cls, field, rating_lookup):
def _prepare_rating_field(cls, field):
"""
Utility function for usage in the lambda, for preparing the _rating fields
"""
@ -838,7 +840,7 @@ class Property:
else None
)
def get_property_details_epc(self, portfolio_id: int, rating_lookup):
def get_property_details_epc(self, portfolio_id: int):
if self.current_energy_bill is None:
raise ValueError("Current energy bill has not been set")
@ -869,37 +871,21 @@ class Property:
"full_address": self.data["address"],
"total_floor_area": float(self.data["total-floor-area"]),
"walls": self.walls["clean_description"],
"walls_rating": self._prepare_rating_field(
self.data["walls-energy-eff"], rating_lookup
),
"walls_rating": self._prepare_rating_field(self.data["walls-energy-eff"]),
"roof": self.roof["clean_description"],
"roof_rating": self._prepare_rating_field(
self.data["roof-energy-eff"], rating_lookup
),
"roof_rating": self._prepare_rating_field(self.data["roof-energy-eff"]),
"floor": self.floor["clean_description"],
"floor_rating": self._prepare_rating_field(
self.data["floor-energy-eff"], rating_lookup
),
"floor_rating": self._prepare_rating_field(self.data["floor-energy-eff"]),
"windows": self.windows["clean_description"],
"windows_rating": self._prepare_rating_field(
self.data["windows-energy-eff"], rating_lookup
),
"windows_rating": self._prepare_rating_field(self.data["windows-energy-eff"]),
"heating": self.main_heating["clean_description"],
"heating_rating": self._prepare_rating_field(
self.data["mainheat-energy-eff"], rating_lookup
),
"heating_rating": self._prepare_rating_field(self.data["mainheat-energy-eff"]),
"heating_controls": self.main_heating_controls["clean_description"],
"heating_controls_rating": self._prepare_rating_field(
self.data["mainheatc-energy-eff"], rating_lookup
),
"heating_controls_rating": self._prepare_rating_field(self.data["mainheatc-energy-eff"]),
"hot_water": self.hotwater["clean_description"],
"hot_water_rating": self._prepare_rating_field(
self.data["hot-water-energy-eff"], rating_lookup
),
"hot_water_rating": self._prepare_rating_field(self.data["hot-water-energy-eff"]),
"lighting": self.lighting["clean_description"],
"lighting_rating": self._prepare_rating_field(
self.data["lighting-energy-eff"], rating_lookup
),
"lighting_rating": self._prepare_rating_field(self.data["lighting-energy-eff"]),
"mainfuel": self.main_fuel["clean_description"],
"ventilation": self.ventilation["ventilation"],
"solar_pv": self.solar_pv["solar_pv"],
@ -908,9 +894,7 @@ class Property:
"floor_height": self.floor_height,
"heat_loss_corridor": self.heat_loss_corridor["heat_loss_corridor_boolean"],
"unheated_corridor_length": self.heat_loss_corridor["length"],
"number_of_open_fireplaces": self.number_of_open_fireplaces[
"number_of_open_fireplaces"
],
"number_of_open_fireplaces": self.number_of_open_fireplaces["number_of_open_fireplaces"],
"number_of_extensions": self.number_of_extensions["number_of_extensions"],
"number_of_storeys": self.number_of_storeys["number_of_storeys"],
"mains_gas": self.mains_gas,
@ -1296,6 +1280,13 @@ class Property:
else:
raise NotImplementedError(f"Unhandled fuel {self.main_fuel['fuel_type']}")
# We handle edge case where no heating system is indicated
if self.main_fuel["fuel_type"] in fuel_map:
mapped_fuel = fuel_map[self.main_fuel["fuel_type"]]
self.heating_energy_source = mapped_fuel
self.hot_water_energy_source = mapped_fuel
return
if len(self.heating_energy_source) > 1:
# We treat this as a community scheme
self.heating_energy_source = ["Varied (Community Scheme)"]

View file

@ -553,22 +553,31 @@ class SearchEpc:
else:
raise ValueError("Multiple UPRNs found - investigate me")
if uprns:
uprn = uprns.pop()
# Convert to int
if not pd.isnull(uprn):
uprn = int(uprn)
else:
newest_epc["uprn-source"] = self.UPRN_SOURCE_SIMULATED
uprn = hash(self.address1 + self.postcode)
# if uprns:
# epc_uprn = uprns.pop()
# # Convert to int
# if not pd.isnull(epc_uprn):
# uprn = int(epc_uprn)
# else:
# newest_epc["uprn-source"] = self.UPRN_SOURCE_SIMULATED
# uprn = hash(self.address1 + self.postcode)
if self.uprn is not None and uprns:
epc_uprn = uprns.pop()
if int(epc_uprn) != self.uprn:
logger.warning(
f"Provided UPRN {self.uprn} does not match EPC UPRN {epc_uprn}, using provided UPRN"
)
# We overwrite but in this instance, we've likely got the wrong EPC data
newest_epc["uprn"] = self.uprn
if self.fast:
return newest_epc, [], {}, "", "", None, ""
return newest_epc, [], {}, "", "", ""
# Retrieve postcode and address
address_epc, postcode_epc, address_postal_town = self.format_address(newest_epc=newest_epc)
return newest_epc, older_epcs, full_sap_epc, address_epc, postcode_epc, uprn, address_postal_town
return newest_epc, older_epcs, full_sap_epc, address_epc, postcode_epc, address_postal_town
@staticmethod
def filter_newest_epc(list_of_epcs: List):
@ -923,7 +932,7 @@ class SearchEpc:
@staticmethod
def calculate_weighted_lodgement_datetime(epc_data):
numeric_dates = pd.to_datetime(epc_data['lodgement-datetime']).view('int64')
numeric_dates = pd.to_datetime(epc_data['lodgement-datetime']).astype('int64')
# Calculate the weighted sum of dates
weighted_sum = (numeric_dates * epc_data['weight']).sum()
@ -991,7 +1000,7 @@ class SearchEpc:
if response["status"] == 200:
(
self.newest_epc, self.older_epcs, self.full_sap_epc, self.address_clean, self.postcode_clean, self.uprn,
self.newest_epc, self.older_epcs, self.full_sap_epc, self.address_clean, self.postcode_clean,
self.address_postal_town
) = self.extract_epc_data(address=self.full_address)
@ -1085,7 +1094,7 @@ class SearchEpc:
response = self.get_epc()
if response["status"] == 200:
(
self.newest_epc, self.older_epcs, self.full_sap_epc, self.address_clean, self.postcode_clean, self.uprn,
self.newest_epc, self.older_epcs, self.full_sap_epc, self.address_clean, self.postcode_clean,
self.address_postal_town
) = self.extract_epc_data()
return

View file

@ -0,0 +1,67 @@
from dataclasses import dataclass
from typing import Optional
@dataclass(slots=True)
class Address:
uprn: Optional[int]
landlord_property_id: Optional[str]
address: Optional[str]
full_address: Optional[str]
postcode: str
property_type: Optional[str]
built_form: Optional[str]
estimated: bool
# Additional address data, associated to a standardised asset list
domna_full_address: Optional[str]
domna_address_1: Optional[str]
landlord_heating_system: Optional[str] = None
solar_reason: Optional[str] = None
cavity_reason: Optional[str] = None
@property
def address1(self):
if self.domna_address_1 is not None:
address1 = self.domna_address_1
else:
address1 = self.address
# Format
address1 = str(int(address1)) if isinstance(address1, float) else str(address1)
return address1
@property
def request_data(self) -> dict[str, Optional[str]]:
"""
Canonical request payload for downstream services.
"""
data = {
"uprn": self.uprn,
"landlord_property_id": self.landlord_property_id,
"postcode": self.postcode,
"address1": self.address1,
"full_address": self.full_address,
}
# Drop nulls
return {k: v for k, v in data.items() if v is not None}
@property
def heating_system(self):
"""
Helper function to extract a heating system, which can be used to estimate EPC. This is a very limited,
placeholder function to cover some initial immediate cases.
:return:
"""
ll_heating = self.landlord_property_id
if not ll_heating:
return None
if ll_heating == "electric storage heaters":
# Return with the same format at the EPC
return "Electric storage heaters"
return None

View file

@ -0,0 +1,84 @@
from backend.addresses.Address import Address
class Addresses:
def __init__(self, addresses: list[Address]):
self._addresses = addresses
# self._identity_index = self._build_identity_index()
def __getitem__(self, index: int) -> Address:
return self._addresses[index]
def __len__(self) -> int:
return len(self._addresses)
@classmethod
def from_plan_input(cls, plan_input: list[dict], body) -> "Addresses":
addresses = []
for row in plan_input:
addresses.append(cls._parse_row(row, body))
return cls(addresses)
def get_uprns(self):
return [x.uprn for x in self._addresses if x.uprn is not None]
def get_landlord_ids(self):
return [x.landlord_property_id for x in self._addresses if x.landlord_property_id is not None]
def get_unique_postcodes(self):
return list({x.postcode for x in self._addresses})
def get_postcodes_for_flats(self):
# Method to extract all of the postcodes associated to a flat, which is used for remote assessments
# on flats
return [x.postcode for x in self._addresses if x.property_type in ["Flat", "flat"]]
def get_property_requests(self):
return [x.request_data for x in self._addresses]
@staticmethod
def _parse_row(row: dict, body) -> Address:
def clean_uprn(v):
try:
return int(float(v))
except (TypeError, ValueError):
return None
uprn = clean_uprn(row.get("uprn"))
address = row.get("address")
if not address and body.file_format == "domna_asset_list":
address = row.get("domna_address_1")
full_address = (
row.get("domna_full_address")
if body.file_format == "domna_asset_list"
else None
)
if not isinstance(full_address, str):
full_address = None
postcode = str(row["postcode"]).strip().upper()
return Address(
uprn=uprn,
landlord_property_id=str(row["landlord_property_id"])
if row.get("landlord_property_id") else None,
address=str(address).strip() if address else None,
full_address=str(full_address).strip() if full_address else None,
postcode=postcode,
property_type=row.get("property_type"),
built_form=row.get("built_form"),
estimated=bool(row.get("estimated", False)),
domna_full_address=row.get("domna_full_address"),
domna_address_1=row.get("domna_address_1"),
)
# def _build_identity_index(self) -> dict:
# index = {}
# for addr in self._addresses:
# key = addr.identity_key()
# if key in index:
# raise ValueError(f"Duplicate address identity detected: {key}")
# index[key] = addr
# return index

View file

@ -20,7 +20,7 @@ def _get_associated_records(results, uprn, uprn_key="UPRN"):
return matched_record
def get_associated_uprns(session: Session, postcode: str, uprn: str):
def get_associated_uprns(postcode_search: PostcodeSearch, uprn: str | int):
"""
Given a postcode and UPRN, for a remote assessment, fetch all associated UPRNs, based
on parent UPRN. This will be properties in the same building
@ -28,40 +28,87 @@ def get_associated_uprns(session: Session, postcode: str, uprn: str):
Parent UPRN is referenced in the following docs:
https://static.geoplace.co.uk/downloads/GeoPlace-Data-Entry-Conventions-Best-Practice-for-Addresses.pdf
:param session: The database session
:param postcode: The postcode string to search for
:param PostcodeSearch postcode_search: The postcode search record
:param uprn: The UPRN string to match
:return: The matching PostcodeSearch record, or None if not found
"""
try:
record = (
session.query(PostcodeSearch)
.filter(func.upper(PostcodeSearch.postcode) == postcode)
.first()
)
if not record:
# No record found for this postcode
return []
if not postcode_search:
return []
matched_record = _get_associated_records(results=record.result_data["results"], uprn=uprn)
if isinstance(uprn, int):
# For this, coerce to string
uprn = str(uprn)
if len(matched_record) != 1:
logger.error("Something went wrong, about to return nothing")
return []
matched_record = _get_associated_records(results=postcode_search.result_data["results"], uprn=uprn)
if not matched_record[0].get("PARENT_UPRN"):
logger.info("No parent UPRN found, cannot get associated records")
return []
if len(matched_record) != 1:
return []
associated_records = _get_associated_records(
results=record.result_data["results"], uprn=matched_record[0]["PARENT_UPRN"], uprn_key="PARENT_UPRN"
)
# We now fetch all UPRNS with the same parent UPRN
associated_uprns = [int(x["UPRN"]) for x in associated_records if x["UPRN"] != str(uprn)]
if not matched_record[0].get("PARENT_UPRN"):
logger.info("No parent UPRN found, cannot get associated records")
return []
return associated_uprns
associated_records = _get_associated_records(
results=postcode_search.result_data["results"], uprn=matched_record[0]["PARENT_UPRN"], uprn_key="PARENT_UPRN"
)
# We now fetch all UPRNS with the same parent UPRN
associated_uprns = [int(x["UPRN"]) for x in associated_records if x["UPRN"] != str(uprn)]
except SQLAlchemyError as e:
session.rollback()
raise e
return associated_uprns
def get_by_postcodes(session: Session, postcodes: list[str]) -> dict[str, PostcodeSearch]:
"""
Given a list of postcodes, retrieves postcode data from the database form the PostcodeSearch table
:param session:
:param postcodes:
:return:
"""
if not postcodes:
return {}
normalised = {p.upper() for p in postcodes if p}
records = (
session.query(PostcodeSearch)
.filter(func.upper(PostcodeSearch.postcode).in_(normalised))
.all()
)
return {r.postcode.upper(): r for r in records}
def get_associated_uprns_from_record(record: PostcodeSearch, uprn: str) -> list[int]:
"""
Given the postcode sra
:param record:
:param uprn:
:return:
"""
if not record:
return []
matched_record = _get_associated_records(
results=record.result_data["results"],
uprn=uprn
)
if len(matched_record) != 1:
return []
parent_uprn = matched_record[0].get("PARENT_UPRN")
if not parent_uprn:
return []
associated_records = _get_associated_records(
results=record.result_data["results"],
uprn=parent_uprn,
uprn_key="PARENT_UPRN"
)
return [
int(x["UPRN"])
for x in associated_records
if x["UPRN"] != str(uprn)
]

View file

@ -1,3 +1,4 @@
from typing import Iterable
from backend.app.db.models.energy_assessments import (
EnergyAssessment, EnergyAssessmentScenarios, EnergyAssessmentDocuments, DocumentTypeEnum
)
@ -63,27 +64,48 @@ def bulk_insert_energy_assessments(session: Session, data_list: List[dict]) -> D
return uprn_to_assessment_id
def get_latest_assessment_by_uprn(session: Session, uprn: int) -> Optional[EnergyAssessment]:
def get_latest_assessments_for_uprns(
session: Session,
uprns: Iterable[int],
) -> dict[int, dict]:
"""
Retrieve the latest energy assessment for a given UPRN based on the inspection date.
Fetch the latest energy assessment per UPRN in a single query.
:param session: The database session
:param uprn: The unique property reference number
:return: The latest EnergyAssessment object or None if not found
Returns a dict:
uprn -> assessment_dict | empty_response
"""
if not uprn:
return EnergyAssessment.empty_response()
uprns = [u for u in uprns if u]
if not uprns:
return {}
try:
# Query the EnergyAssessment model, filter by uprn, order by inspection_date in descending order
latest_assessment = session.query(EnergyAssessment).filter_by(uprn=uprn).order_by(
desc(EnergyAssessment.inspection_date)).first()
# DISTINCT ON requires matching ORDER BY
records = (
session.query(EnergyAssessment)
.filter(EnergyAssessment.uprn.in_(uprns))
.order_by(
EnergyAssessment.uprn,
desc(EnergyAssessment.inspection_date),
)
.distinct(EnergyAssessment.uprn)
.all()
)
return latest_assessment.to_dict() if latest_assessment else EnergyAssessment.empty_response()
except Exception as e:
logger.info(f"An error occurred: {e}")
return None
result: dict[int, dict] = {}
for record in records:
result[record.uprn] = record.to_dict()
# Fill missing uprns with empty response
uprn_set = set(uprns)
found_set = set(result.keys())
missing_uprns = uprn_set - found_set
for uprn in missing_uprns:
result[uprn] = EnergyAssessment.empty_response()
return result
def create_scenarios_for_documents(session: Session, document_list: List[dict], uprn_to_assessment_id: dict):

View file

@ -1,7 +1,9 @@
from typing import List
from datetime import datetime, timedelta, timezone
from sqlalchemy.orm import Session
from sqlalchemy.exc import SQLAlchemyError
from backend.app.db.models.epc import EpcStore
from sqlmodel import Session
from sqlalchemy.dialects.postgresql import insert
class EpcStoreService:
@ -50,6 +52,77 @@ class EpcStoreService:
"epc_page_created_at": record.epc_page_created_at,
}
@classmethod
def get_epcs_for_uprns(cls, session: Session, uprns: List[int]) -> dict[int, dict]:
"""
Given a list of uprns, return a dict mapping each uprn to its EPC data status and content.
:param session:
:param uprns:
:return:
"""
if not uprns:
return {}
cutoff = datetime.now(timezone.utc) - timedelta(days=cls.FRESHNESS_DAYS)
records = (
session.query(EpcStore)
.filter(EpcStore.uprn.in_(uprns))
.all()
)
result: dict[int, dict] = {}
for record in records:
if not record.epc_api_created_at:
result[record.uprn] = {
"status": cls.MISSING,
"epc_api": None,
"epc_page": None,
"epc_page_rrn": None,
"epc_api_created_at": None,
"epc_page_created_at": None,
}
continue
if record.epc_api_created_at.date() < cutoff.date():
# We only expose epc_page when epc_api is fresh.
result[record.uprn] = {
"status": cls.EXPIRED,
"epc_api": None,
"epc_page": None,
"epc_page_rrn": None,
"epc_api_created_at": None,
"epc_page_created_at": None,
}
continue
result[record.uprn] = {
"status": cls.FRESH,
"epc_api": record.epc_api,
"epc_page": record.epc_page,
"epc_page_rrn": record.epc_page_rrn,
"epc_api_created_at": record.epc_api_created_at,
"epc_page_created_at": record.epc_page_created_at,
}
# For the uprns not found in records, mark them as missing
requested = set(uprns)
found = set(result.keys())
missing = requested - found
for uprn in missing:
result[uprn] = {
"status": cls.MISSING,
"epc_api": None,
"epc_page": None,
"epc_page_rrn": None,
"epc_api_created_at": None,
"epc_page_created_at": None,
}
return result
@classmethod
def check_insert_needed(cls, epc_cache, epc_estimated, uprn):
"""
@ -115,11 +188,42 @@ class EpcStoreService:
)
session.add(record)
session.flush()
session.commit()
return record
except SQLAlchemyError as e:
session.rollback()
raise e
@classmethod
def bulk_upsert_epc_data(cls, session: Session, rows_to_insert: list[dict]):
if not rows_to_insert:
return
now = datetime.now(timezone.utc)
values = [
{
"uprn": row["uprn"],
"epc_api": row["epc_api"],
"epc_api_created_at": now,
"epc_page": row["epc_page"],
"epc_page_rrn": row["epc_page_rrn"],
"epc_page_created_at": now if row["epc_page"] else None,
}
for row in rows_to_insert
]
insert_stmt = insert(EpcStore).values(values)
stmt = insert_stmt.on_conflict_do_update(
index_elements=["uprn"],
set_={
"epc_api": insert_stmt.excluded.epc_api,
"epc_api_created_at": insert_stmt.excluded.epc_api_created_at,
"epc_page": insert_stmt.excluded.epc_page,
"epc_page_rrn": insert_stmt.excluded.epc_page_rrn,
"epc_page_created_at": insert_stmt.excluded.epc_page_created_at,
},
)
session.execute(stmt)
session.commit()

View file

@ -1,5 +1,6 @@
from sqlalchemy.orm import Session
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import insert
from backend.app.db.models.funding import FundingPackage, FundingPackageMeasures
@ -69,3 +70,72 @@ def upload_funding(session: Session, p, plan_id, recommendations_to_upload):
session.rollback()
print(f"An error occurred: {e}")
return False
def bulk_upload_funding_packages(
session: Session,
funding_payload: list[dict],
):
"""
Bulk upload:
- funding_package
- funding_package_measures
Assumes caller manages the transaction.
"""
if not funding_payload:
return
# ---------------------------------------------------------
# 1. Prepare funding package rows
# ---------------------------------------------------------
funding_rows = []
measures_by_index = []
for f in funding_payload:
funding_rows.append({
"plan_id": f["plan_id"],
"scheme": f["scheme"],
"project_funding": f["project_funding"],
"total_uplift": f["total_uplift"],
"full_project_score": f["full_project_score"],
"partial_project_score": f["partial_project_score"],
"uplift_project_score": f["uplift_project_score"],
})
measures_by_index.append(f.get("measures", []))
# ---------------------------------------------------------
# 2. Insert funding packages and get IDs
# ---------------------------------------------------------
result = session.execute(
insert(FundingPackage)
.values(funding_rows)
.returning(FundingPackage.id)
)
funding_package_ids = [row[0] for row in result]
# ---------------------------------------------------------
# 3. Insert funding package measures
# ---------------------------------------------------------
measures_rows = []
for funding_package_id, measures in zip(
funding_package_ids, measures_by_index
):
for m in measures:
measures_rows.append({
"funding_package_id": funding_package_id,
"measure": m["measure"],
"material_id": m["material_id"],
"innovation_uplift": m["innovation_uplift"],
"partial_project_score": m["partial_project_score"],
"uplift_project_score": m["uplift_project_score"],
})
if measures_rows:
session.execute(
insert(FundingPackageMeasures).values(measures_rows)
)

View file

@ -2,7 +2,6 @@ import re
from dataclasses import dataclass, asdict
from typing import Optional, Dict, Any, Type, TypeVar
from sqlalchemy.orm import Session
from datetime import timezone
from enum import Enum
from datetime import datetime, timedelta
@ -24,7 +23,6 @@ from backend.app.db.models.inspections import (
InspectionsCladding,
InspectionsAccessIssues,
)
from sqlalchemy.dialects.postgresql import insert
NON_INTRUSIVE_PREFIX = "non-intrusives:"

View file

@ -3,12 +3,16 @@
###
import datetime
import pytz
from sqlalchemy import select, or_, bindparam, update
from sqlalchemy.orm import Session
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.dialects.postgresql import insert
from backend.addresses.Address import Address
from backend.app.db.models.portfolio import (
PropertyModel, PropertyCreationStatus, PortfolioStatus, PropertyTargetsModel, PropertyDetailsEpcModel,
PropertyDetailsSpatial
)
from sqlalchemy.orm.exc import NoResultFound
def create_property(session: Session, portfolio_id: int, address: str, postcode: str, uprn: str,
@ -203,3 +207,162 @@ def update_or_create_property_spatial_details(session: Session, uprn: int, prope
session.flush()
return True
def get_existing_properties(session, portfolio_id, uprns, landlord_ids):
"""
Bulk method for checking for existing properties
:param session:
:param portfolio_id:
:param uprns:
:param landlord_ids:
:return:
"""
return (
session.exec(
select(PropertyModel)
.where(PropertyModel.portfolio_id == portfolio_id)
.where(
or_(
PropertyModel.uprn.in_(uprns),
PropertyModel.landlord_property_id.in_(landlord_ids),
)
)
)
.scalars()
.all()
)
def bulk_create_properties(
session,
body,
addresses: list[Address], # these are *new* addresses
energy_assessment_by_uprn: dict[int, dict],
):
rows = []
for addr in addresses:
energy_assessment = energy_assessment_by_uprn.get(addr.uprn, {})
status = (
PortfolioStatus.ASSESSMENT.value
if not energy_assessment.get("epc")
else PortfolioStatus.SURVEY.value
)
rows.append(
{
"address": addr.address1,
"postcode": addr.postcode,
"portfolio_id": body.portfolio_id,
"uprn": addr.uprn,
"landlord_property_id": addr.landlord_property_id,
"creation_status": PropertyCreationStatus.LOADING,
"status": status,
"has_pre_condition_report": False,
"has_recommendations": False,
}
)
if not rows:
return []
stmt = (
insert(PropertyModel)
.values(rows)
.on_conflict_do_nothing(
index_elements=["portfolio_id", "uprn"],
index_where=PropertyModel.uprn.isnot(None),
)
.returning(
PropertyModel.id,
PropertyModel.uprn,
PropertyModel.landlord_property_id,
)
)
result = session.execute(stmt)
session.flush()
return result.fetchall()
def bulk_update_properties(session: Session, property_updates: list[dict]):
if not property_updates:
return
now = datetime.datetime.now(pytz.utc)
stmt = (
update(PropertyModel.__table__)
.where(
PropertyModel.id == bindparam("b_id"),
PropertyModel.portfolio_id == bindparam("b_portfolio_id"),
)
.values(
**{k: bindparam(k) for k in property_updates[0]["data"].keys()},
updated_at=now,
)
)
payload = [
{
"b_id": row["property_id"], # renamed bind param
"b_portfolio_id": row["portfolio_id"],
**row["data"],
}
for row in property_updates
]
session.execute(
stmt,
payload,
execution_options={"synchronize_session": False},
)
def bulk_upsert_property_details_epc(session: Session, rows: list[dict]):
if not rows:
return
insert_stmt = insert(PropertyDetailsEpcModel).values(rows)
update_cols = {
col.name: insert_stmt.excluded[col.name]
for col in PropertyDetailsEpcModel.__table__.columns
if col.name not in ("id",)
}
stmt = insert_stmt.on_conflict_do_update(
index_elements=["portfolio_id", "property_id"],
set_=update_cols,
)
session.execute(stmt)
def bulk_upsert_property_spatial(session: Session, rows: list[dict]):
if not rows:
return
values = []
for row in rows:
values.append({
"uprn": row["uprn"],
**row["data"],
})
insert_stmt = insert(PropertyDetailsSpatial).values(values)
update_cols = {
col.name: insert_stmt.excluded[col.name]
for col in PropertyDetailsSpatial.__table__.columns
if col.name not in ("id", "uprn")
}
stmt = insert_stmt.on_conflict_do_update(
index_elements=["uprn"],
set_=update_cols,
)
session.execute(stmt)

View file

@ -96,27 +96,47 @@ def create_plan(session: Session, plan):
raise e
def create_scenario(session: Session, scenario):
"""
This function will create a record for the scenario in the database if it does not exist.
:param session: The database session
:param scenario: dictionary of data representing a scenario to be created
"""
try:
def bulk_create_plans(session: Session, plans_to_create: list[dict]) -> dict[int, int]:
if not plans_to_create:
return {}
# Before creating a new scenario, we check if there is a scenario for this portfolio id already
# If there is, it means that any new scnario created will NOT be the default scenario
existing_scenario = session.query(Scenario).filter_by(portfolio_id=scenario["portfolio_id"]).first()
scenario["is_default"] = True if not existing_scenario else False
payload = [
{
"property_id": p["property_id"],
**p["plan_data"],
}
for p in plans_to_create
]
new_scenario = Scenario(**scenario)
session.add(new_scenario)
session.flush()
session.commit()
return new_scenario
except SQLAlchemyError as e:
session.rollback()
raise e
stmt = (
insert(Plan)
.values(payload)
.returning(Plan.id, Plan.property_id)
)
result = session.execute(stmt).all()
# property_id -> plan_id
return {row.property_id: row.id for row in result}
def create_scenario(session: Session, scenario: dict) -> int:
existing_scenario = (
session.query(Scenario)
.filter_by(portfolio_id=scenario["portfolio_id"])
.first()
)
scenario["is_default"] = not bool(existing_scenario)
new_scenario = Scenario(**scenario)
session.add(new_scenario)
session.flush() # ensures ID is populated
scenario_id = new_scenario.id
session.commit()
return scenario_id
def create_recommendation(session: Session, recommendation):
@ -237,6 +257,94 @@ def upload_recommendations(session: Session, recommendations_to_upload, property
return False
def bulk_upload_recommendations_and_materials(
session: Session,
recommendation_payload: list[dict],
):
if not recommendation_payload:
return
# ---------------------------------------------------------
# 1. Prepare recommendation rows
# ---------------------------------------------------------
recommendation_rows = []
parts_by_index = []
plan_ids_by_index = []
for rec in recommendation_payload:
recommendation_rows.append({
"property_id": rec["property_id"],
"type": rec["type"],
"measure_type": rec["measure_type"],
"description": rec["description"],
"estimated_cost": rec["estimated_cost"],
"default": rec["default"],
"starting_u_value": rec["starting_u_value"],
"new_u_value": rec["new_u_value"],
"sap_points": rec["sap_points"],
"heat_demand": rec["heat_demand"],
"kwh_savings": rec["kwh_savings"],
"co2_equivalent_savings": rec["co2_equivalent_savings"],
"energy_savings": rec["energy_savings"],
"energy_cost_savings": rec["energy_cost_savings"],
"total_work_hours": rec["total_work_hours"],
"labour_days": rec["labour_days"],
"already_installed": rec["already_installed"],
})
parts_by_index.append(rec["parts"])
plan_ids_by_index.append(rec["plan_id"])
# ---------------------------------------------------------
# 2. Insert recommendations and get IDs
# ---------------------------------------------------------
result = session.execute(
insert(Recommendation)
.values(recommendation_rows)
.returning(Recommendation.id)
)
recommendation_ids = [row[0] for row in result]
# ---------------------------------------------------------
# 3. Insert recommendation materials
# ---------------------------------------------------------
materials_rows = []
for recommendation_id, parts in zip(recommendation_ids, parts_by_index):
for part in parts:
materials_rows.append({
"recommendation_id": recommendation_id,
"material_id": part["material_id"],
"depth": part["depth"],
"quantity": part["quantity"],
"quantity_unit": part["quantity_unit"],
"estimated_cost": part["estimated_cost"],
})
if materials_rows:
session.execute(
insert(RecommendationMaterials).values(materials_rows)
)
# ---------------------------------------------------------
# 4. Insert plan ↔ recommendation links
# ---------------------------------------------------------
plan_recommendation_rows = [
{
"plan_id": plan_id,
"recommendation_id": recommendation_id,
}
for plan_id, recommendation_id in zip(
plan_ids_by_index, recommendation_ids
)
]
session.execute(
insert(PlanRecommendations).values(plan_recommendation_rows)
)
def chunked(iterable, size=100):
for i in range(0, len(iterable), size):
yield iterable[i:i + size]

View file

@ -1,9 +1,9 @@
import ast
import os
import time
import msgpack
from uuid import UUID
from typing import Any
from utils.s3 import read_from_s3
from backend.addresses.Address import Address
from backend.app.config import get_settings
from backend.app.plan.data_classes import PropertyRequestData
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
@ -52,21 +52,20 @@ def patch_epc(patch, epc_records):
def extract_property_request_data(
config, patches, already_installed, non_invasive_recommendations, valuation_data, uprn
address: Address, patches, already_installed, non_invasive_recommendations, valuation_data, uprn
):
patch_has_uprn = "uprn" in patches[0] if patches else True
if patch_has_uprn:
patch = next((
x for x in patches if str(x["uprn"]) == str(config["uprn"])
x for x in patches if str(x["uprn"]) == str(address.uprn)
), {})
else:
patch = next((
x for x in patches if (x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
x for x in patches if (x["address"] == address.address) and (x["postcode"] == address.postcode)
), {})
property_already_installed = next((
x for x in already_installed if
(x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
x for x in already_installed if (x["address"] == address.address) and (x["postcode"] == address.postcode)
), [])
# Because we have some non-invasive recommendations that match on address and postcode, but not UPRN
@ -85,7 +84,7 @@ def extract_property_request_data(
else:
property_non_invasive_recommendations = next((
x for x in non_invasive_recommendations if
(x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
(x["address"] == address.address) and (x["postcode"] == address.postcode)
), {})
if isinstance(property_non_invasive_recommendations.get("recommendations"), str):
@ -114,7 +113,7 @@ def extract_property_request_data(
else:
property_valuation = next((
float(x["valuation"]) for x in valuation_data if
(x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
(x["address"] == address.address) and (x["postcode"] == address.postcode)
), None)
# Return data class to give a structured format
@ -126,14 +125,14 @@ def extract_property_request_data(
)
def parse_eco_packages(config: dict[str, Any], prepared_epc) -> tuple[list[str], int, str, list[str]] | tuple[
def parse_eco_packages(addr: Address, prepared_epc) -> tuple[list[str], int, str, list[str]] | tuple[
None, None, None, list]:
solar_identification = config.get("solar_reason", None)
cavity_identification = config.get("cavity_reason", None)
solar_identification = addr.solar_reason
cavity_identification = addr.cavity_reason
if not solar_identification and not cavity_identification:
return None, None, None, []
landlord_heating_system = config["landlord_heating_system"]
landlord_heating_system = addr.landlord_heating_system
# This is the initial version of tackling "already installed" measures
already_installed = []
if landlord_heating_system == "air source heat pump":

View file

@ -23,7 +23,6 @@ from backend.app.db.connection import db_engine
import backend.app.db.functions as db_funcs
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
from backend.app.db.models.portfolio import rating_lookup
from backend.app.plan.schemas import PlanTriggerRequest, WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES
from backend.app.plan.utils import (
get_cleaned, patch_epc, extract_property_request_data, parse_eco_packages, handle_error, build_cloudwatch_log_url
@ -34,6 +33,7 @@ import backend.app.assumptions as assumptions
from backend.ml_models.api import ModelApi
from backend.Property import Property
from backend.apis.GoogleSolarApi import GoogleSolarApi
from backend.addresses.Addresses import Addresses
from recommendations.optimiser.CostOptimiser import CostOptimiser
from recommendations.optimiser.GainOptimiser import GainOptimiser
@ -45,7 +45,7 @@ from etl.bill_savings.KwhData import KwhData
from etl.spatial.OpenUprnClient import OpenUprnClient
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
from recommendations.optimiser.funding_optimiser import optimise_with_funding_paths
from recommendations.optimiser.funding_optimiser import optimise_with_funding_paths, optimise_with_scenarios
from recommendations.recommendation_utils import convert_thickness_to_numeric, get_wall_u_value
from utils.logger import setup_logger
@ -368,24 +368,6 @@ def get_funding_data():
return project_scores_matrix, partial_project_scores_matrix, whlg_eligible_postcodes
def parse_heating_system(config):
"""
Helper function to extract a heating system, which can be used to estimate EPC. This is a very limited,
placeholder function to cover some initial immediate cases.
:return:
"""
ll_heating = config.get("landlord_heating_system", None)
if not ll_heating:
return None
if ll_heating == "electric storage heaters":
# Return with the same format at the EPC
return "Electric storage heaters"
return None
def check_duplicate_uprns(plan_input):
"""
Simple function to check if the input data contains duplicated UPRNS.
@ -426,6 +408,13 @@ def check_duplicate_property_ids(input_properties):
# de-dupe input_uprns
raise ValueError(f"Duplicate property IDs in the input data: {duplicates}")
# Check for dupe UPRNS
input_uprns = [x.uprn for x in input_properties if x.uprn is not None]
if input_uprns:
if len(input_uprns) != len(set(input_uprns)):
duplicates = set([x for x in input_uprns if input_uprns.count(x) > 1])
raise ValueError(f"Duplicate UPRNs in the input properties: {duplicates}")
return True
@ -682,71 +671,108 @@ async def model_engine(body: PlanTriggerRequest):
bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet",
)
input_properties, inspections_map, eco_packages = [], {}, {}
for config in tqdm(plan_input):
# Prepare input data
addresses = Addresses.from_plan_input(plan_input, body)
uprn, address1, full_address = extract_address_data(config, body)
heating_system = parse_heating_system(config)
uprns = addresses.get_uprns()
landlord_ids = addresses.get_landlord_ids()
postcodes = addresses.get_postcodes_for_flats()
# ---------- 1) fetch data ----------
epc_api_data, epc_page, rrn, epc_cache = None, None, None, {}
with db_read_session() as session:
epc_cache = {}
if uprn:
epc_cache = db_funcs.epc_functions.EpcStoreService.get_epc_for_uprn(session, uprn)
# Check if we've seen these properties before
with db_read_session() as session:
existing_properties = db_funcs.property_functions.get_existing_properties(
session, body.portfolio_id, uprns, landlord_ids
)
property_lookup = {}
for prop in existing_properties:
if prop.uprn:
property_lookup[("uprn", prop.uprn)] = prop.id
if prop.landlord_property_id:
property_lookup[("landlord_property_id", prop.landlord_property_id)] = prop.id
# For remote assessments of flats, we get associated UPRNs
associated_uprns = []
if body.event_type == "remote_assessment" and config.get("property_type") == "Flat":
associated_uprns = db_funcs.address_functions.get_associated_uprns(
session, postcode=config["postcode"], uprn=uprn
)
# List of properties that need to be created in the db
to_create = []
for addr in addresses:
key = ("uprn", addr.uprn) if addr.uprn else ("landlord_property_id", addr.landlord_property_id)
if key not in property_lookup:
to_create.append(addr)
# We check for an energy assessment we have performed on this property:
energy_assessment = db_funcs.energy_assessment_functions.get_latest_assessment_by_uprn(
session, uprn
# Pre-requests to the db
with db_read_session() as session:
epc_cache_by_uprn = db_funcs.epc_functions.EpcStoreService.get_epcs_for_uprns(session, uprns)
postcode_searches = db_funcs.address_functions.get_by_postcodes(session, list(postcodes))
energy_assessments_by_uprn = db_funcs.energy_assessment_functions.get_latest_assessments_for_uprns(
session, uprns
)
# If we have properties that need to be created, we cerate them in bulk
new_property_ids = set()
if to_create:
logger.info("Creating %d new properties", len(to_create))
with db_session() as session:
inserted = db_funcs.property_functions.bulk_create_properties(
session, body, to_create, energy_assessments_by_uprn
)
for prop_id, uprn, landlord_property_id in inserted:
new_property_ids.add(prop_id)
# We append the newly created properties to property_lookup
for prop_id, uprn, landlord_property_id in inserted:
if uprn is not None:
property_lookup[("uprn", uprn)] = prop_id
if landlord_property_id:
property_lookup[("landlord_property_id", landlord_property_id)] = prop_id
input_properties, inspections_map, eco_packages, epc_upserts = [], {}, {}, []
for addr, config in tqdm(
zip(addresses, plan_input),
total=len(addresses),
desc="Processing properties",
):
# ---------- 1) filter fetched data ----------
epc_cache = epc_cache_by_uprn[addr.uprn]
epc_api_data, epc_page, rrn, = epc_cache["epc_api"], epc_cache["epc_page"], epc_cache["epc_page_rrn"]
# Extract from EPC cache
if epc_cache.get("status") == db_funcs.epc_functions.EpcStoreService.FRESH:
epc_api_data, epc_page, rrn = epc_cache["epc_api"], epc_cache["epc_page"], epc_cache["epc_page_rrn"]
# Extract associated UPRNs from the database response
associated_uprns = db_funcs.address_functions.get_associated_uprns(
postcode_searches.get(addr.postcode.upper()), uprn=addr.uprn
)
energy_assessment = energy_assessments_by_uprn.get(addr.uprn)
epc_searcher = SearchEpc(
address1=address1,
postcode=config["postcode"],
uprn=uprn,
address1=addr.address1,
postcode=addr.postcode,
uprn=addr.uprn,
auth_token=get_settings().EPC_AUTH_TOKEN,
os_api_key="",
full_address=full_address,
heating_system=heating_system,
full_address=addr.full_address,
heating_system=addr.heating_system,
associated_uprns=associated_uprns
)
epc_searcher.ordnance_survey_client.built_form = config.get("built_form", None)
epc_searcher.ordnance_survey_client.property_type = config.get("property_type", None)
epc_searcher.ordnance_survey_client.built_form = addr.built_form
epc_searcher.ordnance_survey_client.property_type = addr.property_type
# For the moment, our OS API access is unavailable, so we skip and interpolate
epc_searcher.find_property(skip_os=True, api_data=epc_api_data, overwrite_sap05=True)
epc_searcher.set_uprn_source(file_format=body.file_format)
# ---------- 2) ensure property exists ----------
with db_session() as session:
property_id, is_new = db_funcs.property_functions.ensure_property_exists(
session, body, epc_searcher, energy_assessment,
landlord_property_id=config.get("landlord_property_id")
)
lookup_key = (
("uprn", addr.uprn) if addr.uprn is not None else ("landlord_property_id", addr.landlord_property_id)
)
property_id = property_lookup[lookup_key]
if not property_id or (not is_new and not body.multi_plan):
if not property_id:
logger.error("Could not find property ID for address: %s", addr.request_data)
# Should not happen unless input data is inconsistent
continue
if is_new:
with db_session() as session:
db_funcs.property_functions.create_property_targets(
session,
property_id=property_id,
portfolio_id=body.portfolio_id,
epc_target=body.goal_value,
heat_demand_target=None
)
is_new = property_id in new_property_ids
if not is_new and not body.multi_plan:
continue
# If we have an energy assessment in place, that is newer than all of the previous EPCs, we use that.
# Otherwise, we use the newest EPC
@ -757,12 +783,12 @@ async def model_engine(body: PlanTriggerRequest):
)
req_data = extract_property_request_data(
config=config,
address=addr,
patches=patches,
already_installed=already_installed,
non_invasive_recommendations=non_invasive_recommendations,
valuation_data=valuation_data,
uprn=epc_searcher.uprn,
uprn=addr.uprn,
)
# Pull this out as it may get overwritten
property_non_invasive_recommendations, patch = req_data.non_invasive_recommendations, req_data.patch
@ -776,25 +802,21 @@ async def model_engine(body: PlanTriggerRequest):
epc_page=epc_page,
rrn=rrn,
cleaned_address=epc_searcher.address_clean,
config_address=config["address"],
config_address=addr.address,
address_postal_town=epc_searcher.address_postal_town
)
)
epc_records = patch_epc(patch, epc_records)
prepared_epc = EPCRecord(
epc_records=epc_records,
run_mode="newdata",
cleaning_data=cleaning_data,
)
prepared_epc = EPCRecord(epc_records=epc_records, run_mode="newdata", cleaning_data=cleaning_data)
# TODO: This is a temp function to handle a specific edge case with Peabody. We should
# factor this into EPCRecord as part of the cleaning however we need some more testing
prepared_epc = averages_cleaning(prepared_epc, cleaning_data)
# If we have an ECO project, we parse the cavity/solar reasons
eco_packages[property_id] = parse_eco_packages(config, prepared_epc)
eco_packages[property_id] = parse_eco_packages(addr, prepared_epc)
# Final step - extract inspections data, if we have it - we inject into property for usage
property_inspections = db_funcs.inspections_functions.extract_inspection_data(config)
@ -804,6 +826,7 @@ async def model_engine(body: PlanTriggerRequest):
input_properties.append(
Property(
id=property_id,
uprn=addr.uprn,
is_new=is_new,
address=epc_searcher.address_clean,
postcode=epc_searcher.postcode_clean,
@ -822,30 +845,31 @@ async def model_engine(body: PlanTriggerRequest):
# 2) A real EPC
# 3) A UPRN (meaning that a UPRN could be fetched against that property)
# We store this data
with db_session() as session:
if db_funcs.epc_functions.EpcStoreService.check_insert_needed(
epc_cache, epc_searcher.newest_epc.get("estimated"), epc_searcher.uprn
):
# We store the EPC data we have found for this property
db_funcs.epc_functions.EpcStoreService.upsert_epc_data(
session=session,
uprn=epc_searcher.uprn,
epc_api=epc_searcher.data,
epc_page=epc_page_source.get("page_source"),
epc_page_rrn=epc_page_source.get("rrn"),
)
uprn_to_check_against = addr.uprn if addr.uprn is not None else epc_searcher.uprn # Until we enforce uprn
if db_funcs.epc_functions.EpcStoreService.check_insert_needed(
epc_cache, epc_searcher.newest_epc.get("estimated"), uprn_to_check_against,
):
epc_upserts.append({
"uprn": uprn_to_check_against,
"epc_api": epc_searcher.data,
"epc_page": epc_page_source.get("page_source"),
"epc_page_rrn": epc_page_source.get("rrn"),
})
if not input_properties:
return Response(status_code=204)
check_duplicate_property_ids(input_properties)
logger.info("Inserting property data")
# We now bulk upload all of the EPC data
with db_session() as session:
db_funcs.epc_functions.EpcStoreService.bulk_upsert_epc_data(session, epc_upserts)
# We check if we have inspections data and store it in the database if so. We'll update or create
# aginst each property if
if inspections_map:
logger.info("Inserting inspections data")
with db_session() as session:
db_funcs.inspections_functions.bulk_upsert_inspections_pg(session, inspections_map)
with db_session() as session:
db_funcs.inspections_functions.bulk_upsert_inspections_pg(session, inspections_map)
# Set up model api and warm up the lambdas
model_api = ModelApi(
@ -865,7 +889,7 @@ async def model_engine(body: PlanTriggerRequest):
with db_read_session() as session:
materials = db_funcs.materials_functions.get_materials(session)
cleaned = get_cleaned()
project_scores_matrix, partial_project_scores_matrix, whlg_eligible_postcodes = get_funding_data()
# project_scores_matrix, partial_project_scores_matrix, whlg_eligible_postcodes = get_funding_data()
kwh_client = KwhData(bucket=get_settings().DATA_BUCKET, read_consumption_data=True)
@ -956,12 +980,12 @@ async def model_engine(body: PlanTriggerRequest):
logger.info("Preparing data for scoring in sap change api")
recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data)
# Temp putting this here
recommendations_scoring_data["is_post_sap10_ending"] = True
recommendations_scoring_data = recommendations_scoring_data.drop(
columns=[
"rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending",
"carbon_ending"
]
columns=["rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending",
"carbon_ending"]
)
all_predictions = await model_api.async_paginated_predictions(
@ -1018,6 +1042,7 @@ async def model_engine(body: PlanTriggerRequest):
property_instance.current_energy_bill = property_current_energy_bill
# Insert the predictions into the recommendations and run the optimiser
logger.info("Optimising measures")
for p in input_properties:
if not recommendations.get(p.id):
continue
@ -1045,74 +1070,14 @@ async def model_engine(body: PlanTriggerRequest):
)
gain = optimiser_functions.calculate_gain(body=body, p=p, fixed_gain=fixed_gain, eco_packages=eco_packages)
funding = Funding(
tenure=body.housing_type,
project_scores_matrix=project_scores_matrix,
partial_project_scores_matrix=partial_project_scores_matrix,
whlg_eligible_postcodes=whlg_eligible_postcodes,
eco4_social_cavity_abs_rate=13,
eco4_social_solid_abs_rate=17,
eco4_private_cavity_abs_rate=13,
eco4_private_solid_abs_rate=17,
gbis_social_cavity_abs_rate=21,
gbis_social_solid_abs_rate=25,
gbis_private_cavity_abs_rate=21,
gbis_private_solid_abs_rate=28,
)
li_thickness = convert_thickness_to_numeric(
p.roof["insulation_thickness"], p.roof["is_pitched"], p.roof["is_flat"]
)
current_wall_u_value = p.walls["thermal_transmittance"]
if current_wall_u_value is None:
current_wall_u_value = get_wall_u_value(
clean_description=p.walls["clean_description"],
age_band=p.age_band,
is_granite_or_whinstone=p.walls["is_granite_or_whinstone"],
is_sandstone_or_limestone=p.walls["is_sandstone_or_limestone"],
)
# We insert the innovation uplift
measures_to_optimise_with_uplift = deepcopy(measures_to_optimise)
# TODO: Turn this into a function and store the innovaiton uplift
for group in measures_to_optimise_with_uplift:
for r in group:
if r["type"] in ["mechanical_ventilation", "low_energy_lighting", "secondary_heating",
"extension_cavity_wall_insulation", "draught_proofing", "sealing_open_fireplace"]:
(
r["partial_project_score"],
r["partial_project_funding"],
r["innovation_uplift"],
r["uplift_project_score"],
) = (
0, 0, 0, 0
)
continue
(
r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"],
r["uplift_project_score"]
) = funding.get_innovation_uplift(
measure=r,
starting_sap=int(p.data["current-energy-efficiency"]),
floor_area=p.floor_area,
is_cavity=p.walls["is_cavity_wall"],
current_wall_uvalue=current_wall_u_value,
is_partial="partial" in p.walls["clean_description"].lower(),
existing_li_thickness=li_thickness,
mainheating=p.main_heating,
main_fuel=p.main_fuel,
mainheat_energy_eff=p.data["mainheat-energy-eff"],
)
if r["already_installed"]:
# if already installed, we zero out the uplift and funding
(r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"],
r["uplift_project_score"]) = (
0, 0, 0, 0
)
(r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"],
r["uplift_project_score"]) = (0, 0, 0, 0)
input_measures = optimiser_functions.prepare_input_measures(
measures_to_optimise_with_uplift, body.goal, needs_ventilation, funding=True,
@ -1122,62 +1087,36 @@ async def model_engine(body: PlanTriggerRequest):
# When the goal is Increasing EPC, we can run the funding optimiser
if body.goal == "Increasing EPC":
solutions = optimise_with_funding_paths(
solutions = optimise_with_scenarios(
p=p,
input_measures=input_measures,
housing_type=body.housing_type,
budget=body.budget,
target_gain=gain,
funding=funding,
work_package=eco_packages[p.id][2]
enforce_heat_pump_insulation=True,
enforce_fabric_first=False
)
# if handle the empty case
if solutions.empty:
scheme = "none"
funded_measures, solution = [], []
(
project_funding, total_uplift, full_project_score, partial_project_score, uplift_project_score,
battery_sap_score
) = 0, 0, 0, 0, 0, 0
solution, battery_sap_score = [], 0
else:
solutions = solutions[
(solutions["is_eligible"] & (solutions["scheme"] != "none")) | (solutions["scheme"] == "none")
]
if solutions["meets_upgrade_target"].any():
# If we have a solution that meets the upgrade target, we select that one
optimal_solution = solutions[solutions["meets_upgrade_target"]].iloc[0]
else:
# Pick the cheapest
# We re-organise, taking the solution with the most gain and then the cheapest
solutions = solutions.sort_values(
by=["total_gain", "total_cost"], ascending=[False, True]
)
optimal_solution = solutions.iloc[0]
# This is the list of measures that we will recommend
scheme = optimal_solution["scheme"]
# We create this full list of selected measures, which is used in the next section for setting
# default measures
solution = deepcopy(optimal_solution["items"]) + deepcopy(optimal_solution["unfunded_items"])
funded_measures = deepcopy(optimal_solution["items"]) if scheme != "none" else []
# This is the total amount of funding that the project will produce (EXCLUDING uplifts) (£)
project_funding = optimal_solution["full_project_funding"] if scheme == "eco4" else \
optimal_solution["partial_project_funding"]
# This is the total amount of funding associated to the uplift (£)
total_uplift = optimal_solution["total_uplift"]
# This is the funding scheme selected
# This is the full project ABS
full_project_score = optimal_solution["project_score"]
# This is the partial project ABS
partial_project_score = optimal_solution["partial_project_score"]
# This is the uplift score ABS
uplift_project_score = optimal_solution["total_uplift_score"]
# This is the SAP score associated to a battery
pv_size = next(
(m["array_size"] for m in optimal_solution["items"] if m["type"] == "solar_pv"), 0
)
solution = deepcopy(optimal_solution["items"])
pv_size = float(optimal_solution["array_size"])
battery_sap_score = BatterySAPScorer.score(
starting_sap=optimal_solution["ending_sap"], pv_size=pv_size
starting_sap=optimal_solution["ending_sap_without_battery"], pv_size=pv_size
)
else:
# We optimise and then we determine eligibility for funding, based on the measures selected
@ -1192,52 +1131,6 @@ async def model_engine(body: PlanTriggerRequest):
gain = optimiser.solution_gain
post_sap = int(p.data["current-energy-efficiency"]) + gain
recommendation_types = []
for measures in input_measures:
for measure in measures:
recommendation_types.append(measure["type"])
recommendation_types = set(recommendation_types)
has_wall_insulation_recommendation = any(
(m in recommendation_types or "+".join([m, "mechanical_ventilation"])) for m in
WALL_INSULATION_MEASURES
)
has_roof_insulation_recommendation = any(
(m in recommendation_types or "+".join([m, "mechanical_ventilation"])) for m in
ROOF_INSULATION_MEASURES
)
funding.check_funding(
measures=solution,
starting_sap=int(p.data["current-energy-efficiency"]),
ending_sap=int(p.data["current-energy-efficiency"]) + sum([x["gain"] for x in solution]),
floor_area=p.floor_area,
mainheat_description=p.main_heating["clean_description"],
heating_control_description=p.main_heating_controls["clean_description"],
is_cavity=p.walls["is_cavity_wall"],
current_wall_uvalue=current_wall_u_value,
is_partial="partial" in p.walls["clean_description"].lower(),
existing_li_thickness=li_thickness,
mainheating=p.main_heating,
main_fuel=p.main_fuel,
mainheat_energy_eff=p.data["mainheat-energy-eff"],
has_wall_insulation_recommendation=has_wall_insulation_recommendation,
has_roof_insulation_recommendation=has_roof_insulation_recommendation,
)
# Determine the scheme
scheme = "none"
if funding.eco4_eligible:
scheme = "eco4"
if scheme == "none" and funding.gbis_eligible:
scheme = "gbis"
funded_measures = solution if scheme in ["gbis", "eco4"] else []
project_funding = 0 if funding.full_project_abs is not None else funding.full_project_abs
total_uplift = funding.eco4_uplift
full_project_score = 0 if funding.full_project_abs is not None else funding.full_project_abs
partial_project_score = funding.partial_project_abs
uplift_project_score = funding.eco4_uplift if scheme == "eco4" else funding.gbis_uplift
pv_size = next(
(m["array_size"] for m in solution if m["type"] == "solar_pv"), 0
)
@ -1258,21 +1151,6 @@ async def model_engine(body: PlanTriggerRequest):
p.id, recommendations, selected, battery_sap_score
)
# TODO: functionise
for measure in funded_measures:
if "+mechanical_ventilation" in measure["type"]:
measure["type"] = measure["type"].split("+mechanical_ventilation")[0]
p.insert_funding(
scheme=scheme,
funded_measures=funded_measures,
project_funding=project_funding,
total_uplift=total_uplift,
full_project_score=full_project_score,
partial_project_score=partial_project_score,
uplift_project_score=uplift_project_score
)
# when we have buildings, we tweak our solar PV recommendations as if one unit needs it, we apply it to all
# of them
# TODO: We can probably do better and optimise at the building level - this is temp
@ -1308,7 +1186,7 @@ async def model_engine(body: PlanTriggerRequest):
scenario_id = body.scenario_id
else:
with db_session() as session:
engine_scenario = db_funcs.recommendations_functions.create_scenario(
scenario_id = db_funcs.recommendations_functions.create_scenario(
session=session,
scenario={
"name": body.scenario_name,
@ -1326,66 +1204,94 @@ async def model_engine(body: PlanTriggerRequest):
"multi_plan": body.multi_plan
}
)
scenario_id = engine_scenario.id
for i in tqdm(
range(0, len(input_properties), BATCH_SIZE), total=int(np.ceil(len(input_properties) / BATCH_SIZE))
):
try:
# Take a slice of the input_properties list to make a batch
batch_properties = input_properties[i:i + BATCH_SIZE]
with db_session() as session:
for p in batch_properties:
recommendations_to_upload = recommendations.get(p.id, [])
default_recommendations = [r for r in recommendations_to_upload if r["default"]]
total_sap_points = sum([r["sap_points"] for r in default_recommendations])
new_sap_points = float(p.data["current-energy-efficiency"]) + total_sap_points
new_epc = sap_to_epc(new_sap_points)
property_updates, property_epc_details, property_spatial_updates = [], [], []
plans_to_create, recommendations_to_create = [], []
total_cost = sum([r["total"] for r in default_recommendations])
# Prepare the data that will need to be uploaded in bulk
for p in input_properties:
recommendations_for_property = recommendations.get(p.id, [])
default_recommendations = [r for r in recommendations_for_property if r["default"]]
total_sap_points = sum([r["sap_points"] for r in default_recommendations])
new_sap_points = float(p.data["current-energy-efficiency"]) + total_sap_points
new_epc = sap_to_epc(new_sap_points)
total_cost = sum([r["total"] for r in default_recommendations])
valuations = PropertyValuation.estimate(property_instance=p, target_epc=new_epc, total_cost=total_cost)
valuations = PropertyValuation.estimate(
property_instance=p, target_epc=new_epc, total_cost=total_cost
)
# --- property-level updates (always) ---
property_updates.append({
"property_id": p.id,
"portfolio_id": body.portfolio_id,
"data": p.get_full_property_data(current_valuation=valuations["current_value"])
})
property_plan_data = db_funcs.recommendations_functions.prepare_plan_data(
p, body, scenario_id, eco_packages, valuations, new_sap_points, new_epc,
default_recommendations
)
property_epc_details.append(p.get_property_details_epc(portfolio_id=body.portfolio_id))
property_details_epc = p.get_property_details_epc(
portfolio_id=body.portfolio_id, rating_lookup=rating_lookup,
)
property_data = p.get_full_property_data(current_valuation=valuations["current_value"])
db_funcs.property_functions.create_property_details_epc(session, property_details_epc)
property_spatial_updates.append({"uprn": p.uprn, "data": p.spatial})
db_funcs.property_functions.update_or_create_property_spatial_details(
session, p.uprn, p.spatial
)
# --- skip plan creation if no recommendations ---
if not recommendations_for_property:
continue
db_funcs.property_functions.update_property_data(
session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data
)
plan_data = db_funcs.recommendations_functions.prepare_plan_data(
p, body, scenario_id, eco_packages, valuations, new_sap_points, new_epc, default_recommendations
)
plans_to_create.append({"property_id": p.id, "plan_data": plan_data})
if not recommendations_to_upload:
continue
# store recommendations keyed by property
for r in recommendations_for_property:
recommendations_to_create.append({
"property_id": p.id,
# ---- Recommendation core ----
"type": r["type"],
"measure_type": r["measure_type"],
"description": r["description"],
"estimated_cost": float(r["total"]),
"default": r["default"],
"starting_u_value": float(r["starting_u_value"]) if r.get("starting_u_value") else None,
"new_u_value": float(r["new_u_value"]) if r.get("new_u_value") else None,
"sap_points": float(r["sap_points"]),
"energy_savings": float(r["heat_demand"]),
"kwh_savings": float(r["kwh_savings"]),
"co2_equivalent_savings": float(r["co2_equivalent_savings"]),
"total_work_hours": float(r["labour_hours"]),
"energy_cost_savings": float(r["energy_cost_savings"]),
"labour_days": float(r["labour_days"]),
"already_installed": r["already_installed"],
"heat_demand": float(r["heat_demand"]),
new_plan_id = db_funcs.recommendations_functions.create_plan(
session, plan=property_plan_data
)
# ---- parts ----
"parts": [
{
"material_id": part["id"],
"depth": int(part["depth"]) if part.get("depth") else None,
"quantity": float(part["quantity"]) if part.get("quantity") else None,
"quantity_unit": part.get("quantity_unit"),
"estimated_cost": float(part.get("total", part.get("total_cost"))),
}
for part in r.get("parts", [])
],
})
db_funcs.recommendations_functions.upload_recommendations(
session, recommendations_to_upload, p.id, new_plan_id
)
db_funcs.funding_functions.upload_funding(
session, p, new_plan_id, recommendations_to_upload
)
# Bulk upload property data
logger.info("Uploading property data in bulk")
with db_session() as session:
db_funcs.property_functions.bulk_update_properties(session, property_updates)
db_funcs.property_functions.bulk_upsert_property_details_epc(session, property_epc_details)
db_funcs.property_functions.bulk_upsert_property_spatial(session, property_spatial_updates)
# # Bulk create plans
plan_id_by_property = db_funcs.recommendations_functions.bulk_create_plans(session, plans_to_create)
recommendation_payload = [
{
"plan_id": plan_id_by_property[r["property_id"]],
**{k: v for k, v in r.items() if k not in ["parts"]},
"parts": r["parts"],
} for r in recommendations_to_create if r["property_id"] in plan_id_by_property
]
except Exception as e:
# Rollback the session if an error occurs
logger.warning("Failed i = %s" % str(i))
logger.error(f"An error occurred during batch starting at index {i}: {e}")
logger.error(f"property is uprn {p.uprn} id {p.id} address {p.address}")
db_funcs.recommendations_functions.bulk_upload_recommendations_and_materials(
session, recommendation_payload
)
logger.info("Work completed, updating log status")

View file

@ -1,6 +1,7 @@
import numpy as np
from recommendations.county_to_region import county_to_region_map
from utils.logger import setup_logger
from backend.ml_models.AnnualBillSavings import AnnualBillSavings
logger = setup_logger()
@ -21,25 +22,6 @@ regional_labour_variations = [
{"Region": "Northern Ireland", "Adjustment_Factor": 0.76}
]
# This data is based on the MCS database - taken the figures for June 2024
MCS_SOLAR_PV_COST_DATA = {
"last_updated": "2024-07-10",
"average_cost_per_kwh": 1825,
"average_cost_per_kwh-Outer London": 1950,
"average_cost_per_kwh-Inner London": 1950,
"average_cost_per_kwh-South East England": 1966,
"average_cost_per_kwh-South West England": 1864,
"average_cost_per_kwh-East of England": 1719,
"average_cost_per_kwh-East Midlands": 1730,
"average_cost_per_kwh-West Midlands": 1789,
"average_cost_per_kwh-North East England": 1872,
"average_cost_per_kwh-North West England": 1860,
"average_cost_per_kwh-Yorkshire and the Humber": 1789,
"average_cost_per_kwh-Wales": 1676,
"average_cost_per_kwh-Scotland": 1781,
"average_cost_per_kwh-Northern Ireland": 1347,
}
# Installers are now working with 435 watt panels
PANEL_SIZE = 0.435
@ -61,47 +43,40 @@ INSTALLER_SOLAR_COSTS = [
{'n_panels': 18, 'array_kwp': 18 * PANEL_SIZE, 'cost': 6792.57, 'installer': 'CEG'}
]
# These are costs we received from CRG, for pricing up air source heat pumps
# These are costs that we have been provided from CRG specifically for air source heat pumps
ASHP_SMALL_SYSTEM_COST = 8812.92 # 4.8 to 8.5, based on their pricing
ASHP_LARGE_SYSTEM_COST = 11053.25
ASHP_SECURITY = 455.00
ASHP_WALL_BRACKET = 574.17
ASHP_DISTRIBUTION_SYSTEM_COSTS = [
{"n_radiators": 4, "cost": 3380.00},
{"n_radiators": 5, "cost": 3607.50},
{"n_radiators": 6, "cost": 4116.67},
{"n_radiators": 7, "cost": 4647.50},
{"n_radiators": 8, "cost": 5200.00},
{"n_radiators": 9, "cost": 5730.83},
{"n_radiators": 10, "cost": 6283.33},
{"n_radiators": 11, "cost": 6857.50},
{"n_radiators": 12, "cost": 7431.67},
{"n_radiators": 13, "cost": 8016.67},
{"n_radiators": 14, "cost": 8612.50},
{"n_radiators": 15, "cost": 9219.17},
{"n_radiators": 16, "cost": 9804.17},
{"n_radiators": 17, "cost": 10389.17},
]
ASHP_CYLINDER_COSTS = [
{"capacity_l": 120, "cost": 3318.25},
{"capacity_l": 180, "cost": 3480.75},
{"capacity_l": 200, "cost": 3853.42},
{"capacity_l": 250, "cost": 3961.75},
]
# CEG uses use Solshare as an inverter to provide solar PV to multiple flats. This costs £7500 for the inverter alone
# https://midsummerwholesale.co.uk/buy/solshare
INSTALLER_SOLAR_PV_INVERTER_COST = 7500
INSTALLER_SOLAR_PV_INVERTER_LABOUR_COST = 500 # Just a rough guess to labour costs
# INSTALLER_SCAFFOLDING_COSTS = [
# {'stories': 1, 'description': '1 Story Scaffold', 'cost': 531.00, 'installer': 'CEG'},
# {'stories': 2, 'description': '2 Story Scaffold', 'cost': 841.00, 'installer': 'CEG'},
# {'stories': 3, 'description': '3 Story Scaffold', 'cost': 1077.00, 'installer': 'CEG'}
# ]
# This data is based on the MCS database, We use the larger figure between the 2023 and 2024 average,
# to be conservative
MCS_AIR_SOURCE_HEAT_PUMP_COST_DATA = {
"Outer London": 13220,
"Inner London": 13220,
"South East England": 13547,
"South West England": 12776,
"East of England": 12585,
"East Midlands": 12239,
"West Midlands": 13182,
"North East England": 11829,
"North West England": 11714,
"Yorkshire and the Humber": 11919,
"Wales": 13701,
"Scotland": 12586,
"Northern Ireland": 12000, # There are hardly any air source heat pump installs going on in Northern Ireland
}
INSTALLER_ASHP_COSTS = [
{'capacity_kw': 5.0, 'brand': 'Mitsubishi', 'tank_size_liters': 150, 'cost': 10149.53, 'installer': 'CEG'},
{'capacity_kw': 6.0, 'brand': 'Mitsubishi', 'tank_size_liters': 170, 'cost': 10823.48, 'installer': 'CEG'},
{'capacity_kw': 8.5, 'brand': 'Mitsubishi', 'tank_size_liters': 200, 'cost': 11312.43, 'installer': 'CEG'},
{'capacity_kw': 11.2, 'brand': 'Mitsubishi', 'tank_size_liters': 250, 'cost': 12156.75, 'installer': 'CEG'},
{'capacity_kw': 14.0, 'brand': 'Mitsubishi', 'tank_size_liters': 300, 'cost': 14405.54, 'installer': 'CEG'},
{'capacity_kw': 14.0, 'brand': 'Mitsubishi', 'tank_size_liters': 300, 'cost': 14405.54, 'installer': 'CEG'},
{'capacity_kw': 17.0, 'brand': 'Grant', 'tank_size_liters': 300, 'cost': 14445.00, 'installer': 'CEG'},
{'capacity_kw': 20.0, 'brand': 'Ecoforest', 'tank_size_liters': 400, 'cost': 21189.41, 'installer': 'CEG'},
{'capacity_kw': None, 'brand': '2 x cascaded ASHPs', 'tank_size_liters': 500, 'cost': 22950.00, 'installer': 'CEG'}
]
INSTALLER_SOLAR_BATTERY_COSTS = [
{'capacity_kwh': 5, 'description': 'Battery Add on', 'cost': 3769.89, 'installer': 'JJC'},
# {'capacity_kwh': 10, 'description': 'Battery Add on', 'cost': 4300.00, 'installer': 'CEG'},
@ -350,16 +325,31 @@ class Costs:
total_cost = material["total_cost"] * insulation_floor_area
labour_hours = material["labour_hours_per_unit"] * insulation_floor_area
# To install suspended floor insulation, a small to medium size project might be conducted by a team of 3
# people
labour_days = (labour_hours / 8) / 3
# We assume the average house takes ~7 days to complete at £300/day incl. VAT, as per checkatrade
# which can be seen here: https://www.checkatrade.com/blog/cost-guides/floor-insulation-cost
# Assumptions
base_days = 7 # The quickest it will be completed
base_area = 45 # The area that can be completed in that time (for a typical 90m2 house)
labour_exponent = 0.85 # Non-linear scaling
daily_labour_rate = 300 # Based on checkatrade
min_days = 3 # Fewest days it will take
labour_days = max(
min_days,
base_days * (insulation_floor_area / base_area) ** labour_exponent
)
labour_cost = labour_days * daily_labour_rate
total_cost = total_cost + labour_cost
total_cost = round(total_cost)
return {
"total": total_cost,
"contingency": self.CONTINGENCIES["solid_floor_insulation"] * total_cost,
"contingency_rate": self.CONTINGENCIES["solid_floor_insulation"],
"labour_hours": labour_hours,
"labour_hours": labour_days * 8,
"labour_days": labour_days,
}
@ -838,32 +828,55 @@ class Costs:
"labour_days": labour_days,
}
def air_source_heat_pump(self, ashp_size):
"""
Based on the region and type of property, this function will produce a cost estimation for an air source heat
pump. This cost will include the boiler upgrade scheme grant
"""
# This is the average cost of a project, we'll add some additional contingency
if ashp_size is None:
cost = [x for x in INSTALLER_ASHP_COSTS if x["capacity_kw"] is None][0]["cost"]
@staticmethod
def _select_cylinder_capacity(occupants: float):
if occupants <= 2:
return 120
elif occupants <= 3:
return 180
elif occupants <= 4:
return 200
else:
cost = [x for x in INSTALLER_ASHP_COSTS if x][0]["cost"]
return 250
# The costs from installers exclude VAT
vat = cost * self.VAT_RATE
cost = cost + vat
def air_source_heat_pump(self, ashp_size: float, number_heated_rooms: int, total_floor_area: float) -> dict:
"""
We produce a cost estimation for an air source heat pump, based on costs we have received from installers.
# We assume 5 days installation
labour_days = 5
labour_hours = labour_days * 8
"""
system_cost = (
(ASHP_SMALL_SYSTEM_COST if ashp_size <= 8.5 else ASHP_LARGE_SYSTEM_COST) + ASHP_SECURITY + ASHP_WALL_BRACKET
)
available_n_rads = [x["n_radiators"] for x in ASHP_DISTRIBUTION_SYSTEM_COSTS]
if number_heated_rooms < min(available_n_rads):
# We use the smallest value
rads_to_use = min(available_n_rads)
elif number_heated_rooms > max(available_n_rads):
# We use the largest value
rads_to_use = max(available_n_rads)
else:
rads_to_use = int(number_heated_rooms)
distribution_system_cost = [
x for x in ASHP_DISTRIBUTION_SYSTEM_COSTS if x["n_radiators"] == rads_to_use
][0]["cost"]
# Cylinder cost
est_n_occupants = AnnualBillSavings.calculate_occupants(total_floor_area)
cylinder_capacity = self._select_cylinder_capacity(est_n_occupants)
cylinder_cost = [
x for x in ASHP_CYLINDER_COSTS if x["capacity_l"] == cylinder_capacity
][0]["cost"]
total = system_cost + distribution_system_cost + cylinder_cost
return {
"total": cost,
"contingency": cost * self.CONTINGENCIES["air_source_heat_pump"],
"total": total,
"contingency": total * self.CONTINGENCIES["air_source_heat_pump"],
"contingency_rate": self.CONTINGENCIES["air_source_heat_pump"],
"vat": vat,
"labour_hours": labour_hours,
"labour_days": labour_days,
"vat": 0,
"labour_hours": 80,
"labour_days": 10,
}

View file

@ -526,14 +526,14 @@ class HeatingRecommender:
# 1) Best available path: HLP → direct peak
if heat_loss_parameter_W_per_m2K is not None:
peak_kw = heat_loss_parameter_W_per_m2K * floor_area_m2 * ΔT / 1000.0
return (peak_kw, peak_kw) # no range needed
return peak_kw, peak_kw # no range needed
# 2) Second-best: space-heating demand → HDD method
if space_heat_kwh_per_m2_yr is not None:
annual_space_kwh = space_heat_kwh_per_m2_yr * floor_area_m2
Htot = annual_space_kwh * 1000.0 / (hdd_base_dd * 24.0) # W/K
peak_kw = Htot * ΔT / 1000.0
return (peak_kw, peak_kw)
return peak_kw, peak_kw
# 3) Minimal inputs: primary energy + assumed fraction → range
assert epc_primary_kwh_per_m2_yr is not None
@ -547,7 +547,7 @@ class HeatingRecommender:
low = to_peak(space_heat_fraction_range[0])
high = to_peak(space_heat_fraction_range[1])
return (low, high)
return low, high
@staticmethod
def pick_model(peak_kw_range, models_kw=(5, 6, 8.5, 11.2, 14, 17, 20)):
@ -555,7 +555,9 @@ class HeatingRecommender:
for kw in models_kw:
if kw >= target:
return kw
return None
# Return the largest
return max(models_kw)
def recommend_air_source_heat_pump(self, phase, has_cavity_or_loft_recommendations, _return=False):
"""
@ -586,7 +588,15 @@ class HeatingRecommender:
)
ashp_size = self.pick_model(estimated_load)
ashp_costs = self.costs.air_source_heat_pump(ashp_size)
number_heated_rooms = self._estimate_n_heated_rooms()
# We now adjust this depending on the floor area to get number of communcal rooms (e.g. hallways)
communal_heated_rooms = self._estimate_n_communal_heated_rooms()
ashp_costs = self.costs.air_source_heat_pump(
ashp_size,
number_heated_rooms=number_heated_rooms + communal_heated_rooms,
total_floor_area=self.property.floor_area
)
if non_intrusive_recommendation:
# Update with non-intrusive recommendation
if non_intrusive_recommendation.get("cost"):
@ -907,6 +917,56 @@ class HeatingRecommender:
return already_has_hhr and already_has_hhr_contols
def _estimate_n_heated_rooms(self):
# If the property is off-gas and has no heating system in place, the number of heated rooms will actually
# be 0, so we use the number of rooms as the figure
number_heated_rooms = (
self.property.data["number-heated-rooms"] if self.property.data["number-heated-rooms"] > 0
else (
self.property.number_of_rooms - 1 if self.property.number_of_rooms > 1 else
self.property.number_of_rooms
)
)
# To be conservative, we adjust if we still have 1 room
if (number_heated_rooms == 1) and (self.property.number_of_rooms > 2):
number_heated_rooms = self.property.number_of_rooms - 1
return number_heated_rooms
def _estimate_n_communal_heated_rooms(self) -> int:
"""
Estimate number of communal circulation rooms (hallways / landings) that may reasonably contain a heater
"""
# Base assumptions
base_by_type = {
"Flat": 1,
"Maisonette": 1,
"Bungalow": 1,
"House": 2,
}
# Fallback if property type unknown
base = base_by_type.get(self.property.data["property-type"], 1)
# Area-based adjustments
if self.property.data["property-type"] in ("Flat", "Maisonette"):
if self.property.floor_area > 90:
return base + 1 # duplex or very large flat
return base
if self.property.data["property-type"] == "Bungalow":
if self.property.floor_area > 100:
return base + 1 # secondary corridor
return base
if self.property.data["property-type"] == "House":
if self.property.floor_area > 140:
return base + 1 # extra landing / circulation
return base
return base
def recommend_hhr_storage_heaters(self, phase, system_change, heating_controls_only, _return=False):
"""
We will recommend upgrading to a high heat retention storage system, if the current system is not already
@ -1010,18 +1070,7 @@ class HeatingRecommender:
else:
heating_simulation_config["hot_water_energy_eff_ending"] = self.property.data["hot-water-energy-eff"]
# If the property is off-gas and has no heating system in place, the number of heated rooms will actually
# be 0, so we use the number of rooms as the figure
number_heated_rooms = (
self.property.data["number-heated-rooms"] if self.property.data["number-heated-rooms"] > 0
else (
self.property.number_of_rooms - 1 if self.property.number_of_rooms > 1 else
self.property.number_of_rooms
)
)
# To be conservative, we adjust if we still have 1 room
if (number_heated_rooms == 1) and (self.property.number_of_rooms > 2):
number_heated_rooms = self.property.number_of_rooms - 1
number_heated_rooms = self._estimate_n_heated_rooms()
# We focus on the 700 watt product
hhrsh_product = next((x for x in self.hhrsh_products if x["size"] == 700), {})

View file

@ -615,7 +615,7 @@ class Recommendations:
if metric == "sap":
property_phase_impact[metric] = round(property_phase_impact[metric], 2)
else:
# We prevent these from being positive
# We prevent mechanical ventilation from being positive
property_phase_impact[metric] = (
0 if property_phase_impact[metric] > 0 else property_phase_impact[metric]
)
@ -632,6 +632,38 @@ class Recommendations:
property_phase_impact["carbon"], rec["co2_equivalent_savings"]
)
# Update the current phase values
current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"]
current_phase_values["carbon"] = previous_phase_values["carbon"] - property_phase_impact["carbon"]
# We also ensure that mechanical ventilation doesn't have an ovely strong negative SAP impact
if rec["type"] == "mechanical_ventilation":
# ventilation is capped by having no greater and a -4 impact
ventilation_sap_limit = -4
def _check_veniltation_out_of_bounds(sap_impact):
return (sap_impact < ventilation_sap_limit) or (sap_impact >= 0)
def _adjust_ventilation_sap(sap_impact):
if sap_impact >= 0:
return -1
if sap_impact < ventilation_sap_limit:
return ventilation_sap_limit
ventilation_out_of_bounds = _check_veniltation_out_of_bounds(property_phase_impact["sap"])
if ventilation_out_of_bounds:
previous_modelled_sap = previous_phase_values.get("sap_prediction", 0)
proposed_sap_impact = current_phase_sap - previous_modelled_sap
proposal_out_of_bounds = _check_veniltation_out_of_bounds(proposed_sap_impact)
if proposal_out_of_bounds:
property_phase_impact["sap"] = _adjust_ventilation_sap(proposed_sap_impact)
else:
property_phase_impact["sap"] = proposed_sap_impact
# Update the current phase values
current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"]
if rec["type"] == "loft_insulation":
# When we have a loft insulation recommendation, where there is an extension and the existing
# amount of loft insulation is already good, we limit the SAP points
@ -642,6 +674,8 @@ class Recommendations:
)
if li_sap_limit is not None:
property_phase_impact["sap"] = min(property_phase_impact["sap"], li_sap_limit)
# Update the current phase values
current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"]
if rec["type"] == "solar_pv":
# We use the SAP points in the recommendation as a minimum
@ -649,6 +683,8 @@ class Recommendations:
rec["sap_points"] if property_phase_impact["sap"] < rec["sap_points"] else
property_phase_impact["sap"]
)
# Update the current phase values
current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"]
# Insert this information into the recommendation.
if not rec.get("survey", False):
@ -669,7 +705,8 @@ class Recommendations:
"representative": rec["recommendation_id"] in representative_ids,
"recommendation_id": rec["recommendation_id"],
"measure_type": rec["measure_type"],
**current_phase_values
**current_phase_values,
"sap_prediction": phase_energy_efficiency_metrics["sap_change"]
}
)

View file

@ -168,7 +168,7 @@ class WallRecommendations(Definitions):
):
return
if u_value:
if u_value is not None:
if self.property.walls["thermal_transmittance_unit"] != self.U_VALUE_UNIT:
raise NotImplementedError(

View file

@ -10,6 +10,7 @@ In the future, we will adapt this into a class-based structure to allow for more
from copy import deepcopy
import pandas as pd
import numpy as np
from itertools import product
from backend.app.plan.schemas import (
WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES, ECO4_ELIGIBILE_FABRIC_MEASURES
@ -587,6 +588,288 @@ def optimise_with_funding_paths(
return solutions
def build_heat_pump_paths(
remaining_wall_measures,
remaining_roof_measures,
):
"""
Build AND-paths using cartesian products.
Rules:
- Always include air_source_heat_pump
- Choose 1 wall measure if any exist
- Choose 1 roof measure if any exist
"""
# If a category is empty, use [None] so product still works
wall_choices = remaining_wall_measures or [None]
roof_choices = remaining_roof_measures or [None]
paths = []
for wall, roof in product(wall_choices, roof_choices):
parts = []
if wall is not None:
parts.append(wall)
if roof is not None:
parts.append(roof)
parts.append("air_source_heat_pump")
paths.append({"AND": parts})
return paths
def exclude_measure_types(input_measures, excluded_types):
excluded = set(excluded_types)
filtered = []
for group in input_measures:
kept = [
opt for opt in group
if opt["type"] not in excluded
]
if kept:
filtered.append(kept)
return filtered
def optimise_with_scenarios(
p,
input_measures,
budget=None,
target_gain=None,
enforce_heat_pump_insulation=True,
enforce_fabric_first=False
):
"""
Scenario-based optimiser (funding-agnostic).
Currently implemented scenarios:
1) With air source heat pump AND required insulation
"""
solutions = []
paths = []
# Produce the unique list of measure types
all_measure_types = []
for inputs in input_measures:
all_measure_types.extend([x["type"] for x in inputs])
all_measure_types = list(set(all_measure_types))
# We modify the solar PV gain, if there is a battery, to include an estimated SAP battery uplift, should
# the property hit the upgrade target, plus 1. We add the additional 1 because the higher the starting SAP,
# the lower the battery SAP uplift, so this is a conservative approach since the true SAP score is
# re-calculated later on.
optimisation_measures = deepcopy(input_measures)
for measures in optimisation_measures:
if measures[0]["type"] == "solar_pv":
for x in measures:
if x["has_battery"]:
x["battery_gain"] = BatterySAPScorer.score(
starting_sap=int(p.data["current-energy-efficiency"]) + target_gain + 1,
pv_size=x["array_size"]
)
x["gain"] += x["battery_gain"]
if enforce_fabric_first:
# If this is true, it means we only want to consider a fabric first approach. This means that
# - We treat the fabric of the house first
# - Only once the fabric has been upgraded, do we consider heating upgrades
# This should be wall insulation, roof insulation, floor insulation and windows
fabric_measures = WALL_INSULATION_MEASURES + ROOF_INSULATION_MEASURES + ECO4_ELIGIBILE_FABRIC_MEASURES
fabric_only_measures = [
[opt for opt in group if opt["type"] in fabric_measures] for group in optimisation_measures
]
fabric_only_measures = [g for g in fabric_only_measures if g]
if not fabric_only_measures:
# If we have no fabric measures, it means the work has already been done and we can proceed
# straight to heating optimisation
picked_fabric, fabric_cost, fabric_gain = [], 0, 0
else:
picked_fabric, fabric_cost, fabric_gain = run_optimizer(
input_measures=fabric_only_measures,
budget=budget,
sub_target_gain=target_gain,
# If we can achieve the target gain with just insulation measures, we're done
)
picked_fabric_types = {m["type"] for m in picked_fabric}
remaining_measures = []
for group in optimisation_measures:
kept = [m for m in group if m["type"] not in picked_fabric_types]
if kept:
remaining_measures.append(kept)
picked_extra, extra_cost, extra_gain = run_optimizer(
remaining_measures,
budget=budget - fabric_cost if budget is not None else None,
sub_target_gain=(
target_gain - fabric_gain
if target_gain is not None
else None
)
)
if picked_extra is None:
picked_extra, extra_cost, extra_gain = [], 0, 0
solutions.append({
"scenario": "fabric_first",
"items": picked_fabric + picked_extra,
"fixed_items": picked_fabric,
"total_cost": fabric_cost + extra_cost,
"total_gain": fabric_gain + extra_gain,
"already_installed_gain": sum([x["gain"] for x in picked_fabric + picked_extra if x["already_installed"]])
})
return append_solution_metrics(solutions, target_gain, p)
# ------------------------------------------------------------------
# Scenario 1: Air source heat pump with required insulation
# ------------------------------------------------------------------
if enforce_heat_pump_insulation:
# Wall measures could be IWI or EWI
remaining_wall_measures = [
x for x in all_measure_types if x in WALL_INSULATION_MEASURES + [
"internal_wall_insulation+mechanical_ventilation", "external_wall_insulation+mechanical_ventilation"
]
]
remaining_roof_measures = [x for x in all_measure_types if x in ROOF_INSULATION_MEASURES]
# Mandatory structure:
# - must include ASHP
# - must include >=1 wall insulation (if still needed)
# - must include >=1 roof insulation (if still needed)
# We need all of the combinations of remaining wall and remaining roof measures
heat_pump_paths = build_heat_pump_paths(remaining_wall_measures, remaining_roof_measures)
paths.extend(heat_pump_paths)
fixed_selections = expand_funding_path(optimisation_measures, paths)
for fixed in fixed_selections:
# fixed = [(gi, oi, opt), ...]
fixed_items = [opt for (_, _, opt) in fixed]
fixed_groups = {gi for (gi, _, _) in fixed}
fixed_cost, fixed_gain = sum_cost_gain(fixed_items)
# Remaining measures (all other groups)
remaining_measures = [
grp for gi, grp in enumerate(optimisation_measures)
if gi not in fixed_groups
]
# Optimise remaining measures
if (
target_gain is not None
and fixed_gain >= target_gain
):
picked, sub_cost, sub_gain = [], 0, 0
else:
picked, sub_cost, sub_gain = run_optimizer(
remaining_measures,
budget=budget - fixed_cost if budget is not None else None,
sub_target_gain=(
target_gain - fixed_gain
if target_gain is not None
else None
)
)
if picked is None:
continue
total_items = fixed_items + picked
total_cost = fixed_cost + sub_cost
total_gain = fixed_gain + sub_gain
solutions.append({
"scenario": "heat_pump_with_insulation",
"items": total_items,
"fixed_items": fixed_items,
"total_cost": total_cost,
"total_gain": total_gain,
"already_installed_gain": sum([x["gain"] for x in total_items if x["already_installed"]])
})
# ------------------------------------------------------------------
# Scenario 2: Optimise without air source heat pump
# ------------------------------------------------------------------
# No special path; just exclude ASHP from options and allow us to optimise.
measures_no_heat_pump = exclude_measure_types(optimisation_measures, ["air_source_heat_pump"])
picked, total_cost, total_gain = run_optimizer(
measures_no_heat_pump,
budget=budget,
sub_target_gain=target_gain,
)
if picked is not None:
solutions.append({
"scenario": "no_heat_pump",
"items": picked,
"fixed_items": [],
"total_cost": total_cost,
"total_gain": total_gain,
"already_installed_gain": sum([x["gain"] for x in picked if x["already_installed"]])
})
solutions_df = append_solution_metrics(solutions, target_gain, p)
return solutions_df
def _get_ending_sap_without_battery(x):
gain = [y["gain"] - y.get("battery_gain", 0) for y in x["items"]]
return float(sum(gain))
def append_solution_metrics(solutions, target_gain, p):
"""
Given a set of solutions, this function will return a dataframe, with cost metrics appended, to allow
the end user to select the optimal solution.
:param solutions:
:param target_gain:
:return:
"""
solutions_df = pd.DataFrame(solutions)
if solutions_df.empty:
# We return a blank dataframe
return solutions_df
# Given the scheme, we now check if the packages are eligible. If they *are* eligible, but they don't meet the
# final upgrade target, we then look to perform a final optimisation pass to meet the target gain.
solutions_df["meets_upgrade_target"] = solutions_df["total_gain"] >= target_gain - 0.1
# We now can calculate the project ABS, which subtracts from the cost, but this is only relevant for ECO4
# We flag projects that are including batteries
solutions_df["has_battery"] = solutions_df["items"].apply(has_battery)
solutions_df["array_size"] = solutions_df["items"].apply(
lambda x: sum(float(y["array_size"]) for y in x if "array_size" in y)
)
# We need the ending SAP, but we'll need to remove the battery SAP uplift first
solutions_df["ending_sap_without_battery"] = solutions_df.apply(
lambda x: int(p.data["current-energy-efficiency"]) + _get_ending_sap_without_battery(x),
axis=1
)
solutions_df = solutions_df.sort_values("total_cost", ascending=True)
return solutions_df
# ---- helpers -------------------------------------------------------------

View file

@ -8,6 +8,7 @@ from recommendations.optimiser.CostOptimiser import CostOptimiser
class TestPrepareInputMeasures:
def test_returns_expected_structure_without_ventilation(self):
recs = [
[ # loft insulation measure

View file

@ -1,97 +1,14 @@
import numpy as np
# import pandas as pd
from pandas import Timestamp
from numpy import nan
import datetime
# import backend.app.assumptions as assumptions
# import recommendations.optimiser.optimiser_functions as optimiser_functions
#
# from backend.Funding import Funding
#
# project_scores_matrix = pd.read_csv("/Users/khalimconn-kowlessar/Downloads/ECO4 Full Project Scores Matrix.csv")
# partial_project_scores_matrix = pd.read_csv("backend/tests/test_data/ECO4_Partial_Project_Scores_Matrix_v6.csv")
# partial_project_scores_matrix.columns = ['Measure category', 'Measure_Type', 'Pre_Main_Heating_Source',
# 'Post_Main_Heating_Source', 'Total Floor Area Band', 'Starting Band',
# 'Average Treatable Factor', 'Cost Savings', 'SAP Savings']
# whlg_eligible_postcodes = pd.DataFrame([{"Postcode": "ab12cd"}])
#
# funding = Funding(
# project_scores_matrix=project_scores_matrix,
# partial_project_scores_matrix=partial_project_scores_matrix,
# whlg_eligible_postcodes=whlg_eligible_postcodes,
# eco4_social_cavity_abs_rate=13.5,
# eco4_social_solid_abs_rate=17,
# eco4_private_cavity_abs_rate=13.5,
# eco4_private_solid_abs_rate=17,
# gbis_social_cavity_abs_rate=21,
# gbis_social_solid_abs_rate=25,
# gbis_private_cavity_abs_rate=22,
# gbis_private_solid_abs_rate=28,
# tenure="Social"
# )
#
# # Assume these costs have been adjusted
#
# # Insert the funding uplifts
# for recs in property_recommendations:
# for r in recs:
# # Insert randomly
# # Select one of 0, 0.25 or 0.45
# r["uplift"] = np.random.choice([0, 0.25, 0.45])
#
# # We calculate the innovation uplift against each measure
# for recs in property_recommendations:
# for r in recs:
# if r["type"] in ["mechanical_ventilation", "low_energy_lighting", "secondary_heating"]:
# r["innovation_uplift"] = 0
# continue
# r["innovation_uplift"] = funding.get_innovation_uplift(
# measure=r,
# starting_sap=p.data["current-energy-efficiency"],
# floor_area=p.floor_area,
# is_cavity=False,
# current_wall_uvalue=1.7,
# is_partial=False,
# existing_li_thickness=150,
# mainheating=p.main_heating,
# main_fuel=p.main_fuel,
# mainheat_energy_eff=p.data["mainheat-energy-eff"],
# )
# print(r["innovation_uplift"])
#
# property_measure_types = {rec["type"] for recs in property_recommendations for rec in recs}
# property_required_measures = [m for m in property_recommendations if m[0]["type"] in []]
# measures_to_optimise = [m for m in property_recommendations if m[0]["type"] not in []]
#
# # If a measure requiring ventilation is selected, and the property does not have ventilation, we enfore
# # its inclusion
# needs_ventilation = any(
# x in property_measure_types for x in assumptions.measures_needing_ventilation
# ) and not p.has_ventilation
#
# input_measures = optimiser_functions.prepare_input_measures(
# measures_to_optimise, "Increasing EPC", needs_ventilation, True
# )
#
# # ---- main wrapper around your optimiser ----------------------------------
#
# # Run inputs:
# target_gain = 18.5
#
# # Run the optimiser with these inouts
# tests/test_social_fabric_only.py
import numpy as np
import pandas as pd
import pytest
from copy import deepcopy
from recommendations.optimiser import optimiser_functions
from recommendations.optimiser.funding_optimiser import optimise_with_funding_paths # wherever you defined it
from recommendations.optimiser.funding_optimiser import optimise_with_funding_paths, build_heat_pump_paths
from backend.Funding import Funding
from backend.app.plan.schemas import WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES, ECO4_ELIGIBILE_FABRIC_MEASURES
@ -799,3 +716,14 @@ def test_private_solid_wall_no_innovation_epc_d(p, funding, mock_project_scores_
'partial_project_funding': 2300.1000000000004, 'partial_project_score': 135.3, 'total_uplift': 0.0,
'total_uplift_score': 0.0
}
def test_build_heat_pump_paths():
eg1 = build_heat_pump_paths([], ["loft_insulation"])
assert eg1 == [{'AND': ['loft_insulation', 'air_source_heat_pump']}]
eg2 = build_heat_pump_paths(["internal_wall_insulation", "external_wall_insulation"], ["loft_insulation"])
assert eg2 == [{'AND': ['internal_wall_insulation', 'loft_insulation', 'air_source_heat_pump']},
{'AND': ['external_wall_insulation', 'loft_insulation', 'air_source_heat_pump']}]

View file

@ -66,7 +66,7 @@ functions:
- sqs:
arn: arn:aws:sqs:${self:provider.region}:${aws:accountId}:model-engine-queue
batchSize: 1
maximumConcurrency: 10 # Heavily restricts concurrency to avoid overwhelming the ldmbda limits
maximumConcurrency: 5 # Heavily restricts concurrency to avoid overwhelming the ldmbda limits
resources: