diff --git a/etl/customers/aiha/bid_numbers.py b/etl/customers/aiha/bid_numbers.py index 96859f99..b371e2e5 100644 --- a/etl/customers/aiha/bid_numbers.py +++ b/etl/customers/aiha/bid_numbers.py @@ -52,6 +52,20 @@ aiha_wave_3_features = aiha_original_asset_data[ wall_type_breakdown = aiha_wave_3_features["Wall type"].value_counts() property_type_breakdown = aiha_wave_3_features.groupby(["Property type", "floor"]).size().reset_index() +aiha_wave_3_features[aiha_wave_3_features["Property type"] == "Flat"][["Street address", "Postcode"]] + +# 4 Yetev Lev Court  ... Semi-Detached mid - Medium +# B 86 Bethune Road ... Mid-Terrace top. - Low +# A 80 Bethune Road ... Mid-Terrace ground. - Low +# B 80 Bethune Road ... \n \n - Low +# A 9 Clapton Common ... Semi-Detached ground. - Low +# C 9 Clapton Common ... End-Terrace \n. - Low +# B 89 Manor Road ... \n \n. - Low +# A 6 Northfield Road ... Detached top. - Low +# 13 Northfield Rd ... Semi-Detached \n - Low +# A 73 Manor Road ... End-Terrace \n - Low +# B 73 Manor Road ... 
Detached top - Low + # Hornsey data - contained in original asset list hornsey_asset_list = pd.read_excel( "/Users/khalimconn-kowlessar/Documents/hestia/Customers/AIHA/SHDF - Template - EOI - Hornsey Housing " @@ -88,5 +102,5 @@ caha_epc_data = pd.read_excel( "/Users/khalimconn-kowlessar/Documents/hestia/Customers/AIHA/caha_extracted_property_data.xlsx" ) -caha_epc_data["property_type"].value_counts() -caha_epc_data["wall_type"].value_counts() +caha_epc_data[caha_epc_data["address"] != "33 Woodhouse Road"]["property_type"].value_counts() +caha_epc_data[caha_epc_data["address"] != "33 Woodhouse Road"]["wall_type"].value_counts() diff --git a/etl/customers/ksquared/Wave3 Modelling.py b/etl/customers/ksquared/Wave3 Modelling.py index 96ea2b03..7bfa33b3 100644 --- a/etl/customers/ksquared/Wave3 Modelling.py +++ b/etl/customers/ksquared/Wave3 Modelling.py @@ -8,6 +8,7 @@ from tqdm import tqdm import pandas as pd import numpy as np from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc +from etl.spatial.OpenUprnClient import OpenUprnClient from backend.SearchEpc import SearchEpc from utils.s3 import save_csv_to_s3 @@ -60,6 +61,7 @@ def hornsey(): } extracted_data = [] asset_list = [] + hornsey_asset_list["row_id"] = hornsey_asset_list.index for _, home in tqdm(hornsey_asset_list.iterrows(), total=len(hornsey_asset_list)): if home["Address letter or number"] == "Flat 1 36 Haringey Park": @@ -108,12 +110,24 @@ def hornsey(): asset_list.append( { "uprn": newest_epc["uprn"], + "row_id": home["row_id"], "address": home["Address letter or number"], "postcode": home["Postcode"], "property_type": "Flat", # They're all flats } ) + # Get conservation area data + # uprns = [x["uprn"] for x in extracted_data] + # conservation_area_data = OpenUprnClient.get_spatial_data(uprns, "retrofit-data-dev") + # + # addresses = pd.DataFrame(asset_list) + # addresses["uprn"] = addresses["uprn"].astype(int) + # conservation_area_df = conservation_area_data.merge(addresses, how="left", 
right_on="uprn", left_on="UPRN") + # conservation_area_df.to_csv( + # "/Users/khalimconn-kowlessar/Documents/hestia/Customers/AIHA/hornsey_conservation_area_data.csv" + # ) + # We format the extracted data so that is has the same structure as non-intrusive recommendations # We then get the UPRNs and create the asset list @@ -213,6 +227,8 @@ def caha(): # If pattern doesn't match, return original address return address + caha_asset_list["row_id"] = caha_asset_list.index + extracted_data = [] asset_list = [] for _, home in tqdm(caha_asset_list.iterrows(), total=len(caha_asset_list)): @@ -270,6 +286,7 @@ def caha(): asset_list.append( { + "row_id": home["row_id"], "uprn": uprn, "address": address, "postcode": home["Postcode"], @@ -280,6 +297,24 @@ def caha(): } ) + # Missing row ids + missed = [r for r in caha_asset_list["row_id"].tolist() if r not in [x["row_id"] for x in asset_list]] + + no_data = [x for x in asset_list if x["uprn"] in [None, ""]] + no_data = pd.DataFrame(no_data) + + # Get conservation area data + uprns = [x["uprn"] for x in extracted_data if x["uprn"] not in ["", None]] + conservation_area_data = OpenUprnClient.get_spatial_data([100022526362], "retrofit-data-dev") + + addresses = pd.DataFrame(asset_list) + addresses["uprn"] = addresses["uprn"].astype(str) + conservation_area_data["UPRN"] = conservation_area_data["UPRN"].astype(str) + conservation_area_df = conservation_area_data.merge(addresses, how="left", right_on="uprn", left_on="UPRN") + conservation_area_df.to_csv( + "/Users/khalimconn-kowlessar/Documents/hestia/Customers/AIHA/caha_conservation_area_data.csv" + ) + non_invasive_recommendations = [ { "uprn": r["uprn"], diff --git a/etl/customers/remote_assessments/app.py b/etl/customers/remote_assessments/app.py index a0d01f7d..59e0e868 100644 --- a/etl/customers/remote_assessments/app.py +++ b/etl/customers/remote_assessments/app.py @@ -1,7 +1,7 @@ import pandas as pd from utils.s3 import save_csv_to_s3 -PORTFOLIO_ID = 111 +PORTFOLIO_ID = 120 
USER_ID = 8 @@ -13,10 +13,11 @@ def app(): asset_list = [ { - "uprn": 100050770761, - "address": "12 Sheardown Street", - "postcode": "DN4 0BH" + "uprn": 100030334057, + "address": "5, Lynton Street", + "postcode": "DE22 3RW" } + ] asset_list = pd.DataFrame(asset_list) @@ -30,11 +31,22 @@ def app(): non_invasive_recommendations = [ { - "uprn": 100050770761, + "uprn": 100030334057, "recommendations": [ { - "type": "extension_cavity_wall_insulation", + "type": "internal_wall_insulation", + "sap_points": 9, + "survey": True + }, + { + "type": "external_wall_insulation", + "sap_points": 9, + "survey": True + }, + { + "type": "suspended_floor_insulation", "sap_points": 2, + "survey": True } ] } @@ -49,8 +61,8 @@ def app(): valuation_data = [ { - "uprn": 100050770761, - "value": 67_000 + "uprn": 100030334057, + "value": 133_000 } ] # Store valuation data to s3 diff --git a/etl/customers/southend/epc_data_pull_2024_11_14.py b/etl/customers/southend/epc_data_pull_2024_11_14.py index 14cd73be..11ddcc6f 100644 --- a/etl/customers/southend/epc_data_pull_2024_11_14.py +++ b/etl/customers/southend/epc_data_pull_2024_11_14.py @@ -229,7 +229,3 @@ def app(): filename = ("/Users/khalimconn-kowlessar/Documents/hestia/Customers/southend/southend EPC Data pull - 14 Nov " "2024.xlsx") asset_list.to_excel(filename, index=False) - - asset_list["% of the Roof with PV"].value_counts() - - asset_list[asset_list["% of the Roof with PV"] == "50.0"][["Address", "Postcode"]] diff --git a/etl/customers/stonewater/Wave 3 Preparation.py b/etl/customers/stonewater/Wave 3 Preparation.py index a5bbff7b..b6c29863 100644 --- a/etl/customers/stonewater/Wave 3 Preparation.py +++ b/etl/customers/stonewater/Wave 3 Preparation.py @@ -5,6 +5,8 @@ import pandas as pd import numpy as np from tqdm import tqdm from collections import Counter +from scipy.optimize import linprog +from utils.s3 import read_pickle_from_s3 CUSTOMER_FOLDER_PATH = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater" 
SURVEY_FOLDERS = os.path.join(CUSTOMER_FOLDER_PATH, "StonewaterSurveys_{i}") @@ -117,7 +119,7 @@ def extract_summary_report(pdf_path): - Fuel Bill - Address """ - + data = { "Address": None, "Postcode": None, @@ -289,26 +291,11 @@ def extract_summary_report(pdf_path): data["Number of LEL Fittings"] = int(re.search(r"Total number of L.E.L. fittings\s*(\d+)", text).group(1)) data["Number of fittings needing LEL"] = data["Number of Light Fittings"] - data["Number of LEL Fittings"] - roof_section = re.search(r"8\.0 Roofs:\n(.*?)\n9\.0 Floors:", text, re.DOTALL) - roof_text = roof_section.group(1).strip() - roof_type_match = re.search(r"Type\s*([A-Za-z0-9\s]+)", roof_text) - data["Main Roof Type"] = roof_type_match.group(1).strip() if roof_type_match else None - - # Check if "Insulation" exists between Type and Insulation Thickness - insulation_search = re.search( - r"Type\s+.*?\n(Insulation\s+(.*?)\n)?(Insulation Thickness\s+(.*?)\n)", roof_text, re.DOTALL - ) - - if insulation_search: - # Insulation match will be present if it exists, otherwise it will be None - insulation_match = insulation_search.group(2) # Optional group for Insulation - insulation_thickness_match = insulation_search.group(4) # Required group for Insulation Thickness - - # Populate insulation fields - data["Main Roof Insulation"] = insulation_match.strip() if insulation_match else None - data["Main Roof Insulation Thickness"] = ( - insulation_thickness_match.strip() if insulation_thickness_match else None - ) + extracted_roof_data = extract_roof_details_summary(text) + main_roof_data = [roof for roof in extracted_roof_data if "Main" in roof["Building Part"]][0] + data["Main Roof Type"] = main_roof_data["Roof Type"] + data["Main Roof Insulation"] = main_roof_data["Roof Insulation"] + data["Main Roof Insulation Thickness"] = main_roof_data["Roof Insulation Thickness"] walls_data = extract_wall_details_summary(text) # Get the main building wall data @@ -591,6 +578,54 @@ def 
# Line openings that terminate a roof field value inside the "8.0 Roofs" section:
# the next section header, any capitalised field/label line, or an ordinal
# extension label such as "1st Extension" (which starts with a digit and would
# otherwise be swallowed into the preceding value).
_ROOF_VALUE_END = r"9\.0 Floors:|[A-Z]|\d+(?:st|nd|rd|th) Extension"

# Everything between the "8.0 Roofs:" header and the "9.0 Floors:" header
# (or the end of the text when the floors header is missing).
_ROOF_SECTION_RE = re.compile(r"8\.0 Roofs:\n(.*?)(?=\n9\.0 Floors:|$)", re.DOTALL)

# One building part: label line, mandatory "Type", then optional "Insulation"
# and optional "Insulation Thickness" lines.
_ROOF_PART_RE = re.compile(
    r"(Main Property|1st Extension|2nd Extension|[\w\s]+)\n"
    rf"Type\s+(.*?)(?=\n(?:{_ROOF_VALUE_END}))"
    # The negative lookahead stops a lone "Insulation Thickness ..." line from
    # being captured as the insulation value when "Insulation" itself is absent.
    rf"(?:\nInsulation(?! Thickness)\s+(.*?)(?=\n(?:{_ROOF_VALUE_END})))?"
    rf"(?:\nInsulation Thickness\s+(.*?)(?=\n(?:{_ROOF_VALUE_END})))?",
    re.DOTALL,
)


def extract_roof_details_summary(text):
    """
    Extract roof type, insulation and insulation thickness for each building part
    listed in the "8.0 Roofs" section of a summary report.

    Args:
        text: Full text of the summary report.

    Returns:
        A list with one dict per building part found, each holding the keys
        "Building Part", "Roof Type", "Roof Insulation" and
        "Roof Insulation Thickness". The two insulation values are None when the
        corresponding line is missing. The list is empty when the text contains
        no "8.0 Roofs:" section.
    """
    section_match = _ROOF_SECTION_RE.search(text)
    if not section_match:
        return []

    # Re-append the section terminator so the last value in the section always
    # has a boundary line for the lookaheads to stop at.
    roof_section = section_match.group(1).strip() + "\n9.0 Floors:"

    roof_data = []
    for match in _ROOF_PART_RE.finditer(roof_section):
        # The generic label alternative can pick up the newline that precedes the
        # label, hence the strip.
        part_name = match.group(1).strip()
        roof_type = match.group(2).strip()
        roof_insulation = match.group(3).strip() if match.group(3) else None
        roof_insulation_thickness = match.group(4).strip() if match.group(4) else None

        # Safety net for values that still run on past this roof type, e.g.
        # 'A Another dwelling above\n<unrecognised label>'.
        if roof_type.startswith("A Another dwelling above"):
            roof_type = "A Another dwelling above"

        roof_data.append({
            "Building Part": part_name,
            "Roof Type": roof_type,
            "Roof Insulation": roof_insulation,
            "Roof Insulation Thickness": roof_insulation_thickness,
        })

    return roof_data
retrofit_packages_board["RA"].isin(["Invoiced", "Completed"]) ] + # populated_primary_energy = retrofit_packages_board[ + # ~pd.isnull(retrofit_packages_board['BASE Primary energy (13a-272)']) + # ] + # + # z = populated_primary_energy[ + # populated_primary_energy['POST Primary energy (13a - 272)'] > populated_primary_energy[ + # 'BASE Primary energy (13a-272)'] + # ] + # + # all(populated_primary_energy['POST Primary energy (13a - 272)'] <= populated_primary_energy[ + # 'BASE Primary energy (13a-272)']) # Replace \n with "" extracted_data["Postcode"] = extracted_data["Postcode"].str.replace("\n", "") @@ -1157,7 +1207,7 @@ def main(): # missed[["Name", "Postcode", "Archetype ID", "Arch. Group Rank"]].to_csv( # CUSTOMER_FOLDER_PATH + "/missed_debugging.csv") - if len(missing_ids) != 6: + if len(missing_ids) != 1: raise Exception("Unacceptable number of missings") if matching_lookup["Address ID"].duplicated().sum(): @@ -1204,7 +1254,6 @@ def main(): if stonewater_data["Address ID"].duplicated().sum(): raise Exception("Duplicate Address IDs") - # Create a section for costs for measure in measure_columns: stonewater_data[f"Cost of {measure}"] = None @@ -1262,8 +1311,41 @@ def main(): ]: stonewater_data[c] = stonewater_data[c].astype(str) + # FIll the primary energy numbers from the excel + stonewater_data = stonewater_data.merge( + retrofit_packages_board[ + [ + "Name", "Address ID", "BASE Primary energy (13a-272)", "POST Primary energy (13a - 272)" + ] + ], + on=["Address ID", "Name"], + how="left" + ) + stonewater_data["Primary Energy Use (kWh/yr)"] = np.where( + pd.isnull(stonewater_data["Primary Energy Use (kWh/yr)"]), + stonewater_data["BASE Primary energy (13a-272)"], + stonewater_data["Primary Energy Use (kWh/yr)"] + ) + stonewater_data = stonewater_data.drop(columns=["BASE Primary energy (13a-272)"]) + + # Add on organisation reference + original_archetypes = pd.read_excel( + "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater 
SHDF_3_0_Board Triage 22.05.24 " + "- Archetyped V3.1.xlsx", + header=4 + ) + original_archetypes = original_archetypes[~pd.isnull(original_archetypes["Address ID"])] + original_archetypes = original_archetypes[original_archetypes["Address ID"] != "Address ID"] + original_archetypes["Address ID"] = original_archetypes["Address ID"].astype(int) + + stonewater_data = stonewater_data.merge( + original_archetypes[["Address ID", 'Org. ref.']], + on="Address ID", + how="left" + ) + # Save this data to excel - stonewater_data.to_excel(CUSTOMER_FOLDER_PATH + "/Stonewater - costed retrofit packages V2.xlsx", index=False) + stonewater_data.to_excel(CUSTOMER_FOLDER_PATH + "/Stonewater - costed retrofit packages V4.xlsx", index=False) cost_sheet = [ { @@ -1618,5 +1700,896 @@ def append_stonewater_id(): index=False ) + +def propsed_wave_3_sample(): + """ + Stonewater want to ensure that the properties that when selecting properties for wave 3, they choose properties + such that most of the properties within a geographical area are treatable within the bid. 
+ Name, if we take a geographical area (which could be postal region) they want the most, and ideally all, of the + properties within that geographical area to be included within the bid + :return: + """ + + asset_list = pd.read_excel( + "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater SHDF_3_0_Board Triage 22.05.24 " + "- Archetyped V3.1.xlsx", + header=4 + ) + + # TODO: We drop 7 properties missing + # UPRN + asset_list = asset_list[~asset_list["Archetype ID"].isin(["MISSING UPRN"])] + # Clean address ids + asset_list = asset_list[~pd.isnull(asset_list["Address ID"])] + asset_list = asset_list[asset_list["Address ID"] != "Address ID"] + asset_list["Address ID"] = asset_list["Address ID"].astype(int) + + asset_list["Street name"] = np.where( + pd.isnull(asset_list["Street name"]), + asset_list["Postcode"], + asset_list["Street name"] + ) + + # Create the postal region, taking the first part of the postcode + asset_list["Postal Region"] = asset_list["Postcode"].str.split(" ").str[0] + asset_list["Street and Region"] = asset_list["Street name"] + " " + asset_list["Postal Region"] + unique_postal_regions = asset_list["Postal Region"].unique() + + # Keep just the columns we need + asset_list = asset_list[ + ["UPRN", "Address ID", 'Org. 
ref.', "Archetype ID", "Postal Region", "Name", "Postcode", "Street and Region", + "Property Type", "Wall Type", "Roof Type", "Heating"] + ] + + survey_results = pd.read_excel( + os.path.join(CUSTOMER_FOLDER_PATH, "Stonewater - Bid Packages WIP 14.11.19 V2.xlsx"), + header=13, + sheet_name="Modelled Packages" + ) + + survey_results = survey_results[ + [ + "Address ID", "Archetype ID", "Current SAP Rating", "Current EPC Band", "Postcode", + "Main Roof Type", "Main Roof Insulation", "Main Roof Insulation Thickness", + "Existing Primary Heating System", + "Package Ref", + "Main Wall Type", "Main Wall Insulation Type", "Main Wall Thickness", + "Main Building Alternative Wall Type", "Main Building Alternative Wall Insulation", + "Main Building Alternative Wall Thickness" + ] + ].rename( + columns={ + "Existing Primary Heating System": "Survey: Primary Heating System" + } + ) + + survey_results["Postal Region"] = survey_results["Postcode"].str.split(" ").str[0] + # Concatenate from the wall information + survey_results["Survey: Main Wall Type"] = survey_results["Main Wall Type"].astype(str) + ": " + survey_results[ + "Main Wall Insulation Type"].astype(str) + # Alternative wall + survey_results["Survey: Main Alternative Wall"] = ( + survey_results["Main Building Alternative Wall Type"].astype(str) + ": " + survey_results[ + "Main Building Alternative Wall Insulation"].astype(str) + ) + # Roof information + survey_results["Survey: Main Roof Type"] = survey_results["Main Roof Type"].astype(str) + ": " + survey_results[ + "Main Roof Insulation"].astype(str) + ": " + survey_results["Main Roof Insulation Thickness"].astype(str) + + # Drop the individual columns: + survey_results = survey_results.drop( + columns=[ + "Main Roof Type", "Main Roof Insulation", "Main Roof Insulation Thickness", + "Main Wall Type", "Main Wall Insulation Type", + "Main Building Alternative Wall Type", "Main Building Alternative Wall Insulation" + ] + ) + + survey_results_with_original_features = 
survey_results.merge( + asset_list[["UPRN", "Address ID", "Property Type", "Wall Type", "Roof Type", "Heating"]], + on="Address ID", + how="left" + ) + + if survey_results_with_original_features.shape[0] != survey_results.shape[0]: + raise ValueError("Something went wrong") + + # Against properties that have NO package ref, we assign a package ref + properties_with_packages = survey_results_with_original_features[ + ~pd.isnull(survey_results_with_original_features["Package Ref"]) + ] + + properties_without_packages = survey_results_with_original_features[ + (survey_results_with_original_features["Current SAP Rating"] < 69) & pd.isnull( + survey_results_with_original_features["Package Ref"] + ) + ] + + # Change this to a lookup + package_ratings = pd.DataFrame([ + { + "1A": 1, + "1B": 2, + "2A": 3, + "2B": 4, + "3A": 5, + "3B": 6, + 4: 7 + } + ]) + package_ratings = pd.melt(package_ratings, var_name="Package Ref", value_name="Rank") + + mapped_package_refs = [] + for _, property in tqdm(properties_without_packages.iterrows(), total=len(properties_without_packages)): + # Same archetype? 
+ matches = properties_with_packages[properties_with_packages["Archetype ID"] == property["Archetype ID"]] + + if matches.empty: + # Similar property + matches = properties_with_packages[ + (properties_with_packages["Property Type"].str.split(":").str[0] == + property["Property Type"].split(":")[0]) & + (properties_with_packages["Wall Type"] == property["Wall Type"]) & + (properties_with_packages["Roof Type"].str.split(":").str[0] == property["Roof Type"].split(":")[0]) & + (properties_with_packages["Heating"].str.split(":").str[0] == property["Heating"].split(":")[0]) + ] + if matches.empty: + matches = properties_with_packages[ + (properties_with_packages["Property Type"].str.split(":").str[0] == + property["Property Type"].split(":")[0]) & + (properties_with_packages["Wall Type"].str.split(":").str[0] == property["Wall Type"].split(":")[0]) & + (properties_with_packages["Roof Type"].str.split(":").str[0] == property["Roof Type"].split(":")[0]) & + (properties_with_packages["Heating"].str.split(":").str[0] == property["Heating"].split(":")[0]) + ] + if matches.empty: + raise Exception("Implement me") + if matches.shape[0] > 1: + # Take the package with the highest rank + matches = matches.merge( + package_ratings, + on="Package Ref", + how="left" + ).sort_values("Rank", ascending=False).head(1) + + mapped_package_refs.append( + { + "Address ID": property["Address ID"], + "Matched Package Ref": matches["Package Ref"].values[0] + } + ) + + mapped_package_refs = pd.DataFrame(mapped_package_refs) + + survey_results = survey_results.merge( + mapped_package_refs, + on="Address ID", + how="left" + ) + survey_results["Package Ref"] = np.where( + pd.notnull(survey_results["Matched Package Ref"]), + survey_results["Matched Package Ref"], + survey_results["Package Ref"] + ) + survey_results = survey_results.drop(columns=["Matched Package Ref"]) + + # Do the same with survey_results_with_original_features + survey_results_with_original_features = 
survey_results_with_original_features.merge( + mapped_package_refs, + on="Address ID", + how="left" + ) + survey_results_with_original_features["Package Ref"] = np.where( + pd.notnull(survey_results_with_original_features["Matched Package Ref"]), + survey_results_with_original_features["Matched Package Ref"], + survey_results_with_original_features["Package Ref"] + ) + survey_results_with_original_features = survey_results_with_original_features.drop(columns=["Matched Package Ref"]) + + # Save the data for reference + # mapped_package_refs = mapped_package_refs.merge( + # asset_list[["Name", "Postcode", "Address ID", "Org. ref."]], + # on="Address ID", + # how="left" + # ) + # mapped_package_refs.to_csv(os.path.join(CUSTOMER_FOLDER_PATH, "mapped_package_refs.csv"), index=False) + + # We get longitude & Latitude + archetyping_spatial_features = read_pickle_from_s3( + bucket_name="retrofit-data-dev", s3_file_name="scustomers/Stonewater/clustering/spatial_data_to_uprn.pkl", + ) + archetyping_spatial_features = pd.concat(archetyping_spatial_features) + archetyping_spatial_features = archetyping_spatial_features[["UPRN", 'LATITUDE', 'LONGITUDE']].rename( + columns={"LATITUDE": "latitude", "LONGITUDE": "longitude"} + ) + # Merge them onto both datasets + asset_list = asset_list.merge( + archetyping_spatial_features, how="left", on="UPRN" + ) + if pd.isnull(asset_list["longitude"]).sum(): + raise ValueError("Something went wrong") + + survey_results_with_original_features = survey_results_with_original_features.merge( + archetyping_spatial_features, how="left", on="UPRN" + ) + if pd.isnull(survey_results_with_original_features["longitude"]).sum(): + raise ValueError("Something went wrong") + + def haversine(lat1, lon1, lat2, lon2): + # Radius of Earth in meters + R = 6371000 + + # Convert degrees to radians + lat1, lon1, lat2, lon2 = map(np.radians, [lat1, lon1, lat2, lon2]) + + # Differences + dlat = lat2 - lat1 + dlon = lon2 - lon1 + + # Haversine formula + a = 
np.sin(dlat / 2.0) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2.0) ** 2 + c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a)) + distance = R * c + return distance + + # Tier definitions + # Tier 1: We have a property in the same postal region and same archetype that was surveyed and is below EPC D + # Tier 2: We have a property in the same archetype that was surveyed and is below EPC D + # + + def match_property_to_surveyed(property, survey_results_with_original_features): + surveyed = survey_results_with_original_features[ + ( + survey_results_with_original_features["Postal Region"] == + property["Postal Region"] + ) & + ( + survey_results_with_original_features["Property Type"] == + property["Property Type"] + ) + & + ( + survey_results_with_original_features["Wall Type"].str.split(":").str[0] == + property["Wall Type"].split(":")[0] + ) & + ( + survey_results_with_original_features["Roof Type"].str.split(":").str[0] == + property["Roof Type"].split(":")[0] + ) & + ( + survey_results_with_original_features["Heating"].str.split(":").str[0] == + property["Heating"].split(":")[0] + ) + ].copy() + + if not surveyed.empty: + return surveyed + + surveyed = survey_results_with_original_features[ + ( + survey_results_with_original_features["Postal Region"] == + property["Postal Region"] + ) & + ( + survey_results_with_original_features["Property Type"].str.split(":").str[0] == + property["Property Type"].split(":")[0] + ) + & + ( + survey_results_with_original_features["Wall Type"].str.split(":").str[0] == + property["Wall Type"].split(":")[0] + ) & + ( + survey_results_with_original_features["Roof Type"].str.split(":").str[0] == + property["Roof Type"].split(":")[0] + ) & + ( + survey_results_with_original_features["Heating"].str.split(":").str[0] == + property["Heating"].split(":")[0] + ) + ].copy() + + # surveyed = survey_results_with_original_features[ + # ( + # survey_results_with_original_features["Property Type"] == + # property["Property Type"] + # ) & + # ( + 
# survey_results_with_original_features["Wall Type"] == + # property["Wall Type"] + # ) & + # ( + # survey_results_with_original_features["Roof Type"].str.split(":").str[0] == + # property["Roof Type"].split(":")[0] + # ) & + # ( + # survey_results_with_original_features["Heating"] == + # property["Heating"] + # ) + # ].copy() + + if not surveyed.empty: + return surveyed + + surveyed = survey_results_with_original_features[ + ( + survey_results_with_original_features["Property Type"] == + property["Property Type"] + ) & + ( + survey_results_with_original_features["Wall Type"] == + property["Wall Type"] + ) & + ( + survey_results_with_original_features["Roof Type"].str.split(":").str[0] == + property["Roof Type"].split(":")[0] + ) & + ( + survey_results_with_original_features["Heating"].str.split(":").str[0] == + property["Heating"].split(":")[0] + ) + ].copy() + + return surveyed + + def fill_survey_columns(region_assets, suffix): + for col in [ + 'Current EPC Band', 'Current SAP Rating', + 'Survey: Main Wall Type', 'Survey: Main Alternative Wall', + 'Survey: Main Roof Type', 'Survey: Primary Heating System', + 'Survey: Matching Address ID', 'Distance to Closest Match (m)', + "Package Ref" + ]: + region_assets[col] = np.where( + pd.isnull(region_assets[col]) & pd.notnull(region_assets[col + suffix]), + region_assets[col + suffix], region_assets[col] + ) + return region_assets + + survey_attribute_columns = [ + "Survey: Main Wall Type", 'Survey: Main Alternative Wall', 'Survey: Main Roof Type', + 'Survey: Primary Heating System' + ] + + survey_results["Survey: Matching Address ID"] = survey_results["Address ID"].copy() + + results = [] + for region in tqdm(unique_postal_regions): + # Take all of the properties in that region + region_assets = asset_list[asset_list["Postal Region"] == region].copy() + + # We have a tier 1 match if the property itself was surveyed + exact_surveyed = survey_results[ + survey_results["Address ID"].isin(region_assets["Address ID"]) + ] + 
+ region_assets = region_assets.merge( + exact_surveyed[ + ["Address ID", "Current EPC Band", "Current SAP Rating"] + survey_attribute_columns + [ + "Survey: Matching Address ID", "Package Ref" + ] + ], + on="Address ID", + how="left" + ) + region_assets['Distance to Closest Match (m)'] = None + region_assets["Distance to Closest Match (m)"] = np.where( + ~pd.isnull(region_assets["Current EPC Band"]), + 0, + region_assets["Distance to Closest Match (m)"] + ) + + # Label the tier 1 properties + region_assets["Confidence Tier"] = None + region_assets["Confidence Tier"] = np.where( + region_assets["Current EPC Band"].isin(["D", "E", "F", "G"]), + "1 - property was surveyed", region_assets["Confidence Tier"] + ) + + region_assets["Confidence Tier"] = np.where( + region_assets["Current EPC Band"].isin(["C", "B", "A"]), + "5 - property was surveyed", region_assets["Confidence Tier"] + ) + + archetype_ids = region_assets[ + pd.isnull(region_assets["Confidence Tier"]) + ]["Archetype ID"].unique() + # We get the properties that have been surveyed + + region_surveyed = [] + for arch_id in archetype_ids: + for _, property in region_assets[region_assets["Archetype ID"] == arch_id].iterrows(): + archetype_data = survey_results_with_original_features[ + survey_results["Archetype ID"] == arch_id + ].copy() + if archetype_data.empty: + continue + + match_type = "2 - same archetype" + if any(archetype_data["Postal Region"] == property["Postal Region"]): + match_type = "1 - same archetype, same postal region" + archetype_data = archetype_data[ + archetype_data["Postal Region"] == property["Postal Region"] + ] + + if archetype_data.shape[0] > 1: + # Look for an exact match, or as close as possible + archetype_data_filtered = match_property_to_surveyed(property, archetype_data) + if not archetype_data_filtered.empty: + archetype_data = archetype_data_filtered + + archetype_data["distance_meters"] = haversine( + lat1=property.latitude, lon1=property.longitude, + 
lat2=archetype_data["latitude"].values, lon2=archetype_data["longitude"].values + ) + expected_sap = np.average( + archetype_data["Current SAP Rating"], weights=1 / (archetype_data["distance_meters"] + 1) + ) + expected_epc = sap_to_epc(expected_sap) + + archetype_data = archetype_data.sort_values("distance_meters", ascending=True) + + # We take the features of the closest matching property + closest_match = archetype_data.iloc[0] + + # Set the package ref + if expected_epc in ["C", "B", "A"]: + package_ref = None + else: + package_ref = archetype_data["Package Ref"].dropna().values[0] + + region_surveyed.append( + { + "Archetype ID": arch_id, + "Address ID": property["Address ID"], + "Current EPC Band": expected_epc, + "Current SAP Rating": expected_sap, + 'Survey: Main Wall Type': closest_match["Survey: Main Wall Type"], + 'Survey: Main Alternative Wall': closest_match["Survey: Main Alternative Wall"], + 'Survey: Main Roof Type': closest_match["Survey: Main Roof Type"], + 'Survey: Primary Heating System': closest_match["Survey: Primary Heating System"], + "Survey: Matching Address ID": closest_match["Address ID"], + 'Distance to Closest Match (m)': closest_match["distance_meters"], + "Package Ref": package_ref, + "Match Type": match_type + } + ) + region_surveyed = pd.DataFrame(region_surveyed) + + if region_surveyed.empty: + region_surveyed = pd.DataFrame( + columns=[ + "Archetype ID", "Address ID", "Current EPC Band", "Current SAP Rating", + 'Survey: Main Wall Type', 'Survey: Main Alternative Wall', 'Survey: Main Roof Type', + 'Survey: Primary Heating System', "Survey: Matching Address ID", 'Distance to Closest Match (m)', + "Match Type", "Package Ref" + ] + ) + + starting_shape = region_assets.shape[0] + region_assets = region_assets.merge( + region_surveyed, + on=["Archetype ID", "Address ID"], + how="left", + suffixes=("", "_method1") + ) + if region_assets.shape[0] != starting_shape: + raise ValueError("Something went wrong") + + # Label the tier 1 
properties + region_assets["Confidence Tier"] = np.where( + region_assets["Current EPC Band_method1"].isin(["D", "E", "F", "G"]) & + pd.isnull(region_assets["Confidence Tier"]) & ~pd.isnull(region_assets["Match Type"]), + region_assets["Match Type"], region_assets["Confidence Tier"] + ) + + # Handle EPC C + region_assets["Confidence Tier"] = np.where( + region_assets["Current EPC Band_method1"].isin(["C", "B", "F", "G"]) & + pd.isnull(region_assets["Confidence Tier"]), + "5 - EPC C or above", region_assets["Confidence Tier"] + ) + + region_assets = fill_survey_columns(region_assets, suffix="_method1") + + method_1_columns = [c for c in region_assets.columns if c.endswith("_method1")] + region_assets = region_assets.drop(columns=method_1_columns + ["Match Type"]) + + missed_addressids = region_assets[pd.isnull(region_assets["Confidence Tier"])]["Address ID"].unique().tolist() + + if not missed_addressids: + results.append(region_assets) + continue + + # This means that this archetype was never surveyed and so we need to find a sufficiently similar property + final_missed_matches = [] + for a_id in missed_addressids: + + match_type = "3 - compared to similar properties" + + property = asset_list[asset_list["Address ID"] == a_id].squeeze() + + surveyed = match_property_to_surveyed(property, survey_results_with_original_features) + + if surveyed.empty: + match_type = "3 - compared to similar properties, relaxed" + # In this case, we do one additional check where we filter on everything the same apart from heating, + # where we do a slightly more rough match + surveyed = survey_results_with_original_features[ + ( + survey_results_with_original_features["Property Type"].str.split(":").str[0] == + property["Property Type"].split(":")[0] + ) & + ( + survey_results_with_original_features["Wall Type"].str.split(":").str[0] == + property["Wall Type"].split(":")[0] + ) & + ( + survey_results_with_original_features["Roof Type"].str.split(":").str[0] == + property["Roof 
Type"].split(":")[0] + ) + ].copy() + + if surveyed.empty: + if property["Property Type"].split(":")[0] in ["House", "Bungalow", "Maisonette"]: + filter_property_types = ["House", "Bungalow", ] + else: + filter_property_types = ["Flat"] + surveyed = survey_results_with_original_features[ + ( + survey_results_with_original_features["Property Type"].str.split(":").str[0].isin( + filter_property_types + ) + ) & + ( + survey_results_with_original_features["Wall Type"].str.split(":").str[0] == + property["Wall Type"].split(":")[0] + ) & + ( + survey_results_with_original_features["Roof Type"].str.split(":").str[0] == + property["Roof Type"].split(":")[0] + ) + ].copy() + + if "Electric" in property["Heating"]: + # Take other electric heating systems + surveyed = surveyed[surveyed["Heating"].str.contains("Electric")] + elif property["Heating"] in [ + "Community Heating Systems: Community boilers only (RdSAP)", + "Community Heating Systems: Community CHP and boilers (RdSAP)" + ]: + # Take other community heating systems + surveyed = surveyed[surveyed["Heating"].str.contains("Community")] + elif property["Heating"] == 'Heat Pump: (from database)': + # Take other heat pumps + surveyed = surveyed[surveyed["Heating"].str.contains("Heat Pump")] + elif property["Heating"] == "Solid fuel room heaters: Open fire in grate": + # Take other properties with room heaters + surveyed = surveyed[surveyed["Heating"].str.contains("room heaters")] + elif "Boiler" in property["Heating"]: + # Take other properties with boilers + surveyed = surveyed[surveyed["Heating"].str.contains("Boiler")] + else: + raise Exception("Fix me") + + if surveyed.empty: + final_missed_matches.append( + { + "Address ID": a_id, + "Confidence Tier": "4 - no similar property, needs survey to confirm", + "Current EPC Band": "Needs Survey", + "Current SAP Rating": "Needs Survey", + 'Survey: Main Wall Type': "Not Surveyed", + "Survey: Main Alternative Wall": "Not Surveyed", + "Survey: Main Roof Type": "Not Surveyed", + 
"Survey: Primary Heating System": "Not Surveyed", + "Survey: Matching Address ID": "Not Surveyed", + 'Distance to Closest Match (m)': 9999999, + "Package Ref": "Not Surveyed", + } + ) + continue + + # Calculate distance + surveyed["distance_meters"] = haversine( + lat1=property["latitude"], lon1=property["longitude"], + lat2=surveyed["latitude"].values, lon2=surveyed["longitude"].values + ) + surveyed = surveyed.sort_values("distance_meters", ascending=True) + + # Check if we have a postcode match check if surveyed postcode is the same as the property postcode + if any(surveyed["Postcode"] == property["Postcode"]): + surveyed = surveyed[surveyed["Postcode"] == property["Postcode"]] + + if any(surveyed["Postal Region"] == property["Postal Region"]): + surveyed = surveyed[surveyed["Postal Region"] == property["Postal Region"]] + + # Take the 3 nearest + surveyed = surveyed.head(3) + + # perform a weighted mean of SAP rating - the closer the better + expected_sap = np.average( + surveyed["Current SAP Rating"], weights=1 / (surveyed["distance_meters"] + 1) + ) + expected_epc = sap_to_epc(expected_sap) + + if expected_epc in ["C", "B", "A"]: + match_type = "5 - EPC C or above" + + closest_match = surveyed.iloc[0] + + # The closest property may be an EPC C, we we take the package ref from the property that's the nearest + # with non-NA package ref + if expected_epc in ["C", "B", "A"]: + package_ref = None + else: + package_ref = surveyed["Package Ref"].dropna().values[0] + + final_missed_matches.append( + { + "Address ID": a_id, + "Confidence Tier": match_type, + "Current EPC Band": expected_epc, + "Current SAP Rating": expected_sap, + 'Survey: Main Wall Type': closest_match["Survey: Main Wall Type"], + "Survey: Main Alternative Wall": closest_match["Survey: Main Alternative Wall"], + "Survey: Main Roof Type": closest_match["Survey: Main Roof Type"], + "Survey: Primary Heating System": closest_match["Survey: Primary Heating System"], + "Survey: Matching Address ID": 
def optimise(gain, loss, max_loss=250):
    """
    Pick the rows that maximise total gain while keeping total loss within a budget.

    Each row is either selected (1) or not (0). The selection is solved as a 0/1 integer programme
    with the HiGHS solver. Solving only the continuous relaxation and rounding afterwards can round
    a fractional row up to 1 and push the selection over ``max_loss``.

    :param gain: 1-D sequence/array with the gain obtained by selecting each row
    :param loss: 1-D sequence/array with the loss incurred by selecting each row (same length as ``gain``)
    :param max_loss: maximum total loss permitted across the selected rows
    :return: tuple of (integer array of 0/1 flags, one per row; total gain of the selected rows)
    :raises Exception: if the solver does not report success
    """
    gain = np.asarray(gain, dtype=float)
    loss = np.asarray(loss, dtype=float)

    # linprog minimises, so the gains are negated to maximise them
    objective = -gain

    # Only 1 constraint for now: total loss of the selected rows must not exceed max_loss
    constraint_matrix = loss.reshape(1, -1)
    constraint_limit = [max_loss]

    # Every variable is bounded to [0, 1] and forced to be an integer, i.e. select or do not select
    bounds = [(0, 1)] * gain.size
    integrality = np.ones(gain.size, dtype=int)

    result = linprog(
        objective,
        A_ub=constraint_matrix,
        b_ub=constraint_limit,
        bounds=bounds,
        integrality=integrality,
        method='highs'
    )
    if not result.success:
        raise Exception("Optimization failed")

    # The solver returns floats (e.g. 0.9999999), so we snap them onto exact 0/1 flags
    selected_rows = result.x.round().astype(int)
    optimal_gain = -result.fun

    return selected_rows, optimal_gain
package_summary, how="left", on="Street and Region" + ) + street_bid_structure = street_bid_structure.sort_values("Gain", ascending=False) + + individual_units_programme = results.copy() + individual_units_programme["Unit in Programme"] = individual_units_programme["Street and Region"].isin( + street_bid_structure[street_bid_structure["Selected"]]["Street and Region"].values + ) + + # Merge on Stonewaters ID + asset_list_ids = pd.read_excel( + "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater SHDF_3_0_Board Triage 22.05.24 " + "- Archetyped V3.1.xlsx", + header=4 + )[["Address ID", "Org. ref."]] + # Clean address ids + asset_list_ids = asset_list_ids[~pd.isnull(asset_list_ids["Address ID"])] + asset_list_ids = asset_list_ids[asset_list_ids["Address ID"] != "Address ID"] + asset_list_ids["Address ID"] = asset_list_ids["Address ID"].astype(int) + + individual_units_programme = individual_units_programme.merge( + asset_list_ids.rename( + columns={"Org. ref.": "Survey: Org. ref.", "Address ID": "Survey: Matching Address ID"} + ), + how="left", + on="Survey: Matching Address ID" + ) + + individual_units_programme["Survey: Org. ref."] = np.where( + (individual_units_programme["Survey: Matching Address ID"] == "Not Surveyed"), + "Not Surveyed", + individual_units_programme["Survey: Org. ref."] + ) + + if pd.isnull(individual_units_programme["Survey: Org. ref."]).sum() or pd.isnull( + individual_units_programme["Org. 
ref."]).sum(): + raise ValueError("something went wrong") + + for col in ["Survey: Main Roof Type", "Survey: Main Wall Type", "Survey: Main Alternative Wall"]: + individual_units_programme[col] = ( + individual_units_programme[col] + .str.replace(r': nan(?=$|:)', '', regex=True) # Remove ': nan' at the end or before another ':' + .str.replace(r':\s+:', ': ', regex=True) # Replace occurrences of ': :' with ': ' + .str.replace(r'\s+', ' ', regex=True) # Replace multiple spaces with a single space + .str.strip() # Strip leading/trailing spaces + ) + + # Any EPC C properties that have been included should be flagged as potential low carbon heating + selected_epc_c = individual_units_programme[ + (individual_units_programme["Current EPC Band"].isin(["C", "B", "A", "Needs Survey"])) & + (individual_units_programme["Unit in Programme"]) + ] + + flat_wall_map = { + "CA Cavity: F Filled Cavity": False, + "CA Cavity: A As Built": True, + "SO Solid Brick: A As Built": True, + "Not Surveyed": False + } + + heating_map = { + "BGW Post 98 Combi condens. with auto ign.": False, + "BGB Post 98 Regular condens. 
with auto ign.": False, + "SEK High heat retention storage heaters": False, + "SEB Modern slimline storage heaters": True, + "Not Surveyed": False + } + + infill_data = [] + for _, epc_c_property in selected_epc_c.iterrows(): + if epc_c_property["Property Type"].split(":")[0] == "Flat": + # Look for a wall insulation measure + infill = flat_wall_map[epc_c_property["Survey: Main Wall Type"]] + infill_data.append( + { + "Address ID": epc_c_property["Address ID"], + "Street and Region": epc_c_property["Street and Region"], + "Possible Flat Infill?": infill + } + ) + continue + + infill = heating_map[epc_c_property["Survey: Primary Heating System"]] + infill_data.append( + { + "Address ID": epc_c_property["Address ID"], + "Street and Region": epc_c_property["Street and Region"], + "Low Carbon Heating Infill?": infill + } + ) + infill_data = pd.DataFrame(infill_data) + + individual_units_programme = individual_units_programme.merge( + infill_data[["Address ID", 'Possible Flat Infill?', 'Low Carbon Heating Infill?']], + how="left", on="Address ID" + ) + + for c in ['Possible Flat Infill?', 'Low Carbon Heating Infill?']: + individual_units_programme[c] = individual_units_programme[c].fillna(False) + + infill_by_street = infill_data.pivot_table( + index='Street and Region', + values=['Possible Flat Infill?', 'Low Carbon Heating Infill?'], + aggfunc='sum', + fill_value=0 + ).reset_index() + + street_bid_structure = street_bid_structure.merge( + infill_by_street, how="left", on="Street and Region" + ) + + for c in ['Low Carbon Heating Infill?', 'Possible Flat Infill?']: + street_bid_structure[c] = street_bid_structure[c].fillna(0) + + master_sheet = pd.read_csv( + "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Osmosis Reviewed - Parity Download 18.7 - " + "master " + "sheet.csv", + encoding='latin1' + ) + master_sheet = master_sheet[["Address ID", "Main Fuel"]] + + individual_units_programme = individual_units_programme.merge( + master_sheet, how="left", 
@staticmethod
def extract_low_carbon_sources(soup):
    """
    Extract the entries listed under the "Low and zero carbon energy sources" heading.

    :param soup: parsed page (or element) exposing ``find``; the heading found must expose ``find_next``
    :return: dict mapping each listed source (whitespace-stripped text) to True. An empty dict is
        returned when the heading is absent or when no list follows it.
    """
    # Find the section header
    section_header = soup.find("h3", string="Low and zero carbon energy sources")
    if section_header is None:
        return {}

    # Locate the list following the header; pages without one yield no sources rather than an error
    energy_list = section_header.find_next("ul")
    if energy_list is None:
        return {}

    # Each list item is one energy source
    return {item.get_text(strip=True): True for item in energy_list.find_all("li")}
step title (the measure) measure_title = step_header.text.strip().replace(f"Step {step_num}: ", "") @@ -124,7 +139,11 @@ class RetrieveFindMyEpc: # Check if the potential rating div is found if potential_rating_div: # Extract the rating text within the SVG text element - rating_text = potential_rating_div.find('text', class_='govuk-!-font-weight-bold').text.strip() + extracted_rating_text = potential_rating_div.find('text', class_='govuk-!-font-weight-bold') + if extracted_rating_text is not None: + rating_text = extracted_rating_text.text.strip() + else: + rating_text = " ".join([str(previous_sap_score), previous_epc]) # Parse the rating text to separate the numeric rating and EPC letter new_rating = int(rating_text.split()[0]) new_epc = rating_text.split()[1] @@ -138,6 +157,7 @@ class RetrieveFindMyEpc: "sap_points": new_rating - previous_sap_score }) previous_sap_score = new_rating + previous_epc = new_epc # Search for the assessment informaton assessment_information = address_res.find('div', {'id': 'information'}) @@ -191,6 +211,9 @@ class RetrieveFindMyEpc: # Finally, we format the recommendations recommendations = self.format_recommendations(recommendations, assessment_data, sap_2012_date) + # 4) Low and zero carbon energy sources + low_carbon_energy_sources = self.extract_low_carbon_sources(address_res) + resulting_data = { 'epc_certificate': epc_certificate, 'current_epc_rating': current_rating.split(' ')[-6], @@ -200,7 +223,8 @@ class RetrieveFindMyEpc: "heating_text": heating_text, "hot_water_text": hot_water_text, "recommendations": recommendations, - **assessment_data + **assessment_data, + **low_carbon_energy_sources } return resulting_data @@ -246,6 +270,31 @@ class RetrieveFindMyEpc: ], "Band A condensing boiler": ["boiler_upgrade"], "Double glazing": ["double_glazing"], + "Flue gas heat recovery device in conjunction with boiler": ["flue_gas_heat_recovery"], + "Wind turbine": ["wind_turbine"], + "Loft insulation": ["loft_insulation"], + "Solar 
def _fetch_find_my_epc_data(newest_epc):
    """
    Retrieve the FindMyEPC data for an EPC record.

    The lookup is first made with the record's "address" field. If that raises a ValueError whose
    message contains "No EPC found", it is retried with the "address1" field. Any other ValueError
    yields an empty dict.

    :param newest_epc: EPC record (dict-like) with "address", "address1" and "postcode" keys
    :return: dict of FindMyEPC data, or {} when the lookup fails with an unrelated ValueError
    :raises Exception: for any non-ValueError failure of the first lookup, or any failure of the retry
    """
    try:
        return RetrieveFindMyEpc(
            address=newest_epc["address"], postcode=newest_epc["postcode"]
        ).retrieve_newest_find_my_epc_data()
    except ValueError as e:
        if "No EPC found" not in str(e):
            return {}
    except Exception as e:
        raise Exception(f"Error retrieving FindMyEPC data: {e}") from e

    # Second attempt with the alternative address field
    return RetrieveFindMyEpc(
        address=newest_epc["address1"], postcode=newest_epc["postcode"]
    ).retrieve_newest_find_my_epc_data()


def get_data(asset_list, fulladdress_column, address1_column, postcode_column):
    """
    Pull the newest EPC, its recommendations and the FindMyEPC data for every row of an asset list.

    :param asset_list: DataFrame with a "row_id" column plus the three address columns named below
    :param fulladdress_column: name of the column holding the full address
    :param address1_column: name of the column holding the first address line / house number
    :param postcode_column: name of the column holding the postcode
    :return: tuple of (list of EPC dicts, list of row_ids that raised an error, list of row_ids with no EPC)
    """
    epc_data = []
    # NOTE: deliberately not called `errors`, which would shadow the module-level
    # `from idlelib.iomenu import errors` import (that import looks accidental - TODO confirm and remove)
    failed_row_ids = []
    no_epc = []
    for _, home in tqdm(asset_list.iterrows(), total=len(asset_list)):
        # Best effort per property: any failure is recorded against the row so that it can be retried
        try:
            searcher = SearchEpc(
                address1=str(home[address1_column]),
                postcode=home[postcode_column],
                auth_token=EPC_AUTH_TOKEN,
                os_api_key="",
                property_type=None,
                fast=True,
                full_address=home[fulladdress_column],
                max_retries=5
            )
            # Force the skipping of estimating the EPC
            searcher.ordnance_survey_client.property_type = None
            searcher.ordnance_survey_client.built_form = None

            searcher.find_property(skip_os=True)
            if searcher.newest_epc is None:
                no_epc.append(home["row_id"])
                continue

            # Look for EPC recommendations. A failure here is not fatal, we carry on without them.
            # `except Exception` (rather than a bare except) keeps Ctrl-C working during a long pull.
            try:
                property_recommendations = searcher.client.domestic.recommendations(searcher.newest_epc["lmk-key"])
            except Exception:
                property_recommendations = {"rows": []}

            # Retrieve data from FindMyEPC
            find_epc_data = _fetch_find_my_epc_data(searcher.newest_epc)
            # Random pause between requests
            time.sleep(np.random.uniform(0.1, 1))

            epc_data.append(
                {
                    "row_id": home["row_id"],
                    **searcher.newest_epc.copy(),
                    "recommendations": property_recommendations["rows"],
                    "find_my_epc_data": find_epc_data,
                }
            )
        except Exception:
            failed_row_ids.append(home["row_id"])
            time.sleep(5)

    return epc_data, failed_row_ids, no_epc


def extract_address1(asset_list, full_address_col, method="first_two_words"):
    """
    Add an "address1_extracted" column derived from the full address column.

    The column is added to ``asset_list`` in place, and the same DataFrame is returned.

    :param asset_list: DataFrame containing ``full_address_col``
    :param full_address_col: name of the column holding the full address string
    :param method: extraction method; only "first_two_words" (first two space-separated tokens) is supported
    :return: ``asset_list`` with the extra "address1_extracted" column
    :raises ValueError: if ``method`` is not recognised
    """
    if method != "first_two_words":
        raise ValueError(f"Method {method} not recognized")

    asset_list["address1_extracted"] = asset_list[full_address_col].str.split(" ").str[:2].str.join(" ")
    return asset_list


def _build_recommendation_flags(epc_df):
    """
    Expand each property's EPC recommendations into one boolean column per distinct recommendation.

    :param epc_df: DataFrame with "row_id" and "recommendations" (list of dicts holding
        "improvement-summary-text") columns
    :return: DataFrame with "row_id" plus one "Recommendation: <text>" boolean column per distinct
        recommendation text. Blank recommendation texts are ignored.
    """
    summaries = []
    seen = set()
    for recommendations in epc_df["recommendations"]:
        for rec in recommendations:
            text = rec["improvement-summary-text"]
            # Blank summaries carry no information, so they never become a column
            if text != "" and text not in seen:
                seen.add(text)
                summaries.append(text)

    flag_columns = [f"Recommendation: {text}" for text in summaries]
    rows = []
    for row_id, recommendations in zip(epc_df["row_id"], epc_df["recommendations"]):
        # Start with False for every recommendation, then switch on the ones present for this property
        row_data = {col: False for col in flag_columns}
        row_data["row_id"] = row_id
        for rec in recommendations:
            text = rec["improvement-summary-text"]
            if text != "":
                row_data[f"Recommendation: {text}"] = True
        rows.append(row_data)

    return pd.DataFrame(rows, columns=["row_id"] + flag_columns)


def app():
    """
    This app pulls EPC data for the properties in a customer asset list (currently configured for the
    Bromford programme review workbook) and writes the enriched list next to the input as
    "<input name> EPC Data Pull.xlsx"

    Data request contents:
    Date of last EPC
    Reason for EPC
    SAP score on register
    Property Type
    Property Area
    Property Age
    Any Dimensions (HLP,PW,RH)
    Property Wall Construction
    Heating Type
    Secondary Heating
    Loft Insulation Depth

    Additional if possible:
    Heat loss calculations
    EPC recommendations
    Property UPRN

    """
    DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Bromford/"
    DATA_FILENAME = "Bromford programme review.xlsx"
    SHEET_NAME = "Bromford"
    POSTCODE_COLUMN = "Postcode"
    FULLADDRESS_COLUMN = None
    ADDRESS1_COLUMN = "No."
    ADDRESS1_METHOD = "first_two_words"
    ADDRESS_COLS_TO_CONCAT = ["No.", "Address"]

    asset_list = pd.read_excel(os.path.join(DATA_FOLDER, DATA_FILENAME), header=0, sheet_name=SHEET_NAME)
    # Rows without a postcode cannot be searched. Copy so that the column assignments below act on
    # an independent frame rather than a slice of the sheet.
    asset_list = asset_list[~pd.isnull(asset_list[POSTCODE_COLUMN])].copy()
    asset_list["row_id"] = asset_list.index

    # We clean up potential non-breaking spaces, and double spaces
    for col in [c for c in [POSTCODE_COLUMN, FULLADDRESS_COLUMN, ADDRESS1_COLUMN] if c is not None]:
        asset_list[col] = asset_list[col].astype(str)
        asset_list[col] = asset_list[col].str.replace('\xa0', ' ', regex=False)
        # NOTE(review): the pattern must be two spaces, a single-space pattern would be a no-op
        asset_list[col] = asset_list[col].str.replace('  ', ' ', regex=False)

    if ADDRESS1_COLUMN is None:
        ADDRESS1_COLUMN = "address1_extracted"
        asset_list = extract_address1(
            asset_list=asset_list, full_address_col=FULLADDRESS_COLUMN, method=ADDRESS1_METHOD
        )

    if FULLADDRESS_COLUMN is None:
        FULLADDRESS_COLUMN = "fulladdress_extracted"
        # We concatenate the columns in ADDRESS_COLS_TO_CONCAT, on commas
        asset_list[FULLADDRESS_COLUMN] = asset_list[ADDRESS_COLS_TO_CONCAT].apply(lambda x: ", ".join(x), axis=1)

    # We check for duplicated addresses
    asset_list["deduper"] = asset_list[FULLADDRESS_COLUMN] + asset_list[POSTCODE_COLUMN]
    is_duplicate = asset_list["deduper"].duplicated()
    if is_duplicate.sum():
        # Drop the dupes
        print(f"There are {is_duplicate.sum()} duplicated addresses - dropping")
        asset_list = asset_list[~is_duplicate]

    epc_data, errors, no_epc = get_data(
        asset_list=asset_list,
        fulladdress_column=FULLADDRESS_COLUMN,
        address1_column=ADDRESS1_COLUMN,
        postcode_column=POSTCODE_COLUMN
    )

    # We now retrieve any failed properties. Rows that fail a second time are left without EPC data.
    asset_list_failed = asset_list[asset_list["row_id"].isin(errors)]
    epc_data_failed, _, _ = get_data(
        asset_list=asset_list_failed,
        fulladdress_column=FULLADDRESS_COLUMN,
        address1_column=ADDRESS1_COLUMN,
        postcode_column=POSTCODE_COLUMN
    )

    # Append the failed data to the main data
    epc_data.extend(epc_data_failed)

    epc_df = pd.DataFrame(epc_data)

    # We expand out the recommendations, already prefixed with "Recommendation: "
    transformed_df = _build_recommendation_flags(epc_df)

    # Get the find my epc data. epc_df was built from a list, so its index lines up row for row with
    # the normalised frame
    find_my_epc_data = epc_df[["row_id"]].join(
        pd.json_normalize(epc_df["find_my_epc_data"].tolist())
    )
    # We check if we get the solar pv column:
    if "Solar photovoltaics" not in find_my_epc_data.columns:
        find_my_epc_data["Solar photovoltaics"] = False

    # Retrieve just the data we need
    epc_df = epc_df[
        [
            "row_id",
            "uprn",
            "property-type",
            "built-form",
            "inspection-date",
            "current-energy-rating",
            "current-energy-efficiency",
            "roof-description",
            "walls-description",
            "transaction-type",
            # New fields needed
            "secondheat-description",
            "total-floor-area",
            "construction-age-band",
            "floor-height",
            "number-habitable-rooms",
            "mainheat-description",
            #
            "energy-consumption-current",  # kwh/m2
            "photo-supply",
        ]
    ]

    # reindex (rather than a plain column selection) so that a field that was never scraped for any
    # property becomes an empty column instead of raising a KeyError
    find_my_epc_columns = [
        "row_id", "heating_text", "hot_water_text", 'Assessor’s name',
        "Assessor's Telephone", "Assessor's Email", "Accreditation scheme",
        "Assessor’s ID", "Solar photovoltaics"
    ]

    asset_list = asset_list.merge(
        epc_df,
        how="left",
        on="row_id"
    ).merge(
        find_my_epc_data.reindex(columns=find_my_epc_columns).rename(
            columns={
                "Solar photovoltaics": "Has Solar PV",
                "heating_text": "Heating Estimated kWh",
                "hot_water_text": "Hot Water Estimated kWh",
            }
        ),
        how="left",
        on="row_id"
    )

    # A property has solar PV if FindMyEPC lists it, or if the EPC reports a non-zero photo supply.
    # Missing values are handled explicitly so that a property without an EPC is never flagged.
    photo_supply = asset_list["photo-supply"]
    has_photo_supply = photo_supply.notna() & ~photo_supply.isin(["0.0", 0, ""])
    asset_list["Has Solar PV"] = asset_list["Has Solar PV"].eq(True) | has_photo_supply
    asset_list = asset_list.drop(columns=["photo-supply"])

    # Rename the columns
    asset_list = asset_list.rename(columns={
        "inspection-date": "Date of last EPC",
        "current-energy-efficiency": "SAP score on register",
        "current-energy-rating": "EPC rating on register",
        "property-type": "Property Type",
        "built-form": "Archetype",
        "total-floor-area": "Property Floor Area",
        "construction-age-band": "Property Age Band",
        "floor-height": "Property Floor Height",
        "number-habitable-rooms": "Number of Habitable Rooms",
        "walls-description": "Wall Construction",
        "roof-description": "Roof Construction",
        "mainheat-description": "Heating Type",
        "secondheat-description": "Secondary Heating",
        "transaction-type": "Reason for last EPC",
        "energy-consumption-current": "Heat Demand (kWh/m2)",
    })

    asset_list["Estimated Number of Floors"] = asset_list.apply(
        lambda x: estimate_number_of_floors(property_type=x["Property Type"]) if not pd.isnull(
            x["Property Type"]) else None, axis=1
    )

    # Blank strings become NaN before the float conversion. np.nan is used rather than None because
    # older pandas versions treat replace("", None) as a forward fill.
    for numeric_col in ["Property Floor Area", "Number of Habitable Rooms"]:
        asset_list[numeric_col] = asset_list[numeric_col].replace("", np.nan).astype(float)

    asset_list["Estimated Perimeter (m)"] = asset_list.apply(
        lambda x: estimate_perimeter(
            floor_area=x["Property Floor Area"] / x["Estimated Number of Floors"],
            num_rooms=x["Number of Habitable Rooms"] / x["Estimated Number of Floors"],
        ), axis=1
    )

    asset_list["Estimated Heat Loss Perimeter (m2)"] = asset_list.apply(
        lambda x: estimate_external_wall_area(
            num_floors=x["Estimated Number of Floors"],
            # 2.5 is the fallback used when the EPC floor height is blank
            floor_height=float(x["Property Floor Height"]) if x["Property Floor Height"] else 2.5,
            perimeter=x["Estimated Perimeter (m)"],
            built_form=x["Archetype"]
        ),
        axis=1
    )

    asset_list["Roof Insulation Thickness"] = asset_list.apply(
        lambda x: RoofAttributes(description=x["Roof Construction"]).process()["insulation_thickness"] if not pd.isnull(
            x["Roof Construction"]) else None,
        axis=1
    )

    asset_list = asset_list.merge(
        transformed_df,
        how="left",
        on="row_id"
    )
    asset_list = asset_list.drop(columns=["row_id"])

    # Store as an excel
    filename = os.path.join(DATA_FOLDER, os.path.splitext(DATA_FILENAME)[0]) + " EPC Data Pull.xlsx"
    asset_list.to_excel(filename, index=False)
- len(has_tank_recommendation) == 0 + (len(has_tank_recommendation) == 0) ): self.recommend_tank_insulation(phase=phase) return