import os
from pathlib import Path
from typing import Tuple

import numpy as np
import pandas as pd
from tqdm import tqdm

from model_data.BaseUtility import BaseUtility


def list_subdirectories(directory_path):
    """Return the immediate sub-directories of ``directory_path``.

    Args:
        directory_path: Object exposing ``iterdir()`` whose entries expose ``is_dir()``
            (the module passes a ``pathlib.Path``).

    Returns:
        A list of the entries for which ``is_dir()`` is true, in ``iterdir()`` order.
    """
    return [entry for entry in directory_path.iterdir() if entry.is_dir()]


DATA_DIRECTORY = Path(__file__).parent / 'data' / 'all-domestic-certificates'

# WINDOWS_DESCRIPTION values that are treated as "fully glazed"
FULLY_GLAZED_DESCRIPTIONS = [
    "Fully double glazed",
    "High performance glazing",
    "Fully triple glazed",
    "Full secondary glazing",
    "Multiple glazing throughout",
]

# Property attributes that shouldn't change over time
FIXED_FEATURES = [
    'PROPERTY_TYPE',
    'BUILT_FORM',
    'CONSTRUCTION_AGE_BAND',
    'NUMBER_HABITABLE_ROOMS',
    'CONSTITUENCY',
    'NUMBER_HEATED_ROOMS',
    'FIXED_LIGHTING_OUTLETS_COUNT',
    'FLOOR_HEIGHT',
    'FLOOR_LEVEL',
    'TOTAL_FLOOR_AREA',
]

# Attributes that can differ from one EPC to the next.
# Each name appears once only: a repeated name would select the same column twice.
COMPONENT_FEATURES = [
    'TRANSACTION_TYPE',
    'WALLS_DESCRIPTION',
    'FLOOR_DESCRIPTION',
    'LIGHTING_DESCRIPTION',
    'ROOF_DESCRIPTION',
    'MAINHEAT_DESCRIPTION',
    'HOTWATER_DESCRIPTION',
    'MAIN_FUEL',
    'MECHANICAL_VENTILATION',
    'SECONDHEAT_DESCRIPTION',
    'ENERGY_TARIFF',  # Not sure if this is relevant
    'SOLAR_WATER_HEATING_FLAG',
    'PHOTO_SUPPLY',
    'WINDOWS_DESCRIPTION',
    'GLAZED_TYPE',
    'MULTI_GLAZE_PROPORTION',
    'LOW_ENERGY_LIGHTING',
    'NUMBER_OPEN_FIREPLACES',
    'MAINHEATCONT_DESCRIPTION',
    'EXTENSION_COUNT',
    # 'GLAZED_AREA',  # May not need this since we have MULTI_GLAZE_PROPORTION
]

# For these fields, we take an average if we have multiple values
AVERAGE_FIXED_FEATURES = [
    "TOTAL_FLOOR_AREA",
    "FLOOR_HEIGHT",
]

# For these fields, we take the latest value if we have multiple values.
# Since more recent EPCs have been conducted with more rigour, we assume that the latest value is
# the most accurate
LATEST_FIELD = [
    "NUMBER_HABITABLE_ROOMS",
    "NUMBER_HEATED_ROOMS",
    "FIXED_LIGHTING_OUTLETS_COUNT",
    "CONSTRUCTION_AGE_BAND",  # This is a field we'll probably want to use verisk data for
    "FLOOR_LEVEL",
]

# If we see these features changing, we don't use the EPC, since we deem it not to be reliable
MANDATORY_FIXED_FEATURES = [
    "PROPERTY_TYPE",
    "BUILT_FORM",
    "CONSTITUENCY",
]

# For particularly old EPC data, we have inconsistent records, so we only include EPCs that were
# lodged on or after the date below. SAP09 was introduced in 2009 and SAP12 was later introduced
# in England and Wales from 31 July 2014; the cut-off used here follows the SAP12 date.
EARLIEST_EPC_DATE = "2014-08-01"

RDSAP_RESPONSE = "CURRENT_ENERGY_EFFICIENCY"
HEAT_DEMAND_RESPONSE = "ENERGY_CONSUMPTION_CURRENT"


def iterative_filtering(cleaning_averages: pd.DataFrame, property_data: pd.DataFrame) -> pd.DataFrame:
    """Return the rows of ``cleaning_averages`` matching the property's last record.

    Despite the name, this is a single exact-match (inner) merge on the key columns below; no
    progressive relaxation of the filters is performed.

    Args:
        cleaning_averages: Frame holding the key columns plus the averaged feature columns.
        property_data: Records for one property; only the last row is used.

    Returns:
        The matching rows of ``cleaning_averages`` (key columns plus average columns). Empty
        when no group matches.
    """
    # Define the columns to filter on
    columns_to_filter = [
        "PROPERTY_TYPE",
        "BUILT_FORM",
        "CONSTRUCTION_AGE_BAND",
        "NUMBER_HABITABLE_ROOMS",
        "NUMBER_HEATED_ROOMS",
    ]

    # Only the key columns of the last record take part in the merge. Merging the whole record
    # would make pandas suffix every column shared by both frames (e.g. TOTAL_FLOOR_AREA) with
    # _x/_y, so the averages could no longer be read back under their plain names.
    latest_keys = property_data.iloc[[-1]][columns_to_filter]
    return pd.merge(cleaning_averages, latest_keys, on=columns_to_filter)


def ordinal(n: int) -> str:
    """Return ``n`` followed by its English ordinal suffix, e.g. ``1`` -> ``"1st"``."""
    if 10 <= n % 100 <= 20:
        suffix = 'th'
    else:
        suffix = {1: 'st', 2: 'nd', 3: 'rd'}.get(n % 10, 'th')
    return str(n) + suffix


# Maps the FLOOR_LEVEL spellings handled here (names, zero-padded strings, ordinals, plain
# strings and integers) onto an integer floor number.
FLOOR_LEVEL_MAP = {
    "Basement": -1,
    "Ground": 0,
    "ground floor": 0,
    "20+": 20,
    "21st or above": 21,
    **{str(i).zfill(2): i for i in range(0, 21)},
    **{ordinal(i): i for i in range(-1, 21)},
    **{str(i): i for i in range(-1, 21)},
    **{i: i for i in range(-1, 21)},
}

BUILT_FORM_REMAP = {
    "Enclosed End-Terrace": "End-Terrace",
    "Enclosed Mid-Terrace": "Mid-Terrace",
}

DATA_PROCESSOR_SETTINGS = {
    'low_memory': False,
    # A property is kept only when it has MORE than this many EPCs
    'epc_minimum_count': 1,
    # Casts are applied in list order: UPRN becomes int first, then str
    'column_mappings': {'UPRN': [int, str]},
}


class DataProcessor:
    """Handle data loading and data preprocessing."""

    def __init__(self, filepath: Path) -> None:
        """Remember the CSV path; nothing is read until ``load_data`` is called."""
        self.filepath = filepath

    def load_data(self, low_memory=False) -> None:
        """Read the CSV at ``self.filepath`` into ``self.data``."""
        self.data = pd.read_csv(self.filepath, low_memory=low_memory)

    def process(self) -> pd.DataFrame:
        """Load the data and run every preprocessing step in order.

        Returns:
            The processed frame, sorted by UPRN and then LODGEMENT_DATE (ascending).
        """
        self.load_data(low_memory=DATA_PROCESSOR_SETTINGS['low_memory'])
        self.confine_data()
        self.recast_df_columns(column_mappings=DATA_PROCESSOR_SETTINGS['column_mappings'])
        self.clean_multi_glaze_proportion()
        self.retain_multiple_epc_properties(epc_minimum_count=DATA_PROCESSOR_SETTINGS['epc_minimum_count'])
        self.data = self.data.sort_values(["UPRN", "LODGEMENT_DATE"], ascending=True)
        return self.data

    def make_cleaning_averages(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Compute per-group medians of the averaged fixed features.

        Returns:
            ``(cleaning_averages, general_averages)``: medians grouped by the five detailed key
            columns, and medians grouped by PROPERTY_TYPE and BUILT_FORM only. Missing values
            are skipped when taking the median.
        """
        detailed_keys = [
            "PROPERTY_TYPE",
            "BUILT_FORM",
            "CONSTRUCTION_AGE_BAND",
            "NUMBER_HABITABLE_ROOMS",
            "NUMBER_HEATED_ROOMS",
        ]
        general_keys = ["PROPERTY_TYPE", "BUILT_FORM"]

        # GroupBy.median skips missing values by default, so no per-group apply() is needed
        cleaning_averages = (
            self.data.groupby(detailed_keys, observed=True)[AVERAGE_FIXED_FEATURES].median().reset_index()
        )
        general_averages = (
            self.data.groupby(general_keys, observed=True)[AVERAGE_FIXED_FEATURES].median().reset_index()
        )
        return cleaning_averages, general_averages

    def retain_multiple_epc_properties(self, epc_minimum_count: int = 1) -> None:
        """Keep only properties that have more than ``epc_minimum_count`` EPCs.

        The per-UPRN record count is merged back in, so ``self.data`` gains a ``count`` column.
        """
        counts = self.data.groupby("UPRN").size().reset_index(name="count")
        # take UPRNs with multiple EPCs (strictly more than the threshold)
        counts = counts[counts["count"] > epc_minimum_count]
        self.data = pd.merge(self.data, counts, on='UPRN')

    def recast_df_columns(self, column_mappings: dict) -> None:
        """Cast columns of ``self.data`` to the requested types.

        Args:
            column_mappings: Maps a column name to a list of types, applied in list order.

        Raises:
            SystemExit: With exit code 1 when a column name is not present in the data.
        """
        for key, values in column_mappings.items():
            if key not in self.data.columns:
                print('Column mapping incorrectly specified')
                # Same effect as exit(1), without relying on the helper injected by `site`
                raise SystemExit(1)
            for value in values:
                self.data[key] = self.data[key].astype(value)

    def confine_data(self) -> None:
        """Reduce the data down based on the assumptions listed below."""
        data = self.data
        # Filter 1: UPRN is a unique identifier for a property, so we remove any EPCs that don't have one
        # Filter 2: Lodgement date is the date the EPC was lodged, so we remove any EPCs that were lodged
        # before EARLIEST_EPC_DATE
        # Filter 3: We remove EPCs that were conducted for a new build, since these are performed with
        # full SAP, which produces different results to the RdSAP methodology
        # Filter 4: We remove floor level in top floor or mid floor since this is ambiguous
        #
        # The masks are combined into one, rather than chaining [..][..] lookups, so that each mask
        # is applied to the frame it was computed from.
        keep = (
            data["UPRN"].notna()
            & (data["LODGEMENT_DATE"] >= EARLIEST_EPC_DATE)
            & (data["TRANSACTION_TYPE"] != "new dwelling")
            & ~data["FLOOR_LEVEL"].isin(["top floor", "mid floor"])
        )
        self.data = data[keep]

    def clean_multi_glaze_proportion(self) -> None:
        """Fill a missing MULTI_GLAZE_PROPORTION with 100 when the windows are fully glazed."""
        no_multi_glaze_proportion_index = (
            pd.isnull(self.data["MULTI_GLAZE_PROPORTION"])
            & self.data["WINDOWS_DESCRIPTION"].isin(FULLY_GLAZED_DESCRIPTIONS)
        )
        self.data.loc[no_multi_glaze_proportion_index, 'MULTI_GLAZE_PROPORTION'] = 100


def _dedupe_in_order(values):
    """Return ``values`` as a list without duplicates, keeping first-seen order."""
    return list(dict.fromkeys(values))


def _resolve_average_feature(field, vals, property_data, cleaning_averages, general_averages):
    """Pick a single value for an averaged fixed feature, falling back to group medians."""
    if len(vals) > 1:
        first, second = vals[0], vals[1]
        # Check whether the first two values are too far apart (more than 10% relative
        # difference). A zero first value cannot be used as a denominator, so it is treated
        # as "too far apart" as well.
        if first == 0 or abs(first - second) / abs(first) > 0.1:
            # Take the more recent value since it's likely to be more accurate
            vals = [vals[-1]]
    if vals:
        return np.mean(vals)

    # Clean using averages
    avgs = iterative_filtering(cleaning_averages, property_data)
    # TODO: Should probably do a mean/median?
    field_value = avgs[field].iloc[0] if not avgs.empty else np.nan
    if pd.isnull(field_value):
        # Just use the general averages
        general_match = general_averages[
            (general_averages["PROPERTY_TYPE"] == property_data["PROPERTY_TYPE"].iloc[0])
            & (general_averages["BUILT_FORM"] == property_data["BUILT_FORM"].iloc[0])
        ][field]
        field_value = general_match.iloc[0] if not general_match.empty else np.nan
    return field_value


def _collect_fixed_features(property_data, cleaning_averages, general_averages):
    """Build the fixed-feature dict for one property, or None when its EPCs must be ignored."""
    fixed_data = {}
    for field in FIXED_FEATURES:
        raw_values = property_data[field].dropna().unique()
        # Remove invalid values
        vals = [v for v in raw_values if v not in BaseUtility.DATA_ANOMALY_MATCHES]

        # De-duplicate after remapping while keeping first-seen order: the "latest value"
        # rule below reads vals[-1], which a set would make arbitrary.
        if field == "FLOOR_LEVEL":
            vals = _dedupe_in_order(FLOOR_LEVEL_MAP[v] for v in vals)
        if field == "BUILT_FORM":
            vals = _dedupe_in_order(BUILT_FORM_REMAP.get(v, v) for v in vals)

        if field in AVERAGE_FIXED_FEATURES:
            field_value = _resolve_average_feature(
                field, vals, property_data, cleaning_averages, general_averages
            )
        elif field in LATEST_FIELD:
            field_value = vals[-1] if vals else None
        else:
            if len(vals) > 1:
                if field in MANDATORY_FIXED_FEATURES:
                    # A mandatory feature changed between EPCs: the property is not used
                    return None
                raise ValueError("Fixed feature {} has more than one value - fix me".format(field))
            field_value = vals[0] if vals else None

        fixed_data[field] = field_value
    return fixed_data


def _build_change_records(uprn, property_data, fixed_data):
    """Build one model row per pair of consecutive EPCs whose RdSAP score changed."""
    # Guard against a feature being listed twice, which would select the same column twice
    component_columns = _dedupe_in_order(COMPONENT_FEATURES)
    feature_columns = component_columns + ["LODGEMENT_DATE"]

    # We include the lodgement date here as we probably need to factor time into the
    # model, since EPC standards and rigour have changed over time
    variable_data = property_data[feature_columns + [RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE]]

    # Note: we look at changes between subsequent EPCs, however we could look at other permutations
    # e.g. first vs second, second vs third and also first vs third
    records = []
    for idx in range(len(variable_data) - 1):
        starting_record = variable_data.iloc[idx]
        ending_record = variable_data.iloc[idx + 1]

        rdsap_change = ending_record[RDSAP_RESPONSE] - starting_record[RDSAP_RESPONSE]
        heat_demand_change = ending_record[HEAT_DEMAND_RESPONSE] - starting_record[HEAT_DEMAND_RESPONSE]

        # TODO: Should this be <= 0?
        if rdsap_change == 0:
            # Assumption: We aren't interested in records that exhibit no change
            continue

        # TODO: We need to pre-process the data. For instance, rather than using static for roofs, walls and
        # floors, we may want to use the U-value. We may also want to handle the (assumed) tags
        # within descriptions
        features = pd.concat([
            starting_record[feature_columns].add_suffix("_STARTING"),
            ending_record[feature_columns].add_suffix("_ENDING"),
        ])

        records.append(
            {
                "UPRN": uprn,
                "RDSAP_CHANGE": rdsap_change,
                "HEAT_DEMAND_CHANGE": heat_demand_change,
                **fixed_data,
                **features.to_dict(),
            }
        )
    return records


def app():
    """Build the EPC change dataset from every ``certificates.csv`` under DATA_DIRECTORY.

    Returns:
        A list of dicts, one per pair of consecutive EPCs of a property whose RdSAP score
        changed. Each dict holds the UPRN, the score and heat-demand changes, the fixed
        features and the suffixed starting/ending component features.
    """
    # Get all the files in the directory
    # Data glossary:
    # https://epc.opendatacommunities.org/docs/guidance#glossary
    directories = list_subdirectories(DATA_DIRECTORY)

    dataset = []
    for directory in tqdm(directories):
        filepath = directory / "certificates.csv"
        data_processor = DataProcessor(filepath=filepath)
        df = data_processor.process()
        cleaning_averages, general_averages = data_processor.make_cleaning_averages()

        for uprn, property_data in df.groupby("UPRN", observed=True):
            # Fixed features - these are property attributes that shouldn't change over time
            fixed_data = _collect_fixed_features(property_data, cleaning_averages, general_averages)
            if fixed_data is None:
                continue
            dataset.extend(_build_change_records(uprn, property_data, fixed_data))

    # Hand the rows back to the caller instead of discarding them
    return dataset


if __name__ == "__main__":
    app()