Merge pull request #346 from Hestia-Homes/etl-michael-fix

add temp fix for None value instead of string none - should come from…
This commit is contained in:
KhalimCK 2024-10-04 09:25:32 +01:00 committed by GitHub
commit ea84e5bf99
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 145 additions and 82 deletions

View file

@ -203,11 +203,11 @@ class TrainingDataset(BaseDataset):
common_cols = [[col + "_starting", col + "_ending"] for col in common_cols]
self.df = self.df.loc[
:,
no_suffix_cols
+ only_ending_cols
+ [col for cols in common_cols for col in cols],
]
:,
no_suffix_cols
+ only_ending_cols
+ [col for cols in common_cols for col in cols],
]
def _remove_abnormal_change_in_floor_area(self):
"""
@ -511,7 +511,7 @@ class TrainingDataset(BaseDataset):
expanded_df["is_sandstone_or_limestone"]
== expanded_df["is_sandstone_or_limestone_ending"]
)
]
]
elif component == "floor":
expanded_df = expanded_df[
(expanded_df["is_suspended"] == expanded_df["is_suspended_ending"])
@ -528,7 +528,7 @@ class TrainingDataset(BaseDataset):
expanded_df["is_to_external_air"]
== expanded_df["is_to_external_air_ending"]
)
]
]
elif component == "roof":
expanded_df = expanded_df[
(expanded_df["is_pitched"] == expanded_df["is_pitched_ending"])
@ -541,7 +541,7 @@ class TrainingDataset(BaseDataset):
expanded_df["has_dwelling_above"]
== expanded_df["has_dwelling_above_ending"]
)
]
]
return expanded_df

View file

@ -7,10 +7,18 @@ import numpy as np
import pandas as pd
from recommendations.rdsap_tables import (
epc_wall_description_map, wall_uvalues_df, default_wall_thickness, table_s9 as s9, table_s10 as s10,
table_s11 as s11, table_s12 as s12
epc_wall_description_map,
wall_uvalues_df,
default_wall_thickness,
table_s9 as s9,
table_s10 as s10,
table_s11 as s11,
table_s12 as s12,
)
from recommendations.config import (
PARTIALLY_FILLED_PERCENTAGE_ASSUMPTION,
PARTIAL_CAVITY_DESCRIPTIONS,
)
from recommendations.config import PARTIALLY_FILLED_PERCENTAGE_ASSUMPTION, PARTIAL_CAVITY_DESCRIPTIONS
def r_value_per_mm_to_u_value(depth_mm: int, r_value_per_mm: float):
@ -62,7 +70,9 @@ def calculate_u_value_uplift(u_value, insulation_u_value):
return u_value_uplift, new_u_value
def is_diminishing_returns(recommendations, new_u_value, lowest_selected_u_value, diminishing_returns_u_value):
def is_diminishing_returns(
recommendations, new_u_value, lowest_selected_u_value, diminishing_returns_u_value
):
"""
What defines diminishing returns?
1) The new u value is lower than the lowest selected u value
@ -136,9 +146,15 @@ def apply_formula_s_5_1_1(is_granite_or_whinstone, is_sandstone_or_limestone, ag
S.5.1.1
"""
stone_wall_thickness = [x for x in default_wall_thickness if x["type"] == "stone"][0]
stone_wall_thickness = [x for x in default_wall_thickness if x["type"] == "stone"][
0
]
thickness = stone_wall_thickness["J_K_L"] if age_band in ["J", "L", "L"] else stone_wall_thickness[age_band]
thickness = (
stone_wall_thickness["J_K_L"]
if age_band in ["J", "L", "L"]
else stone_wall_thickness[age_band]
)
if is_granite_or_whinstone:
return 3.3 - 0.002 * thickness
@ -146,7 +162,9 @@ def apply_formula_s_5_1_1(is_granite_or_whinstone, is_sandstone_or_limestone, ag
if is_sandstone_or_limestone:
return 3 - 0.002 * thickness
raise ValueError("This should only be called when is_granite_or_whinstone or is_sandstone_or_limestone is True")
raise ValueError(
"This should only be called when is_granite_or_whinstone or is_sandstone_or_limestone is True"
)
def get_wall_u_value(
@ -164,16 +182,30 @@ def get_wall_u_value(
if clean_description in PARTIAL_CAVITY_DESCRIPTIONS:
# If we have a partial cavity fill, we linearly interpolate the u-value. This isn't necessarily the perfect
# method and how we do this should be explored, however we want to distinguish between the old
filled_uvalue = float(wall_uvalues_df[wall_uvalues_df["Wall_type"] == "Filled cavity"][age_band].values[0])
unfilled_uvalue = float(wall_uvalues_df[wall_uvalues_df["Wall_type"] == "Cavity as built"][age_band].values[0])
filled_uvalue = float(
wall_uvalues_df[wall_uvalues_df["Wall_type"] == "Filled cavity"][
age_band
].values[0]
)
unfilled_uvalue = float(
wall_uvalues_df[wall_uvalues_df["Wall_type"] == "Cavity as built"][
age_band
].values[0]
)
mapped_value = str(
unfilled_uvalue - (PARTIALLY_FILLED_PERCENTAGE_ASSUMPTION * (unfilled_uvalue - filled_uvalue))
unfilled_uvalue
- (
PARTIALLY_FILLED_PERCENTAGE_ASSUMPTION
* (unfilled_uvalue - filled_uvalue)
)
)
else:
mapped_description = epc_wall_description_map[clean_description]
mapped_value = wall_uvalues_df[wall_uvalues_df["Wall_type"] == mapped_description][age_band].values[0]
mapped_value = wall_uvalues_df[
wall_uvalues_df["Wall_type"] == mapped_description
][age_band].values[0]
if pd.isnull(mapped_value) and "Park home" in mapped_description:
# We don't know enough in this case so we default to 0
@ -185,17 +217,19 @@ def get_wall_u_value(
apply_formula_s_5_1_1(
is_granite_or_whinstone=is_granite_or_whinstone,
is_sandstone_or_limestone=is_sandstone_or_limestone,
age_band=age_band
age_band=age_band,
)
)
if "b" in mapped_value:
potential_uvalue = float(mapped_value.replace("b", ""))
formula_uvalue = float(apply_formula_s_5_1_1(
is_granite_or_whinstone=is_granite_or_whinstone,
is_sandstone_or_limestone=is_sandstone_or_limestone,
age_band=age_band
))
formula_uvalue = float(
apply_formula_s_5_1_1(
is_granite_or_whinstone=is_granite_or_whinstone,
is_sandstone_or_limestone=is_sandstone_or_limestone,
age_band=age_band,
)
)
return min(potential_uvalue, formula_uvalue)
if mapped_value == "s1.1.2":
@ -205,11 +239,16 @@ def get_wall_u_value(
return float(mapped_value)
def get_u_value_from_s9(thickness, s9, is_loft, is_roof_room, is_thatched, is_at_rafters):
def get_u_value_from_s9(
thickness, s9, is_loft, is_roof_room, is_thatched, is_at_rafters
):
"""Get the U-value from table S9 based on the insulation thickness."""
# If the roof is pitched & insulated at the rafters, it's a roof room
if is_roof_room or is_at_rafters:
# TODO: We get None instead of the string "none"; this should be fixed
if thickness is None:
thickness = "none"
# We re-map the thickness
thickness_map = {
"below average": "50",
@ -233,10 +272,14 @@ def get_u_value_from_s9(thickness, s9, is_loft, is_roof_room, is_thatched, is_at
return None
# Determine the column to refer based on the roof type
column = 'Thatched_roof_U_value_W_m2K' if is_thatched else 'Slates_or_tiles_U_value_W_m2K'
column = (
"Thatched_roof_U_value_W_m2K"
if is_thatched
else "Slates_or_tiles_U_value_W_m2K"
)
# Get the correct U-value based on the insulation thickness
return s9[s9['Insulation_thickness_mm'] >= thickness][column].iloc[0]
return s9[s9["Insulation_thickness_mm"] >= thickness][column].iloc[0]
def get_roof_u_value(
@ -249,7 +292,7 @@ def get_roof_u_value(
is_flat,
is_pitched,
is_at_rafters,
**kwargs
**kwargs,
):
"""
Determine the U-value for a roof based on the description dictionary and age band.
@ -292,7 +335,7 @@ def get_roof_u_value(
is_loft=is_loft,
is_roof_room=is_roof_room,
is_thatched=is_thatched,
is_at_rafters=is_at_rafters
is_at_rafters=is_at_rafters,
)
if u_value is not None:
@ -302,25 +345,25 @@ def get_roof_u_value(
# Define the columns to be used based on the description details
if is_flat:
column = 'Flat_roof'
column = "Flat_roof"
elif is_thatched:
if is_roof_room:
column = 'Thatched_roof_room_in_roof'
column = "Thatched_roof_room_in_roof"
else:
column = 'Thatched_roof'
column = "Thatched_roof"
elif is_roof_room:
column = 'Room_in_roof_slates_or_tiles'
column = "Room_in_roof_slates_or_tiles"
elif is_pitched:
if is_at_rafters:
column = 'Pitched_slates_or_tiles_insulation_at_rafters'
column = "Pitched_slates_or_tiles_insulation_at_rafters"
else:
column = 'Pitched_slates_or_tiles_insulation_between_joists_or_unknown'
column = "Pitched_slates_or_tiles_insulation_between_joists_or_unknown"
else:
# Default to pitched roof with insulation between joists or unknown
column = 'Pitched_slates_or_tiles_insulation_between_joists_or_unknown'
column = "Pitched_slates_or_tiles_insulation_between_joists_or_unknown"
# Get the U-value from table S10 based on the age band and the determined column
u_value = s10.loc[s10['Age_band'].str.contains(age_band), column].values[0]
u_value = s10.loc[s10["Age_band"].str.contains(age_band), column].values[0]
return float(u_value)
@ -397,10 +440,14 @@ def get_exposed_floor_uvalue(insulation_thickness_str, age_band):
else:
insulation_thickness = int(insulation_thickness_str.replace("mm", ""))
return s12[s12["age_band"] == age_band][f"insulation_{insulation_thickness}"].values[0]
return s12[s12["age_band"] == age_band][
f"insulation_{insulation_thickness}"
].values[0]
def get_floor_u_value(floor_type, area, perimeter, age_band, wall_type, insulation_thickness=None):
def get_floor_u_value(
floor_type, area, perimeter, age_band, wall_type, insulation_thickness=None
):
"""
Estimate the u-value of a suspended floor, based on RdSap methodology
Default U-value for UNINSULATED suspended floor, based on RdSAP methodology
@ -446,14 +493,19 @@ def get_floor_u_value(floor_type, area, perimeter, age_band, wall_type, insulati
Rsi = 0.17 # in m²K/W
Rse = 0.04 # in m²K/W
lambda_ins = 0.035 # thermal conductivity of floor insulation in W/m·K
wall_thickness = [x[age_band] for x in default_wall_thickness if x["type"] == wall_type][0]
wall_thickness = [
x[age_band] for x in default_wall_thickness if x["type"] == wall_type
][0]
if wall_thickness is None and wall_type == "park home":
# We don't know enough and likely won't make recommendations
return 0
wall_thickness = wall_thickness / 1000
if insulation_thickness is None:
insulation_lookup = s11[s11["Age_band"].str.contains(age_band) & s11["Floor_construction"] == floor_type]
insulation_lookup = s11[
s11["Age_band"].str.contains(age_band) & s11["Floor_construction"]
== floor_type
]
if insulation_lookup.empty:
insulation_thickness = 0
else:
@ -465,7 +517,7 @@ def get_floor_u_value(floor_type, area, perimeter, age_band, wall_type, insulati
# Calculate B
B = 2 * area / perimeter
if floor_type == 'solid':
if floor_type == "solid":
# Calculate dt
dt = wall_thickness + lambda_g * (Rsi + Rf + Rse)
@ -475,7 +527,7 @@ def get_floor_u_value(floor_type, area, perimeter, age_band, wall_type, insulati
else:
U = lambda_g / (0.457 * B + dt)
elif floor_type == 'suspended':
elif floor_type == "suspended":
# Define additional constants for suspended floors
h = 0.3 # height above external ground level in meters
v = 5 # average wind speed at 10 m height in m/s
@ -498,7 +550,9 @@ def get_floor_u_value(floor_type, area, perimeter, age_band, wall_type, insulati
U = 1 / (2 * Rsi + Rf + 1 / (Ug + Ux))
else:
raise ValueError("Invalid floor type. Acceptable values are 'solid' or 'suspended'.")
raise ValueError(
"Invalid floor type. Acceptable values are 'solid' or 'suspended'."
)
return round(U, 2) # rounding U value to two decimal places
@ -509,7 +563,13 @@ def extract_insulation_thickness(insulation_thickness_str):
:param insulation_thickness_str:
:return:
"""
if insulation_thickness_str in ["none", "average", "below average", "above average", None]:
if insulation_thickness_str in [
"none",
"average",
"below average",
"above average",
None,
]:
return None
if isinstance(insulation_thickness_str, (float, int)):
@ -527,7 +587,7 @@ def get_wall_type(
is_cob,
is_system_built,
is_park_home,
**kwargs
**kwargs,
) -> Union[str, None]:
"""
Converts booleans to a string wall type, for querying the wall thickness table
@ -573,10 +633,10 @@ def estimate_external_wall_area(num_floors, floor_height, perimeter, built_form)
total_wall_area = wall_area_one_floor * num_floors
number_exposed_walls = {
'End-Terrace': 3,
'Mid-Terrace': 2,
'Semi-Detached': 3,
'Detached': 4,
"End-Terrace": 3,
"Mid-Terrace": 2,
"Semi-Detached": 3,
"Detached": 4,
}
exposed_wall_area = total_wall_area * (number_exposed_walls.get(built_form, 3) / 4)
@ -622,27 +682,12 @@ def convert_thickness_to_numeric(string_thickness, is_pitched, is_flat):
return 0
if is_pitched:
lookup = {
"none": 0,
"below average": 50,
"average": 100,
"above average": 270
}
lookup = {"none": 0, "below average": 50, "average": 100, "above average": 270}
elif is_flat:
# For a flat roof, if it's below average, we assume it's 0 and requires a re-roof
lookup = {
"none": 0,
"below average": 0,
"average": 100,
"above average": 150
}
lookup = {"none": 0, "below average": 0, "average": 100, "above average": 150}
else:
lookup = {
"none": 0,
"below average": 100,
"average": 270,
"above average": 270
}
lookup = {"none": 0, "below average": 100, "average": 270, "above average": 270}
mapped = lookup.get(string_thickness)
@ -697,11 +742,16 @@ def estimate_windows(
# Assuming most houses will have at least one kitchen and one bathroom
# Scale non-habitable windows with the number of habitable rooms
non_habitable_base = 2 # Base for kitchen and bathroom
extra_non_habitable = max(0, (number_habitable_rooms - 3) // 2) # Extra for large houses
extra_non_habitable = max(
0, (number_habitable_rooms - 3) // 2
) # Extra for large houses
window_count += non_habitable_base + extra_non_habitable
# Adjustments based on built form and property type
if property_type in ["House", "Bungalow"] and built_form in ["Semi-Detached", "Detached"]:
if property_type in ["House", "Bungalow"] and built_form in [
"Semi-Detached",
"Detached",
]:
built_form_lookup = {
"Semi-Detached": 3,
"Detached": 4,
@ -728,7 +778,10 @@ def estimate_windows(
window_count += 2
# Adjust for construction age band
if construction_age_band in ["England and Wales: before 1900", "England and Wales: 1900-1929"]:
if construction_age_band in [
"England and Wales: before 1900",
"England and Wales: 1900-1929",
]:
# Older houses with smaller, more numerous windows
window_count += 1
@ -751,7 +804,11 @@ def calculate_cavity_age(newest_epc, older_epcs, cleaned):
df = []
for x in all_epcs:
# Get the cleaned mapping
mapped = [y for y in cleaned["walls-description"] if y["original_description"] == x["walls-description"]]
mapped = [
y
for y in cleaned["walls-description"]
if y["original_description"] == x["walls-description"]
]
if not mapped:
continue
df.append(
@ -768,7 +825,9 @@ def calculate_cavity_age(newest_epc, older_epcs, cleaned):
return cavity_age
def check_simulation_difference(old_config, new_config, prefix="", keys_with_prefix=None):
def check_simulation_difference(
old_config, new_config, prefix="", keys_with_prefix=None
):
"""
Given two dictionaries, that describe the heating control configurations, this method will compare the two
and pick out the differences. These differences will be things that have been added and things that have been
@ -777,14 +836,17 @@ def check_simulation_difference(old_config, new_config, prefix="", keys_with_pre
"""
keys_with_prefix = (
["is_assumed", "thermal_transmittance", "insulation_thickness"] if keys_with_prefix is None
["is_assumed", "thermal_transmittance", "insulation_thickness"]
if keys_with_prefix is None
else keys_with_prefix
)
differences = {}
for key in new_config:
if old_config[key] != new_config[key]:
new_key = prefix + key + "_ending" if key in keys_with_prefix else key + "_ending"
new_key = (
prefix + key + "_ending" if key in keys_with_prefix else key + "_ending"
)
differences[new_key] = new_config[key]
return differences
@ -811,17 +873,18 @@ def combine_recommendation_configs(recommendation_config1, recommendation_config
"""
# Efficiency values - keys which contain _energy_eff_ending
eff_1 = {
k: v for k, v in recommendation_config1.items() if ("_energy_eff_ending" in k) or ("-energy-eff" in k)
k: v
for k, v in recommendation_config1.items()
if ("_energy_eff_ending" in k) or ("-energy-eff" in k)
}
eff_2 = {
k: v for k, v in recommendation_config2.items() if ("_energy_eff_ending" in k) or ("-energy-eff" in k)
k: v
for k, v in recommendation_config2.items()
if ("_energy_eff_ending" in k) or ("-energy-eff" in k)
}
# We combine the simulation configs
combined = {
**recommendation_config1,
**recommendation_config2
}
combined = {**recommendation_config1, **recommendation_config2}
# Find overlapping keys
overlapping_keys = set(eff_1.keys()).intersection(set(eff_2.keys()))