# Model/model_data/simulation_system/generate_rdsap_change.py
# 2023-09-21 15:14:10 +01:00
#
# 489 lines
# 20 KiB
# Python

import pandas as pd
from tqdm import tqdm
import msgpack
from pathlib import Path
from model_data.simulation_system.core.Settings import (
MANDATORY_FIXED_FEATURES,
LATEST_FIELD,
COMPONENT_FEATURES,
RDSAP_RESPONSE,
HEAT_DEMAND_RESPONSE,
COLUMNS_TO_MERGE_ON,
EARLIEST_EPC_DATE,
CARBON_RESPONSE,
)
from model_data.simulation_system.core.DataProcessor import DataProcessor
from utils.s3 import save_dataframe_to_s3_parquet, read_from_s3, read_dataframe_from_s3_parquet
from recommendations.rdsap_tables import england_wales_age_band_lookup
from recommendations.recommendation_utils import get_wall_u_value, get_roof_u_value
# Folder scanned by app(): every sub-directory is expected to hold a certificates.csv file.
# NOTE(review): the path is resolved relative to this file's own folder, so it repeats
# "model_data/simulation_system" beneath it - confirm this is the intended location.
DATA_DIRECTORY = Path(__file__).parent.joinpath(
    "model_data", "simulation_system", "data", "all-domestic-certificates"
)
def get_cleaned():
    """
    Fetch the cleaned EPC description lookup from s3 and decode it.

    The object is read from the ``retrofit-data-dev`` bucket and is stored in MessagePack format
    (despite the ``.bson`` extension of the key), so it is unpacked with ``msgpack`` before being
    returned.

    :return: the decoded lookup object
    """
    packed_lookup = read_from_s3(
        bucket_name="retrofit-data-dev",
        s3_file_name="cleaned_epc_data/cleaned.bson",
    )
    # raw=False makes msgpack decode strings to str rather than leaving them as bytes
    return msgpack.unpackb(packed_lookup, raw=False)
def process_and_prune_desriptions(df, cleaned_lookup):
    """
    Merge the cleaned description lookups onto the dataset and prune inconsistent records.

    For every building component the cleaned lookup is merged on twice: once against the starting
    EPC description and once against the ending EPC description (the second set of columns gets an
    ``_ENDING`` suffix). Rows whose wall, floor or roof construction flags differ between the
    starting and ending EPC are removed, so that modelling is performed on the cleanest possible
    dataset. A description without a match in the lookup yields null flags, which never compare
    equal, so those rows are removed as well.

    Afterwards the raw description columns and the helper columns listed in ``cols_to_drop`` are
    dropped, and the generically named lookup columns (``insulation_thickness``,
    ``thermal_transmittance``, ``tariff_type`` and, for walls only, ``clean_description``) are
    prefixed with the component name so they do not collide between components.

    :param df: dataframe with one row per pair of EPCs, holding the
        ``<COMPONENT>_DESCRIPTION_STARTING`` / ``<COMPONENT>_DESCRIPTION_ENDING`` columns
        (``MAIN_FUEL_STARTING`` / ``MAIN_FUEL_ENDING`` for the main fuel) as well as
        ``LIGHTING_DESCRIPTION_STARTING`` / ``LIGHTING_DESCRIPTION_ENDING``
    :param cleaned_lookup: mapping of lookup key (``"<component>-description"`` or ``"main-fuel"``)
        to the records of that lookup; every record needs an ``original_description``
    :return: the merged, pruned and renamed dataframe
    :raises KeyError: if a column that should be dropped is missing after the merges
    """
    # Helper columns that are removed again once a component has been merged on and checked.
    # The iteration order of this dict is the order in which the components are processed.
    cols_to_drop = {
        "walls": [
            # We need the cleaned descriptions for pulling out u-values, so they are kept
            "original_description", "thermal_transmittance_unit",
            "original_description_ENDING",
            "thermal_transmittance_unit_ENDING",
            "is_cavity_wall_ENDING", "is_filled_cavity_ENDING",
            "is_solid_brick_ENDING", "is_system_built_ENDING",
            "is_timber_frame_ENDING", "is_granite_or_whinstone_ENDING",
            "is_as_built_ENDING", "is_cob_ENDING",
            "is_sandstone_or_limestone_ENDING",
            # We remove the is_assumed columns
            "is_assumed", "is_assumed_ENDING",
        ],
        "floor": [
            "original_description", "clean_description", "thermal_transmittance_unit",
            "no_data", "no_data_ENDING", "original_description_ENDING",
            "clean_description_ENDING", "thermal_transmittance_unit_ENDING",
            "is_suspended_ENDING", "is_solid_ENDING", "another_property_below_ENDING",
            "is_to_unheated_space_ENDING", "is_to_external_air_ENDING", "is_assumed",
            "is_assumed_ENDING",
        ],
        "roof": [
            "original_description", "clean_description", "thermal_transmittance_unit",
            "is_assumed", "is_valid", "original_description_ENDING", "clean_description_ENDING",
            "thermal_transmittance_unit_ENDING", "is_pitched_ENDING", "is_roof_room_ENDING",
            "is_loft_ENDING", "is_flat_ENDING", "is_thatched_ENDING", "is_at_rafters_ENDING",
            "has_dwelling_above_ENDING", "is_assumed_ENDING", "is_valid_ENDING",
        ],
        "hotwater": [
            "original_description", "clean_description", "assumed", "original_description_ENDING",
            "clean_description_ENDING", "assumed_ENDING",
        ],
        "mainheat": [
            "original_description", "clean_description", "has_assumed",
            "original_description_ENDING", "clean_description_ENDING", "has_assumed_ENDING",
        ],
        "mainheatcont": [
            "original_description", "clean_description", "original_description_ENDING", "clean_description_ENDING",
        ],
        "windows": [
            "original_description", "clean_description", "original_description_ENDING", "clean_description_ENDING",
            # We don't need many of the glazing coverage features because we have the multi_glaze_proportion feature
            "has_glazing", "glazing_coverage", "no_data", "has_glazing_ENDING", "glazing_coverage_ENDING",
            "no_data_ENDING",
        ],
        "main-fuel": [
            "original_description", "clean_description", "original_description_ENDING", "clean_description_ENDING",
        ],
    }

    # Building fabric flags that must be identical in the starting and ending EPC
    fabric_features = {
        "walls": [
            "is_cavity_wall", "is_solid_brick", "is_timber_frame",
            "is_granite_or_whinstone", "is_cob", "is_sandstone_or_limestone",
        ],
        "floor": [
            "is_suspended", "is_solid", "another_property_below",
            "is_to_unheated_space", "is_to_external_air",
        ],
        "roof": [
            "is_pitched", "is_roof_room", "is_loft", "is_flat",
            "is_thatched", "is_at_rafters", "has_dwelling_above",
        ],
    }

    # Lookup columns that share a name across components and therefore get a component prefix
    prefixed_features = ["insulation_thickness", "thermal_transmittance", "tariff_type"]

    for component, helper_cols in cols_to_drop.items():
        # The main fuel lookup and columns do not follow the "<component>-description" naming
        if component == "main-fuel":
            cleaned_key = "main-fuel"
            starting_col, ending_col = "MAIN_FUEL_STARTING", "MAIN_FUEL_ENDING"
        else:
            cleaned_key = f"{component}-description"
            starting_col = f"{component.upper()}_DESCRIPTION_STARTING"
            ending_col = f"{component.upper()}_DESCRIPTION_ENDING"

        # Build the lookup frame once and reuse it for both the starting and ending merge
        lookup = pd.DataFrame(cleaned_lookup[cleaned_key])

        df = df.merge(
            lookup,
            how="left",
            left_on=starting_col,
            right_on="original_description",
        ).merge(
            lookup,
            how="left",
            left_on=ending_col,
            right_on="original_description",
            suffixes=("", "_ENDING"),
        )

        # We make sure the construction of the walls, floor and roof hasn't changed
        features_to_check = fabric_features.get(component)
        if features_to_check:
            unchanged = pd.Series(True, index=df.index)
            for feature in features_to_check:
                unchanged &= df[feature] == df[f"{feature}_ENDING"]
            df = df[unchanged]

        # Drop the helper columns together with the raw description columns
        df = df.drop(columns=helper_cols + [starting_col, ending_col])

        # Prefix the generic lookup columns with the component they belong to
        renames = {}
        for feature in prefixed_features:
            if feature in lookup.columns:
                renames[feature] = f"{component}_{feature}"
                renames[f"{feature}_ENDING"] = f"{component}_{feature}_ENDING"
        # We need the walls descriptions so we rename them to distinguish them
        if component == "walls":
            renames["clean_description"] = f"{component}_clean_description"
            renames["clean_description_ENDING"] = f"{component}_clean_description_ENDING"
        if renames:
            df = df.rename(columns=renames)

    # We don't need any lighting specific cleaning, we just drop the original description as we use
    # LOW_ENERGY_LIGHTING_STARTING, LOW_ENERGY_LIGHTING_ENDING
    df = df.drop(columns=["LIGHTING_DESCRIPTION_STARTING", "LIGHTING_DESCRIPTION_ENDING"])
    return df
def _wall_u_value(row, clean_description):
    """Look up the wall u-value for ``clean_description`` using the row's age band and stone flags."""
    return get_wall_u_value(
        clean_description=clean_description,
        age_band=england_wales_age_band_lookup[row["CONSTRUCTION_AGE_BAND"]],
        is_granite_or_whinstone=row["is_granite_or_whinstone"],
        is_sandstone_or_limestone=row["is_sandstone_or_limestone"],
    )


def _roof_u_value(row, insulation_thickness):
    """Look up the roof u-value for ``insulation_thickness`` using the row's roof flags and age band."""
    return get_roof_u_value(
        insulation_thickness=insulation_thickness,
        has_dwelling_above=row["has_dwelling_above"],
        is_loft=row["is_loft"],
        is_roof_room=row["is_roof_room"],
        is_thatched=row["is_thatched"],
        is_flat=row["is_flat"],
        is_pitched=row["is_pitched"],
        is_at_rafters=row["is_at_rafters"],
        age_band=england_wales_age_band_lookup[row["CONSTRUCTION_AGE_BAND"]],
    )


def make_uvalues(df):
    """
    Add starting and ending wall and roof u-values to every record.

    Where the cleaned description already carries a thermal transmittance, that value is used.
    Otherwise the u-value is looked up with ``get_wall_u_value`` / ``get_roof_u_value``. For walls,
    an ending description identical to the starting one reuses the starting u-value instead of
    performing a second lookup. Floor u-values are not produced.

    The input dataframe is not modified. The result has a fresh ``0..n-1`` index.

    :param df: dataframe holding ``walls_thermal_transmittance`` and ``roof_thermal_transmittance``
        (each with an ``_ENDING`` variant) plus, for rows where those are null, the description,
        insulation thickness, construction flag and ``CONSTRUCTION_AGE_BAND`` columns that the
        lookups read
    :return: copy of ``df`` with the columns ``starting_wall_uvalue``, ``ending_wall_uvalue``,
        ``starting_roof_uvalue`` and ``ending_roof_uvalue`` appended
    """
    uvalue_columns = [
        "starting_wall_uvalue",
        "ending_wall_uvalue",
        "starting_roof_uvalue",
        "ending_roof_uvalue",
    ]

    records = []
    for _, row in df.iterrows():
        # ~~~~~~~~~~~~~~~~~~
        # Walls
        # ~~~~~~~~~~~~~~~~~~
        starting_wall_uvalue = row["walls_thermal_transmittance"]
        if pd.isnull(starting_wall_uvalue):
            starting_wall_uvalue = _wall_u_value(row, row["walls_clean_description"])

        ending_wall_uvalue = row["walls_thermal_transmittance_ENDING"]
        if pd.isnull(ending_wall_uvalue):
            if row["walls_clean_description"] != row["walls_clean_description_ENDING"]:
                ending_wall_uvalue = _wall_u_value(row, row["walls_clean_description_ENDING"])
            else:
                # Same wall description at both ends, so the u-value is unchanged
                ending_wall_uvalue = starting_wall_uvalue

        # ~~~~~~~~~~~~~~~~~~
        # Roof
        # ~~~~~~~~~~~~~~~~~~
        starting_roof_uvalue = row["roof_thermal_transmittance"]
        if pd.isnull(starting_roof_uvalue):
            starting_roof_uvalue = _roof_u_value(row, row["roof_insulation_thickness"])

        ending_roof_uvalue = row["roof_thermal_transmittance_ENDING"]
        if pd.isnull(ending_roof_uvalue):
            ending_roof_uvalue = _roof_u_value(row, row["roof_insulation_thickness_ENDING"])

        records.append(
            {
                "starting_wall_uvalue": starting_wall_uvalue,
                "ending_wall_uvalue": ending_wall_uvalue,
                "starting_roof_uvalue": starting_roof_uvalue,
                "ending_roof_uvalue": ending_roof_uvalue,
            }
        )

    # The records are in row order, so they line up positionally with the re-indexed input.
    # Passing the column names keeps the output schema stable even when there are no rows.
    uvalues = pd.DataFrame(records, columns=uvalue_columns)
    return pd.concat([df.reset_index(drop=True), uvalues], axis=1)
def app():
    """
    Build the SAP-change modelling dataset from the raw EPC certificates and store it in s3.

    For every sub-directory of ``DATA_DIRECTORY`` the ``certificates.csv`` file is pre-processed
    and cleaned. Consecutive EPCs of the same property (UPRN) are then paired up into one record
    holding the change in the RdSAP, heat demand and carbon responses together with the component
    features of both EPCs. Each pair is ordered so that the record with the lower RdSAP score is
    the "starting" one, meaning ``RDSAP_CHANGE`` is never negative and "starting" is not
    necessarily the earlier lodgement. Pairs without an RdSAP change are discarded.

    The descriptions are replaced by their cleaned variants, records with inconsistent building
    fabric are pruned and wall / roof u-values are added. The combined dataset and the cleaning
    averages of all directories are finally written to s3 as parquet files.

    Data glossary: https://epc.opendatacommunities.org/docs/guidance#glossary

    :raises ValueError: if null values are found in the dataset of a directory, either before the
        descriptions are cleaned or after the u-values have been added
    """
    cleaned_lookup = get_cleaned()

    # List all subdirectories
    directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()]

    feature_cols = COMPONENT_FEATURES + ["LODGEMENT_DATE"]
    response_cols = [RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE, CARBON_RESPONSE]

    dataset = []
    cleaning_dataset = []
    for directory in tqdm(directories):
        filepath = directory / "certificates.csv"
        data_processor = DataProcessor(filepath=filepath)
        df = data_processor.pre_process()
        cleaning_averages = data_processor.make_cleaning_averages()

        # We have some odd cases with missing constituency so we fill
        df = df.fillna({"CONSTITUENCY": df["CONSTITUENCY"].mode().values[0]})
        df = DataProcessor.apply_averages_cleaning(
            data_to_clean=df,
            cleaning_data=cleaning_averages,
            cols_to_merge_on=COLUMNS_TO_MERGE_ON,
        )

        data_by_uprn = []
        for uprn, property_data in df.groupby("UPRN", observed=True):
            # If a property has changed building type, we can ignore the epc rating i.e. this should be 1 unique row
            mandatory_features = property_data[MANDATORY_FIXED_FEATURES]
            if any(mandatory_features.nunique() > 1) or pd.isnull(mandatory_features).sum().sum() > 0:
                continue

            # Fixed features - these are property attributes that shouldn't change over time.
            # Take the latest row for both the MANDATORY_FIXED_FEATURES and the LATEST_FIELD columns;
            # the latest fields are applied last, as in the original update order
            fixed_data = {}
            fixed_data.update(mandatory_features.iloc[-1].to_dict())
            fixed_data.update(property_data[LATEST_FIELD].iloc[-1].to_dict())

            # We include the lodgement date here as we probably need to factor time into the
            # model, since EPC standards and rigour have changed over time
            variable_data = property_data[feature_cols + response_cols]

            # Note: we look at changes between subsequent EPCS, however we could look at other permutations
            # e.g. first vs second, second vs third and also first vs third
            for idx in range(variable_data.shape[0] - 1):
                earliest_record = variable_data.iloc[idx]
                latest_record = variable_data.iloc[idx + 1]

                # Order the pair so that the record with the lower sap is the starting point
                if earliest_record[RDSAP_RESPONSE] <= latest_record[RDSAP_RESPONSE]:
                    starting, ending = earliest_record, latest_record
                else:
                    starting, ending = latest_record, earliest_record

                rdsap_change = ending[RDSAP_RESPONSE] - starting[RDSAP_RESPONSE]
                if rdsap_change == 0:
                    continue

                features = pd.concat(
                    [
                        starting[feature_cols].add_suffix("_STARTING"),
                        ending[feature_cols].add_suffix("_ENDING"),
                    ]
                )
                data_by_uprn.append(
                    {
                        "UPRN": uprn,
                        "RDSAP_CHANGE": rdsap_change,
                        "HEAT_DEMAND_CHANGE": ending[HEAT_DEMAND_RESPONSE] - starting[HEAT_DEMAND_RESPONSE],
                        "CARBON_CHANGE": ending[CARBON_RESPONSE] - starting[CARBON_RESPONSE],
                        "SAP_STARTING": starting[RDSAP_RESPONSE],
                        "HEAT_DEMAND_STARTING": starting[HEAT_DEMAND_RESPONSE],
                        "CARBON_STARTING": starting[CARBON_RESPONSE],
                        **fixed_data,
                        **features.to_dict(),
                    }
                )

        data_by_uprn_df = pd.DataFrame(data_by_uprn)

        # Add some temporal features - we look at the days from the standard starting point in time
        # for the starting and ending date so all records are from a fixed point
        earliest_epc_date = pd.to_datetime(EARLIEST_EPC_DATE)
        data_by_uprn_df["DAYS_TO_STARTING"] = (
            pd.to_datetime(data_by_uprn_df["LODGEMENT_DATE_STARTING"]) - earliest_epc_date
        ).dt.days
        data_by_uprn_df["DAYS_TO_ENDING"] = (
            pd.to_datetime(data_by_uprn_df["LODGEMENT_DATE_ENDING"]) - earliest_epc_date
        ).dt.days

        if pd.isnull(data_by_uprn_df).sum().sum():
            raise ValueError("Null values found in dataset")

        # We look for key building fabric features that have changed from one EPC to the next.
        # if, for example, we see that a home has gone from being a cavity wall to a solid wall, we
        # remove this record, as it indicates that the quality of the EPC conducted in the first instance
        # is low
        # We also replace descriptions with their cleaned variants
        data_by_uprn_df = process_and_prune_desriptions(data_by_uprn_df, cleaned_lookup)

        # Apply u-values
        # The "(assumed)" marker is removed literally (regex=False); as a regex the parentheses
        # would form a group and be left behind in the description
        for col in ["walls_clean_description", "walls_clean_description_ENDING"]:
            data_by_uprn_df[col] = (
                data_by_uprn_df[col].str.replace("(assumed)", "", regex=False).str.rstrip()
            )

        data_by_uprn_df = make_uvalues(data_by_uprn_df).drop(
            columns=["walls_clean_description", "walls_clean_description_ENDING"]
        )

        if pd.isnull(data_by_uprn_df).sum().sum():
            raise ValueError("Null values found in dataset after process_and_prune_desriptions")

        dataset.append(data_by_uprn_df)

        cleaning_averages["LOCAL_AUTHORITY"] = df["LOCAL_AUTHORITY"].values[0]
        cleaning_dataset.append(cleaning_averages)

    # Store cleaning dataset in s3 as a parquet file
    cleaning_dataset = pd.concat(cleaning_dataset)
    save_dataframe_to_s3_parquet(
        df=cleaning_dataset,
        bucket_name="retrofit-data-dev",
        file_key="sap_change_model/cleaning_dataset.parquet",
    )

    output = pd.concat(dataset)
    save_dataframe_to_s3_parquet(
        df=output,
        bucket_name="retrofit-data-dev",
        file_key="sap_change_model/dataset_new.parquet",
    )
# Only run the pipeline when this file is executed as a script, not when it is imported
if __name__ == "__main__":
    app()