From 2c19b89c77cfbd8ba987a098a217e6435a3359d0 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Thu, 31 Jul 2025 19:13:16 +0100 Subject: [PATCH] allowing carbon and energy otimisation by removing slack --- asset_list/AssetList.py | 77 ++- asset_list/app.py | 112 +++- asset_list/mappings/built_form.py | 55 +- asset_list/mappings/heating_systems.py | 56 +- asset_list/mappings/property_type.py | 55 +- backend/Funding.py | 26 +- backend/app/plan/schemas.py | 17 +- backend/engine/engine.py | 33 +- epr_data_exports/app.py | 567 ++++++++++++++++++ recommendations/optimiser/GainOptimiser.py | 26 +- .../optimiser/optimiser_functions.py | 4 +- serverless.yml | 2 +- sfr/principal_pitch/2_export_data.py | 59 +- 13 files changed, 1035 insertions(+), 54 deletions(-) create mode 100644 epr_data_exports/app.py diff --git a/asset_list/AssetList.py b/asset_list/AssetList.py index 945b5e4e..446ff4d0 100644 --- a/asset_list/AssetList.py +++ b/asset_list/AssetList.py @@ -887,6 +887,9 @@ class AssetList: self.landlord_year_built ].apply(extract_year) + for x in self.standardised_asset_list[self.landlord_year_built].values: + extract_year(x) + # We now create standard lookups to_remap = { self.landlord_property_type: { @@ -1099,6 +1102,13 @@ class AssetList: ) # Estimate the perimeter + # Handle funky edge case + self.standardised_asset_list[self.EPC_API_DATA_NAMES["total-floor-area"]] = np.where( + (self.standardised_asset_list[self.EPC_API_DATA_NAMES["total-floor-area"]] == 0), + self.standardised_asset_list[self.EPC_API_DATA_NAMES["total-floor-area"]].mean(), + self.standardised_asset_list[self.EPC_API_DATA_NAMES["total-floor-area"]] + ) + self.standardised_asset_list[self.ATTRIBUTE_ESTIMATED_PERIMETER] = self.standardised_asset_list.apply( lambda x: estimate_perimeter( floor_area=x[self.EPC_API_DATA_NAMES["total-floor-area"]] / x[self.ATTRIBUTE_NUMBER_OF_FLOORS], @@ -1753,7 +1763,9 @@ class AssetList: # It's empty cavity self.standardised_asset_list["cavity_is_empty"] | # 
It's a cavity wall - (self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].str.contains("cavity")) + self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin( + ["filled cavity", "partial insulated cavity"] + ) ) not_a_flat = ( @@ -2097,6 +2109,7 @@ class AssetList: RANGE_RE = re.compile(r'\b(\d+[A-Za-z]?)\s*[-–]\s*(\d+[A-Za-z]?)\b') NUM_RE = re.compile(r'\b\d+[A-Za-z]?\b') # captures 12, 12A, etc. + TO_RANGE_RE = re.compile(r'\b(\d+[A-Za-z]?)\s+(?:to|To|TO)\s+(\d+[A-Za-z]?)\b') # captures "13 to 15" expanded_rows = [] @@ -2121,11 +2134,12 @@ class AssetList: # 1 ─ Range (e.g. 1-7) m_range = RANGE_RE.search(addr) - if m_range: + to_range = TO_RANGE_RE.search(addr) - start, end = m_range.groups() + if m_range or to_range: + start, end = m_range.groups() if m_range else to_range.groups() start, end = int(re.match(r'\d+', start)[0]), int(re.match(r'\d+', end)[0]) - if start > end or (end - start) > 100: + if start > end or (end - start) > 200: raise ValueError(f"Suspicious range '{addr}'") # We define the looping range on whether we have odd, even or all numbers @@ -2137,10 +2151,12 @@ class AssetList: for n in house_number_range: new = row.copy() - new_addr = RANGE_RE.sub(str(n), addr, count=1) + range_text = m_range.group(0) if m_range else to_range.group(0) + new_addr = addr.replace(range_text, str(n)) + # Build the new full address by also swapping out the range_text original_full_address = new[self.STANDARD_FULL_ADDRESS] - new_full_address = original_full_address.replace(addr, new_addr) - new[self.STANDARD_ADDRESS_1] = new_addr + new_full_address = original_full_address.replace(range_text, str(n)) + new[self.STANDARD_ADDRESS_1] = str(n) new[self.STANDARD_FULL_ADDRESS] = new_full_address new[self.STANDARD_PROPERTY_TYPE] = "flat" # Keep a record of the previous address 1 @@ -2155,7 +2171,7 @@ class AssetList: # 2 ─ Explicit list (e.g. 1, 2, 5 Block) or split by an ampersand (e.g. 
1 & 2 Block) nums = NUM_RE.findall(addr) - if len(nums) > 1 and (',' in addr or '&' in addr): + if len(nums) > 1 and (',' in addr or '&' in addr or ' and ' in addr.lower()): for n in nums: new = row.copy() new_addr = re.sub(NUM_RE, n, addr, count=1) # replace the first number only @@ -2174,6 +2190,10 @@ class AssetList: expanded_blocks = pd.DataFrame(expanded_rows) + # Check for duplicated domna ids + if expanded_blocks[self.DOMNA_PROPERTY_ID].duplicated().sum(): + raise ValueError("expanded blocks has duplicated IDs") + # We drop the blocks from the standardised asset list and append on the expanded blocks self.standardised_asset_list = self.standardised_asset_list[ self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] != "block of flats" @@ -2318,18 +2338,37 @@ class AssetList: (~group["cavity_reason"].str.contains("(unlikely to quality)", case=False, na=False, regex=False)) ).sum() + n_empties_high_confidence = ( + (group["identified_empty_cavity"] == True) & + (~group["SAP Category"].isin(["SAP Rating 69-75", "SAP Rating 76 or more"])) & + (~pd.isnull(group["cavity_reason"])) & + (~group["cavity_reason"].str.contains("(unlikely to quality)", case=False, na=False, regex=False)) + ).sum() + + # Average age of the EPCs + group["time_since_epc"] = ( + pd.to_datetime("now") - pd.to_datetime( + group[self.EPC_API_DATA_NAMES["inspection-date"]]) + ).dt.days + + average_age_of_epc = group["time_since_epc"].mean() + works = group["hubspot_status"] above_threshold = works.map(LABEL_TO_ENUM.get).dropna() count_above = (above_threshold >= threshold).sum() proportion_surveyed = count_above / len(works) proportion_empty = n_empties / len(works) + proportion_empty_high_confidence = n_empties_high_confidence / len(works) # We auto-populate any blocks that have greater than 50% proportion empty block_analysis.append( { "Block Reference": block_reference, + "Block Size": len(group), + "average_age_of_epc": average_age_of_epc, "Proportion of properties suryeyed": 
proportion_surveyed, "Percentage of Empties": proportion_empty, + "Percentage of Empties (high confidence)": proportion_empty_high_confidence, **cavity_breakdown.to_dict(), } ) @@ -3345,6 +3384,8 @@ class AssetList: property_type_col = "PROPERTY TYPE As per table emailed" elif "PROPERTY TYPE" in master_data.columns: property_type_col = "PROPERTY TYPE" + elif 'Property Type' in master_data.columns: + property_type_col = 'Property Type' else: property_type_col = "PROPERTY TYPE (SEE DEEMED SCORES SHEET) Eg. 3W_Flat_1 (As per Matrix)" @@ -3496,8 +3537,20 @@ class AssetList: ] if df.shape[0] != 1: - # We have multiple matches - raise NotImplementedError("FIX ME") + # We have multiple matches - it's likely because the landlord has a duplicate + # that has been referenced in totally different ways so we just match to both + for _, x in df.iterrows(): + matched.append( + { + "row_id": row["row_id"], + "original_house_no": original_house_no, + "original_street": original_street, + "original_postcode": original_postcode, + self.STANDARD_LANDLORD_PROPERTY_ID: x[self.STANDARD_LANDLORD_PROPERTY_ID], + } + ) + continue + matched.append( { "row_id": row["row_id"], @@ -3594,6 +3647,10 @@ class AssetList: self.master_surveyed, how="left", on=self.STANDARD_LANDLORD_PROPERTY_ID ) + # Make sure no dupes + if self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated().sum(): + raise ValueError("duplicated ids!") + # Finally, we keep a record of the unmatched if unmatched_submissions: self.unmatched_submissions = pd.concat( diff --git a/asset_list/app.py b/asset_list/app.py index 37d9ae0d..f817dc7f 100644 --- a/asset_list/app.py +++ b/asset_list/app.py @@ -59,6 +59,110 @@ def app(): Property UPRN """ + data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Broadlands" + data_filename = "Broadlands Asset List.xlsx" + sheet_name = "Assets" + postcode_column = 'POSTCODE' + fulladdress_column = None + address1_column = "Address1" + address1_method = None + 
address_cols_to_concat = ["Address1"] + missing_postcodes_method = None + landlord_year_built = "DATEBUILT" + landlord_os_uprn = None + landlord_property_type = "PropertyType" + landlord_built_form = "PropertyType" + landlord_wall_construction = None + landlord_heating_system = "Heating Fuel" + landlord_existing_pv = None + landlord_property_id = "Row ID" + outcomes_filename = [os.path.join(data_folder, "outcomes.xlsx")] + outcomes_sheetname = ["Sheet1"] + outcomes_postcode = ["Postcode"] + outcomes_houseno = ["No."] + outcomes_address = ["Address"] + outcomes_id = [None] + master_filepaths = [ + os.path.join(data_folder, "eco3 submissions.csv"), + os.path.join(data_folder, "eco4 submissions.csv"), + ] + master_to_asset_list_filepath = None + asset_list_header = 0 + landlord_block_reference = None + master_id_colnames = [None, None] + landlord_roof_construction = None + phase = False + landlord_sap = None + ecosurv_landlords = "broadland" + # + + # Community: + data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Community Housing/New Programme" + data_filename = "SUB EPC C to DOMNA - 24.07.25.xlsx" + sheet_name = "Sheet1" + postcode_column = 'POSTCODE' + fulladdress_column = "ADDRESS" + address1_column = None + address1_method = "house_number_extraction" + address_cols_to_concat = [] + missing_postcodes_method = None + landlord_year_built = "BUILD DATE" + landlord_os_uprn = None + landlord_property_type = "PROPERTY TYPE" + landlord_built_form = "Archetype" # Using the inspections archetype + landlord_wall_construction = "CONSTRUCTION TYPE" + landlord_roof_construction = None + landlord_heating_system = None + landlord_existing_pv = None + landlord_property_id = "UPRN" + landlord_sap = None + outcomes_filename = [] + outcomes_sheetname = [] + outcomes_postcode = [] + outcomes_houseno = [] + outcomes_id = [] + outcomes_address = [] + master_filepaths = [] + master_to_asset_list_filepath = None + phase = False + ecosurv_landlords = None + 
asset_list_header = 1 + landlord_block_reference = None + master_id_colnames = [] + + data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Ealing/Programme Analysis" + data_filename = "EalingProjectRebuildJW210725.xlsx" + sheet_name = "Refine & Houses" + postcode_column = 'Postcode' + fulladdress_column = "Address" + address1_column = None + address1_method = "house_number_extraction" + address_cols_to_concat = [] + missing_postcodes_method = None + landlord_year_built = None + landlord_os_uprn = None + landlord_property_type = None # Using the inspections property type + landlord_built_form = None + landlord_wall_construction = None + landlord_roof_construction = None + landlord_heating_system = None + landlord_existing_pv = None + landlord_property_id = "Property ref" + landlord_sap = None + outcomes_filename = [] + outcomes_sheetname = [] + outcomes_postcode = [] + outcomes_houseno = [] + outcomes_id = [] + outcomes_address = [] + master_filepaths = [] + master_to_asset_list_filepath = None + phase = False + ecosurv_landlords = None + asset_list_header = 0 + landlord_block_reference = "Block Reference" + master_id_colnames = [] + # TODO: Delete me data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/NRLA/" data_filename = "20250716 Asset List.xlsx" @@ -148,7 +252,7 @@ def app(): landlord_existing_pv = None landlord_property_id = "PropertyCode" outcomes_filename = [os.path.join(data_folder, "Rooftop_Outcomes.xlsx")] - outcomes_sheetname = ["OUTCOMESs"] + outcomes_sheetname = ["OUTCOMES"] outcomes_postcode = ["POSTCODE"] outcomes_houseno = ["NO"] outcomes_address = ["ADDRESS"] @@ -221,15 +325,15 @@ def app(): outcomes_houseno = [] outcomes_address = [] outcomes_id = [] - master_filepaths = [] + master_filepaths = [os.path.join(data_folder, "submissions.csv")] master_to_asset_list_filepath = None asset_list_header = 0 landlord_block_reference = None - master_id_colnames = [] + master_id_colnames = [None] landlord_roof_construction 
= None phase = False landlord_sap = None - ecosurv_landlords = None + ecosurv_landlords = "cds" # Plus Dane data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Plus Dane/New Programme July 2025/" diff --git a/asset_list/mappings/built_form.py b/asset_list/mappings/built_form.py index c9cd061f..0dc51129 100644 --- a/asset_list/mappings/built_form.py +++ b/asset_list/mappings/built_form.py @@ -385,6 +385,59 @@ BUILT_FORM_MAPPINGS = { 'Maisonette Over Shop': 'mid-floor', 'Medium Rise Flat': 'mid-floor', 'Maisonette Medium Rise': 'unknown', - 'End-terraced house': 'end-terrace' + 'End-terraced house': 'end-terrace', + + 'Ground floor study bedroom': 'ground floor', + 'End terrace bungalow': 'end-terrace', + 'End terrace house': 'end-terrace', + 'Ground floor bedsit': 'ground floor', + 'Detached bungalow': 'detached', + 'Lower ground floor flat': 'ground floor', + 'Mid terrace bungalow': 'mid-terrace', + 'Mid terrace house': 'mid-terrace', + 'Basement bedsit': 'basement', + 'Ground floor flat': 'ground floor', + 'Ground floor flat with study': 'ground floor', + 'Basement flat': 'basement', + 'Semi bungalow': 'semi-detached', + '2nd floor flat': 'mid-floor', + 'General/Communal': 'unknown', + 'Semi house': 'semi-detached', + '2nd floor flat with study': 'mid-floor', + '1st floor flat with study room': 'mid-floor', + 'Cluster House': 'detached', + 'Utility pod': 'unknown', + '3rd floor flat': 'mid-floor', + '4th floor flat': 'mid-floor', + '2nd floor study bedroom': 'mid-floor', + '1st floor study bedroom': 'mid-floor', + 'Dormer bungalow': 'detached', + '1st floor flat': 'mid-floor', + 'Block property': 'unknown', + 'Utility pod - DDA compliant': 'unknown', + '2nd floor bedsit': 'mid-floor', + '1st floor bedsit': 'mid-floor', + '2nd/3rd floor duplex flat': 'mid-floor', + + 'Bungalow - Detached': 'detached', + 'Maisonette - Detached': 'detached', + 'Bedsit - Mid Terrace': 'mid-terrace', + 'House - End Terrace': 'end-terrace', + 'House - Mid Terrace': 
'mid-terrace', + 'Bungalow - End Terrace': 'end-terrace', + 'Maisonette - End Terrace': 'end-terrace', + 'Maisonette - Semi Detached': 'semi-detached', + 'House - Detached': 'detached', + 'Bedsit - End Terrace': 'end-terrace', + 'House - Semi detached': 'semi-detached', + 'Studio Flat - Mid Terrace': 'mid-terrace', + 'Bungalow - Semi detached': 'semi-detached', + 'Amenity Block - Detached': 'detached', + 'Bungalow - Mid Terrace': 'mid-terrace', + 'Amenity Block - Semi detached': 'semi-detached', + 'Maisonette - Mid Terrace': 'mid-terrace', + 'Chalet - Wheelchair': 'unknown', + 'Studio Flat': 'unknown', + 'Bungalow - Attached': 'semi-detached' } diff --git a/asset_list/mappings/heating_systems.py b/asset_list/mappings/heating_systems.py index 89bc2933..b55f13c8 100644 --- a/asset_list/mappings/heating_systems.py +++ b/asset_list/mappings/heating_systems.py @@ -377,6 +377,60 @@ HEATING_MAPPINGS = { 'Warm air Electricity': 'warm air heating', 'None': 'no heating', 'Boiler None': 'unknown', - 'Storage heaters Electricity': 'electric storage heaters' + 'Storage heaters Electricity': 'electric storage heaters', + + 'Unknown when old solid fuel system was removed': 'solid fuel', + 'Storage Heater': 'electric storage heaters', + 'Combi': 'gas condensing combi', + 'Combi condensing': 'gas condensing combi', + 'Combi Condensing': 'gas condensing combi', + 'Tenant Burner': 'unknown', + 'Wall Mounted Condens': 'gas condensing boiler', + 'Gas Pipework': 'unknown', + 'Open Fire Bck Boiler': 'solid fuel', + 'Back Boiler Unit': 'solid fuel', + 'Sharedgasboiler': 'communal gas boiler', + 'Wall Mntd Condensing': 'gas condensing boiler', + 'Flr Standing Combi': 'gas combi boiler', + 'Oil - Tenant': 'oil boiler', + 'Open Flue Fire': 'solid fuel', + 'Wall Mounted Fire': 'room heaters', + 'Gas - Unvented Cylinder': 'gas boiler, radiators', + 'Commercial Pipework': 'unknown', + 'Wall Mntd Condensin': 'gas condensing boiler', + 'Offpeakelectric': 'electric storage heaters', + 'Closed 
Burner': 'unknown', + 'Domesticgasboiler': 'gas boiler, radiators', + 'Elec - Storage': 'electric storage heaters', + 'Share Common Boiler': 'communal heating', + 'Down Flow Heater': 'electric radiators', + 'Inset Flame Effect': 'electric radiators', + 'Closedmulti': 'unknown', + 'Open Fire': 'solid fuel', + 'Lpgas - Domesticgasboiler': 'gas boiler, radiators', + 'Solarpvpanels': 'other', + 'Renew - Ashp': 'air source heat pump', + 'Room Sealed App': 'unknown', + '5 Year Periodic Insp': 'unknown', + 'Solarthermal': 'other', + 'Wall Mounted Combi': 'gas combi boiler', + 'Woodburner': 'solid fuel', + 'Sealed System Wl Mtd': 'unknown', + 'Room Seal App': 'unknown', + 'Shared Gas Boiler': 'communal gas boiler', + 'Heating Distribution': 'unknown', + 'Flr Standing Boiler': 'boiler - other fuel', + 'Multifuel Burner': 'solid fuel', + 'Gas - Shared': 'communal gas boiler', + 'Wall Mounted Boiler': 'gas boiler, radiators', + 'Tenant Boiler': 'gas boiler, radiators', + 'Gas - Domesticgasboiler': 'gas boiler, radiators', + 'Domestic gas boiler': 'gas boiler, radiators', + 'Combination': 'unknown', + + 'Mains Electric': 'electric fuel', + 'Unvented cylinder': 'other', + 'MVHR & Heat Recovery': 'other', + 'Solar': 'other' } diff --git a/asset_list/mappings/property_type.py b/asset_list/mappings/property_type.py index d45fd109..c6539465 100644 --- a/asset_list/mappings/property_type.py +++ b/asset_list/mappings/property_type.py @@ -283,6 +283,59 @@ PROPERTY_MAPPING = { 'Flat Over Shop': 'flat', 'Medium Rise Flat': 'flat', 'End Terraced Town House': 'house', - 'Maisonette Medium Rise': 'maisonette' + 'Maisonette Medium Rise': 'maisonette', + 'Semi bungalow': 'bungalow', + '2nd floor flat': 'flat', + 'End terrace bungalow': 'bungalow', + 'End terrace house': 'house', + 'Ground floor bedsit': 'bedsit', + 'Detached bungalow': 'bungalow', + 'Semi house': 'house', + '2nd floor flat with study': 'flat', + '1st floor flat with study room': 'flat', + 'Lower ground floor flat': 'flat', + 
'Cluster House': 'house', + 'Mid terrace bungalow': 'bungalow', + 'Mid terrace house': 'house', + 'Basement bedsit': 'bedsit', + 'Detached house': 'house', + '3rd floor flat': 'flat', + '4th floor flat': 'flat', + 'Dormer bungalow': 'bungalow', + '1st floor flat': 'flat', + 'Ground floor flat': 'flat', + 'Ground floor flat with study': 'flat', + 'Basement flat': 'flat', + '2nd floor bedsit': 'bedsit', + '1st floor bedsit': 'bedsit', + '2nd/3rd floor duplex flat': 'flat', + 'Ground floor study bedroom': 'other', + 'General/Communal': 'other', + 'Utility pod': 'other', + '2nd floor study bedroom': 'other', + '1st floor study bedroom': 'other', + 'Block property': 'block of flats', + 'Utility pod - DDA compliant': 'other', + + 'Bungalow - Detached': 'bungalow', + 'Maisonette - Detached': 'maisonette', + 'Bedsit - Mid Terrace': 'bedsit', + 'Studio Flat': 'flat', + 'House - End Terrace': 'house', + 'House - Mid Terrace': 'house', + 'Bungalow - End Terrace': 'bungalow', + 'Bungalow - Attached': 'bungalow', + 'Maisonette - End Terrace': 'maisonette', + 'Maisonette - Semi Detached': 'maisonette', + 'House - Detached': 'house', + 'Bedsit - End Terrace': 'bedsit', + 'House - Semi detached': 'house', + 'Studio Flat - Mid Terrace': 'flat', + 'Bungalow - Semi detached': 'bungalow', + 'Bungalow - Mid Terrace': 'bungalow', + 'Maisonette - Mid Terrace': 'maisonette', + 'Chalet - Wheelchair': 'other', + 'Amenity Block - Detached': 'other', + 'Amenity Block - Semi detached': 'other' } diff --git a/backend/Funding.py b/backend/Funding.py index 49d2d293..d17074cb 100644 --- a/backend/Funding.py +++ b/backend/Funding.py @@ -1,3 +1,4 @@ +from enum import Enum import pandas as pd import numpy as np from typing import List @@ -413,6 +414,10 @@ class FundingOld: self.whlg() +class EligibilityCaveats(Enum): + TENANT_ON_BENEFITS_OR_LOW_INCOME = "tenant_on_benefits_or_low_income" + + class Funding: """ New class to handle funding calculation @@ -440,6 +445,9 @@ class Funding: 
self.project_scores_matrix = project_scores_matrix self.whlg_eligible_postcodes = whlg_eligible_postcodes + self.eco4_eligible = False + self.eligbility_caveat = None + @staticmethod def get_sap_band(sap_score_number): bands = [ @@ -478,9 +486,8 @@ class Funding: return "200" - @staticmethod def eco4_prs_eligibility( - starting_sap: int, measures: List, mainheat_description: str, heating_control_description: str + self, starting_sap: int, measures: List, mainheat_description: str, heating_control_description: str ): """ Handles the eligibility criteria for private rental properties under eco @@ -509,11 +516,19 @@ class Funding: # Is a renewable heating ashp = "air_source_heat_pump" in measures + # Meets the EPC criteria, has the measure requirement and tenant must be on benefits if meets_epc & (solar_renweable_heating or ashp or has_solid_wall): - return True + self.eco4_eligible = True + self.eligbility_caveat = EligibilityCaveats.TENANT_ON_BENEFITS_OR_LOW_INCOME + return return False + def gbis_prs_eligibiltiy(self): + """ + Determines if a project is eligible for GBIS funding for private rental properties + """ + def calculate_full_project_abs(self): # Filter the project scores matrix @@ -568,7 +583,7 @@ class Funding: # 2) GBIS if self.tenure == "Private": - is_eco4_eligible = self.eco4_prs_eligibility( + self.eco4_prs_eligibility( starting_sap=starting_sap, measures=measures, mainheat_description=mainheat_description, @@ -578,7 +593,8 @@ class Funding: # Need to implement # 1) Package has to include an insulation measure # 2) We should use the funding for the measure that has the largest partial project score - is_gbis_eligible = () + # TODO: check the rules around GBIS eligibility and heating controls + self.gbis_prs_eligibiltiy() if not is_eco4_eligible: return diff --git a/backend/app/plan/schemas.py b/backend/app/plan/schemas.py index d5b92256..9ed6f978 100644 --- a/backend/app/plan/schemas.py +++ b/backend/app/plan/schemas.py @@ -18,6 +18,12 @@ 
SPECIFIC_MEASURES = [ "cylinder_thermostat" ] +INSULATION_MEASURES = [ + "internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation", + "loft_insulation", "flat_roof_insulation", "room_roof_insulation", + "suspended_floor_insulation", "solid_floor_insulation", +] + NON_INVASIVE_SPECIFIC_MEASURES = [ "trickle_vents", "draught_proofing", "mixed_glazing", "cavity_extract_and_refill", "extension_cavity_wall_insulation" @@ -36,7 +42,7 @@ MEASURE_MAP = { "heating_controls": ["roomstat_programmer_trvs", "time_temperature_zone_control"] } -VALID_GOALS = ["Increasing EPC"] +VALID_GOALS = ["Increasing EPC", "Energy Savings", "Reducing CO2 emissions"] VALID_HOUSING_TYPES = ["Social", "Private"] VALID_EVENT_TYPES = ["remote_assessment"] @@ -74,7 +80,7 @@ class PlanTriggerRequest(BaseModel): budget: Optional[float] = None goal: Goal housing_type: HousingType - goal_value: str + goal_value: Optional[str] = None portfolio_id: int trigger_file_path: str already_installed_file_path: Optional[str] = None @@ -118,3 +124,10 @@ class PlanTriggerRequest(BaseModel): if (self.index_start is None) != (self.index_end is None): raise ValueError("Both index_start and index_end must be set or both must be None") return self + + @model_validator(mode="after") + def check_goal_value_requirement(self): + # Make sure that goal_value is set when goal is "Increasing EPC" + if self.goal == "Increasing EPC" and not self.goal_value: + raise ValueError("goal_value is required when goal is 'Increasing EPC'") + return self diff --git a/backend/engine/engine.py b/backend/engine/engine.py index 8968eb0e..04860add 100644 --- a/backend/engine/engine.py +++ b/backend/engine/engine.py @@ -811,7 +811,8 @@ async def model_engine(body: PlanTriggerRequest): # we can discount the number of points required to get to the target SAP band (or increase) # in the case of ventilation needs_ventilation = any( - x in property_measure_types for x in assumptions.measures_needing_ventilation) and not 
p.has_ventilation + x in property_measure_types for x in assumptions.measures_needing_ventilation + ) and not p.has_ventilation input_measures = prepare_input_measures(measures_to_optimise, body.goal, needs_ventilation) @@ -849,15 +850,21 @@ async def model_engine(body: PlanTriggerRequest): 0 ) - current_sap_points = int(p.data["current-energy-efficiency"]) + if body.goal == "Increasing EPC": + current_sap_points = int(p.data["current-energy-efficiency"]) + gain = CostOptimiser.calculate_sap_gain_with_slack( + epc_to_sap_lower_bound(body.goal_value) - current_sap_points + ) - fixed_gain + if body.simulate_sap_10: + # We add 3 additional SAP points to the required gain to account for SAP 10 + gain += 3 - sap_gain = CostOptimiser.calculate_sap_gain_with_slack( - epc_to_sap_lower_bound(body.goal_value) - current_sap_points - ) - fixed_gain - - if body.simulate_sap_10: - # We add 3 additional SAP points to the required gain to account for SAP 10 - sap_gain += 3 + gain = gain if gain > 0 else 0 + elif body.goal in ["Energy Savings", "Reducing CO2 emissions"]: + # We will aim to maximise these goals, while constaining by budget + gain = None + else: + raise NotImplementedError(f"Goal {body.goal} is not supported") if not body.optimise: if body.goal != "Increasing EPC": @@ -870,15 +877,13 @@ async def model_engine(body: PlanTriggerRequest): else: if body.budget: - optimiser = GainOptimiser( - input_measures, max_cost=body.budget, max_gain=sap_gain if sap_gain > 0 else 0 - ) + optimiser = GainOptimiser(input_measures, max_cost=body.budget, max_gain=gain) else: # The minimum gain is the minimum number of SAP points required to get to the target SAP band # If the gain is negative, the optimiser will return an empty solution optimiser = CostOptimiser( input_measures, - min_gain=sap_gain + min_gain=gain ) optimiser.setup() @@ -1111,6 +1116,8 @@ async def model_engine(body: PlanTriggerRequest): [sum(r["labour_days"] for r in rec_group if r["default"]) for p_id, rec_group in 
recommendations.items()] )) + # TODO - This code only pulls in the properties that have been updated in this run, but we need to + # aggregate all properties in the portfolio. We likely need to trigger a re-aggregation aggregated_data = extract_portfolio_aggregation_data( input_properties=input_properties, total_valuation_increase=total_valuation_increase, diff --git a/epr_data_exports/app.py b/epr_data_exports/app.py new file mode 100644 index 00000000..851dfd5f --- /dev/null +++ b/epr_data_exports/app.py @@ -0,0 +1,567 @@ +""" +This is a placeholder script to extract epr data from files, where we can +""" + +""" +July 2025 LiveWest Heating Upgrades +""" +import os +import re +import PyPDF2 +import pandas as pd +from tqdm import tqdm +from collections import Counter + + +def extract_window_age_description(windows_text): + """ + Extracts the most common window age description and its proportion. + + Parameters: + windows_text (str): The text section containing window data. + + Returns: + dict: A dictionary with the most common window age description and its proportion. 
+ """ + # Clean up windows_text by removing line breaks for better pattern matching + windows_text = windows_text.replace("\n", "") + + # Define possible window age descriptions + window_descriptions = [ + "Double post or during 2002", + "Double pre 2002", + "Double with unknown install date", + "Secondary glazing", + "Triple glazing", + "Single glazing", + "Double between 2002 \nand 2021", + "Double between 2002 and 2021" + ] + + # Count occurrences of each description + description_counts = Counter() + for description in window_descriptions: + matches = re.findall(re.escape(description), windows_text) + description_counts[description] = len(matches) + + if not description_counts or not sum(description_counts.values()): + raise ValueError("Failed to extract window data.") + + # Determine the most common description and calculate its proportion + most_common_description, window_count = description_counts.most_common(1)[0] + window_proportion = window_count / sum(description_counts.values()) * 100 + + # Get the second most common and the proportion + if window_proportion == 100: + second_most_common_description = None + second_most_common_proportion = 0 + else: + second_most_common_description, second_window_count = description_counts.most_common(2)[1] + second_most_common_proportion = second_window_count / sum(description_counts.values()) * 100 + + return { + "Window Age Description": most_common_description, + "Window Age Description Proportion (%)": window_proportion, + "Secondary Window Age Description": second_most_common_description, + "Secondary Window Age Description Proportion (%)": second_most_common_proportion, + "Number of Windows": sum(description_counts.values()) + } + + +def extract_building_parts_summary(text): + """ + Extracts building parts and associated dimensions from the summary report PDF. + This includes Main Property, multiple extensions if they exist, and Room in Roof areas. 
+ """ + data = [] + + # Locate the Dimensions section + dimensions_section = re.search( + r"Dimensions:\s*Dimension type: Internal\n(.*?)\n5\.0 Conservatory:", text, re.DOTALL + ) + if not dimensions_section: + dimensions_section = re.search( + r"Dimensions:\s*Dimension type: External\n(.*?)\n5\.0 Conservatory:", text, re.DOTALL + ) + if not dimensions_section: + raise ValueError("Failed to locate dimensions section in the text.") + + dimensions_text = dimensions_section.group(1) + + # Pattern to extract each building part, starting from Main Property and including extensions + building_part_pattern = re.compile( + r"(Main Property|\d+(?:st|nd|rd|th) Extension)\s*" + r"(.*?)(?=\d+(?:st|nd|rd|th) Extension|5\.0 Conservatory|$)", + re.DOTALL + ) + + # Loop through each building part match, including Main Property and extensions + for match in building_part_pattern.finditer(dimensions_text): + part_name = match.group(1) + floor_data = match.group(2) + + # Pattern to extract floor details: Floor Level, Floor Area, Room Height, Perimeter, Party Wall Length + floor_pattern = re.compile( + r"(1st Floor|Lowest Floor|Second floor):\s*([\d.]+)\s+([\d.]+)\s+([\d.]+)\s+([\d.]+)" + ) + + # Extract data for each floor within the building part + for floor_match in floor_pattern.finditer(floor_data): + floor_level = floor_match.group(1) + floor_area = float(floor_match.group(2)) + room_height = float(floor_match.group(3)) + perimeter = float(floor_match.group(4)) + party_wall_length = float(floor_match.group(5)) + + # Append to data list + data.append({ + "Building Part": part_name, + "Floor Level": floor_level, + "Floor Area (m2)": floor_area, + "Room Height (m)": room_height, + "Perimeter (m)": perimeter, + "Party Wall Length (m)": party_wall_length + }) + + # Check specifically for "Room(s) in Roof" entries, which only have Floor Area + room_in_roof_pattern = re.compile(r"Room\(s\) in Roof:\s*([\d.]+)") + room_in_roof_match = room_in_roof_pattern.search(floor_data) + if 
room_in_roof_match: + floor_area = float(room_in_roof_match.group(1)) + data.append({ + "Building Part": part_name, + "Floor Level": "Room in Roof", + "Floor Area (m2)": floor_area, + "Room Height (m)": None, # Placeholder for missing data + "Perimeter (m)": None, # Placeholder for missing data + "Party Wall Length (m)": None # Placeholder for missing data + }) + + # Calculate aggregated dimensions + main_property = [part for part in data if "Main Property" in part["Building Part"]] + first_extensions = [part for part in data if "1st Extension" in part["Building Part"]] + dimensions = { + "Total Floor Area (m2)": sum([part["Floor Area (m2)"] for part in data]), + "Total Ground Floor Area (m2)": sum( + [part["Floor Area (m2)"] for part in data if "Lowest Floor" in part["Floor Level"]] + ), + "RIR Floor Area": sum( + [part["Floor Area (m2)"] for part in data if "Room in Roof" in part["Floor Level"]] + ), + "Main Building Wall Area (m2)": sum([x["Perimeter (m)"] * x["Room Height (m)"] for x in main_property if + x["Perimeter (m)"] and x["Room Height (m)"]]), + "First Extension Wall Area (m2)": sum( + [x["Perimeter (m)"] * x["Room Height (m)"] for x in first_extensions if + x["Perimeter (m)"] and x["Room Height (m)"]] + ), + } + + return dimensions + + +def extract_roof_details_summary(text): + """ + Extracts roof type, insulation, and insulation thickness for each building part + in the 8.0 Roofs section of the summary report. 
+ """ + # Define data structure to hold results + roof_data = [] + + # Locate the entire 8.0 Roofs section + roof_section_match = re.search(r"8\.0 Roofs:\n(.*?)(?=\n9\.0 Floors:|$)", text, re.DOTALL) + if not roof_section_match: + return roof_data # Return empty if no roof section is found + + # Extract the roof section and append "9.0 Floors:" as the boundary + roof_section = roof_section_match.group(1).strip() + "\n9.0 Floors:" + + # Define pattern to match each building part's roof entry + building_part_pattern = re.compile( + r"(Main Property|1st Extension|2nd Extension|[\w\s]+)\n" # Matches each building part label + r"Type\s+(.*?)(?=\n(?:Insulation|9\.0 Floors:|[A-Z]))" # Matches Roof Type until the next field, label, or end + r"(?:\nInsulation\s+(.*?)(?=\n(?:Insulation Thickness|9\.0 Floors:|[A-Z])))?" # Optional Insulation + r"(?:\nInsulation Thickness\s+(.*?)(?=\n(?:9\.0 Floors:|[A-Z])))?", # Optional Insulation Thickness + re.DOTALL + ) + + # Extract each building part's data + for match in building_part_pattern.finditer(roof_section): + part_name = match.group(1).strip() # Building part label + roof_type = match.group(2).strip() # Roof Type + roof_insulation = match.group(3).strip() if match.group(3) else None # Optional Insulation + roof_insulation_thickness = match.group(4).strip() if match.group(4) else None # Optional Thickness + + # Cleaning to handle annoying cases when it comes out like this: + # 'A Another dwelling above\n1st Extension' + if roof_type.startswith("A Another dwelling above"): + roof_type = "A Another dwelling above" + + # Store results for this building part + roof_data.append({ + "Building Part": part_name, + "Roof Type": roof_type, + "Roof Insulation": roof_insulation, + "Roof Insulation Thickness": roof_insulation_thickness, + }) + + return roof_data + + +def extract_wall_details_summary(text): + """ + Extracts wall type, insulation, dry-lining, and thickness for each building part, + including any alternative wall details 
within the 7.0 Walls section of the summary PDF text. + """ + # Define data structure to hold all building part wall entries + wall_data = [] + + # Locate the entire 7.0 Walls section + wall_section = re.search(r"7\.0 Walls:\n(.*?)\n8\.0 Roofs:", text, re.DOTALL).group(1) + + # Define pattern to match each building part's wall entry within the section + building_part_pattern = re.compile( + r"(Main Property|1st Extension|2nd Extension|[\w\s]+)\n" # Matches each building part label + r"Type\s+(.*?)\n" # Matches main wall Type + r"Insulation\s+(.*?)\n", # Matches main wall Insulation + # r"(Dry-lining\s+(.*?)\n)?" # Optional main wall Dry-lining + # r"Wall Thickness Unknown\s+(.*?)\n" # Matches main wall Thickness Unknown + # r"Wall Thickness \[mm\]\s+(\d+)", # Matches main wall Thickness + re.DOTALL + ) + + # Define pattern to capture alternative wall details, if present + alternative_wall_pattern = re.compile( + r"Alternative Wall Area.*?\n" # Matches start of alternative wall section + r"Alternative Type\s+(.*?)\n" # Matches alternative wall Type + r"Alternative Insulation\s+(.*?)\n" # Matches alternative wall Insulation + r"(Alternative Dry-lining\s+(.*?)\n)?" 
# Optional Alternative Dry-lining + r"Alternative Wall Thickness Unknown\s+(.*?)\n" # Matches alternative wall Thickness Unknown + r"Alternative Wall Thickness\s+(\d+)", # Matches alternative wall Thickness + re.DOTALL + ) + + # Find all building part entries within the 7.0 Walls section + for match in building_part_pattern.finditer(wall_section): + + wall_label = match.group(1).strip() + main_wall_type = match.group(2).strip() + main_wall_insulation = match.group(3).strip() + # main_wall_dry_lining = match.group(5).strip() if match.group(5) else "N/A" + # main_wall_thickness_unknown = match.group(6).strip() + # main_wall_thickness = int(match.group(7)) + + # Initialize dictionary for this wall entry + wall_entry = { + "Building Part": wall_label, + "Wall Type": main_wall_type, + "Wall Insulation": main_wall_insulation, + # "Wall Dry-lining": main_wall_dry_lining, + # "Wall Thickness Unknown": main_wall_thickness_unknown, + # "Wall Thickness (mm)": main_wall_thickness, + "Alternative Wall Type": None, + "Alternative Wall Insulation": None, + "Alternative Wall Dry-lining": "N/A", + "Alternative Wall Thickness Unknown": None, + "Alternative Wall Thickness (mm)": None, + } + + # Check if there's an alternative wall section following this wall entry + alt_match = alternative_wall_pattern.search(wall_section, match.end()) + if alt_match: + wall_entry["Alternative Wall Type"] = alt_match.group(1).strip() + wall_entry["Alternative Wall Insulation"] = alt_match.group(2).strip() + wall_entry["Alternative Wall Dry-lining"] = alt_match.group(4).strip() if alt_match.group(4) else "N/A" + wall_entry["Alternative Wall Thickness Unknown"] = alt_match.group(5).strip() + wall_entry["Alternative Wall Thickness (mm)"] = int(alt_match.group(6)) + + # Append each building part as a dictionary in the wall_data list + wall_data.append(wall_entry) + + return wall_data + + +def extract_summary_report(pdf_path): + """ + Extracts specific data from the provided PDF file. 
+ Data includes: + - Current SAP rating + - Fuel Bill + - Address + """ + + data = { + "Address": None, + "Postcode": None, + "Current SAP Rating": None, + "Current EPC Band": None, + "Fuel Bill": None, + "Main Building Age Band": None, + "Number of Storeys": None, + "Window Age Description": None, + "Window Age Description Proportion (%)": None, + "Secondary Window Age Description": None, + "Secondary Window Age Description Proportion (%)": None, + "Number of Windows": None, + "Total Number of Doors": None, + "Number of Insulated Doors": None, + "Existing Primary Heating System": None, + "Existing Primary Heating PCDF Reference": None, + "Existing Primary Heating Controls": None, + "Existing Primary Heating % of Heat": None, + "Existing Secondary Heating System": None, + "Existing Secondary Heating PCDF Reference": None, + "Existing Secondary Heating Controls": None, + "Existing Secondary Heating % of Heat": None, + "Secondary Heating Code": None, + "Water Heating Code": None, + 'Total Floor Area (m2)': None, + 'Total Ground Floor Area (m2)': None, + 'RIR Floor Area': None, + 'Main Building Wall Area (m2)': None, + 'First Extension Wall Area (m2)': None, + "Number of Light Fittings": None, + "Number of LEL Fittings": None, + "Number of fittings needing LEL": None, + "Main Roof Type": None, + "Main Roof Insulation": None, + "Main Roof Insulation Thickness": None, + "Main Wall Type": None, + "Main Wall Insulation": None, + "Main Wall Dry-lining": None, + "Main Wall Thickness": None, + "Main Building Alternative Wall Type": None, + "Main Building Alternative Wall Insulation": None, + "Main Building Alternative Wall Dry-lining": None, + "Main Building Alternative Wall Thickness": None, + } + + with (open(pdf_path, "rb") as file): + reader = PyPDF2.PdfReader(file) + text = "" + for page in reader.pages: + text += page.extract_text() + + # Extract Current SAP rating + sap_match = re.search(r"Current SAP rating:\s*([A-Z] \d+)", text) + data["Current SAP Rating"] = 
sap_match.group(1).split(" ")[1] + + data["Property Type"] = ( + re.search(r"Property type:\s*(.*?)\n2\.0", text, re.DOTALL) + .group(1).replace('\n', ' ').strip().replace(" ", " ") + ) + + # Extract age + age_band_match = re.search( + r"3\.0 Date Built:\s*Main Property\s*[A-Z]?\s*(\d{4}-\d{4}|before \d{4}|\d{4} onwards)", + text + ) + data["Main Building Age Band"] = age_band_match.group(1) + + # Number of storeys + storeys_match = re.search(r"Number of Storeys:\s*(\d+)", text) + data["Number of Storeys"] = int(storeys_match.group(1)) + + # Grab number of heated rooms, number of habitable rooms + data["Number of Heated Rooms"] = int(re.search(r"Heated Habitable Rooms:\s*(\d+)", text).group(1)) + data["Number of Habitable rooms"] = int(re.search(r"Habitable Rooms:\s*(\d+)", text).group(1)) + + # Extract Carbon Emissions + # carbon_match = re.search(r"Emissions \(t/year\):\s*([\d.]+)\s*tonnes", text) + # data["Carbon Emissions (t/year)"] = float(carbon_match.group(1)) + + # Extract Fuel Bill + fuel_bill_match = re.search(r"Fuel Bill:\s*£(\d+)", text) + data["Fuel Bill"] = f"£{fuel_bill_match.group(1)}" + + # Extract individual address components + postcode = re.search(r"Postcode:\s*(.*?)\nRegion:", text) + # region = re.search(r"Region:\s*(.*?)\nHouse Name:", text) + house_name = re.search(r"House Name:\s*(.*?)\nHouse No:", text) + house_no = re.search(r"House No:\s*(.*?)\nStreet:", text) + street = re.search(r"Street:\s*(.*?)\nLocality:", text) + locality = re.search(r"Locality:\s*(.*?)\nTown:", text) + town = re.search(r"Town:\s*(.*?)\nCounty:", text) + county = re.search(r"County:\s*(.*?)\nProperty Tenure:", text) + + # Clean extracted values and remove any prefixes + address_parts = [ + house_no.group(1).strip() if house_no else "", + house_name.group(1).strip() if house_name else "", + street.group(1).strip() if street else "", + locality.group(1).strip() if locality else "", + town.group(1).strip() if town else "", + county.group(1).strip() if county else "", 
+ postcode.group(1).strip() if postcode else "" + ] + + # Join non-empty parts with a comma + data["Address"] = ", ".join([part for part in address_parts if part]) + data["Postcode"] = postcode.group(1).strip() + + # windows_section = re.search(r"Windows\s*(.*?)\s*Draught Proofing", text, re.DOTALL) + # windows_text = windows_section.group(1) + # window_data = extract_window_age_description(windows_text) + # data.update(window_data) + + # Extract Total Number of Doors + total_doors_match = re.search(r"Total Number of Doors\s*(\d+)", text) + data["Total Number of Doors"] = int(total_doors_match.group(1)) + + # Extract Number of Insulated Doors + insulated_doors_match = re.search(r"Number of Insulated Doors\s*(\d+)", text) + data["Number of Insulated Doors"] = int(insulated_doors_match.group(1)) + + # Extract heating system + # Extract Primary Heating Data + # Extract Primary Heating Section + primary_heating_section1 = re.search(r"Main\s*Heating1\s*(.*?)\s*Main\s*Heating2", text, re.DOTALL) + primary_heating_section2 = re.search(r"Main\s*Heating1\s*(.*?)\s*Water\s*Heating", text, re.DOTALL) + primary_heating_section = primary_heating_section1 if primary_heating_section1 else primary_heating_section2 + + primary_text = primary_heating_section.group(1) + + # Handle extracting main heating code: + mainheat_search = re.search(r"Main Heating Code\s*(.*?)\n", primary_text) + if mainheat_search is None: + mainheat_search = re.search(r"Main Heating EES Code\s*(.*?)\n", primary_text) + if mainheat_search is None: + mainheat_search = re.search(r"PCDF boiler Reference\s*(.*?)\n", primary_text) + + data["Existing Primary Heating System"] = mainheat_search.group(1).strip() + + data["Existing Primary Heating PCDF Reference"] = re.search( + r"PCDF boiler Reference\s*(\d+)", primary_text + ).group(1) + + controls_search = re.search( + r"Main Heating Controls Sap\s*(.*?)\n", primary_text + ) + if controls_search is None: + controls_search = re.search( + r"Main Heating 
Controls\s*(.*?)\n", primary_text + ) + data["Existing Primary Heating Controls"] = controls_search.group(1).strip() + data["Existing Primary Heating % of Heat"] = int( + re.search(r"Percentage of Heat\s*(\d+)\s*%", primary_text).group(1) + ) + + # Extract Secondary Heating Section + secondary_heating_section = re.search(r"Main\s*Heating2\s*(.*?)\s*Water\s*Heating", text, re.DOTALL) + + if secondary_heating_section is None: + data["Existing Secondary Heating System"] = "" + data["Existing Secondary Heating PCDF Reference"] = "" + data["Existing Secondary Heating Controls"] = "" + data["Existing Secondary Heating % of Heat"] = 0 + + else: + secondary_text = secondary_heating_section.group(1) + + main_heating_code_match_secondary = re.search( + r"Main Heating Code\s*(.*?)(?=\n|Percentage of Heat)", secondary_text + ) + if main_heating_code_match_secondary is None: + main_heating_code_match_secondary = re.search( + r"Main Heating EES Code\s*(.*?)(?=\n|Percentage of Heat)", secondary_text + ) + + data["Existing Secondary Heating System"] = main_heating_code_match_secondary.group(1).strip() + data["Existing Secondary Heating PCDF Reference"] = re.search(r"PCDF boiler Reference\s*(\d+)", + secondary_text).group(1) + second_heating_controls_match = re.search(r"Main Heating Controls\s*(.*?)\n", secondary_text) + data["Existing Secondary Heating Controls"] = ( + second_heating_controls_match.group(1).strip() if second_heating_controls_match else "" + ) + data["Existing Secondary Heating % of Heat"] = int( + re.search(r"Percentage of Heat\s*(\d+)\s*%", secondary_text).group(1) + ) + + # Extract Secondary Heating and Water Heating Codes + secondary_heating_code_match = re.search(r"Secondary Heating Code\s*(.*?)\n", text) + water_heating_code_match = re.search(r"Water Heating Code\s*(.*?)\n", text) + + if data["Existing Secondary Heating System"] == "": + data["Secondary Heating Code"] = "" + else: + data["Secondary Heating Code"] = secondary_heating_code_match.group( + 
1).strip() if secondary_heating_code_match else "" + + data["Water Heating Code"] = water_heating_code_match.group(1).strip() + + dimensions = extract_building_parts_summary(text) + data.update(dimensions) + + # Need to get the hot water + section_match = re.search(r"15\.0.*?\n(.*?)15\.1", text, re.DOTALL) + section_text = section_match.group(1) + + # Extract Water Heating Code + code_match = re.search(r"Water Heating Code\s+(\S+)", section_text) + fuel_match = re.search(r"Water Heating Fuel Type\s+(.+)", section_text) + if fuel_match is None: + fuel_type = None + else: + fuel_type = fuel_match.group(1).strip() + + code = code_match.group(1) + data["Hot Water System"] = code + data["Hot Water Fuel"] = fuel_type + + # data["Number of Light Fittings"] = int(re.search(r"Total number of light fittings\s*(\d+)", text).group(1)) + # data["Number of LEL Fittings"] = int(re.search(r"Total number of L.E.L. fittings\s*(\d+)", text).group(1)) + # data["Number of fittings needing LEL"] = data["Number of Light Fittings"] - data["Number of LEL Fittings"] + + extracted_roof_data = extract_roof_details_summary(text) + main_roof_data = [roof for roof in extracted_roof_data if "Main" in roof["Building Part"]][0] + data["Main Roof Type"] = main_roof_data["Roof Type"] + data["Main Roof Insulation"] = main_roof_data["Roof Insulation"] + data["Main Roof Insulation Thickness"] = main_roof_data["Roof Insulation Thickness"] + + walls_data = extract_wall_details_summary(text) + # Get the main building wall data + main_building_walls = [wall for wall in walls_data if "Main" in wall["Building Part"]][0] + data["Main Wall Type"] = main_building_walls["Wall Type"] + data["Main Wall Insulation"] = main_building_walls["Wall Insulation"] + # data["Main Wall Dry-lining"] = main_building_walls["Wall Dry-lining"] + # data["Main Wall Thickness"] = main_building_walls["Wall Thickness (mm)"] + # data["Main Building Alternative Wall Type"] = main_building_walls["Alternative Wall Type"] + # data["Main 
Building Alternative Wall Insulation"] = main_building_walls["Alternative Wall Insulation"] + # data["Main Building Alternative Wall Dry-lining"] = main_building_walls["Alternative Wall Dry-lining"] + # data["Main Building Alternative Wall Thickness"] = main_building_walls["Alternative Wall Thickness (mm)"] + + return data + + +folder_location = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Livewest/July 2025 Heating Upgrades" + +df = pd.read_csv("/Users/khalimconn-kowlessar/Documents/hestia/July 2025 Surveys/export_summary_table.csv") + +property_data = [] +for _, x in tqdm(df.iterrows(), total=len(df)): + + if not pd.isnull(x["error"]): + continue + + filepath = x["filepath"] + if filepath in ["No summary file found"]: + continue + summary_data = extract_summary_report(pdf_path=filepath) + property_data.append( + { + **x.to_dict(), + **summary_data + } + ) + +property_data = pd.DataFrame(property_data) +# Store as excel +property_data.to_excel( + "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Livewest/July 2025 Heating " + "Upgrades/property_table_24th_july.xlsx" +) + +sandwell_data = property_data[property_data["company"] == "sandwell.gov.uk"] +sandwell_data.to_csv( + "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Livewest/July 2025 Heating " + "Upgrades/Sandwell EPR data (WIP).xlsx" +) diff --git a/recommendations/optimiser/GainOptimiser.py b/recommendations/optimiser/GainOptimiser.py index 6652ffbf..b11f7e87 100644 --- a/recommendations/optimiser/GainOptimiser.py +++ b/recommendations/optimiser/GainOptimiser.py @@ -9,7 +9,7 @@ class GainOptimiser: This class is used to maximise gain, given a constrained cost """ - def __init__(self, components, max_cost, max_gain): + def __init__(self, components, max_cost, max_gain, allow_slack=True): """ This function will try and maximise the gain, given a constrained cost. 
If we specific a max_gain, then the optimisation routine is constained to try not to exceed a maximum increase @@ -21,6 +21,8 @@ class GainOptimiser: :param components: List of components, where each component is a dictionary with keys "id", "cost" and "gain" :param max_cost: Maximum cost constraint :param max_gain: Maximum gain constraint + :param allow_slack: If True, allows the model to use slack variables to relax the cost constraint if the model + is infeasible. Defaults to True. """ self.components = components self.max_cost = max_cost @@ -32,6 +34,7 @@ class GainOptimiser: self.solution = [] self.solution_gain = None self.solution_cost = None + self.allow_slack = allow_slack def setup(self): # Initialize Model @@ -124,15 +127,18 @@ class GainOptimiser: if (self.m.status == OptimizationStatus.INFEASIBLE) or ( (self.m.status == OptimizationStatus.OPTIMAL) and not len(solution) ): - logger.info("We have an infeasible model, setting up slack model") - self.setup_slack() - self.m.optimize() - solution = [ - item for group, group_vars in zip(self.components, self.variables) for item, var in - zip(group, group_vars) - if - var.x >= 0.99 - ] + if self.allow_slack: + logger.info("We have an infeasible model, setting up slack model") + self.setup_slack() + self.m.optimize() + solution = [ + item for group, group_vars in zip(self.components, self.variables) for item, var in + zip(group, group_vars) + if + var.x >= 0.99 + ] + else: + logger.info("Infeasible but slack disabled - returning empty solution") self.solution = solution diff --git a/recommendations/optimiser/optimiser_functions.py b/recommendations/optimiser/optimiser_functions.py index 6909a3f0..45e04a1f 100644 --- a/recommendations/optimiser/optimiser_functions.py +++ b/recommendations/optimiser/optimiser_functions.py @@ -13,7 +13,9 @@ def prepare_input_measures(property_recommendations, goal, needs_ventilation): """ goal_map = { - "Increasing EPC": "sap_points" + "Increasing EPC": "sap_points", + "Energy 
Savings": "kwh_savings", + "Reducing CO2 emissions": "co2_equivalent_savings", } goal_key = goal_map[goal] diff --git a/serverless.yml b/serverless.yml index f9c5f74e..c1fc0b09 100644 --- a/serverless.yml +++ b/serverless.yml @@ -66,7 +66,7 @@ functions: - sqs: arn: arn:aws:sqs:${self:provider.region}:${aws:accountId}:model-engine-queue batchSize: 1 - maximumConcurrency: 2 + maximumConcurrency: 2 # Heavily restricts concurrency to avoid overwhelming the Lambda limits resources: diff --git a/sfr/principal_pitch/2_export_data.py b/sfr/principal_pitch/2_export_data.py index 5660b78d..79238273 100644 --- a/sfr/principal_pitch/2_export_data.py +++ b/sfr/principal_pitch/2_export_data.py @@ -7,10 +7,12 @@ from backend.app.utils import sap_to_epc from sqlalchemy.orm import sessionmaker from backend.app.db.connection import db_engine from backend.app.db.models.recommendations import Recommendation, Plan, PlanRecommendations -from backend.app.db.models.portfolio import PropertyModel, PropertyDetailsEpcModel +from backend.app.db.models.portfolio import PropertyModel, PropertyDetailsEpcModel, PropertyDetailsSpatial -PORTFOLIO_ID = 206 -SCENARIOS = [389] +# PORTFOLIO_ID = 206 +# SCENARIOS = [389] +PORTFOLIO_ID = 221 +SCENARIOS = [427] def get_data(portfolio_id, scenario_ids): @@ -125,17 +127,64 @@ df["predicted_post_works_sap"] = df["predicted_post_works_sap"].round() df["predicted_post_works_epc"] = df["predicted_post_works_sap"].apply(lambda x: sap_to_epc(x)) # We merge this back to the main dataframe, which will contain the bathrooms -from utils.s3 import read_csv_from_s3 +from utils.s3 import read_csv_from_s3, read_excel_from_s3 -asset_list = read_csv_from_s3(bucket_name="retrofit-plan-inputs-dev", filepath='8/206/asset_list.csv') +# asset_list = read_csv_from_s3(bucket_name="retrofit-plan-inputs-dev", filepath='8/206/asset_list.csv') +asset_list = read_excel_from_s3( + bucket_name="retrofit-plan-inputs-dev", file_key='8/221/20250722T202328736Z/asset_list.xlsx', + 
header_row=0, sheet_name="320 - edited" +) asset_list = pd.DataFrame(asset_list) +asset_list = asset_list[["domna_full_address", "domna_postcode", "epc_os_uprn", ]].copy() +asset_list = asset_list.rename(columns={"epc_os_uprn": "uprn"}) df["uprn"] = df["uprn"].astype(str) +asset_list["uprn"] = asset_list["uprn"].astype("Int64").astype(str) asset_list = asset_list.merge( df.drop(columns=["address", "postcode", "property_type", "total_floor_area"]), how="left", on="uprn" ) + +# Get conservation area data from property details spatial, based on the UPRNs +def get_conservation_area_data(uprns): + session = sessionmaker(bind=db_engine)() + session.begin() + + # Query to get conservation area data + spatial_query = session.query( + PropertyDetailsSpatial + ).filter( + PropertyDetailsSpatial.uprn.in_(uprns) # Filter by UPRNs + ).all() + + # Transform spatial data to include all fields dynamically + spatial_data = [ + {col.name: getattr(spatial, col.name) for col in PropertyDetailsSpatial.__table__.columns} + for spatial in spatial_query + ] + + session.close() + return pd.DataFrame(spatial_data) + + +uprns = asset_list[ + ~pd.isna(asset_list["uprn"]) & (asset_list["uprn"] != "") + ]["uprn"].astype(int).unique().tolist() +conservation_area_data = get_conservation_area_data(uprns) +conservation_area_data["uprn"] = conservation_area_data["uprn"].astype(str) +asset_list = asset_list.merge( + conservation_area_data[["uprn", "conservation_status", "is_listed_building", "is_heritage_building"]], + how="left", + on="uprn" +) + +# For exporting NCHA +asset_list.to_excel( + "/Users/khalimconn-kowlessar/Documents/hestia/Customers/NCHA/320 Portfolio/asset_list_epc_b.xlsx", + index=False +) + condition_costs = pd.read_excel( "/Users/khalimconn-kowlessar/Documents/hestia/sfr/Spring JV/Condition costs.xlsx", sheet_name="Prices - Khalim",