mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
309 lines
11 KiB
Python
309 lines
11 KiB
Python
import numpy as np
|
|
import os
|
|
import pandas as pd
|
|
from tqdm import tqdm
|
|
from model_data.BaseUtility import BaseUtility
|
|
|
|
|
|
def list_subdirectories(directory_path):
    """Return the names of the immediate sub-directories of ``directory_path``.

    Only directory entries are returned (plain files are skipped). The names
    are sorted so that callers iterate them in the same order on every run and
    on every platform; the underlying directory listing order is arbitrary.

    Args:
        directory_path: Path of the directory to inspect.

    Returns:
        list[str]: Sub-directory names (not full paths), sorted alphabetically.
    """
    # scandir caches the entry type, avoiding one extra stat call per entry
    with os.scandir(directory_path) as entries:
        return sorted(entry.name for entry in entries if entry.is_dir())
|
|
|
|
|
|
# Location of the bulk EPC download, resolved relative to the current working
# directory at import time. Each sub-directory is expected to hold a
# certificates.csv file (see app()).
DATA_DIRECTORY = '{}/model_data/simulation_system/data/all-domestic-certificates'.format(os.getcwd())
|
|
|
|
# Property attributes that are not expected to change between EPCs of the same
# property. The order is significant: it is the order in which the fields are
# processed and emitted.
FIXED_FEATURES = [
    # Type and form of the building
    "PROPERTY_TYPE",
    "BUILT_FORM",
    "CONSTRUCTION_AGE_BAND",
    "NUMBER_HABITABLE_ROOMS",
    # Location
    "CONSTITUENCY",
    # Size and layout
    "NUMBER_HEATED_ROOMS",
    "FIXED_LIGHTING_OUTLETS_COUNT",
    "FLOOR_HEIGHT",
    "FLOOR_LEVEL",
    "TOTAL_FLOOR_AREA",
]
|
|
|
|
# EPC fields describing building components that can change between two
# certificates of the same property. Each name appears exactly once so the list
# can be used directly as a DataFrame column selector without producing
# duplicate columns.
COMPONENT_FEATURES = [
    'TRANSACTION_TYPE',
    'WALLS_DESCRIPTION',
    'FLOOR_DESCRIPTION',
    'LIGHTING_DESCRIPTION',
    'ROOF_DESCRIPTION',
    'MAINHEAT_DESCRIPTION',
    'HOTWATER_DESCRIPTION',
    'MAIN_FUEL',
    'MECHANICAL_VENTILATION',
    'SECONDHEAT_DESCRIPTION',
    'ENERGY_TARIFF',  # Not sure if this is relevant
    'SOLAR_WATER_HEATING_FLAG',
    'PHOTO_SUPPLY',
    'WINDOWS_DESCRIPTION',
    'GLAZED_TYPE',
    'MULTI_GLAZE_PROPORTION',
    'LOW_ENERGY_LIGHTING',
    'NUMBER_OPEN_FIREPLACES',
    'MAINHEATCONT_DESCRIPTION',
    'EXTENSION_COUNT',
    # GLAZED_AREA is deliberately left out: it may not be needed since we have
    # MULTI_GLAZE_PROPORTION
]
|
|
|
|
# Fixed features that are averaged when a property has several recorded values
AVERAGE_FIXED_FEATURES = ["TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]
|
|
|
|
# For these fields, we take the latest value if we have multiple values.
# Since more recent EPCs have been conducted with more rigour, we assume that
# the latest value is the most accurate.
LATEST_FIELD = [
    "NUMBER_HABITABLE_ROOMS",
    "NUMBER_HEATED_ROOMS",
    "FIXED_LIGHTING_OUTLETS_COUNT",
    "CONSTRUCTION_AGE_BAND",  # This is a field we probably want to use Verisk data for
    "FLOOR_LEVEL",
]
|
|
|
|
# If any of these features changes between the EPCs of one property, the
# property is skipped, since its records are deemed unreliable
MANDATORY_FIXED_FEATURES = ["PROPERTY_TYPE", "BUILT_FORM", "CONSTITUENCY"]
|
|
|
|
# Particularly old EPC data has inconsistent records, so only EPCs lodged on or
# after this date are used. SAP09 was introduced in 2009 and SAP12 was later
# introduced in England and Wales from 31 July 2014; the cut-off below is the
# day after that SAP12 date.
# NOTE(review): this comment previously said "after 2010", which did not match
# the value - confirm that the 2014 cut-off is the intended one.
EARLIEST_EPC_DATE = "2014-08-01"

# Certificate columns whose change between two EPCs is used as a response
RDSAP_RESPONSE = "CURRENT_ENERGY_EFFICIENCY"
HEAT_DEMAND_RESPONSE = "ENERGY_CONSUMPTION_CURRENT"
|
|
|
|
|
|
def make_cleaning_averages(df):
    """Compute median values of the averaged fixed features for imputation.

    Two lookup tables are produced from ``df``; missing values are ignored when
    the medians are taken:

    * a detailed table, grouped by property type, built form, construction age
      band and the habitable/heated room counts;
    * a general table, grouped by property type and built form only.

    The medians are computed with the built-in group-by median on just the
    ``AVERAGE_FIXED_FEATURES`` columns rather than a per-group ``apply``. This
    gives the same values, is considerably faster on large frames, and returns
    correctly-shaped (empty) tables when ``df`` has no rows.

    Args:
        df: DataFrame containing the grouping columns and every column named in
            ``AVERAGE_FIXED_FEATURES``.

    Returns:
        tuple[pandas.DataFrame, pandas.DataFrame]: ``(cleaning_averages,
        general_averages)``; each has the grouping columns followed by one
        median column per averaged feature.
    """
    detailed_keys = [
        "PROPERTY_TYPE",
        "BUILT_FORM",
        "CONSTRUCTION_AGE_BAND",
        "NUMBER_HABITABLE_ROOMS",
        "NUMBER_HEATED_ROOMS",
    ]
    general_keys = ["PROPERTY_TYPE", "BUILT_FORM"]
    feature_columns = list(AVERAGE_FIXED_FEATURES)

    cleaning_averages = (
        df.groupby(detailed_keys, observed=True)[feature_columns]
        .median()  # skips missing values by default
        .reset_index()
    )
    general_averages = (
        df.groupby(general_keys, observed=True)[feature_columns]
        .median()
        .reset_index()
    )

    return cleaning_averages, general_averages
|
|
|
|
|
|
def iterative_filtering(cleaning_averages, property_data):
    """Narrow ``cleaning_averages`` down to the rows that best match a property.

    The property's value for each matching column is read from the first row
    of ``property_data``. The columns are applied one after another; a column
    whose filter would leave no rows is skipped (the rows kept so far are
    retained) and the remaining columns are still tried.

    Args:
        cleaning_averages: DataFrame holding the matching columns.
        property_data: DataFrame of records for one property; only its first
            row is consulted.

    Returns:
        pandas.DataFrame: The narrowed rows. If no column matches at all, this
        is a copy of ``cleaning_averages``.
    """
    match_columns = (
        "PROPERTY_TYPE",
        "BUILT_FORM",
        "CONSTRUCTION_AGE_BAND",
        "NUMBER_HABITABLE_ROOMS",
        "NUMBER_HEATED_ROOMS",
    )

    matches = cleaning_averages.copy()
    for name in match_columns:
        wanted = property_data[name].iloc[0]
        narrowed = matches[matches[name] == wanted]
        # Only accept the narrower selection when it still contains rows
        if not narrowed.empty:
            matches = narrowed

    return matches
|
|
|
|
|
|
def clean_multi_glaze_proportion(df):
    """Fill missing MULTI_GLAZE_PROPORTION values for fully glazed properties.

    Where MULTI_GLAZE_PROPORTION is missing and WINDOWS_DESCRIPTION is one of
    the "fully glazed" descriptions, the proportion is set to 100. All other
    values are left as they are.

    Args:
        df: DataFrame with MULTI_GLAZE_PROPORTION and WINDOWS_DESCRIPTION
            columns. It is modified in place.

    Returns:
        pandas.DataFrame: The same ``df`` object, with the column updated.
    """
    fully_glazed_descriptions = [
        "Fully double glazed",
        "High performance glazing",
        "Fully triple glazed",
        "Full secondary glazing",
        "Multiple glazing throughout",
    ]

    proportion = df["MULTI_GLAZE_PROPORTION"]
    is_missing = pd.isnull(proportion)
    is_fully_glazed = df["WINDOWS_DESCRIPTION"].isin(fully_glazed_descriptions)

    df["MULTI_GLAZE_PROPORTION"] = np.where(is_missing & is_fully_glazed, 100, proportion)

    return df
|
|
|
|
|
|
def ordinal(n):
    """Return ``n`` followed by its English ordinal suffix, e.g. 1 -> '1st'.

    Numbers whose last two digits fall between 10 and 20 always take 'th'
    (11th, 12th, 13th); otherwise the suffix is chosen from the last digit.

    Args:
        n: The number to format.

    Returns:
        str: ``str(n)`` with 'st', 'nd', 'rd' or 'th' appended.
    """
    last_two_digits = n % 100
    if last_two_digits < 10 or last_two_digits > 20:
        suffix = {1: 'st', 2: 'nd', 3: 'rd'}.get(n % 10, 'th')
    else:
        # 10-20 are irregular: always 'th'
        suffix = 'th'

    return str(n) + suffix
|
|
|
|
|
|
# Lookup from the floor-level spellings found in the certificates to an integer
# floor number. Named levels come first, followed by the generated spellings for
# levels up to 20: zero-padded ("00".."20"), ordinal (via ordinal()), plain
# string ("-1".."20") and the integers themselves.
FLOOR_LEVEL_MAP = {
    "Basement": -1,
    "Ground": 0,
    "ground floor": 0,
    "20+": 20,
    "21st or above": 21,
}
FLOOR_LEVEL_MAP.update((str(level).zfill(2), level) for level in range(0, 21))
FLOOR_LEVEL_MAP.update((ordinal(level), level) for level in range(-1, 21))
FLOOR_LEVEL_MAP.update((str(level), level) for level in range(-1, 21))
FLOOR_LEVEL_MAP.update((level, level) for level in range(-1, 21))
|
|
|
|
# Built forms that are folded into a broader category; any value not listed
# here is left unchanged by the code that applies this mapping
BUILT_FORM_REMAP = dict([
    ("Enclosed End-Terrace", "End-Terrace"),
    ("Enclosed Mid-Terrace", "Mid-Terrace"),
])
|
|
|
|
|
|
def _load_certificates(directory):
    """Read one area's certificates.csv and apply the record-level filters.

    Rows without a UPRN are dropped (the UPRN is the unique property
    identifier), as are rows lodged before ``EARLIEST_EPC_DATE``.
    """
    filepath = os.path.join(DATA_DIRECTORY, directory, "certificates.csv")
    df = pd.read_csv(filepath, low_memory=False)
    df = df[~pd.isnull(df["UPRN"])]
    # String comparison; relies on both sides being ISO formatted dates
    df = df[df["LODGEMENT_DATE"] >= EARLIEST_EPC_DATE]
    return df


def _prepare_repeat_certificates(df):
    """Clean the certificates and keep only properties with several EPCs.

    The result is sorted by UPRN and then lodgement date, so each property's
    records are in chronological order.
    """
    # We remove EPCs that were conducted for a new build, since these are
    # performed with full SAP, which produces different results to the RdSAP
    # methodology. Copy so that later column assignments do not write into a
    # slice of the caller's frame.
    df = df[df["TRANSACTION_TYPE"] != "new dwelling"].copy()

    df = clean_multi_glaze_proportion(df)

    # We remove floor level in top floor or mid floor since this is ambiguous
    df = df[~df["FLOOR_LEVEL"].isin(["top floor", "mid floor"])].copy()

    # Explicit 64-bit width: the platform default integer may be too narrow
    # for 12-digit UPRNs
    df["UPRN"] = df["UPRN"].astype("int64").astype(str)

    # Take UPRNs with multiple EPCs
    df = df[df["UPRN"].duplicated(keep=False)]
    return df.sort_values(["UPRN", "LODGEMENT_DATE"], ascending=True)


def _valid_field_values(property_data, field):
    """Return the usable values of ``field`` for one property, oldest first.

    Missing values and values listed in ``BaseUtility.DATA_ANOMALY_MATCHES``
    are dropped. FLOOR_LEVEL values are translated through ``FLOOR_LEVEL_MAP``
    and BUILT_FORM values through ``BUILT_FORM_REMAP``. Repeated values are
    kept, so the last element is always the most recent usable value.
    """
    values = [
        value for value in property_data[field].dropna().tolist()
        if value not in BaseUtility.DATA_ANOMALY_MATCHES
    ]

    if field == "FLOOR_LEVEL":
        # An unmapped floor level raises KeyError, exactly as before, so that
        # new spellings are noticed rather than silently dropped
        values = [FLOOR_LEVEL_MAP[value] for value in values]
    elif field == "BUILT_FORM":
        values = [BUILT_FORM_REMAP.get(value, value) for value in values]

    return values


def _average_fixed_feature(field, values, property_data, cleaning_averages, general_averages):
    """Resolve an averaged fixed feature to a single number.

    With recorded values, the distinct values are averaged, unless the first
    two distinct values differ by more than 10%, in which case the most recent
    value is used on its own. Without any recorded value, the figure is looked
    up in ``cleaning_averages`` and then, if still missing, in
    ``general_averages``; NaN is returned when neither has a match.
    """
    distinct = list(dict.fromkeys(values))  # order-preserving de-duplication

    if len(distinct) > 1:
        first, second = distinct[0], distinct[1]
        # NOTE(review): only the first two distinct values are compared, as in
        # the previous implementation. A zero first value counts as "too far
        # apart" rather than dividing by zero.
        if first == 0 or abs(first - second) / first > 0.1:
            # Take the more recent value since it's likely to be more accurate
            distinct = [values[-1]]

    if distinct:
        return np.mean(distinct)

    # Clean using averages
    avgs = iterative_filtering(cleaning_averages, property_data)
    # TODO: Should probably do a mean/median?
    field_value = avgs[field].iloc[0] if not avgs.empty else np.nan

    if pd.isnull(field_value):
        # Just use the general averages
        general_match = general_averages[
            (general_averages["PROPERTY_TYPE"] == property_data["PROPERTY_TYPE"].iloc[0]) &
            (general_averages["BUILT_FORM"] == property_data["BUILT_FORM"].iloc[0])
        ]
        field_value = general_match[field].iloc[0] if not general_match.empty else np.nan

    return field_value


def _extract_fixed_features(property_data, cleaning_averages, general_averages):
    """Collapse a property's EPCs into one value per fixed feature.

    Returns:
        tuple[dict, bool]: ``(fixed_data, ignore_epc)``. ``ignore_epc`` is True
        when a mandatory fixed feature has more than one distinct value.

    Raises:
        ValueError: If a fixed feature that is neither averaged, latest-valued
            nor mandatory has more than one distinct value.
    """
    fixed_data = {}
    ignore_epc = False

    for field in FIXED_FEATURES:
        values = _valid_field_values(property_data, field)

        if field in AVERAGE_FIXED_FEATURES:
            field_value = _average_fixed_feature(
                field, values, property_data, cleaning_averages, general_averages
            )
        elif field in LATEST_FIELD:
            # Records are in lodgement order, so the last usable value is the
            # one from the most recent EPC
            field_value = values[-1] if values else None
        else:
            distinct = list(dict.fromkeys(values))
            if len(distinct) > 1:
                if field in MANDATORY_FIXED_FEATURES:
                    ignore_epc = True
                else:
                    raise ValueError("Fixed feature {} has more than one value - fix me".format(field))
            field_value = distinct[0] if distinct else None

        fixed_data[field] = field_value

    return fixed_data, ignore_epc


def _build_change_records(uprn, property_data, fixed_data):
    """Create one record per pair of consecutive EPCs for a property.

    Pairs whose RdSAP score did not change are skipped. Each record holds the
    UPRN, the change in both responses, the fixed features, and the component
    features plus lodgement date of both EPCs (suffixed ``_STARTING`` and
    ``_ENDING``).
    """
    # De-duplicate so a repeated name cannot produce duplicate columns. We
    # include the lodgement date as we probably need to factor time into the
    # model, since EPC standards and rigour have changed over time.
    component_columns = list(dict.fromkeys(COMPONENT_FEATURES)) + ["LODGEMENT_DATE"]
    variable_data = property_data[component_columns + [RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE]]

    # Note: we look at changes between subsequent EPCs, however we could look
    # at other permutations, e.g. first vs second, second vs third and also
    # first vs third
    records = []
    for idx in range(len(variable_data) - 1):
        starting_record = variable_data.iloc[idx]
        ending_record = variable_data.iloc[idx + 1]
        rdsap_change = ending_record[RDSAP_RESPONSE] - starting_record[RDSAP_RESPONSE]
        heat_demand_change = ending_record[HEAT_DEMAND_RESPONSE] - starting_record[HEAT_DEMAND_RESPONSE]

        # TODO: Should this be <= 0?
        if rdsap_change == 0:
            # Assumption: We aren't interested in records that exhibit no change
            continue

        # TODO: We need to pre-process the data. For instance, rather than
        # using static for roofs, walls and floors, we may want to use the
        # U-value. We may also want to handle the (assumed) tags within
        # descriptions
        features = pd.concat([
            starting_record[component_columns].add_suffix("_STARTING"),
            ending_record[component_columns].add_suffix("_ENDING"),
        ])

        records.append(
            {
                "UPRN": uprn,
                "RDSAP_CHANGE": rdsap_change,
                "HEAT_DEMAND_CHANGE": heat_demand_change,
                **fixed_data,
                **features.to_dict()
            }
        )

    return records


def app():
    """Build the EPC change dataset from every area under ``DATA_DIRECTORY``.

    For each sub-directory, certificates.csv is loaded and filtered, and the
    imputation averages are computed. Properties with more than one EPC are
    then turned into records describing the change between consecutive EPCs.
    Properties whose mandatory fixed features are inconsistent are skipped.

    Data glossary:
    https://epc.opendatacommunities.org/docs/guidance#glossary

    Returns:
        list[dict]: One record per pair of consecutive EPCs whose RdSAP score
        changed. Previously the records were built and then discarded.
    """
    dataset = []

    for directory in tqdm(list_subdirectories(DATA_DIRECTORY)):
        df = _load_certificates(directory)

        # The averages are computed before new dwellings and ambiguous floor
        # levels are filtered out, as in the previous implementation
        cleaning_averages, general_averages = make_cleaning_averages(df)

        df = _prepare_repeat_certificates(df)

        for uprn, property_data in df.groupby("UPRN", observed=True):
            # Fixed features - these are property attributes that shouldn't
            # change over time
            fixed_data, ignore_epc = _extract_fixed_features(
                property_data, cleaning_averages, general_averages
            )
            if ignore_epc:
                continue

            dataset.extend(_build_change_records(uprn, property_data, fixed_data))

    return dataset
|