From 667ed1b990172887d88ec60c6eff45b02e1f255d Mon Sep 17 00:00:00 2001
From: Khalim Conn-Kowlessar
Date: Wed, 12 Jun 2024 15:44:04 +0100
Subject: [PATCH] working on stonewater clustering pipeline

---
 .../EPC data pull - 12th June.py              | 156 +++++++++++
 etl/customers/stonewater/shdf_3_clustering.py | 264 +++++++++++++++++-
 .../epc_attributes/RoofAttributes.py          |   2 +-
 3 files changed, 419 insertions(+), 3 deletions(-)
 create mode 100644 etl/customers/places_for_people/EPC data pull - 12th June.py

diff --git a/etl/customers/places_for_people/EPC data pull - 12th June.py b/etl/customers/places_for_people/EPC data pull - 12th June.py
new file mode 100644
index 00000000..45a70ad4
--- /dev/null
+++ b/etl/customers/places_for_people/EPC data pull - 12th June.py
@@ -0,0 +1,156 @@
+import os
+
+import pandas as pd
+from tqdm import tqdm
+import numpy as np
+
+from dotenv import load_dotenv
+from backend.SearchEpc import SearchEpc
+from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes
+
+from recommendations.recommendation_utils import (
+    estimate_perimeter,
+    estimate_external_wall_area,
+    estimate_number_of_floors
+)
+
+load_dotenv(dotenv_path="backend/.env")
+EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
+
+
+def app():
+    """
+    This app pulls EPC data for the properties in the Places for People (North West) asset list
+    :return: None. The enriched asset list is written to an Excel file.
+    """
+
+    asset_list = pd.read_excel(
+        "/Users/khalimconn-kowlessar/Downloads/Places for People NORTH WEST - EPC DATA PULL REQUEST.xlsx", header=0
+    )
+
+    epc_data = []
+    for _, home in tqdm(asset_list.iterrows(), total=len(asset_list)):
+
+        full_address = home["Address"]
+
+        address1 = home["AddressLine1"]
+        postcode = home["Postcode"]
+
+        searcher = SearchEpc(
+            address1=address1,
+            postcode=postcode,
+            auth_token=EPC_AUTH_TOKEN,
+            os_api_key="",
+            property_type=None,
+            fast=True,
+            full_address=full_address
+        )
+        # Force the skipping of estimating the EPC
+        searcher.ordnance_survey_client.property_type = None
+        searcher.ordnance_survey_client.built_form = None
+
+        searcher.find_property(skip_os=True)
+        if searcher.newest_epc is None:
+            continue
+
+        epc = {
+            "asset_list_address": full_address,
+            **searcher.newest_epc.copy()
+        }
+
+        epc_data.append(epc)
+
+    epc_df = pd.DataFrame(epc_data)
+
+    # Retrieve just the data we need
+    epc_df = epc_df[
+        [
+            "asset_list_address",
+            "uprn",
+            "property-type",
+            "built-form",
+            "inspection-date",
+            "current-energy-rating",
+            "current-energy-efficiency",
+            "roof-description",
+            "walls-description",
+            "transaction-type",
+            # New fields needed
+            "secondheat-description",
+            "total-floor-area",
+            "construction-age-band",
+            "floor-height",
+            "number-habitable-rooms",
+            "mainheat-description"
+        ]
+    ]
+
+    # epc_df.to_csv("pfp sales data.csv", index=False)
+
+    asset_list = asset_list.merge(
+        epc_df,
+        how="left",
+        left_on=["Address"],
+        right_on=["asset_list_address"]
+    )
+
+    asset_list = asset_list.drop(columns=["asset_list_address"])
+
+    # Rename the columns
+    asset_list = asset_list.rename(columns={
+        "inspection-date": "Date of last EPC",
+        "current-energy-efficiency": "SAP score on register",
+        "current-energy-rating": "EPC rating on register",
+        "property-type": "EPC Property Type",
+        "built-form": "EPC Archetype",
+        "total-floor-area": "EPC Property Floor Area",
+        "construction-age-band": "EPC Property Age Band",
+        "floor-height": "EPC Property Floor Height",
+        "number-habitable-rooms": "EPC Number of Habitable Rooms",
+        "walls-description": "EPC Wall Construction",
+        "roof-description": "EPC Roof Construction",
+        "mainheat-description": "EPC Heating Type",
+        "secondheat-description": "EPC Secondary Heating",
+        "transaction-type": "Reason for last EPC"
+    })
+
+    asset_list["Estimated Number of Floors"] = asset_list.apply(
+        lambda x: estimate_number_of_floors(
+            property_type=x["EPC Property Type"]
+        ) if not pd.isnull(x["EPC Property Type"]) else None, axis=1
+    )
+
+    asset_list["EPC Property Floor Area"] = asset_list["EPC Property Floor Area"].astype(float)
+    asset_list["EPC Number of Habitable Rooms"] = np.where(
+        asset_list["EPC Number of Habitable Rooms"] == "",
+        None,
+        asset_list["EPC Number of Habitable Rooms"]
+    )
+    asset_list["EPC Number of Habitable Rooms"] = asset_list["EPC Number of Habitable Rooms"].astype(float)
+
+    asset_list["Estimated Perimeter (m)"] = asset_list.apply(
+        lambda x: estimate_perimeter(
+            floor_area=x["EPC Property Floor Area"] / x["Estimated Number of Floors"],
+            num_rooms=x["EPC Number of Habitable Rooms"] / x["Estimated Number of Floors"],
+        ), axis=1
+    )
+
+    asset_list["Estimated Heat Loss Perimeter (m)"] = asset_list.apply(
+        lambda x: estimate_external_wall_area(
+            num_floors=x["Estimated Number of Floors"],
+            floor_height=float(x["EPC Property Floor Height"]) if x["EPC Property Floor Height"] else 2.5,
+            perimeter=x["Estimated Perimeter (m)"],
+            built_form=x["EPC Archetype"]
+        ),
+        axis=1
+    )
+
+    asset_list["Roof Insulation Thickness"] = asset_list.apply(
+        lambda x: RoofAttributes(description=x["EPC Roof Construction"]).process()[
+            "insulation_thickness"] if not pd.isnull(x["EPC Roof Construction"]) else None,
+        axis=1
+    )
+
+    # Store as an Excel file
+    filename = "Places for People NORTH WEST - EPC DATA PULL.xlsx"
+    asset_list.to_excel(filename, index=False)
diff --git a/etl/customers/stonewater/shdf_3_clustering.py b/etl/customers/stonewater/shdf_3_clustering.py
index 75917a55..44043206 100644
--- a/etl/customers/stonewater/shdf_3_clustering.py
+++ b/etl/customers/stonewater/shdf_3_clustering.py
@@ -10,11 +10,47 @@ from fuzzywuzzy import fuzz
 import numpy as np
 import pandas as pd
 import time
-from utils.s3 import save_data_to_s3, read_excel_from_s3, read_from_s3
+from utils.s3 import save_data_to_s3, read_excel_from_s3, read_from_s3, read_dataframe_from_s3_parquet
 
 load_dotenv(dotenv_path="backend/.env")
 EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
 
+# We create a MAP of uprns, for EPCs that didn't give us the UPRN
+missing_uprn_map = [
+    # This is a map from internal_id to UPRN, for properties where we do have an EPC, but we don't have
+    # a uprn
+    # 1 Church Street, Alfreton, DE55 7AH
+    {"internal_id": 78, "mapped_uprn": None},  # Doesn't seem to exist any more
+    # 1 Granville Road, Luton, LU1 1PA
+    {"internal_id": 315, "mapped_uprn": 100080148856},
+    # 11 College Street, Birstall, Batley, WF17 9HF
+    # The EPC record is for 11 and 11a
+    {"internal_id": 1090, "mapped_uprn": 83190440},
+    # 11a College Street, Birstall, Batley, WF17 9HF
+    {"internal_id": 1092, "mapped_uprn": 83143766},
+    # Flat 5 Friars Street, Hereford, HR4 0AS
+    # TODO: Check this
+    {"internal_id": 1384, "mapped_uprn": 200002600892},
+    # This UPRN is for 5 Friars Court, which is a flat
+    # Flat 7 Friars Street, Hereford, HR4 0AS
+    # TODO: Check this
+    {"internal_id": 1385, "mapped_uprn": 200002600894},
+    # This UPRN is for 7 Friars Court, which is a flat
+    # 1 Waverley Street, Dudley, DY2 0YE
+    {"internal_id": 3349, "mapped_uprn": 90022438},
+    # 5 Brighton Road, Burgh Heath, Tadworth, KT20 6BQ
+    # TODO: Check this
+    # This UPRN is for 5 Copthorne, Brighton Road, Burgh Heath, KT20 6BQ, which is a flat
+    {"internal_id": 5027, "mapped_uprn": 100062145273},
+    # Room 1, 21 Coxford Road, Southampton, SO16 5FG
+    # This is for 21 Coxford Road
+    {"internal_id": 5554, "mapped_uprn": 100060692392},
+
+]
+missing_uprn_map = pd.DataFrame(missing_uprn_map)
+
+internal_id_epcs_to_drop = [315, 1384, 1385, 3349]
+
 
 def remove_commas_and_full_stops(input_string: str) -> str:
     """
@@ -610,7 +646,58 @@ def compile_data():
         header_row=4
     )
 
-    # TODO: Read in UPRNs
+    # TODO: Read in UPRNs or UDPRN
+
+    epc_data = json.loads(
+        read_from_s3(
+            bucket_name="retrofit-data-dev",
+            s3_file_name="customers/Stonewater/clustering/epc_data.json"
+        )
+    )
+    epc_data = pd.DataFrame(epc_data)
+
+    # We drop some EPCs
+    epc_data = epc_data[~epc_data["internal_id"].isin(internal_id_epcs_to_drop)]
+
+    # This we can use to produce additional variables such as number of old surveys
+    older_epc_data = json.loads(
+        read_from_s3(
+            bucket_name="retrofit-data-dev",
+            s3_file_name="customers/Stonewater/clustering/old_epc_data.json"
+        )
+    )
+    older_epc_data = {k: v for k, v in older_epc_data.items() if int(k) not in internal_id_epcs_to_drop}  # JSON keys are str
+
+    # This is the first ordnance survey data pull
+    os_most_relevant_1 = []
+    os_all_1 = {}
+    for i in tqdm(["1", "2", "3"]):
+        most_relevant_segment = read_from_s3(
+            bucket_name="retrofit-data-dev",
+            s3_file_name=f"customers/Stonewater/clustering/os_most_relevant_{i}.json"
+        )
+        os_most_relevant_1.extend(json.loads(most_relevant_segment))
+        os_all_segment = read_from_s3(
+            bucket_name="retrofit-data-dev",
+            s3_file_name=f"customers/Stonewater/clustering/os_all_{i}.json"
+        )
+        os_all_1 = {**os_all_1, **json.loads(os_all_segment)}
+
+    os_most_relevant_1 = pd.DataFrame(os_most_relevant_1)
+
+    # This is the second ordnance survey data pull
+    os_most_relevant_2 = read_from_s3(
+        bucket_name="retrofit-data-dev",
+        s3_file_name="customers/Stonewater/clustering/problematic_os.json"
+    )
+    os_most_relevant_2 = json.loads(os_most_relevant_2)
+    os_most_relevant_2 = pd.DataFrame(os_most_relevant_2)
+
+    os_all_2 = read_from_s3(
+        bucket_name="retrofit-data-dev",
+        s3_file_name="customers/Stonewater/clustering/problematic_os_all.json"
+    )
+    os_all_2 = json.loads(os_all_2)
 
     ########################################################################
     # Prepare asset list
@@ -664,3 +751,176 @@ def compile_data():
 
     if pd.isnull(asset_list["full_address"]).sum():
         raise ValueError("Missing full addresses")
+
+    # Quick check to see if we have os data for every property that doesn't have an EPC
+    without_epc = asset_list[~asset_list["internal_id"].isin(epc_data["internal_id"].values)]
+    os_most_relevant_1_internal_ids = os_most_relevant_1["internal_id"].tolist()
+    os_most_relevant_2_internal_ids = os_most_relevant_2["internal_id"].tolist()
+
+    missing_os_data = []
+    for _, x in without_epc.iterrows():
+        # We would prioritise the data pulled the second time around
+
+        internal_id = x["internal_id"]
+        if internal_id in os_most_relevant_2_internal_ids:
+            continue
+
+        if internal_id in os_most_relevant_1_internal_ids:
+            continue
+
+        missing_os_data.append(internal_id)
+
+    if len(missing_os_data):
+        raise Exception("We don't have SOME data for each internal_id")
+
+    # For the EPC data, some of them are missing UPRN
+    epc_data_to_address = asset_list[
+        asset_list["internal_id"].isin(epc_data["internal_id"].values)
+    ][
+        ["full_address", "internal_id"]].merge(
+        epc_data, how="left", on="internal_id"
+    )
+    missed_uprn = epc_data_to_address[epc_data_to_address["uprn"] == ""]
+
+    # Once we have UPRNs, we might want to pull in the EPC data again
+    # epc_data_with_uprn = []
+    # older_epc_data_with_uprn = {}
+    #
+    # for row_number, asset in tqdm(asset_list.iterrows(), total=len(asset_list)):
+    #     searcher = SearchEpc(
+    #         address1=str(asset["address1"]),
+    #         postcode=str(asset["postcode"]),
+    #         auth_token=EPC_AUTH_TOKEN,
+    #         os_api_key="",
+    #         full_address=str(asset["full_address"]),
+    #         uprn=asset["uprn"]
+    #     )
+    #     searcher.find_property(skip_os=True)
+    #
+    #     if searcher.newest_epc is None:
+    #         continue
+    #
+    #     epc_data_with_uprn.append(
+    #         {
+    #             "internal_id": asset["internal_id"],
+    #             **searcher.newest_epc
+    #         }
+    #     )
+    #
+    #     if searcher.older_epcs is not None:
+    #         older_epc_data_with_uprn[asset["internal_id"]] = searcher.older_epcs
+
+    # We now get the remaining properties
+    # TODO: We might want to use epc_data_with_uprn
+    remaining_properties = asset_list[~asset_list["internal_id"].isin(epc_data["internal_id"].values)]
+
+    # We estimate the data
+    final_epcs = []
+    for _, p in remaining_properties.iterrows():
+        internal_id = p["internal_id"]
+        uprn = p["UPRN"]
+
+        if internal_id in os_most_relevant_1_internal_ids:
+            p_os_data = os_most_relevant_1[os_most_relevant_1["internal_id"] == internal_id].to_dict("records")[0]
+            p_os_full = os_all_1[str(internal_id)]
+        else:
+            p_os_data = os_most_relevant_2[os_most_relevant_2["internal_id"] == internal_id].to_dict("records")[0]
+            p_os_full = os_all_2[str(internal_id)]
+        p_os_full = pd.DataFrame(
+            [x["DPA"] if "DPA" in x else x["LPI"] for x in p_os_full]
+        )
+
+        # TODO: Add this back in
+        # When we have this
+        if p["uprn"] != p_os_data["UPRN"]:
+            # Get it from the older data
+            filtered = p_os_full[p_os_full["UPRN"] == p["uprn"]]
+            p_os_data = filtered.to_dict("records")[0]
+
+        searcher = SearchEpc(
+            address1=str(p["address1"]),
+            postcode=str(p["postcode"]),
+            auth_token=EPC_AUTH_TOKEN,
+            os_api_key="",
+            uprn=uprn
+        )
+        searcher.ordnance_survey_client.parse_classification_code(p_os_data["CLASSIFICATION_CODE"])
+
+        searcher.find_property(skip_os=True)
+
+        final_epcs.append(
+            {
+                "internal_id": internal_id,
+                **searcher.newest_epc
+            }
+        )
+
+    final_epcs = pd.DataFrame(final_epcs)
+
+    complete_epcs = pd.concat(
+        [
+            epc_data,
+            final_epcs
+        ]
+    )
+
+    # We now pull additional data
+    uprns = complete_epcs["uprn"].tolist()
+    # We get the spatial file list and loop through each EPC and determine which file it needs.
+    # We then just read in the files that we need and get the data, for each uprn from that file
+
+    uprn_filenames = read_dataframe_from_s3_parquet(
+        bucket_name="retrofit-data-dev", file_key="spatial/filename_meta.parquet"
+    )
+
+    uprn_lookup = {}
+    for uprn in complete_epcs["uprn"]:
+        if not uprn:
+            # TODO: Do something about this!
+            continue
+        filtered_df = uprn_filenames[
+            (uprn_filenames["lower"] <= int(uprn))
+            & (uprn_filenames["upper"] >= int(uprn))
+        ]
+        if filtered_df["filenames"].values[0] in uprn_lookup:
+            uprn_lookup[filtered_df["filenames"].values[0]].append(int(uprn))
+        else:
+            uprn_lookup[filtered_df["filenames"].values[0]] = [int(uprn)]
+
+    spatial_data_to_uprn = []
+    for filename, associated_uprn in tqdm(uprn_lookup.items(), total=len(uprn_lookup)):
+        # Read in the file
+        spatial_data = read_dataframe_from_s3_parquet(
+            bucket_name="retrofit-data-dev", file_key=f"spatial/{filename}"
+        )
+
+        spatial_df = spatial_data[spatial_data["UPRN"].isin(associated_uprn)]
+        spatial_data_to_uprn.append(spatial_df)
+
+    spatial_data_to_uprn = pd.concat(spatial_data_to_uprn)
+
+    # TODO: Let's store this in s3
+    save_data_to_s3(
+        data=json.dumps(spatial_data_to_uprn.to_dict("records")),
+        s3_file_name="customers/Stonewater/clustering/spatial_data_to_uprn.json",
+        bucket_name="retrofit-data-dev"
+    )
+
+    # We merge this spatial data onto final EPCS
+    spatial_data_to_uprn = spatial_data_to_uprn.drop(
+        columns=["partition", "filename"]
+    ).rename(columns={"UPRN": "uprn"})
+    spatial_data_to_uprn["uprn"] = spatial_data_to_uprn["uprn"].astype(str)
+
+    property_attributes = complete_epcs.merge(
+        spatial_data_to_uprn,
+        how="left",
+        on="uprn"
+    )
+
+    # We drop the columns we don't care about for clustering
+    property_attributes = property_attributes.drop(
+        columns=[
+
+        ]
+    )
diff --git a/etl/epc_clean/epc_attributes/RoofAttributes.py b/etl/epc_clean/epc_attributes/RoofAttributes.py
index 76f99f09..84d1f3e9 100644
--- a/etl/epc_clean/epc_attributes/RoofAttributes.py
+++ b/etl/epc_clean/epc_attributes/RoofAttributes.py
@@ -45,7 +45,7 @@ class RoofAttributes(Definitions):
         """
 
         self.description: str = description.lower().strip()
-        self.nodata = not description or description in self.DATA_ANOMALY_MATCHES
+        self.nodata = not description or description in self.DATA_ANOMALY_MATCHES or self.description == "sap05:roof"
 
         self.welsh_translation_search()
 