diff --git a/asset_list/AssetList.py b/asset_list/AssetList.py index da20432b..940c723a 100644 --- a/asset_list/AssetList.py +++ b/asset_list/AssetList.py @@ -997,7 +997,15 @@ class AssetList: # Keep a record of duplicates self.duplicated_addresses = self.standardised_asset_list[ self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated() - ][[self.DOMNA_PROPERTY_ID, self.address1_colname, self.postcode_colname]].copy() + ][[self.DOMNA_PROPERTY_ID, self.full_address_colname, self.address1_colname, self.postcode_colname]].copy() + + df = self.standardised_asset_list[ + self.standardised_asset_list[self.DOMNA_PROPERTY_ID].isin( + self.duplicated_addresses[self.DOMNA_PROPERTY_ID]) + ][[self.landlord_property_id, self.DOMNA_PROPERTY_ID, self.full_address_colname, self.address1_colname, + self.postcode_colname]].copy() + + df = df.sort_values(by=[self.DOMNA_PROPERTY_ID]) self.standardised_asset_list = self.standardised_asset_list[ ~self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated() diff --git a/asset_list/app.py b/asset_list/app.py index 3d8a0fae..c58eccd7 100644 --- a/asset_list/app.py +++ b/asset_list/app.py @@ -59,6 +59,39 @@ def app(): Property UPRN """ + data_folder = ("/Users/khalimconn-kowlessar/Documents/hestia/Warmfront/SCIS") + data_filename = "SCIS_Historic_Deemed_Combined_Workings.xlsx" + sheet_name = "SCIS" + postcode_column = 'POSTCODE' + address1_column = "NO" + address1_method = None + fulladdress_column = None + address_cols_to_concat = ["NO", "Street / Block Name", "Town/Area"] + missing_postcodes_method = None + landlord_year_built = None + landlord_os_uprn = None + landlord_property_type = "PROPERTY TYPE As per table emailed" + landlord_built_form = "PROPERTY TYPE As per table emailed" + landlord_wall_construction = None + landlord_roof_construction = None + landlord_heating_system = None + landlord_existing_pv = None + landlord_property_id = "Row ID" + landlord_sap = None + outcomes_filename = None + outcomes_sheetname = None + outcomes_postcode = None + outcomes_houseno = None + outcomes_id = None + outcomes_address = None + master_filepaths = [] + master_id_colnames = [] + master_to_asset_list_filepath = None + phase = False + ecosurv_landlords = None + asset_list_header = 0 + landlord_block_reference = None + # Peabody data for cleaning data_folder = ("/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting " "Project/data_validation") diff --git a/asset_list/mappings/built_form.py b/asset_list/mappings/built_form.py index 58686d6b..a9defdef 100644 --- a/asset_list/mappings/built_form.py +++ b/asset_list/mappings/built_form.py @@ -464,6 +464,60 @@ BUILT_FORM_MAPPINGS = { 'EnclosedEndTerrace': 'enclosed end-terrace', 'EndTerrace': 'end-terrace', 'SemiDetached': 'semi-detached', - 'MidTerrace': 'mid-terrace' + 'MidTerrace': 'mid-terrace', + + '1st FLOOR FLAT': 'mid-floor', + 'END TERRACE HOUSE': 'end-terrace', + 'BUNGALOW-END TERRACE': 'end-terrace', + 'BUNGALOW END TERRACE': 'end-terrace', + 'END-TERRACE': 'end-terrace', + 'SEMI DETACHED': 'semi-detached', + 'Mid flat Ground Floor': 'ground floor', + 'MID TERRACED': 'mid-terrace', + 'Mid Terrace bungalow': 'mid-terrace', + 'BUNGLAOW SEMI DETACHED': 'detached', + 'Bungalow ENd Terrace': 'end-terrace', + 'Bungalow Semi detached': 'detached', + 'BUNGALOW - SEMI DETACHED': 'detached', + 'Bungalow mid terrace': 'mid-terrace', + 'BUNGALOW - MID TERRACED': 'mid-terrace', + 'BUNGALOW - MID TERRACE': 'mid-terrace', + 'Bungalow end terrace': 'end-terrace', + 'BUNGALOW SEMI-DETACHED': 'detached', + 'MID TERR': 'mid-terrace', + 'Bungalow - mid terrace': 'mid-terrace', + 'MID-TERRACE': 'mid-terrace', + 'Bunagalow Semi Detached': 'semi-detached', + 'SEMI DETACHED BUNGALOW': 'semi-detached', + 'MID TERRACE HOUSE': 'mid-terrace', + 'END - TERRACE': 'end-terrace', + 'BUNGALOW-SEMI DETACHED': 'semi-detached', + 'Semi-Detached': 'semi-detached', + 'End-Terrace house': 'end-terrace', + 'BUNGALOW MID TERRACE': 'mid-terrace', + 'SEMI DETACHED HOUSE': 'semi-detached', + 'BUNGALOW SEMI DETACHED': 'detached', + 'MID - TERRACE': 'mid-terrace', + '3 EXT WALL FLAT': 'end-terrace', + '3 Ext wall flat': 'end-terrace', + '3 EX WALL FLAT': 'end-terrace', + '2 ext wall flats': 'mid-terrace', + '2 EXT WALLS': 'mid-terrace', + '3.EXT.WALL FLAT': 'end-terrace', + 'FLAT 3 WALLS': 'end-terrace', + '2 Ext Wall flat': 'mid-terrace', + 'DETATCHED HOUSE': 'detached', + '3 EXT. WALL FLAT': 'end-terrace', + '3 ext wall flat': 'end-terrace', + '3 EXT WALLS': 'end-terrace', + '3 EXT WALL - NOW 2 EXT': 'unknown', + '3 EXT-WALL FLAT': 'end-terrace', + 'FLAT 2 WALLS': 'mid-terrace', + '3 EX WALL MAISONETTE': 'end-terrace', + '3 Ext Wall Flat': 'end-terrace', + 'Semi Bungalow': 'semi-detached', + '2 EXT WALL FLAT': 'mid-terrace', + '2.EXT.WALL FLAT': 'mid-terrace', + '2 EXT. WALL FLAT': 'mid-terrace', } diff --git a/asset_list/mappings/property_type.py b/asset_list/mappings/property_type.py index 1c236d96..1f251598 100644 --- a/asset_list/mappings/property_type.py +++ b/asset_list/mappings/property_type.py @@ -362,6 +362,71 @@ PROPERTY_MAPPING = { 'Maisonette: Semi Detached: Mid Floor': 'maisonette', 'Maisonette: Detached: Mid Floor': 'maisonette', - 'House: EnclosedMidTerrace': 'house' + 'House: EnclosedMidTerrace': 'house', + + '3 EXT WALL FLAT': 'flat', + '1st FLOOR FLAT': 'flat', + '3 Ext wall flat': 'flat', + '3 EX WALL FLAT': 'flat', + 'END TERRACE HOUSE': 'house', + 'BUNGALOW-END TERRACE': 'bungalow', + 'BUNGALOW END TERRACE': 'bungalow', + '2 ext wall flats': 'flat', + 'Mid flat Ground Floor': 'flat', + '3.EXT.WALL FLAT': 'flat', + 'FLAT 3 WALLS': 'flat', + 'Mid Terrace bungalow': 'bungalow', + 'Bungalow ENd Terrace': 'bungalow', + '2 Ext Wall flat': 'flat', + 'DETATCHED HOUSE': 'house', + 'Bungalow Semi detached': 'bungalow', + 'BUNGALOW - SEMI DETACHED': 'bungalow', + 'Bungalow mid terrace': 'bungalow', + 'BUNGALOW - MID TERRACED': 'bungalow', + 'BUNGALOW - MID TERRACE': 'bungalow', + 'Bungalow end terrace': 'bungalow', + '3 EXT. WALL FLAT': 'flat', + '3 ext wall flat': 'flat', + 'BUNGALOW SEMI-DETACHED': 'bungalow', + '3 EXT-WALL FLAT': 'flat', + 'Bungalow - mid terrace': 'bungalow', + 'SEMI DETACHED BUNGALOW': 'bungalow', + 'FLAT 2 WALLS': 'flat', + 'MID TERRACE HOUSE': 'house', + '3 EX WALL MAISONETTE': 'maisonette', + 'BUNGALOW-SEMI DETACHED': 'bungalow', + '3 Ext Wall Flat': 'flat', + 'Semi Bungalow': 'bungalow', + 'End-Terrace house': 'house', + 'BUNGALOW MID TERRACE': 'bungalow', + 'Mid-terrace house': 'house', + 'SEMI DETACHED HOUSE': 'house', + 'Semi-detached house': 'house', + '2 EXT WALL FLAT': 'flat', + '2.EXT.WALL FLAT': 'flat', + 'BUNGALOW SEMI DETACHED': 'bungalow', + '2 EXT. WALL FLAT': 'flat', + 'END-TERRACE': 'unknown', + 'SEMI DETACHED': 'unknown', + '2 EXT WALLS': 'unknown', + 'MID TERRACED': 'unknown', + 'BUNGLAOW SEMI DETACHED': 'bungalow', + 'END TERRACE': 'unknown', + '3 EXT WALLS': 'unknown', + 'Mid Terrace': 'unknown', + '3 EXT WALL - NOW 2 EXT': 'unknown', + 'MID TERR': 'unknown', + 'DETACHED': 'unknown', + 'MID-TERRACE': 'unknown', + 'Bunagalow Semi Detached': 'bungalow', + 'End-terrace': 'unknown', + 'END - TERRACE': 'unknown', + 'SEMI-DETACHED': 'unknown', + 'Semi-Detached': 'unknown', + 'MID TERRACE': 'unknown', + 'End Terrace': 'unknown', + 'Detached': 'unknown', + 'Mid-terrace': 'unknown', + 'MID - TERRACE': 'unknown' } diff --git a/backend/Property.py b/backend/Property.py index f8013fb5..10af56cc 100644 --- a/backend/Property.py +++ b/backend/Property.py @@ -23,6 +23,7 @@ from recommendations.recommendation_utils import ( from backend.ml_models.AnnualBillSavings import AnnualBillSavings from backend.app.utils import sap_to_epc import backend.app.assumptions as assumptions +from backend.app.db.models.portfolio import rating_lookup ENVIRONMENT = os.environ.get("ENVIRONMENT", "dev") DATA_BUCKET = os.environ.get( @@ -80,6 +81,7 @@ class Property: postcode, address, epc_record, + uprn=None, # Pass as an optional input property_valuation=None, already_installed=None, non_invasive_recommendations=None, @@ -120,7 +122,7 @@ class Property: self.valuation = property_valuation - self.uprn = epc_record.get("uprn") + self.uprn = uprn if uprn is not None else epc_record.get("uprn") self.uprn_source = self.data.get("uprn-source") self.full_sap_epc = epc_record.get("full_sap_epc") @@ -828,7 +830,7 @@ class Property: return property_data @classmethod - def _prepare_rating_field(cls, field, rating_lookup): + def _prepare_rating_field(cls, field): """ Utility function for usage in the lambda, for preparing the _rating fields """ @@ -838,7 +840,7 @@ class Property: else None ) - def get_property_details_epc(self, portfolio_id: int, rating_lookup): + def get_property_details_epc(self, portfolio_id: int): if self.current_energy_bill is None: raise ValueError("Current energy bill has not been set") @@ -869,37 +871,21 @@ class Property: "full_address": self.data["address"], "total_floor_area": float(self.data["total-floor-area"]), "walls": self.walls["clean_description"], - "walls_rating": self._prepare_rating_field( - self.data["walls-energy-eff"], rating_lookup - ), + "walls_rating": self._prepare_rating_field(self.data["walls-energy-eff"]), "roof": self.roof["clean_description"], - "roof_rating": self._prepare_rating_field( - self.data["roof-energy-eff"], rating_lookup - ), + "roof_rating": self._prepare_rating_field(self.data["roof-energy-eff"]), "floor": self.floor["clean_description"], - "floor_rating": self._prepare_rating_field( - self.data["floor-energy-eff"], rating_lookup - ), + "floor_rating": self._prepare_rating_field(self.data["floor-energy-eff"]), "windows": self.windows["clean_description"], - "windows_rating": self._prepare_rating_field( - self.data["windows-energy-eff"], rating_lookup - ), + "windows_rating": self._prepare_rating_field(self.data["windows-energy-eff"]), "heating": self.main_heating["clean_description"], - "heating_rating": self._prepare_rating_field( - self.data["mainheat-energy-eff"], rating_lookup - ), + "heating_rating": self._prepare_rating_field(self.data["mainheat-energy-eff"]), "heating_controls": self.main_heating_controls["clean_description"], - "heating_controls_rating": self._prepare_rating_field( - self.data["mainheatc-energy-eff"], rating_lookup - ), + "heating_controls_rating": self._prepare_rating_field(self.data["mainheatc-energy-eff"]), "hot_water": self.hotwater["clean_description"], - "hot_water_rating": self._prepare_rating_field( - self.data["hot-water-energy-eff"], rating_lookup - ), + "hot_water_rating": self._prepare_rating_field(self.data["hot-water-energy-eff"]), "lighting": self.lighting["clean_description"], - "lighting_rating": self._prepare_rating_field( - self.data["lighting-energy-eff"], rating_lookup - ), + "lighting_rating": self._prepare_rating_field(self.data["lighting-energy-eff"]), "mainfuel": self.main_fuel["clean_description"], "ventilation": self.ventilation["ventilation"], "solar_pv": self.solar_pv["solar_pv"], @@ -908,9 +894,7 @@ class Property: "floor_height": self.floor_height, "heat_loss_corridor": self.heat_loss_corridor["heat_loss_corridor_boolean"], "unheated_corridor_length": self.heat_loss_corridor["length"], - "number_of_open_fireplaces": self.number_of_open_fireplaces[ - "number_of_open_fireplaces" - ], + "number_of_open_fireplaces": self.number_of_open_fireplaces["number_of_open_fireplaces"], "number_of_extensions": self.number_of_extensions["number_of_extensions"], "number_of_storeys": self.number_of_storeys["number_of_storeys"], "mains_gas": self.mains_gas, @@ -1296,6 +1280,13 @@ class Property: else: raise NotImplementedError(f"Unhandled fuel {self.main_fuel['fuel_type']}") + # We handle edge case where no heating system is indicated + if self.main_fuel["fuel_type"] in fuel_map: + mapped_fuel = fuel_map[self.main_fuel["fuel_type"]] + self.heating_energy_source = mapped_fuel + self.hot_water_energy_source = mapped_fuel + return + if len(self.heating_energy_source) > 1: # We treat this as a community scheme self.heating_energy_source = ["Varied (Community Scheme)"] diff --git a/backend/SearchEpc.py b/backend/SearchEpc.py index cb465239..9af7330b 100644 --- a/backend/SearchEpc.py +++ b/backend/SearchEpc.py @@ -553,22 +553,31 @@ class SearchEpc: else: raise ValueError("Multiple UPRNs found - investigate me") - if uprns: - uprn = uprns.pop() - # Convert to int - if not pd.isnull(uprn): - uprn = int(uprn) - else: - newest_epc["uprn-source"] = self.UPRN_SOURCE_SIMULATED - uprn = hash(self.address1 + self.postcode) + # if uprns: + # epc_uprn = uprns.pop() + # # Convert to int + # if not pd.isnull(epc_uprn): + # uprn = int(epc_uprn) + # else: + # newest_epc["uprn-source"] = self.UPRN_SOURCE_SIMULATED + # uprn = hash(self.address1 + self.postcode) + + if self.uprn is not None and uprns: + epc_uprn = uprns.pop() + if int(epc_uprn) != self.uprn: + logger.warning( + f"Provided UPRN {self.uprn} does not match EPC UPRN {epc_uprn}, using provided UPRN" + ) + # We overwrite but in this instance, we've likely got the wrong EPC data + newest_epc["uprn"] = self.uprn if self.fast: - return newest_epc, [], {}, "", "", None, "" + return newest_epc, [], {}, "", "", "" # Retrieve postcode and address address_epc, postcode_epc, address_postal_town = self.format_address(newest_epc=newest_epc) - return newest_epc, older_epcs, full_sap_epc, address_epc, postcode_epc, uprn, address_postal_town + return newest_epc, older_epcs, full_sap_epc, address_epc, postcode_epc, address_postal_town @staticmethod def filter_newest_epc(list_of_epcs: List): @@ -923,7 +932,7 @@ class SearchEpc: @staticmethod def calculate_weighted_lodgement_datetime(epc_data): - numeric_dates = pd.to_datetime(epc_data['lodgement-datetime']).view('int64') + numeric_dates = pd.to_datetime(epc_data['lodgement-datetime']).astype('int64') # Calculate the weighted sum of dates weighted_sum = (numeric_dates * epc_data['weight']).sum() @@ -991,7 +1000,7 @@ class SearchEpc: if response["status"] == 200: ( - self.newest_epc, self.older_epcs, self.full_sap_epc, self.address_clean, self.postcode_clean, self.uprn, + self.newest_epc, self.older_epcs, self.full_sap_epc, self.address_clean, self.postcode_clean, self.address_postal_town ) = self.extract_epc_data(address=self.full_address) @@ -1085,7 +1094,7 @@ class SearchEpc: response = self.get_epc() if response["status"] == 200: ( - self.newest_epc, self.older_epcs, self.full_sap_epc, self.address_clean, self.postcode_clean, self.uprn, + self.newest_epc, self.older_epcs, self.full_sap_epc, self.address_clean, self.postcode_clean, self.address_postal_town ) = self.extract_epc_data() return diff --git a/backend/addresses/Address.py b/backend/addresses/Address.py new file mode 100644 index 00000000..9b95f5e0 --- /dev/null +++ b/backend/addresses/Address.py @@ -0,0 +1,67 @@ +from dataclasses import dataclass +from typing import Optional + + +@dataclass(slots=True) +class Address: + uprn: Optional[int] + landlord_property_id: Optional[str] + address: Optional[str] + full_address: Optional[str] + postcode: str + property_type: Optional[str] + built_form: Optional[str] + estimated: bool + + # Additional address data, associated to a standardised asset list + domna_full_address: Optional[str] + domna_address_1: Optional[str] + landlord_heating_system: Optional[str] = None + solar_reason: Optional[str] = None + cavity_reason: Optional[str] = None + + @property + def address1(self): + + if self.domna_address_1 is not None: + address1 = self.domna_address_1 + else: + address1 = self.address + + # Format + address1 = str(int(address1)) if isinstance(address1, float) else str(address1) + return address1 + + @property + def request_data(self) -> dict[str, Optional[str]]: + """ + Canonical request payload for downstream services. + """ + data = { + "uprn": self.uprn, + "landlord_property_id": self.landlord_property_id, + "postcode": self.postcode, + "address1": self.address1, + "full_address": self.full_address, + } + + # Drop nulls + return {k: v for k, v in data.items() if v is not None} + + @property + def heating_system(self): + """ + Helper function to extract a heating system, which can be used to estimate EPC. This is a very limited, + placeholder function to cover some initial immediate cases. + :return: + """ + + ll_heating = self.landlord_property_id + if not ll_heating: + return None + + if ll_heating == "electric storage heaters": + # Return with the same format at the EPC + return "Electric storage heaters" + + return None diff --git a/backend/addresses/Addresses.py b/backend/addresses/Addresses.py new file mode 100644 index 00000000..22822c6b --- /dev/null +++ b/backend/addresses/Addresses.py @@ -0,0 +1,84 @@ +from backend.addresses.Address import Address + + +class Addresses: + def __init__(self, addresses: list[Address]): + self._addresses = addresses + # self._identity_index = self._build_identity_index() + + def __getitem__(self, index: int) -> Address: + return self._addresses[index] + + def __len__(self) -> int: + return len(self._addresses) + + @classmethod + def from_plan_input(cls, plan_input: list[dict], body) -> "Addresses": + addresses = [] + for row in plan_input: + addresses.append(cls._parse_row(row, body)) + return cls(addresses) + + def get_uprns(self): + return [x.uprn for x in self._addresses if x.uprn is not None] + + def get_landlord_ids(self): + return [x.landlord_property_id for x in self._addresses if x.landlord_property_id is not None] + + def get_unique_postcodes(self): + return list({x.postcode for x in self._addresses}) + + def get_postcodes_for_flats(self): + # Method to extract all of the postcodes associated to a flat, which is used for remote assessments + # on flats + return [x.postcode for x in self._addresses if x.property_type in ["Flat", "flat"]] + + def get_property_requests(self): + return [x.request_data for x in self._addresses] + + @staticmethod + def _parse_row(row: dict, body) -> Address: + def clean_uprn(v): + try: + return int(float(v)) + except (TypeError, ValueError): + return None + + uprn = clean_uprn(row.get("uprn")) + + address = row.get("address") + if not address and body.file_format == "domna_asset_list": + address = row.get("domna_address_1") + + full_address = ( + row.get("domna_full_address") + if body.file_format == "domna_asset_list" + else None + ) + if not isinstance(full_address, str): + full_address = None + + postcode = str(row["postcode"]).strip().upper() + + return Address( + uprn=uprn, + landlord_property_id=str(row["landlord_property_id"]) + if row.get("landlord_property_id") else None, + address=str(address).strip() if address else None, + full_address=str(full_address).strip() if full_address else None, + postcode=postcode, + property_type=row.get("property_type"), + built_form=row.get("built_form"), + estimated=bool(row.get("estimated", False)), + domna_full_address=row.get("domna_full_address"), + domna_address_1=row.get("domna_address_1"), + ) + + # def _build_identity_index(self) -> dict: + # index = {} + # for addr in self._addresses: + # key = addr.identity_key() + # if key in index: + # raise ValueError(f"Duplicate address identity detected: {key}") + # index[key] = addr + # return index diff --git a/backend/app/db/functions/address_functions.py b/backend/app/db/functions/address_functions.py index b04f14c9..4b8ad5f2 100644 --- a/backend/app/db/functions/address_functions.py +++ b/backend/app/db/functions/address_functions.py @@ -20,7 +20,7 @@ def _get_associated_records(results, uprn, uprn_key="UPRN"): return matched_record -def get_associated_uprns(session: Session, postcode: str, uprn: str): +def get_associated_uprns(postcode_search: PostcodeSearch, uprn: str | int): """ Given a postcode and UPRN, for a remote assessment, fetch all associated UPRNs, based on parent UPRN. This will be properties in the same building @@ -28,40 +28,87 @@ def get_associated_uprns(session: Session, postcode: str, uprn: str): Parent UPRN is referenced in the following docs: https://static.geoplace.co.uk/downloads/GeoPlace-Data-Entry-Conventions-Best-Practice-for-Addresses.pdf - :param session: The database session - :param postcode: The postcode string to search for + :param PostcodeSearch postcode_search: The postcode search record :param uprn: The UPRN string to match :return: The matching PostcodeSearch record, or None if not found """ - try: - record = ( - session.query(PostcodeSearch) - .filter(func.upper(PostcodeSearch.postcode) == postcode) - .first() - ) - if not record: - # No record found for this postcode - return [] + if not postcode_search: + return [] - matched_record = _get_associated_records(results=record.result_data["results"], uprn=uprn) + if isinstance(uprn, int): + # For this, coerce to string + uprn = str(uprn) - if len(matched_record) != 1: - logger.error("Something went wrong, about to return nothing") - return [] + matched_record = _get_associated_records(results=postcode_search.result_data["results"], uprn=uprn) - if not matched_record[0].get("PARENT_UPRN"): - logger.info("No parent UPRN found, cannot get associated records") - return [] + if len(matched_record) != 1: + return [] - associated_records = _get_associated_records( - results=record.result_data["results"], uprn=matched_record[0]["PARENT_UPRN"], uprn_key="PARENT_UPRN" - ) - # We now fetch all UPRNS with the same parent UPRN - associated_uprns = [int(x["UPRN"]) for x in associated_records if x["UPRN"] != str(uprn)] + if not matched_record[0].get("PARENT_UPRN"): + logger.info("No parent UPRN found, cannot get associated records") + return [] - return associated_uprns + associated_records = _get_associated_records( + results=postcode_search.result_data["results"], uprn=matched_record[0]["PARENT_UPRN"], uprn_key="PARENT_UPRN" + ) + # We now fetch all UPRNS with the same parent UPRN + associated_uprns = [int(x["UPRN"]) for x in associated_records if x["UPRN"] != str(uprn)] - except SQLAlchemyError as e: - session.rollback() - raise e + return associated_uprns + + +def get_by_postcodes(session: Session, postcodes: list[str]) -> dict[str, PostcodeSearch]: + """ + Given a list of postcodes, retrieves postcode data from the database form the PostcodeSearch table + :param session: + :param postcodes: + :return: + """ + if not postcodes: + return {} + + normalised = {p.upper() for p in postcodes if p} + + records = ( + session.query(PostcodeSearch) + .filter(func.upper(PostcodeSearch.postcode).in_(normalised)) + .all() + ) + + return {r.postcode.upper(): r for r in records} + + +def get_associated_uprns_from_record(record: PostcodeSearch, uprn: str) -> list[int]: + """ + Given the postcode sra + :param record: + :param uprn: + :return: + """ + if not record: + return [] + + matched_record = _get_associated_records( + results=record.result_data["results"], + uprn=uprn + ) + + if len(matched_record) != 1: + return [] + + parent_uprn = matched_record[0].get("PARENT_UPRN") + if not parent_uprn: + return [] + + associated_records = _get_associated_records( + results=record.result_data["results"], + uprn=parent_uprn, + uprn_key="PARENT_UPRN" + ) + + return [ + int(x["UPRN"]) + for x in associated_records + if x["UPRN"] != str(uprn) + ] diff --git a/backend/app/db/functions/energy_assessment_functions.py b/backend/app/db/functions/energy_assessment_functions.py index bbdaaac7..c9e40b3f 100644 --- a/backend/app/db/functions/energy_assessment_functions.py +++ b/backend/app/db/functions/energy_assessment_functions.py @@ -1,3 +1,4 @@ +from typing import Iterable from backend.app.db.models.energy_assessments import ( EnergyAssessment, EnergyAssessmentScenarios, EnergyAssessmentDocuments, DocumentTypeEnum ) @@ -63,27 +64,48 @@ def bulk_insert_energy_assessments(session: Session, data_list: List[dict]) -> D return uprn_to_assessment_id -def get_latest_assessment_by_uprn(session: Session, uprn: int) -> Optional[EnergyAssessment]: +def get_latest_assessments_for_uprns( + session: Session, + uprns: Iterable[int], +) -> dict[int, dict]: """ - Retrieve the latest energy assessment for a given UPRN based on the inspection date. + Fetch the latest energy assessment per UPRN in a single query. - :param session: The database session - :param uprn: The unique property reference number - :return: The latest EnergyAssessment object or None if not found + Returns a dict: + uprn -> assessment_dict | empty_response """ - if not uprn: - return EnergyAssessment.empty_response() + uprns = [u for u in uprns if u] + if not uprns: + return {} - try: - # Query the EnergyAssessment model, filter by uprn, order by inspection_date in descending order - latest_assessment = session.query(EnergyAssessment).filter_by(uprn=uprn).order_by( - desc(EnergyAssessment.inspection_date)).first() + # DISTINCT ON requires matching ORDER BY + records = ( + session.query(EnergyAssessment) + .filter(EnergyAssessment.uprn.in_(uprns)) + .order_by( + EnergyAssessment.uprn, + desc(EnergyAssessment.inspection_date), + ) + .distinct(EnergyAssessment.uprn) + .all() + ) - return latest_assessment.to_dict() if latest_assessment else EnergyAssessment.empty_response() - except Exception as e: - logger.info(f"An error occurred: {e}") - return None + result: dict[int, dict] = {} + + for record in records: + result[record.uprn] = record.to_dict() + + # Fill missing uprns with empty response + uprn_set = set(uprns) + found_set = set(result.keys()) + + missing_uprns = uprn_set - found_set + + for uprn in missing_uprns: + result[uprn] = EnergyAssessment.empty_response() + + return result def create_scenarios_for_documents(session: Session, document_list: List[dict], uprn_to_assessment_id: dict): diff --git a/backend/app/db/functions/epc_functions.py b/backend/app/db/functions/epc_functions.py index 4b675f1f..defc24c9 100644 --- a/backend/app/db/functions/epc_functions.py +++ b/backend/app/db/functions/epc_functions.py @@ -1,7 +1,9 @@ +from typing import List from datetime import datetime, timedelta, timezone -from sqlalchemy.orm import Session from sqlalchemy.exc import SQLAlchemyError from backend.app.db.models.epc import EpcStore +from sqlmodel import Session +from sqlalchemy.dialects.postgresql import insert class EpcStoreService: @@ -50,6 +52,77 @@ class EpcStoreService: "epc_page_created_at": record.epc_page_created_at, } + @classmethod + def get_epcs_for_uprns(cls, session: Session, uprns: List[int]) -> dict[int, dict]: + """ + Given a list of uprns, return a dict mapping each uprn to its EPC data status and content. + :param session: + :param uprns: + :return: + """ + if not uprns: + return {} + + cutoff = datetime.now(timezone.utc) - timedelta(days=cls.FRESHNESS_DAYS) + + records = ( + session.query(EpcStore) + .filter(EpcStore.uprn.in_(uprns)) + .all() + ) + + result: dict[int, dict] = {} + + for record in records: + if not record.epc_api_created_at: + result[record.uprn] = { + "status": cls.MISSING, + "epc_api": None, + "epc_page": None, + "epc_page_rrn": None, + "epc_api_created_at": None, + "epc_page_created_at": None, + } + continue + + if record.epc_api_created_at.date() < cutoff.date(): + # We only expose epc_page when epc_api is fresh. + result[record.uprn] = { + "status": cls.EXPIRED, + "epc_api": None, + "epc_page": None, + "epc_page_rrn": None, + "epc_api_created_at": None, + "epc_page_created_at": None, + } + continue + + result[record.uprn] = { + "status": cls.FRESH, + "epc_api": record.epc_api, + "epc_page": record.epc_page, + "epc_page_rrn": record.epc_page_rrn, + "epc_api_created_at": record.epc_api_created_at, + "epc_page_created_at": record.epc_page_created_at, + } + + # For the uprns not found in records, mark them as missing + requested = set(uprns) + found = set(result.keys()) + + missing = requested - found + for uprn in missing: + result[uprn] = { + "status": cls.MISSING, + "epc_api": None, + "epc_page": None, + "epc_page_rrn": None, + "epc_api_created_at": None, + "epc_page_created_at": None, + } + + return result + @classmethod def check_insert_needed(cls, epc_cache, epc_estimated, uprn): """ @@ -115,11 +188,42 @@ class EpcStoreService: ) session.add(record) - session.flush() - session.commit() - return record except SQLAlchemyError as e: - session.rollback() raise e + + @classmethod + def bulk_upsert_epc_data(cls, session: Session, rows_to_insert: list[dict]): + if not rows_to_insert: + return + + now = datetime.now(timezone.utc) + + values = [ + { + "uprn": row["uprn"], + "epc_api": row["epc_api"], + "epc_api_created_at": now, + "epc_page": row["epc_page"], + "epc_page_rrn": row["epc_page_rrn"], + "epc_page_created_at": now if row["epc_page"] else None, + } + for row in rows_to_insert + ] + + insert_stmt = insert(EpcStore).values(values) + + stmt = insert_stmt.on_conflict_do_update( + index_elements=["uprn"], + set_={ + "epc_api": insert_stmt.excluded.epc_api, + "epc_api_created_at": insert_stmt.excluded.epc_api_created_at, + "epc_page": insert_stmt.excluded.epc_page, + "epc_page_rrn": insert_stmt.excluded.epc_page_rrn, + "epc_page_created_at": insert_stmt.excluded.epc_page_created_at, + }, + ) + + session.execute(stmt) + session.commit() diff --git a/backend/app/db/functions/funding_functions.py b/backend/app/db/functions/funding_functions.py index 51dffa21..df36d308 100644 --- a/backend/app/db/functions/funding_functions.py +++ b/backend/app/db/functions/funding_functions.py @@ -1,5 +1,6 @@ from sqlalchemy.orm import Session from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy import insert from backend.app.db.models.funding import FundingPackage, FundingPackageMeasures @@ -69,3 +70,72 @@ def upload_funding(session: Session, p, plan_id, recommendations_to_upload): session.rollback() print(f"An error occurred: {e}") return False + + +def bulk_upload_funding_packages( + session: Session, + funding_payload: list[dict], +): + """ + Bulk upload: + - funding_package + - funding_package_measures + + Assumes caller manages the transaction. + """ + + if not funding_payload: + return + + # --------------------------------------------------------- + # 1. Prepare funding package rows + # --------------------------------------------------------- + funding_rows = [] + measures_by_index = [] + + for f in funding_payload: + funding_rows.append({ + "plan_id": f["plan_id"], + "scheme": f["scheme"], + "project_funding": f["project_funding"], + "total_uplift": f["total_uplift"], + "full_project_score": f["full_project_score"], + "partial_project_score": f["partial_project_score"], + "uplift_project_score": f["uplift_project_score"], + }) + + measures_by_index.append(f.get("measures", [])) + + # --------------------------------------------------------- + # 2. Insert funding packages and get IDs + # --------------------------------------------------------- + result = session.execute( + insert(FundingPackage) + .values(funding_rows) + .returning(FundingPackage.id) + ) + + funding_package_ids = [row[0] for row in result] + + # --------------------------------------------------------- + # 3. Insert funding package measures + # --------------------------------------------------------- + measures_rows = [] + + for funding_package_id, measures in zip( + funding_package_ids, measures_by_index + ): + for m in measures: + measures_rows.append({ + "funding_package_id": funding_package_id, + "measure": m["measure"], + "material_id": m["material_id"], + "innovation_uplift": m["innovation_uplift"], + "partial_project_score": m["partial_project_score"], + "uplift_project_score": m["uplift_project_score"], + }) + + if measures_rows: + session.execute( + insert(FundingPackageMeasures).values(measures_rows) + ) diff --git a/backend/app/db/functions/inspections_functions.py b/backend/app/db/functions/inspections_functions.py index d66154cb..b1c1eeb5 100644 --- a/backend/app/db/functions/inspections_functions.py +++ b/backend/app/db/functions/inspections_functions.py @@ -2,7 +2,6 @@ import re from dataclasses import dataclass, asdict from typing import Optional, Dict, Any, Type, TypeVar from sqlalchemy.orm import Session -from datetime import timezone from enum import Enum from datetime import datetime, timedelta @@ -24,7 +23,6 @@ from backend.app.db.models.inspections import ( InspectionsCladding, InspectionsAccessIssues, ) -from sqlalchemy.dialects.postgresql import insert NON_INTRUSIVE_PREFIX = "non-intrusives:" diff --git a/backend/app/db/functions/property_functions.py b/backend/app/db/functions/property_functions.py index fc49d205..99cc8ed7 100644 --- a/backend/app/db/functions/property_functions.py +++ b/backend/app/db/functions/property_functions.py @@ -3,12 +3,16 @@ ### import datetime import pytz +from sqlalchemy import select, or_, bindparam, update from sqlalchemy.orm import Session +from sqlalchemy.orm.exc import NoResultFound +from sqlalchemy.dialects.postgresql import insert + +from backend.addresses.Address import Address from backend.app.db.models.portfolio import ( PropertyModel, PropertyCreationStatus, PortfolioStatus, PropertyTargetsModel, PropertyDetailsEpcModel, PropertyDetailsSpatial ) -from sqlalchemy.orm.exc import NoResultFound def create_property(session: Session, portfolio_id: int, address: str, postcode: str, uprn: str, @@ -203,3 +207,162 @@ def update_or_create_property_spatial_details(session: Session, uprn: int, prope session.flush() return True + + +def get_existing_properties(session, portfolio_id, uprns, landlord_ids): + """ + Bulk method for checking for existing properties + :param session: + :param portfolio_id: + :param uprns: + :param landlord_ids: + :return: + """ + return ( + session.exec( + select(PropertyModel) + .where(PropertyModel.portfolio_id == portfolio_id) + .where( + or_( + PropertyModel.uprn.in_(uprns), + PropertyModel.landlord_property_id.in_(landlord_ids), + ) + ) + ) + .scalars() + .all() + ) + + +def bulk_create_properties( + session, + body, + addresses: list[Address], # these are *new* addresses + energy_assessment_by_uprn: dict[int, dict], +): + rows = [] + + for addr in addresses: + energy_assessment = energy_assessment_by_uprn.get(addr.uprn, {}) + status = ( + PortfolioStatus.ASSESSMENT.value + if not energy_assessment.get("epc") + else PortfolioStatus.SURVEY.value + ) + + rows.append( + { + "address": addr.address1, + "postcode": addr.postcode, + "portfolio_id": body.portfolio_id, + "uprn": addr.uprn, + "landlord_property_id": addr.landlord_property_id, + "creation_status": PropertyCreationStatus.LOADING, + "status": status, + "has_pre_condition_report": False, + "has_recommendations": False, + } + ) + + if not rows: + return [] + + stmt = ( + insert(PropertyModel) + .values(rows) + .on_conflict_do_nothing( + index_elements=["portfolio_id", "uprn"], + index_where=PropertyModel.uprn.isnot(None), + ) + .returning( + PropertyModel.id, + PropertyModel.uprn, + PropertyModel.landlord_property_id, + ) + ) + + result = session.execute(stmt) + session.flush() + + return result.fetchall() + + +def bulk_update_properties(session: Session, property_updates: list[dict]): + if not property_updates: + return + + now = datetime.datetime.now(pytz.utc) + + stmt = ( + update(PropertyModel.__table__) + .where( + PropertyModel.id == bindparam("b_id"), + PropertyModel.portfolio_id == bindparam("b_portfolio_id"), + ) + .values( + **{k: bindparam(k) for k in property_updates[0]["data"].keys()}, + updated_at=now, + ) + ) + + payload = [ + { + "b_id": row["property_id"], # renamed bind param + "b_portfolio_id": row["portfolio_id"], + **row["data"], + } + for row in property_updates + ] + + session.execute( + stmt, + payload, + execution_options={"synchronize_session": False}, + ) + + +def bulk_upsert_property_details_epc(session: Session, rows: list[dict]): + if not rows: + return + + insert_stmt = insert(PropertyDetailsEpcModel).values(rows) + + update_cols = { + col.name: insert_stmt.excluded[col.name] + for col in PropertyDetailsEpcModel.__table__.columns + if col.name not in ("id",) + } + + stmt = insert_stmt.on_conflict_do_update( + index_elements=["portfolio_id", "property_id"], + set_=update_cols, + ) + + session.execute(stmt) + + +def bulk_upsert_property_spatial(session: Session, rows: list[dict]): + if not rows: + return + + values = [] + for row in rows: + values.append({ + "uprn": row["uprn"], + **row["data"], + }) + + insert_stmt = insert(PropertyDetailsSpatial).values(values) + + update_cols = { + col.name: insert_stmt.excluded[col.name] + for col in PropertyDetailsSpatial.__table__.columns + if col.name not in ("id", "uprn") + } + + stmt = insert_stmt.on_conflict_do_update( + index_elements=["uprn"], + set_=update_cols, + ) + + session.execute(stmt) diff --git a/backend/app/db/functions/recommendations_functions.py b/backend/app/db/functions/recommendations_functions.py index 14596749..5b39f86e 100644 --- a/backend/app/db/functions/recommendations_functions.py +++ b/backend/app/db/functions/recommendations_functions.py @@ -96,27 +96,47 @@ def create_plan(session: Session, plan): raise e -def create_scenario(session: Session, scenario): - """ - This function will create a record for the scenario in the database if it does not exist. - :param session: The database session - :param scenario: dictionary of data representing a scenario to be created - """ - try: +def bulk_create_plans(session: Session, plans_to_create: list[dict]) -> dict[int, int]: + if not plans_to_create: + return {} - # Before creating a new scenario, we check if there is a scenario for this portfolio id already - # If there is, it means that any new scnario created will NOT be the default scenario - existing_scenario = session.query(Scenario).filter_by(portfolio_id=scenario["portfolio_id"]).first() - scenario["is_default"] = True if not existing_scenario else False + payload = [ + { + "property_id": p["property_id"], + **p["plan_data"], + } + for p in plans_to_create + ] - new_scenario = Scenario(**scenario) - session.add(new_scenario) - session.flush() - session.commit() - return new_scenario - except SQLAlchemyError as e: - session.rollback() - raise e + stmt = ( + insert(Plan) + .values(payload) + .returning(Plan.id, Plan.property_id) + ) + + result = session.execute(stmt).all() + + # property_id -> plan_id + return {row.property_id: row.id for row in result} + + +def create_scenario(session: Session, scenario: dict) -> int: + existing_scenario = ( + session.query(Scenario) + .filter_by(portfolio_id=scenario["portfolio_id"]) + .first() + ) + + scenario["is_default"] = not bool(existing_scenario) + + new_scenario = Scenario(**scenario) + session.add(new_scenario) + session.flush() # ensures ID is populated + + scenario_id = new_scenario.id + session.commit() + + return scenario_id def create_recommendation(session: Session, recommendation): @@ -237,6 +257,94 @@ def upload_recommendations(session: Session, recommendations_to_upload, property return False +def bulk_upload_recommendations_and_materials( + session: Session, + recommendation_payload: list[dict], +): + if not recommendation_payload: + return + + # --------------------------------------------------------- + # 1. Prepare recommendation rows + # --------------------------------------------------------- + recommendation_rows = [] + parts_by_index = [] + plan_ids_by_index = [] + + for rec in recommendation_payload: + recommendation_rows.append({ + "property_id": rec["property_id"], + "type": rec["type"], + "measure_type": rec["measure_type"], + "description": rec["description"], + "estimated_cost": rec["estimated_cost"], + "default": rec["default"], + "starting_u_value": rec["starting_u_value"], + "new_u_value": rec["new_u_value"], + "sap_points": rec["sap_points"], + "heat_demand": rec["heat_demand"], + "kwh_savings": rec["kwh_savings"], + "co2_equivalent_savings": rec["co2_equivalent_savings"], + "energy_savings": rec["energy_savings"], + "energy_cost_savings": rec["energy_cost_savings"], + "total_work_hours": rec["total_work_hours"], + "labour_days": rec["labour_days"], + "already_installed": rec["already_installed"], + }) + + parts_by_index.append(rec["parts"]) + plan_ids_by_index.append(rec["plan_id"]) + + # --------------------------------------------------------- + # 2. Insert recommendations and get IDs + # --------------------------------------------------------- + result = session.execute( + insert(Recommendation) + .values(recommendation_rows) + .returning(Recommendation.id) + ) + + recommendation_ids = [row[0] for row in result] + + # --------------------------------------------------------- + # 3. Insert recommendation materials + # --------------------------------------------------------- + materials_rows = [] + + for recommendation_id, parts in zip(recommendation_ids, parts_by_index): + for part in parts: + materials_rows.append({ + "recommendation_id": recommendation_id, + "material_id": part["material_id"], + "depth": part["depth"], + "quantity": part["quantity"], + "quantity_unit": part["quantity_unit"], + "estimated_cost": part["estimated_cost"], + }) + + if materials_rows: + session.execute( + insert(RecommendationMaterials).values(materials_rows) + ) + + # --------------------------------------------------------- + # 4. Insert plan ↔ recommendation links + # --------------------------------------------------------- + plan_recommendation_rows = [ + { + "plan_id": plan_id, + "recommendation_id": recommendation_id, + } + for plan_id, recommendation_id in zip( + plan_ids_by_index, recommendation_ids + ) + ] + + session.execute( + insert(PlanRecommendations).values(plan_recommendation_rows) + ) + + def chunked(iterable, size=100): for i in range(0, len(iterable), size): yield iterable[i:i + size] diff --git a/backend/app/plan/utils.py b/backend/app/plan/utils.py index 717638cf..52e2b0c4 100644 --- a/backend/app/plan/utils.py +++ b/backend/app/plan/utils.py @@ -1,9 +1,9 @@ +import ast import os -import time import msgpack from uuid import UUID -from typing import Any from utils.s3 import read_from_s3 +from backend.addresses.Address import Address from backend.app.config import get_settings from backend.app.plan.data_classes import PropertyRequestData from backend.app.db.functions.tasks.Tasks import SubTaskInterface @@ -52,21 +52,20 @@ def patch_epc(patch, epc_records): def extract_property_request_data( - config, patches, already_installed, non_invasive_recommendations, valuation_data, uprn + address: Address, patches, already_installed, non_invasive_recommendations, valuation_data, uprn ): patch_has_uprn = "uprn" in patches[0] if patches else True if patch_has_uprn: patch = next(( - x for x in patches if str(x["uprn"]) == str(config["uprn"]) + x for x in patches if str(x["uprn"]) == str(address.uprn) ), {}) else: patch = next(( - x for x in patches if (x["address"] == config["address"]) and (x["postcode"] == config["postcode"]) + x for x in patches if (x["address"] == address.address) and (x["postcode"] == address.postcode) ), {}) property_already_installed = next(( - x for x in already_installed if - (x["address"] == config["address"]) and (x["postcode"] == config["postcode"]) + x for x in already_installed if (x["address"] == address.address) and (x["postcode"] == address.postcode) ), []) # Because we have some non-invasive recommendations that match on address and postcode, but not UPRN @@ -85,7 +84,7 @@ def extract_property_request_data( else: property_non_invasive_recommendations = next(( x for x in non_invasive_recommendations if - (x["address"] == config["address"]) and (x["postcode"] == config["postcode"]) + (x["address"] == address.address) and (x["postcode"] == address.postcode) ), {}) if isinstance(property_non_invasive_recommendations.get("recommendations"), str): @@ -114,7 +113,7 @@ def extract_property_request_data( else: property_valuation = next(( float(x["valuation"]) for x in valuation_data if - (x["address"] == config["address"]) and (x["postcode"] == config["postcode"]) + (x["address"] == address.address) and (x["postcode"] == address.postcode) ), None) # Return data class to give a structured format @@ -126,14 +125,14 @@ def extract_property_request_data( ) -def parse_eco_packages(config: dict[str, Any], prepared_epc) -> tuple[list[str], int, str, list[str]] | tuple[ +def parse_eco_packages(addr: Address, prepared_epc) -> tuple[list[str], int, str, list[str]] | tuple[ None, None, None, list]: - solar_identification = config.get("solar_reason", None) - cavity_identification = config.get("cavity_reason", None) + solar_identification = addr.solar_reason + cavity_identification = addr.cavity_reason if not solar_identification and not cavity_identification: return None, None, None, [] - landlord_heating_system = config["landlord_heating_system"] + landlord_heating_system = addr.landlord_heating_system # This is the initial version of tackling "already installed" measures already_installed = [] if landlord_heating_system == "air source heat pump": diff --git a/backend/engine/engine.py b/backend/engine/engine.py index 0c6ed1de..14087f83 100644 --- a/backend/engine/engine.py +++ b/backend/engine/engine.py @@ -23,7 +23,6 @@ from backend.app.db.connection import db_engine import backend.app.db.functions as db_funcs from backend.app.db.functions.tasks.Tasks import SubTaskInterface -from backend.app.db.models.portfolio import rating_lookup from backend.app.plan.schemas import PlanTriggerRequest, WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES from backend.app.plan.utils import ( get_cleaned, patch_epc, extract_property_request_data, parse_eco_packages, handle_error, build_cloudwatch_log_url @@ -34,6 +33,7 @@ import backend.app.assumptions as assumptions from backend.ml_models.api import ModelApi from backend.Property import Property from backend.apis.GoogleSolarApi import GoogleSolarApi +from backend.addresses.Addresses import Addresses from recommendations.optimiser.CostOptimiser import CostOptimiser from recommendations.optimiser.GainOptimiser import GainOptimiser @@ -45,7 +45,7 @@ from etl.bill_savings.KwhData import KwhData from etl.spatial.OpenUprnClient import OpenUprnClient from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc -from recommendations.optimiser.funding_optimiser import optimise_with_funding_paths +from recommendations.optimiser.funding_optimiser import optimise_with_funding_paths, optimise_with_scenarios from recommendations.recommendation_utils import convert_thickness_to_numeric, get_wall_u_value from utils.logger import setup_logger @@ -368,24 +368,6 @@ def get_funding_data(): return project_scores_matrix, partial_project_scores_matrix, whlg_eligible_postcodes -def parse_heating_system(config): - """ - Helper function to extract a heating system, which can be used to estimate EPC. This is a very limited, - placeholder function to cover some initial immediate cases. - :return: - """ - - ll_heating = config.get("landlord_heating_system", None) - if not ll_heating: - return None - - if ll_heating == "electric storage heaters": - # Return with the same format at the EPC - return "Electric storage heaters" - - return None - - def check_duplicate_uprns(plan_input): """ Simple function to check if the input data contains duplicated UPRNS. @@ -426,6 +408,13 @@ def check_duplicate_property_ids(input_properties): # de-dupe input_uprns raise ValueError(f"Duplicate property IDs in the input data: {duplicates}") + # Check for dupe UPRNS + input_uprns = [x.uprn for x in input_properties if x.uprn is not None] + if input_uprns: + if len(input_uprns) != len(set(input_uprns)): + duplicates = set([x for x in input_uprns if input_uprns.count(x) > 1]) + raise ValueError(f"Duplicate UPRNs in the input properties: {duplicates}") + return True @@ -682,71 +671,108 @@ async def model_engine(body: PlanTriggerRequest): bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet", ) - input_properties, inspections_map, eco_packages = [], {}, {} - for config in tqdm(plan_input): + # Prepare input data + addresses = Addresses.from_plan_input(plan_input, body) - uprn, address1, full_address = extract_address_data(config, body) - heating_system = parse_heating_system(config) + uprns = addresses.get_uprns() + landlord_ids = addresses.get_landlord_ids() + postcodes = addresses.get_postcodes_for_flats() - # ---------- 1) fetch data ---------- - epc_api_data, epc_page, rrn, epc_cache = None, None, None, {} - with db_read_session() as session: - epc_cache = {} - if uprn: - epc_cache = db_funcs.epc_functions.EpcStoreService.get_epc_for_uprn(session, uprn) + # Check if we've seen these properties before + with db_read_session() as session: + existing_properties = db_funcs.property_functions.get_existing_properties( + session, body.portfolio_id, uprns, landlord_ids + ) + property_lookup = {} + for prop in existing_properties: + if prop.uprn: + property_lookup[("uprn", prop.uprn)] = prop.id + if prop.landlord_property_id: + property_lookup[("landlord_property_id", prop.landlord_property_id)] = prop.id - # For remote assessments of flats, we get associated UPRNs - associated_uprns = [] - if body.event_type == "remote_assessment" and config.get("property_type") == "Flat": - associated_uprns = db_funcs.address_functions.get_associated_uprns( - session, postcode=config["postcode"], uprn=uprn - ) + # List of properties that need to be created in the db + to_create = [] + for addr in addresses: + key = ("uprn", addr.uprn) if addr.uprn else ("landlord_property_id", addr.landlord_property_id) + if key not in property_lookup: + to_create.append(addr) - # We check for an energy assessment we have performed on this property: - energy_assessment = db_funcs.energy_assessment_functions.get_latest_assessment_by_uprn( - session, uprn + # Pre-requests to the db + with db_read_session() as session: + epc_cache_by_uprn = db_funcs.epc_functions.EpcStoreService.get_epcs_for_uprns(session, uprns) + postcode_searches = db_funcs.address_functions.get_by_postcodes(session, list(postcodes)) + energy_assessments_by_uprn = db_funcs.energy_assessment_functions.get_latest_assessments_for_uprns( + session, uprns + ) + + # If we have properties that need to be created, we cerate them in bulk + new_property_ids = set() + if to_create: + logger.info("Creating %d new properties", len(to_create)) + with db_session() as session: + inserted = db_funcs.property_functions.bulk_create_properties( + session, body, to_create, energy_assessments_by_uprn ) + for prop_id, uprn, landlord_property_id in inserted: + new_property_ids.add(prop_id) + # We append the newly created properties to property_lookup + for prop_id, uprn, landlord_property_id in inserted: + if uprn is not None: + property_lookup[("uprn", uprn)] = prop_id + if landlord_property_id: + property_lookup[("landlord_property_id", landlord_property_id)] = prop_id + + input_properties, inspections_map, eco_packages, epc_upserts = [], {}, {}, [] + for addr, config in tqdm( + zip(addresses, plan_input), + total=len(addresses), + desc="Processing properties", + ): + # ---------- 1) filter fetched data ---------- + epc_cache = epc_cache_by_uprn[addr.uprn] + epc_api_data, epc_page, rrn, = epc_cache["epc_api"], epc_cache["epc_page"], epc_cache["epc_page_rrn"] # Extract from EPC cache if epc_cache.get("status") == db_funcs.epc_functions.EpcStoreService.FRESH: epc_api_data, epc_page, rrn = epc_cache["epc_api"], epc_cache["epc_page"], epc_cache["epc_page_rrn"] + # Extract associated UPRNs from the database response + associated_uprns = db_funcs.address_functions.get_associated_uprns( + postcode_searches.get(addr.postcode.upper()), uprn=addr.uprn + ) + + energy_assessment = energy_assessments_by_uprn.get(addr.uprn) + epc_searcher = SearchEpc( - address1=address1, - postcode=config["postcode"], - uprn=uprn, + address1=addr.address1, + postcode=addr.postcode, + uprn=addr.uprn, auth_token=get_settings().EPC_AUTH_TOKEN, os_api_key="", - full_address=full_address, - heating_system=heating_system, + full_address=addr.full_address, + heating_system=addr.heating_system, associated_uprns=associated_uprns ) - epc_searcher.ordnance_survey_client.built_form = config.get("built_form", None) - epc_searcher.ordnance_survey_client.property_type = config.get("property_type", None) + epc_searcher.ordnance_survey_client.built_form = addr.built_form + epc_searcher.ordnance_survey_client.property_type = addr.property_type # For the moment, our OS API access is unavailable, so we skip and interpolate epc_searcher.find_property(skip_os=True, api_data=epc_api_data, overwrite_sap05=True) epc_searcher.set_uprn_source(file_format=body.file_format) - # ---------- 2) ensure property exists ---------- - with db_session() as session: - property_id, is_new = db_funcs.property_functions.ensure_property_exists( - session, body, epc_searcher, energy_assessment, - landlord_property_id=config.get("landlord_property_id") - ) + lookup_key = ( + ("uprn", addr.uprn) if addr.uprn is not None else ("landlord_property_id", addr.landlord_property_id) + ) + property_id = property_lookup[lookup_key] - if not property_id or (not is_new and not body.multi_plan): + if not property_id: + logger.error("Could not find property ID for address: %s", addr.request_data) + # Should not happen unless input data is inconsistent continue - if is_new: - with db_session() as session: - db_funcs.property_functions.create_property_targets( - session, - property_id=property_id, - portfolio_id=body.portfolio_id, - epc_target=body.goal_value, - heat_demand_target=None - ) + is_new = property_id in new_property_ids + if not is_new and not body.multi_plan: + continue # If we have an energy assessment in place, that is newer than all of the previous EPCs, we use that. # Otherwise, we use the newest EPC @@ -757,12 +783,12 @@ async def model_engine(body: PlanTriggerRequest): ) req_data = extract_property_request_data( - config=config, + address=addr, patches=patches, already_installed=already_installed, non_invasive_recommendations=non_invasive_recommendations, valuation_data=valuation_data, - uprn=epc_searcher.uprn, + uprn=addr.uprn, ) # Pull this out as it may get overwritten property_non_invasive_recommendations, patch = req_data.non_invasive_recommendations, req_data.patch @@ -776,25 +802,21 @@ async def model_engine(body: PlanTriggerRequest): epc_page=epc_page, rrn=rrn, cleaned_address=epc_searcher.address_clean, - config_address=config["address"], + config_address=addr.address, address_postal_town=epc_searcher.address_postal_town ) ) epc_records = patch_epc(patch, epc_records) - prepared_epc = EPCRecord( - epc_records=epc_records, - run_mode="newdata", - cleaning_data=cleaning_data, - ) + prepared_epc = EPCRecord(epc_records=epc_records, run_mode="newdata", cleaning_data=cleaning_data) # TODO: This is a temp function to handle a specific edge case with Peabody. We should # factor this into EPCRecord as part of the cleaning however we need some more testing prepared_epc = averages_cleaning(prepared_epc, cleaning_data) # If we have an ECO project, we parse the cavity/solar reasons - eco_packages[property_id] = parse_eco_packages(config, prepared_epc) + eco_packages[property_id] = parse_eco_packages(addr, prepared_epc) # Final step - extract inspections data, if we have it - we inject into property for usage property_inspections = db_funcs.inspections_functions.extract_inspection_data(config) @@ -804,6 +826,7 @@ async def model_engine(body: PlanTriggerRequest): input_properties.append( Property( id=property_id, + uprn=addr.uprn, is_new=is_new, address=epc_searcher.address_clean, postcode=epc_searcher.postcode_clean, @@ -822,30 +845,31 @@ async def model_engine(body: PlanTriggerRequest): # 2) A real EPC # 3) A UPRN (meaning that a UPRN could be fetched against that property) # We store this data - with db_session() as session: - if db_funcs.epc_functions.EpcStoreService.check_insert_needed( - epc_cache, epc_searcher.newest_epc.get("estimated"), epc_searcher.uprn - ): - # We store the EPC data we have found for this property - db_funcs.epc_functions.EpcStoreService.upsert_epc_data( - session=session, - uprn=epc_searcher.uprn, - epc_api=epc_searcher.data, - epc_page=epc_page_source.get("page_source"), - epc_page_rrn=epc_page_source.get("rrn"), - ) + uprn_to_check_against = addr.uprn if addr.uprn is not None else epc_searcher.uprn # Until we enforce uprn + if db_funcs.epc_functions.EpcStoreService.check_insert_needed( + epc_cache, epc_searcher.newest_epc.get("estimated"), uprn_to_check_against, + ): + epc_upserts.append({ + "uprn": uprn_to_check_against, + "epc_api": epc_searcher.data, + "epc_page": epc_page_source.get("page_source"), + "epc_page_rrn": epc_page_source.get("rrn"), + }) if not input_properties: return Response(status_code=204) check_duplicate_property_ids(input_properties) + logger.info("Inserting property data") + # We now bulk upload all of the EPC data + with db_session() as session: + db_funcs.epc_functions.EpcStoreService.bulk_upsert_epc_data(session, epc_upserts) + # We check if we have inspections data and store it in the database if so. We'll update or create # aginst each property if - if inspections_map: - logger.info("Inserting inspections data") - with db_session() as session: - db_funcs.inspections_functions.bulk_upsert_inspections_pg(session, inspections_map) + with db_session() as session: + db_funcs.inspections_functions.bulk_upsert_inspections_pg(session, inspections_map) # Set up model api and warm up the lambdas model_api = ModelApi( @@ -865,7 +889,7 @@ async def model_engine(body: PlanTriggerRequest): with db_read_session() as session: materials = db_funcs.materials_functions.get_materials(session) cleaned = get_cleaned() - project_scores_matrix, partial_project_scores_matrix, whlg_eligible_postcodes = get_funding_data() + # project_scores_matrix, partial_project_scores_matrix, whlg_eligible_postcodes = get_funding_data() kwh_client = KwhData(bucket=get_settings().DATA_BUCKET, read_consumption_data=True) @@ -956,12 +980,12 @@ async def model_engine(body: PlanTriggerRequest): logger.info("Preparing data for scoring in sap change api") recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data) + # Temp putting this here + recommendations_scoring_data["is_post_sap10_ending"] = True recommendations_scoring_data = recommendations_scoring_data.drop( - columns=[ - "rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending", - "carbon_ending" - ] + columns=["rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending", + "carbon_ending"] ) all_predictions = await model_api.async_paginated_predictions( @@ -1018,6 +1042,7 @@ async def model_engine(body: PlanTriggerRequest): property_instance.current_energy_bill = property_current_energy_bill # Insert the predictions into the recommendations and run the optimiser + logger.info("Optimising measures") for p in input_properties: if not recommendations.get(p.id): continue @@ -1045,74 +1070,14 @@ async def model_engine(body: PlanTriggerRequest): ) gain = optimiser_functions.calculate_gain(body=body, p=p, fixed_gain=fixed_gain, eco_packages=eco_packages) - funding = Funding( - tenure=body.housing_type, - project_scores_matrix=project_scores_matrix, - partial_project_scores_matrix=partial_project_scores_matrix, - whlg_eligible_postcodes=whlg_eligible_postcodes, - eco4_social_cavity_abs_rate=13, - eco4_social_solid_abs_rate=17, - eco4_private_cavity_abs_rate=13, - eco4_private_solid_abs_rate=17, - gbis_social_cavity_abs_rate=21, - gbis_social_solid_abs_rate=25, - gbis_private_cavity_abs_rate=21, - gbis_private_solid_abs_rate=28, - ) - - li_thickness = convert_thickness_to_numeric( - p.roof["insulation_thickness"], p.roof["is_pitched"], p.roof["is_flat"] - ) - current_wall_u_value = p.walls["thermal_transmittance"] - if current_wall_u_value is None: - current_wall_u_value = get_wall_u_value( - clean_description=p.walls["clean_description"], - age_band=p.age_band, - is_granite_or_whinstone=p.walls["is_granite_or_whinstone"], - is_sandstone_or_limestone=p.walls["is_sandstone_or_limestone"], - ) - # We insert the innovation uplift measures_to_optimise_with_uplift = deepcopy(measures_to_optimise) # TODO: Turn this into a function and store the innovaiton uplift for group in measures_to_optimise_with_uplift: for r in group: - - if r["type"] in ["mechanical_ventilation", "low_energy_lighting", "secondary_heating", - "extension_cavity_wall_insulation", "draught_proofing", "sealing_open_fireplace"]: - ( - r["partial_project_score"], - r["partial_project_funding"], - r["innovation_uplift"], - r["uplift_project_score"], - ) = ( - 0, 0, 0, 0 - ) - continue - - ( - r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"], - r["uplift_project_score"] - ) = funding.get_innovation_uplift( - measure=r, - starting_sap=int(p.data["current-energy-efficiency"]), - floor_area=p.floor_area, - is_cavity=p.walls["is_cavity_wall"], - current_wall_uvalue=current_wall_u_value, - is_partial="partial" in p.walls["clean_description"].lower(), - existing_li_thickness=li_thickness, - mainheating=p.main_heating, - main_fuel=p.main_fuel, - mainheat_energy_eff=p.data["mainheat-energy-eff"], - ) - - if r["already_installed"]: - # if already installed, we zero out the uplift and funding - (r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"], - r["uplift_project_score"]) = ( - 0, 0, 0, 0 - ) + (r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"], + r["uplift_project_score"]) = (0, 0, 0, 0) input_measures = optimiser_functions.prepare_input_measures( measures_to_optimise_with_uplift, body.goal, needs_ventilation, funding=True, @@ -1122,62 +1087,36 @@ async def model_engine(body: PlanTriggerRequest): # When the goal is Increasing EPC, we can run the funding optimiser if body.goal == "Increasing EPC": - solutions = optimise_with_funding_paths( + solutions = optimise_with_scenarios( p=p, input_measures=input_measures, - housing_type=body.housing_type, budget=body.budget, target_gain=gain, - funding=funding, - work_package=eco_packages[p.id][2] + enforce_heat_pump_insulation=True, + enforce_fabric_first=False ) # if handle the empty case if solutions.empty: - scheme = "none" - funded_measures, solution = [], [] - ( - project_funding, total_uplift, full_project_score, partial_project_score, uplift_project_score, - battery_sap_score - ) = 0, 0, 0, 0, 0, 0 + solution, battery_sap_score = [], 0 else: - solutions = solutions[ - (solutions["is_eligible"] & (solutions["scheme"] != "none")) | (solutions["scheme"] == "none") - ] if solutions["meets_upgrade_target"].any(): # If we have a solution that meets the upgrade target, we select that one optimal_solution = solutions[solutions["meets_upgrade_target"]].iloc[0] else: - # Pick the cheapest + # We re-organise, taking the solution with the most gain and then the cheapest + solutions = solutions.sort_values( + by=["total_gain", "total_cost"], ascending=[False, True] + ) optimal_solution = solutions.iloc[0] - # This is the list of measures that we will recommend - scheme = optimal_solution["scheme"] - # We create this full list of selected measures, which is used in the next section for setting # default measures - solution = deepcopy(optimal_solution["items"]) + deepcopy(optimal_solution["unfunded_items"]) - funded_measures = deepcopy(optimal_solution["items"]) if scheme != "none" else [] - - # This is the total amount of funding that the project will produce (EXCLUDING uplifts) (£) - project_funding = optimal_solution["full_project_funding"] if scheme == "eco4" else \ - optimal_solution["partial_project_funding"] - # This is the total amount of funding associated to the uplift (£) - total_uplift = optimal_solution["total_uplift"] - # This is the funding scheme selected - # This is the full project ABS - full_project_score = optimal_solution["project_score"] - # This is the partial project ABS - partial_project_score = optimal_solution["partial_project_score"] - # This is the uplift score ABS - uplift_project_score = optimal_solution["total_uplift_score"] - # This is the SAP score associated to a battery - pv_size = next( - (m["array_size"] for m in optimal_solution["items"] if m["type"] == "solar_pv"), 0 - ) + solution = deepcopy(optimal_solution["items"]) + pv_size = float(optimal_solution["array_size"]) battery_sap_score = BatterySAPScorer.score( - starting_sap=optimal_solution["ending_sap"], pv_size=pv_size + starting_sap=optimal_solution["ending_sap_without_battery"], pv_size=pv_size ) else: # We optimise and then we determine eligibility for funding, based on the measures selected @@ -1192,52 +1131,6 @@ async def model_engine(body: PlanTriggerRequest): gain = optimiser.solution_gain post_sap = int(p.data["current-energy-efficiency"]) + gain - recommendation_types = [] - for measures in input_measures: - for measure in measures: - recommendation_types.append(measure["type"]) - recommendation_types = set(recommendation_types) - - has_wall_insulation_recommendation = any( - (m in recommendation_types or "+".join([m, "mechanical_ventilation"])) for m in - WALL_INSULATION_MEASURES - ) - has_roof_insulation_recommendation = any( - (m in recommendation_types or "+".join([m, "mechanical_ventilation"])) for m in - ROOF_INSULATION_MEASURES - ) - - funding.check_funding( - measures=solution, - starting_sap=int(p.data["current-energy-efficiency"]), - ending_sap=int(p.data["current-energy-efficiency"]) + sum([x["gain"] for x in solution]), - floor_area=p.floor_area, - mainheat_description=p.main_heating["clean_description"], - heating_control_description=p.main_heating_controls["clean_description"], - is_cavity=p.walls["is_cavity_wall"], - current_wall_uvalue=current_wall_u_value, - is_partial="partial" in p.walls["clean_description"].lower(), - existing_li_thickness=li_thickness, - mainheating=p.main_heating, - main_fuel=p.main_fuel, - mainheat_energy_eff=p.data["mainheat-energy-eff"], - has_wall_insulation_recommendation=has_wall_insulation_recommendation, - has_roof_insulation_recommendation=has_roof_insulation_recommendation, - ) - - # Determine the scheme - scheme = "none" - if funding.eco4_eligible: - scheme = "eco4" - if scheme == "none" and funding.gbis_eligible: - scheme = "gbis" - - funded_measures = solution if scheme in ["gbis", "eco4"] else [] - project_funding = 0 if funding.full_project_abs is not None else funding.full_project_abs - total_uplift = funding.eco4_uplift - full_project_score = 0 if funding.full_project_abs is not None else funding.full_project_abs - partial_project_score = funding.partial_project_abs - uplift_project_score = funding.eco4_uplift if scheme == "eco4" else funding.gbis_uplift pv_size = next( (m["array_size"] for m in solution if m["type"] == "solar_pv"), 0 ) @@ -1258,21 +1151,6 @@ async def model_engine(body: PlanTriggerRequest): p.id, recommendations, selected, battery_sap_score ) - # TODO: functionise - for measure in funded_measures: - if "+mechanical_ventilation" in measure["type"]: - measure["type"] = measure["type"].split("+mechanical_ventilation")[0] - - p.insert_funding( - scheme=scheme, - funded_measures=funded_measures, - project_funding=project_funding, - total_uplift=total_uplift, - full_project_score=full_project_score, - partial_project_score=partial_project_score, - uplift_project_score=uplift_project_score - ) - # when we have buildings, we tweak our solar PV recommendations as if one unit needs it, we apply it to all # of them # TODO: We can probably do better and optimise at the building level - this is temp @@ -1308,7 +1186,7 @@ async def model_engine(body: PlanTriggerRequest): scenario_id = body.scenario_id else: with db_session() as session: - engine_scenario = db_funcs.recommendations_functions.create_scenario( + scenario_id = db_funcs.recommendations_functions.create_scenario( session=session, scenario={ "name": body.scenario_name, @@ -1326,66 +1204,94 @@ async def model_engine(body: PlanTriggerRequest): "multi_plan": body.multi_plan } ) - scenario_id = engine_scenario.id - for i in tqdm( - range(0, len(input_properties), BATCH_SIZE), total=int(np.ceil(len(input_properties) / BATCH_SIZE)) - ): - try: - # Take a slice of the input_properties list to make a batch - batch_properties = input_properties[i:i + BATCH_SIZE] - with db_session() as session: - for p in batch_properties: - recommendations_to_upload = recommendations.get(p.id, []) - default_recommendations = [r for r in recommendations_to_upload if r["default"]] - total_sap_points = sum([r["sap_points"] for r in default_recommendations]) - new_sap_points = float(p.data["current-energy-efficiency"]) + total_sap_points - new_epc = sap_to_epc(new_sap_points) + property_updates, property_epc_details, property_spatial_updates = [], [], [] + plans_to_create, recommendations_to_create = [], [] - total_cost = sum([r["total"] for r in default_recommendations]) + # Prepare the data that will need to be uploaded in bulk + for p in input_properties: + recommendations_for_property = recommendations.get(p.id, []) + default_recommendations = [r for r in recommendations_for_property if r["default"]] + total_sap_points = sum([r["sap_points"] for r in default_recommendations]) + new_sap_points = float(p.data["current-energy-efficiency"]) + total_sap_points + new_epc = sap_to_epc(new_sap_points) + total_cost = sum([r["total"] for r in default_recommendations]) + valuations = PropertyValuation.estimate(property_instance=p, target_epc=new_epc, total_cost=total_cost) - valuations = PropertyValuation.estimate( - property_instance=p, target_epc=new_epc, total_cost=total_cost - ) + # --- property-level updates (always) --- + property_updates.append({ + "property_id": p.id, + "portfolio_id": body.portfolio_id, + "data": p.get_full_property_data(current_valuation=valuations["current_value"]) + }) - property_plan_data = db_funcs.recommendations_functions.prepare_plan_data( - p, body, scenario_id, eco_packages, valuations, new_sap_points, new_epc, - default_recommendations - ) + property_epc_details.append(p.get_property_details_epc(portfolio_id=body.portfolio_id)) - property_details_epc = p.get_property_details_epc( - portfolio_id=body.portfolio_id, rating_lookup=rating_lookup, - ) - property_data = p.get_full_property_data(current_valuation=valuations["current_value"]) - db_funcs.property_functions.create_property_details_epc(session, property_details_epc) + property_spatial_updates.append({"uprn": p.uprn, "data": p.spatial}) - db_funcs.property_functions.update_or_create_property_spatial_details( - session, p.uprn, p.spatial - ) + # --- skip plan creation if no recommendations --- + if not recommendations_for_property: + continue - db_funcs.property_functions.update_property_data( - session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data - ) + plan_data = db_funcs.recommendations_functions.prepare_plan_data( + p, body, scenario_id, eco_packages, valuations, new_sap_points, new_epc, default_recommendations + ) + plans_to_create.append({"property_id": p.id, "plan_data": plan_data}) - if not recommendations_to_upload: - continue + # store recommendations keyed by property + for r in recommendations_for_property: + recommendations_to_create.append({ + "property_id": p.id, + # ---- Recommendation core ---- + "type": r["type"], + "measure_type": r["measure_type"], + "description": r["description"], + "estimated_cost": float(r["total"]), + "default": r["default"], + "starting_u_value": float(r["starting_u_value"]) if r.get("starting_u_value") else None, + "new_u_value": float(r["new_u_value"]) if r.get("new_u_value") else None, + "sap_points": float(r["sap_points"]), + "energy_savings": float(r["heat_demand"]), + "kwh_savings": float(r["kwh_savings"]), + "co2_equivalent_savings": float(r["co2_equivalent_savings"]), + "total_work_hours": float(r["labour_hours"]), + "energy_cost_savings": float(r["energy_cost_savings"]), + "labour_days": float(r["labour_days"]), + "already_installed": r["already_installed"], + "heat_demand": float(r["heat_demand"]), - new_plan_id = db_funcs.recommendations_functions.create_plan( - session, plan=property_plan_data - ) + # ---- parts ---- + "parts": [ + { + "material_id": part["id"], + "depth": int(part["depth"]) if part.get("depth") else None, + "quantity": float(part["quantity"]) if part.get("quantity") else None, + "quantity_unit": part.get("quantity_unit"), + "estimated_cost": float(part.get("total", part.get("total_cost"))), + } + for part in r.get("parts", []) + ], + }) - db_funcs.recommendations_functions.upload_recommendations( - session, recommendations_to_upload, p.id, new_plan_id - ) - db_funcs.funding_functions.upload_funding( - session, p, new_plan_id, recommendations_to_upload - ) + # Bulk upload property data + logger.info("Uploading property data in bulk") + with db_session() as session: + db_funcs.property_functions.bulk_update_properties(session, property_updates) + db_funcs.property_functions.bulk_upsert_property_details_epc(session, property_epc_details) + db_funcs.property_functions.bulk_upsert_property_spatial(session, property_spatial_updates) + # # Bulk create plans + plan_id_by_property = db_funcs.recommendations_functions.bulk_create_plans(session, plans_to_create) + recommendation_payload = [ + { + "plan_id": plan_id_by_property[r["property_id"]], + **{k: v for k, v in r.items() if k not in ["parts"]}, + "parts": r["parts"], + } for r in recommendations_to_create if r["property_id"] in plan_id_by_property + ] - except Exception as e: - # Rollback the session if an error occurs - logger.warning("Failed i = %s" % str(i)) - logger.error(f"An error occurred during batch starting at index {i}: {e}") - logger.error(f"property is uprn {p.uprn} id {p.id} address {p.address}") + db_funcs.recommendations_functions.bulk_upload_recommendations_and_materials( + session, recommendation_payload + ) logger.info("Work completed, updating log status") diff --git a/recommendations/Costs.py b/recommendations/Costs.py index 1184d5ed..86062433 100644 --- a/recommendations/Costs.py +++ b/recommendations/Costs.py @@ -1,6 +1,7 @@ import numpy as np from recommendations.county_to_region import county_to_region_map from utils.logger import setup_logger +from backend.ml_models.AnnualBillSavings import AnnualBillSavings logger = setup_logger() @@ -21,25 +22,6 @@ regional_labour_variations = [ {"Region": "Northern Ireland", "Adjustment_Factor": 0.76} ] -# This data is based on the MCS database - taken the figures for June 2024 -MCS_SOLAR_PV_COST_DATA = { - "last_updated": "2024-07-10", - "average_cost_per_kwh": 1825, - "average_cost_per_kwh-Outer London": 1950, - "average_cost_per_kwh-Inner London": 1950, - "average_cost_per_kwh-South East England": 1966, - "average_cost_per_kwh-South West England": 1864, - "average_cost_per_kwh-East of England": 1719, - "average_cost_per_kwh-East Midlands": 1730, - "average_cost_per_kwh-West Midlands": 1789, - "average_cost_per_kwh-North East England": 1872, - "average_cost_per_kwh-North West England": 1860, - "average_cost_per_kwh-Yorkshire and the Humber": 1789, - "average_cost_per_kwh-Wales": 1676, - "average_cost_per_kwh-Scotland": 1781, - "average_cost_per_kwh-Northern Ireland": 1347, -} - # Installers are now working with 435 watt panels PANEL_SIZE = 0.435 @@ -61,47 +43,40 @@ INSTALLER_SOLAR_COSTS = [ {'n_panels': 18, 'array_kwp': 18 * PANEL_SIZE, 'cost': 6792.57, 'installer': 'CEG'} ] +# These are costs we received from CRG, for pricing up air source heat pumps +# These are costs that we have been provided from CRG specifically for air source heat pumps +ASHP_SMALL_SYSTEM_COST = 8812.92 # 4.8 to 8.5, based on their pricing +ASHP_LARGE_SYSTEM_COST = 11053.25 +ASHP_SECURITY = 455.00 +ASHP_WALL_BRACKET = 574.17 +ASHP_DISTRIBUTION_SYSTEM_COSTS = [ + {"n_radiators": 4, "cost": 3380.00}, + {"n_radiators": 5, "cost": 3607.50}, + {"n_radiators": 6, "cost": 4116.67}, + {"n_radiators": 7, "cost": 4647.50}, + {"n_radiators": 8, "cost": 5200.00}, + {"n_radiators": 9, "cost": 5730.83}, + {"n_radiators": 10, "cost": 6283.33}, + {"n_radiators": 11, "cost": 6857.50}, + {"n_radiators": 12, "cost": 7431.67}, + {"n_radiators": 13, "cost": 8016.67}, + {"n_radiators": 14, "cost": 8612.50}, + {"n_radiators": 15, "cost": 9219.17}, + {"n_radiators": 16, "cost": 9804.17}, + {"n_radiators": 17, "cost": 10389.17}, +] +ASHP_CYLINDER_COSTS = [ + {"capacity_l": 120, "cost": 3318.25}, + {"capacity_l": 180, "cost": 3480.75}, + {"capacity_l": 200, "cost": 3853.42}, + {"capacity_l": 250, "cost": 3961.75}, +] + # CEG uses use Solshare as an inverter to provide solar PV to multiple flats. This costs £7500 for the inverter alone # https://midsummerwholesale.co.uk/buy/solshare INSTALLER_SOLAR_PV_INVERTER_COST = 7500 INSTALLER_SOLAR_PV_INVERTER_LABOUR_COST = 500 # Just a rough guess to labour costs -# INSTALLER_SCAFFOLDING_COSTS = [ -# {'stories': 1, 'description': '1 Story Scaffold', 'cost': 531.00, 'installer': 'CEG'}, -# {'stories': 2, 'description': '2 Story Scaffold', 'cost': 841.00, 'installer': 'CEG'}, -# {'stories': 3, 'description': '3 Story Scaffold', 'cost': 1077.00, 'installer': 'CEG'} -# ] - -# This data is based on the MCS database, We use the larger figure between the 2023 and 2024 average, -# to be conservative -MCS_AIR_SOURCE_HEAT_PUMP_COST_DATA = { - "Outer London": 13220, - "Inner London": 13220, - "South East England": 13547, - "South West England": 12776, - "East of England": 12585, - "East Midlands": 12239, - "West Midlands": 13182, - "North East England": 11829, - "North West England": 11714, - "Yorkshire and the Humber": 11919, - "Wales": 13701, - "Scotland": 12586, - "Northern Ireland": 12000, # There are hardly any air source heat pump installs going on in Northern Ireland -} - -INSTALLER_ASHP_COSTS = [ - {'capacity_kw': 5.0, 'brand': 'Mitsubishi', 'tank_size_liters': 150, 'cost': 10149.53, 'installer': 'CEG'}, - {'capacity_kw': 6.0, 'brand': 'Mitsubishi', 'tank_size_liters': 170, 'cost': 10823.48, 'installer': 'CEG'}, - {'capacity_kw': 8.5, 'brand': 'Mitsubishi', 'tank_size_liters': 200, 'cost': 11312.43, 'installer': 'CEG'}, - {'capacity_kw': 11.2, 'brand': 'Mitsubishi', 'tank_size_liters': 250, 'cost': 12156.75, 'installer': 'CEG'}, - {'capacity_kw': 14.0, 'brand': 'Mitsubishi', 'tank_size_liters': 300, 'cost': 14405.54, 'installer': 'CEG'}, - {'capacity_kw': 14.0, 'brand': 'Mitsubishi', 'tank_size_liters': 300, 'cost': 14405.54, 'installer': 'CEG'}, - {'capacity_kw': 17.0, 'brand': 'Grant', 'tank_size_liters': 300, 'cost': 14445.00, 'installer': 'CEG'}, - {'capacity_kw': 20.0, 'brand': 'Ecoforest', 'tank_size_liters': 400, 'cost': 21189.41, 'installer': 'CEG'}, - {'capacity_kw': None, 'brand': '2 x cascaded ASHPs', 'tank_size_liters': 500, 'cost': 22950.00, 'installer': 'CEG'} -] - INSTALLER_SOLAR_BATTERY_COSTS = [ {'capacity_kwh': 5, 'description': 'Battery Add on', 'cost': 3769.89, 'installer': 'JJC'}, # {'capacity_kwh': 10, 'description': 'Battery Add on', 'cost': 4300.00, 'installer': 'CEG'}, @@ -350,16 +325,31 @@ class Costs: total_cost = material["total_cost"] * insulation_floor_area - labour_hours = material["labour_hours_per_unit"] * insulation_floor_area - # To install suspended floor insulation, a small to medium size project might be conducted by a team of 3 - # people - labour_days = (labour_hours / 8) / 3 + # We assume the average house takes ~7 days to complete at £300/day incl. VAT, as per checkatrade + # which can be seen here: https://www.checkatrade.com/blog/cost-guides/floor-insulation-cost + # Assumptions + base_days = 7 # The quickest it will be completed + base_area = 45 # The area that can be completed in that time (for a typical 90m2 house) + labour_exponent = 0.85 # Non-linear scaling + daily_labour_rate = 300 # Based on checkatrade + + min_days = 3 # Fewest days it will take + labour_days = max( + min_days, + base_days * (insulation_floor_area / base_area) ** labour_exponent + ) + + labour_cost = labour_days * daily_labour_rate + + total_cost = total_cost + labour_cost + + total_cost = round(total_cost) return { "total": total_cost, "contingency": self.CONTINGENCIES["solid_floor_insulation"] * total_cost, "contingency_rate": self.CONTINGENCIES["solid_floor_insulation"], - "labour_hours": labour_hours, + "labour_hours": labour_days * 8, "labour_days": labour_days, } @@ -838,32 +828,55 @@ class Costs: "labour_days": labour_days, } - def air_source_heat_pump(self, ashp_size): - """ - Based on the region and type of property, this function will produce a cost estimation for an air source heat - pump. This cost will include the boiler upgrade scheme grant - - """ - # This is the average cost of a project, we'll add some additional contingency - - if ashp_size is None: - cost = [x for x in INSTALLER_ASHP_COSTS if x["capacity_kw"] is None][0]["cost"] + @staticmethod + def _select_cylinder_capacity(occupants: float): + if occupants <= 2: + return 120 + elif occupants <= 3: + return 180 + elif occupants <= 4: + return 200 else: - cost = [x for x in INSTALLER_ASHP_COSTS if x][0]["cost"] + return 250 - # The costs from installers exclude VAT - vat = cost * self.VAT_RATE - cost = cost + vat + def air_source_heat_pump(self, ashp_size: float, number_heated_rooms: int, total_floor_area: float) -> dict: + """ + We produce a cost estimation for an air source heat pump, based on costs we have received from installers. - # We assume 5 days installation - labour_days = 5 - labour_hours = labour_days * 8 + """ + + system_cost = ( + (ASHP_SMALL_SYSTEM_COST if ashp_size <= 8.5 else ASHP_LARGE_SYSTEM_COST) + ASHP_SECURITY + ASHP_WALL_BRACKET + ) + + available_n_rads = [x["n_radiators"] for x in ASHP_DISTRIBUTION_SYSTEM_COSTS] + if number_heated_rooms < min(available_n_rads): + # We use the smallest value + rads_to_use = min(available_n_rads) + elif number_heated_rooms > max(available_n_rads): + # We use the largest value + rads_to_use = max(available_n_rads) + else: + rads_to_use = int(number_heated_rooms) + + distribution_system_cost = [ + x for x in ASHP_DISTRIBUTION_SYSTEM_COSTS if x["n_radiators"] == rads_to_use + ][0]["cost"] + + # Cylinder cost + est_n_occupants = AnnualBillSavings.calculate_occupants(total_floor_area) + cylinder_capacity = self._select_cylinder_capacity(est_n_occupants) + cylinder_cost = [ + x for x in ASHP_CYLINDER_COSTS if x["capacity_l"] == cylinder_capacity + ][0]["cost"] + + total = system_cost + distribution_system_cost + cylinder_cost return { - "total": cost, - "contingency": cost * self.CONTINGENCIES["air_source_heat_pump"], + "total": total, + "contingency": total * self.CONTINGENCIES["air_source_heat_pump"], "contingency_rate": self.CONTINGENCIES["air_source_heat_pump"], - "vat": vat, - "labour_hours": labour_hours, - "labour_days": labour_days, + "vat": 0, + "labour_hours": 80, + "labour_days": 10, } diff --git a/recommendations/HeatingRecommender.py b/recommendations/HeatingRecommender.py index c5aa8b38..b1f6205c 100644 --- a/recommendations/HeatingRecommender.py +++ b/recommendations/HeatingRecommender.py @@ -526,14 +526,14 @@ class HeatingRecommender: # 1) Best available path: HLP → direct peak if heat_loss_parameter_W_per_m2K is not None: peak_kw = heat_loss_parameter_W_per_m2K * floor_area_m2 * ΔT / 1000.0 - return (peak_kw, peak_kw) # no range needed + return peak_kw, peak_kw # no range needed # 2) Second-best: space-heating demand → HDD method if space_heat_kwh_per_m2_yr is not None: annual_space_kwh = space_heat_kwh_per_m2_yr * floor_area_m2 Htot = annual_space_kwh * 1000.0 / (hdd_base_dd * 24.0) # W/K peak_kw = Htot * ΔT / 1000.0 - return (peak_kw, peak_kw) + return peak_kw, peak_kw # 3) Minimal inputs: primary energy + assumed fraction → range assert epc_primary_kwh_per_m2_yr is not None @@ -547,7 +547,7 @@ class HeatingRecommender: low = to_peak(space_heat_fraction_range[0]) high = to_peak(space_heat_fraction_range[1]) - return (low, high) + return low, high @staticmethod def pick_model(peak_kw_range, models_kw=(5, 6, 8.5, 11.2, 14, 17, 20)): @@ -555,7 +555,9 @@ class HeatingRecommender: for kw in models_kw: if kw >= target: return kw - return None + + # Return the largest + return max(models_kw) def recommend_air_source_heat_pump(self, phase, has_cavity_or_loft_recommendations, _return=False): """ @@ -586,7 +588,15 @@ class HeatingRecommender: ) ashp_size = self.pick_model(estimated_load) - ashp_costs = self.costs.air_source_heat_pump(ashp_size) + number_heated_rooms = self._estimate_n_heated_rooms() + # We now adjust this depending on the floor area to get number of communcal rooms (e.g. hallways) + communal_heated_rooms = self._estimate_n_communal_heated_rooms() + + ashp_costs = self.costs.air_source_heat_pump( + ashp_size, + number_heated_rooms=number_heated_rooms + communal_heated_rooms, + total_floor_area=self.property.floor_area + ) if non_intrusive_recommendation: # Update with non-intrusive recommendation if non_intrusive_recommendation.get("cost"): @@ -907,6 +917,56 @@ class HeatingRecommender: return already_has_hhr and already_has_hhr_contols + def _estimate_n_heated_rooms(self): + # If the property is off-gas and has no heating system in place, the number of heated rooms will actually + # be 0, so we use the number of rooms as the figure + number_heated_rooms = ( + self.property.data["number-heated-rooms"] if self.property.data["number-heated-rooms"] > 0 + else ( + self.property.number_of_rooms - 1 if self.property.number_of_rooms > 1 else + self.property.number_of_rooms + ) + ) + # To be conservative, we adjust if we still have 1 room + if (number_heated_rooms == 1) and (self.property.number_of_rooms > 2): + number_heated_rooms = self.property.number_of_rooms - 1 + + return number_heated_rooms + + def _estimate_n_communal_heated_rooms(self) -> int: + """ + Estimate number of communal circulation rooms (hallways / landings) that may reasonably contain a heater + """ + + # Base assumptions + base_by_type = { + "Flat": 1, + "Maisonette": 1, + "Bungalow": 1, + "House": 2, + } + + # Fallback if property type unknown + base = base_by_type.get(self.property.data["property-type"], 1) + + # Area-based adjustments + if self.property.data["property-type"] in ("Flat", "Maisonette"): + if self.property.floor_area > 90: + return base + 1 # duplex or very large flat + return base + + if self.property.data["property-type"] == "Bungalow": + if self.property.floor_area > 100: + return base + 1 # secondary corridor + return base + + if self.property.data["property-type"] == "House": + if self.property.floor_area > 140: + return base + 1 # extra landing / circulation + return base + + return base + def recommend_hhr_storage_heaters(self, phase, system_change, heating_controls_only, _return=False): """ We will recommend upgrading to a high heat retention storage system, if the current system is not already @@ -1010,18 +1070,7 @@ class HeatingRecommender: else: heating_simulation_config["hot_water_energy_eff_ending"] = self.property.data["hot-water-energy-eff"] - # If the property is off-gas and has no heating system in place, the number of heated rooms will actually - # be 0, so we use the number of rooms as the figure - number_heated_rooms = ( - self.property.data["number-heated-rooms"] if self.property.data["number-heated-rooms"] > 0 - else ( - self.property.number_of_rooms - 1 if self.property.number_of_rooms > 1 else - self.property.number_of_rooms - ) - ) - # To be conservative, we adjust if we still have 1 room - if (number_heated_rooms == 1) and (self.property.number_of_rooms > 2): - number_heated_rooms = self.property.number_of_rooms - 1 + number_heated_rooms = self._estimate_n_heated_rooms() # We focus on the 700 watt product hhrsh_product = next((x for x in self.hhrsh_products if x["size"] == 700), {}) diff --git a/recommendations/Recommendations.py b/recommendations/Recommendations.py index f984acc3..0b3d1635 100644 --- a/recommendations/Recommendations.py +++ b/recommendations/Recommendations.py @@ -615,7 +615,7 @@ class Recommendations: if metric == "sap": property_phase_impact[metric] = round(property_phase_impact[metric], 2) else: - # We prevent these from being positive + # We prevent mechanical ventilation from being positive property_phase_impact[metric] = ( 0 if property_phase_impact[metric] > 0 else property_phase_impact[metric] ) @@ -632,6 +632,38 @@ class Recommendations: property_phase_impact["carbon"], rec["co2_equivalent_savings"] ) + # Update the current phase values + current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"] + current_phase_values["carbon"] = previous_phase_values["carbon"] - property_phase_impact["carbon"] + + # We also ensure that mechanical ventilation doesn't have an ovely strong negative SAP impact + if rec["type"] == "mechanical_ventilation": + # ventilation is capped by having no greater and a -4 impact + ventilation_sap_limit = -4 + + def _check_veniltation_out_of_bounds(sap_impact): + return (sap_impact < ventilation_sap_limit) or (sap_impact >= 0) + + def _adjust_ventilation_sap(sap_impact): + if sap_impact >= 0: + return -1 + if sap_impact < ventilation_sap_limit: + return ventilation_sap_limit + + ventilation_out_of_bounds = _check_veniltation_out_of_bounds(property_phase_impact["sap"]) + + if ventilation_out_of_bounds: + previous_modelled_sap = previous_phase_values.get("sap_prediction", 0) + proposed_sap_impact = current_phase_sap - previous_modelled_sap + proposal_out_of_bounds = _check_veniltation_out_of_bounds(proposed_sap_impact) + if proposal_out_of_bounds: + property_phase_impact["sap"] = _adjust_ventilation_sap(proposed_sap_impact) + else: + property_phase_impact["sap"] = proposed_sap_impact + + # Update the current phase values + current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"] + if rec["type"] == "loft_insulation": # When we have a loft insulation recommendation, where there is an extension and the existing # amount of loft insulation is already good, we limit the SAP points @@ -642,6 +674,8 @@ class Recommendations: ) if li_sap_limit is not None: property_phase_impact["sap"] = min(property_phase_impact["sap"], li_sap_limit) + # Update the current phase values + current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"] if rec["type"] == "solar_pv": # We use the SAP points in the recommendation as a minimum @@ -649,6 +683,8 @@ class Recommendations: rec["sap_points"] if property_phase_impact["sap"] < rec["sap_points"] else property_phase_impact["sap"] ) + # Update the current phase values + current_phase_values["sap"] = previous_phase_values["sap"] + property_phase_impact["sap"] # Insert this information into the recommendation. if not rec.get("survey", False): @@ -669,7 +705,8 @@ class Recommendations: "representative": rec["recommendation_id"] in representative_ids, "recommendation_id": rec["recommendation_id"], "measure_type": rec["measure_type"], - **current_phase_values + **current_phase_values, + "sap_prediction": phase_energy_efficiency_metrics["sap_change"] } ) diff --git a/recommendations/WallRecommendations.py b/recommendations/WallRecommendations.py index c54c00d9..328f1ab8 100644 --- a/recommendations/WallRecommendations.py +++ b/recommendations/WallRecommendations.py @@ -168,7 +168,7 @@ class WallRecommendations(Definitions): ): return - if u_value: + if u_value is not None: if self.property.walls["thermal_transmittance_unit"] != self.U_VALUE_UNIT: raise NotImplementedError( diff --git a/recommendations/optimiser/funding_optimiser.py b/recommendations/optimiser/funding_optimiser.py index a8b998ae..083b5e99 100644 --- a/recommendations/optimiser/funding_optimiser.py +++ b/recommendations/optimiser/funding_optimiser.py @@ -10,6 +10,7 @@ In the future, we will adapt this into a class-based structure to allow for more from copy import deepcopy import pandas as pd import numpy as np +from itertools import product from backend.app.plan.schemas import ( WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES, ECO4_ELIGIBILE_FABRIC_MEASURES @@ -587,6 +588,288 @@ def optimise_with_funding_paths( return solutions +def build_heat_pump_paths( + remaining_wall_measures, + remaining_roof_measures, +): + """ + Build AND-paths using cartesian products. + + Rules: + - Always include air_source_heat_pump + - Choose 1 wall measure if any exist + - Choose 1 roof measure if any exist + """ + + # If a category is empty, use [None] so product still works + wall_choices = remaining_wall_measures or [None] + roof_choices = remaining_roof_measures or [None] + + paths = [] + + for wall, roof in product(wall_choices, roof_choices): + parts = [] + + if wall is not None: + parts.append(wall) + if roof is not None: + parts.append(roof) + + parts.append("air_source_heat_pump") + + paths.append({"AND": parts}) + + return paths + + +def exclude_measure_types(input_measures, excluded_types): + excluded = set(excluded_types) + filtered = [] + + for group in input_measures: + kept = [ + opt for opt in group + if opt["type"] not in excluded + ] + if kept: + filtered.append(kept) + + return filtered + + +def optimise_with_scenarios( + p, + input_measures, + budget=None, + target_gain=None, + enforce_heat_pump_insulation=True, + enforce_fabric_first=False +): + """ + Scenario-based optimiser (funding-agnostic). + + Currently implemented scenarios: + 1) With air source heat pump AND required insulation + """ + + solutions = [] + paths = [] + # Produce the unique list of measure types + all_measure_types = [] + for inputs in input_measures: + all_measure_types.extend([x["type"] for x in inputs]) + all_measure_types = list(set(all_measure_types)) + + # We modify the solar PV gain, if there is a battery, to include an estimated SAP battery uplift, should + # the property hit the upgrade target, plus 1. We add the additional 1 because the higher the starting SAP, + # the lower the battery SAP uplift, so this is a conservative approach since the true SAP score is + # re-calculated later on. + optimisation_measures = deepcopy(input_measures) + for measures in optimisation_measures: + if measures[0]["type"] == "solar_pv": + for x in measures: + if x["has_battery"]: + x["battery_gain"] = BatterySAPScorer.score( + starting_sap=int(p.data["current-energy-efficiency"]) + target_gain + 1, + pv_size=x["array_size"] + ) + x["gain"] += x["battery_gain"] + + if enforce_fabric_first: + # If this is true, it means we only want to consider a fabric first approach. This means that + # - We treat the fabric of the house first + # - Only once the fabric has been upgraded, do we consider heating upgrades + + # This should be wall insulation, roof insulation, floor insulation and windows + fabric_measures = WALL_INSULATION_MEASURES + ROOF_INSULATION_MEASURES + ECO4_ELIGIBILE_FABRIC_MEASURES + + fabric_only_measures = [ + [opt for opt in group if opt["type"] in fabric_measures] for group in optimisation_measures + ] + fabric_only_measures = [g for g in fabric_only_measures if g] + + if not fabric_only_measures: + # If we have no fabric measures, it means the work has already been done and we can proceed + # straight to heating optimisation + picked_fabric, fabric_cost, fabric_gain = [], 0, 0 + else: + picked_fabric, fabric_cost, fabric_gain = run_optimizer( + input_measures=fabric_only_measures, + budget=budget, + sub_target_gain=target_gain, + # If we can achieve the target gain with just insulation measures, we're done + ) + + picked_fabric_types = {m["type"] for m in picked_fabric} + + remaining_measures = [] + for group in optimisation_measures: + kept = [m for m in group if m["type"] not in picked_fabric_types] + if kept: + remaining_measures.append(kept) + + picked_extra, extra_cost, extra_gain = run_optimizer( + remaining_measures, + budget=budget - fabric_cost if budget is not None else None, + sub_target_gain=( + target_gain - fabric_gain + if target_gain is not None + else None + ) + ) + + if picked_extra is None: + picked_extra, extra_cost, extra_gain = [], 0, 0 + + solutions.append({ + "scenario": "fabric_first", + "items": picked_fabric + picked_extra, + "fixed_items": picked_fabric, + "total_cost": fabric_cost + extra_cost, + "total_gain": fabric_gain + extra_gain, + "already_installed_gain": sum([x["gain"] for x in picked_fabric + picked_extra if x["already_installed"]]) + }) + + return append_solution_metrics(solutions, target_gain, p) + + # ------------------------------------------------------------------ + # Scenario 1: Air source heat pump with required insulation + # ------------------------------------------------------------------ + if enforce_heat_pump_insulation: + # Wall measures could be IWI or EWI + remaining_wall_measures = [ + x for x in all_measure_types if x in WALL_INSULATION_MEASURES + [ + "internal_wall_insulation+mechanical_ventilation", "external_wall_insulation+mechanical_ventilation" + ] + ] + remaining_roof_measures = [x for x in all_measure_types if x in ROOF_INSULATION_MEASURES] + + # Mandatory structure: + # - must include ASHP + # - must include >=1 wall insulation (if still needed) + # - must include >=1 roof insulation (if still needed) + # We need all of the combinations of remaining wall and remaining roof measures + heat_pump_paths = build_heat_pump_paths(remaining_wall_measures, remaining_roof_measures) + paths.extend(heat_pump_paths) + + fixed_selections = expand_funding_path(optimisation_measures, paths) + + for fixed in fixed_selections: + + # fixed = [(gi, oi, opt), ...] + fixed_items = [opt for (_, _, opt) in fixed] + fixed_groups = {gi for (gi, _, _) in fixed} + + fixed_cost, fixed_gain = sum_cost_gain(fixed_items) + + # Remaining measures (all other groups) + remaining_measures = [ + grp for gi, grp in enumerate(optimisation_measures) + if gi not in fixed_groups + ] + + # Optimise remaining measures + if ( + target_gain is not None + and fixed_gain >= target_gain + ): + picked, sub_cost, sub_gain = [], 0, 0 + else: + picked, sub_cost, sub_gain = run_optimizer( + remaining_measures, + budget=budget - fixed_cost if budget is not None else None, + sub_target_gain=( + target_gain - fixed_gain + if target_gain is not None + else None + ) + ) + + if picked is None: + continue + + total_items = fixed_items + picked + total_cost = fixed_cost + sub_cost + total_gain = fixed_gain + sub_gain + + solutions.append({ + "scenario": "heat_pump_with_insulation", + "items": total_items, + "fixed_items": fixed_items, + "total_cost": total_cost, + "total_gain": total_gain, + "already_installed_gain": sum([x["gain"] for x in total_items if x["already_installed"]]) + }) + + # ------------------------------------------------------------------ + # Scenario 2: Optimise without air source heat pump + # ------------------------------------------------------------------ + # No special path; just exclude ASHP from options and allow us to optimise. + measures_no_heat_pump = exclude_measure_types(optimisation_measures, ["air_source_heat_pump"]) + + picked, total_cost, total_gain = run_optimizer( + measures_no_heat_pump, + budget=budget, + sub_target_gain=target_gain, + ) + + if picked is not None: + solutions.append({ + "scenario": "no_heat_pump", + "items": picked, + "fixed_items": [], + "total_cost": total_cost, + "total_gain": total_gain, + "already_installed_gain": sum([x["gain"] for x in picked if x["already_installed"]]) + }) + + solutions_df = append_solution_metrics(solutions, target_gain, p) + + return solutions_df + + +def _get_ending_sap_without_battery(x): + gain = [y["gain"] - y.get("battery_gain", 0) for y in x["items"]] + return float(sum(gain)) + + +def append_solution_metrics(solutions, target_gain, p): + """ + Given a set of solutions, this function will return a dataframe, with cost metrics appended, to allow + the end user to select the optimal solution. + :param solutions: + :param target_gain: + :return: + """ + + solutions_df = pd.DataFrame(solutions) + + if solutions_df.empty: + # We return a blank dataframe + return solutions_df + + # Given the scheme, we now check if the packages are eligible. If they *are* eligible, but they don't meet the + # final upgrade target, we then look to perform a final optimisation pass to meet the target gain. + solutions_df["meets_upgrade_target"] = solutions_df["total_gain"] >= target_gain - 0.1 + # We now can calculate the project ABS, which subtracts from the cost, but this is only relevant for ECO4 + # We flag projects that are including batteries + solutions_df["has_battery"] = solutions_df["items"].apply(has_battery) + solutions_df["array_size"] = solutions_df["items"].apply( + lambda x: sum(float(y["array_size"]) for y in x if "array_size" in y) + ) + + # We need the ending SAP, but we'll need to remove the battery SAP uplift first + + solutions_df["ending_sap_without_battery"] = solutions_df.apply( + lambda x: int(p.data["current-energy-efficiency"]) + _get_ending_sap_without_battery(x), + axis=1 + ) + + solutions_df = solutions_df.sort_values("total_cost", ascending=True) + + return solutions_df + + # ---- helpers ------------------------------------------------------------- diff --git a/recommendations/tests/test_optimiser_functions.py b/recommendations/tests/test_optimiser_functions.py index 031bb9ac..865e3398 100644 --- a/recommendations/tests/test_optimiser_functions.py +++ b/recommendations/tests/test_optimiser_functions.py @@ -8,6 +8,7 @@ from recommendations.optimiser.CostOptimiser import CostOptimiser class TestPrepareInputMeasures: + def test_returns_expected_structure_without_ventilation(self): recs = [ [ # loft insulation measure diff --git a/recommendations/tests/test_optimisers.py b/recommendations/tests/test_optimisers.py index e81aac69..ecc6ea56 100644 --- a/recommendations/tests/test_optimisers.py +++ b/recommendations/tests/test_optimisers.py @@ -1,97 +1,14 @@ -import numpy as np -# import pandas as pd from pandas import Timestamp from numpy import nan import datetime -# import backend.app.assumptions as assumptions -# import recommendations.optimiser.optimiser_functions as optimiser_functions -# -# from backend.Funding import Funding -# -# project_scores_matrix = pd.read_csv("/Users/khalimconn-kowlessar/Downloads/ECO4 Full Project Scores Matrix.csv") -# partial_project_scores_matrix = pd.read_csv("backend/tests/test_data/ECO4_Partial_Project_Scores_Matrix_v6.csv") -# partial_project_scores_matrix.columns = ['Measure category', 'Measure_Type', 'Pre_Main_Heating_Source', -# 'Post_Main_Heating_Source', 'Total Floor Area Band', 'Starting Band', -# 'Average Treatable Factor', 'Cost Savings', 'SAP Savings'] -# whlg_eligible_postcodes = pd.DataFrame([{"Postcode": "ab12cd"}]) -# -# funding = Funding( -# project_scores_matrix=project_scores_matrix, -# partial_project_scores_matrix=partial_project_scores_matrix, -# whlg_eligible_postcodes=whlg_eligible_postcodes, -# eco4_social_cavity_abs_rate=13.5, -# eco4_social_solid_abs_rate=17, -# eco4_private_cavity_abs_rate=13.5, -# eco4_private_solid_abs_rate=17, -# gbis_social_cavity_abs_rate=21, -# gbis_social_solid_abs_rate=25, -# gbis_private_cavity_abs_rate=22, -# gbis_private_solid_abs_rate=28, -# tenure="Social" -# ) -# -# # Assume these costs have been adjusted - - -# -# # Insert the funding uplifts -# for recs in property_recommendations: -# for r in recs: -# # Insert randomly -# # Select one of 0, 0.25 or 0.45 -# r["uplift"] = np.random.choice([0, 0.25, 0.45]) -# -# # We calculate the innovation uplift against each measure -# for recs in property_recommendations: -# for r in recs: -# if r["type"] in ["mechanical_ventilation", "low_energy_lighting", "secondary_heating"]: -# r["innovation_uplift"] = 0 -# continue -# r["innovation_uplift"] = funding.get_innovation_uplift( -# measure=r, -# starting_sap=p.data["current-energy-efficiency"], -# floor_area=p.floor_area, -# is_cavity=False, -# current_wall_uvalue=1.7, -# is_partial=False, -# existing_li_thickness=150, -# mainheating=p.main_heating, -# main_fuel=p.main_fuel, -# mainheat_energy_eff=p.data["mainheat-energy-eff"], -# ) -# print(r["innovation_uplift"]) -# -# property_measure_types = {rec["type"] for recs in property_recommendations for rec in recs} -# property_required_measures = [m for m in property_recommendations if m[0]["type"] in []] -# measures_to_optimise = [m for m in property_recommendations if m[0]["type"] not in []] -# -# # If a measure requiring ventilation is selected, and the property does not have ventilation, we enfore -# # its inclusion -# needs_ventilation = any( -# x in property_measure_types for x in assumptions.measures_needing_ventilation -# ) and not p.has_ventilation -# -# input_measures = optimiser_functions.prepare_input_measures( -# measures_to_optimise, "Increasing EPC", needs_ventilation, True -# ) -# -# # ---- main wrapper around your optimiser ---------------------------------- -# -# # Run inputs: -# target_gain = 18.5 -# -# # Run the optimiser with these inouts - - -# tests/test_social_fabric_only.py import numpy as np import pandas as pd import pytest from copy import deepcopy from recommendations.optimiser import optimiser_functions -from recommendations.optimiser.funding_optimiser import optimise_with_funding_paths # wherever you defined it +from recommendations.optimiser.funding_optimiser import optimise_with_funding_paths, build_heat_pump_paths from backend.Funding import Funding from backend.app.plan.schemas import WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES, ECO4_ELIGIBILE_FABRIC_MEASURES @@ -799,3 +716,14 @@ def test_private_solid_wall_no_innovation_epc_d(p, funding, mock_project_scores_ 'partial_project_funding': 2300.1000000000004, 'partial_project_score': 135.3, 'total_uplift': 0.0, 'total_uplift_score': 0.0 } + + +def test_build_heat_pump_paths(): + eg1 = build_heat_pump_paths([], ["loft_insulation"]) + + assert eg1 == [{'AND': ['loft_insulation', 'air_source_heat_pump']}] + + eg2 = build_heat_pump_paths(["internal_wall_insulation", "external_wall_insulation"], ["loft_insulation"]) + + assert eg2 == [{'AND': ['internal_wall_insulation', 'loft_insulation', 'air_source_heat_pump']}, + {'AND': ['external_wall_insulation', 'loft_insulation', 'air_source_heat_pump']}] diff --git a/serverless.yml b/serverless.yml index 38d8da89..d2d8f50a 100644 --- a/serverless.yml +++ b/serverless.yml @@ -66,7 +66,7 @@ functions: - sqs: arn: arn:aws:sqs:${self:provider.region}:${aws:accountId}:model-engine-queue batchSize: 1 - maximumConcurrency: 10 # Heavily restricts concurrency to avoid overwhelming the ldmbda limits + maximumConcurrency: 5 # Heavily restricts concurrency to avoid overwhelming the ldmbda limits resources: