From d86ab5ff8df50e58248bff92582084462fc2166b Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Fri, 21 Feb 2025 15:18:53 +0000 Subject: [PATCH] restructuing app location --- asset_list/app.py | 497 ++++++++++++++++++++ etl/route_march_data_pull/app.py | 502 --------------------- etl/route_march_data_pull/requirements.txt | 0 3 files changed, 497 insertions(+), 502 deletions(-) delete mode 100644 etl/route_march_data_pull/app.py delete mode 100644 etl/route_march_data_pull/requirements.txt diff --git a/asset_list/app.py b/asset_list/app.py index 21b405d8..1a7788fe 100644 --- a/asset_list/app.py +++ b/asset_list/app.py @@ -1 +1,498 @@ import os +import time +import json +import pandas as pd +import numpy as np +from tqdm import tqdm +import msgpack +from utils.s3 import read_from_s3 +from asset_list.AssetList import AssetList +from asset_list.mappings.property_type import PROPERTY_MAPPING +from asset_list.mappings.walls import WALL_CONSTRUCTION_MAPPINGS +from asset_list.mappings.heating_systems import HEATING_MAPPINGS +from asset_list.mappings.exising_pv import EXISTING_PV_MAPPINGS + +from dotenv import load_dotenv +from backend.SearchEpc import SearchEpc +from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc + +load_dotenv(dotenv_path="backend/.env") +EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN") + + +def get_data( + asset_list, fulladdress_column, address1_column, postcode_column, manual_uprn_map, + uprn_column=None, epc_api_only=False, row_id_name="row_id" +): + epc_data = [] + errors = [] + no_epc = [] + for _, home in tqdm(asset_list.iterrows(), total=len(asset_list)): + try: + postcode = home[postcode_column] + house_number = str(home[address1_column]).strip() + full_address = home[fulladdress_column].strip() + house_no = SearchEpc.get_house_number(address=str(house_number), postcode=postcode) + if house_no is None: + house_no = house_number + uprn = manual_uprn_map.get(full_address, None) + if uprn is None and home.get(uprn_column): + uprn = home[uprn_column] + + if pd.isnull(uprn): + uprn = None + + searcher = SearchEpc( + address1=str(house_no), + postcode=postcode, + auth_token=EPC_AUTH_TOKEN, + os_api_key="", + property_type=None, + fast=True, + full_address=full_address, + max_retries=5, + uprn=uprn + ) + # Force the skipping of estimating the EPC + searcher.ordnance_survey_client.property_type = None + searcher.ordnance_survey_client.built_form = None + + searcher.find_property(skip_os=True) + + # Check if we have a flat or appartment + if searcher.newest_epc is None and uprn is None: + # Try again: + if SearchEpc.get_house_number(address=str(house_number), postcode=postcode) is None: + # Backup + add1 = full_address.split(",") + if len(add1) > 1: + add1 = add1[1].strip() + else: + # Try splitting on space + add1 = full_address.split(" ")[0].strip() + + else: + add1 = str(house_number) + searcher = SearchEpc( + address1=add1, + postcode=postcode, + auth_token=EPC_AUTH_TOKEN, + os_api_key="", + property_type=None, + fast=True, + full_address=full_address, + max_retries=5 + ) + + if ( + "flat" in house_number.lower() or "apartment" in house_number.lower() or "apt" in + house_number.lower() + ): + searcher.ordnance_survey_client.property_type = "Flat" + + searcher.find_property(skip_os=True) + + if searcher.newest_epc is None: + no_epc.append(home[row_id_name]) + continue + + if epc_api_only: + epc = { + row_id_name: home[row_id_name], + **searcher.newest_epc.copy() + } + + epc_data.append(epc) + continue + + # Look for EPC recommendatons + try: + property_recommendations = searcher.client.domestic.recommendations(searcher.newest_epc["lmk-key"]) + except: + property_recommendations = {"rows": []} + + # Retrieve data from FindMyEPC + try: + find_epc_searcher = RetrieveFindMyEpc( + address=searcher.newest_epc["address"], postcode=searcher.newest_epc["postcode"] + ) + find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data() + except ValueError as e: + if "No EPC found" in str(e) and "address1" in searcher.newest_epc: + try: + find_epc_searcher = RetrieveFindMyEpc( + address=searcher.newest_epc["address1"], postcode=searcher.newest_epc["postcode"] + ) + find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data() + except ValueError as e: + if "No EPC found" in str(e): + find_epc_data = {} + else: + find_epc_data = {} + except Exception as e: + raise Exception(f"Error retrieving FindMyEPC data: {e}") + time.sleep(np.random.uniform(0.1, 1)) + + epc = { + row_id_name: home[row_id_name], + **searcher.newest_epc.copy(), + "recommendations": property_recommendations["rows"], + "find_my_epc_data": find_epc_data, + } + + epc_data.append(epc) + except Exception as e: + errors.append(home[row_id_name]) + time.sleep(5) + + return epc_data, errors, no_epc + + +def extract_address1(asset_list, full_address_col, postcode_col, method="first_two_words"): + if method == "first_two_words": + asset_list["address1_extracted"] = asset_list[full_address_col].str.split(" ").str[:2].str.join(" ") + return asset_list + + if method == "first_word": + asset_list["address1_extracted"] = asset_list[full_address_col].str.split(" ").str[0] + return asset_list + + if method == "house_number_extraction": + asset_list["address1_extracted"] = asset_list.apply( + lambda x: SearchEpc.get_house_number(address=x[full_address_col], postcode=x[postcode_col]), + axis=1 + ) + return asset_list + + raise ValueError(f"Method {method} not recognized") + + +def app(): + """ + This app is EPC pulling data for some properties owned by Livewest + + Data request contents: + Date of last EPC + Reason for EPC + SAP score on register + Property Type + Property Area + Property Age + Any Dimensions (HLP,PW,RH) + Property Wall Construction + Heating Type + Secondary Heating + Loft Insulation Depth + + Additional if possible: + Heat loss calculations + EPC recommendations + Property UPRN + """ + + # TODO: + # For cavity work: + # - Flag any entries that have a different wall type between non-intrusive data against EPC + # - Worth double checking entries that have a difference in wall construction + # - Look at anything that is flagged as an empty cavity but the EPC data says it’s a filled cavity + # - Look at the current EPC scores - Anything that is C75 or above, especially if it’s assumed no insulation + # - By postcode, we can try and deduce if all of the addresses are a flats and then estimate if 50% of the flats + # are less than C75 + # - Flag anything pre SAP2012 + # - Flag anything over 5 years old + # - Look at year built vs age band + # + # For Solar: + # - Discount any that have solar PV - based on non-intrusives and from the inspections team + # - In the heating, discount anything that isn’t ashp, ghsp, hhrs, electric storage - possibly homes with + # electric room heaters but it might need to be an EPC E + # - Fabric - check the floor, wall and roof: + # - Filled or empty cavity is good + # - Insulated solid/timber/system built is good + # - SCIS/CEG needs solid floors + # - JJC don’t care + # - Anything with a loft 200 or below + # - Anything C75 and above won’t qualify + # - Insulated loft = 200mm + # - We want: fully insulated property (all wall types), EPC D or below (floors should be solid) + # - Or the insulation required is loft/cavity (floors should be solid) + + # For Westward + DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Westward" + DATA_FILENAME = "WESTWARD - completed list..xlsx" + SHEET_NAME = "Sheet1" + + POSTCODE_COLUMN = "WFT EDIT Postcode" + FULLADDRESS_COLUMN = "Address" + ADDRESS1_COLUMN = None + ADDRESS1_METHOD = "house_number_extraction" + + ADDRESS_COLS_TO_CONCAT = [] + MISSING_POSTCODES_METHOD = None + PROPERTY_YEAR_BUILT = "Build date" + UPRN_COLUMN = "UPRN" + # If we have the non-intrusives data, this should be true + HAS_NON_INTRUSIVES = True + PROPERTY_TYPE_COLUMN = "Location type" # This will be used to identify and remove bedsits + + # Maps addresses to uprn in problematic cases + MANUAL_UPRN_MAP = {} + + asset_list = AssetList( + local_filepath=os.path.join(DATA_FOLDER, DATA_FILENAME), + header=0, + sheet_name=SHEET_NAME, + address1_colname=ADDRESS1_COLUMN, + postcode_colname=POSTCODE_COLUMN, + landlord_property_id="UPRN", + full_address_colname=FULLADDRESS_COLUMN, + full_address_cols_to_concat=ADDRESS_COLS_TO_CONCAT, + missing_postcodes_method=MISSING_POSTCODES_METHOD, + address1_extraction_method=ADDRESS1_METHOD, + landlord_year_built=PROPERTY_YEAR_BUILT, + landlord_uprn=UPRN_COLUMN, + landlord_property_type=PROPERTY_TYPE_COLUMN, + landlord_wall_construction="Wall Construction (EPC)", + landlord_heating_system="Heat Source", + landlord_existing_pv="PV (Y/N)" + ) + asset_list.init_standardise() + + # We produce the new maps, which can be saved for future useage + + new_property_type_map = PROPERTY_MAPPING.copy().update( + asset_list.variable_mappings[asset_list.landlord_property_type] + ) + new_wall_map = WALL_CONSTRUCTION_MAPPINGS.copy().update( + asset_list.variable_mappings[asset_list.landlord_wall_construction] + ) + new_heating_map = HEATING_MAPPINGS.copy().update( + asset_list.variable_mappings[asset_list.landlord_heating_system] + ) + new_existing_pv_map = EXISTING_PV_MAPPINGS.copy().update( + asset_list.variable_mappings[asset_list.landlord_existing_pv] + ) + + asset_list.apply_standardiation() + + # DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Colchester" + # DATA_FILENAME = "Warmfront data- Colchester Borough Homes (Complete).xlsx" + # SHEET_NAME = "Sheet1" + # POSTCODE_COLUMN = 'Full Address.1' + # FULLADDRESS_COLUMN = "Full Address" + # ADDRESS1_COLUMN = None + # ADDRESS1_METHOD = "first_word" + # ADDRESS_COLS_TO_CONCAT = [] + # MISSING_POSTCODES_METHOD = None + # PROPERTY_YEAR_BUILT = "Build Date" + # UPRN_COLUMN = None + # # If we have the non-intrusives data, this should be true + # HAS_NON_INTRUSIVES = True + + ### We retrieve the EPC data + + # We chunk up this data into 5000 rows at a time + # Create the chunks directory + force_retrieve_data = False + skip = None # Used to skip already completed chunks + chunk_size = 5000 + filename = "Chunk {i}.csv" + download_folder = os.path.join(DATA_FOLDER, "Chunks") + if not os.path.exists(download_folder): + os.makedirs(download_folder) + + chunk_indexes = list(range(0, len(asset_list.standardised_asset_list), chunk_size)) + downloaded_files = {filename.format(i=i) for i in chunk_indexes} + + # We check if we have files associated to these files already and if we do, and we do not want to force the + # fetching of the data, we skip + folder_contents = os.listdir(download_folder) + if all(x in folder_contents for x in downloaded_files): + skip = max(chunk_indexes) + + for i in range(0, len(asset_list.standardised_asset_list), chunk_size): + print(f"Processing chunk {i} to {i + chunk_size}") + if skip is not None and not force_retrieve_data: + if i <= skip: + continue + chunk = asset_list.standardised_asset_list[i:i + chunk_size] + epc_data_chunk, errors_chunk, no_epc_chunk = get_data( + asset_list=chunk, + row_id_name=asset_list.DOMNA_PROPERTY_ID, + fulladdress_column=asset_list.STANDARD_FULL_ADDRESS, + address1_column=asset_list.STANDARD_ADDRESS_1, + postcode_column=asset_list.STANDARD_POSTCODE, + manual_uprn_map=MANUAL_UPRN_MAP, + uprn_column=asset_list.STANDARD_UPRN + ) + + # We now retrieve any failed properties + chunk_failed = chunk[chunk[asset_list.DOMNA_PROPERTY_ID].isin(errors_chunk)] + epc_data_failed, _, _ = get_data( + asset_list=chunk_failed, + row_id_name=asset_list.DOMNA_PROPERTY_ID, + fulladdress_column=FULLADDRESS_COLUMN, + address1_column=ADDRESS1_COLUMN, + postcode_column=POSTCODE_COLUMN, + manual_uprn_map=MANUAL_UPRN_MAP, + epc_api_only=False + ) + + epc_data_chunk.extend(epc_data_failed) + + # Append the failed data to the main data + # Store the chunk locally as a csv + pd.DataFrame(epc_data_chunk).to_csv(os.path.join(DATA_FOLDER, f"Chunks/Chunk {i}.csv"), index=False) + # Store the errors and no-data locally + with open(os.path.join(DATA_FOLDER, f"Chunks/Chunk {i} errors.json"), "w") as f: + json.dump(errors_chunk, f) + + with open(os.path.join(DATA_FOLDER, f"Chunks/Chunk {i} nodata.csv"), "w") as f: + json.dump(no_epc_chunk, f) + + # We read in and concatenate the created created chunks + # List the contents + epc_data = [] + for file in downloaded_files: + csv_data = pd.read_csv(os.path.join(download_folder, file)) + # We need to convert the recommendations back to a list + csv_data["recommendations"] = csv_data["recommendations"].apply(eval) + csv_data["find_my_epc_data"] = csv_data["find_my_epc_data"].apply(eval) + epc_data.append(csv_data) + + epc_df = pd.concat(epc_data) + # TODO: TEMP!!! + epc_df = epc_df.rename(columns={"row_id": asset_list.DOMNA_PROPERTY_ID}) + + # We expand out the recommendations + recommendations_df = epc_df[[asset_list.DOMNA_PROPERTY_ID, "recommendations"]] + + unique_recommendations = set() + for _, row in recommendations_df.iterrows(): + unique_recommendations.update([rec["improvement-summary-text"] for rec in row["recommendations"]]) + + columns = [asset_list.DOMNA_PROPERTY_ID] + list(unique_recommendations) + transformed_data = [] + for _, row in recommendations_df.iterrows(): + # Initialize a dictionary for this row with False for all recommendations + row_data = {col: False for col in columns} + row_data[asset_list.DOMNA_PROPERTY_ID] = row[asset_list.DOMNA_PROPERTY_ID] + + # Set True for each recommendation present in this row + for rec in row["recommendations"]: + recommendation_text = rec["improvement-summary-text"] + row_data[recommendation_text] = True + + # Append the row data to transformed_data + transformed_data.append(row_data) + + transformed_df = pd.DataFrame(transformed_data) + transformed_df = transformed_df[ + [ + asset_list.DOMNA_PROPERTY_ID, "Floor insulation (solid floor)", + "Floor insulation", "Floor insulation (suspended floor)" + ] + ] + + transformed_df["epc_has_floor_recommendation"] = ( + transformed_df["Floor insulation (solid floor)"] | transformed_df["Floor insulation"] | + transformed_df["Floor insulation (suspended floor)"] + ) + + # Get the find my epc data + find_my_epc_data = epc_df[[asset_list.DOMNA_PROPERTY_ID, "find_my_epc_data"]].drop( + columns=["find_my_epc_data"]).join( + pd.json_normalize(epc_df["find_my_epc_data"]) + ) + find_my_epc_data = find_my_epc_data.merge( + transformed_df[[asset_list.DOMNA_PROPERTY_ID, "epc_has_floor_recommendation"]], + how="left", on=asset_list.DOMNA_PROPERTY_ID + ) + + # We check if we get the solar pv column: + if "Solar photovoltaics" not in find_my_epc_data.columns: + find_my_epc_data["Solar photovoltaics"] = False + + # Retrieve just the data we need + epc_df = epc_df[ + [asset_list.DOMNA_PROPERTY_ID] + list(asset_list.EPC_API_DATA_NAMES.keys()) + ].rename( + columns=asset_list.EPC_API_DATA_NAMES + ) + + epc_df = epc_df.merge( + find_my_epc_data[ + [asset_list.DOMNA_PROPERTY_ID, "epc_has_floor_recommendation"] + list(asset_list.FIND_EPC_DATA_NAMES.keys()) + ] + .rename(columns=asset_list.FIND_EPC_DATA_NAMES), + how="left", + on=asset_list.DOMNA_PROPERTY_ID + ) + + asset_list.merge_data(epc_df) + # TODO: TEMP!!! + epc_df["epc_os_uprn"] = epc_df["epc_os_uprn"].astype("Int64").astype(str) + asset_list.standardised_asset_list = asset_list.standardised_asset_list.merge( + epc_df.drop(columns=["domna_property_id"]), how="left", left_on="ordnance_survey_uprn", right_on="epc_os_uprn" + ) + + asset_list.extract_attributes() + + cleaned = read_from_s3( + s3_file_name="cleaned_epc_data/cleaned.bson", + bucket_name="retrofit-data-dev" + ) + cleaned = msgpack.unpackb(cleaned, raw=False) + + asset_list.identify_worktypes(cleaned) + + # TODO: We should do this breakdown for flats + def flat_analysis(asset_list): + + # We need to deduce the building name - we strip out the house number + def extract_building_name(x): + # TODO: This doesn't really work + if pd.isnull(x): + return None + house_no = SearchEpc.get_house_number(address=x, postcode=None) + if house_no: + return x.replace(house_no, "").strip() + return x.split(",")[0].strip() + + # We want to deduce if flats have 50% of the properties below C75 + # We group by postcode and property type + grouped = asset_list.groupby([POSTCODE_COLUMN, "Property Type"]) + + flat_data = [] + for _, group in grouped: + if "flat" in group["Property Type"].str.lower().values: + num_flats = group["Property Type"].str.lower().value_counts().get("flat", 0) + num_below_c75 = group["SAP score on register"].lt(75).sum() + + flat_data.append( + { + "Postcode": group[POSTCODE_COLUMN].iloc[0], + "Property Type": "Flat", + "Number of Flats with EPC": num_flats, + "Number of Flats below C75": num_below_c75, + "Proportion of Flat EPCs below C75": round(100 * num_below_c75 / num_flats) + } + ) + + flat_data = pd.DataFrame(flat_data) + + return flat_data + + flat_data = flat_analysis(asset_list) + + # Store as an excel + filename = os.path.join(DATA_FOLDER, ".".join(DATA_FILENAME.split(".")[:-1])) + " EPC Data Pull.xlsx" + # Store the data in two tabs. One for the asset list with the EPC data and the second with the flat data + + with pd.ExcelWriter(filename) as writer: + asset_list.to_excel(writer, sheet_name="EPC Data", index=False) + flat_data.to_excel(writer, sheet_name="Flat Data", index=False) + + matches_review = asset_list[ + [FULLADDRESS_COLUMN, ADDRESS1_COLUMN, POSTCODE_COLUMN, "Address on EPC", "Postcode on EPC"] + ] diff --git a/etl/route_march_data_pull/app.py b/etl/route_march_data_pull/app.py deleted file mode 100644 index 7bf3cca8..00000000 --- a/etl/route_march_data_pull/app.py +++ /dev/null @@ -1,502 +0,0 @@ -import os -import time -import json -import pandas as pd -import numpy as np -from tqdm import tqdm -import msgpack -from utils.s3 import read_from_s3 -from asset_list.AssetList import AssetList -from asset_list.mappings.property_type import PROPERTY_MAPPING -from asset_list.mappings.walls import WALL_CONSTRUCTION_MAPPINGS -from asset_list.mappings.heating_systems import HEATING_MAPPINGS -from asset_list.mappings.exising_pv import EXISTING_PV_MAPPINGS - -from dotenv import load_dotenv -from backend.SearchEpc import SearchEpc -from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc - -from etl.epc_clean.epc_attributes.attribute_utils import ( - extract_thermal_transmittance -) - -load_dotenv(dotenv_path="backend/.env") -EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN") - - -def get_data( - asset_list, fulladdress_column, address1_column, postcode_column, manual_uprn_map, - uprn_column=None, epc_api_only=False, row_id_name="row_id" -): - epc_data = [] - errors = [] - no_epc = [] - for _, home in tqdm(asset_list.iterrows(), total=len(asset_list)): - try: - postcode = home[postcode_column] - house_number = str(home[address1_column]).strip() - full_address = home[fulladdress_column].strip() - house_no = SearchEpc.get_house_number(address=str(house_number), postcode=postcode) - if house_no is None: - house_no = house_number - uprn = manual_uprn_map.get(full_address, None) - if uprn is None and home.get(uprn_column): - uprn = home[uprn_column] - - if pd.isnull(uprn): - uprn = None - - searcher = SearchEpc( - address1=str(house_no), - postcode=postcode, - auth_token=EPC_AUTH_TOKEN, - os_api_key="", - property_type=None, - fast=True, - full_address=full_address, - max_retries=5, - uprn=uprn - ) - # Force the skipping of estimating the EPC - searcher.ordnance_survey_client.property_type = None - searcher.ordnance_survey_client.built_form = None - - searcher.find_property(skip_os=True) - - # Check if we have a flat or appartment - if searcher.newest_epc is None and uprn is None: - # Try again: - if SearchEpc.get_house_number(address=str(house_number), postcode=postcode) is None: - # Backup - add1 = full_address.split(",") - if len(add1) > 1: - add1 = add1[1].strip() - else: - # Try splitting on space - add1 = full_address.split(" ")[0].strip() - - else: - add1 = str(house_number) - searcher = SearchEpc( - address1=add1, - postcode=postcode, - auth_token=EPC_AUTH_TOKEN, - os_api_key="", - property_type=None, - fast=True, - full_address=full_address, - max_retries=5 - ) - - if ( - "flat" in house_number.lower() or "apartment" in house_number.lower() or "apt" in - house_number.lower() - ): - searcher.ordnance_survey_client.property_type = "Flat" - - searcher.find_property(skip_os=True) - - if searcher.newest_epc is None: - no_epc.append(home[row_id_name]) - continue - - if epc_api_only: - epc = { - row_id_name: home[row_id_name], - **searcher.newest_epc.copy() - } - - epc_data.append(epc) - continue - - # Look for EPC recommendatons - try: - property_recommendations = searcher.client.domestic.recommendations(searcher.newest_epc["lmk-key"]) - except: - property_recommendations = {"rows": []} - - # Retrieve data from FindMyEPC - try: - find_epc_searcher = RetrieveFindMyEpc( - address=searcher.newest_epc["address"], postcode=searcher.newest_epc["postcode"] - ) - find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data() - except ValueError as e: - if "No EPC found" in str(e) and "address1" in searcher.newest_epc: - try: - find_epc_searcher = RetrieveFindMyEpc( - address=searcher.newest_epc["address1"], postcode=searcher.newest_epc["postcode"] - ) - find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data() - except ValueError as e: - if "No EPC found" in str(e): - find_epc_data = {} - else: - find_epc_data = {} - except Exception as e: - raise Exception(f"Error retrieving FindMyEPC data: {e}") - time.sleep(np.random.uniform(0.1, 1)) - - epc = { - row_id_name: home[row_id_name], - **searcher.newest_epc.copy(), - "recommendations": property_recommendations["rows"], - "find_my_epc_data": find_epc_data, - } - - epc_data.append(epc) - except Exception as e: - errors.append(home[row_id_name]) - time.sleep(5) - - return epc_data, errors, no_epc - - -def extract_address1(asset_list, full_address_col, postcode_col, method="first_two_words"): - if method == "first_two_words": - asset_list["address1_extracted"] = asset_list[full_address_col].str.split(" ").str[:2].str.join(" ") - return asset_list - - if method == "first_word": - asset_list["address1_extracted"] = asset_list[full_address_col].str.split(" ").str[0] - return asset_list - - if method == "house_number_extraction": - asset_list["address1_extracted"] = asset_list.apply( - lambda x: SearchEpc.get_house_number(address=x[full_address_col], postcode=x[postcode_col]), - axis=1 - ) - return asset_list - - raise ValueError(f"Method {method} not recognized") - - -def app(): - """ - This app is EPC pulling data for some properties owned by Livewest - - Data request contents: - Date of last EPC - Reason for EPC - SAP score on register - Property Type - Property Area - Property Age - Any Dimensions (HLP,PW,RH) - Property Wall Construction - Heating Type - Secondary Heating - Loft Insulation Depth - - Additional if possible: - Heat loss calculations - EPC recommendations - Property UPRN - """ - - # TODO: - # For cavity work: - # - Flag any entries that have a different wall type between non-intrusive data against EPC - # - Worth double checking entries that have a difference in wall construction - # - Look at anything that is flagged as an empty cavity but the EPC data says it’s a filled cavity - # - Look at the current EPC scores - Anything that is C75 or above, especially if it’s assumed no insulation - # - By postcode, we can try and deduce if all of the addresses are a flats and then estimate if 50% of the flats - # are less than C75 - # - Flag anything pre SAP2012 - # - Flag anything over 5 years old - # - Look at year built vs age band - # - # For Solar: - # - Discount any that have solar PV - based on non-intrusives and from the inspections team - # - In the heating, discount anything that isn’t ashp, ghsp, hhrs, electric storage - possibly homes with - # electric room heaters but it might need to be an EPC E - # - Fabric - check the floor, wall and roof: - # - Filled or empty cavity is good - # - Insulated solid/timber/system built is good - # - SCIS/CEG needs solid floors - # - JJC don’t care - # - Anything with a loft 200 or below - # - Anything C75 and above won’t qualify - # - Insulated loft = 200mm - # - We want: fully insulated property (all wall types), EPC D or below (floors should be solid) - # - Or the insulation required is loft/cavity (floors should be solid) - - # For Westward - DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Westward" - DATA_FILENAME = "WESTWARD - completed list..xlsx" - SHEET_NAME = "Sheet1" - - POSTCODE_COLUMN = "WFT EDIT Postcode" - FULLADDRESS_COLUMN = "Address" - ADDRESS1_COLUMN = None - ADDRESS1_METHOD = "house_number_extraction" - - ADDRESS_COLS_TO_CONCAT = [] - MISSING_POSTCODES_METHOD = None - PROPERTY_YEAR_BUILT = "Build date" - UPRN_COLUMN = "UPRN" - # If we have the non-intrusives data, this should be true - HAS_NON_INTRUSIVES = True - PROPERTY_TYPE_COLUMN = "Location type" # This will be used to identify and remove bedsits - - # Maps addresses to uprn in problematic cases - MANUAL_UPRN_MAP = {} - - asset_list = AssetList( - local_filepath=os.path.join(DATA_FOLDER, DATA_FILENAME), - header=0, - sheet_name=SHEET_NAME, - address1_colname=ADDRESS1_COLUMN, - postcode_colname=POSTCODE_COLUMN, - landlord_property_id="UPRN", - full_address_colname=FULLADDRESS_COLUMN, - full_address_cols_to_concat=ADDRESS_COLS_TO_CONCAT, - missing_postcodes_method=MISSING_POSTCODES_METHOD, - address1_extraction_method=ADDRESS1_METHOD, - landlord_year_built=PROPERTY_YEAR_BUILT, - landlord_uprn=UPRN_COLUMN, - landlord_property_type=PROPERTY_TYPE_COLUMN, - landlord_wall_construction="Wall Construction (EPC)", - landlord_heating_system="Heat Source", - landlord_existing_pv="PV (Y/N)" - ) - asset_list.init_standardise() - - # We produce the new maps, which can be saved for future useage - - new_property_type_map = PROPERTY_MAPPING.copy().update( - asset_list.variable_mappings[asset_list.landlord_property_type] - ) - new_wall_map = WALL_CONSTRUCTION_MAPPINGS.copy().update( - asset_list.variable_mappings[asset_list.landlord_wall_construction] - ) - new_heating_map = HEATING_MAPPINGS.copy().update( - asset_list.variable_mappings[asset_list.landlord_heating_system] - ) - new_existing_pv_map = EXISTING_PV_MAPPINGS.copy().update( - asset_list.variable_mappings[asset_list.landlord_existing_pv] - ) - - asset_list.apply_standardiation() - - # DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Colchester" - # DATA_FILENAME = "Warmfront data- Colchester Borough Homes (Complete).xlsx" - # SHEET_NAME = "Sheet1" - # POSTCODE_COLUMN = 'Full Address.1' - # FULLADDRESS_COLUMN = "Full Address" - # ADDRESS1_COLUMN = None - # ADDRESS1_METHOD = "first_word" - # ADDRESS_COLS_TO_CONCAT = [] - # MISSING_POSTCODES_METHOD = None - # PROPERTY_YEAR_BUILT = "Build Date" - # UPRN_COLUMN = None - # # If we have the non-intrusives data, this should be true - # HAS_NON_INTRUSIVES = True - - ### We retrieve the EPC data - - # We chunk up this data into 5000 rows at a time - # Create the chunks directory - force_retrieve_data = False - skip = None # Used to skip already completed chunks - chunk_size = 5000 - filename = "Chunk {i}.csv" - download_folder = os.path.join(DATA_FOLDER, "Chunks") - if not os.path.exists(download_folder): - os.makedirs(download_folder) - - chunk_indexes = list(range(0, len(asset_list.standardised_asset_list), chunk_size)) - downloaded_files = {filename.format(i=i) for i in chunk_indexes} - - # We check if we have files associated to these files already and if we do, and we do not want to force the - # fetching of the data, we skip - folder_contents = os.listdir(download_folder) - if all(x in folder_contents for x in downloaded_files): - skip = max(chunk_indexes) - - for i in range(0, len(asset_list.standardised_asset_list), chunk_size): - print(f"Processing chunk {i} to {i + chunk_size}") - if skip is not None and not force_retrieve_data: - if i <= skip: - continue - chunk = asset_list.standardised_asset_list[i:i + chunk_size] - epc_data_chunk, errors_chunk, no_epc_chunk = get_data( - asset_list=chunk, - row_id_name=asset_list.DOMNA_PROPERTY_ID, - fulladdress_column=asset_list.STANDARD_FULL_ADDRESS, - address1_column=asset_list.STANDARD_ADDRESS_1, - postcode_column=asset_list.STANDARD_POSTCODE, - manual_uprn_map=MANUAL_UPRN_MAP, - uprn_column=asset_list.STANDARD_UPRN - ) - - # We now retrieve any failed properties - chunk_failed = chunk[chunk[asset_list.DOMNA_PROPERTY_ID].isin(errors_chunk)] - epc_data_failed, _, _ = get_data( - asset_list=chunk_failed, - row_id_name=asset_list.DOMNA_PROPERTY_ID, - fulladdress_column=FULLADDRESS_COLUMN, - address1_column=ADDRESS1_COLUMN, - postcode_column=POSTCODE_COLUMN, - manual_uprn_map=MANUAL_UPRN_MAP, - epc_api_only=False - ) - - epc_data_chunk.extend(epc_data_failed) - - # Append the failed data to the main data - # Store the chunk locally as a csv - pd.DataFrame(epc_data_chunk).to_csv(os.path.join(DATA_FOLDER, f"Chunks/Chunk {i}.csv"), index=False) - # Store the errors and no-data locally - with open(os.path.join(DATA_FOLDER, f"Chunks/Chunk {i} errors.json"), "w") as f: - json.dump(errors_chunk, f) - - with open(os.path.join(DATA_FOLDER, f"Chunks/Chunk {i} nodata.csv"), "w") as f: - json.dump(no_epc_chunk, f) - - # We read in and concatenate the created created chunks - # List the contents - epc_data = [] - for file in downloaded_files: - csv_data = pd.read_csv(os.path.join(download_folder, file)) - # We need to convert the recommendations back to a list - csv_data["recommendations"] = csv_data["recommendations"].apply(eval) - csv_data["find_my_epc_data"] = csv_data["find_my_epc_data"].apply(eval) - epc_data.append(csv_data) - - epc_df = pd.concat(epc_data) - # TODO: TEMP!!! - epc_df = epc_df.rename(columns={"row_id": asset_list.DOMNA_PROPERTY_ID}) - - # We expand out the recommendations - recommendations_df = epc_df[[asset_list.DOMNA_PROPERTY_ID, "recommendations"]] - - unique_recommendations = set() - for _, row in recommendations_df.iterrows(): - unique_recommendations.update([rec["improvement-summary-text"] for rec in row["recommendations"]]) - - columns = [asset_list.DOMNA_PROPERTY_ID] + list(unique_recommendations) - transformed_data = [] - for _, row in recommendations_df.iterrows(): - # Initialize a dictionary for this row with False for all recommendations - row_data = {col: False for col in columns} - row_data[asset_list.DOMNA_PROPERTY_ID] = row[asset_list.DOMNA_PROPERTY_ID] - - # Set True for each recommendation present in this row - for rec in row["recommendations"]: - recommendation_text = rec["improvement-summary-text"] - row_data[recommendation_text] = True - - # Append the row data to transformed_data - transformed_data.append(row_data) - - transformed_df = pd.DataFrame(transformed_data) - transformed_df = transformed_df[ - [ - asset_list.DOMNA_PROPERTY_ID, "Floor insulation (solid floor)", - "Floor insulation", "Floor insulation (suspended floor)" - ] - ] - - transformed_df["epc_has_floor_recommendation"] = ( - transformed_df["Floor insulation (solid floor)"] | transformed_df["Floor insulation"] | - transformed_df["Floor insulation (suspended floor)"] - ) - - # Get the find my epc data - find_my_epc_data = epc_df[[asset_list.DOMNA_PROPERTY_ID, "find_my_epc_data"]].drop( - columns=["find_my_epc_data"]).join( - pd.json_normalize(epc_df["find_my_epc_data"]) - ) - find_my_epc_data = find_my_epc_data.merge( - transformed_df[[asset_list.DOMNA_PROPERTY_ID, "epc_has_floor_recommendation"]], - how="left", on=asset_list.DOMNA_PROPERTY_ID - ) - - # We check if we get the solar pv column: - if "Solar photovoltaics" not in find_my_epc_data.columns: - find_my_epc_data["Solar photovoltaics"] = False - - # Retrieve just the data we need - epc_df = epc_df[ - [asset_list.DOMNA_PROPERTY_ID] + list(asset_list.EPC_API_DATA_NAMES.keys()) - ].rename( - columns=asset_list.EPC_API_DATA_NAMES - ) - - epc_df = epc_df.merge( - find_my_epc_data[ - [asset_list.DOMNA_PROPERTY_ID, "epc_has_floor_recommendation"] + list(asset_list.FIND_EPC_DATA_NAMES.keys()) - ] - .rename(columns=asset_list.FIND_EPC_DATA_NAMES), - how="left", - on=asset_list.DOMNA_PROPERTY_ID - ) - - asset_list.merge_data(epc_df) - # TODO: TEMP!!! - epc_df["epc_os_uprn"] = epc_df["epc_os_uprn"].astype("Int64").astype(str) - asset_list.standardised_asset_list = asset_list.standardised_asset_list.merge( - epc_df.drop(columns=["domna_property_id"]), how="left", left_on="ordnance_survey_uprn", right_on="epc_os_uprn" - ) - - asset_list.extract_attributes() - - cleaned = read_from_s3( - s3_file_name="cleaned_epc_data/cleaned.bson", - bucket_name="retrofit-data-dev" - ) - cleaned = msgpack.unpackb(cleaned, raw=False) - - asset_list.identify_worktypes(cleaned) - - # TODO: We should do this breakdown for flats - def flat_analysis(asset_list): - - # We need to deduce the building name - we strip out the house number - def extract_building_name(x): - # TODO: This doesn't really work - if pd.isnull(x): - return None - house_no = SearchEpc.get_house_number(address=x, postcode=None) - if house_no: - return x.replace(house_no, "").strip() - return x.split(",")[0].strip() - - # We want to deduce if flats have 50% of the properties below C75 - # We group by postcode and property type - grouped = asset_list.groupby([POSTCODE_COLUMN, "Property Type"]) - - flat_data = [] - for _, group in grouped: - if "flat" in group["Property Type"].str.lower().values: - num_flats = group["Property Type"].str.lower().value_counts().get("flat", 0) - num_below_c75 = group["SAP score on register"].lt(75).sum() - - flat_data.append( - { - "Postcode": group[POSTCODE_COLUMN].iloc[0], - "Property Type": "Flat", - "Number of Flats with EPC": num_flats, - "Number of Flats below C75": num_below_c75, - "Proportion of Flat EPCs below C75": round(100 * num_below_c75 / num_flats) - } - ) - - flat_data = pd.DataFrame(flat_data) - - return flat_data - - flat_data = flat_analysis(asset_list) - - # Store as an excel - filename = os.path.join(DATA_FOLDER, ".".join(DATA_FILENAME.split(".")[:-1])) + " EPC Data Pull.xlsx" - # Store the data in two tabs. One for the asset list with the EPC data and the second with the flat data - - with pd.ExcelWriter(filename) as writer: - asset_list.to_excel(writer, sheet_name="EPC Data", index=False) - flat_data.to_excel(writer, sheet_name="Flat Data", index=False) - - matches_review = asset_list[ - [FULLADDRESS_COLUMN, ADDRESS1_COLUMN, POSTCODE_COLUMN, "Address on EPC", "Postcode on EPC"] - ] diff --git a/etl/route_march_data_pull/requirements.txt b/etl/route_march_data_pull/requirements.txt deleted file mode 100644 index e69de29b..00000000