Model/model_data/simulation_system/generate_rdsap_change.py
2023-09-15 19:13:14 +01:00

328 lines
14 KiB
Python

import pandas as pd
from tqdm import tqdm
import msgpack
from pathlib import Path
from model_data.simulation_system.core.Settings import (
MANDATORY_FIXED_FEATURES,
LATEST_FIELD,
COMPONENT_FEATURES,
RDSAP_RESPONSE,
HEAT_DEMAND_RESPONSE,
COLUMNS_TO_MERGE_ON,
EARLIEST_EPC_DATE,
CARBON_RESPONSE,
)
from model_data.simulation_system.core.DataProcessor import DataProcessor
from utils.s3 import save_dataframe_to_s3_parquet, read_from_s3, read_dataframe_from_s3_parquet
DATA_DIRECTORY = Path(__file__).parent / "model_data" / "simulation_system" / "data" / "all-domestic-certificates"
def get_cleaned():
"""
This function will retrieve the cleaned dataset from s3 which has the cleaned
descriptions for the epc dataset
This data is stored in MessagePack format and therefore needs to be decoded
:return:
"""
cleaned = read_from_s3(
s3_file_name="cleaned_epc_data/cleaned.bson",
bucket_name="retrofit-data-dev"
)
cleaned = msgpack.unpackb(cleaned, raw=False)
return cleaned
def process_and_prune_desriptions(df, cleaned_lookup):
"""
This method will merge on the cleaned lookup table and ensure that the building fabric in the
starting and ending EPC is consistent, so ensure that we are performing our modelling on the cleanest
possible dataset.
:param df:
:param cleaned_lookup:
:return:
"""
# TODO: In a future iteration, we can test using the binary features and the insulation thickness
# estimates, we well as estimated U-values
cols_to_drop = {
"walls": [
'original_description', 'thermal_transmittance',
'thermal_transmittance_unit', 'is_cavity_wall', 'is_filled_cavity',
'is_solid_brick', 'is_system_built', 'is_timber_frame',
'is_granite_or_whinstone', 'is_as_built', 'is_cob', 'is_assumed',
'is_sandstone_or_limestone', 'insulation_thickness',
'external_insulation', 'internal_insulation',
'original_description_ENDING',
'thermal_transmittance_ENDING', 'thermal_transmittance_unit_ENDING',
'is_cavity_wall_ENDING', 'is_filled_cavity_ENDING',
'is_solid_brick_ENDING', 'is_system_built_ENDING',
'is_timber_frame_ENDING', 'is_granite_or_whinstone_ENDING',
'is_as_built_ENDING', 'is_cob_ENDING', 'is_assumed_ENDING',
'is_sandstone_or_limestone_ENDING', 'insulation_thickness_ENDING',
'external_insulation_ENDING', 'internal_insulation_ENDING',
],
"floor": [
'original_description', 'thermal_transmittance',
'thermal_transmittance_unit', 'is_assumed', 'is_to_unheated_space',
'is_to_external_air', 'is_suspended', 'is_solid',
'another_property_below', 'insulation_thickness', 'no_data',
'original_description_ENDING',
'thermal_transmittance_ENDING', 'thermal_transmittance_unit_ENDING',
'is_assumed_ENDING', 'is_to_unheated_space_ENDING',
'is_to_external_air_ENDING', 'is_suspended_ENDING', 'is_solid_ENDING',
'another_property_below_ENDING', 'insulation_thickness_ENDING',
'no_data_ENDING',
],
"roof": [
'original_description', 'clean_description', 'thermal_transmittance',
'thermal_transmittance_unit', 'is_pitched', 'is_roof_room', 'is_loft',
'is_flat', 'is_thatched', 'is_at_rafters', 'is_assumed',
'has_dwelling_above', 'is_valid', 'insulation_thickness',
'original_description_ENDING', 'clean_description_ENDING',
'thermal_transmittance_ENDING', 'thermal_transmittance_unit_ENDING',
'is_pitched_ENDING', 'is_roof_room_ENDING', 'is_loft_ENDING',
'is_flat_ENDING', 'is_thatched_ENDING', 'is_at_rafters_ENDING',
'is_assumed_ENDING', 'has_dwelling_above_ENDING', 'is_valid_ENDING',
'insulation_thickness_ENDING',
]
}
for component in ["walls", "floor", "roof"]:
component_upper = component.upper()
df = df.merge(
pd.DataFrame(cleaned_lookup[f"{component}-description"]),
how="left",
left_on=f"{component_upper}_DESCRIPTION_STARTING",
right_on="original_description",
).merge(
pd.DataFrame(cleaned_lookup[f"{component}-description"]),
how="left",
left_on=f"{component_upper}_DESCRIPTION_ENDING",
right_on="original_description",
suffixes=("", "_ENDING")
)
if component == "walls":
# We make sure the wall construction hasn't changed
df = df[
(df["is_cavity_wall"] == df["is_cavity_wall_ENDING"]) &
(df["is_solid_brick"] == df["is_solid_brick_ENDING"]) &
(df["is_timber_frame"] == df["is_timber_frame_ENDING"]) &
(df["is_granite_or_whinstone"] == df["is_granite_or_whinstone_ENDING"]) &
(df["is_cob"] == df["is_cob_ENDING"]) &
(df["is_sandstone_or_limestone"] == df["is_sandstone_or_limestone_ENDING"])
]
elif component == "floor":
df = df[
(df["is_suspended"] == df["is_suspended_ENDING"]) &
(df["is_solid"] == df["is_solid_ENDING"]) &
(df["another_property_below"] == df["another_property_below_ENDING"]) &
(df["is_to_unheated_space"] == df["is_to_unheated_space_ENDING"])
]
else:
df = df[
(df["is_pitched"] == df["is_pitched_ENDING"]) &
(df["is_roof_room"] == df["is_roof_room_ENDING"]) &
(df["is_loft"] == df["is_loft_ENDING"]) &
(df["is_flat"] == df["is_flat_ENDING"]) &
(df["is_thatched"] == df["is_thatched_ENDING"]) &
(df["is_at_rafters"] == df["is_at_rafters_ENDING"]) &
(df["has_dwelling_above"] == df["has_dwelling_above_ENDING"])
]
# Drop the binary indicators and replace the original description with the cleaned version
# Drop original cols
original_cols = [
f"{component_upper}_DESCRIPTION_STARTING", f"{component_upper}_DESCRIPTION_ENDING"
]
df = df.drop(
columns=cols_to_drop[component] + original_cols
).rename(
columns={
"clean_description": f"{component_upper}_DESCRIPTION_STARTING",
"clean_description_ENDING": f"{component_upper}_DESCRIPTION_ENDING",
}
)
return df
def app():
# Get all the files in the directory
# Data glossary:
# https://epc.opendatacommunities.org/docs/guidance#glossary
cleaned_lookup = get_cleaned()
# List all subdirectories
directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()]
dataset = []
cleaning_dataset = []
for directory in tqdm(directories):
filepath = directory / "certificates.csv"
data_processor = DataProcessor(filepath=filepath)
df = data_processor.pre_process()
cleaning_averages = data_processor.make_cleaning_averages()
data_by_urpn = []
for uprn, property_data in df.groupby("UPRN", observed=True):
# Fixed features - these are property attributes that shouldn't change over time
fixed_data = {}
# If a property has changed building type, we can ignore the epc rating i.e. this should be 1 unique row
if any(property_data[MANDATORY_FIXED_FEATURES].nunique() > 1):
continue
# Take the latest row for both the LATEST_FEILDS and MANDATORY FIELDS
latest_field_data = property_data[LATEST_FIELD].iloc[-1].to_dict()
mandatory_field_data = (
property_data[MANDATORY_FIXED_FEATURES].iloc[-1].to_dict()
)
# Extract the columns that are not all None
modified_property_data = DataProcessor.apply_averages_cleaning(
data_to_clean=property_data,
cleaning_data=cleaning_averages,
cols_to_merge_on=COLUMNS_TO_MERGE_ON
)
# Combine all fields together
fixed_data.update(mandatory_field_data)
fixed_data.update(latest_field_data)
# Apply cleaning to fixed_data
fixed_data = DataProcessor.apply_averages_cleaning(
data_to_clean=pd.DataFrame([fixed_data]),
cleaning_data=cleaning_averages,
cols_to_merge_on=COLUMNS_TO_MERGE_ON
).to_dict("records")[0]
# We include the lodgement date here as we probably need to factor time into the
# model, since EPC standards and rigour have changed over time
variable_data = modified_property_data[
COMPONENT_FEATURES + ["LODGEMENT_DATE", RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE, CARBON_RESPONSE]
]
# Note: we look at changes between subsequent EPCS, however we could look at other permutations
# e.g. first vs second, second vs third and also first vs third
property_model_data = []
for idx in range(0, modified_property_data.shape[0] - 1):
if idx >= modified_property_data.shape[0] - 1:
break
earliest_record = variable_data.iloc[idx]
latest_record = variable_data.iloc[idx + 1]
# Check if the sap gets better or worse
gets_better = earliest_record[RDSAP_RESPONSE] <= latest_record[RDSAP_RESPONSE]
if gets_better:
starting_sap = earliest_record[RDSAP_RESPONSE]
starting_heat_demand = earliest_record[HEAT_DEMAND_RESPONSE]
starting_carbon = earliest_record[CARBON_RESPONSE]
rdsap_change = latest_record[RDSAP_RESPONSE] - starting_sap
heat_demand_change = latest_record[HEAT_DEMAND_RESPONSE] - starting_heat_demand
carbon_change = latest_record[CARBON_RESPONSE] - starting_carbon
starting_record = earliest_record[COMPONENT_FEATURES + ["LODGEMENT_DATE"]].add_suffix("_STARTING")
ending_record = latest_record[COMPONENT_FEATURES + ["LODGEMENT_DATE"]].add_suffix("_ENDING")
else:
starting_sap = latest_record[RDSAP_RESPONSE]
starting_heat_demand = latest_record[HEAT_DEMAND_RESPONSE]
starting_carbon = latest_record[CARBON_RESPONSE]
rdsap_change = earliest_record[RDSAP_RESPONSE] - starting_sap
heat_demand_change = earliest_record[HEAT_DEMAND_RESPONSE] - starting_heat_demand
carbon_change = earliest_record[CARBON_RESPONSE] - starting_carbon
starting_record = latest_record[COMPONENT_FEATURES + ["LODGEMENT_DATE"]].add_suffix("_STARTING")
ending_record = earliest_record[COMPONENT_FEATURES + ["LODGEMENT_DATE"]].add_suffix("_ENDING")
if rdsap_change == 0:
continue
features = pd.concat([starting_record, ending_record])
property_model_data.append(
{
"UPRN": uprn,
"RDSAP_CHANGE": rdsap_change,
"HEAT_DEMAND_CHANGE": heat_demand_change,
"CARBON_CHANGE": carbon_change,
"SAP_STARTING": starting_sap,
"HEAT_DEMAND_STARTING": starting_heat_demand,
"CARBON_STARTING": starting_carbon,
**fixed_data,
**features.to_dict(),
}
)
data_by_urpn.extend(property_model_data)
data_by_urpn_df = pd.DataFrame(data_by_urpn)
# Add some temporal features - we look at the days from the standard starting point in time
# for the starting and ending date so all records are from a fixed point
data_by_urpn_df["DAYS_TO_STARTING"] = (
pd.to_datetime(data_by_urpn_df["LODGEMENT_DATE_STARTING"]) - pd.to_datetime(EARLIEST_EPC_DATE)
).dt.days
data_by_urpn_df["DAYS_TO_ENDING"] = (
pd.to_datetime(data_by_urpn_df["LODGEMENT_DATE_ENDING"]) - pd.to_datetime(EARLIEST_EPC_DATE)
).dt.days
# TODO: We need to pre-process the data. For instance, rather than using static for roofs, walls and
# floors, we may want to use the U-value. We may also want to handle the (assumed) tags
# within descriptions
# We look for key building fabric features that have changed from one EPC to the next.
# if, for example, we see that a home has gone from being a cavity wall to a solid wall, we
# remove this record, as it indicates that the quality of the EPC conducted in the first instance
# is low
# We also replace descriptions with their cleaned variants
if pd.isnull(data_by_urpn_df).sum().sum():
raise ValueError("Null values found in dataset")
data_by_urpn_df = process_and_prune_desriptions(data_by_urpn_df, cleaned_lookup)
dataset.append(data_by_urpn_df)
cleaning_averages["LOCAL_AUTHORITY"] = df["LOCAL_AUTHORITY"].values[0]
cleaning_dataset.append(cleaning_averages)
# Store cleaning dataset in s3 as a parquet file
cleaning_dataset = pd.concat(cleaning_dataset)
save_dataframe_to_s3_parquet(
df=cleaning_dataset,
bucket_name="retrofit-data-dev",
file_key="sap_change_model/cleaning_dataset.parquet",
)
output = pd.concat(dataset)
save_dataframe_to_s3_parquet(
df=output,
bucket_name="retrofit-data-dev",
file_key="sap_change_model/dataset.parquet",
)
if __name__ == "__main__":
app()