From a8150e3c91d9316b4c323f5abae2e50c86910c04 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Tue, 3 Feb 2026 17:01:37 +0000 Subject: [PATCH] merged from main --- asset_list/app.py | 188 +++++++++++++++++++++++++++++----------------- 1 file changed, 120 insertions(+), 68 deletions(-) diff --git a/asset_list/app.py b/asset_list/app.py index 9907a609..201e3ca8 100644 --- a/asset_list/app.py +++ b/asset_list/app.py @@ -12,23 +12,35 @@ from asset_list.utils import get_data from dotenv import load_dotenv from backend.SearchEpc import SearchEpc + load_dotenv(dotenv_path="backend/.env") -EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN", "a2Nvbm5rb3dsZXNzYXJAZ21haWwuY29tOjY5MGJiMWM0NmIyOGI5ZDUxYzAxMzQzYzNiZGNlZGJjZDNmODQwMzA=") +EPC_AUTH_TOKEN = os.getenv( + "EPC_AUTH_TOKEN", + "a2Nvbm5rb3dsZXNzYXJAZ21haWwuY29tOjY5MGJiMWM0NmIyOGI5ZDUxYzAxMzQzYzNiZGNlZGJjZDNmODQwMzA=", +) -def extract_address1(asset_list, full_address_col, postcode_col, method="first_two_words"): +def extract_address1( + asset_list, full_address_col, postcode_col, method="first_two_words" +): if method == "first_two_words": - asset_list["address1_extracted"] = asset_list[full_address_col].str.split(" ").str[:2].str.join(" ") + asset_list["address1_extracted"] = ( + asset_list[full_address_col].str.split(" ").str[:2].str.join(" ") + ) return asset_list if method == "first_word": - asset_list["address1_extracted"] = asset_list[full_address_col].str.split(" ").str[0] + asset_list["address1_extracted"] = ( + asset_list[full_address_col].str.split(" ").str[0] + ) return asset_list if method == "house_number_extraction": asset_list["address1_extracted"] = asset_list.apply( - lambda x: SearchEpc.get_house_number(address=x[full_address_col], postcode=x[postcode_col]), - axis=1 + lambda x: SearchEpc.get_house_number( + address=x[full_address_col], postcode=x[postcode_col] + ), + axis=1, ) return asset_list @@ -57,15 +69,11 @@ def app(): EPC recommendations Property UPRN """ -<<<<<<< HEAD - data_folder = ("/workspaces/model/asset_list") - data_filename = "assets.xlsx" -======= data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Hackney" data_filename = "Domna SHF Wave 3 (3).xlsx" sheet_name = "Domna Wave 3" - postcode_column = 'Postcode' + postcode_column = "Postcode" address1_column = "Address 1" address1_method = None fulladdress_column = None @@ -96,15 +104,16 @@ def app(): landlord_block_reference = None # Peabody data for cleaning - data_folder = ("/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting " - "Project/data_validation") + data_folder = ( + "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting " + "Project/data_validation" + ) data_filename = "to_standardise_uprns.xlsx" ->>>>>>> 3874da6177cbcc37f7a488bec0a06e387906653c sheet_name = "Sheet1" - postcode_column = 'Postcode' + postcode_column = "Postcode" address1_column = None - address1_method = 'house_number_extraction' - fulladdress_column = 'Address' + address1_method = "house_number_extraction" + fulladdress_column = "Address" address_cols_to_concat = None missing_postcodes_method = None landlord_year_built = None @@ -155,49 +164,62 @@ def app(): landlord_existing_pv=landlord_existing_pv, landlord_sap=landlord_sap, landlord_block_reference=landlord_block_reference, - phase=phase + phase=phase, ) asset_list.init_standardise() # We produce the new maps, which can be saved for future useage new_property_type_map = { - k: v for k, v in ( - asset_list.variable_mappings[asset_list.landlord_property_type] if - asset_list.landlord_property_type else {} + k: v + for k, v in ( + asset_list.variable_mappings[asset_list.landlord_property_type] + if asset_list.landlord_property_type + else {} ).items() if k not in PROPERTY_MAPPING } new_built_form_map = { - k: v for k, v in ( - asset_list.variable_mappings[asset_list.landlord_built_form] if - asset_list.landlord_built_form else {} + k: v + for k, v in ( + asset_list.variable_mappings[asset_list.landlord_built_form] + if asset_list.landlord_built_form + else {} ).items() if k not in BUILT_FORM_MAPPINGS } new_wall_map = { - k: v for k, v in ( - asset_list.variable_mappings[asset_list.landlord_wall_construction] if - asset_list.landlord_wall_construction else {} + k: v + for k, v in ( + asset_list.variable_mappings[asset_list.landlord_wall_construction] + if asset_list.landlord_wall_construction + else {} ).items() if k not in WALL_CONSTRUCTION_MAPPINGS } new_heating_map = { - k: v for k, v in ( - asset_list.variable_mappings[asset_list.landlord_heating_system] if - asset_list.landlord_heating_system else {} + k: v + for k, v in ( + asset_list.variable_mappings[asset_list.landlord_heating_system] + if asset_list.landlord_heating_system + else {} ).items() if k not in HEATING_MAPPINGS } new_existing_pv_map = { - k: v for k, v in ( - asset_list.variable_mappings[asset_list.landlord_existing_pv] if asset_list.landlord_existing_pv else {} + k: v + for k, v in ( + asset_list.variable_mappings[asset_list.landlord_existing_pv] + if asset_list.landlord_existing_pv + else {} ).items() if k not in EXISTING_PV_MAPPINGS } new_roof_construction_map = { - k: v for k, v in ( - asset_list.variable_mappings[asset_list.landlord_roof_construction] if - asset_list.landlord_roof_construction else {} + k: v + for k, v in ( + asset_list.variable_mappings[asset_list.landlord_roof_construction] + if asset_list.landlord_roof_construction + else {} ).items() if k not in ROOF_CONSTRUCTION_MAPPINGS } @@ -211,7 +233,7 @@ def app(): outcomes_address=outcomes_address, outcomes_postcode=outcomes_postcode, outcomes_houseno=outcomes_houseno, - outcomes_id=outcomes_id + outcomes_id=outcomes_id, ) asset_list.flag_survey_master( @@ -245,14 +267,16 @@ def app(): skip = max(chunk_indexes) if any(x in folder_contents for x in downloaded_files): - skip = max([i for i in chunk_indexes if filename.format(i=i) in folder_contents]) + skip = max( + [i for i in chunk_indexes if filename.format(i=i) in folder_contents] + ) for i in range(0, len(asset_list.standardised_asset_list), chunk_size): print(f"Processing chunk {i} to {i + chunk_size}") if skip is not None and not force_retrieve_data: if i <= skip: continue - chunk = asset_list.standardised_asset_list[i:i + chunk_size] + chunk = asset_list.standardised_asset_list[i : i + chunk_size] epc_data_chunk, errors_chunk, no_epc_chunk = get_data( df=chunk, row_id_name=asset_list.DOMNA_PROPERTY_ID, @@ -264,7 +288,7 @@ def app(): built_form_column=AssetList.STANDARD_BUILT_FORM, manual_uprn_map=manual_uprn_map, epc_api_only=epc_api_only, - epc_auth_token=EPC_AUTH_TOKEN + epc_auth_token=EPC_AUTH_TOKEN, ) # We now retrieve any failed properties @@ -287,7 +311,9 @@ def app(): # Append the failed data to the main data # Store the chunk locally as a csv - pd.DataFrame(epc_data_chunk).to_csv(os.path.join(data_folder, f"Chunks/Chunk {i}.csv"), index=False) + pd.DataFrame(epc_data_chunk).to_csv( + os.path.join(data_folder, f"Chunks/Chunk {i}.csv"), index=False + ) # Store the errors and no-data locally with open(os.path.join(data_folder, f"Chunks/Chunk {i} errors.json"), "w") as f: json.dump(errors_chunk, f) @@ -318,7 +344,9 @@ def app(): unique_recommendations = set() for _, row in recommendations_df.iterrows(): - unique_recommendations.update([rec["improvement-summary-text"] for rec in row["recommendations"]]) + unique_recommendations.update( + [rec["improvement-summary-text"] for rec in row["recommendations"]] + ) columns = [asset_list.DOMNA_PROPERTY_ID] + list(unique_recommendations) transformed_data = [] @@ -338,20 +366,24 @@ def app(): transformed_df = pd.DataFrame(transformed_data) for col in [ "Floor insulation (solid floor)", - "Floor insulation", "Floor insulation (suspended floor)" + "Floor insulation", + "Floor insulation (suspended floor)", ]: if col not in transformed_df.columns: transformed_df[col] = False transformed_df = transformed_df[ [ - asset_list.DOMNA_PROPERTY_ID, "Floor insulation (solid floor)", - "Floor insulation", "Floor insulation (suspended floor)" + asset_list.DOMNA_PROPERTY_ID, + "Floor insulation (solid floor)", + "Floor insulation", + "Floor insulation (suspended floor)", ] ] transformed_df["epc_has_floor_recommendation"] = ( - transformed_df["Floor insulation (solid floor)"] | transformed_df["Floor insulation"] | - transformed_df["Floor insulation (suspended floor)"] + transformed_df["Floor insulation (solid floor)"] + | transformed_df["Floor insulation"] + | transformed_df["Floor insulation (suspended floor)"] ) # Get the find my epc data @@ -364,21 +396,20 @@ def app(): find_my_epc_data.append( { asset_list.DOMNA_PROPERTY_ID: x[asset_list.DOMNA_PROPERTY_ID], - **x["find_my_epc_data"] + **x["find_my_epc_data"], } ) else: find_my_epc_data.append( - { - asset_list.DOMNA_PROPERTY_ID: x[asset_list.DOMNA_PROPERTY_ID] - } + {asset_list.DOMNA_PROPERTY_ID: x[asset_list.DOMNA_PROPERTY_ID]} ) find_my_epc_data = pd.DataFrame(find_my_epc_data) find_my_epc_data = find_my_epc_data.merge( transformed_df[[asset_list.DOMNA_PROPERTY_ID, "epc_has_floor_recommendation"]], - how="left", on=asset_list.DOMNA_PROPERTY_ID + how="left", + on=asset_list.DOMNA_PROPERTY_ID, ) # We check if we get the solar pv column: @@ -388,24 +419,26 @@ def app(): # Retrieve just the data we need epc_df = epc_df[ [asset_list.DOMNA_PROPERTY_ID] + list(asset_list.EPC_API_DATA_NAMES.keys()) - ].rename( - columns=asset_list.EPC_API_DATA_NAMES - ) + ].rename(columns=asset_list.EPC_API_DATA_NAMES) # Look for columns not in the find my EPC data, which will have happened if we didn't # retrieve it in the first place - missed_find_epc_cols = [c for c in list(asset_list.FIND_EPC_DATA_NAMES.keys()) if c not in find_my_epc_data.columns] + missed_find_epc_cols = [ + c + for c in list(asset_list.FIND_EPC_DATA_NAMES.keys()) + if c not in find_my_epc_data.columns + ] if missed_find_epc_cols: for c in missed_find_epc_cols: find_my_epc_data[c] = None epc_df = epc_df.merge( find_my_epc_data[ - [asset_list.DOMNA_PROPERTY_ID, "epc_has_floor_recommendation"] + list(asset_list.FIND_EPC_DATA_NAMES.keys()) - ] - .rename(columns=asset_list.FIND_EPC_DATA_NAMES), + [asset_list.DOMNA_PROPERTY_ID, "epc_has_floor_recommendation"] + + list(asset_list.FIND_EPC_DATA_NAMES.keys()) + ].rename(columns=asset_list.FIND_EPC_DATA_NAMES), how="left", - on=asset_list.DOMNA_PROPERTY_ID + on=asset_list.DOMNA_PROPERTY_ID, ) asset_list.merge_data(epc_df) @@ -422,7 +455,10 @@ def app(): asset_list.get_work_figures() # Store as an excel - filename = os.path.join(data_folder, ".".join(data_filename.split(".")[:-1])) + " - Standardised.xlsx" + filename = ( + os.path.join(data_folder, ".".join(data_filename.split(".")[:-1])) + + " - Standardised.xlsx" + ) # Store the data in two tabs. One for the asset list with the EPC data and the second with the flat data # Determine inspections priority @@ -446,26 +482,42 @@ def app(): # ) with pd.ExcelWriter(filename) as writer: - asset_list.standardised_asset_list.to_excel(writer, sheet_name="Standardised Asset List", index=False) + asset_list.standardised_asset_list.to_excel( + writer, sheet_name="Standardised Asset List", index=False + ) if asset_list.block_analysis_df is not None: - asset_list.block_analysis_df.to_excel(writer, sheet_name="Block Analysis", index=False) + asset_list.block_analysis_df.to_excel( + writer, sheet_name="Block Analysis", index=False + ) # If we have outcomes, we add a tab with the outcomes if not asset_list.outcomes_for_output.empty: - asset_list.outcomes_for_output.to_excel(writer, sheet_name="Outcomes", index=False) + asset_list.outcomes_for_output.to_excel( + writer, sheet_name="Outcomes", index=False + ) if not asset_list.unmatched_submissions.empty: - asset_list.unmatched_submissions.to_excel(writer, sheet_name="Unmatched Submissions", index=False) + asset_list.unmatched_submissions.to_excel( + writer, sheet_name="Unmatched Submissions", index=False + ) if not asset_list.outcomes_no_match.empty: - asset_list.outcomes_no_match.to_excel(writer, sheet_name="Unmatched Outcomes", index=False) + asset_list.outcomes_no_match.to_excel( + writer, sheet_name="Unmatched Outcomes", index=False + ) if not asset_list.ecosurv_no_match.empty: - asset_list.ecosurv_no_match.to_excel(writer, sheet_name="Unmatched Ecosurv", index=False) + asset_list.ecosurv_no_match.to_excel( + writer, sheet_name="Unmatched Ecosurv", index=False + ) if not asset_list.geographical_areas.empty: - asset_list.geographical_areas.to_excel(writer, sheet_name="Geographical Areas", index=False) + asset_list.geographical_areas.to_excel( + writer, sheet_name="Geographical Areas", index=False + ) # Store dupes if asset_list.duplicated_addresses is not None: if not asset_list.duplicated_addresses.empty: - asset_list.duplicated_addresses.to_excel(writer, sheet_name="Duplicate Properties", index=False) + asset_list.duplicated_addresses.to_excel( + writer, sheet_name="Duplicate Properties", index=False + )