diff --git a/backend/apis/GoogleSolarApi.py b/backend/apis/GoogleSolarApi.py index 75f28ceb..e2b7d933 100644 --- a/backend/apis/GoogleSolarApi.py +++ b/backend/apis/GoogleSolarApi.py @@ -792,9 +792,14 @@ class GoogleSolarApi: property_instance = [p for p in input_properties if p.id == unit["property_id"]][0] # At this level, we check if the property is suitable for solar and if now, skip # Or if we have a solar non-invasive recommendation + + non_invasive_rec = next( + (r for r in property_instance.non_invasive_recommendations if r["type"] == "solar_pv"), {} + ).get("array_wattage") + if ( (not property_instance.is_solar_pv_valid()) or - [r for r in property_instance.non_invasive_recommendations if r["type"] == "solar_pv"] + non_invasive_rec is not None ): continue diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index 3b6f3985..4a5b3bd4 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -394,7 +394,7 @@ async def trigger_plan(body: PlanTriggerRequest): logger.info("Getting the inputs") plan_input = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path) # Check for duplicate UPRNS - input_uprns = [x.get("uprn") for x in plan_input if "uprn" in x] + input_uprns = [x.get("uprn") for x in plan_input if "uprn" in x and x.get("uprn")] if input_uprns: # Check for dupes if len(input_uprns) != len(set(input_uprns)): diff --git a/etl/customers/aiha/bid_numbers.py b/etl/customers/aiha/bid_numbers.py new file mode 100644 index 00000000..96859f99 --- /dev/null +++ b/etl/customers/aiha/bid_numbers.py @@ -0,0 +1,92 @@ +""" +This is an adhoc script, used to pull together some of the figures that are being included in the +Warm Homes: Social Housing Wave 3 funding application +""" + +import pandas as pd +import numpy as np + +aiha_all_units = pd.read_excel( + "/Users/khalimconn-kowlessar/Downloads/AIHA Measures Packages 2024_11_13.xlsx", + sheet_name="All Properties - AIHA", + header=2 +) 
+modelled_units = pd.read_excel( + "/Users/khalimconn-kowlessar/Downloads/AIHA Measures Packages 2024_11_13.xlsx", + sheet_name="Modelled Properties - Measures", + header=5 +) +aiha_all_units = aiha_all_units.drop(columns=['Unnamed: 0', 'Unnamed: 1']) +aiha_extracted_property_data = pd.read_csv( + "/Users/khalimconn-kowlessar/Documents/hestia/Customers/AIHA/extracted_property_data.csv" +) +aiha_wave_3_units = aiha_all_units[aiha_all_units["Expected Package Cost"].astype(float) > 0] +# TODO: The EPC C property isn't a C! +aiha_epc_breakdown = aiha_wave_3_units["Expected EPC Rating"].replace({"D or E": "E"}).value_counts() +# For CAHA +caha_epc_breakdown = modelled_units[ + modelled_units['Survey Key'].str.contains("CAHA") +]['Current EPC Rating'].value_counts() +# For Hornsey +hornsey_epc_breakdown = modelled_units[ + modelled_units['Survey Key'].str.contains("HORNSEY") +]['Current EPC Rating'].value_counts() + +aiha_original_asset_data = pd.read_excel( + "/Users/khalimconn-kowlessar/Documents/hestia/Customers/AIHA/240924- KSQ & Domna Info Merge - AIHA - SHDF Wave 3 " + "bid - Supplementary information.xlsx", + sheet_name="Archetyping Data", + header=2 +) + +# Get the units in the bid: +aiha_wave_3_features = aiha_original_asset_data[ + ['Address letter or number', 'Street address', 'Postcode', "Wall type", + "Property type", "built-form", "floor"] +].merge( + aiha_wave_3_units[['Address letter or number', 'Street address', 'Postcode']], + how="inner", + on=["Address letter or number", "Street address", "Postcode"] +) + +wall_type_breakdown = aiha_wave_3_features["Wall type"].value_counts() +property_type_breakdown = aiha_wave_3_features.groupby(["Property type", "floor"]).size().reset_index() + +# Hornsey data - contained in original asset list +hornsey_asset_list = pd.read_excel( + "/Users/khalimconn-kowlessar/Documents/hestia/Customers/AIHA/SHDF - Template - EOI - Hornsey Housing " + "Trust.xlsx", + sheet_name="Ksquared-All units information", + header=3 +) + +# 
We don't need the first row +hornsey_asset_list = hornsey_asset_list.iloc[1:] +# Fill NA values with empty strings +hornsey_asset_list = hornsey_asset_list.fillna("") +hornsey_asset_list["Address letter or number"] = hornsey_asset_list["Address letter or number"].astype( + str +).str.strip() +hornsey_asset_list["Postcode"] = hornsey_asset_list["Postcode"].astype(str).str.strip() +hornsey_asset_list["Street address"] = hornsey_asset_list["Street address"].astype(str).str.strip() +# Replace double spaces +for col in ["Address letter or number", "Street address", "Postcode"]: + hornsey_asset_list[col] = hornsey_asset_list[col].str.replace(" ", " ") + +hornsey_asset_list = hornsey_asset_list[hornsey_asset_list["Address letter or number"] != ""] + +hornsey_asset_list["Wall Type Cleaned"] = np.where( + hornsey_asset_list["Wall type"].str.contains("Cavity"), + "Cavity", + "Solid" +) + +hornsey_asset_list["Property type"].value_counts() + +# CAHA +caha_epc_data = pd.read_excel( + "/Users/khalimconn-kowlessar/Documents/hestia/Customers/AIHA/caha_extracted_property_data.xlsx" +) + +caha_epc_data["property_type"].value_counts() +caha_epc_data["wall_type"].value_counts() diff --git a/etl/customers/aiha/xml_extraction.py b/etl/customers/aiha/xml_extraction.py index f96744ec..44baef80 100644 --- a/etl/customers/aiha/xml_extraction.py +++ b/etl/customers/aiha/xml_extraction.py @@ -92,9 +92,13 @@ def main(): # THis is the data we need for the AIHA project measures_data = extracted_surveys[ - ["survey_key", "address", "postcode", "current-energy-efficiency", "current-energy-rating", "number_of_floors"] + ["survey_key", "address", "postcode", "current-energy-efficiency", "current-energy-rating", + "number_of_floors", "walls-description", "property-type", "built-form"] ] measures_data = measures_data.sort_values("survey_key", ascending=True) + measures_data.to_csv( + "/Users/khalimconn-kowlessar/Documents/hestia/Customers/AIHA/extracted_property_data.csv", + ) # Note: # The 
properties will still have "Very poor" ratings for their hot water diff --git a/etl/customers/ksquared/Wave3 Modelling.py b/etl/customers/ksquared/Wave3 Modelling.py index b96b261f..96ea2b03 100644 --- a/etl/customers/ksquared/Wave3 Modelling.py +++ b/etl/customers/ksquared/Wave3 Modelling.py @@ -1,9 +1,12 @@ import os import time +import re +from etl.epc.settings import EARLIEST_EPC_DATE from dotenv import load_dotenv from tqdm import tqdm import pandas as pd +import numpy as np from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc from backend.SearchEpc import SearchEpc from utils.s3 import save_csv_to_s3 @@ -12,9 +15,10 @@ load_dotenv(dotenv_path="backend/.env") EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN") USER_ID = 8 PORTFOLIO_ID = 117 +CAHA_PORTFOLIO_ID = 118 -def app(): +def hornsey(): """ This script prepares the asset lists for the additional housing associations, CAHA and Hornsey Housing Trust, that are forming a consortium led by AIHA @@ -43,6 +47,12 @@ def app(): hornsey_asset_list = hornsey_asset_list[hornsey_asset_list["Address letter or number"] != ""] + hornsey_asset_list["Wall Type Cleaned"] = np.where( + "Cavity" in hornsey_asset_list["Wall type"], + "Cavity", + "Solid" + ) + missed_uprns = { "Flat 13A Stowell House": 100021213098, "Flat 24 Stowell House": 100021213110, @@ -156,3 +166,225 @@ def app(): "exclusions": ["boiler_upgrade"] } print(body) + + +def caha(): + caha_asset_list = pd.read_excel( + "/Users/khalimconn-kowlessar/Documents/hestia/Customers/AIHA/Copy of AIHA - WHSHF Wave 3 bid - Consortium " + "member properties - CAHA.xlsx", + sheet_name="Ksquared-All units information", + header=3 + ) + + caha_asset_list = caha_asset_list.iloc[1:] + # Fill NA values with empty strings + caha_asset_list = caha_asset_list.fillna("") + caha_asset_list["Address letter or number"] = caha_asset_list["Address letter or number"].astype( + str + ).str.strip() + + # We Add POstcode as it wasn't populated - split on space and take the last two 
entries and re-concatenate on space + caha_asset_list["Street address"] = caha_asset_list["Street address"].str.strip() + caha_asset_list["Postcode"] = caha_asset_list["Street address"].str.split(" ").str[-2:].str.join(" ") + # Take just the columns we need + caha_asset_list = caha_asset_list[["Address letter or number", "Street address", "Postcode"]] + + for col in ["Address letter or number", "Street address", "Postcode"]: + caha_asset_list[col] = caha_asset_list[col].str.replace(" ", " ") + + # Pull the data from find my epc + remap = { + "Flat A, 50 Talbot Road N6 4QP": "50a Talbot Road", + "Flat A, 51 First Avenue EN1 1BN": "51a, First Avenue", + "Flat B, 51 First Avenue EN1 1BN": "51b, First Avenue" + } + + def remap_address(address): + # Match patterns like 'Flat A, 30 Grove Park Road' + match = re.match(r'Flat (\w), (\d+) (.+)', address) + if match: + flat_letter = match.group(1) # e.g., 'A' + number = match.group(2) # e.g., '30' + rest_of_address = match.group(3) # e.g., 'Grove Park Road' + + # Format the new address as '30A Grove Park Road' + return f"{number}{flat_letter} {rest_of_address}" + + # If pattern doesn't match, return original address + return address + + extracted_data = [] + asset_list = [] + for _, home in tqdm(caha_asset_list.iterrows(), total=len(caha_asset_list)): + if home["Street address"] == "35 Stanford road N11 3HY" and home["Address letter or number"] == "": + continue + + if home["Street address"] == "29 Victoria Avenue N3 1BD" and home["Address letter or number"] == "": + continue + + if home["Street address"] == "11 Victoria Avenue N3 1BD" and home["Address letter or number"] == "Flat A": + continue + + if home["Street address"] == "11 Victoria Avenue N3 1BD" and home["Address letter or number"] == "Flat C": + continue + + if home["Street address"] == "10 Forest Gardens N17 6XA" and home["Address letter or number"] == "Flat C": + continue + + if home["Street address"] == "219 Cann Hall Road E11 3NJ" and home["Address letter or 
number"] == "Flat B": + continue + + unit_number = home["Address letter or number"] + street = home["Street address"] + postcode = home["Postcode"] + address = ", ".join([x for x in [unit_number, street] if x]) + address = remap.get(address, address) + address = address.replace(postcode, "").strip() + if "Victoria Avenue" not in address: + address = remap_address(address) + + find_epc_searcher = RetrieveFindMyEpc(address=address, postcode=postcode) + find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data(sap_2012_date=EARLIEST_EPC_DATE) + time.sleep(0.5) + # We need uprn + searcher = SearchEpc( + address1=address, + postcode=postcode, + auth_token=EPC_AUTH_TOKEN, + os_api_key="", + full_address=address, + ) + searcher.find_property(skip_os=True) + newest_epc = searcher.newest_epc + + uprn = newest_epc["uprn"] + if address in ["Flat D, 11 Victoria Avenue", "Flat B, 11 Victoria Avenue"]: + uprn = None + + extracted_data.append( + { + "uprn": uprn, + **find_epc_data, + } + ) + + asset_list.append( + { + "uprn": uprn, + "address": address, + "postcode": home["Postcode"], + "property_type": newest_epc["property-type"], + "wall_type": newest_epc["walls-description"], + "built_form": newest_epc["built-form"], + "flat_storey_count": newest_epc['flat-storey-count'], + } + ) + + non_invasive_recommendations = [ + { + "uprn": r["uprn"], + "recommendations": r["recommendations"] + } for r in extracted_data + ] + # for r in non_invasive_recommendations: + # new_recommendations = [] + # extracted = [r for r in extracted_data if r["uprn"] == r["uprn"]][0] + # for rec in r["recommendations"]: + # if extracted["hotwater-description"] == "Gas boiler/circulator, no cylinder thermostat": + # if rec["type"] in ["hot_water_tank_insulation", "cylinder_thermostat"]: + # continue + # rec["survey"] = False + # new_recommendations.append(rec) + # r["recommendations"] = new_recommendations + + # We model the two properties separately + asset_list = pd.DataFrame(asset_list) + # 
Drop Flat D, 11 Victoria Avenue + asset_list1 = asset_list[asset_list["address"] != "Flat D, 11 Victoria Avenue"] + asset_list2 = asset_list[asset_list["address"] == "Flat D, 11 Victoria Avenue"] + + # Store the asset list in s3 + filename = f"{USER_ID}/{CAHA_PORTFOLIO_ID}/asset_list1.csv" + save_csv_to_s3( + dataframe=asset_list1, + bucket_name="retrofit-plan-inputs-dev", + file_name=filename + ) + + filename2 = f"{USER_ID}/{CAHA_PORTFOLIO_ID}/asset_list2.csv" + save_csv_to_s3( + dataframe=asset_list2, + bucket_name="retrofit-plan-inputs-dev", + file_name=filename2 + ) + + # Store the non-invasive recommendations in s3 + non_invasive_recommendations_filename = f"{USER_ID}/{CAHA_PORTFOLIO_ID}/non_invasive_recommendations.csv" + save_csv_to_s3( + dataframe=pd.DataFrame(non_invasive_recommendations), + bucket_name="retrofit-plan-inputs-dev", + file_name=non_invasive_recommendations_filename + ) + + body = { + "portfolio_id": str(CAHA_PORTFOLIO_ID), + "housing_type": "Social", + "goal": "Increasing EPC", + "goal_value": "C", + "trigger_file_path": filename, + "already_installed_file_path": "", + "patches_file_path": "", + "non_invasive_recommendations_file_path": non_invasive_recommendations_filename, + "valuation_file_path": "", + "scenario_name": "Wave 3 Packages", + "multi_plan": True, + "budget": None, + "exclusions": ["boiler_upgrade"] + } + print(body) + + body2 = { + "portfolio_id": str(CAHA_PORTFOLIO_ID), + "housing_type": "Social", + "goal": "Increasing EPC", + "goal_value": "C", + "trigger_file_path": filename2, + "already_installed_file_path": "", + "patches_file_path": "", + "non_invasive_recommendations_file_path": non_invasive_recommendations_filename, + "valuation_file_path": "", + "scenario_name": "Wave 3 Packages", + "multi_plan": True, + "budget": None, + "exclusions": ["boiler_upgrade"] + } + print(body2) + + # + asset_list3 = [ + { + "address": "10b Forest Gardens", "postcode": "N17 6XA", "uprn": 100021180197 + } + ] + filename3 = 
f"{USER_ID}/{CAHA_PORTFOLIO_ID}/asset_list3.csv" + save_csv_to_s3( + dataframe=pd.DataFrame(asset_list3), + bucket_name="retrofit-plan-inputs-dev", + file_name=filename3 + ) + body3 = { + "portfolio_id": str(119), + "housing_type": "Social", + "goal": "Increasing EPC", + "goal_value": "C", + "trigger_file_path": filename3, + "already_installed_file_path": "", + "patches_file_path": "", + "non_invasive_recommendations_file_path": "", + "valuation_file_path": "", + "scenario_name": "Wave 3 Packages", + "multi_plan": True, + "budget": None, + "exclusions": ["boiler_upgrade"] + } + print(body3) diff --git a/etl/customers/southend/epc_data_pull_2024_11_14.py b/etl/customers/southend/epc_data_pull_2024_11_14.py new file mode 100644 index 00000000..14cd73be --- /dev/null +++ b/etl/customers/southend/epc_data_pull_2024_11_14.py @@ -0,0 +1,235 @@ +import os +import time + +import pandas as pd +from tqdm import tqdm + +from dotenv import load_dotenv +from utils.s3 import read_excel_from_s3 +from backend.SearchEpc import SearchEpc +from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes + +from recommendations.recommendation_utils import ( + estimate_perimeter, + estimate_external_wall_area, + estimate_number_of_floors +) + +load_dotenv(dotenv_path="backend/.env") +EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN") + + +def get_data(asset_list): + epc_data = [] + errors = [] + for _, home in tqdm(asset_list.iterrows(), total=len(asset_list)): + try: + postcode = home["Postcode"] + address1 = home["address1"].split(",")[0] + full_address = home["Address"] + + searcher = SearchEpc( + address1=str(address1), + postcode=postcode, + auth_token=EPC_AUTH_TOKEN, + os_api_key="", + property_type=None, + fast=True, + full_address=full_address, + max_retries=5 + ) + # Force the skipping of estimating the EPC + searcher.ordnance_survey_client.property_type = None + searcher.ordnance_survey_client.built_form = None + + searcher.find_property(skip_os=True) + if searcher.newest_epc 
is None: + continue + + # Look for EPC recommendatons + try: + property_recommendations = searcher.client.domestic.recommendations(searcher.newest_epc["lmk-key"]) + except: + property_recommendations = {"rows": []} + + epc = { + "row_id": home["row_id"], + **searcher.newest_epc.copy(), + "recommendations": property_recommendations["rows"] + } + + epc_data.append(epc) + except Exception as e: + errors.append(home["row_id"]) + time.sleep(5) + + return epc_data, errors + + +def app(): + """ + This app is EPC pulling data for some properties owned by Livewest + + Data request contents: + Date of last EPC + Reason for EPC + SAP score on register + Property Type + Property Area + Property Age + Any Dimensions (HLP,PW,RH) + Property Wall Construction + Heating Type + Secondary Heating + Loft Insulation Depth + + Additional if possible: + Heat loss calculations + EPC recommendations + Property UPRN + + """ + asset_list = pd.read_excel( + "/Users/khalimconn-kowlessar/Documents/hestia/Customers/southend/Southend Planned programme.xlsx", + header=0, + sheet_name="Planned RM" + ) + asset_list["row_id"] = asset_list.index + asset_list["address1"] = asset_list["Address"].str.split(",").str[0] + + epc_data, errors = get_data(asset_list) + + # We now retrieve any failed properties + asset_list_failed = asset_list[asset_list["row_id"].isin(errors)] + epc_data_failed, _ = get_data(asset_list_failed) + + # Append the failed data to the main data + epc_data.extend(epc_data_failed) + + epc_df = pd.DataFrame(epc_data) + + # We expand out the recommendations + recommendations_df = epc_df[["row_id", "recommendations"]] + + unique_recommendations = set() + for _, row in recommendations_df.iterrows(): + unique_recommendations.update([rec["improvement-summary-text"] for rec in row["recommendations"]]) + + columns = ["row_id"] + list(unique_recommendations) + transformed_data = [] + for _, row in recommendations_df.iterrows(): + # Initialize a dictionary for this row with False for all 
recommendations + row_data = {col: False for col in columns} + row_data["row_id"] = row["row_id"] + + # Set True for each recommendation present in this row + for rec in row["recommendations"]: + recommendation_text = rec["improvement-summary-text"] + row_data[recommendation_text] = True + + # Append the row data to transformed_data + transformed_data.append(row_data) + + transformed_df = pd.DataFrame(transformed_data) + # Drop the column that is "" + transformed_df = transformed_df.drop(columns=[""]) + + # Retrieve just the data we need + epc_df = epc_df[ + [ + "row_id", + "uprn", + "property-type", + "built-form", + "inspection-date", + "current-energy-rating", + "current-energy-efficiency", + "roof-description", + "walls-description", + "transaction-type", + # New fields needed + "secondheat-description", + "total-floor-area", + "construction-age-band", + "floor-height", + "number-habitable-rooms", + "mainheat-description", + # + "energy-consumption-current", # kwh/m2 + "photo-supply", + ] + ] + + asset_list = asset_list.merge( + epc_df, + how="left", + on="row_id" + ).merge( + transformed_df, + how="left", + on="row_id" + ) + + asset_list = asset_list.drop(columns=["row_id"]) + + # Rename the columns + asset_list = asset_list.rename(columns={ + "inspection-date": "Date of last EPC", + "current-energy-efficiency": "SAP score on register", + "current-energy-rating": "EPC rating on register", + "property-type": "Property Type", + "built-form": "Archetype", + "total-floor-area": "Property Floor Area", + "construction-age-band": "Property Age Band", + "floor-height": "Property Floor Height", + "number-habitable-rooms": "Number of Habitable Rooms", + "walls-description": "Wall Construction", + "roof-description": "Roof Construction", + "mainheat-description": "Heating Type", + "secondheat-description": "Secondary Heating", + "transaction-type": "Reason for last EPC", + "energy-consumption-current": "Heat Demand (kWh/m2)", + "photo-supply": "% of the Roof with PV" + 
}) + + asset_list["Estimated Number of Floors"] = asset_list.apply( + lambda x: estimate_number_of_floors(property_type=x["Property Type"]) if not pd.isnull( + x["Property Type"]) else None, axis=1 + ) + + asset_list["Property Floor Area"] = asset_list["Property Floor Area"].astype(float) + # Replace "" value with None + asset_list["Number of Habitable Rooms"] = asset_list["Number of Habitable Rooms"].replace("", None) + asset_list["Number of Habitable Rooms"] = asset_list["Number of Habitable Rooms"].astype(float) + + asset_list["Estimated Perimeter (m)"] = asset_list.apply( + lambda x: estimate_perimeter( + floor_area=x["Property Floor Area"] / x["Estimated Number of Floors"], + num_rooms=x["Number of Habitable Rooms"] / x["Estimated Number of Floors"], + ), axis=1 + ) + + asset_list["Estimated Heat Loss Perimeter (m2)"] = asset_list.apply( + lambda x: estimate_external_wall_area( + num_floors=x["Estimated Number of Floors"], + floor_height=float(x["Property Floor Height"]) if x["Property Floor Height"] else 2.5, + perimeter=x["Estimated Perimeter (m)"], + built_form=x["Archetype"] + ), + axis=1 + ) + + asset_list["Roof Insulation Thickness"] = asset_list.apply( + lambda x: RoofAttributes(description=x["Roof Construction"]).process()["insulation_thickness"] if not pd.isnull( + x["Roof Construction"]) else None, + axis=1 + ) + + # Store as an excel + filename = ("/Users/khalimconn-kowlessar/Documents/hestia/Customers/southend/southend EPC Data pull - 14 Nov " + "2024.xlsx") + asset_list.to_excel(filename, index=False) + + asset_list["% of the Roof with PV"].value_counts() + + asset_list[asset_list["% of the Roof with PV"] == "50.0"][["Address", "Postcode"]] diff --git a/etl/customers/stonewater/Wave 3 Preparation.py b/etl/customers/stonewater/Wave 3 Preparation.py index 889d8f88..a5bbff7b 100644 --- a/etl/customers/stonewater/Wave 3 Preparation.py +++ b/etl/customers/stonewater/Wave 3 Preparation.py @@ -37,6 +37,78 @@ def sap_to_epc(sap_points: int | float): 
return "G" +def extract_wall_details_summary(text): + """ + Extracts wall type, insulation, dry-lining, and thickness for each building part, + including any alternative wall details within the 7.0 Walls section of the summary PDF text. + """ + # Define data structure to hold all building part wall entries + wall_data = [] + + # Locate the entire 7.0 Walls section + wall_section = re.search(r"7\.0 Walls:\n(.*?)\n8\.0 Roofs:", text, re.DOTALL).group(1) + + # Define pattern to match each building part's wall entry within the section + building_part_pattern = re.compile( + r"(Main Property|1st Extension|2nd Extension|[\w\s]+)\n" # Matches each building part label + r"Type\s+(.*?)\n" # Matches main wall Type + r"Insulation\s+(.*?)\n" # Matches main wall Insulation + r"(Dry-lining\s+(.*?)\n)?" # Optional main wall Dry-lining + r"Wall Thickness Unknown\s+(.*?)\n" # Matches main wall Thickness Unknown + r"Wall Thickness \[mm\]\s+(\d+)", # Matches main wall Thickness + re.DOTALL + ) + + # Define pattern to capture alternative wall details, if present + alternative_wall_pattern = re.compile( + r"Alternative Wall Area.*?\n" # Matches start of alternative wall section + r"Alternative Type\s+(.*?)\n" # Matches alternative wall Type + r"Alternative Insulation\s+(.*?)\n" # Matches alternative wall Insulation + r"(Alternative Dry-lining\s+(.*?)\n)?" 
# Optional Alternative Dry-lining + r"Alternative Wall Thickness Unknown\s+(.*?)\n" # Matches alternative wall Thickness Unknown + r"Alternative Wall Thickness\s+(\d+)", # Matches alternative wall Thickness + re.DOTALL + ) + + # Find all building part entries within the 7.0 Walls section + for match in building_part_pattern.finditer(wall_section): + wall_label = match.group(1).strip() + main_wall_type = match.group(2).strip() + main_wall_insulation = match.group(3).strip() + main_wall_dry_lining = match.group(5).strip() if match.group(5) else "N/A" + main_wall_thickness_unknown = match.group(6).strip() + main_wall_thickness = int(match.group(7)) + + # Initialize dictionary for this wall entry + wall_entry = { + "Building Part": wall_label, + "Wall Type": main_wall_type, + "Wall Insulation": main_wall_insulation, + "Wall Dry-lining": main_wall_dry_lining, + "Wall Thickness Unknown": main_wall_thickness_unknown, + "Wall Thickness (mm)": main_wall_thickness, + "Alternative Wall Type": None, + "Alternative Wall Insulation": None, + "Alternative Wall Dry-lining": "N/A", + "Alternative Wall Thickness Unknown": None, + "Alternative Wall Thickness (mm)": None, + } + + # Check if there's an alternative wall section following this wall entry + alt_match = alternative_wall_pattern.search(wall_section, match.end()) + if alt_match: + wall_entry["Alternative Wall Type"] = alt_match.group(1).strip() + wall_entry["Alternative Wall Insulation"] = alt_match.group(2).strip() + wall_entry["Alternative Wall Dry-lining"] = alt_match.group(4).strip() if alt_match.group(4) else "N/A" + wall_entry["Alternative Wall Thickness Unknown"] = alt_match.group(5).strip() + wall_entry["Alternative Wall Thickness (mm)"] = int(alt_match.group(6)) + + # Append each building part as a dictionary in the wall_data list + wall_data.append(wall_entry) + + return wall_data + + def extract_summary_report(pdf_path): """ Extracts specific data from the provided PDF file. 
@@ -45,6 +117,7 @@ def extract_summary_report(pdf_path): - Fuel Bill - Address """ + data = { "Address": None, "Postcode": None, @@ -80,6 +153,14 @@ def extract_summary_report(pdf_path): "Main Roof Type": None, "Main Roof Insulation": None, "Main Roof Insulation Thickness": None, + "Main Wall Type": None, + "Main Wall Insulation": None, + "Main Wall Dry-lining": None, + "Main Wall Thickness": None, + "Main Building Alternative Wall Type": None, + "Main Building Alternative Wall Insulation": None, + "Main Building Alternative Wall Dry-lining": None, + "Main Building Alternative Wall Thickness": None, } with (open(pdf_path, "rb") as file): @@ -229,6 +310,18 @@ def extract_summary_report(pdf_path): insulation_thickness_match.strip() if insulation_thickness_match else None ) + walls_data = extract_wall_details_summary(text) + # Get the main building wall data + main_building_walls = [wall for wall in walls_data if "Main" in wall["Building Part"]][0] + data["Main Wall Type"] = main_building_walls["Wall Type"] + data["Main Wall Insulation"] = main_building_walls["Wall Insulation"] + data["Main Wall Dry-lining"] = main_building_walls["Wall Dry-lining"] + data["Main Wall Thickness"] = main_building_walls["Wall Thickness (mm)"] + data["Main Building Alternative Wall Type"] = main_building_walls["Alternative Wall Type"] + data["Main Building Alternative Wall Insulation"] = main_building_walls["Alternative Wall Insulation"] + data["Main Building Alternative Wall Dry-lining"] = main_building_walls["Alternative Wall Dry-lining"] + data["Main Building Alternative Wall Thickness"] = main_building_walls["Alternative Wall Thickness (mm)"] + return data @@ -498,10 +591,64 @@ def extract_roof_details_epr(text): return roof_data +def extract_wall_details_epr(text): + """ + Extracts wall type, insulation, dry-lining, and thickness for each building part + in the provided EPR PDF text. 
+ """ + # Define data structure to hold results + wall_data = [] + + # Locate each building part section + building_part_pattern = re.compile( + r"Construction details: Building part: (.*?)\n(.*?)(?=Conservatory|Construction details|$)", + re.DOTALL + ) + + # Extract each building part's data, including wall details + for match in building_part_pattern.finditer(text): + part_name = match.group(1).strip() + + # Clean up the building part name + cleaned_part_name = re.sub(r" - built in.*|Room\(s\) in Roof area:.*", "", part_name).strip() + + part_details = match.group(2) + + # Extract Wall Type, Wall Insulation, Wall Dry-lining, and Wall Thickness + wall_type_match = re.search(r"Wall Type:\s*(.*?)(?=\n|$)", part_details) + wall_insulation_match = re.search(r"Wall Insulation:\s*(.*?)(?=\n|$)", part_details) + wall_drylining_match = re.search(r"Wall Dry-lining:\s*(.*?)(?=\n|$)", part_details) + wall_thickness_match = re.search(r"Wall Thickness:\s*(\d+)(?=\n|$)", part_details) + + # Extract Alternative Wall information if available + alt_wall_type_match = re.search(r"Alternative Wall Type:\s*(.*?)(?=\n|$)", part_details) + alt_wall_insulation_match = re.search(r"Alternative Wall Insulation:\s*(.*?)(?=\n|$)", part_details) + alt_wall_drylining_match = re.search(r"Alternative Wall Dry-lining:\s*(.*?)(?=\n|$)", part_details) + alt_wall_thickness_match = re.search(r"Alternative Wall Thickness:\s*(\d+)(?=\n|$)", part_details) + + # Store results for this building part + wall_data.append({ + "Building Part": cleaned_part_name, + "Wall Type": wall_type_match.group(1).strip() if wall_type_match else None, + "Wall Insulation": wall_insulation_match.group(1).strip() if wall_insulation_match else None, + "Wall Dry-lining": wall_drylining_match.group(1).strip() if wall_drylining_match else None, + "Wall Thickness": int(wall_thickness_match.group(1)) if wall_thickness_match else None, + "Alternative Wall Type": alt_wall_type_match.group(1).strip() if alt_wall_type_match else None, + 
"Alternative Wall Insulation": alt_wall_insulation_match.group( + 1).strip() if alt_wall_insulation_match else None, + "Alternative Wall Dry-lining": alt_wall_drylining_match.group( + 1).strip() if alt_wall_drylining_match else None, + "Alternative Wall Thickness": int(alt_wall_thickness_match.group(1)) if alt_wall_thickness_match else None, + }) + + return wall_data + + def extract_epr(pdf_path): """ Extracts specific data from an Energy Report (EPR) PDF file. """ + data = { "Address": None, "Postcode": None, @@ -539,6 +686,14 @@ def extract_epr(pdf_path): "Main Roof Type": None, "Main Roof Insulation": None, "Main Roof Insulation Thickness": None, + "Main Wall Type": None, + "Main Wall Insulation": None, + "Main Wall Dry-lining": None, + "Main Wall Thickness": None, + "Main Building Alternative Wall Type": None, + "Main Building Alternative Wall Insulation": None, + "Main Building Alternative Wall Dry-lining": None, + "Main Building Alternative Wall Thickness": None, } with open(pdf_path, "rb") as file: @@ -664,6 +819,17 @@ def extract_epr(pdf_path): data["Main Roof Insulation"] = main_roof_details[0]["Roof Insulation"] data["Main Roof Insulation Thickness"] = main_roof_details[0]["Roof Insulation Thickness"] + wall_details = extract_wall_details_epr(text) + main_wall_details = [w for w in wall_details if "Main" in w["Building Part"]][0] + data["Main Wall Type"] = main_wall_details["Wall Type"] + data["Main Wall Insulation"] = main_wall_details["Wall Insulation"] + data["Main Wall Dry-lining"] = main_wall_details["Wall Dry-lining"] + data["Main Wall Thickness"] = main_wall_details["Wall Thickness"] + data["Main Building Alternative Wall Type"] = main_wall_details["Alternative Wall Type"] + data["Main Building Alternative Wall Insulation"] = main_wall_details["Alternative Wall Insulation"] + data["Main Building Alternative Wall Dry-lining"] = main_wall_details["Alternative Wall Dry-lining"] + data["Main Building Alternative Wall Thickness"] = 
main_wall_details["Alternative Wall Thickness"] + return data @@ -1411,5 +1577,46 @@ def find_remaining_surveys(): assert needed.shape[0] + costed.shape[0] == surveyed.shape[0] + +def append_stonewater_id(): + """ + This completes an adhoc request from Stonewater to add in their organisation Reference onto the model + :return: + """ + + model_proposed_sample = pd.read_excel( + "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater - Bid Packages WIP 13.11.24.xlsx", + sheet_name="Modelled Packages", + header=13 + ) + model_proposed_sample = model_proposed_sample[~pd.isnull(model_proposed_sample["Address ID"])] + model_proposed_sample["Address ID"] = model_proposed_sample["Address ID"].astype(int) + z = model_proposed_sample["Archetype ID"].drop_duplicates().sort_values() + + original_archetypes = pd.read_excel( + "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater SHDF_3_0_Board Triage 22.05.24 " + "- Archetyped V3.1.xlsx", + header=4 + ) + original_archetypes = original_archetypes[~pd.isnull(original_archetypes["Address ID"])] + original_archetypes = original_archetypes[original_archetypes["Address ID"] != "Address ID"] + original_archetypes["Address ID"] = original_archetypes["Address ID"].astype(int) + + matched = model_proposed_sample.merge( + original_archetypes[["Address ID", 'Org. ref.']], + on="Address ID", + how="left" + ) + + if pd.isnull(matched["Org. 
ref."]).sum(): + raise ValueError("Something went wrong") + + # Save as an Excel workbook + matched.to_excel( + "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater IDs.xlsx", + sheet_name="Proposed Wave 3 Sample", + index=False + ) + # if __name__ == "__main__": # main() diff --git a/etl/find_my_epc/RetrieveFindMyEpc.py b/etl/find_my_epc/RetrieveFindMyEpc.py index dad32bf6..cd76dae4 100644 --- a/etl/find_my_epc/RetrieveFindMyEpc.py +++ b/etl/find_my_epc/RetrieveFindMyEpc.py @@ -1,3 +1,4 @@ +import pandas as pd import requests from bs4 import BeautifulSoup from datetime import datetime @@ -25,7 +26,7 @@ class RetrieveFindMyEpc: self.address_cleaned = self.address.replace(",", "").replace(" ", "").lower() - def retrieve_newest_find_my_epc_data(self): + def retrieve_newest_find_my_epc_data(self, sap_2012_date=None): """ For a post code and address, we pull out all the required data from the find my epc website """ @@ -188,7 +189,7 @@ class RetrieveFindMyEpc: raise ValueError(f"Missing key: {key}") # Finally, we format the recommendations - recommendations = self.format_recommendations(recommendations) + recommendations = self.format_recommendations(recommendations, assessment_data, sap_2012_date) resulting_data = { 'epc_certificate': epc_certificate, @@ -204,11 +205,13 @@ class RetrieveFindMyEpc: return resulting_data - def format_recommendations(self, recommendations): + @staticmethod + def format_recommendations(recommendations, assessment_data, sap_2012_date=None): """ This function converts the recommendations to a format that we can use in the engine as a non-intrusive survey - :param recommendations: - :return: + :param recommendations: The recommendations from the EPC + :param assessment_data: The assessment data from the EPC + :param sap_2012_date: The date of the SAP 2012 update """ measure_map = { @@ -217,6 +220,7 @@ class RetrieveFindMyEpc: "Hot water cylinder thermostat": ["cylinder_thermostat"], "High performance external doors":
["insulated_doors"], "Floor insulation (solid floor)": ["solid_floor_insulation"], + "Floor insulation (suspended floor)": ["suspended_floor_insulation"], "Double glazed windows": ["double_glazing"], "Cavity wall insulation": ["cavity_wall_insulation"], "Replace boiler with new condensing boiler": ["boiler_upgrade"], @@ -225,19 +229,42 @@ class RetrieveFindMyEpc: "roomstat_programmer_trvs", "time_temperature_zone_control" ], "Low energy lighting": ["low_energy_lighting"], + "Increase loft insulation to 270 mm": ["loft_insulation"], + "Heating controls (thermostatic radiator valves)": [ + "roomstat_programmer_trvs", "time_temperature_zone_control" + ], + "Solar water heating": ["solar_water_heating"], + "Solar photovoltaic panels, 2.5 kWp": ["solar_pv"], + "Heating controls (room thermostat and TRVs)": [ + "roomstat_programmer_trvs", "time_temperature_zone_control" + ], + "Change heating to gas condensing boiler": ["boiler_upgrade"], + "Fan assisted storage heaters and dual immersion cylinder": ["high_heat_retention_storage_heaters"], + "Flat roof or sloping ceiling insulation": ["flat_roof_insulation"], + "Heating controls (room thermostat)": [ + "roomstat_programmer_trvs", "time_temperature_zone_control" + ], + "Band A condensing boiler": ["boiler_upgrade"], + "Double glazing": ["double_glazing"], } + survey = True + if sap_2012_date is not None: + certificate_date = datetime.strptime(assessment_data["Date of certificate"], "%d %B %Y") + if certificate_date < pd.to_datetime(sap_2012_date): + survey = False + formatted_recommendations = [] for rec in recommendations: - mapped = measure_map[rec["measure"]] for measure in mapped: - formatted_recommendations.append( - { - "type": measure, - "sap_points": rec["sap_points"], - "survey": True - } - ) + to_append = { + "type": measure, + "sap_points": rec["sap_points"], + "survey": survey, + } + if measure == "solar_pv": + to_append["suitable"] = True + formatted_recommendations.append(to_append) return 
formatted_recommendations diff --git a/recommendations/HotwaterRecommendations.py b/recommendations/HotwaterRecommendations.py index 5ff7ae4f..aed1a5e5 100644 --- a/recommendations/HotwaterRecommendations.py +++ b/recommendations/HotwaterRecommendations.py @@ -60,15 +60,21 @@ class HotwaterRecommendations: # If there is no system present, but access to the mains, we + has_tank_recommendation = [r for r in self.recommendations if r["type"] == "hot_water_tank_insulation"] + if ( (self.property.hotwater["heater_type"] in ["electric immersion"]) & (self.property.data["hot-water-energy-eff"] == "Very Poor") & - (self.property.hotwater["no_system_present"] is None) + (self.property.hotwater["no_system_present"] is None) & + (len(has_tank_recommendation) == 0) ): self.recommend_tank_insulation(phase=phase) return - if self.property.hotwater["clean_description"] == "From main system, no cylinder thermostat": + has_cylinder_recommendation = [r for r in self.recommendations if r["type"] == "cylinder_thermostat"] + + if ((self.property.hotwater["clean_description"] == "From main system, no cylinder thermostat") & + (len(has_cylinder_recommendation) == 0)): self.recommend_cylinder_thermostat(phase=phase) return diff --git a/recommendations/SecondaryHeating.py b/recommendations/SecondaryHeating.py index 7c20bcdd..a9d5de04 100644 --- a/recommendations/SecondaryHeating.py +++ b/recommendations/SecondaryHeating.py @@ -10,8 +10,8 @@ class SecondaryHeating: """ # The list of existing heating systems that are accepted - ACCEPTED_MAINHEAT_DESCRIPTIONS = ["Boiler and radiators, mains gas"] - ACCEPTED_SECONDHEAT_DESCRIPTIONS = ["Room heaters, electric"] + ACCEPTED_MAINHEAT_DESCRIPTIONS = ["Boiler and radiators, mains gas", "Electric storage heaters"] + ACCEPTED_SECONDHEAT_DESCRIPTIONS = ["Room heaters, electric", 'Portable electric heaters (assumed)'] # These are the heaters where works are required to remove them FIXED_HEATER_DESCRIPTIONS = ["Room heaters, electric"] @@ -34,7 +34,7 @@
class SecondaryHeating: if self.property.data['secondheat-description'] in self.FIXED_HEATER_DESCRIPTIONS: # We have an associated cost otherwise, there is no cost - n_rooms = self.property.data['number-heated-rooms'] + n_rooms = self.property.data['number-habitable-rooms'] - self.property.data['number-heated-rooms'] else: n_rooms = 0