From b71e76449fea852350da3cfe555ff5c56cbf7d8a Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Fri, 15 Sep 2023 19:13:14 +0100 Subject: [PATCH] working on rdsap cleaning --- .../simulation_system/core/DataProcessor.py | 131 ++++++++++-------- model_data/simulation_system/core/Settings.py | 28 +++- .../generate_rdsap_change.py | 31 ++--- utils/s3.py | 23 +++ 4 files changed, 129 insertions(+), 84 deletions(-) diff --git a/model_data/simulation_system/core/DataProcessor.py b/model_data/simulation_system/core/DataProcessor.py index c02e6ed5..ba3cee33 100644 --- a/model_data/simulation_system/core/DataProcessor.py +++ b/model_data/simulation_system/core/DataProcessor.py @@ -14,7 +14,10 @@ from model_data.simulation_system.core.Settings import ( COLUMNS_TO_MERGE_ON, COMPONENT_FEATURES, FIXED_FEATURES, - COLUMNTYPES + COLUMNTYPES, + RDSAP_RESPONSE, + MAX_SAP_SCORE, + fill_na_map ) from typing import List @@ -101,10 +104,14 @@ class DataProcessor: raise NotImplementedError("Not handled the case for value %s" % x) - self.data["CONSTRUCTION_AGE_BAND_CLEANED"] = self.data["CONSTRUCTION_AGE_BAND"].apply( + self.data["CONSTRUCTION_AGE_BAND"] = self.data["CONSTRUCTION_AGE_BAND"].apply( lambda x: clean_construction_age_band(x) ) + self.data = self.data[ + ~pd.isnull(self.data["CONSTRUCTION_AGE_BAND"]) + ] + def clean_missing_rooms(self): """ For the number of heated rooms and number of habitable rooms, we clean these values up front, @@ -132,7 +139,7 @@ class DataProcessor: for col in ["NUMBER_HEATED_ROOMS", "NUMBER_HABITABLE_ROOMS"]: to_index = 3 - matching_columns = ["PROPERTY_TYPE", "BUILT_FORM", "CONSTRUCTION_AGE_BAND_CLEANED", "POSTAL_AREA"] + matching_columns = ["PROPERTY_TYPE", "BUILT_FORM", "CONSTRUCTION_AGE_BAND", "POSTAL_AREA"] has_missings = pd.isnull(self.data[col]).sum() while has_missings: self.data = apply_clean( @@ -175,6 +182,8 @@ class DataProcessor: if not self.newdata: self.confine_data() + self.remap_columns() + # We have some non-standard construction age bands which we'll clean for matching self.standardise_construction_age_band() self.clean_missing_rooms() @@ -189,7 +198,6 @@ class DataProcessor: self.retain_multiple_epc_properties( epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"] ) - self.remap_columns() if DATA_PROCESSOR_SETTINGS["epc_minimum_count"] >= 1: # If we have multiple EPC records, we can try and do filling @@ -199,8 +207,14 @@ class DataProcessor: # Final re-casting after data transformed and prepared self.data = self.data.astype(COLUMNTYPES) + self.na_remapping() + return self.data + def na_remapping(self): + for column, fill_value in fill_na_map.items(): + self.data[column] = self.data[column].fillna(fill_value) + def fill_na_fields(self, columns_to_fill: List = COLUMNS_TO_MERGE_ON): """ If we have a minimum of 2 epcs, we can do back fill and forward fill on certain data fields @@ -305,62 +319,43 @@ class DataProcessor: suffixes=["", "_BUILT_FORM_AVERAGE"], ) - # Replace any missing NAN values with averages for the same Property type and built form - cleaning_averages_filled["TOTAL_FLOOR_AREA"] = cleaning_averages_filled[ - "TOTAL_FLOOR_AREA" - ].fillna(cleaning_averages_filled["TOTAL_FLOOR_AREA_AVERAGE"]) - cleaning_averages_filled["FLOOR_HEIGHT"] = cleaning_averages_filled[ - "FLOOR_HEIGHT" - ].fillna(cleaning_averages_filled["FLOOR_HEIGHT_AVERAGE"]) - cleaning_averages_filled = cleaning_averages_filled.drop( - columns=["TOTAL_FLOOR_AREA_AVERAGE", "FLOOR_HEIGHT_AVERAGE"] - ) + for variable in AVERAGE_FIXED_FEATURES: + # Replace any missing NAN values with averages for the same Property type and built form + cleaning_averages_filled[variable] = cleaning_averages_filled[variable].fillna( + cleaning_averages_filled[f"{variable}_AVERAGE"] + ) - # If there are still NA values i.e. the averages do not have values for a speicifc group of property tyope - # and built form - # We can use just the property type average and replace - cleaning_averages_filled["TOTAL_FLOOR_AREA"] = cleaning_averages_filled[ - "TOTAL_FLOOR_AREA" - ].fillna(cleaning_averages_filled["TOTAL_FLOOR_AREA_PROPERTY_AVERAGE"]) - cleaning_averages_filled["FLOOR_HEIGHT"] = cleaning_averages_filled[ - "FLOOR_HEIGHT" - ].fillna(cleaning_averages_filled["FLOOR_HEIGHT_PROPERTY_AVERAGE"]) - cleaning_averages_filled = cleaning_averages_filled.drop( - columns=[ - "TOTAL_FLOOR_AREA_PROPERTY_AVERAGE", - "FLOOR_HEIGHT_PROPERTY_AVERAGE", - ] - ) + cleaning_averages_filled = cleaning_averages_filled.drop(columns=f"{variable}_AVERAGE") - # If there are still NA values, use BUILT FORM averages - cleaning_averages_filled["TOTAL_FLOOR_AREA"] = cleaning_averages_filled[ - "TOTAL_FLOOR_AREA" - ].fillna(cleaning_averages_filled["TOTAL_FLOOR_AREA_BUILT_FORM_AVERAGE"]) - cleaning_averages_filled["FLOOR_HEIGHT"] = cleaning_averages_filled[ - "FLOOR_HEIGHT" - ].fillna(cleaning_averages_filled["FLOOR_HEIGHT_BUILT_FORM_AVERAGE"]) - cleaning_averages_filled = cleaning_averages_filled.drop( - columns=[ - "TOTAL_FLOOR_AREA_BUILT_FORM_AVERAGE", - "FLOOR_HEIGHT_BUILT_FORM_AVERAGE", - ] - ) + # If there are still NA values i.e. the averages do not have values for a speicifc group of property tyope + # and built form + # We can use just the property type average and replace - # If there still is na values, use average across all properties in consituecy - cleaning_averages_filled["TOTAL_FLOOR_AREA"] = cleaning_averages_filled[ - "TOTAL_FLOOR_AREA" - ].fillna(cleaning_averages_filled["TOTAL_FLOOR_AREA"].mean()) - cleaning_averages_filled["FLOOR_HEIGHT"] = cleaning_averages_filled[ - "FLOOR_HEIGHT" - ].fillna(cleaning_averages_filled["FLOOR_HEIGHT"].mean()) + cleaning_averages_filled[variable] = cleaning_averages_filled[variable].fillna( + cleaning_averages_filled[f"{variable}_PROPERTY_AVERAGE"] + ) + + cleaning_averages_filled = cleaning_averages_filled.drop(columns=f"{variable}_PROPERTY_AVERAGE") + + # If there are still NA values, use BUILT FORM averages + cleaning_averages_filled["variable"] = cleaning_averages_filled[variable].fillna( + cleaning_averages_filled[f"{variable}_BUILT_FORM_AVERAGE"] + ) + + cleaning_averages_filled = cleaning_averages_filled.drop(columns=f"{variable}_BUILT_FORM_AVERAGE") + + # If there still is na values, use average across all properties in consituecy + cleaning_averages_filled[variable] = cleaning_averages_filled[ + variable + ].fillna(cleaning_averages_filled[variable].mean()) # If the consituency is all NA values, then take UK AVERAGE VALUES - cleaning_averages_filled["TOTAL_FLOOR_AREA"] = cleaning_averages_filled[ - "TOTAL_FLOOR_AREA" - ].fillna(TOTAL_FLOOR_AREA_NATIONAL_AVERAGE) - cleaning_averages_filled["FLOOR_HEIGHT"] = cleaning_averages_filled[ - "FLOOR_HEIGHT" - ].fillna(FLOOR_HEIGHT_NATIONAL_AVERAGE) + # cleaning_averages_filled["TOTAL_FLOOR_AREA"] = cleaning_averages_filled[ + # "TOTAL_FLOOR_AREA" + # ].fillna(TOTAL_FLOOR_AREA_NATIONAL_AVERAGE) + # cleaning_averages_filled["FLOOR_HEIGHT"] = cleaning_averages_filled[ + # "FLOOR_HEIGHT" + # ].fillna(FLOOR_HEIGHT_NATIONAL_AVERAGE) return cleaning_averages_filled @@ -402,12 +397,23 @@ class DataProcessor: # Filter 4: We remove floor level in top floor or mid floor since this is ambiguous + # Filter 5: Remove any EPCs with a SAP score above 100 + + # Filter 6: We found a small number of cases that have missing window description so we drop these + + # Filter 7: We found a small number of cases that have missing hotwater description so we drop these + self.data = self.data[~pd.isnull(self.data["UPRN"])] self.data = self.data[self.data["LODGEMENT_DATE"] >= EARLIEST_EPC_DATE] self.data = self.data[self.data["TRANSACTION_TYPE"] != "new dwelling"] self.data = self.data[ ~self.data["FLOOR_LEVEL"].isin(["top floor", "mid floor"]) ] + self.data = self.data[self.data[RDSAP_RESPONSE] <= MAX_SAP_SCORE] + + # We observed 7 final records with missing windows and 2 records with missing hot water so we shall remove them + self.data = self.data[~pd.isnull(self.data["WINDOWS_DESCRIPTION"])] + self.data = self.data[~pd.isnull(self.data["HOTWATER_DESCRIPTION"])] def clean_multi_glaze_proportion(self) -> None: """ @@ -437,6 +443,12 @@ class DataProcessor: differs depending on where the function is being used. :return: Cleaned DataFrame. """ + + cols_to_clean = [ + c for c in ["TOTAL_FLOOR_AREA", "FLOOR_HEIGHT", "FIXED_LIGHTING_OUTLETS_COUNT"] if + c in data_to_clean.columns + ] + # Enforce data types for col in ["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"]: data_to_clean[col] = data_to_clean[col].astype(float) @@ -445,10 +457,9 @@ class DataProcessor: columns_to_merge_on = data_to_clean[cols_to_merge_on].dropna().columns.tolist() # Calculate averages - cleaning_averages_to_merge = cleaning_data.groupby(columns_to_merge_on).agg({ - "TOTAL_FLOOR_AREA": "mean", - "FLOOR_HEIGHT": "mean" - }) + cleaning_averages_to_merge = cleaning_data.groupby(columns_to_merge_on).agg( + dict(zip(cols_to_clean, ["mean", ] * len(cols_to_clean))) + ) # Merge with the original data data_to_clean = pd.merge( @@ -460,7 +471,7 @@ class DataProcessor: ) # Fill NaN values with averages - for col in ["TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]: + for col in cols_to_clean: data_to_clean[col].fillna(data_to_clean[f"{col}_AVERAGE"], inplace=True) data_to_clean.drop(columns=[f"{col}_AVERAGE"], inplace=True) diff --git a/model_data/simulation_system/core/Settings.py b/model_data/simulation_system/core/Settings.py index c094c085..0135c14a 100644 --- a/model_data/simulation_system/core/Settings.py +++ b/model_data/simulation_system/core/Settings.py @@ -55,7 +55,8 @@ FLOOR_HEIGHT_NATIONAL_AVERAGE = 2.45 AVERAGE_FIXED_FEATURES = [ "TOTAL_FLOOR_AREA", - "FLOOR_HEIGHT" + "FLOOR_HEIGHT", + "FIXED_LIGHTING_OUTLETS_COUNT", ] COLUMNS_TO_MERGE_ON = [ @@ -82,8 +83,7 @@ FIXED_FEATURES = [ "CONSTITUENCY", "NUMBER_HEATED_ROOMS", "FIXED_LIGHTING_OUTLETS_COUNT", - "FLOOR_HEIGHT", - "FLOOR_LEVEL", + "FLOOR_HEIGHT" "TOTAL_FLOOR_AREA", ] @@ -120,7 +120,6 @@ LATEST_FIELD = [ "NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS", "FIXED_LIGHTING_OUTLETS_COUNT", - "FLOOR_LEVEL", "CONSTRUCTION_AGE_BAND", # This is a field we're probably want to use verisk data for ] @@ -173,7 +172,7 @@ DATA_PROCESSOR_SETTINGS = { COLUMNTYPES = { 'UPRN': 'object', 'TOTAL_FLOOR_AREA': 'float64', 'FLOOR_HEIGHT': 'float64', 'PROPERTY_TYPE': 'object', 'BUILT_FORM': 'object', 'CONSTITUENCY': 'object', 'NUMBER_HABITABLE_ROOMS': 'float64', - 'NUMBER_HEATED_ROOMS': 'float64', 'FIXED_LIGHTING_OUTLETS_COUNT': 'float64', 'FLOOR_LEVEL': 'float64', + 'NUMBER_HEATED_ROOMS': 'float64', 'FIXED_LIGHTING_OUTLETS_COUNT': 'float64', 'CONSTRUCTION_AGE_BAND': 'object', 'TRANSACTION_TYPE': 'object', 'WALLS_DESCRIPTION': 'object', @@ -194,3 +193,22 @@ COLUMNTYPES = { 'EXTENSION_COUNT': 'float64', 'LODGEMENT_DATE': 'object', } + +# For modelling, we don't allow records with more than 100 SAP points +MAX_SAP_SCORE = 100 + +fill_na_map = { + # There are some descriptions, such as "To be used only when there is no heating/hot-water system or data is from + # a community network" that could be clustered with unknown fuel + "MAIN_FUEL": "UNKNOWN", + "MECHANICAL_VENTILATION": "Unknown", + "SECONDHEAT_DESCRIPTION": "None", + "ENERGY_TARIFF": "Unknown", + # We set solar water heating flag to N - we could investigate using a different category entirely + "SOLAR_WATER_HEATING_FLAG": "N", + "GLAZED_TYPE": "not defined", + "MULTI_GLAZE_PROPORTION": 0, + "LOW_ENERGY_LIGHTING": 0, + "MAINHEATCONT_DESCRIPTION": "Unknown", + "EXTENSION_COUNT": 0, +} diff --git a/model_data/simulation_system/generate_rdsap_change.py b/model_data/simulation_system/generate_rdsap_change.py index 42c2f878..5a2f60c3 100644 --- a/model_data/simulation_system/generate_rdsap_change.py +++ b/model_data/simulation_system/generate_rdsap_change.py @@ -14,7 +14,7 @@ from model_data.simulation_system.core.Settings import ( CARBON_RESPONSE, ) from model_data.simulation_system.core.DataProcessor import DataProcessor -from utils.s3 import save_dataframe_to_s3_parquet, read_from_s3 +from utils.s3 import save_dataframe_to_s3_parquet, read_from_s3, read_dataframe_from_s3_parquet DATA_DIRECTORY = Path(__file__).parent / "model_data" / "simulation_system" / "data" / "all-domestic-certificates" @@ -92,7 +92,6 @@ def process_and_prune_desriptions(df, cleaned_lookup): 'is_assumed_ENDING', 'has_dwelling_above_ENDING', 'is_valid_ENDING', 'insulation_thickness_ENDING', ] - } for component in ["walls", "floor", "roof"]: @@ -172,22 +171,6 @@ def app(): dataset = [] cleaning_dataset = [] - # TODO [x] : Does energy tariff make a difference - # - leave for now but it may not - # TODO: [x] : Add starting SAP and head demand as a feature - # TODO [x] : If SAP hasn't changed, we don't include the record - # TODO [x]: If SAP gets worse, it genuinely looks like in the vast majority of cases that the building looks - # worse in the newer epc, so we can switch the orders - # TODO [x] : Have a look at temporal features - # TODO [x] : Floor area will impact the EPC so instead of averaging, we should have a starting and ending value. - # TODO [x]: Same as floor area for floor height - # TODO [x]: If fundamental building fabric changes, we should proabably discard the record - # TODO [x]: Should we prune records that have an exceptionally large amount of time between them? - # - leave for now and check performance after temporal features - # TODO [x]: If we have multiple EPCs lodged on the same day, should we remove them? Could be corrections? - # - Leave for now - # - for directory in tqdm(directories): filepath = directory / "certificates.csv" @@ -199,7 +182,6 @@ def app(): data_by_urpn = [] for uprn, property_data in df.groupby("UPRN", observed=True): - # Fixed features - these are property attributes that shouldn't change over time fixed_data = {} @@ -224,6 +206,13 @@ def app(): fixed_data.update(mandatory_field_data) fixed_data.update(latest_field_data) + # Apply cleaning to fixed_data + fixed_data = DataProcessor.apply_averages_cleaning( + data_to_clean=pd.DataFrame([fixed_data]), + cleaning_data=cleaning_averages, + cols_to_merge_on=COLUMNS_TO_MERGE_ON + ).to_dict("records")[0] + # We include the lodgement date here as we probably need to factor time into the # model, since EPC standards and rigour have changed over time variable_data = modified_property_data[ @@ -289,6 +278,7 @@ def app(): data_by_urpn.extend(property_model_data) data_by_urpn_df = pd.DataFrame(data_by_urpn) + # Add some temporal features - we look at the days from the standard starting point in time # for the starting and ending date so all records are from a fixed point data_by_urpn_df["DAYS_TO_STARTING"] = ( @@ -308,6 +298,9 @@ def app(): # is low # We also replace descriptions with their cleaned variants + if pd.isnull(data_by_urpn_df).sum().sum(): + raise ValueError("Null values found in dataset") + data_by_urpn_df = process_and_prune_desriptions(data_by_urpn_df, cleaned_lookup) dataset.append(data_by_urpn_df) diff --git a/utils/s3.py b/utils/s3.py index c31f1520..8d24d6c0 100644 --- a/utils/s3.py +++ b/utils/s3.py @@ -1,6 +1,7 @@ import boto3 from io import BytesIO from botocore.exceptions import NoCredentialsError, PartialCredentialsError +import pandas as pd def read_from_s3(bucket_name, s3_file_name): @@ -63,3 +64,25 @@ def save_dataframe_to_s3_parquet(df, bucket_name, file_key): # Upload the Parquet file to S3 client.put_object(Bucket=bucket_name, Key=file_key, Body=parquet_buffer.getvalue()) + + +def read_dataframe_from_s3_parquet(bucket_name, file_key): + """ + Read a pandas DataFrame from a Parquet file stored in S3. + + :param bucket_name: Name of the S3 bucket. + :param file_key: Key of the file (including directory path within the bucket). + :return: A pandas DataFrame. + """ + + # Create the boto3 client + client = boto3.client('s3') + + # Get the Parquet file from S3 + response = client.get_object(Bucket=bucket_name, Key=file_key) + + # Read the file into a pandas DataFrame + parquet_buffer = BytesIO(response['Body'].read()) + df = pd.read_parquet(parquet_buffer) + + return df