import os import json import pandas as pd from asset_list.AssetList import AssetList from asset_list.mappings.property_type import PROPERTY_MAPPING from asset_list.mappings.built_form import BUILT_FORM_MAPPINGS from asset_list.mappings.walls import WALL_CONSTRUCTION_MAPPINGS from asset_list.mappings.heating_systems import HEATING_MAPPINGS from asset_list.mappings.exising_pv import EXISTING_PV_MAPPINGS from asset_list.mappings.roof import ROOF_CONSTRUCTION_MAPPINGS from asset_list.utils import get_data from dotenv import load_dotenv from backend.SearchEpc import SearchEpc load_dotenv(dotenv_path="backend/.env") EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN") def extract_address1(asset_list, full_address_col, postcode_col, method="first_two_words"): if method == "first_two_words": asset_list["address1_extracted"] = asset_list[full_address_col].str.split(" ").str[:2].str.join(" ") return asset_list if method == "first_word": asset_list["address1_extracted"] = asset_list[full_address_col].str.split(" ").str[0] return asset_list if method == "house_number_extraction": asset_list["address1_extracted"] = asset_list.apply( lambda x: SearchEpc.get_house_number(address=x[full_address_col], postcode=x[postcode_col]), axis=1 ) return asset_list raise ValueError(f"Method {method} not recognized") def app(): """ This app is EPC pulling data for some properties owned by Livewest Data request contents: Date of last EPC Reason for EPC SAP score on register Property Type Property Area Property Age Any Dimensions (HLP,PW,RH) Property Wall Construction Heating Type Secondary Heating Loft Insulation Depth Additional if possible: Heat loss calculations EPC recommendations Property UPRN """ data_folder = ("/Users/khalimconn-kowlessar/Documents/hestia/Warmfront/SCIS") data_filename = "SCIS_Historic_Deemed_Combined_Workings.xlsx" sheet_name = "SCIS" postcode_column = 'POSTCODE' address1_column = "NO" address1_method = None fulladdress_column = None address_cols_to_concat = ["NO", "Street / Block Name", "Town/Area"] missing_postcodes_method = None landlord_year_built = None landlord_os_uprn = None landlord_property_type = "PROPERTY TYPE As per table emailed" landlord_built_form = "PROPERTY TYPE As per table emailed" landlord_wall_construction = None landlord_roof_construction = None landlord_heating_system = None landlord_existing_pv = None landlord_property_id = "Row ID" landlord_sap = None outcomes_filename = None outcomes_sheetname = None outcomes_postcode = None outcomes_houseno = None outcomes_id = None outcomes_address = None master_filepaths = [] master_id_colnames = [] master_to_asset_list_filepath = None phase = False ecosurv_landlords = None asset_list_header = 0 landlord_block_reference = None # Peabody data for cleaning data_folder = ("/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting " "Project/data_validation") data_filename = "to_standardise_uprns.xlsx" sheet_name = "Sheet1" postcode_column = 'Postcode' address1_column = "Address 1" address1_method = None fulladdress_column = None address_cols_to_concat = ["Address 1", "Address 2", "Address 3"] missing_postcodes_method = None landlord_year_built = None landlord_os_uprn = None landlord_property_type = "Type" landlord_built_form = "Attachment" landlord_wall_construction = None landlord_roof_construction = None landlord_heating_system = None landlord_existing_pv = None landlord_property_id = "Org Ref" landlord_sap = None outcomes_filename = None outcomes_sheetname = None outcomes_postcode = None outcomes_houseno = None outcomes_id = None outcomes_address = None master_filepaths = [] master_id_colnames = [] master_to_asset_list_filepath = None phase = False ecosurv_landlords = None asset_list_header = 0 landlord_block_reference = None # Lambeth: # data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Lambeth/December 10th" # data_filename = "lambeth_sw2_leigham court estate.xlsx" # sheet_name = "Sheet1" # postcode_column = 'Postcode' # address1_column = "Address" # address1_method = None # fulladdress_column = None # address_cols_to_concat = ["Address"] # missing_postcodes_method = None # landlord_year_built = None # landlord_os_uprn = None # landlord_property_type = None # landlord_built_form = None # landlord_wall_construction = None # landlord_roof_construction = None # landlord_heating_system = None # landlord_existing_pv = None # landlord_property_id = "row_id" # landlord_sap = None # outcomes_filename = None # outcomes_sheetname = None # outcomes_postcode = None # outcomes_houseno = None # outcomes_id = None # outcomes_address = None # master_filepaths = [] # master_id_colnames = [] # master_to_asset_list_filepath = None # phase = False # ecosurv_landlords = None # asset_list_header = 0 # landlord_block_reference = None # Maps addresses to uprn in problematic cases manual_uprn_map = {} asset_list = AssetList( local_filepath=os.path.join(data_folder, data_filename), header=asset_list_header, sheet_name=sheet_name, address1_colname=address1_column, postcode_colname=postcode_column, landlord_property_id=landlord_property_id, full_address_colname=fulladdress_column, full_address_cols_to_concat=address_cols_to_concat, missing_postcodes_method=missing_postcodes_method, address1_extraction_method=address1_method, landlord_year_built=landlord_year_built, landlord_uprn=landlord_os_uprn, landlord_property_type=landlord_property_type, landlord_built_form=landlord_built_form, landlord_wall_construction=landlord_wall_construction, landlord_roof_construction=landlord_roof_construction, landlord_heating_system=landlord_heating_system, landlord_existing_pv=landlord_existing_pv, landlord_sap=landlord_sap, landlord_block_reference=landlord_block_reference, phase=phase ) asset_list.init_standardise() # We produce the new maps, which can be saved for future useage new_property_type_map = { k: v for k, v in ( asset_list.variable_mappings[asset_list.landlord_property_type] if asset_list.landlord_property_type else {} ).items() if k not in PROPERTY_MAPPING } new_built_form_map = { k: v for k, v in ( asset_list.variable_mappings[asset_list.landlord_built_form] if asset_list.landlord_built_form else {} ).items() if k not in BUILT_FORM_MAPPINGS } new_wall_map = { k: v for k, v in ( asset_list.variable_mappings[asset_list.landlord_wall_construction] if asset_list.landlord_wall_construction else {} ).items() if k not in WALL_CONSTRUCTION_MAPPINGS } new_heating_map = { k: v for k, v in ( asset_list.variable_mappings[asset_list.landlord_heating_system] if asset_list.landlord_heating_system else {} ).items() if k not in HEATING_MAPPINGS } new_existing_pv_map = { k: v for k, v in ( asset_list.variable_mappings[asset_list.landlord_existing_pv] if asset_list.landlord_existing_pv else {} ).items() if k not in EXISTING_PV_MAPPINGS } new_roof_construction_map = { k: v for k, v in ( asset_list.variable_mappings[asset_list.landlord_roof_construction] if asset_list.landlord_roof_construction else {} ).items() if k not in ROOF_CONSTRUCTION_MAPPINGS } asset_list.apply_standardiation() # We now flag properties that have been treated under existing programmes asset_list.flag_outcomes( outcomes_filepaths=outcomes_filename, outcomes_sheetname=outcomes_sheetname, outcomes_address=outcomes_address, outcomes_postcode=outcomes_postcode, outcomes_houseno=outcomes_houseno, outcomes_id=outcomes_id ) asset_list.flag_survey_master( master_filepaths=master_filepaths, master_to_asset_list_filepath=master_to_asset_list_filepath, master_id_colnames=master_id_colnames, ) asset_list.flag_ecosurv(ecosurv_landlords) ### We retrieve the EPC data # We chunk up this data into 5000 rows at a time # Create the chunks directory epc_api_only = False force_retrieve_data = False skip = None # Used to skip already completed chunks chunk_size = 2000 filename = "Chunk {i}.csv" download_folder = os.path.join(data_folder, "Chunks") if not os.path.exists(download_folder): os.makedirs(download_folder) chunk_indexes = list(range(0, len(asset_list.standardised_asset_list), chunk_size)) downloaded_files = {filename.format(i=i) for i in chunk_indexes} # We check if we have files associated to these files already and if we do, and we do not want to force the # fetching of the data, we skip folder_contents = os.listdir(download_folder) if all(x in folder_contents for x in downloaded_files): skip = max(chunk_indexes) if any(x in folder_contents for x in downloaded_files): skip = max([i for i in chunk_indexes if filename.format(i=i) in folder_contents]) for i in range(0, len(asset_list.standardised_asset_list), chunk_size): print(f"Processing chunk {i} to {i + chunk_size}") if skip is not None and not force_retrieve_data: if i <= skip: continue chunk = asset_list.standardised_asset_list[i:i + chunk_size] epc_data_chunk, errors_chunk, no_epc_chunk = get_data( df=chunk, row_id_name=asset_list.DOMNA_PROPERTY_ID, uprn_column=AssetList.STANDARD_UPRN, fulladdress_column=AssetList.STANDARD_FULL_ADDRESS, address1_column=AssetList.STANDARD_ADDRESS_1, postcode_column=AssetList.STANDARD_POSTCODE, property_type_column=AssetList.STANDARD_PROPERTY_TYPE, built_form_column=AssetList.STANDARD_BUILT_FORM, manual_uprn_map=manual_uprn_map, epc_api_only=epc_api_only, epc_auth_token=EPC_AUTH_TOKEN ) # We now retrieve any failed properties # chunk_failed = chunk[chunk[asset_list.DOMNA_PROPERTY_ID].isin(errors_chunk)] # epc_data_failed, _, _ = get_data( # df=chunk_failed, # row_id_name=asset_list.DOMNA_PROPERTY_ID, # uprn_column=AssetList.STANDARD_UPRN, # fulladdress_column=AssetList.STANDARD_FULL_ADDRESS, # address1_column=AssetList.STANDARD_ADDRESS_1, # postcode_column=AssetList.STANDARD_POSTCODE, # property_type_column=AssetList.STANDARD_PROPERTY_TYPE, # built_form_column=AssetList.STANDARD_BUILT_FORM, # manual_uprn_map=manual_uprn_map, # epc_api_only=epc_api_only, # epc_auth_token=EPC_AUTH_TOKEN # ) # # epc_data_chunk.extend(epc_data_failed) # Append the failed data to the main data # Store the chunk locally as a csv pd.DataFrame(epc_data_chunk).to_csv(os.path.join(data_folder, f"Chunks/Chunk {i}.csv"), index=False) # Store the errors and no-data locally with open(os.path.join(data_folder, f"Chunks/Chunk {i} errors.json"), "w") as f: json.dump(errors_chunk, f) with open(os.path.join(data_folder, f"Chunks/Chunk {i} nodata.csv"), "w") as f: json.dump(no_epc_chunk, f) # We read in and concatenate the created created chunks # List the contents epc_data = [] for file in downloaded_files: csv_data = pd.read_csv(os.path.join(download_folder, file)) # We need to convert the recommendations back to a list csv_data["recommendations"] = csv_data["recommendations"].apply(eval) # We don't have this if we didn't run the pulling from find my epc if "find_my_epc_data" in csv_data.columns: csv_data["find_my_epc_data"] = csv_data["find_my_epc_data"].apply(eval) epc_data.append(csv_data) epc_df = pd.concat(epc_data) if "estimated" not in epc_df.columns: epc_df["estimated"] = False epc_df["estimated"] = epc_df["estimated"].fillna(False) # We expand out the recommendations recommendations_df = epc_df[[asset_list.DOMNA_PROPERTY_ID, "recommendations"]] unique_recommendations = set() for _, row in recommendations_df.iterrows(): unique_recommendations.update([rec["improvement-summary-text"] for rec in row["recommendations"]]) columns = [asset_list.DOMNA_PROPERTY_ID] + list(unique_recommendations) transformed_data = [] for _, row in recommendations_df.iterrows(): # Initialize a dictionary for this row with False for all recommendations row_data = {col: False for col in columns} row_data[asset_list.DOMNA_PROPERTY_ID] = row[asset_list.DOMNA_PROPERTY_ID] # Set True for each recommendation present in this row for rec in row["recommendations"]: recommendation_text = rec["improvement-summary-text"] row_data[recommendation_text] = True # Append the row data to transformed_data transformed_data.append(row_data) transformed_df = pd.DataFrame(transformed_data) for col in [ "Floor insulation (solid floor)", "Floor insulation", "Floor insulation (suspended floor)" ]: if col not in transformed_df.columns: transformed_df[col] = False transformed_df = transformed_df[ [ asset_list.DOMNA_PROPERTY_ID, "Floor insulation (solid floor)", "Floor insulation", "Floor insulation (suspended floor)" ] ] transformed_df["epc_has_floor_recommendation"] = ( transformed_df["Floor insulation (solid floor)"] | transformed_df["Floor insulation"] | transformed_df["Floor insulation (suspended floor)"] ) # Get the find my epc data if "find_my_epc_data" not in epc_df.columns: epc_df["find_my_epc_data"] = None find_my_epc_data = [] for _, x in epc_df.iterrows(): if x["find_my_epc_data"]: find_my_epc_data.append( { asset_list.DOMNA_PROPERTY_ID: x[asset_list.DOMNA_PROPERTY_ID], **x["find_my_epc_data"] } ) else: find_my_epc_data.append( { asset_list.DOMNA_PROPERTY_ID: x[asset_list.DOMNA_PROPERTY_ID] } ) find_my_epc_data = pd.DataFrame(find_my_epc_data) find_my_epc_data = find_my_epc_data.merge( transformed_df[[asset_list.DOMNA_PROPERTY_ID, "epc_has_floor_recommendation"]], how="left", on=asset_list.DOMNA_PROPERTY_ID ) # We check if we get the solar pv column: if "Solar photovoltaics" not in find_my_epc_data.columns: find_my_epc_data["Solar photovoltaics"] = False # Retrieve just the data we need epc_df = epc_df[ [asset_list.DOMNA_PROPERTY_ID] + list(asset_list.EPC_API_DATA_NAMES.keys()) ].rename( columns=asset_list.EPC_API_DATA_NAMES ) # Look for columns not in the find my EPC data, which will have happened if we didn't # retrieve it in the first place missed_find_epc_cols = [c for c in list(asset_list.FIND_EPC_DATA_NAMES.keys()) if c not in find_my_epc_data.columns] if missed_find_epc_cols: for c in missed_find_epc_cols: find_my_epc_data[c] = None epc_df = epc_df.merge( find_my_epc_data[ [asset_list.DOMNA_PROPERTY_ID, "epc_has_floor_recommendation"] + list(asset_list.FIND_EPC_DATA_NAMES.keys()) ] .rename(columns=asset_list.FIND_EPC_DATA_NAMES), how="left", on=asset_list.DOMNA_PROPERTY_ID ) asset_list.merge_data(epc_df) asset_list.extract_attributes() asset_list.identify_worktypes() # We now flag the status of the property asset_list.label_property_status() asset_list.analyse_geographies() asset_list.get_work_figures() # Store as an excel filename = os.path.join(data_folder, ".".join(data_filename.split(".")[:-1])) + " - Standardised.xlsx" # Store the data in two tabs. One for the asset list with the EPC data and the second with the flat data # Determine inspections priority # solar_jobs = asset_list.standardised_asset_list[~pd.isnull(asset_list.standardised_asset_list["solar_reason"])][ # "domna_postcode"].unique() # asset_list.standardised_asset_list["in_solar_area"] = asset_list.standardised_asset_list["domna_postcode"].isin( # solar_jobs # ) # # Same for cav # cavity_jobs = asset_list.standardised_asset_list[ # ~pd.isnull(asset_list.standardised_asset_list["cavity_reason"]) # ]["domna_postcode"].unique() # asset_list.standardised_asset_list["in_cavity_area"] = asset_list.standardised_asset_list["domna_postcode"].isin( # cavity_jobs # ) # # We prioritise properties that are in solar areas and cavity areas # import numpy as np # asset_list.standardised_asset_list["inspection_priority"] = np.where( # asset_list.standardised_asset_list["in_solar_area"] | asset_list.standardised_asset_list["in_cavity_area"], # 1, 2 # ) with pd.ExcelWriter(filename) as writer: asset_list.standardised_asset_list.to_excel(writer, sheet_name="Standardised Asset List", index=False) if asset_list.block_analysis_df is not None: asset_list.block_analysis_df.to_excel(writer, sheet_name="Block Analysis", index=False) # If we have outcomes, we add a tab with the outcomes if not asset_list.outcomes_for_output.empty: asset_list.outcomes_for_output.to_excel(writer, sheet_name="Outcomes", index=False) if not asset_list.unmatched_submissions.empty: asset_list.unmatched_submissions.to_excel(writer, sheet_name="Unmatched Submissions", index=False) if not asset_list.outcomes_no_match.empty: asset_list.outcomes_no_match.to_excel(writer, sheet_name="Unmatched Outcomes", index=False) if not asset_list.ecosurv_no_match.empty: asset_list.ecosurv_no_match.to_excel(writer, sheet_name="Unmatched Ecosurv", index=False) if not asset_list.geographical_areas.empty: asset_list.geographical_areas.to_excel(writer, sheet_name="Geographical Areas", index=False) # Store dupes if not asset_list.duplicated_addresses.empty: asset_list.duplicated_addresses.to_excel(writer, sheet_name="Duplicate Properties", index=False)