diff --git a/etl/customers/stonewater/Wave 3 Preparation.py b/etl/customers/stonewater/Wave 3 Preparation.py index 4e336f23..d2232f40 100644 --- a/etl/customers/stonewater/Wave 3 Preparation.py +++ b/etl/customers/stonewater/Wave 3 Preparation.py @@ -6,6 +6,8 @@ import numpy as np from tqdm import tqdm from collections import Counter from scipy.optimize import linprog + +from SearchEpc import SearchEpc from utils.s3 import read_pickle_from_s3 CUSTOMER_FOLDER_PATH = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater" @@ -2608,7 +2610,7 @@ def propsed_wave_3_sample(): len(list(set(units_in_bid))) -def identify_incorrect_pacakges(): +def identify_incorrect_packages(): """ Due to limitations in the data collected during survey, we have some properties that do not have suitable packages assigned. This function will identify those properties, which can be flagged for Stonewater's review @@ -2635,21 +2637,23 @@ def identify_incorrect_pacakges(): # Check the different heating types units_with_assigned_packages["Gas properties: different to Parity"] = ( - (units_with_assigned_packages["Heating Type"].isin(["Gas", "Communal Gas"])) & ( - units_with_assigned_packages["Heating"].isin( - [ - "Heat Pump: Electric Heat " - "pumps: Air source heat pump " - "with flow temperature <= 35°C", - "Electric Storage Systems: Fan " - "storage heaters", - "Electric (direct acting) room " - "heaters: Panel, convector or " - "radiant heaters" - ] + ( + units_with_assigned_packages["Heating Type"].isin(["Gas", "Communal Gas"]) + ) & ( + units_with_assigned_packages["Heating"].isin( + [ + "Heat Pump: Electric Heat " + "pumps: Air source heat pump " + "with flow temperature <= 35°C", + "Electric Storage Systems: Fan " + "storage heaters", + "Electric (direct acting) room " + "heaters: Panel, convector or " + "radiant heaters" + ] + ) ) ) - ) units_with_assigned_packages["Electric properties: different to Parity"] = ( (units_with_assigned_packages["Heating Type"] == "Electric") & 
( @@ -2717,17 +2721,26 @@ def identify_incorrect_pacakges(): # We now iterate through postcodes and find anomalous properties based on the partiy data and survey data fields_to_check = [ - 'Wall Type', 'Roof Type', 'Heating', 'Main Fuel', + 'Wall Type Category', + # 'Roof Type Category', - not very interesting + 'Heating', + 'Main Fuel', 'Survey: Main Wall Type', - 'Survey: Main Roof Type', 'Survey: Primary Heating System' + # 'Survey: Main Roof Type', + 'Survey: Primary Heating System' ] - # Create an empty dictionary to store results - aggregated_results = {} - units_with_assigned_packages['Wall Type'] = units_with_assigned_packages['Wall Type'].str.replace( + units_with_assigned_packages['Wall Type Category'] = units_with_assigned_packages['Wall Type'].str.replace( r'\s*\(.*?\)', '', regex=True ) + # Create roof type category by splitting on the colon and taking the first part + units_with_assigned_packages['Roof Type Category'] = units_with_assigned_packages['Roof Type'].str.split(':').str[0] + + units_with_assigned_packages["Street, Region and Postcode"] = ( + units_with_assigned_packages["Street and Region"] + ", " + units_with_assigned_packages["Postcode"] + ) + def check_mixed_types(row): # Count distinct primary types with non-zero values primary_types_present = set() @@ -2738,11 +2751,11 @@ def identify_incorrect_pacakges(): primary_types_present.add(primary_type) return len(primary_types_present) > 1 # True if more than one primary type - # Process each field + aggregated_results = {} for field in fields_to_check: # Group by postcode and count occurrences of each unique value field_counts = ( - units_with_assigned_packages.groupby(['Postcode', field]) + units_with_assigned_packages.groupby(['Street, Region and Postcode', field]) .size() .unstack(fill_value=0) .reset_index() @@ -2764,5 +2777,132 @@ def identify_incorrect_pacakges(): # Store the result in the dictionary aggregated_results[field] = field_counts + # Let's fetch the EPC data + # Read in the 
existing EPC data we stored + import json + from utils.s3 import read_from_s3, read_pickle_from_s3 + def read_epc_data(): + epc_data = json.loads( + read_from_s3( + bucket_name="retrofit-data-dev", + s3_file_name="customers/Stonewater/clustering/epc_data.json" + ) + ) + epc_data = pd.DataFrame(epc_data) + + epc_data["uprn"] = np.where( + epc_data["internal_id"] == 1091, + 83143766, + epc_data["uprn"] + ) + epc_data_batch_2 = read_pickle_from_s3( + s3_file_name="customers/Stonewater/clustering/epc_data_batch_2.pkl", + bucket_name="retrofit-data-dev" + ) + epc_data_batch_2 = pd.DataFrame(epc_data_batch_2) + + complete_epcs = pd.concat([epc_data, epc_data_batch_2]) + + return complete_epcs + + epc_data = read_epc_data() + # Get just the fields we want from the EPC: Uprn, Wall, Roof, Heating, Fuel, SAP Score, EPC Band, Date of EPC + epc_data_to_append = epc_data[ + [ + "uprn", "walls-description", "roof-description", "mainheat-description", "main-fuel", + "current-energy-efficiency", "current-energy-rating", "lodgement-date", + "estimated" + ] + ].rename( + columns={ + "uprn": "UPRN", + "walls-description": "EPC: Wall Type", + "roof-description": "EPC: Roof Type", + "mainheat-description": "EPC: Heating", + "main-fuel": "EPC: Main Fuel", # key must match the hyphenated column selected above; rename() silently skips unknown keys + "current-energy-efficiency": "EPC: SAP Score", + "current-energy-rating": "EPC: EPC Band", + "lodgement-date": "EPC: Date of EPC", + "estimated": "EPC Estimated based on Nearby Properties" + } + ) + # Find entries where the SAP score is not an integer (kept for interactive inspection; may be empty) + non_integer_sap = epc_data_to_append[~epc_data_to_append["EPC: SAP Score"].astype(str).str.isnumeric()] + # NOTE: do not index non_integer_sap["UPRN"].values[0] here: it raises IndexError when every SAP score is numeric + + epc_data_to_append["EPC: Date of EPC"] = pd.to_datetime(epc_data_to_append["EPC: Date of EPC"]) + # Years since the EPC was lodged + epc_data_to_append["Years since EPC"] = (pd.Timestamp.now() - epc_data_to_append["EPC: Date of EPC"]).dt.days / 365 + epc_data_to_append = epc_data_to_append[epc_data_to_append["UPRN"] != ""] + 
epc_data_to_append["UPRN"] = epc_data_to_append["UPRN"].astype(int) + + units_with_assigned_packages = units_with_assigned_packages.merge( + epc_data_to_append, how="left", on="UPRN", + ) + + # Read in the wave 2.1 data + wave_2_data = pd.read_excel( + os.path.join( + CUSTOMER_FOLDER_PATH, "Stonewater 2.1 SAP Pre & Post.xlsx" + ), + header=3 + ) + # Remove any where the work is outstanding + wave_2_data = wave_2_data[wave_2_data["Retrofit Assessment"] == "Completed"] + wave_2_data = wave_2_data[~pd.isnull(wave_2_data["Package Approved (Client)"])] + wave_2_data["house_number"] = wave_2_data["Name"].apply(lambda x: SearchEpc.get_house_number(x, "")) + + # Filter postcodes in the units_with_assigned_packages, to find overlapping postcodes + related_to_wave_2 = units_with_assigned_packages[ + units_with_assigned_packages["Postcode"].isin( + wave_2_data["Post Code"].values + ) & ( + ~units_with_assigned_packages["Confidence Tier"].isin( + [ + "1 - same archetype, same postal region", "1 - property was surveyed" + ] + ) + ) + ] + + wave2_matches = [] + for _, home in related_to_wave_2.iterrows(): + # Get the related homes + assigned_wave_2_packages = wave_2_data[ + wave_2_data["Post Code"] == home["Postcode"] + ] + + if assigned_wave_2_packages.shape[0] != 1: + # In this case, we get the closest match based on door number + hn = SearchEpc.get_house_number(home["Name"], home["Postcode"]) + + assigned_wave_2_packages = assigned_wave_2_packages[ + abs(assigned_wave_2_packages["house_number"].astype(int) - int(hn)) == min( + abs(assigned_wave_2_packages["house_number"].astype(int) - int(hn))) + ] + + wave2_matches.append( + { + "UPRN": home["UPRN"], + "2.1 matched address": assigned_wave_2_packages["Name"].values[0], + "2.1 matched address: Package Ref": assigned_wave_2_packages["Package Approved (Client)"].values[0], + "2.1 matched address: Wall Insulation": assigned_wave_2_packages["Wall Insulation"].values[0], + "2.1 matched address: Loft Insulation": 
assigned_wave_2_packages["Loft Insulation"].values[0], + "2.1 matched address: Ventilation": assigned_wave_2_packages["Ventilation"].values[0], + "2.1 matched address: Windows": assigned_wave_2_packages["Windwos Upgrade"].values[0] + } + ) + + # Store each result to CSV + for field, df in aggregated_results.items(): + df.to_csv( + os.path.join(CUSTOMER_FOLDER_PATH, f"{field} - aggregated results.csv"), index=False + ) + + # Store units_with_assigned_packages + units_with_assigned_packages.to_csv( + os.path.join(CUSTOMER_FOLDER_PATH, "Units with assigned packages - with flags.csv"), index=False + ) + # if __name__ == "__main__": # main()