diff --git a/backend/SearchEpc.py b/backend/SearchEpc.py index 367d8c85..f9e978c6 100644 --- a/backend/SearchEpc.py +++ b/backend/SearchEpc.py @@ -256,16 +256,12 @@ class SearchEpc: else: params = {"address": self.address1, "postcode": self.postcode} + url = os.path.join(self.client.domestic.host, "search") + for retry in range(self.max_retries): try: - if "uprn" in params: - # We use the direct call method inside, since we need to implement uprn as a valid - # parameter for the search function - url = os.path.join(self.client.domestic.host, "search") - response = self.client.domestic.call(method="get", url=url, params=params) - else: - response = self.client.domestic.search(params=params, size=size) + response = self.client.domestic.call(method="get", url=url, params=params) if response: self.data = response diff --git a/etl/customers/aiha/xml_extraction.py b/etl/customers/aiha/xml_extraction.py new file mode 100644 index 00000000..531b6752 --- /dev/null +++ b/etl/customers/aiha/xml_extraction.py @@ -0,0 +1,984 @@ +import os +from io import BytesIO + +import pandas as pd + +from etl.xml_survey_extraction.XmlParser import XmlParser + +SURVEY_FOLDER_PATH = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/AIHA/RESIDENT SURVEYS" +CONTINGENCY_RATE = 0.26 + + +def sap_to_epc(sap_points: int | float): + """ + Simple utility function to convert SAP points to EPC rating. + :param sap_points: numerical value of SAP points, typically between 0 and 100 + :return: + """ + + if sap_points <= 0: + raise ValueError("SAP points should be above 0.") + + if sap_points >= 92: + return "A" + elif sap_points >= 81: + return "B" + elif sap_points >= 69: + return "C" + elif sap_points >= 55: + return "D" + elif sap_points >= 39: + return "E" + elif sap_points >= 21: + return "F" + else: + return "G" + + +def main(): + """ + This script handles the extraction of data from the XML files in the survey folders. 
+ :return: + """ + # Step 1: List all subfolders inside SURVEY_FOLDER_PATH. + subfolders = [f.path for f in os.scandir(SURVEY_FOLDER_PATH) if f.is_dir()] + + # Step 2: Loop through each subfolder and find the XML files. + extracted_surveys = [] + for subfolder in subfolders: + print(f"Searching in subfolder: {subfolder}") + + # Find all XML files in the current subfolder. + xml_files = [f for f in os.listdir(subfolder) if f.endswith('.xml')] + + if not xml_files: + print(f"No XML files found in subfolder: {subfolder}") + continue + + # If any XML files are found, perform the data extraction. We use the subfolder name as the survey key. + for xml_file in xml_files: + xml_path = os.path.join(subfolder, xml_file) + print(f"Processing XML file: {xml_path}") + + # Read in the XML and parse it using the XmlParser class. + with open(xml_path, 'rb') as file: + xml_data_io = BytesIO(file.read()) + uprn = None # Set the UPRN if available. + + # Create an XmlParser instance + xml_parser = XmlParser( + file=xml_data_io, + filekey=xml_path, + surveyor_company="", + uprn=uprn, + ) + + # Run the parser to extract the data + xml_parser.run() + if not xml_parser.epc: + # If we don't have a lig xml + continue + + # Store the extracted data for further processing + extracted_surveys.append({ + "survey_key": subfolder.split("/")[-1], + **xml_parser.epc, + **xml_parser.additional_data + }) + + print(f"Extracted {len(extracted_surveys)} surveys.") + # Process the extracted_surveys as needed, for example, save to a database or write to a file. 
+ extracted_surveys = pd.DataFrame(extracted_surveys) + + # THis is the data we need for the AIHA project + measures_data = extracted_surveys[ + ["survey_key", "address", "postcode", "current-energy-efficiency", "current-energy-rating", "number_of_floors"] + ] + measures_data = measures_data.sort_values("survey_key", ascending=True) + + # Note: + # The properties will still have "Very poor" ratings for their hot water + + # TODO + # - AIH001-03 has a loft that is inaccessible - ask Chenai about why this property didn't have access to the loft + # [Can't remember, not clear - Chenai will check] + # - AIH001-08 and AIH001-09, check if it's freehold - could solar work as both of these units are part of the same + # buulding [Question for Lewis & Kevin] + # - AIH001-09 - Is it not possible to install a loft hatch? [IT IS NOT, NO ACCESS - would need to accessed from + # the other unit] + # - AIH001-09 - Is there definitely an immersion water heater? Is this definitely the case for the other units? + # [Question for Lewis & Kevin] - [YES - ASHP!!!!] + + # TODO: Check which properties are in a conservation area + # TODO: AIH001-16 - Is the loft insulation suitable (already has 100mm in the RIR) + # TODO: Adjust Archetype 14 homes to exclude double glazing? 
Or should we exclude entirely + + recommended_measures = [ + { + "survey_key": "AIH001-01", + "starting_sap": 69, + "recommended_measures": [], + "notes": "Is EPC C" + }, + { + "survey_key": "AIH001-02", + "starting_sap": 65, + "recommended_measures": [ + { + "measure": "Solar PV", + "description": "2.4kWp Solar PV system", + "config": [ + { + "size": "2.4W", + "orientation": "Horizontal", + "elavation": 30, + "overshading": "None or little", + } + ], + "sap_points": 7, + "ending_sap": 72, + "notes": "The array can be mounted on the flat roof, so that panels are south facing" + }, + { + "measure": "TTZC", + "description": "Smart Thermostat", + "sap_points": 4, + "ending_sap": 76 + } + ], + }, + { + "survey_key": "AIH001-03", + "starting_sap": 43, + "recommended_measures": [ + { + "measure": "Cylinder Insulation", + "description": "80mm cylinder insulation", + "sap_points": 1, + "ending_sap": 44, + }, + { + "measure": "Solar PV", + "description": "4kWp Solar PV system", + "config": [ + { + "size": "4kWp", + "orientation": "East", + "elavation": 30, + "overshading": "None or little", + }, + ], + "sap_points": 10, + "ending_sap": 54 + }, + { + "measure": "Air Source Heat Pump", + "description": "Ecoforest ecoAIR EVI 4-20 20kW air source heat pump (+TTZC)", + "sap_points": 20, + "ending_sap": 74 + }, + { + "measure": "Tariff Review", + "description": "Switch to 24-hour tariff", + "sap_points": 15, + "ending_sap": 89 + } + ], + "notes": "Unclear if the loft is accessible" + }, + { + "survey_key": "AIH001-04", + "starting_sap": 48, + "recommended_measures": [ + { + "measure": "Flat Roof Insulation", + "description": "100mm flat roof insulation", + "floor_area": 39.1482, # based on area of top floor + "sap_points": 4, + "ending_sap": 52 + }, + { + "measure": "TTZC", + "description": "Smart Thermostat", + "sap_points": 3, + "ending_sap": 55 + }, + { + "measure": "Solar PV", + "description": "4kWp Solar PV system", + "config": [ + { + "size": "4kWp", + "orientation": 
"South", + "elavation": 30, + "overshading": "None or little", + } + ], + "sap_points": 15, + "ending_sap": 70 + } + ], + "notes": "Roof is flat, PV array should be installed south facing with elevation" + }, + { + "survey_key": "AIH001-05", + "starting_sap": 54, + "recommended_measures": [ + { + "measure": "Flat Roof Insulation", + "description": "100mm flat roof insulation", + "floor_area": 49.48, # based on area of top floor + "sap_points": 5, + "ending_sap": 59, + }, + { + "measure": "Cylinder Insulation", + "description": "80mm cylinder insulation", + "sap_points": 2, + "ending_sap": 61, + }, + { + "measure": "Solar PV", + "description": "4kWp Solar PV system", + "config": [ + { + "size": "4kW", + "orientation": "Horizontal", + "elavation": 30, + "overshading": "Modest", + } + ], + "sap_points": 9, + "ending_sap": 70 + }, + { + "measure": "TTZC", + "description": "Smart Thermostat", + "sap_points": 3, + "ending_sap": 73 + } + ], + "notes": "" + }, + { + "survey_key": "AIH001-06", + "starting_sap": 62, + "recommended_measures": [ + { + "measure": "Cylinder Insulation", + "description": "80mm cylinder insulation", + "sap_points": 2, + "ending_sap": 64, + }, + { + "measure": "Solar PV", + "description": "2kWp Solar PV system", + "config": [ + { + "size": "2kW", + "orientation": "South", + "elavation": 30, + "overshading": "Modest", + } + ], + "sap_points": 6, + "ending_sap": 70 + } + ] + }, + { + "survey_key": "AIH001-07", + "starting_sap": 74, + "recommended_measures": [], + "notes": "Is EPC C" + }, + { + "survey_key": "AIH001-08", + "starting_sap": 56, + "recommended_measures": [ + { + "measure": "Loft Insulation", + "description": "300mm loft insulation", + "floor_area": 54.2864, # Based on area of top floor + "sap_points": 2, + "ending_sap": 58, + }, + { + "measure": "Cylinder Insulation", + "description": "80mm cylinder insulation", + "sap_points": 4, + "ending_sap": 62, + }, + { + "measure": "Internal Wall Insulation", + "description": "100mm internal wall 
insulation", + "hlp": 24.13 * 2.63, + "sap_points": 7, + "ending_sap": 69, + }, + { + "measure": "Ventilation", + "description": "2x DMEV fans", + "sap_points": 0, + "ending_sap": 69, + } + ] + }, + { + "survey_key": "AIH001-09", + "starting_sap": 44, + "recommended_measures": [ + { + "measure": "Internal Wall Insulation", + "description": "100mm internal wall insulation", + "hlp": (22.35 * 3.24) + (22.13 * 2.53), + "sap_points": 8, + "ending_sap": 52, + }, + { + "measure": "Cavity Wall Insulation", + "description": "CWI to rdSAP default standard", + "hlp": (2.68 * 2.39) + (5.93 * 2.63) + (6.13 * 2.39), # 1st & 2nd extension + "sap_points": 1, + "ending_sap": 53, + }, + { + "measure": "Ventilation", + "description": "2x DMEV fans", + "sap_points": 0, + "ending_sap": 53, + }, + { + "measure": "TTZC", + "description": "Smart Thermostat", + "sap_points": 3, + "ending_sap": 56, + }, + { + "measure": "Solar PV", + "description": "1.6kWp Solar PV system", + "config": [ + { + "size": "1.6W", + "orientation": "South-East", + "elavation": 30, + "overshading": "None or little", + } + ], + "sap_points": 6, + "ending_sap": 62 + }, + { + "measure": "Loft Insulation", + "description": "300mm loft insulation", + "floor_area": 63.59 + 12.31, # Based on area of main building and 1st extension + "sap_points": 8, + "ending_sap": 70, + "notes": "Loft is inaccessible from this unit - would need to be accessed from the other unit, " + "which is also owned by AIHA" + } + ], + "notes": "This property is a house split into 2 flats. We can install a PV array for both units (one array" + "per unit). Area on south-east part of roof is ~22m2 with no overshadowing. Flat roof area is 8m2" + "with modest overshadowing. 
We suggest a 3.2kWp system, across two units" + }, + { + "survey_key": "AIH001-11", + "starting_sap": 59, + "recommended_measures": [ + { + "measure": "TTZC", + "description": "Smart Thermostat", + "sap_points": 4, + "ending_sap": 63, + }, + { + "measure": "Internal Wall Insulation", + "description": "100mm internal wall insulation", + "hlp": (18.50 * 3.12) + (19.00 * 2.75), + "sap_points": 5, + "ending_sap": 68, + }, + { + "measure": "Cylinder Insulation", + "description": "80mm cylinder insulation", + "sap_points": 1, + "ending_sap": 69, + } + ] + }, + { + "survey_key": "AIH001-12", + "starting_sap": 46, + "recommended_measures": [ + { + "measure": "Double Glazing", + "description": "Installation of double glazing", + "n_windows": 20, # Counted the bay windows each as 3 + "windows_area": 10.66, + "sap_points": 3, + "ending_sap": 49, + }, + # { + # "measure": "Solar PV", + # "description": "3.2kWp Solar PV system", + # "config": [ + # { + # "size": "3.2W", + # "orientation": "East", + # "elavation": 30, + # "overshading": "Little or none", + # } + # ], + # "sap_points": 9, + # "ending_sap": 58 + # }, + { + "measure": "Air Source Heat Pump", + "description": "Ecoforest ecoAIR EVI 4-20 20kW air source heat pump (+TTZC)", + "sap_points": 15, + "ending_sap": 65 + }, + { + "measure": "Tariff Review", + "description": "Switch to 24-hour tariff", + "sap_points": 15, + "ending_sap": 80 + } + ] + }, + { + "survey_key": "AIH001-13", + "starting_sap": 53, + "recommended_measures": [ + { + "measure": "Roof Insulation", + "description": "100mm+ RIR insulation on all surfaces (ceiling u=0.16, walls u=0.3)", + "floor_area": 39.75, # based on the floor area of the RIR + "sap_points": 6, + "ending_sap": 59, + }, + { + "measure": "Flat Roof Insulation", + "description": "100mm flat roof insulation", + "floor_area": 33.06, # Based on area of the extension + "sap_points": 2, + "ending_sap": 61, + }, + { + "measure": "Cavity Wall Insulation", + "description": "CWI to rdSAP default 
standard", + "hlp": (35.40 * 2.65) + (26.70 * 2.73) + (16.30 * 2.71), # 1st & 2nd extension + "sap_points": 6, + "ending_sap": 67, + }, + { + "measure": "Ventilation", + "description": "2x DMEV fans", + "sap_points": 0, + "ending_sap": 67, + }, + { + "measure": "TTZC", + "description": "Smart Thermostat", + "sap_points": 2, + "ending_sap": 69, + }, + { + "measure": "Solar PV", + "description": "4kWp Solar PV system", + "config": [ + { + "size": "4kW", + "orientation": "Horizontal", + "elavation": 30, + "overshading": "None or little", + } + ], + "sap_points": 9, + "ending_sap": 78 + } + ] + }, + { + "survey_key": "AIH001-14", + "starting_sap": 63, + "recommended_measures": [ + { + "measure": "Cavity Wall Insulation", + "description": "CWI to rdSAP default standard", + "hlp": (11.00 * 2.6) + (11.00 * 2.65) + (4.60 * 2.7), + "sap_points": 5, + "ending_sap": 68, + }, + { + "measure": "Ventilation", + "description": "2x DMEV fans", + "sap_points": 0, + "ending_sap": 68, + }, + { + "measure": "Loft Insulation", + "description": "300mm loft insulation", # Based on area of main building + "floor_area": 59.20, + "sap_points": 1, + "ending_sap": 69, + }, + { + "measure": "Solar PV", + "description": "3.2kWp Solar PV system", + "sap_points": 10, + "ending_sap": 79, + } + ] + }, + { + "survey_key": "AIH001-15", + "starting_sap": 60, + "recommended_measures": [ + { + "measure": "Loft Insulation", + "description": "300mm loft insulation", + "floor_area": 73.81, # Based on area of main building + "sap_points": 1, + "ending_sap": 61, + }, + { + "measure": "TTZC", + "description": "Smart Thermostat", + "sap_points": 3, + "ending_sap": 64, + }, + { + "measure": "Solar PV", + "description": "3.2kWp Solar PV system", + "config": [ + { + "size": "3.2W", + "orientation": "North-West", + "elavation": 30, + "overshading": "None or little", + } + ], + "sap_points": 7, + "ending_sap": 71, + "notes": "The array is North-west facing and therefore will be slightly less efficient than south" + 
"facing, however the impact is not so severe as to make the installation not worthwhile." + "Ground mounted" + } + ] + }, + { + "survey_key": "AIH001-16", + "starting_sap": 60, + "recommended_measures": [ + { + "measure": "Cavity Wall Insulation", + "description": "CWI to rdSAP default standard", + "hlp": (21.56 * 2.60) + (26.79 * 2.8) + (6.74 * 2.60), + "sap_points": 4, + "ending_sap": 64, + }, + { + "measure": "Ventilation", + "description": "2x DMEV fans", + "sap_points": 0, + "ending_sap": 64, + }, + { + "measure": "Loft Insulation", + "description": "300mm loft insulation", + "floor_area": 20.92, # Based on floor area of RIR + "sap_points": 1, + "ending_sap": 65, + }, + { + "measure": "Solar PV", + "description": "2.4kWp Solar PV system", + "config": [ + { + "size": "2.4W", + "orientation": "South-East", + "elavation": 30, + "overshading": "Modest", + } + ], + "sap_points": 5, + "ending_sap": 70, + } + ] + }, + { + "survey_key": "AIH001-17", + "starting_sap": 62, + "recommended_measures": [ + { + "measure": "Cylinder Insulation", + "description": "80mm cylinder insulation", + "sap_points": 1, + "ending_sap": 63, + }, + { + "measure": "TTZC", + "description": "Smart Thermostat", + "sap_points": 3, + "ending_sap": 66, + }, + { + "measure": "Solar PV", + "description": "4kWp Solar PV system", + "config": [ + { + "size": "3.2kW", + "orientation": "East", + "elavation": 30, + "overshading": "None or little", + }, + { + "size": "0.8kW", + "orientation": "West", + "elavation": 30, + "overshading": "None or little", + } + ], + "sap_points": 12, + "ending_sap": 78, + } + ] + }, + { + "survey_key": "AIH001-18", + "starting_sap": 58, + "recommended_measures": [ + { + "measure": "Loft Insulation", + "description": "300mm loft insulation", + "floor_area": 37.52, # Based on area of main building and 1st extension + "sap_points": 7, + "ending_sap": 65, + }, + { + "measure": "Cylinder Insulation", + "description": "80mm cylinder insulation", + "sap_points": 1, + "ending_sap": 
66, + }, + { + "measure": "TTZC", + "description": "Smart Thermostat", + "sap_points": 2, + "ending_sap": 68, + }, + { + "measure": "Solar PV", + "description": "3.2kWp Solar PV system", + "config": [ + { + "size": "3.2W", + "orientation": "North-East", + "elavation": 30, + "overshading": "None or little", + } + ], + "sap_points": 7, + "ending_sap": 75, + } + ], + + }, + { + "survey_key": "AIH001-19", + "starting_sap": 76, + "recommended_measures": [] + }, + { + "survey_key": "AIH001-20", + "starting_sap": 82, + "recommended_measures": [] + }, + { + "survey_key": "AIH001-21", + "starting_sap": 53, + "recommended_measures": [ + { + "measure": "Cyliner Insulation", + "description": "80mm cylinder insulation", + "sap_points": 2, + "ending_sap": 55, + }, + { + "measure": "Roof Insulation", + "description": "100mm+ RIR insulation on all surfaces (ceiling u=0.16, walls u=0.3)", + "floor_area": 22.80, # Based on floor area of RIR + "sap_points": 7, + "ending_sap": 62, + }, + { + "measure": "Solar PV", + "description": "2.4kWp Solar PV system", + "config": [ + { + "size": "1.6kWp", + "orientation": "Horizontal", + "elavation": 30, + "overshading": "None or little", + }, + { + "size": "0.8kWp", + "orientation": "South-East", + "elavation": 30, + "overshading": "None or little", + } + ], + "sap_points": 9, + "ending_sap": 71, + }, + { + "measure": "TTZC", + "description": "Smart Thermostat", + "sap_points": 3, + "ending_sap": 74, + } + ] + }, + { + "survey_key": "AIH001-SIMULATED-01", + "elmhurst_reference": "000020", + "starting_sap": None, + "recommended_measures": [ + { + "measure": "Internal Wall Insulation", + "description": "100mm internal wall insulation", + "hlp": (22.35 * 3.24) + (22.13 * 2.53), + "sap_points": 8, + "ending_sap": 52, + }, + { + "measure": "Cavity Wall Insulation", + "description": "CWI to rdSAP default standard", + "hlp": (2.68 * 2.39) + (5.93 * 2.63) + (6.13 * 2.39), # 1st & 2nd extension + "sap_points": 1, + "ending_sap": 53, + }, + { + "measure": 
"Ventilation", + "description": "2x DMEV fans", + "sap_points": 0, + "ending_sap": 53, + }, + { + "measure": "TTZC", + "description": "Smart Thermostat", + "sap_points": 3, + "ending_sap": 56, + }, + { + "measure": "Solar PV", + "description": "1.6kWp Solar PV system", + "config": [ + { + "size": "1.6W", + "orientation": "South-East", + "elavation": 30, + "overshading": "None or little", + } + ], + "sap_points": 6, + "ending_sap": 62 + }, + { + "measure": "Loft Insulation", + "description": "300mm loft insulation", + "floor_area": 63.59 + 12.31, # Based on area of main building and 1st extension + "sap_points": 8, + "ending_sap": 70, + "notes": "Loft is inaccessible from this unit - would need to be accessed from the other unit, " + "which is also owned by AIHA" + } + ], + "notes": "This was cloned from 80A. There is no existing data for 80B" + }, + { + "survey_key": "AIH001-SIMULATED-05", + "starting_sap": 68, + "recommended_measures": [ + { + "measure": "Loft Insulation", + "description": "300mm loft insulation", + "floor_area": 42.5, + "sap_points": 1, + "ending_sap": 69, + }, + { + "measure": "Solar PV", + "description": "3.2kWp Solar PV system", + "config": [ + { + "size": "3.2W", + "orientation": "North-East", + "elavation": 30, + "overshading": "None or little", + } + ], + "sap_points": 8, + "ending_sap": 77, + } + ] + } + ] + + scaffolding_data = [ + { + "number_of_floors": 2, + "price": 841, + }, + { + "number_of_floors": 3, + "price": 1077, + } + ] + + # TODO - Need an update cost for cylinder insulation + pricing_data = [ + {'item': '80mm cylinder insulation', 'unit_price': 50, 'unit': 'unit'}, + {'item': '100mm internal wall insulation', 'unit_price': 244.8, 'unit': 'hlp_m2'}, + {'item': 'CWI to rdSAP default standard', 'unit_price': 14.21, 'unit': 'hlp_m2'}, + {'item': 'Window draught proofing improvements', 'unit_price': 63, 'unit': 'window'}, + {'item': '100mm flat roof insulation', 'unit_price': 195, 'unit': 'floor_m2'}, + {'item': 'Switch to 
24-hour tariff', 'unit_price': 0, 'unit': None}, + {'item': 'Installation of double glazing', 'unit_price': 1074, 'unit': 'window'}, + {'item': 'Ecoforest ecoAIR EVI 4-20 20kW air source heat pump (+TTZC)', 'unit_price': 21189 + 1200, + 'unit': 'unit'}, + {'item': '100mm+ RIR insulation on all surfaces (ceiling u=0.16, walls u=0.3)', 'unit_price': 244.80, + 'unit': 'floor_m2'}, + {'item': '300mm loft insulation', 'unit_price': 16.07, 'unit': 'floor_m2'}, + {'item': 'Smart Thermostat', 'unit_price': 1200, 'unit': 'unit'}, + {'item': '2x DMEV fans', 'unit_price': 1070, 'unit': 'unit'}, + {'item': '1.6kWp Solar PV system', 'unit_price': 3040, 'unit': 'unit_needs_scaffolding'}, + {'item': '2kWp Solar PV system', 'unit_price': 3201, 'unit': 'unit_needs_scaffolding'}, + {'item': '2.4kWp Solar PV system', 'unit_price': 3363, 'unit': 'unit_needs_scaffolding'}, + {'item': '3.2kWp Solar PV system', 'unit_price': 3686, 'unit': 'unit_needs_scaffolding'}, + {'item': '4kWp Solar PV system', 'unit_price': 4009, 'unit': 'unit_needs_scaffolding'}, + {'item': '5.6kWp Solar PV system', 'unit_price': 5015, 'unit': 'unit_needs_scaffolding'}, + ] + pricing_data = pd.DataFrame(pricing_data) + + for recommendation in recommended_measures: + property_data = measures_data[measures_data["survey_key"] == recommendation["survey_key"]].squeeze() + total_cost = 0 + + for measure in recommendation["recommended_measures"]: + measure_pricing = pricing_data[pricing_data["item"] == measure["description"]] + measure_unit = measure_pricing["unit"].values[0] + + if measure_unit in ["unit", None]: + measure_cost = float(measure_pricing["unit_price"].values[0]) + elif measure_unit == "unit_needs_scaffolding": + n_floors = property_data["number_of_floors"] + scaffolding_cost = [x for x in scaffolding_data if x["number_of_floors"] == n_floors][0]["price"] + measure_cost = float(measure_pricing["unit_price"].values[0]) + scaffolding_cost + elif measure_unit == "floor_m2": + measure_cost = 
float(measure_pricing["unit_price"].values[0]) * measure["floor_area"] + elif measure_unit == "hlp_m2": + measure_cost = float(measure_pricing["unit_price"].values[0]) * measure["hlp"] + elif measure_unit == "window": + measure_cost = float(measure_pricing["unit_price"].values[0]) * measure["n_windows"] + else: + raise Exception("Unknown unit type") + + measure["Total Cost"] = measure_cost + total_cost += measure_cost + + recommendation["total_cost"] = total_cost + + # Step 1: Normalize the recommended_measures data into a DataFrame. + normalized_measures = [] + for survey in recommended_measures: + survey_key = survey["survey_key"] + starting_sap = survey["starting_sap"] + total_cost = survey.get("total_cost", 0) + + for measure in survey.get("recommended_measures", []): + # Include hlp and floor_area for each measure if available + hlp = measure.get("hlp", None) + floor_area = measure.get("floor_area", None) + + normalized_measures.append({ + "survey_key": survey_key, + "hlp": hlp, + "floor_area": floor_area, + "starting_sap": starting_sap, + "measure": measure["measure"], + "description": measure.get("description", ""), + "sap_points": measure.get("sap_points", 0), + "measure_cost": measure.get("Total Cost", 0), + "total_cost": total_cost + }) + + # Convert the normalized list into a DataFrame. + measures_df = pd.DataFrame(normalized_measures) + + # Step 2: Pivot the measures_df to have a column for each measure type, using the description as values. + pivoted_measures = measures_df.pivot_table( + index="survey_key", + columns="measure", + values="description", + aggfunc=lambda x: ' '.join(x), # Concatenate descriptions if there are multiple entries. 
+ fill_value=None + ).reset_index() + + measures_columns = [x for x in pivoted_measures.columns if x not in ["survey_key"]] + # We add a "Cost of" column for each measure + for measure in measures_columns: + pivoted_measures[f"Cost of {measure}"] = None + + pivoted_floor_area = measures_df.pivot_table( + index="survey_key", + columns="measure", + values="floor_area", + aggfunc="first" # Use 'first' since each measure should only appear once per survey_key + ).add_prefix("floor_area - ").reset_index() + + pivoted_hlp = measures_df.pivot_table( + index="survey_key", + columns="measure", + values="hlp", + aggfunc="first" + ).add_prefix("hlp - ").reset_index() + + # Merge hlp and floor_area data + pivoted_measures = pivoted_measures.merge(pivoted_hlp, on="survey_key", how="left") + pivoted_measures = pivoted_measures.merge(pivoted_floor_area, on="survey_key", how="left") + + # Step 3: Calculate the total sap points and total cost for each survey. + totals = measures_df.groupby("survey_key").agg( + total_sap_points=("sap_points", "sum"), + ).reset_index() + + # Merge total sap points into the pivoted measures. + pivoted_measures = pd.merge(pivoted_measures, totals, on="survey_key", how="left") + # pivoted_measures["Cost Contingency"] = pivoted_measures["total_cost_of_measures"] * CONTINGENCY_RATE + # pivoted_measures["Total Cost"] = pivoted_measures["total_cost_of_measures"] + pivoted_measures["Cost Contingency"] + + # Step 4: Extract starting SAP for each survey key. + starting_sap_df = measures_df.drop_duplicates(subset=["survey_key"])[["survey_key", "starting_sap"]] + + # Merge starting SAP back onto pivoted measures. + result_df = pd.merge(pivoted_measures, starting_sap_df, on="survey_key", how="left") + + # Step 5: Calculate the ending SAP. 
def get_data(asset_list: pd.DataFrame, *, error_delay: float = 5) -> tuple[list[dict], list]:
    """
    Look up the newest EPC (and its recommendations) for every row of ``asset_list``.

    Each row must provide the "Postcode", "Number", "Full Address" and "row_id"
    columns. Rows for which no EPC is found are skipped silently; rows whose
    lookup raises are recorded in the returned error list so the caller can retry.

    :param asset_list: one row per property to look up
    :param error_delay: seconds to sleep after a failed lookup before moving on
        to the next row (defaults to the previous hard-coded 5 seconds)
    :return: ``(epc_data, errors)`` - a list of EPC dicts (each carrying the
        originating "row_id" and a "recommendations" list) and the list of
        "row_id" values whose lookup raised
    """
    epc_data = []
    errors = []
    for _, home in tqdm(asset_list.iterrows(), total=len(asset_list)):
        try:
            searcher = SearchEpc(
                address1=str(home["Number"]),
                postcode=home["Postcode"],
                auth_token=EPC_AUTH_TOKEN,
                os_api_key="",
                property_type=None,
                fast=True,
                full_address=home["Full Address"],
                max_retries=5,
            )
            # Force the skipping of estimating the EPC
            searcher.ordnance_survey_client.property_type = None
            searcher.ordnance_survey_client.built_form = None

            searcher.find_property(skip_os=True)
            if searcher.newest_epc is None:
                continue

            # Recommendations are best-effort: fall back to an empty list if the
            # lookup fails. Catch Exception rather than using a bare except so
            # that Ctrl-C / SystemExit still stop the run.
            try:
                property_recommendations = searcher.client.domestic.recommendations(
                    searcher.newest_epc["lmk-key"]
                )
            except Exception:
                property_recommendations = {"rows": []}

            epc_data.append({
                "row_id": home["row_id"],
                **searcher.newest_epc.copy(),
                "recommendations": property_recommendations["rows"],
            })
        except Exception as exc:
            errors.append(home["row_id"])
            # Report the failure instead of dropping it; tqdm.write keeps the
            # progress bar intact.
            tqdm.write(f"EPC lookup failed for row {home['row_id']}: {exc!r}")
            # Back off before the next row.
            time.sleep(error_delay)

    return epc_data, errors
recommendation present in this row + for rec in row["recommendations"]: + recommendation_text = rec["improvement-summary-text"] + row_data[recommendation_text] = True + + # Append the row data to transformed_data + transformed_data.append(row_data) + + transformed_df = pd.DataFrame(transformed_data) + # Drop the column that is "" + transformed_df = transformed_df.drop(columns=[""]) + + # Retrieve just the data we need + epc_df = epc_df[ + [ + "row_id", + "uprn", + "property-type", + "built-form", + "inspection-date", + "current-energy-rating", + "current-energy-efficiency", + "roof-description", + "walls-description", + "transaction-type", + # New fields needed + "secondheat-description", + "total-floor-area", + "construction-age-band", + "floor-height", + "number-habitable-rooms", + "mainheat-description", + # + "energy-consumption-current", # kwh/m2 + ] + ] + + asset_list = asset_list.merge( + epc_df, + how="left", + on="row_id" + ).merge( + transformed_df, + how="left", + on="row_id" + ) + + asset_list = asset_list.drop(columns=["row_id"]) + + # Rename the columns + asset_list = asset_list.rename(columns={ + "inspection-date": "Date of last EPC", + "current-energy-efficiency": "SAP score on register", + "current-energy-rating": "EPC rating on register", + "property-type": "Property Type", + "built-form": "Archetype", + "total-floor-area": "Property Floor Area", + "construction-age-band": "Property Age Band", + "floor-height": "Property Floor Height", + "number-habitable-rooms": "Number of Habitable Rooms", + "walls-description": "Wall Construction", + "roof-description": "Roof Construction", + "mainheat-description": "Heating Type", + "secondheat-description": "Secondary Heating", + "transaction-type": "Reason for last EPC", + "energy-consumption-current": "Heat Demand (kWh/m2)" + }) + + asset_list["Estimated Number of Floors"] = asset_list.apply( + lambda x: estimate_number_of_floors(property_type=x["Property Type"]) if not pd.isnull( + x["Property Type"]) else 
None, axis=1 + ) + + asset_list["Property Floor Area"] = asset_list["Property Floor Area"].astype(float) + # Replace "" value with None + asset_list["Number of Habitable Rooms"] = asset_list["Number of Habitable Rooms"].replace("", None) + asset_list["Number of Habitable Rooms"] = asset_list["Number of Habitable Rooms"].astype(float) + + asset_list["Estimated Perimeter (m)"] = asset_list.apply( + lambda x: estimate_perimeter( + floor_area=x["Property Floor Area"] / x["Estimated Number of Floors"], + num_rooms=x["Number of Habitable Rooms"] / x["Estimated Number of Floors"], + ), axis=1 + ) + + asset_list["Estimated Heat Loss Perimeter (m2)"] = asset_list.apply( + lambda x: estimate_external_wall_area( + num_floors=x["Estimated Number of Floors"], + floor_height=float(x["Property Floor Height"]) if x["Property Floor Height"] else 2.5, + perimeter=x["Estimated Perimeter (m)"], + built_form=x["Archetype"] + ), + axis=1 + ) + + asset_list["Roof Insulation Thickness"] = asset_list.apply( + lambda x: RoofAttributes(description=x["Roof Construction"]).process()["insulation_thickness"] if not pd.isnull( + x["Roof Construction"]) else None, + axis=1 + ) + + # Store as an excel + filename = "livewest EPC Data pull - 29 Oct.xlsx" + asset_list.to_excel(filename, index=False) diff --git a/etl/customers/stonewater/Wave 3 Preparation.py b/etl/customers/stonewater/Wave 3 Preparation.py new file mode 100644 index 00000000..9f929db1 --- /dev/null +++ b/etl/customers/stonewater/Wave 3 Preparation.py @@ -0,0 +1,1320 @@ +import os +import PyPDF2 +import re +import pandas as pd +import numpy as np +from tqdm import tqdm +from collections import Counter + +CUSTOMER_FOLDER_PATH = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater" +SURVEY_FOLDERS = os.path.join(CUSTOMER_FOLDER_PATH, "StonewaterSurveys_{i}") +NUM_FOLDERS = 14 + + +def sap_to_epc(sap_points: int | float): + """ + Simple utility function to convert SAP points to EPC rating. 
+ :param sap_points: numerical value of SAP points, typically between 0 and 100 + :return: + """ + + if sap_points <= 0: + raise ValueError("SAP points should be above 0.") + + if sap_points >= 92: + return "A" + elif sap_points >= 81: + return "B" + elif sap_points >= 69: + return "C" + elif sap_points >= 55: + return "D" + elif sap_points >= 39: + return "E" + elif sap_points >= 21: + return "F" + else: + return "G" + + +def extract_summary_report(pdf_path): + """ + Extracts specific data from the provided PDF file. + Data includes: + - Current SAP rating + - Fuel Bill + - Address + """ + data = { + "Address": None, + "Postcode": None, + "Current SAP Rating": None, + "Current EPC Band": None, + "Fuel Bill": None, + "Number of Storeys": None, + "Window Age Description": None, + "Window Age Description Proportion (%)": None, + "Secondary Window Age Description": None, + "Secondary Window Age Description Proportion (%)": None, + "Number of Windows": None, + "Total Number of Doors": None, + "Number of Insulated Doors": None, + "Existing Primary Heating System": None, + "Existing Primary Heating PCDF Reference": None, + "Existing Primary Heating Controls": None, + "Existing Primary Heating % of Heat": None, + "Existing Secondary Heating System": None, + "Existing Secondary Heating PCDF Reference": None, + "Existing Secondary Heating Controls": None, + "Existing Secondary Heating % of Heat": None, + "Secondary Heating Code": None, + "Water Heating Code": None, + 'Total Floor Area (m2)': None, + 'Total Ground Floor Area (m2)': None, + 'RIR Floor Area': None, + 'Main Building Wall Area (m2)': None, + 'First Extension Wall Area (m2)': None, + "Number of Light Fittings": None, + "Number of LEL Fittings": None, + "Number of fittings needing LEL": None, + "Main Roof Type": None, + "Main Roof Insulation": None, + "Main Roof Insulation Thickness": None, + } + + with (open(pdf_path, "rb") as file): + reader = PyPDF2.PdfReader(file) + text = "" + for page in reader.pages: + text 
+= page.extract_text() + + # Extract Current SAP rating + sap_match = re.search(r"Current SAP rating:\s*([A-Z] \d+)", text) + data["Current SAP Rating"] = sap_match.group(1).split(" ")[1] + + # Number of storeys + storeys_match = re.search(r"Number of Storeys:\s*(\d+)", text) + data["Number of Storeys"] = int(storeys_match.group(1)) + + # Extract Carbon Emissions + # carbon_match = re.search(r"Emissions \(t/year\):\s*([\d.]+)\s*tonnes", text) + # data["Carbon Emissions (t/year)"] = float(carbon_match.group(1)) + + # Extract Fuel Bill + fuel_bill_match = re.search(r"Fuel Bill:\s*£(\d+)", text) + data["Fuel Bill"] = f"£{fuel_bill_match.group(1)}" + + # Extract individual address components + postcode = re.search(r"Postcode:\s*(.*?)\nRegion:", text) + # region = re.search(r"Region:\s*(.*?)\nHouse Name:", text) + house_name = re.search(r"House Name:\s*(.*?)\nHouse No:", text) + house_no = re.search(r"House No:\s*(.*?)\nStreet:", text) + street = re.search(r"Street:\s*(.*?)\nLocality:", text) + locality = re.search(r"Locality:\s*(.*?)\nTown:", text) + town = re.search(r"Town:\s*(.*?)\nCounty:", text) + county = re.search(r"County:\s*(.*?)\nProperty Tenure:", text) + + # Clean extracted values and remove any prefixes + address_parts = [ + house_no.group(1).strip() if house_no else "", + house_name.group(1).strip() if house_name else "", + street.group(1).strip() if street else "", + locality.group(1).strip() if locality else "", + town.group(1).strip() if town else "", + county.group(1).strip() if county else "", + postcode.group(1).strip() if postcode else "" + ] + + # Join non-empty parts with a comma + data["Address"] = ", ".join([part for part in address_parts if part]) + data["Postcode"] = postcode.group(1).strip() + + windows_section = re.search(r"Windows\s*(.*?)\s*Draught Proofing", text, re.DOTALL) + windows_text = windows_section.group(1) + window_data = extract_window_age_description(windows_text) + data.update(window_data) + + # Extract Total Number of Doors + 
total_doors_match = re.search(r"Total Number of Doors\s*(\d+)", text) + data["Total Number of Doors"] = int(total_doors_match.group(1)) + + # Extract Number of Insulated Doors + insulated_doors_match = re.search(r"Number of Insulated Doors\s*(\d+)", text) + data["Number of Insulated Doors"] = int(insulated_doors_match.group(1)) + + # Extract heating system + # Extract Primary Heating Data + # Extract Primary Heating Section + primary_heating_section1 = re.search(r"Main\s*Heating1\s*(.*?)\s*Main\s*Heating2", text, re.DOTALL) + primary_heating_section2 = re.search(r"Main\s*Heating1\s*(.*?)\s*Water\s*Heating", text, re.DOTALL) + primary_heating_section = primary_heating_section1 if primary_heating_section1 else primary_heating_section2 + + primary_text = primary_heating_section.group(1) + + data["Existing Primary Heating System"] = re.search(r"Main Heating Code\s*(.*?)\n", primary_text).group( + 1).strip() + data["Existing Primary Heating PCDF Reference"] = re.search( + r"PCDF boiler Reference\s*(\d+)", primary_text + ).group(1) + data["Existing Primary Heating Controls"] = re.search( + r"Main Heating Controls\s*(.*?)\n", primary_text + ).group(1).strip() + data["Existing Primary Heating % of Heat"] = int( + re.search(r"Percentage of Heat\s*(\d+)\s*%", primary_text).group(1) + ) + + # Extract Secondary Heating Section + secondary_heating_section = re.search(r"Main\s*Heating2\s*(.*?)\s*Water\s*Heating", text, re.DOTALL) + + if secondary_heating_section is None: + data["Existing Secondary Heating System"] = "" + data["Existing Secondary Heating PCDF Reference"] = "" + data["Existing Secondary Heating Controls"] = "" + data["Existing Secondary Heating % of Heat"] = 0 + + else: + secondary_text = secondary_heating_section.group(1) + + main_heating_code_match_secondary = re.search( + r"Main Heating Code\s*(.*?)(?=\n|Percentage of Heat)", secondary_text + ) + data["Existing Secondary Heating System"] = main_heating_code_match_secondary.group(1).strip() + data["Existing 
Secondary Heating PCDF Reference"] = re.search(r"PCDF boiler Reference\s*(\d+)", + secondary_text).group(1) + second_heating_controls_match = re.search(r"Main Heating Controls\s*(.*?)\n", secondary_text) + data["Existing Secondary Heating Controls"] = ( + second_heating_controls_match.group(1).strip() if second_heating_controls_match else "" + ) + data["Existing Secondary Heating % of Heat"] = int( + re.search(r"Percentage of Heat\s*(\d+)\s*%", secondary_text).group(1) + ) + + # Extract Secondary Heating and Water Heating Codes + secondary_heating_code_match = re.search(r"Secondary Heating Code\s*(.*?)\n", text) + water_heating_code_match = re.search(r"Water Heating Code\s*(.*?)\n", text) + + if data["Existing Secondary Heating System"] == "": + data["Secondary Heating Code"] = "" + else: + data["Secondary Heating Code"] = secondary_heating_code_match.group( + 1).strip() if secondary_heating_code_match else "" + + data["Water Heating Code"] = water_heating_code_match.group(1).strip() + + dimensions = extract_building_parts_summary(text) + data.update(dimensions) + + data["Number of Light Fittings"] = int(re.search(r"Total number of light fittings\s*(\d+)", text).group(1)) + data["Number of LEL Fittings"] = int(re.search(r"Total number of L.E.L. 
def extract_window_age_description(windows_text):
    """
    Extracts the two most common window age descriptions and their proportions.

    Parameters:
        windows_text (str): The text section containing window data.

    Returns:
        dict: The most common description and its percentage share, the second
            most common description and its share (None and 0 when every window
            has the same description), and the total number of windows counted.

    Raises:
        ValueError: If none of the known descriptions occurs in the text.
    """
    # Clean up windows_text by removing line breaks for better pattern matching
    windows_text = windows_text.replace("\n", "")

    # Define possible window age descriptions
    window_descriptions = [
        "Double post or during 2002",
        "Double pre 2002",
        "Double with unknown install date",
        "Secondary glazing",
        "Triple glazing",
        "Single glazing",
    ]

    # Count (non-overlapping) occurrences of each description. Every description
    # gets an entry, even with a zero count, so the ranking below always has
    # at least two items to choose from.
    description_counts = Counter(
        {description: windows_text.count(description) for description in window_descriptions}
    )

    total_windows = sum(description_counts.values())
    if not total_windows:
        raise ValueError("Failed to extract window data.")

    # Rank once; ties keep the order in which the descriptions are listed above
    ranked = description_counts.most_common(2)
    most_common_description, window_count = ranked[0]
    window_proportion = window_count / total_windows * 100

    if window_count == total_windows:
        # Every window shares one description, so there is no runner-up
        second_most_common_description = None
        second_most_common_proportion = 0
    else:
        second_most_common_description, second_window_count = ranked[1]
        second_most_common_proportion = second_window_count / total_windows * 100

    return {
        "Window Age Description": most_common_description,
        "Window Age Description Proportion (%)": window_proportion,
        "Secondary Window Age Description": second_most_common_description,
        "Secondary Window Age Description Proportion (%)": second_most_common_proportion,
        "Number of Windows": total_windows
    }


def _aggregate_dimensions(parts, main_label, ground_floor_label):
    """
    Rolls per-floor records up into the floor-area and wall-area totals.

    :param parts: list of per-floor dicts as built by the building-part extractors
    :param main_label: text identifying the main building in "Building Part"
    :param ground_floor_label: text identifying the ground floor in "Floor Level"
    :return: dict of aggregated floor areas and wall areas
    """

    def wall_area(records):
        # Room-in-roof records carry no perimeter or height, so they are left out
        return sum(
            [record["Perimeter (m)"] * record["Room Height (m)"] for record in records
             if record["Perimeter (m)"] and record["Room Height (m)"]]
        )

    main_building = [part for part in parts if main_label in part["Building Part"]]
    first_extension = [part for part in parts if "1st Extension" in part["Building Part"]]

    return {
        "Total Floor Area (m2)": sum([part["Floor Area (m2)"] for part in parts]),
        "Total Ground Floor Area (m2)": sum(
            [part["Floor Area (m2)"] for part in parts if ground_floor_label in part["Floor Level"]]
        ),
        "RIR Floor Area": sum(
            [part["Floor Area (m2)"] for part in parts if "Room in Roof" in part["Floor Level"]]
        ),
        "Main Building Wall Area (m2)": wall_area(main_building),
        "First Extension Wall Area (m2)": wall_area(first_extension),
    }


def extract_building_parts_epr(text):
    """
    Extracts building parts and associated dimensions from the provided EPR PDF text.

    Each building part (main and extensions) contributes one record per floor row
    found under its dimensions header (floor area, room height, perimeter and party
    wall length). When "Room(s) in Roof area" appears within the part name, a
    floor-area-only "Room in Roof" record is added for that part as well.

    :param text: full text of the EPR report
    :return: dict with total, ground-floor and room-in-roof floor areas plus the wall
        areas (perimeter x room height) of the main building and the first extension
    """
    # Pattern to locate each "Building part" section
    building_part_pattern = re.compile(
        r"Construction details: Building part: (.*?)\nFloor Area \[m2\] Room Height \[m\] Perimeter \[m\] Party "
        r"Wall Length \[m\]\n(.*?)(?=Construction details|Data inputs|$)",
        re.DOTALL
    )
    room_in_roof_pattern = re.compile(r"Room\(s\) in Roof area:\s*([\d.]+)")
    # Pattern to match each floor's measurements
    floor_pattern = re.compile(
        r"(Lowest floor|First floor|Second floor)\s+([\d.]+)\s+([\d.]+)\s+([\d.]+)\s+([\d.]+)"
    )

    data = []
    for match in building_part_pattern.finditer(text):
        part_name = match.group(1).strip()
        floor_data = match.group(2)

        room_in_roof_match = room_in_roof_pattern.search(part_name)
        if room_in_roof_match:
            # Keep only the descriptor (e.g. "Main"), dropping the build date and the
            # room-in-roof text that was captured as part of the name
            cleaned_part_name = re.sub(r" - built in.*|Room\(s\) in Roof area:.*", "", part_name).strip()
            data.append({
                "Building Part": cleaned_part_name,
                "Floor Level": "Room in Roof",
                "Floor Area (m2)": float(room_in_roof_match.group(1)),
                "Room Height (m)": None,  # Not reported for rooms in roof
                "Perimeter (m)": None,  # Not reported for rooms in roof
                "Party Wall Length (m)": None  # Not reported for rooms in roof
            })
        else:
            # Clean up part name to keep only the descriptor (e.g., "Main" or "1st Extension")
            cleaned_part_name = re.sub(r" - built in.*", "", part_name).strip()

        # Floor rows are read for every part, so a part that also has a room in
        # roof still contributes its ordinary floors to the totals
        for floor_match in floor_pattern.finditer(floor_data):
            data.append({
                "Building Part": cleaned_part_name,
                "Floor Level": floor_match.group(1),
                "Floor Area (m2)": float(floor_match.group(2)),
                "Room Height (m)": float(floor_match.group(3)),
                "Perimeter (m)": float(floor_match.group(4)),
                "Party Wall Length (m)": float(floor_match.group(5))
            })

    return _aggregate_dimensions(data, main_label="Main", ground_floor_label="Lowest floor")


def extract_building_parts_summary(text):
    """
    Extracts building parts and associated dimensions from the summary report PDF text.

    This includes Main Property, multiple extensions if they exist, and Room in Roof
    areas, all read from the section between "Dimensions: Dimension type: Internal"
    and "5.0 Conservatory:".

    :param text: full text of the summary report
    :return: dict with total, ground-floor and room-in-roof floor areas plus the wall
        areas (perimeter x room height) of the main property and the first extension
    :raises ValueError: if the dimensions section cannot be located
    """
    # Locate the Dimensions section
    dimensions_section = re.search(
        r"Dimensions:\s*Dimension type: Internal\n(.*?)\n5\.0 Conservatory:", text, re.DOTALL
    )
    if not dimensions_section:
        raise ValueError("Failed to locate dimensions section in the text.")

    dimensions_text = dimensions_section.group(1)

    # Pattern to extract each building part, starting from Main Property and including extensions
    building_part_pattern = re.compile(
        r"(Main Property|\d+(?:st|nd|rd|th) Extension)\s*"
        r"(.*?)(?=\d+(?:st|nd|rd|th) Extension|5\.0 Conservatory|$)",
        re.DOTALL
    )
    # Floor Level, Floor Area, Room Height, Perimeter, Party Wall Length
    floor_pattern = re.compile(
        r"(1st Floor|Lowest Floor|Second floor):\s*([\d.]+)\s+([\d.]+)\s+([\d.]+)\s+([\d.]+)"
    )
    # "Room(s) in Roof" entries only have a Floor Area
    room_in_roof_pattern = re.compile(r"Room\(s\) in Roof:\s*([\d.]+)")

    data = []
    for match in building_part_pattern.finditer(dimensions_text):
        part_name = match.group(1)
        floor_data = match.group(2)

        # One record per floor within the building part
        for floor_match in floor_pattern.finditer(floor_data):
            data.append({
                "Building Part": part_name,
                "Floor Level": floor_match.group(1),
                "Floor Area (m2)": float(floor_match.group(2)),
                "Room Height (m)": float(floor_match.group(3)),
                "Perimeter (m)": float(floor_match.group(4)),
                "Party Wall Length (m)": float(floor_match.group(5))
            })

        room_in_roof_match = room_in_roof_pattern.search(floor_data)
        if room_in_roof_match:
            data.append({
                "Building Part": part_name,
                "Floor Level": "Room in Roof",
                "Floor Area (m2)": float(room_in_roof_match.group(1)),
                "Room Height (m)": None,  # Not reported for rooms in roof
                "Perimeter (m)": None,  # Not reported for rooms in roof
                "Party Wall Length (m)": None  # Not reported for rooms in roof
            })

    return _aggregate_dimensions(data, main_label="Main Property", ground_floor_label="Lowest Floor")


def _roof_field(label, part_details):
    """Returns the stripped text following "<label>:" up to the end of its line, or None if absent."""
    found = re.search(label + r":\s*(.*?)(?=\n|$)", part_details)
    return found.group(1).strip() if found else None


def extract_roof_details_epr(text):
    """
    Extracts roof type, insulation, and insulation thickness for each building part
    in the provided EPR PDF text.

    :param text: full text of the EPR report
    :return: list with one dict per building part, holding "Building Part",
        "Roof Type", "Roof Insulation" and "Roof Insulation Thickness"; a field
        that is not present in the part's section is None
    """
    # Locate each building part section: its name is the rest of the header line,
    # its details run until the next part, the word "Conservatory" or the end of the text
    building_part_pattern = re.compile(
        r"Construction details: Building part: (.*?)\n(.*?)(?=Conservatory|Construction details|$)",
        re.DOTALL
    )

    roof_data = []
    for match in building_part_pattern.finditer(text):
        part_name = match.group(1).strip()
        part_details = match.group(2)

        # Clean up the building part name
        cleaned_part_name = re.sub(r" - built in.*|Room\(s\) in Roof area:.*", "", part_name).strip()

        roof_data.append({
            "Building Part": cleaned_part_name,
            "Roof Type": _roof_field("Roof Type", part_details),
            "Roof Insulation": _roof_field("Roof Insulation", part_details),
            "Roof Insulation Thickness": _roof_field("Roof Insulation Thickness", part_details),
        })

    return roof_data
+ """ + data = { + "Address": None, + "Postcode": None, + "Current SAP Rating": None, + "Current EPC Band": None, + "Primary Energy Use (kWh/yr)": None, + "Primary Energy Use Intensity (kWh/m2/yr)": None, + "Number of Storeys": None, + "Fuel Bill": None, + "Window Age Description": None, + "Window Age Description Proportion (%)": None, + "Secondary Window Age Description": None, + "Secondary Window Age Description Proportion (%)": None, + "Number of Windows": None, + "Total Number of Doors": None, + "Number of Insulated Doors": None, + "Existing Primary Heating System": None, + "Existing Primary Heating PCDF Reference": None, + "Existing Primary Heating Controls": None, + "Existing Primary Heating % of Heat": None, + "Existing Secondary Heating System": None, + "Existing Secondary Heating PCDF Reference": None, + "Existing Secondary Heating Controls": None, + "Existing Secondary Heating % of Heat": None, + "Secondary Heating Code": None, + "Water Heating Code": None, + 'Total Floor Area (m2)': None, + 'Total Ground Floor Area (m2)': None, + 'RIR Floor Area': None, + 'Main Building Wall Area (m2)': None, + 'First Extension Wall Area (m2)': None, + "Number of Light Fittings": None, + "Number of LEL Fittings": None, + "Number of fittings needing LEL": None, + "Main Roof Type": None, + "Main Roof Insulation": None, + "Main Roof Insulation Thickness": None, + } + + with open(pdf_path, "rb") as file: + reader = PyPDF2.PdfReader(file) + text = "" + for page in reader.pages: + text += page.extract_text() + + # Extract Address + address_match = re.search(r"ENERGY REPORT\nDwelling Address\s*(.*?)\s*\nReference", text, re.DOTALL) + data["Address"] = address_match.group(1).strip() + data["Postcode"] = data["Address"].split(",")[-1].strip() + + # Extract Current and Potential SAP ratings + sap_match = re.search(r"GG \(1-20\)\s*(\d{1,2})\s*(\d{1,2})", text) + current_sap, _ = int(sap_match.group(1)), int(sap_match.group(2)) + data["Current SAP Rating"] = current_sap + + # 
Extract the primary energy use intensity + additional_rating_match = re.search(r"Additional ratings for your home\s*([\d.]+)", text) + data["Primary Energy Use Intensity (kWh/m2/yr)"] = float(additional_rating_match.group(1)) + + # Extract Number of Storeys + storeys_match = re.search(r"Number of Storeys:\s*(\d+)", text) + data["Number of Storeys"] = int(storeys_match.group(1)) + + # Extract Fuel Bill + fuel_bill_match = re.search(r"TOTAL\s*£(\d+)", text) + data["Fuel Bill"] = f"£{fuel_bill_match.group(1)}" + + # Extract Total Number of Doors + total_doors_match = re.search(r"Total Doors:\s*(\d+)", text) + data["Total Number of Doors"] = int(total_doors_match.group(1)) + + # Extract Number of Insulated Doors + insulated_doors_match = re.search(r"Insulated Doors:\s*(\d+)", text) + data["Number of Insulated Doors"] = int(insulated_doors_match.group(1)) + + # Extract Primary Heating Section (Main Heating 1) + primary_heating_section1 = re.search(r"Main\s*Heating\s*1\s*(.*?)\s*Main\s*Heating\s*2", text, re.DOTALL) + # We may not have a secondary heating + primary_heating_section2 = re.search(r"Main\s*Heating\s*1\s*(.*?)\s*Secondary\s*Heating", text, re.DOTALL) + primary_heating_section = primary_heating_section1 if primary_heating_section1 else primary_heating_section2 + primary_text = primary_heating_section.group(1) + + data["Existing Primary Heating System"] = re.search( + r"Main Heating Code\s*(.*?)\n", primary_text + ).group(1).strip() + data["Existing Primary Heating PCDF Reference"] = re.search( + r"PCDF boiler Reference\s*(\d+)", primary_text + ).group(1) + data["Existing Primary Heating Controls"] = re.search( + r"Main Heating Controls\s*(.*?)\n", primary_text + ).group(1).strip() + data["Existing Primary Heating % of Heat"] = int( + re.search(r"Percentage of Heat\s*(\d+)\s*%?", primary_text).group(1) + ) + + # Extract Secondary Heating Section (Main Heating 2) + secondary_heating_section = re.search(r"Main\s*Heating\s*2\s*(.*?)\s*Secondary Heating", text, 
re.DOTALL) + if secondary_heating_section is None: + data["Existing Secondary Heating System"] = "" + data["Existing Secondary Heating PCDF Reference"] = "" + data["Existing Secondary Heating Controls"] = "" + data["Existing Secondary Heating % of Heat"] = 0 + + else: + secondary_text = secondary_heating_section.group(1) + + main_heating_code_match_secondary = re.search( + r"Main Heating Code\s*(.*?)(?=\n|Percentage of Heat)", secondary_text + ) + data["Existing Secondary Heating System"] = main_heating_code_match_secondary.group(1).strip() + + data["Existing Secondary Heating PCDF Reference"] = re.search( + r"PCDF boiler Reference\s*(\d+)", secondary_text + ).group(1) + + if data["Existing Secondary Heating System"] == "": + data["Existing Secondary Heating Controls"] = "" + else: + # Might not have heating controls on 2nd system + secondary_controls_match = re.search(r"Main Heating Controls\s*(.*?)\n", secondary_text) + data["Existing Secondary Heating Controls"] = ( + secondary_controls_match.group(1).strip() if secondary_controls_match else "" + ) + data["Existing Secondary Heating % of Heat"] = int( + re.search(r"Percentage of Heat\s*(\d+)\s*%?", secondary_text).group(1) + ) + + # Extract Secondary Heating and Water Heating Codes + secondary_heating_code_match = re.search(r"Secondary Heating Code\s*(.*?)\n", text) + water_heating_code_match = re.search(r"Water Heating Code\s*(.*?)\n", text) + + if data["Existing Secondary Heating System"] == "": + data["Secondary Heating Code"] = "" + else: + data["Secondary Heating Code"] = secondary_heating_code_match.group( + 1).strip() if secondary_heating_code_match else "" + data["Water Heating Code"] = water_heating_code_match.group(1).strip() + + # Extract Windows information + windows_section = re.search(r"Windows\s*(.*?)\s*Draught Proofing", text, re.DOTALL) + if windows_section: + windows_text = windows_section.group(1) + window_data = extract_window_age_description(windows_text) + data.update(window_data) + + 
def detect_report_type(pdf_path, pdf_file):
    """
    Detects the type of report based on first-page content or filename.

    :param pdf_path: String path to the PDF file
    :param pdf_file: String name of the PDF file
    :return: String type of the report ("epr", "summary" or "condition"), or None
        when the PDF matches none of them
    """
    # Only the first page is needed to tell the report types apart
    with open(pdf_path, "rb") as file:
        reader = PyPDF2.PdfReader(file)
        # Guard against a page with no extractable text so the prefix checks still work
        first_page_text = (reader.pages[0].extract_text() or "") if reader.pages else ""

    if is_energy_report(first_page_text):
        return "epr"
    elif "summary" in pdf_file.lower() or is_summary_report(first_page_text):
        return "summary"
    elif is_condition_report(first_page_text):
        return "condition"

    return None


def extract_retrofit_pdfs(data_folder_path):
    """
    Handles extraction from a retrofit data folder if it exists and has content.
    Prioritizes extracting data from an EPR if both EPR and summary report are present.

    :param data_folder_path: String path to the folder holding the report PDFs
    :return: the dict produced by extract_epr or extract_summary_report, or None
        when the folder contains neither an EPR nor a summary report
    """
    # Match the extension case-insensitively (".PDF" files are reports too) and
    # sort so the chosen report does not depend on directory listing order
    retrofit_files = sorted(f for f in os.listdir(data_folder_path) if f.lower().endswith(".pdf"))
    report_types = {"epr": None, "summary": None}

    # First, identify the types of reports available
    for pdf_file in retrofit_files:
        pdf_path = os.path.join(data_folder_path, pdf_file)
        report_type = detect_report_type(pdf_path, pdf_file)

        # Condition reports and unrecognised PDFs are ignored here
        if report_type in report_types:
            report_types[report_type] = pdf_path

        # Stop checking further if both EPR and summary are found
        if report_types["epr"] and report_types["summary"]:
            break

    # Extract data based on report availability and priority
    if report_types["epr"]:
        return extract_epr(report_types["epr"])
    elif report_types["summary"]:
        return extract_summary_report(report_types["summary"])

    # If no relevant PDF is found, return None
    return None


def is_energy_report(text):
    """
    Determines if the provided text indicates that the PDF is an Energy Report.
    Returns True if the text starts with 'ENERGY REPORT' (case-sensitive).
    """
    return text.startswith("ENERGY REPORT")


def is_summary_report(text):
    """
    Determines if the provided text indicates that the PDF is a Summary Report.
    Returns True if the text starts with 'Summary Information' (case-sensitive).
    """
    return text.startswith("Summary Information")


def detect_and_parse_report(pdf_path, pdf_file):
    """
    Detects the type of report and extracts the relevant data.

    :param pdf_path: String path to the PDF file
    :param pdf_file: String name of the PDF file
    :return: the dict produced by extract_epr or extract_summary_report, or None
        for a condition report
    :raises NotImplementedError: if the PDF is not a recognised report type
    """
    # Classification is shared with detect_report_type so the two cannot drift apart
    report_type = detect_report_type(pdf_path, pdf_file)

    if report_type == "epr":
        # Treat this as an Energy Report
        return extract_epr(pdf_path)
    elif report_type == "summary":
        # Treat this as a Summary Report
        return extract_summary_report(pdf_path)
    elif report_type == "condition":
        # Condition reports are recognised but nothing is extracted from them
        return None
    else:
        raise NotImplementedError("Implement me")


def is_condition_report(text):
    """
    Determines if the provided text indicates that the PDF is a Condition Report.
    Returns True if the text starts with either of the two known condition report titles.
    """
    return text.startswith(("OsmosisACDNEWPAS2035ConditionReport", "OsmosisACDPAS2035ConditionReport"))
+ """ + # List only directories in the specified FILE_PATH + survey_folders = [] + + # Loop over each survey folder and list its contents + for i in range(1, NUM_FOLDERS + 1): + folder_path = os.path.join(CUSTOMER_FOLDER_PATH, f"StonewaterSurveys_{i}") + if os.path.isdir(folder_path): # Check if folder exists + folder_contents = [os.path.join(f"StonewaterSurveys_{i}", file) for file in os.listdir(folder_path)] + survey_folders.extend(folder_contents) # Append contents to the master list + + # Get rid of .DS_Store files + survey_folders = [folder for folder in survey_folders if not folder.endswith(".DS_Store")] + + extracted_data = [] + for survey_folder in tqdm(survey_folders): + survey_folder_path = os.path.join(CUSTOMER_FOLDER_PATH, survey_folder) + + # List the folders inside of the survey folder + survey_subfolders = [name for name in os.listdir(survey_folder_path) + if os.path.isdir(os.path.join(survey_folder_path, name))] + + # Check if there's a "retrofit assessment" folder + retrofit_folder = next((name for name in survey_subfolders if "retrofit assessment" in name.lower()), None) + + ra_folder = next( + (name for name in survey_subfolders if "ra coordinator info" in name.lower() or "ra info" in name.lower()), + None + ) + + # If retrofit assessment folder exists, check if it has content + if retrofit_folder or ra_folder: + if retrofit_folder: + retrofit_folder_path = os.path.join(survey_folder_path, retrofit_folder) + else: + retrofit_folder_path = os.path.join(survey_folder_path, ra_folder) + + # Check if everything inside is a sub-folder and the number of folders is 2 + items = [item for item in os.listdir(retrofit_folder_path) if item != '.DS_Store'] + all_folders = [os.path.isdir(os.path.join(retrofit_folder_path, item)) for item in items] + if all(all_folders) and len(all_folders) == 2 and "Property Pics" in items: + # Get the folder that isn't Property Pics + retrofit_folder_path = os.path.join( + retrofit_folder_path, [item for item in items if item 
!= "Property Pics"][0] + ) + + if os.listdir(retrofit_folder_path): # If not empty + summary_data = extract_retrofit_pdfs(data_folder_path=retrofit_folder_path) + if summary_data: + summary_data = { + "survey_folder": survey_folder, + **summary_data, + } + extracted_data.append(summary_data) + continue + else: + # Then we have an empty Retrofit Assessment folder + continue + + # If no retrofit folder or it was empty, check files in survey_folder + + summary_data = extract_retrofit_pdfs(data_folder_path=survey_folder_path) + if not summary_data: + if len(survey_subfolders) == 1: + survey_folder_path = os.path.join(survey_folder_path, survey_subfolders[0]) + summary_data = extract_retrofit_pdfs(data_folder_path=survey_folder_path) + + if summary_data: + summary_data = { + "survey_folder": survey_folder, + **summary_data, + } + extracted_data.append(summary_data) + + extracted_data = pd.DataFrame(extracted_data) + + extracted_data["Primary Energy Use (kWh/yr)"] = ( + extracted_data["Primary Energy Use Intensity (kWh/m2/yr)"] * extracted_data["Total Floor Area (m2)"] + ) + extracted_data["Current SAP Rating"] = extracted_data["Current SAP Rating"].astype(int) + extracted_data["Current EPC Band"] = extracted_data["Current SAP Rating"].apply(sap_to_epc) + + # Remove some definite duplicates + dupes = extracted_data[extracted_data["Address"].duplicated()]["Address"] + dupes = extracted_data[extracted_data["Address"].isin(dupes)] + dupes = dupes.sort_values("Address") + # Get all of the folders that end with ROSS + to_drop = dupes[dupes["survey_folder"].str.endswith("ROSS")]["survey_folder"].unique().tolist() + + extracted_data = extracted_data[ + ~extracted_data["survey_folder"].isin( + [ + "StonewaterSurveys_10/4 Beech Road, LUTON, LU1 1DP ROSS", + "StonewaterSurveys_2/135 Runley Road, LUTON, LU1 1TX ROSS", + "StonewaterSurveys_13/7 Saxon Road, LUTON, LU3 1JR ROSS" + ] + to_drop + ) + ] + + # We now merge on the coordinator data so that against each property, we can map 
the measures + retrofit_packages_board = pd.read_excel( + os.path.join(CUSTOMER_FOLDER_PATH, "Stonewater 3.0 Updated SAP Pre & Modelled 29.10.24.xlsx"), + header=4 + ) + retrofit_packages_board = retrofit_packages_board[~pd.isnull(retrofit_packages_board["Name"])] + # Take just the rows that have been surveyed + retrofit_packages_board = retrofit_packages_board[ + retrofit_packages_board["RA"].isin(["Invoiced", "Completed"]) + ] + + # Replace \n with "" + extracted_data["Postcode"] = extracted_data["Postcode"].str.replace("\n", "") + + manual_filters = { + "Flat 21 Walmer Street": "StonewaterSurveys_14/91-1-Flat 21 Walmer Street-HR4 9JD", + "6 Cornewall Close": "StonewaterSurveys_14/aa 6, Cornewall Close, Moccas, HEREFORD, HR2 9LG", + "2 Bromyard Road": "StonewaterSurveys_4/192-9-2 Bromyard Road-WR15 8BZ", + 'Flat 18, 1 Raglan Court': "StonewaterSurveys_13/60-3-18 Raglan Court, 1 Raglan Court-MK41 8QT", + '14 Raglan Court, 1 Devizes Avenue': 'StonewaterSurveys_12/55-3-14 Raglan Court, Devizes Avenue-MK41 8QT', + '19 South Road': 'StonewaterSurveys_4/19 The Oaks, South Road, SMETHWICK, B67 7BY', + 'Flat 12 Pelican Lane': 'StonewaterSurveys_1/121-3-Flat 12 Lynton Court, Pelican Lane-RG14 1NN', + 'Flat C, 44 St Leonards Avenue': 'StonewaterSurveys_11/427-2-44c St. 
Leonards Avenue-MK42 0RB', + '16 The Crescent, Kington': 'StonewaterSurveys_9/360-3-16 The Crescent-HR5 3AS', + '2 School Lane, Leominster': 'StonewaterSurveys_5/224-1-2 School Lane-HR6 8AA', + '14 South Road': 'StonewaterSurveys_2/14 The Oaks, South Road, SMETHWICK, B67 7BY', + '1 Groves Street': 'StonewaterSurveys_4/19-5-1 Groves Street-SN2 2BW', + # '2 Sorrell Place': '', + # '72 St Ives Road': '', + # '1 The Close, Burton Gardens': '', + # '102 Cheaton Close': '', + # 'Flat 16 Spring Gardens': '', + # '4 Apple Close': '', + '25 Folly Lane': '', + + } + + # We now match this retrofit packages board to the extracted data + matching_lookup = [] + for _, home in tqdm(retrofit_packages_board.iterrows(), total=len(retrofit_packages_board)): + # Handle the case that has the wrong postcode in the asset data + if home["Name"] in manual_filters: + filtered = extracted_data[extracted_data["survey_folder"] == manual_filters[home["Name"]]].copy() + else: + filtered = extracted_data[extracted_data["Postcode"].str.lower() == home["Postcode"].lower()].copy() + + # We check that home["Name"] is contained in the survey_folder, after removing punctuation and spaces + to_filter = filtered["survey_folder"].str.replace(r"[^\w\s]", "").str.contains( + home["Name"].replace(r"[^\w\s]", "").replace("Flat", "").lstrip(), case=False + ) + if to_filter.sum() == 0: + to_filter = filtered["survey_folder"].str.replace(r"[^\w\s]", "").str.replace(",", "").str.replace(".", + "").str.contains( + home["Name"].replace(r"[^\w\s]", "").replace(",", ""), case=False + ) + filtered = filtered[to_filter] + + if filtered.empty: + continue + + if filtered.shape[0] == 1: + matching_lookup.append( + { + "survey_folder": filtered["survey_folder"].values[0], + "Address ID": home["Address ID"], + "Name": home["Name"] + } + ) + continue + + # home["Name"] should be contained in the survey_folder + filtered = filtered[filtered["survey_folder"].str.contains(home["Name"], case=False)] + # We have an edge case where 
some properties have two outputs in Sharepoint + if home["Name"] == "197 Granby Court" and home["Postcode"] == "MK1 1NQ": + raise Exception("Fix me1") + # filtered = filtered[filtered["survey_folder"] == "113-1-197 Granby Court-MK1 1NQ"] + + if home["Name"] == '1 Cluny Way' and home["Postcode"] == 'SG15 6ZB': + raise Exception("Fix me2") + # filtered = filtered[filtered["survey_folder"] == "12-1-1 Cluny Way-SG15 6ZB"] + + if home["Name"] == '2 Bromyard Road' and home["Postcode"] == 'WR15 8BZ': + filtered = filtered[filtered["survey_folder"] == "StonewaterSurveys_4/192-9-2 Bromyard Road-WR15 8BZ"] + + if filtered.empty: + continue + if filtered.shape[0] != 1: + raise Exception("something went wrong") + + matching_lookup.append( + { + "survey_folder": filtered["survey_folder"].values[0], + "Address ID": home["Address ID"], + "Name": home["Name"] + } + ) + + matching_lookup = pd.DataFrame(matching_lookup) + # Find Osmosis IDs that are in the packages board but not in the matching lookup + missing_ids = set(retrofit_packages_board["Address ID"]) - set(matching_lookup["Address ID"]) + missing_ids = list(missing_ids) + if missing_ids: + # We check that the missing ids have no data yet + if len(missing_ids) != 8: + raise Exception("Unacceptable number of missings") + + if matching_lookup["Address ID"].duplicated().sum(): + raise Exception("Duplicate Address IDs") + + if matching_lookup["survey_folder"].duplicated().sum(): + raise Exception("Duplicate survey folders") + + measure_columns = [ + 'Main Wall Insulation', + 'Secondary Wall Insulation', + 'Loft insulation', + 'Flat Roof', + 'Room in Roof', + 'Window Upgrade', + 'Door Upgrade', + 'Ventilation', + 'Main Heating', + 'Water Heating', + 'Heating Controls', + 'Solar PV', + 'Other measures' + ] + + # We should end up with a 1:1 mapping between the Osm. 
ID and the survey folder + stonewater_data = extracted_data.merge(matching_lookup, on="survey_folder", how="inner").merge( + retrofit_packages_board[ + [ + "Name", + "RA", + "Address ID", + "Archetype ID", + "Arch. Group Rank", + "Actual SAP Band", + "Actual SAP Rating", + "Modelled SAP Band", + "Modelled SAP Rating", + "Package Ref", + ] + measure_columns + ], + on=["Address ID", "Name"], + how="left" + ) + + if stonewater_data["Address ID"].duplicated().sum(): + raise Exception("Duplicate Address IDs") + + # Create a section for costs + for measure in measure_columns: + stonewater_data[f"Cost of {measure}"] = None + + stonewater_data["Total Cost of Measures"] = None + stonewater_data["Contingency Cost"] = None + stonewater_data["Total Cost of Measures inc Contingency"] = None + + # We've appended the recommended packages and modelled SAP ratings to the data + # We also want to append the windows data + windows_data = pd.read_excel( + os.path.join( + CUSTOMER_FOLDER_PATH, + "Window data included AP Copy Stonewater SHDF_3_0_Board Triage Master Filtered 26.07.24.xlsx" + ), + header=12 + ) + + windows_data = windows_data[windows_data["Address ID"] != "Address ID"] + windows_data = windows_data[~pd.isnull(windows_data["Address ID"])] + + # We get a lookup id of Osm.ID and when the windows were fitted + windows_data = windows_data[ + ["Address ID", "Window attributes - Fitted/renewed date", + "Parent Asset Window attributes - Fitted/renewed date"] + ] + # Convert to string for the moment + windows_data["Parent Asset Window attributes - Fitted/renewed date"] = windows_data[ + "Parent Asset Window attributes - Fitted/renewed date" + ].astype(str) + # Create a single date column + windows_data["Fitted/renewed date"] = np.where( + pd.notnull(windows_data["Window attributes - Fitted/renewed date"]), + windows_data["Window attributes - Fitted/renewed date"], + windows_data["Parent Asset Window attributes - Fitted/renewed date"] + ) + # Convert to a date + 
windows_data["Fitted/renewed date"] = pd.to_datetime(windows_data["Fitted/renewed date"]) + # Calculate the number of years since something was done on the windows + windows_data["Years since fitted/renewed"] = (pd.Timestamp.now() - windows_data[ + "Fitted/renewed date"]).dt.days / 365 + + stonewater_data["Package Includes Windows"] = ~pd.isnull(stonewater_data["Window Upgrade"]) + windows_data["Address ID"] = windows_data["Address ID"].astype(float) + stonewater_data = stonewater_data.merge(windows_data, on="Address ID", how="left") + + if stonewater_data["Address ID"].duplicated().sum(): + raise Exception("Duplicate Address IDs") + + # Save this data to excel + stonewater_data.to_excel(CUSTOMER_FOLDER_PATH + "/Stonewater - costed retrofit packages.xlsx", index=False) + + cost_sheet = [ + { + "measure": "EWI 0.30 w.m2.K", "cost": 298.35, "unit": "m2" + }, + { + "measure": "CWI RdSAP Default", "cost": 14.21, "unit": "m2" + }, + { + "measure": "Poss Extract CWI & Refill (issues identified)", "cost": 14.21 + 25, "unit": "m2" + }, + { + "measure": "IWI 0.30 w.m2.K", "cost": 244.80, "unit": "m2" + }, + { + "measure": "EWI/IWI 0.3", "cost": (298.35 + 244.8) / 2, "unit": "m2" + }, + { + "measure": "Loft Insulation 0.11 w.m2.K", "cost": 16.07, "unit": "m2" + }, + { + "measure": "Flat Roof 0.11 w.m2.K", "cost": 195, "unit": "m2" + }, + { + "measure": "DG Window 1.30 w.m2.K", "cost": 1140, "unit": "each" + }, + { + "measure": "Secondary 2.40", "cost": 974, "unit": "each" + }, + { + "measure": "Ins. Door 1.30 w.m2.K", "cost": None, "unit": "each" + }, + { + "measure": "Ins. 
Door 1.40 w.m2.K", "cost": None, "unit": "each" + }, + { + "measure": "DMEV", "cost": 900, "unit": "each" + }, + { + "measure": "ASHP Vaillant 102607 5kw", "cost": None, "unit": "each" + }, + { + "measure": "HHRSH Quantum 150", "cost": None, "unit": "each" + }, + { + "measure": "Dual Stat Tank 210lt 50mm Foam", "cost": None, "unit": "each" + }, + { + "measure": "Dual Stat Tank 160lt 50mm Foam", "cost": None, "unit": "each" + }, + { + "measure": "Dual Stat Tank 110lt 50mm Foam", "cost": None, "unit": "each" + }, + { + "measure": "Smart Thermostat", "cost": 1200, "unit": "each" + }, + { + "measure": "TRV's", "cost": 350, "unit": "each" + }, + { + "measure": "Solar PV - 3.0kwp", "cost": 4365.0, "unit": "each" + }, + { + "measure": "Solar PV - 1.5kwp", "cost": 3881, "unit": "each" + }, + { + "measure": "LEL", "cost": 35, "unit": "per bulb" + }, + { + "measure": "Roof 0.16 - Walls 0.30", "cost": 180, "unit": "floor area m2" + }, + { + "measure": "Roof 0.16 - Walls 0.16", "cost": 180, "unit": "floor area m2" + }, + ] + cost_sheet = pd.DataFrame(cost_sheet) + + # Save cost sheet - ideally this will be used as a secondary sheet for Stonewater + cost_sheet.to_excel(CUSTOMER_FOLDER_PATH + "/Stonewater - cost sheet.xlsx", index=False) + + # stonewater_data[~pd.isnull(stonewater_data["Room in Roof"])]["survey_folder"].values + + create_proposed_wave_3_bid( + costed_packages_filepath=os.path.join( + CUSTOMER_FOLDER_PATH, "Stonewater - Costed Retrofit Packages 20241030 (WIP) MR Review v1.xlsx" + ), + archetypes_sheet_filepath=os.path.join( + CUSTOMER_FOLDER_PATH, "Stonewater SHDF_3_0_Board Triage 22.05.24 - Archetyped V3.1.xlsx" + ) + ) + + +def create_proposed_wave_3_bid(costed_packages_filepath, archetypes_sheet_filepath): + # We read in the costed packages + # Note: Header as 12 is for Matt Ratcliff's reviewed version + costed_packages = pd.read_excel(costed_packages_filepath, header=13, sheet_name="Modelled Packages") + + archetypes_to_cost = costed_packages[ + [ + "Name", 
"Address ID", "Archetype ID", "Current SAP Rating", "Current EPC Band", "Modelled SAP Band", + "Modelled SAP Rating", "Package Ref", 'Total Cost of Measures', 'Contingency Cost', + 'Total Cost of Measures inc Contingency', 'Main Roof Type', 'Main Roof Insulation', + 'Main Roof Insulation Thickness', 'Existing Primary Heating System', + 'Existing Primary Heating PCDF Reference' + ] + ].copy() + + # Combine 'Main Roof Type', 'Main Roof Insulation', 'Main Roof Insulation Thickness', separating by colons! + archetypes_to_cost['Surveyed Main Roof'] = ( + archetypes_to_cost['Main Roof Type'] + ': ' + archetypes_to_cost['Main Roof Insulation'] + ': ' + + archetypes_to_cost['Main Roof Insulation Thickness'].astype(str) + ) + + # Combine the heating systems, separating by colons! + archetypes_to_cost['Surveyed Main Heating'] = ( + archetypes_to_cost['Existing Primary Heating System'] + ': code - ' + archetypes_to_cost[ + 'Existing Primary Heating PCDF Reference'].astype(str) + ) + + archetypes_to_cost = archetypes_to_cost.drop( + columns=['Main Roof Type', 'Main Roof Insulation', 'Main Roof Insulation Thickness', + 'Existing Primary Heating System', + 'Existing Primary Heating PCDF Reference']) + + # We take properties that are EPC D and below (61% of units) + archetypes_to_cost = archetypes_to_cost[archetypes_to_cost["Current EPC Band"].isin(["D", "E", "F", "G"])] + + archetypes_to_cost["Has been modelled"] = ~pd.isnull(archetypes_to_cost["Modelled SAP Band"]) + + average_cost = archetypes_to_cost[ + archetypes_to_cost["Has been modelled"] + ]['Total Cost of Measures inc Contingency'].mean() + print(average_cost) + + # These are the Arhetypes that will likely be suitable for Wave 3 + archetypes_sheet = pd.read_excel(archetypes_sheet_filepath, header=4) + archetypes_sheet = archetypes_sheet[~pd.isnull(archetypes_sheet["Address ID"])] + archetypes_sheet = archetypes_sheet[archetypes_sheet["Address ID"] != "Address ID"] + archetypes_sheet["Address ID"] = 
archetypes_sheet["Address ID"].astype(int) + + # We merge the property details onto the costed archetypes + archetypes_to_cost = archetypes_to_cost.merge( + archetypes_sheet[["Address ID", "Property Type", "Wall Type", "Roof Type", "Heating"]], + on="Address ID", + how="left" + ) + + proposed_sample = archetypes_sheet[archetypes_sheet["Archetype ID"].isin(archetypes_to_cost["Archetype ID"])] + + proposed_sample = proposed_sample[ + [ + "Name", "Postcode", "UPRN", "UDPRN", "Address ID", "Osm. ID", "Archetype ID", + "Property Type", "Wall Type", "Roof Type", "Heating" + ] + ] + + # We classify into high and low confidence + + match_classification = [] + for _, home in tqdm(proposed_sample.iterrows(), total=len(proposed_sample)): + + surveyed = archetypes_to_cost[archetypes_to_cost["Archetype ID"] == home["Archetype ID"]].copy() + surveyed["Package Ref"] = surveyed["Package Ref"].astype(str) + + package = " or ".join(sorted([x for x in surveyed["Package Ref"].unique() if x.strip()])) + package = package.replace("\n", "") + + surveyed_roofs = " or ".join(sorted([x for x in surveyed["Surveyed Main Roof"].unique() if x.strip()])) + surveyed_roofs = surveyed_roofs.replace("\n", "") + + surveyed_heating = " or ".join(sorted([x for x in surveyed["Surveyed Main Heating"].unique() if x.strip()])) + surveyed_heating = surveyed_heating.replace("\n", "") + + # We now check if we have a perfect match + surveyed = surveyed[ + (surveyed["Property Type"] == home["Property Type"]) & + (surveyed["Wall Type"] == home["Wall Type"]) & + (surveyed["Roof Type"] == home["Roof Type"]) & + (surveyed["Heating"] == home["Heating"]) + ] + + if surveyed.empty: + if package == "2B2A": + raise Exception("Fix me") + match_classification.append( + { + "Address ID": home["Address ID"], + "Match to Surveyed": "Approximate", + "Proposed Package Ref": package, + "Surveyed Archetype Roofs": surveyed_roofs, + "Surveyed Archetype Heating": surveyed_heating + } + ) + continue + # Re-do + package = " or 
".join(sorted([x for x in surveyed["Package Ref"].unique() if x.strip()])) + package = package.replace("\n", "") + surveyed_roofs = " or ".join(sorted([x for x in surveyed["Surveyed Main Roof"].unique() if x.strip()])) + surveyed_roofs = surveyed_roofs.replace("\n", "") + surveyed_heating = " or ".join(sorted([x for x in surveyed["Surveyed Main Heating"].unique() if x.strip()])) + surveyed_heating = surveyed_heating.replace("\n", "") + + match_classification.append( + { + "Address ID": home["Address ID"], + "Match to Surveyed": "Exact", + "Proposed Package Ref": package, + "Surveyed Archetype Roofs": surveyed_roofs, + "Surveyed Archetype Heating": surveyed_heating + } + ) + + match_classification = pd.DataFrame(match_classification) + + proposed_sample = proposed_sample.merge( + match_classification, + on="Address ID", + how="left", + ) + + # Merge on the cost per archetype + cost_per_archetype = ( + archetypes_to_cost.groupby("Archetype ID")[['Total Cost of Measures inc Contingency']].mean().reset_index() + ) + proposed_sample = proposed_sample.merge( + cost_per_archetype, + on="Archetype ID", + how="left" + ) + + # We add on a boolean to indicate if a property from that archetype has been modelled + proposed_sample = proposed_sample.merge( + archetypes_to_cost.groupby("Archetype ID")[["Has been modelled"]].any().reset_index(), + on="Archetype ID", + how="left" + ) + + proposed_sample["Total Cost of Measures inc Contingency"] = np.where( + ~proposed_sample["Has been modelled"], + None, proposed_sample["Total Cost of Measures inc Contingency"] + ) + + # Save excel + proposed_sample.to_excel(CUSTOMER_FOLDER_PATH + "/Stonewater - Proposed Wave 3 Bid (WIP).xlsx", index=False) + +# if __name__ == "__main__": +# main() diff --git a/etl/customers/stonewater/requirements/requirements-wave-3-prep.txt b/etl/customers/stonewater/requirements/requirements-wave-3-prep.txt new file mode 100644 index 00000000..97314b32 --- /dev/null +++ 
b/etl/customers/stonewater/requirements/requirements-wave-3-prep.txt @@ -0,0 +1,4 @@ +PyPDF2 +pandas +tqdm +openpyxl diff --git a/etl/xml_survey_extraction/XmlParser.py b/etl/xml_survey_extraction/XmlParser.py index ffe191a4..ef8daf51 100644 --- a/etl/xml_survey_extraction/XmlParser.py +++ b/etl/xml_survey_extraction/XmlParser.py @@ -9,7 +9,8 @@ from etl.xml_survey_extraction.pcdb import heating_data PROPERTY_TYPE_LOOKUP = { "0": "House", "House": "House", - "2": "Flat" + "2": "Flat", + "3": "Maisonette", } @@ -107,11 +108,13 @@ class XmlParser: BUILT_FORM_MAP = { "1": "Detached", + "2": "Semi-Detached", "3": "End-Terrace", "4": "Mid-Terrace", } GLAZED_AREA_MAP = { + "2": "More than Typical", "4": "Much More Than Typical" } @@ -120,7 +123,9 @@ class XmlParser: } TRANSACTION_TYPE_MAP = { - "13": "ECO assessment" + "5": "Rented (social)", + "13": "ECO assessment", + "14": "Stock condition survey", } TENURE_MAP = { @@ -131,7 +136,8 @@ class XmlParser: TARIFF_MAP = { "1": "Dual", - "2": "Single" + "2": "Single", + "3": "Unknown" } def __init__(self, file, filekey, surveyor_company, uprn=None): @@ -400,8 +406,13 @@ class XmlParser: ] wall_areas = sum([float(f["heat_loss_perimeter"]) * float(f["room_height"]) for f in main_dwelling_floors]) - window_areas = sum([float(w["window_area"]) for w in main_dwelling_windows]) - return wall_areas - window_areas + window_areas = [float(w["window_area"]) for w in main_dwelling_windows if w["window_area"] is not None] + if not window_areas: + # We discount 10% of the wall area + insulation_wall_area = wall_areas * 0.9 + else: + insulation_wall_area = wall_areas - sum(window_areas) + return insulation_wall_area def extract_additional_data(self): @@ -415,7 +426,8 @@ class XmlParser: main_dwelling_windows = [w for w in self.windows if w["window_location"] == "0"] number_of_windows = len(main_dwelling_windows) - windows_area = sum([float(w["window_area"]) for w in main_dwelling_windows]) + windows_area = [float(w["window_area"]) for w 
in main_dwelling_windows if w["window_area"] is not None] + windows_area = sum(windows_area) if windows_area else None boolean_lookup = { "true": True, @@ -427,6 +439,7 @@ class XmlParser: cylinder_insulation_type = { None: "", "1": "Foam", + "2": "Jacket" } cylinder_insulation_thickness = int( @@ -461,7 +474,7 @@ class XmlParser: "cylinder_thermostat": cylinder_thermostat, "main_dwelling_ground_floor_area": float(main_dwelling_ground_floor_area), "number_of_windows": int(number_of_windows), - "windows_area": float(windows_area), + "windows_area": float(windows_area) if windows_area is not None else windows_area, } def get_node_value(self, tag_name): @@ -769,9 +782,10 @@ class XmlParser: :return: """ - sap_windows = self.xml.getElementsByTagName("SAP-Windows")[0].getElementsByTagName("SAP-Window") - glazing_type_lookup = { + "ND": "Single glazing", + "1": "double glazing installed before 2002", + "2": "double glazing installed during or after 2002", "3": "double glazing, unknown install date", "5": "Single glazing", } @@ -787,6 +801,40 @@ class XmlParser: "8": "North West" } + sap_windows = self.xml.getElementsByTagName("SAP-Windows") + + if not sap_windows: + # We look for Multi-Glazed-Proportion + multiple_glazing_type = self.xml.getElementsByTagName("SAP-Property-Details")[0].getElementsByTagName( + "Multiple-Glazing-Type" + )[0].firstChild.nodeValue + + pvc_frame = self.xml.getElementsByTagName("SAP-Property-Details")[0].getElementsByTagName( + "PVC-Window-Frames" + ) + + pvc_frame = pvc_frame[0].firstChild.nodeValue if pvc_frame else None + + multple_glazed_proportion = self.xml.getElementsByTagName("SAP-Property-Details")[0].getElementsByTagName( + "Multiple-Glazed-Proportion" + )[0].firstChild.nodeValue + + self.windows = [ + { + "window_location": "0", + "window_area": None, + "window_type": None, + "glazing_type": glazing_type_lookup[multiple_glazing_type], + "pvc_frame": pvc_frame, + "glazing_gap": None, + "orientation": None, + 
"multple_glazed_proportion": multple_glazed_proportion + } + ] + return + + sap_windows = sap_windows[0].getElementsByTagName("SAP-Window") + self.windows = [ self._parse_windows_content( window=window,