mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
202 lines
8.3 KiB
Python
202 lines
8.3 KiB
Python
from tqdm import tqdm
|
|
import os
|
|
import pandas as pd
|
|
import msgpack
|
|
|
|
from model_data.EpcClean import EpcClean
|
|
from model_data.analysis.UvalueEstimations import UvalueEstimations
|
|
from model_data.simulation_system.core.Settings import EARLIEST_EPC_DATE
|
|
from pathlib import Path
|
|
from utils.s3 import save_data_to_s3
|
|
|
|
# Absolute directory containing this module; the Land Registry paths are anchored to it.
_MODULE_DIR = Path(os.path.abspath(os.path.dirname(__file__)))
_LAND_REGISTRY_DIR = _MODULE_DIR / "model_data" / "local_data"

# Land Registry CSV extracts, listed from the monthly update back to 2017.
# Kept as plain strings (not Path objects) so existing consumers see the same type as before.
LAND_REGISTRY_PATHS = [
    str(_LAND_REGISTRY_DIR / file_name)
    for file_name in (
        "pp-monthly-update-new-version.csv",
        "pp-2022 (1).csv",
        "pp-2021.csv",
        "pp-2020.csv",
        "pp-2019.csv",
        "pp-2018.csv",
        "pp-2017-part1.csv",
        "pp-2017-part2.csv",
    )
]

# Root of the EPC extracts: app() reads `certificates.csv` from each sub-directory found here.
EPC_DIRECTORY = Path(__file__).parent / "model_data" / "simulation_system" / "data" / "all-domestic-certificates"

# Deployment environment; selects the `retrofit-data-<ENVIRONMENT>` bucket and defaults to "dev".
ENVIRONMENT = os.getenv("ENVIRONMENT", "dev")
|
|
|
|
|
|
# Qualitative thickness labels (and a missing value) that carry no measurement usable with S9.
_UNMEASURED_THICKNESSES = ("below average", "average", "above average", "none", None)


def get_u_value_from_s9(thickness, s9, is_loft, is_roof_room, is_thatched):
    """Get the U-value from table S9 based on the insulation thickness.

    Parameters:
        thickness: Measured insulation thickness, e.g. "100", "300+" or 100. Qualitative labels
            ("below average", "average", "above average", "none") and None give no lookup.
        s9 (pd.DataFrame): Table S9. Must provide the columns `Insulation_thickness_mm`,
            `Slates_or_tiles_U_value_W_m2K` and `Thatched_roof_U_value_W_m2K`.
        is_loft (bool): Whether the description refers to a loft.
        is_roof_room (bool): Whether the description refers to a room in the roof.
        is_thatched (bool): Selects the thatched column instead of the slates/tiles column.

    Returns:
        The U-value of the row with the smallest tabulated thickness that is greater than or
        equal to the measured one. If the measurement exceeds every tabulated thickness, the
        thickest row is used. None is returned when the thickness is not a usable measurement,
        when the roof is neither a loft nor a room in roof, or when the table is empty.
    """
    if thickness in _UNMEASURED_THICKNESSES or not (is_loft or is_roof_room):
        return None

    # An "N+" string means "N mm or more"; it is looked up as N.
    if isinstance(thickness, str) and thickness.endswith("+"):
        raw_thickness = thickness[:-1]
    else:
        raw_thickness = thickness

    try:
        thickness_mm = int(raw_thickness)
    except (TypeError, ValueError, OverflowError):
        # Not a valid number (free text, NaN, infinity, unsupported type)
        return None

    # Determine the column to refer based on the roof type
    column = 'Thatched_roof_U_value_W_m2K' if is_thatched else 'Slates_or_tiles_U_value_W_m2K'

    # Sort explicitly (stable) so the lookup does not depend on the row order of the table.
    by_thickness = s9.sort_values('Insulation_thickness_mm', kind='mergesort')
    if by_thickness.empty:
        return None

    eligible = by_thickness[by_thickness['Insulation_thickness_mm'] >= thickness_mm]
    if eligible.empty:
        # Thicker than anything tabulated: clamp to the thickest row rather than failing.
        return by_thickness[column].iloc[-1]

    return eligible[column].iloc[0]


def get_roof_u_value(description_dict, age_band, s9, s10):
    """
    Determine the U-value for a roof based on the description dictionary and age band.

    We use table S9 if the insulation thickness was measured, otherwise we use table S10.

    Parameters:
        description_dict (dict): Dictionary containing the details of the roof description. The
            keys read are `has_dwelling_above`, `insulation_thickness`, `is_loft`, `is_roof_room`,
            `is_thatched`, `is_flat`, `is_pitched` and `is_at_rafters`.
        age_band (str): The age band of the property. It is matched against the `Age_band`
            column of table S10 with `str.contains`, i.e. it is treated as a regex pattern.
        s9 (pd.DataFrame): The DataFrame representing table S9.
        s10 (pd.DataFrame): The DataFrame representing table S10.

    Returns:
        float: The determined U-value.

    Raises:
        IndexError: If no row of table S10 matches the age band.
    """
    # If there is a dwelling above, the U-value is 0
    if description_dict['has_dwelling_above']:
        return 0.0

    # Step 1: Try to get the U-value from table S9 based on the insulation thickness
    measured_u_value = get_u_value_from_s9(
        thickness=description_dict['insulation_thickness'],
        s9=s9,
        is_loft=description_dict['is_loft'],
        is_roof_room=description_dict['is_roof_room'],
        is_thatched=description_dict['is_thatched'],
    )
    if measured_u_value is not None:
        return measured_u_value

    # Step 2: If the U-value could not be determined from table S9, use table S10.
    # Pick the S10 column from the description details; the order of the checks matters.
    if description_dict['is_flat']:
        column = 'Flat_roof'
    elif description_dict['is_thatched']:
        if description_dict['is_roof_room']:
            column = 'Thatched_roof_room_in_roof'
        else:
            column = 'Thatched_roof'
    elif description_dict['is_roof_room']:
        column = 'Room_in_roof_slates_or_tiles'
    elif description_dict['is_pitched'] and description_dict['is_at_rafters']:
        column = 'Pitched_slates_or_tiles_insulation_at_rafters'
    else:
        # Pitched without rafter insulation, and the default for anything unclassified
        column = 'Pitched_slates_or_tiles_insulation_between_joists_or_unknown'

    # Get the U-value from table S10 based on the age band and the determined column
    matches = s10.loc[s10['Age_band'].str.contains(age_band), column]
    if matches.empty:
        raise IndexError(f"No row in table S10 matches age band {age_band!r}")
    return matches.values[0]


def app():
    """
    Build the lookup of cleaned EPC description fields and upload it to S3.

    Every sub-directory of EPC_DIRECTORY is expected to hold a `certificates.csv`. Each file is
    read, its columns are renamed to the lower-case hyphenated form, rows lodged before
    EARLIEST_EPC_DATE are dropped and the remainder is passed through EpcClean. The cleaned
    records are merged per component, keeping one record per `original_description`, so that
    when we get new properties we can quickly sanitise any description data.

    Currently, this application is just run on a local machine.

    Raises:
        ValueError: If a component ends up with duplicated original descriptions.
    """
    cleaned_data = {}
    # Original descriptions already collected per component, kept as sets so the membership
    # test is O(1) instead of re-scanning the accumulated list for every record.
    seen_descriptions = {}

    epc_directories = [entry for entry in EPC_DIRECTORY.iterdir() if entry.is_dir()]
    for directory in tqdm(epc_directories):
        certificates = pd.read_csv(directory / "certificates.csv", low_memory=False)
        # Rename the columns to the same format as the api returns
        certificates.columns = [c.replace("_", "-").lower() for c in certificates.columns]
        # Keep only certificates lodged on or after the date threshold
        certificates = certificates[certificates["lodgement-date"] >= EARLIEST_EPC_DATE]

        # Convert to list of dictionaries as returned by the api and run the cleaning
        cleaner = EpcClean(certificates.to_dict("records"))
        cleaner.clean()

        # Extend cleaned_data with descriptions not seen in earlier directories
        for component, records in cleaner.cleaned.items():
            known = seen_descriptions.get(component)
            if known is None:
                cleaned_data[component] = records
                seen_descriptions[component] = {r["original_description"] for r in records}
                continue

            fresh = [r for r in records if r["original_description"] not in known]
            cleaned_data[component].extend(fresh)
            known.update(r["original_description"] for r in fresh)

        # TODO: Add property age band into this, then compute
        # UvalueEstimations(data=...).get_estimates(cleaner=cleaner) and store its
        # walls / floors / roofs estimates to s3

    # Basic check to make sure all descriptions are unique
    for cleaned in cleaned_data.values():
        descriptions = [x["original_description"] for x in cleaned]
        if len(descriptions) != len(set(descriptions)):
            raise ValueError("Duplicated descriptions found, check me")

    # Finally, we attach u-values to the descriptions for walls, roofs and floors
    # NOTE(review): only roofs are handled so far and the computed value is not stored anywhere
    # yet. `table_s9` and `table_s10` are neither defined nor imported in the visible part of
    # this file, so this loop raises NameError as soon as a roof needs a lookup - confirm where
    # the tables live before relying on this step.
    from recommendations.rdsap_tables import age_bands

    for roof in cleaned_data["roof-description"]:
        if roof["thermal_transmittance"] is not None or "Average thermal transmittance" in roof["clean_description"]:
            continue

        for ab in age_bands:
            # TODO: persist this value against the roof description / age band
            value = float(
                get_roof_u_value(
                    description_dict=roof,
                    age_band=ab,
                    s9=table_s9,
                    s10=table_s10,
                )
            )

    # We store a singular file however we could store the data under the following file path:
    # cleaned_epc_data/{component}/{original_description}/cleaned.bson
    # where component is one of the keys of cleaned_data. If we store it against the original data, this
    # data being read in will be extremely small, meaning quicker load times. We'll begin by storing as a single
    # file and monitor usage patterns to see if it makes sense to split the data up
    # Note: the payload is msgpack-encoded even though the object key ends in `.bson`.
    save_data_to_s3(
        data=msgpack.packb(cleaned_data, use_bin_type=True),
        s3_file_name="cleaned_epc_data/cleaned.bson",
        bucket_name=f"retrofit-data-{ENVIRONMENT}",
    )
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: announce the run, then execute the whole cleaning pipeline.
    print("Initialising cleaner app run")
    app()
|