added simulation_config convention to wall recommendations

This commit is contained in:
Khalim Conn-Kowlessar 2024-05-28 17:22:29 +01:00
parent a2586ab4b6
commit 0a3055d70b
5 changed files with 176 additions and 72 deletions

View file

@ -353,55 +353,6 @@ class Property:
for recommendation in recommendations:
# For the list of recommendations we have, we iteratively update the output
# We update the description to indicate it's insulated
if recommendation["type"] in [
"internal_wall_insulation",
"external_wall_insulation",
"cavity_wall_insulation",
]:
# # If we have a non-invasive recommendation that the cavity wall is partially filled, we skip the
# # cavity wall insulation recommendation (since on the EPC, the property will look like how it did
# # before any works)
# if "cavity_surveyed_as_filled_is_partial" in non_invasive_recommendations:
# continue
# The upgrade made here is to the u-value of the walls and the description of the
# insulation thickness
output["walls_thermal_transmittance_ending"] = recommendation[
"new_u_value"
]
# Setting the insulation thickness here to above average should be tested further because we
# don't see a high volume of instances for this
output["walls_insulation_thickness_ending"] = "average"
# In some edge cases, or when running the mds report we might see the energy efficiency already
# in Good or Very Good
if output["walls_energy_eff_ending"] not in ["Good", "Very Good"]:
output["walls_energy_eff_ending"] = "Good"
# TODO TEMP - should be ending?
output["is_as_built"] = False
# Note: often when the wall is insulated, the internal/external insulation is not noted so we should
# test the impact of using these booleans
if recommendation["type"] == "external_wall_insulation":
output["external_insulation_ending"] = True
output["internal_insulation_ending"] = False
if recommendation["type"] == "internal_wall_insulation":
output["external_insulation_ending"] = False
output["internal_insulation_ending"] = True
if recommendation["type"] == "cavity_wall_insulation":
output["is_filled_cavity_ending"] = True
else:
if output["walls_thermal_transmittance_ending"] is None:
raise ValueError("We should not have a None value for the u value")
if output["walls_insulation_thickness_ending"] is None:
output["walls_insulation_thickness_ending"] = "none"
# Update description to indicate it's insulated
if recommendation["type"] in [
"solid_floor_insulation",
@ -518,9 +469,12 @@ class Property:
)
if recommendation["type"] in [
"heating", "hot_water_tank_insulation", "heating_control", "secondary_heating"
"heating", "hot_water_tank_insulation", "heating_control", "secondary_heating",
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation",
]:
# We update the data, as defined in the recommendation
if output["walls_insulation_thickness_ending"] is None:
output["walls_insulation_thickness_ending"] = "none"
simulation_config = recommendation["simulation_config"]
# If any entries in simulation_config are None, we will set them to "Unknown" which is the cleaning

View file

@ -739,6 +739,7 @@ async def build_mds(body: PlanTriggerRequest):
representative_recommendations = {}
for p in tqdm(input_properties):
p.get_components(cleaned, photo_supply_lookup, floor_area_decile_thresholds)
mds = Mds(property_instance=p, materials=materials)

View file

@ -20,27 +20,39 @@ def aggregate_matches(matching_lookup, company_ownership, properties):
properties[["UPRN", "LOCAL_AUTHORITY_LABEL"]], how="left", on="UPRN"
)
counts = (
df.groupby(["Company Registration No. (1)", "Proprietor Name (1)", "LOCAL_AUTHORITY_LABEL"])["UPRN"]
df.groupby(["Company Registration No. (1)", "LOCAL_AUTHORITY_LABEL"])["UPRN"]
.count()
.reset_index(name="number_of_properties")
)
counts = counts.sort_values("number_of_properties", ascending=False)
pivot_counts = counts.pivot_table(
index=["Company Registration No. (1)", "Proprietor Name (1)"], # Rows: companies and proprietors
index=["Company Registration No. (1)"], # Rows: companies and proprietors
columns="LOCAL_AUTHORITY_LABEL", # Columns: each local authority
values="number_of_properties", # The counts of properties
fill_value=0 # Fill missing values with 0 (where there are no properties owned)
).reset_index()
total_counts = (
df.groupby(["Company Registration No. (1)", "Proprietor Name (1)"])["UPRN"]
df.groupby(["Company Registration No. (1)"])["UPRN"]
.count()
.reset_index(name="total_number_of_properties")
)
# We have cases where the same company registration number appears with more than one proprietor name, so we pick
# a single best name (the first one seen) per company registration number
best_names = (
df.groupby(["Company Registration No. (1)"])["Proprietor Name (1)"]
.first()
.reset_index()
)
total_counts = best_names.merge(
total_counts, how="left", on=["Company Registration No. (1)"]
)
pivot_counts = pivot_counts.merge(
total_counts, how="left", on=["Company Registration No. (1)", "Proprietor Name (1)"]
total_counts, how="left", on=["Company Registration No. (1)"]
)
pivot_counts = pivot_counts.sort_values("total_number_of_properties", ascending=False)
@ -187,7 +199,45 @@ def remove_duplicate_matches(matching_lookup, properties, company_ownership):
if not to_drop.empty:
merged = pd.merge(matching_lookup, to_drop, on=['UPRN', 'Title Number'], how='left', indicator=True)
merged[merged['_merge'] == 'left_only'].drop(columns=['_merge'])
merged = merged[merged['_merge'] == 'left_only'].drop(columns=['_merge'])
return merged
return matching_lookup
def remove_duplicate_uprn_matches(matching_lookup, properties, company_ownership):
    """
    Remove matches where the same UPRN has been matched to more than one title, keeping only the best match.

    For every UPRN that appears more than once in matching_lookup, the candidate rows are joined to the EPC
    address (from properties) and to the title's property address (from company_ownership). levenstein_match
    (defined elsewhere) is then used to choose the best candidate, and every other (UPRN, Title Number) pair for
    that UPRN is removed from the lookup.

    :param matching_lookup: DataFrame of matches; must contain the "UPRN" and "Title Number" columns
    :param properties: DataFrame with "UPRN" and "ADDRESS" columns (only read when duplicates exist)
    :param company_ownership: DataFrame with "Title Number" and "Property Address" columns (only read when
        duplicates exist)
    :return: matching_lookup without the rejected (UPRN, Title Number) pairs. When there is nothing to remove,
        matching_lookup itself is returned unchanged
    """
    dupe_uprns = matching_lookup[matching_lookup["UPRN"].duplicated()]["UPRN"].unique().tolist()

    # pd.concat raises a ValueError when given an empty list, so if no UPRN is duplicated there is nothing to do
    if not dupe_uprns:
        return matching_lookup

    to_drop = []
    for dupe_uprn in dupe_uprns:
        dupe_data = matching_lookup[matching_lookup["UPRN"] == dupe_uprn].copy()

        matched_addresses = dupe_data.merge(
            properties[["UPRN", "ADDRESS"]].rename(columns={"ADDRESS": "epc_address"}),
            how="left", on="UPRN"
        ).merge(
            company_ownership[["Title Number", "Property Address"]],
            how="left", on="Title Number"
        )

        # We perform Levenshtein matching to get the best match. The first candidate's title address is used as
        # the string to match against the EPC address column
        best_match = levenstein_match(
            matching_string=matched_addresses["Property Address"].values[0],
            df=matched_addresses,
            address_col="epc_address"
        )

        # Everything that is not the best match for this UPRN is marked for removal
        matches_to_drop = matched_addresses[
            ~matched_addresses["Title Number"].isin(best_match["Title Number"].values)
        ]
        to_drop.append(
            matches_to_drop[["UPRN", "Title Number"]].copy()
        )

    to_drop = pd.concat(to_drop)

    # Same convention as remove_duplicate_matches: with nothing to drop we hand back the lookup untouched rather
    # than falling through and returning None
    if to_drop.empty:
        return matching_lookup

    # Anti-join: keep only the rows of matching_lookup that have no counterpart in to_drop
    merged = pd.merge(matching_lookup, to_drop, on=['UPRN', 'Title Number'], how='left', indicator=True)
    merged = merged[merged['_merge'] == 'left_only'].drop(columns=['_merge'])
    return merged
@ -254,6 +304,9 @@ def app():
properties = properties[
properties["TENURE"].isin(["rental (private)", "Rented (private)", "owner-occupied", "Owner-occupied"])
]
# We have some duplicates on UPRN
# Keep the most recently lodged record for each UPRN
properties = properties.sort_values("LODGEMENT_DATE", ascending=False).drop_duplicates("UPRN")
# Remove entries where the address begins with the term "land adjoining", or other records that don't reference the
# property itself
@ -354,16 +407,29 @@ def app():
freehold_matching_lookup = pd.DataFrame(freehold_matching_lookup)
leasehold_matching_lookup = pd.DataFrame(leasehold_matching_lookup)
shared_leasehold_match = pd.concat(shared_leasehold_match)
shared_freehold_match = pd.concat(shared_freehold_match)
# freehold_matching_lookup.to_excel("freehold_matching_lookup_new.xlsx")
# leasehold_matching_lookup.to_excel("leasehold_matching_lookup_new.xlsx")
# shared_leasehold_match.to_excel("shared_leasehold_match_new.xlsx")
# shared_freehold_match.to_excel("shared_freehold_match_new.xlsx")
# The approximate matches aren't very good
freehold_matching_lookup = freehold_matching_lookup[freehold_matching_lookup["match_type"] == "exact"]
leasehold_matching_lookup = leasehold_matching_lookup[leasehold_matching_lookup["match_type"] == "exact"]
# There are some cases where we have duplicates
freehold_matching_lookup = remove_duplicate_matches(freehold_matching_lookup, properties, company_ownership)
leasehold_matching_lookup = remove_duplicate_matches(leasehold_matching_lookup, properties, company_ownership)
# Combine
combined_matching_lookup = pd.concat([freehold_matching_lookup, leasehold_matching_lookup])
# Remove duplicates
combined_matching_lookup = remove_duplicate_matches(combined_matching_lookup, properties, company_ownership)
# We also have duplicates at a UPRN level
combined_matching_lookup = remove_duplicate_uprn_matches(combined_matching_lookup, properties, company_ownership)
matched_addresses = pd.concat([freehold_matching_lookup, leasehold_matching_lookup]).merge(
# There are some cases where we have duplicates
# freehold_matching_lookup = remove_duplicate_matches(freehold_matching_lookup, properties, company_ownership)
# leasehold_matching_lookup = remove_duplicate_matches(leasehold_matching_lookup, properties, company_ownership)
matched_addresses = combined_matching_lookup.merge(
properties[["UPRN", "ADDRESS", "CURRENT_ENERGY_EFFICIENCY", "CURRENT_ENERGY_RATING"]].rename(
columns={"ADDRESS": "epc_address"}),
how="left", on="UPRN"
@ -374,9 +440,9 @@ def app():
# shared_freehold_match = pd.DataFrame(shared_freehold_match)
# Store these files
freehold_matching_lookup.to_excel("freehold_matching_lookup.xlsx")
leasehold_matching_lookup.to_excel("leasehold_matching_lookup.xlsx")
shared_leasehold_match.to_excel("shared_leasehold_match.xlsx")
# freehold_matching_lookup.to_excel("freehold_matching_lookup.xlsx")
# leasehold_matching_lookup.to_excel("leasehold_matching_lookup.xlsx")
# shared_leasehold_match.to_excel("shared_leasehold_match.xlsx")
# shared_freehold_match.to_excel("shared_freehold_match.xlsx")
# read the files
# freehold_matching_lookup = pd.read_excel("freehold_matching_lookup.xlsx")
@ -387,11 +453,9 @@ def app():
leasehold_aggregate = aggregate_matches(leasehold_matching_lookup, company_ownership, properties)
combined_aggregate = aggregate_matches(
pd.concat([freehold_matching_lookup, leasehold_matching_lookup]), company_ownership, properties
combined_matching_lookup, company_ownership, properties
)
df = pd.concat([freehold_matching_lookup, leasehold_matching_lookup])
investment_20m = combined_aggregate[combined_aggregate["cumulative_value"] <= 20_500_000]
investment_50m = combined_aggregate[combined_aggregate["cumulative_value"] <= 51_000_000]
@ -403,10 +467,15 @@ def app():
matched_addresses["Company Registration No. (1)"].isin(investment_50m["Company Registration No. (1)"])
]
investment_20m_properties.to_excel("investment_20m_properties.xlsx")
investment_50m_properties.to_excel("investment_50m_properties.xlsx")
portfolio_epc_data_50m = properties[properties["UPRN"].isin(investment_50m_properties["UPRN"])]
portfolio_epc_data_20m = properties[properties["UPRN"].isin(investment_20m_properties["UPRN"])]
properties["WALLS_DESCRIPTION"].value_counts(normalize=True)
investment_20m_properties.to_excel("investment_20m_properties 28th May.xlsx", index=False)
investment_50m_properties.to_excel("investment_50m_properties 28th May.xlsx", index=False)
# Store the EPC data
portfolio_epc_data_50m.to_excel("portfolio_epc_data_50m 28th May.xlsx", index=False)
portfolio_epc_data_20m.to_excel("portfolio_epc_data_20m 28th May.xlsx", index=False)
def company_aggregation():

View file

@ -6,9 +6,10 @@ import pandas as pd
from datatypes.enums import QuantityUnits
from backend.Property import Property
from BaseUtility import Definitions
from etl.epc_clean.epc_attributes.WallAttributes import WallAttributes
from recommendations.recommendation_utils import (
r_value_per_mm_to_u_value, calculate_u_value_uplift, is_diminishing_returns, update_lowest_selected_u_value,
get_recommended_part, get_wall_u_value, override_costs
get_recommended_part, get_wall_u_value, override_costs, check_simulation_difference
)
from recommendations.config import PARTIALLY_FILLED_PERCENTAGE_ASSUMPTION
from recommendations.Costs import Costs
@ -53,6 +54,24 @@ class WallRecommendations(Definitions):
# threshold
NEW_BUILD_INSULATED = 0.75
# These are the ending descriptions we consider for walls with external insulation
EXTERNALLY_INSULATED_WALL_DESCRIPTIONS = {
"solid_brick": "Solid brick, with external insulation",
"cob": "Cob, with external insulation",
"system_built": "System built, with external insulation",
"granite_or_whinstone": 'Granite or whinstone, with external insulation',
"sandstone_or_limestone": 'Sandstone or limestone, with external insulation',
}
# These are the ending descriptions we consider for walls with internal insulation
INTERNALLY_INSULATED_WALL_DESCRIPTIONS = {
"solid_brick": "Solid brick, with internal insulation",
"cob": "Cob, with internal insulation",
"system_built": "System built, with internal insulation",
"granite_or_whinstone": 'Granite or whinstone, with internal insulation',
"sandstone_or_limestone": 'Sandstone or limestone, with internal insulation',
}
def __init__(
self,
property_instance: Property,
@ -279,6 +298,21 @@ class WallRecommendations(Definitions):
# updated the new u-value with the best possible our installers have
new_u_value = max(0.31, new_u_value)
wall_ending_config = WallAttributes("Cavity wall, filled cavity").process()
simulation_config = {}
if self.property.data["walls-energy-eff"] not in ["Good", "Very Good"]:
simulation_config = {
"walls_energy_eff_ending": "Good",
"walls_thermal_transmittance_ending": new_u_value
}
walls_simulation_config = check_simulation_difference(
new_config=wall_ending_config, old_config=self.property.walls, prefix="walls_"
)
simulation_config = {**simulation_config, **walls_simulation_config}
recommendations.append(
{
"phase": phase,
@ -296,12 +330,31 @@ class WallRecommendations(Definitions):
"new_u_value": new_u_value,
"sap_points": None,
"already_installed": already_installed,
"simulation_config": simulation_config,
**cost_result
}
)
self.recommendations = recommendations
def get_internal_external_wall_description(self, description_map):
    """
    Pick the ending wall description that corresponds to this property's wall construction.

    The "is_<wall type>" flags in self.property.walls are checked in a fixed order and the first truthy flag
    decides which entry of description_map is returned.

    :param description_map: dictionary keyed on wall type ("solid_brick", "cob", "system_built",
        "granite_or_whinstone", "sandstone_or_limestone"), holding the description to use for that wall type
    :return: the entry of description_map for the first wall type flagged on the property
    :raises NotImplementedError: if none of the supported wall type flags is set
    """
    # Order matters: flags are evaluated lazily, in this order, and the first hit wins
    supported_wall_types = (
        "solid_brick",
        "cob",
        "system_built",
        "granite_or_whinstone",
        "sandstone_or_limestone",
    )
    walls = self.property.walls
    for wall_type in supported_wall_types:
        if walls["is_" + wall_type]:
            return description_map[wall_type]

    raise NotImplementedError("Not implemented yet")
def _find_insulation(self, u_value, insulation_materials, non_insulation_materials, phase):
lowest_selected_u_value = None
@ -340,6 +393,10 @@ class WallRecommendations(Definitions):
if already_installed:
cost_result = override_costs(cost_result)
new_description = self.get_internal_external_wall_description(
self.INTERNALLY_INSULATED_WALL_DESCRIPTIONS
)
elif material["type"] == "external_wall_insulation":
cost_result = self.costs.external_wall_insulation(
wall_area=self.property.insulation_wall_area,
@ -349,9 +406,28 @@ class WallRecommendations(Definitions):
already_installed = "external_wall_insulation" in self.property.already_installed
if already_installed:
cost_result = override_costs(cost_result)
new_description = self.get_internal_external_wall_description(
self.EXTERNALLY_INSULATED_WALL_DESCRIPTIONS
)
else:
raise ValueError("Invalid material type")
wall_ending_config = WallAttributes(new_description).process()
simulation_config = {}
if self.property.data["walls-energy-eff"] not in ["Good", "Very Good"]:
simulation_config = {
"walls_thermal_transmittance_ending": new_u_value,
"walls_energy_eff_ending": "Good"
}
walls_simulation_config = check_simulation_difference(
new_config=wall_ending_config, old_config=self.property.walls, prefix="walls_"
)
simulation_config = {**simulation_config, **walls_simulation_config}
recommendations.append(
{
"phase": phase,
@ -369,6 +445,7 @@ class WallRecommendations(Definitions):
"new_u_value": new_u_value,
"already_installed": already_installed,
"sap_points": None,
"simulation_config": simulation_config,
**cost_result
}
)

View file

@ -756,15 +756,18 @@ def calculate_cavity_age(newest_epc, older_epcs, cleaned):
return cavity_age
def check_simulation_difference(old_config, new_config, prefix=""):
    """
    Given two dictionaries that describe a component's configuration before and after a measure, this method
    compares the two and picks out the entries of new_config whose value differs from old_config. The result is
    used to determine how the configuration should be updated in the simulation.

    Every differing key is returned with "_ending" appended. Only the "is_assumed" key is additionally prefixed
    with `prefix` (e.g. "walls_is_assumed_ending"); all other keys are returned un-prefixed.
    NOTE(review): presumably "is_assumed" is prefixed because the same key name is used by more than one
    component - confirm.

    :param old_config: current configuration; must contain every key present in new_config
    :param new_config: configuration after the measure is applied
    :param prefix: string prepended to the "is_assumed" key only. Defaults to "" (no prefix)
    :return: dictionary of {<key>_ending: new value} for every key whose value changed
    :raises KeyError: if a key of new_config is missing from old_config
    """
    differences = {}
    for key in new_config:
        if old_config[key] == new_config[key]:
            continue

        ending_key = key + "_ending"
        # The prefix is deliberately applied to "is_assumed" only, every other key keeps its bare name
        if key == "is_assumed":
            ending_key = prefix + ending_key

        differences[ending_key] = new_config[key]

    return differences