This commit is contained in:
Khalim Conn-Kowlessar 2024-05-30 13:06:03 +01:00
commit 46fd33d74c
6 changed files with 302 additions and 129 deletions

View file

@ -5,7 +5,7 @@ from BaseUtility import Definitions
from etl.epc.settings import (
DATA_PROCESSOR_SETTINGS,
EARLIEST_EPC_DATE,
IGNORED_TRANSACTION_TYPES,
# IGNORED_TRANSACTION_TYPES,
IGNORED_FLOOR_LEVELS,
IGNORED_PROPERTY_TYPES,
IGNORED_TENURES,
@ -56,8 +56,11 @@ construction_age_remap = {
expanded_map = {
i: [
label for label, bounds in construction_age_bounds_map.items() if (i <= bounds["u"]) and (i >= bounds['l'])
][0] for i in range(0, 3001)
label
for label, bounds in construction_age_bounds_map.items()
if (i <= bounds["u"]) and (i >= bounds["l"])
][0]
for i in range(0, 3001)
}
@ -74,8 +77,13 @@ class EPCDataProcessor:
Handle data loading and data preprocessing
"""
def __init__(self, data: pd.DataFrame | None = None, cleaning_averages: pd.DataFrame | None = None,
run_mode: str = "training", violation_mode: bool = False) -> None:
def __init__(
self,
data: pd.DataFrame | None = None,
cleaning_averages: pd.DataFrame | None = None,
run_mode: str = "training",
violation_mode: bool = False,
) -> None:
"""
:param filepath: If specified, is the physical location of the data
:param is_newdata: Indicates if we are processing new, testing data.
@ -86,7 +94,9 @@ class EPCDataProcessor:
self.data: pd.DataFrame = data if is_data_a_dataframe else pd.DataFrame()
is_cleaning_averages_a_dataframe = isinstance(cleaning_averages, pd.DataFrame)
self.cleaning_averages: pd.DataFrame = cleaning_averages if is_cleaning_averages_a_dataframe else pd.DataFrame()
self.cleaning_averages: pd.DataFrame = (
cleaning_averages if is_cleaning_averages_a_dataframe else pd.DataFrame()
)
# FOR NOW IF VIOLATION MODE IS ON, WE USE RUN MODE AS NEWDATA
self.violation_mode = violation_mode
@ -103,7 +113,9 @@ class EPCDataProcessor:
ignore_step = True if self.run_mode == "newdata" else False
if filepath is not None:
self.load_data(filepath=filepath, low_memory=DATA_PROCESSOR_SETTINGS["low_memory"])
self.load_data(
filepath=filepath, low_memory=DATA_PROCESSOR_SETTINGS["low_memory"]
)
if len(self.data) == 0:
raise Exception("No data to process - check filepath/ data being passed in")
@ -121,7 +133,8 @@ class EPCDataProcessor:
self.clean_multi_glaze_proportion(ignore_step=ignore_step)
self.clean_photo_supply()
self.retain_multiple_epc_properties(
epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"], ignore_step=ignore_step
epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"],
ignore_step=ignore_step,
)
self.fill_na_fields()
@ -188,7 +201,9 @@ class EPCDataProcessor:
if ignore_step:
return
self.cleaning_averages["LOCAL_AUTHORITY"] = self.data["LOCAL_AUTHORITY"].values[0]
self.cleaning_averages["LOCAL_AUTHORITY"] = self.data["LOCAL_AUTHORITY"].values[
0
]
def fill_invalid_constituency_fields(self, ignore_step: bool = False):
"""
@ -201,7 +216,9 @@ class EPCDataProcessor:
if ignore_step:
return
self.data = self.data.fillna({"CONSTITUENCY": self.data["CONSTITUENCY"].mode().values[0]})
self.data = self.data.fillna(
{"CONSTITUENCY": self.data["CONSTITUENCY"].mode().values[0]}
)
def sort_data_by_uprn_lodgement_date(self, ignore_step: bool = False):
"""
@ -301,7 +318,7 @@ class EPCDataProcessor:
"""
if self.violation_mode:
# TODO: to fill in
# TODO: to fill in
return
if ignore_step:
@ -311,9 +328,7 @@ class EPCDataProcessor:
lambda x: self.clean_construction_age_band(x)
)
self.data = self.data[
~pd.isnull(self.data["CONSTRUCTION_AGE_BAND"])
]
self.data = self.data[~pd.isnull(self.data["CONSTRUCTION_AGE_BAND"])]
def clean_missing_rooms(self, ignore_step: bool = False):
"""
@ -331,31 +346,45 @@ class EPCDataProcessor:
return
# TODO: DO we want to move this out of this function? (i.e. alter the data before we do any cleaning)
self.data["POSTAL_AREA"] = self.data["POSTCODE"].apply(lambda x: x.split(" ")[0])
self.data["POSTAL_AREA"] = self.data["POSTCODE"].apply(
lambda x: x.split(" ")[0]
)
def apply_clean(data, matching_columns):
cleaning_data = data[~pd.isnull(data[col])].groupby(
matching_columns
)[col].median().reset_index()
data = data.merge(
cleaning_data, how="left", on=matching_columns, suffixes=("", "_CLEANING")
cleaning_data = (
data[~pd.isnull(data[col])]
.groupby(matching_columns)[col]
.median()
.reset_index()
)
data[col] = np.where(pd.isnull(data[col]), data[f"{col}_CLEANING"], data[col])
data = data.merge(
cleaning_data,
how="left",
on=matching_columns,
suffixes=("", "_CLEANING"),
)
data[col] = np.where(
pd.isnull(data[col]), data[f"{col}_CLEANING"], data[col]
)
data = data.drop(columns=f"{col}_CLEANING")
return data
for col in ["NUMBER_HEATED_ROOMS", "NUMBER_HABITABLE_ROOMS"]:
to_index = 3
matching_columns = ["PROPERTY_TYPE", "BUILT_FORM", "CONSTRUCTION_AGE_BAND", "POSTAL_AREA"]
matching_columns = [
"PROPERTY_TYPE",
"BUILT_FORM",
"CONSTRUCTION_AGE_BAND",
"POSTAL_AREA",
]
has_missings = pd.isnull(self.data[col]).sum()
while has_missings:
self.data = apply_clean(
data=self.data,
matching_columns=matching_columns[0:to_index + 1]
data=self.data, matching_columns=matching_columns[0 : to_index + 1]
)
has_missings = pd.isnull(self.data[col]).sum()
@ -363,7 +392,10 @@ class EPCDataProcessor:
# Check if we've gotten to index 0 and still have missings - something has gone wrong or
# we have a very unique property type
if has_missings:
raise NotImplementedError("Handle this edge case, we still have missings for column %s" % col)
raise NotImplementedError(
"Handle this edge case, we still have missings for column %s"
% col
)
break
to_index -= 1
@ -410,7 +442,7 @@ class EPCDataProcessor:
# coltypes = {k: v for k, v in COLUMNTYPES.items() if k in self.data.columns} if self.is_newdata else
# COLUMNTYPES
# for k, v in coltypes.items():
# self.data[k] = self.data[k].astype(v)
# self.data[k] = self.data[k].astype(v)
# self.data = self.data.astype(coltypes)
# self.na_remapping()
@ -437,9 +469,11 @@ class EPCDataProcessor:
def na_remapping(self, auto_subset_columns: bool = False):
fill_na_map_apply = {
k: v for k, v in fill_na_map.items() if k in self.data.columns
} if auto_subset_columns else fill_na_map
fill_na_map_apply = (
{k: v for k, v in fill_na_map.items() if k in self.data.columns}
if auto_subset_columns
else fill_na_map
)
for column, fill_value in fill_na_map_apply.items():
self.data[column] = self.data[column].fillna(fill_value)
@ -535,28 +569,34 @@ class EPCDataProcessor:
for variable in AVERAGE_FIXED_FEATURES:
# Replace any missing NAN values with averages for the same Property type and built form
cleaning_averages_filled[variable] = cleaning_averages_filled[variable].fillna(
cleaning_averages_filled[f"{variable}_AVERAGE"]
)
cleaning_averages_filled[variable] = cleaning_averages_filled[
variable
].fillna(cleaning_averages_filled[f"{variable}_AVERAGE"])
cleaning_averages_filled = cleaning_averages_filled.drop(columns=f"{variable}_AVERAGE")
cleaning_averages_filled = cleaning_averages_filled.drop(
columns=f"{variable}_AVERAGE"
)
# If there are still NA values i.e. the averages do not have values for a speicifc group of property tyope
# and built form
# We can use just the property type average and replace
cleaning_averages_filled[variable] = cleaning_averages_filled[variable].fillna(
cleaning_averages_filled[f"{variable}_PROPERTY_AVERAGE"]
)
cleaning_averages_filled[variable] = cleaning_averages_filled[
variable
].fillna(cleaning_averages_filled[f"{variable}_PROPERTY_AVERAGE"])
cleaning_averages_filled = cleaning_averages_filled.drop(columns=f"{variable}_PROPERTY_AVERAGE")
cleaning_averages_filled = cleaning_averages_filled.drop(
columns=f"{variable}_PROPERTY_AVERAGE"
)
# If there are still NA values, use BUILT FORM averages
cleaning_averages_filled["variable"] = cleaning_averages_filled[variable].fillna(
cleaning_averages_filled[f"{variable}_BUILT_FORM_AVERAGE"]
)
cleaning_averages_filled["variable"] = cleaning_averages_filled[
variable
].fillna(cleaning_averages_filled[f"{variable}_BUILT_FORM_AVERAGE"])
cleaning_averages_filled = cleaning_averages_filled.drop(columns=f"{variable}_BUILT_FORM_AVERAGE")
cleaning_averages_filled = cleaning_averages_filled.drop(
columns=f"{variable}_BUILT_FORM_AVERAGE"
)
# If there still is na values, use average across all epc in consituecy
cleaning_averages_filled[variable] = cleaning_averages_filled[
@ -573,7 +613,9 @@ class EPCDataProcessor:
self.cleaning_averages = cleaning_averages_filled
def retain_multiple_epc_properties(self, epc_minimum_count: int = 1, ignore_step: bool = False) -> None:
def retain_multiple_epc_properties(
self, epc_minimum_count: int = 1, ignore_step: bool = False
) -> None:
"""
Reduce the data futher by keeping only datasets with multiple epcs
"""
@ -592,12 +634,16 @@ class EPCDataProcessor:
counts = counts[counts["count"] > epc_minimum_count]
self.data = pd.merge(self.data, counts, on="UPRN")
def recast_df_columns(self, column_mappings: dict, auto_subset_columns: bool = False) -> None:
def recast_df_columns(
self, column_mappings: dict, auto_subset_columns: bool = False
) -> None:
"""
Recast columns from the dataframe to ensure the behaviour we want
"""
if auto_subset_columns:
column_mappings = {k: v for k, v in column_mappings.items() if k in self.data.columns}
column_mappings = {
k: v for k, v in column_mappings.items() if k in self.data.columns
}
for key, values in column_mappings.items():
if key not in self.data.columns:
@ -608,13 +654,17 @@ class EPCDataProcessor:
else:
self.data[key] = self.data[key].astype(values)
def recast_all_data(self, column_mappings: dict, auto_subset_columns: bool = False) -> None:
def recast_all_data(
self, column_mappings: dict, auto_subset_columns: bool = False
) -> None:
"""
Using a dictionary to recast all columns at once
"""
if auto_subset_columns:
column_mappings = {k: v for k, v in column_mappings.items() if k in self.data.columns}
column_mappings = {
k: v for k, v in column_mappings.items() if k in self.data.columns
}
self.data = self.data.astype(column_mappings)
@ -625,14 +675,26 @@ class EPCDataProcessor:
if self.violation_mode:
violation_uprn_missing = pd.isnull(self.data["UPRN"])
violation_old_lodgment_date = self.data["LODGEMENT_DATE"] < EARLIEST_EPC_DATE
violation_invalid_transaction_type = self.data["TRANSACTION_TYPE"] == IGNORED_TRANSACTION_TYPES
violation_ignored_floor_level = self.data["FLOOR_LEVEL"].isin(IGNORED_FLOOR_LEVELS)
violation_old_lodgment_date = (
self.data["LODGEMENT_DATE"] < EARLIEST_EPC_DATE
)
# violation_invalid_transaction_type = self.data["TRANSACTION_TYPE"] == IGNORED_TRANSACTION_TYPES
violation_ignored_floor_level = self.data["FLOOR_LEVEL"].isin(
IGNORED_FLOOR_LEVELS
)
violation_rdsap_score_above_max = self.data[RDSAP_RESPONSE] > MAX_SAP_SCORE
violation_missing_windows_description = pd.isnull(self.data["WINDOWS_DESCRIPTION"])
violation_missing_hotwater_description = pd.isnull(self.data["HOTWATER_DESCRIPTION"])
violation_missing_roof_description = pd.isnull(self.data["ROOF_DESCRIPTION"])
violation_invalid_property_type = self.data["PROPERTY_TYPE"] == IGNORED_PROPERTY_TYPES
violation_missing_windows_description = pd.isnull(
self.data["WINDOWS_DESCRIPTION"]
)
violation_missing_hotwater_description = pd.isnull(
self.data["HOTWATER_DESCRIPTION"]
)
violation_missing_roof_description = pd.isnull(
self.data["ROOF_DESCRIPTION"]
)
violation_invalid_property_type = (
self.data["PROPERTY_TYPE"] == IGNORED_PROPERTY_TYPES
)
violation_invalid_tenure = self.data["TENURE"].isin(IGNORED_TENURES)
violation_df = pd.concat(
@ -647,7 +709,8 @@ class EPCDataProcessor:
violation_missing_roof_description,
violation_invalid_property_type,
violation_invalid_tenure,
], axis=1,
],
axis=1,
keys=[
"violation_uprn_missing",
"violation_old_lodgment_date",
@ -658,8 +721,8 @@ class EPCDataProcessor:
"violation_missing_hotwater_description",
"violation_missing_roof_description",
"violation_invalid_property_type",
"violation_invalid_tenure"
]
"violation_invalid_tenure",
],
)
self.data = pd.concat([self.data, violation_df], axis=1)
@ -685,10 +748,8 @@ class EPCDataProcessor:
self.data = self.data[~pd.isnull(self.data["UPRN"])]
self.data = self.data[self.data["LODGEMENT_DATE"] >= EARLIEST_EPC_DATE]
self.data = self.data[self.data["TRANSACTION_TYPE"] != IGNORED_TRANSACTION_TYPES]
self.data = self.data[
~self.data["FLOOR_LEVEL"].isin(IGNORED_FLOOR_LEVELS)
]
# self.data = self.data[self.data["TRANSACTION_TYPE"] != IGNORED_TRANSACTION_TYPES]
self.data = self.data[~self.data["FLOOR_LEVEL"].isin(IGNORED_FLOOR_LEVELS)]
self.data = self.data[self.data[RDSAP_RESPONSE] <= MAX_SAP_SCORE]
# We observed 7 final records with missing windows and 2 records with missing hot water so we shall remove them
@ -705,7 +766,7 @@ class EPCDataProcessor:
self.data = self.data[~self.data["TENURE"].isin(IGNORED_TENURES)]
# We remap zero values to None
self.data.loc[self.data['FLOOR_HEIGHT'] == 0, 'FLOOR_HEIGHT'] = None
self.data.loc[self.data["FLOOR_HEIGHT"] == 0, "FLOOR_HEIGHT"] = None
def clean_multi_glaze_proportion(self, ignore_step: bool = False) -> None:
"""
@ -734,7 +795,11 @@ class EPCDataProcessor:
@staticmethod
def apply_averages_cleaning(
data_to_clean, cleaning_data, cols_to_merge_on, colnames=None, ignore_step: bool = False
data_to_clean,
cleaning_data,
cols_to_merge_on,
colnames=None,
ignore_step: bool = False,
):
"""
Clean the input DataFrame using averages from a cleaning DataFrame.
@ -752,12 +817,13 @@ class EPCDataProcessor:
# The desired colnames to clean - which may not be present
if colnames is None:
colnames = ["TOTAL_FLOOR_AREA", "FLOOR_HEIGHT", "FIXED_LIGHTING_OUTLETS_COUNT"]
colnames = [
"TOTAL_FLOOR_AREA",
"FLOOR_HEIGHT",
"FIXED_LIGHTING_OUTLETS_COUNT",
]
cols_to_clean = [
c for c in colnames if
c in data_to_clean.columns
]
cols_to_clean = [c for c in colnames if c in data_to_clean.columns]
# Enforce data types
for col in ["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"]:
@ -768,7 +834,15 @@ class EPCDataProcessor:
# Calculate averages
cleaning_averages_to_merge = cleaning_data.groupby(columns_to_merge_on).agg(
dict(zip(cols_to_clean, ["mean", ] * len(cols_to_clean)))
dict(
zip(
cols_to_clean,
[
"mean",
]
* len(cols_to_clean),
)
)
)
# Merge with the original data
@ -777,7 +851,7 @@ class EPCDataProcessor:
cleaning_averages_to_merge,
on=columns_to_merge_on,
suffixes=("", "_AVERAGE"),
how='left'
how="left",
)
global_averages = cleaning_data[cols_to_clean].mean()
@ -806,14 +880,20 @@ class EPCDataProcessor:
raise Exception("Suffix should be one of _starting or _ending")
if suffix == "_STARTING":
starting_cols = self.data[STARTING_SUFFIX_COMPONENT_COLS + EFFICIENCY_FEATURES].copy().add_suffix(suffix)
starting_cols = (
self.data[STARTING_SUFFIX_COMPONENT_COLS + EFFICIENCY_FEATURES]
.copy()
.add_suffix(suffix)
)
fixed_cols = self.data[NO_SUFFIX_COMPONENT_COLS + POTENTIAL_COLUMNS].copy()
return pd.concat([starting_cols, fixed_cols], axis=1)
return self.data[
ENDING_SUFFIX_COMPONENT_COLS + EFFICIENCY_FEATURES
].copy().add_suffix(suffix)
return (
self.data[ENDING_SUFFIX_COMPONENT_COLS + EFFICIENCY_FEATURES]
.copy()
.add_suffix(suffix)
)
def get_fixed_features(self) -> pd.DataFrame:
"""
@ -831,14 +911,17 @@ class EPCDataProcessor:
:param cols_to_ignore: If specified, is a list of columns to ignore, e.g. uuids
:return: DataFrame with coerced columns.
"""
object_columns = df.select_dtypes(include=['object']).columns
object_columns = df.select_dtypes(include=["object"]).columns
if cols_to_ignore:
object_columns = [c for c in object_columns if c not in cols_to_ignore]
for column in object_columns:
unique_values = df[column].dropna().unique()
# If the unique values in the column are 'True' and 'False', convert the column to boolean
if set(unique_values) == {'True', 'False'} or set(unique_values) == {True, False}:
if set(unique_values) == {"True", "False"} or set(unique_values) == {
True,
False,
}:
df[column] = df[column].astype(bool)
return df
@ -877,7 +960,6 @@ class EPCDataProcessor:
@staticmethod
def clean_efficiency_variables(df):
"""
These is scope to clean this by the model per corresponding description.
E.g. for WALLS_ENG_EFF we could look at the mode efficiency rating by description and

View file

@ -229,7 +229,9 @@ class TrainingDataset(BaseDataset):
"""
# TODO: move into EPCRecord record
uvalue_columns = [
col for col in self.df.columns if "thermal_transmittance" in col
col
for col in self.df.columns
if "thermal_transmittance" in col and "_unit" not in col
]
for uvalue_col in uvalue_columns:
self.df[uvalue_col] = pd.to_numeric(self.df[uvalue_col])
@ -559,9 +561,9 @@ class TrainingDataset(BaseDataset):
"walls": [
# We need to cleaned descriptions for pulling out u-values
"original_description",
"thermal_transmittance_unit",
# "thermal_transmittance_unit",
"original_description_ending",
"thermal_transmittance_unit_ending",
# "thermal_transmittance_unit_ending",
"is_cavity_wall_ending",
"is_solid_brick_ending",
"is_system_built_ending",
@ -703,6 +705,8 @@ class TrainingDataset(BaseDataset):
"insulation_thickness_ending": f"{component}_insulation_thickness_ending",
"thermal_transmittance": f"{component}_thermal_transmittance",
"thermal_transmittance_ending": f"{component}_thermal_transmittance_ending",
"thermal_transmittance_unit": f"{component}_thermal_transmittance_unit",
"thermal_transmittance_unit_ending": f"{component}_thermal_transmittance_unit_ending",
"tariff_type": f"{component}_tariff_type",
"tariff_type_ending": f"{component}_tariff_type_ending",
"clean_description": f"{component}_clean_description",

View file

@ -64,6 +64,21 @@ def get_cleaned_description_mapping():
clean_lookup = get_cleaned_description_mapping()
# TODO: THIS IS A TEMPORARY FIX
new_walls_description_mapping = pd.DataFrame(clean_lookup["walls-description"])
import numpy as np
new_walls_description_mapping["thermal_transmittance_unit"] = np.where(
~pd.isnull(new_walls_description_mapping["thermal_transmittance_unit"]),
"w/m-¦k",
new_walls_description_mapping["thermal_transmittance_unit"],
)
clean_lookup["walls-description"] = new_walls_description_mapping.to_dict(
orient="records"
)
class EPCPipeline:
"""

View file

@ -41,6 +41,15 @@ cleaning_data = read_dataframe_from_s3_parquet(
materials = get_materials(session)
cleaned = get_cleaned()
# TODO: THIS IS A TEMPORARY FIX
new_walls_description_mapping = pd.DataFrame(cleaned["walls-description"])
new_walls_description_mapping.loc[
~new_walls_description_mapping["thermal_transmittance_unit"].isnull(),
"thermal_transmittance_unit",
] = "w/m-¦k"
cleaned["walls-description"] = new_walls_description_mapping.to_dict(orient="records")
uprn_filenames = read_dataframe_from_s3_parquet(
bucket_name=get_settings().DATA_BUCKET, file_key="spatial/filename_meta.parquet"
)
@ -167,7 +176,7 @@ for scenario_property in scenario_properties:
p.get_components(cleaned, photo_supply_lookup, floor_area_decile_thresholds)
recommender = Recommendations(property_instance=p, materials=materials)
property_recommendations = recommender.recommend("0")
property_recommendations = recommender.recommend()
wall_recommendations = recommender.wall_recomender.recommendations
loft_recommendations = recommender.roof_recommender.recommendations

View file

@ -23,7 +23,7 @@ class WallRecommendations(Definitions):
# After 1930, Solid brick walls became less populate and instead, cavity walls became a
# more popular choice
YEARS_CAVITY_WALLS_BEGAN = 1930
U_VALUE_UNIT = 'w/m-¦k'
U_VALUE_UNIT = "w/m-¦k"
# part L building regulations indicate that any rennovations on an existing property's walls should
# achieve a U-value of no higher than 0.3
@ -99,9 +99,10 @@ class WallRecommendations(Definitions):
]
self.internal_wall_non_insulation_materials = [
part for part in materials if part["type"] in [
"iwi_wall_demolition", "iwi_vapour_barrier", "iwi_redecoration"
]
part
for part in materials
if part["type"]
in ["iwi_wall_demolition", "iwi_vapour_barrier", "iwi_redecoration"]
]
self.external_wall_insulation_materials = [
@ -109,9 +110,10 @@ class WallRecommendations(Definitions):
]
self.external_wall_non_insulation_materials = [
part for part in materials if part["type"] in [
"ewi_wall_demolition", "ewi_wall_preparation", "ewi_wall_redecoration"
]
part
for part in materials
if part["type"]
in ["ewi_wall_demolition", "ewi_wall_preparation", "ewi_wall_redecoration"]
]
@property
@ -122,7 +124,9 @@ class WallRecommendations(Definitions):
# Current logic: If the property is in a conservation area/heritage building/listed building or a flat,
# it is not suitable for EWI
if self.property.restricted_measures or (self.property.data["property-type"].lower() == "flat"):
if self.property.restricted_measures or (
self.property.data["property-type"].lower() == "flat"
):
return False
return True
@ -174,31 +178,43 @@ class WallRecommendations(Definitions):
# recommend internal wall insulation as a possible measure
u_value = self.property.walls["thermal_transmittance"]
u_value = None if math.isnan(u_value) else u_value
is_cavity_wall = self.property.walls["is_cavity_wall"]
insulation_thickness = self.property.walls["insulation_thickness"]
# We check if the wall is already insulated and if so, we exit
if ((insulation_thickness in ["average", "above average"]) or self.property.walls["is_filled_cavity"]) and (
"cavity_extract_and_refill" not in self.property.non_invasive_recommendations
if (
(insulation_thickness in ["average", "above average"])
or self.property.walls["is_filled_cavity"]
) and (
"cavity_extract_and_refill"
not in self.property.non_invasive_recommendations
):
return
if u_value:
if self.property.walls["thermal_transmittance_unit"] != self.U_VALUE_UNIT:
raise NotImplementedError("Haven't handled the case of other u value units yet")
raise NotImplementedError(
"Haven't handled the case of other u value units yet"
)
# If the property is a new build and the U-value is below 0.75, we don't recommend insulation because it's
# not practical
if (self.property.data["transaction-type"] == "new dwelling") and (u_value <= self.NEW_BUILD_INSULATED):
if (self.property.data["transaction-type"] == "new dwelling") and (
u_value <= self.NEW_BUILD_INSULATED
):
# Recommend nothing
return
# We can't detect it's a cavity wall, but it was built after 1990 so likely built with insulation already
# + it already has a U-value WORSE than the building regulations, so we recommend either internal or
# external wall insulation
if (not is_cavity_wall) and (self.property.year_built >= self.YEAR_WALLS_BUILT_WITH_INSULATION) and (
u_value >= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE
if (
(not is_cavity_wall)
and (self.property.year_built >= self.YEAR_WALLS_BUILT_WITH_INSULATION)
and (u_value >= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE)
):
# Recommend insulation
self.find_insulation(u_value, phase)
@ -206,8 +222,10 @@ class WallRecommendations(Definitions):
# We can't detect it's a cavity wall, but it was built after 1990 so likely built with insulation already
# + it already has a U-value better than the building regulations, so we don't need to recommend anything
if (not is_cavity_wall) and (self.property.year_built >= self.YEAR_WALLS_BUILT_WITH_INSULATION) and (
u_value <= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE
if (
(not is_cavity_wall)
and (self.property.year_built >= self.YEAR_WALLS_BUILT_WITH_INSULATION)
and (u_value <= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE)
):
# Recommend nothing
return
@ -270,28 +288,40 @@ class WallRecommendations(Definitions):
recommendations = []
for _, material in insulation_materials.iterrows():
part_u_value = r_value_per_mm_to_u_value(cavity_width, material["r_value_per_mm"])
part_u_value = r_value_per_mm_to_u_value(
cavity_width, material["r_value_per_mm"]
)
_, new_u_value = calculate_u_value_uplift(u_value, part_u_value)
new_u_value = math.ceil(new_u_value * 100.0) / 100.0
if is_diminishing_returns(
recommendations, new_u_value, lowest_selected_u_value, self.DIMINISHING_RETURNS_U_VALUE
recommendations,
new_u_value,
lowest_selected_u_value,
self.DIMINISHING_RETURNS_U_VALUE,
):
continue
if new_u_value <= self.BUILDING_REGULATIONS_PART_L_CAVITY_WALL_MAX_U_VALUE:
lowest_selected_u_value = update_lowest_selected_u_value(lowest_selected_u_value, new_u_value)
lowest_selected_u_value = update_lowest_selected_u_value(
lowest_selected_u_value, new_u_value
)
is_extraction_and_refill = "cavity_extract_and_refill" in self.property.non_invasive_recommendations
is_extraction_and_refill = (
"cavity_extract_and_refill"
in self.property.non_invasive_recommendations
)
cost_result = self.costs.cavity_wall_insulation(
wall_area=self.property.insulation_wall_area,
material=material.to_dict(),
is_extraction_and_refill=is_extraction_and_refill
is_extraction_and_refill=is_extraction_and_refill,
)
already_installed = "cavity_wall_insulation" in self.property.already_installed
already_installed = (
"cavity_wall_insulation" in self.property.already_installed
)
if already_installed:
cost_result = override_costs(cost_result)
@ -323,7 +353,7 @@ class WallRecommendations(Definitions):
part=material.to_dict(),
quantity=self.property.insulation_wall_area,
quantity_unit=QuantityUnits.m2.value,
cost_result=cost_result
cost_result=cost_result,
)
],
"type": "cavity_wall_insulation",
@ -378,7 +408,9 @@ class WallRecommendations(Definitions):
for _, material in insulation_material_group.iterrows():
part_u_value = r_value_per_mm_to_u_value(material["depth"], material["r_value_per_mm"])
part_u_value = r_value_per_mm_to_u_value(
material["depth"], material["r_value_per_mm"]
)
_, new_u_value = calculate_u_value_uplift(u_value, part_u_value)
new_u_value = math.ceil(new_u_value * 100.0) / 100.0
@ -389,22 +421,30 @@ class WallRecommendations(Definitions):
# further into the diminishing returns threshold and can shouldn't be
if is_diminishing_returns(
recommendations, new_u_value, lowest_selected_u_value, self.DIMINISHING_RETURNS_U_VALUE
recommendations,
new_u_value,
lowest_selected_u_value,
self.DIMINISHING_RETURNS_U_VALUE,
):
continue
# We allow a small tolerance for error so we don't discount the recommendation entirely
if new_u_value <= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE:
lowest_selected_u_value = update_lowest_selected_u_value(lowest_selected_u_value, new_u_value)
lowest_selected_u_value = update_lowest_selected_u_value(
lowest_selected_u_value, new_u_value
)
if material["type"] == "internal_wall_insulation":
cost_result = self.costs.internal_wall_insulation(
wall_area=self.property.insulation_wall_area,
material=material.to_dict(),
non_insulation_materials=non_insulation_materials
non_insulation_materials=non_insulation_materials,
)
already_installed = (
"internal_wall_insulation"
in self.property.already_installed
)
already_installed = "internal_wall_insulation" in self.property.already_installed
if already_installed:
cost_result = override_costs(cost_result)
@ -416,9 +456,12 @@ class WallRecommendations(Definitions):
cost_result = self.costs.external_wall_insulation(
wall_area=self.property.insulation_wall_area,
material=material.to_dict(),
non_insulation_materials=non_insulation_materials
non_insulation_materials=non_insulation_materials,
)
already_installed = (
"external_wall_insulation"
in self.property.already_installed
)
already_installed = "external_wall_insulation" in self.property.already_installed
if already_installed:
cost_result = override_costs(cost_result)
@ -452,7 +495,7 @@ class WallRecommendations(Definitions):
part=material.to_dict(),
quantity=self.property.insulation_wall_area,
quantity_unit=QuantityUnits.m2.value,
cost_result=cost_result
cost_result=cost_result,
)
],
"type": material["type"],
@ -484,16 +527,18 @@ class WallRecommendations(Definitions):
if self.ewi_valid:
ewi_recommendations = self._find_insulation(
u_value=u_value,
insulation_materials=pd.DataFrame(self.external_wall_insulation_materials),
insulation_materials=pd.DataFrame(
self.external_wall_insulation_materials
),
non_insulation_materials=self.external_wall_non_insulation_materials,
phase=phase
phase=phase,
)
iwi_recommendations = self._find_insulation(
u_value=u_value,
insulation_materials=pd.DataFrame(self.internal_wall_insulation_materials),
non_insulation_materials=self.internal_wall_non_insulation_materials,
phase=phase
phase=phase,
)
self.recommendations += ewi_recommendations + iwi_recommendations
@ -501,12 +546,16 @@ class WallRecommendations(Definitions):
@staticmethod
def _make_description(material):
if material["type"] == "internal_wall_insulation":
return (f"Install {int(material['depth'])}{material['depth_unit']} {material['description']} on internal "
f"walls")
return (
f"Install {int(material['depth'])}{material['depth_unit']} {material['description']} on internal "
f"walls"
)
if material["type"] == "external_wall_insulation":
return (f"Install {int(material['depth'])}{material['depth_unit']} {material['description']} on external "
f"walls")
return (
f"Install {int(material['depth'])}{material['depth_unit']} {material['description']} on external "
f"walls"
)
if material["type"] == "cavity_wall_insulation":
return f"Fill cavity with {material['description']}"

View file

@ -14,7 +14,7 @@ class WindowsRecommendations:
# glazed
"most": 0.33,
# If glazing is partial, we assume 50/50 split between glazed and unglazed
"partial": 0.5
"partial": 0.5,
}
def __init__(self, property_instance: Property, materials: List):
@ -52,14 +52,20 @@ class WindowsRecommendations:
if not number_of_windows:
raise ValueError("Number of windows not specified")
if self.property.windows["has_glazing"] & (self.property.windows["glazing_coverage"] == "full"):
if self.property.windows["has_glazing"] & (
self.property.windows["glazing_coverage"] == "full"
):
return
# We scale the number of windows based on the proportion of existing glazing
if self.property.data["multi-glaze-proportion"] != "":
n_windows_scalar = 1 - (int(self.property.data["multi-glaze-proportion"]) / 100)
n_windows_scalar = 1 - (
int(self.property.data["multi-glaze-proportion"]) / 100
)
else:
n_windows_scalar = self.COVERAGE_MAP.get(self.property.windows["glazing_coverage"], 1)
n_windows_scalar = self.COVERAGE_MAP.get(
self.property.windows["glazing_coverage"], 1
)
number_of_windows *= n_windows_scalar
number_of_windows = np.ceil(number_of_windows)
@ -68,7 +74,7 @@ class WindowsRecommendations:
cost_result = self.costs.window_glazing(
number_of_windows=number_of_windows,
material=self.glazing_material,
is_secondary_glazing=is_secondary_glazing
is_secondary_glazing=is_secondary_glazing,
)
already_installed = "windows_glazing" in self.property.already_installed
@ -76,18 +82,26 @@ class WindowsRecommendations:
cost_result = override_costs(cost_result)
description = "The property already has double glazing installed. No further action is required."
else:
glazing_type = "secondary glazing" if is_secondary_glazing else "double glazing"
glazing_type = (
"secondary glazing" if is_secondary_glazing else "double glazing"
)
if self.property.windows["glazing_coverage"] in ["partial", "most"]:
description = f"Install {glazing_type} to the remaining windows"
else:
description = f"Install {glazing_type} to all windows"
if self.property.is_listed:
description += ". Secondary glazing recommended due to listed building status"
description += (
". Secondary glazing recommended due to listed building status"
)
elif self.property.is_heritage:
description += ". Secondary glazing recommended due to herigate building status"
description += (
". Secondary glazing recommended due to herigate building status"
)
elif self.property.in_conservation_area:
description += ". Secondary glazing recommended due to conservation area status"
description += (
". Secondary glazing recommended due to conservation area status"
)
self.recommendation = [
{
@ -100,6 +114,6 @@ class WindowsRecommendations:
"sap_points": None,
"already_installed": already_installed,
**cost_result,
"is_secondary_glazing": is_secondary_glazing
"is_secondary_glazing": is_secondary_glazing,
}
]