Merge pull request #329 from Hestia-Homes/sfr-investigation

Sfr investigation
This commit is contained in:
KhalimCK 2024-08-02 14:18:48 +01:00 committed by GitHub
commit 19c471f614
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 791 additions and 117 deletions

View file

@ -377,7 +377,9 @@ class Property:
x["type"] != "internal_wall_insulation"
]
else:
epc_transformations = [x["description_simulation"] for x in represenative_recs_to_this_phase]
epc_transformations = [
x["description_simulation"] for x in represenative_recs_to_this_phase
]
# It is possible that we could have two simulations applied to the same descriptions
# We extract these out
@ -407,7 +409,8 @@ class Property:
continue
raise NotImplementedError(
"Already have this key in the phase_epc_transformation - implement me")
"Already have this key in the phase_epc_transformation - implement me"
)
phase_epc_transformation[k] = v
simulation_epc = self.epc_record.prepared_epc.copy()

View file

@ -9,6 +9,7 @@ from backend.app.db.functions.solar_functions import get_solar_data, store_batch
from utils.logger import setup_logger
from sklearn.preprocessing import MinMaxScaler
from recommendations.Costs import Costs
from math import sin, cos, sqrt, atan2, radians
logger = setup_logger()
@ -70,6 +71,9 @@ class GoogleSolarApi:
# Indicates if we need to store the data to the db
self.need_to_store = False
# Indicates if we think we have both units attached to a semi-detached property
self.double_property = False
def get_building_insights(self, longitude, latitude, required_quality="MEDIUM", max_retries=None):
"""
Make an API request to retrieve building insights based on the given longitude and latitude, with retry
@ -116,7 +120,7 @@ class GoogleSolarApi:
required_quality="MEDIUM",
is_building=False,
session=None,
uprn=None
uprn=None,
):
"""
Wrapper function that calls get_building_insights and extracts roof segments, with caching.
@ -147,6 +151,14 @@ class GoogleSolarApi:
# Extract key data from the insights response
self.roof_segments = self.insights_data["solarPotential"].get('roofSegmentStats', [])
# Automatically exclude north-facing segments
self.exclude_north_facing_segments()
# If a property is semi-detached, it's possible for us to include segments from an attached unit
if (property_instance.data["built-form"] == "Semi-Detached") and (
property_instance.data["extension-count"] == 0
):
self.exclude_likely_duplicate_surfaces()
self.roof_area = self.insights_data["solarPotential"]["wholeRoofStats"]['areaMeters2']
self.floor_area = self.insights_data["solarPotential"]["wholeRoofStats"]['groundAreaMeters2']
self.panel_area = (
@ -162,9 +174,6 @@ class GoogleSolarApi:
# It should be straightforward, but I'd rather see an actual instance of this happening
raise NotImplementedError("Panel wattage is not 400W - implement me")
# Automatically exclude north-facing segments
self.exclude_north_facing_segments()
self.roof_segment_indexes = [segment['segmentIndex'] for segment in self.roof_segments]
# We now start finding the solar panel configurations
@ -172,6 +181,11 @@ class GoogleSolarApi:
energy_consumption=energy_consumption, is_building=is_building, property_instance=property_instance
)
# Finally, if we have a double property, we half the data we stored area
if self.double_property:
self.roof_area = self.roof_area / 2
self.floor_area = self.floor_area / 2
def save_to_db(self, session, uprns_to_location, scenario_type):
if self.insights_data is None:
raise ValueError("No api data to store")
@ -338,7 +352,13 @@ class GoogleSolarApi:
# - surplus: this is the amount of additional energy generated, and therefore how much will be exported
# - surplus_value: the value of the surplus energy - this feeds into generation_value, when relevant
# - expected_payback_years: the number of years it will take to pay back the initial investment
lifetime_energy_consumption = energy_consumption * self.installation_life_span
# If we have a double property (i.e. the solar api has returned data for two units) we size up the solar panels
# for double the consumption, as if for two units.
if self.double_property:
lifetime_energy_consumption = energy_consumption * 2 * self.installation_life_span
else:
lifetime_energy_consumption = energy_consumption * self.installation_life_span
roi_results = []
for _, panel_config in panel_performance.iterrows():
lifetime_ac_kwh = panel_config["lifetime_ac_kwh"]
@ -408,6 +428,31 @@ class GoogleSolarApi:
panel_performance["expected_payback_years"] = np.ceil(panel_performance["expected_payback_years"]).astype(int)
if self.double_property:
# Now that we've optimise to an energy consumption that is double the original, we need to half the
# results
panel_performance["n_panels_halved"] = panel_performance["n_panels"] / 2
n_panels_required = {int(x) for x in np.floor(panel_performance["n_panels"] / 2)}
# We filter the data on this number of panels
panel_performance = panel_performance[panel_performance["n_panels_halved"].isin(n_panels_required)]
# We half the generation values
for col in [
"yearly_dc_energy",
"total_cost",
"panneled_roof_area",
"array_wattage",
"initial_ac_kwh_per_year",
"lifetime_ac_kwh",
"lifetime_dc_kwh",
"generation_value",
"generation_deficit",
"surplus"
]:
panel_performance[col] = panel_performance[col] / 2
panel_performance["n_panels"] = panel_performance["n_panels_halved"]
panel_performance = panel_performance.drop(columns=["n_panels_halved"])
self.panel_performance = panel_performance
def exclude_north_facing_segments(self):
@ -427,3 +472,78 @@ class GoogleSolarApi:
filtered_segments.append(segment)
self.roof_segments = filtered_segments
@staticmethod
def haversine(lat1, lon1, lat2, lon2):
"""
Calculate the great-circle distance between two points on the Earth
given their latitude and longitude in decimal degrees. Using haversine formula.
"""
R = 6373.0 # approximate radius of earth in km
lat1 = radians(lat1)
lon1 = radians(lon1)
lat2 = radians(lat2)
lon2 = radians(lon2)
dlon = lon2 - lon1
dlat = lat2 - lat1
a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
c = 2 * atan2(sqrt(a), sqrt(1 - a))
distance = R * c
return distance
def exclude_likely_duplicate_surfaces(self):
"""
By checking the azimuth of the segments, we can exclude any segments that are likely to be duplicates
:return:
"""
def is_similar(segment1, segment2, azimuth_tol=20):
azimuth_diff = abs(segment1['azimuthDegrees'] - segment2['azimuthDegrees'])
return azimuth_diff <= azimuth_tol
property_center = self.insights_data["center"]
deduped_segments = []
dropped_segments = []
for segment in self.roof_segments:
if not deduped_segments:
deduped_segments.append(segment)
continue
similar_segments = [s for s in deduped_segments if is_similar(segment, s)]
if not similar_segments:
deduped_segments.append(segment)
else:
# Compare distances to the property center and keep the closer segment
for similar_segment in similar_segments:
current_dist = self.haversine(
property_center['latitude'], property_center['longitude'],
segment['center']['latitude'], segment['center']['longitude']
)
similar_dist = self.haversine(
property_center['latitude'], property_center['longitude'],
similar_segment['center']['latitude'], similar_segment['center']['longitude']
)
if current_dist < similar_dist:
deduped_segments.remove(similar_segment)
deduped_segments.append(segment)
dropped_segments.append(similar_segment)
else:
dropped_segments.append(segment)
# If we have a semi-detached property that has duplicated segments, we should expect to half the number of
# segments
if len(deduped_segments) < len(self.roof_segments):
if len(deduped_segments) != len(self.roof_segments) / 2:
# We don't perform any dropping in this case
return
# Because the segments are duplicated, but the sizes aren't necessarily split perfectly in half, what
# we need to do is perform the solar analysis and then half the results. We set an indicator which
# implies we should do this
self.double_property = True

View file

@ -408,25 +408,6 @@ async def trigger_plan(body: PlanTriggerRequest):
if not input_properties:
return Response(status_code=204)
# If we have any work to do, we create a new scenario
engine_scenario = create_scenario(
session=session,
scenario={
"name": body.scenario_name,
"created_at": created_at,
"budget": body.budget,
"portfolio_id": body.portfolio_id,
"housing_type": body.housing_type,
"goal": body.goal,
"trigger_file_path": body.trigger_file_path,
"already_installed_file_path": body.already_installed_file_path,
"patches_file_path": body.patches_file_path,
"non_invasive_recommendations_file_path": body.non_invasive_recommendations_file_path,
"exclusions": body.exclusions,
"multi_plan": body.multi_plan
}
)
# The materials data could be cached or local so we don't need to make
# consistent requests to the backend for
# the same data
@ -458,6 +439,11 @@ async def trigger_plan(body: PlanTriggerRequest):
logger.info("Performing solar analysis")
# TODO: Tidy this up
# TODO: If a property is semi-detached, we might get roof surfaces for the main building + the neighbour
# TODO: If we can't get high image quality, should we use the solar API? Maybe just for semi-detached units with
# extensions, since it doesn't seem to do a great job
# TODO: For simple properties, we should do a comparison/check between the solar API's roof area and the
# basic estimate of roof area
building_ids = [
{
"building_id": p.building_id,
@ -728,11 +714,41 @@ async def trigger_plan(body: PlanTriggerRequest):
]
recommendations[property_id] = final_recommendations
# df = []
# for rec in recommendations[list(recommendations.keys())[0]]:
# df.append(
# {
# "id": rec["recommendation_id"],
# "description": rec["description"],
# "sap": rec["sap_points"],
# }
# )
# df = pd.DataFrame(df)
# 1) the property data
# 2) the property details (epc)
# 3) the recommendations
logger.info("Uploading recommendations to the database")
# If we have any work to do, we create a new scenario
engine_scenario = create_scenario(
session=session,
scenario={
"name": body.scenario_name,
"created_at": created_at,
"budget": body.budget,
"portfolio_id": body.portfolio_id,
"housing_type": body.housing_type,
"goal": body.goal,
"trigger_file_path": body.trigger_file_path,
"already_installed_file_path": body.already_installed_file_path,
"patches_file_path": body.patches_file_path,
"non_invasive_recommendations_file_path": body.non_invasive_recommendations_file_path,
"exclusions": body.exclusions,
"multi_plan": body.multi_plan
}
)
property_valuation_increases = []
session.commit()
new_epc_bands = {}

View file

@ -131,14 +131,17 @@ def app():
sample_size = 500
energy_consumption_data = []
cavity_walls_data = []
for i, directory in tqdm(enumerate(epc_directories), total=len(epc_directories)):
# Skip the first 50
if i < 57:
continue
# if i < 57:
# continue
data = pd.read_csv(directory / "certificates.csv", low_memory=False)
# Rename the columns to the same format as the api returns
data.columns = [c.replace("_", "-").lower() for c in data.columns]
# Take just date before the date threshold
data = data[data["lodgement-date"] >= EARLIEST_EPC_DATE]

View file

@ -75,10 +75,15 @@ def find_f_g_properties(paths):
epc_data = epc_data[~pd.isnull(epc_data["UPRN"])]
epc_data["UPRN"] = epc_data["UPRN"].astype(int).astype(str)
# Get the newest EPC for each UPRN. We use LODGEMENT_DATE as a proxy for this
epc_data["LODGEMENT_DATETIME"] = pd.to_datetime(epc_data["LODGEMENT_DATETIME"], format='mixed', errors="coerce")
if pd.isnull(pd.to_datetime(epc_data["LODGEMENT_DATETIME"], errors="coerce")).sum():
raise Exception("wtf")
epc_data = epc_data.sort_values("LODGEMENT_DATETIME", ascending=False).drop_duplicates("UPRN")
# Get the newest EPC for each UPRN. We use LODGEMENT_DATE as a proxy for this
epc_data["LODGEMENT_DATETIME"] = pd.to_datetime(epc_data["LODGEMENT_DATETIME"], errors="coerce")
epc_data = epc_data.sort_values(
["LODGEMENT_DATE", "LODGEMENT_DATETIME"], ascending=False
).drop_duplicates("UPRN")
# Get G & F properties
epc_data = epc_data[epc_data["CURRENT_ENERGY_RATING"].isin(["G", "F"])]
@ -401,6 +406,8 @@ def app():
~company_ownership["Property Address"].str.lower().str.startswith(starting_term)
]
# address = properties[properties["UPRN"] == 100030253055].squeeze()
freehold_matching_lookup = [] # 634
leasehold_matching_lookup = [] # 86
shared_leasehold_match = []
@ -493,12 +500,18 @@ def app():
# freehold_matching_lookup = pd.read_excel("freehold_matching_lookup V2.xlsx")
# leasehold_matching_lookup = pd.read_excel("leasehold_matching_lookup V2.xlsx")
# freehold_matching_lookup.shape
# (1537, 4)
# leasehold_matching_lookup.shape
# (390, 4)
# The approximate matches aren't very good
freehold_matching_lookup = freehold_matching_lookup[freehold_matching_lookup["match_type"] == "exact"]
leasehold_matching_lookup = leasehold_matching_lookup[leasehold_matching_lookup["match_type"] == "exact"]
# Combine
combined_matching_lookup = pd.concat([freehold_matching_lookup, leasehold_matching_lookup])
# Remove duplicates
combined_matching_lookup = remove_duplicate_matches(
matching_lookup=combined_matching_lookup, properties=properties, company_ownership=company_ownership
@ -566,7 +579,6 @@ def app():
land_registry_matches = []
for _, match in tqdm(matched_addresses.iterrows(), total=len(matched_addresses)):
# Filter land registry on the postcode
lr_filtered = land_registry[
(land_registry["postcode"] == match["epc_postcode"].lower().strip())
@ -782,7 +794,7 @@ def app():
right_on="uprn"
).drop(columns=["uprn"])
# Flat anything that sold in the last year
# Flag anything that sold in the last year
matched_addresses["sold_recently"] = (
matched_addresses["date_of_transfer"] >= pd.Timestamp.now() - pd.DateOffset(years=1)
)
@ -792,6 +804,9 @@ def app():
(matched_addresses["TRANSACTION_TYPE"].isin(["marketed sale", "non marketed sale"]))
)
# Save this
# matched_addresses.to_excel("combined_aggregate - pre filter 28th July.xlsx", index=False)
# Drop rows on the booleans
matched_addresses = matched_addresses[
~matched_addresses["sold_recently"] &
@ -835,7 +850,7 @@ def app():
# investment_50m_properties.to_excel("investment_50m_properties 28th July.xlsx", index=False)
# Store the EPC data
# portfolio_epc_data_50m.to_excel("portfolio_epc_data_50m 29th July.xlsx", index=False)
# portfolio_epc_data_50m.to_excel("portfolio_epc_data_50m 28th July.xlsx", index=False)
# We check if any of these properties are in a conservation area
valuations = pd.read_excel("property value.xlsx")
@ -997,3 +1012,230 @@ def prepare_anonymised_data():
)
df.to_excel("Property List - 50% redacted.xlsx", index=False)
def adhoc_change_of_portfolio_analysis_july_2024():
"""
This is just some adhoc analysis, which answers some questions which arose upon refreshing the SFR portfolio
in late July 2024
:return:
"""
# Question 1: Which properties in the previous portfolio were in conservation areas or had listed/heritage status?
def answer_q1():
# Data was just stored here:
geospatial_data = pd.read_excel("geospatial_data.xlsx")
special_buildings = geospatial_data[
(geospatial_data["conservation_status"] == 1) |
geospatial_data["is_listed_building"] |
geospatial_data["is_heritage_building"]
]
print(
f"There were {special_buildings.shape[0]} properties in the previous portfolio which were in conservation "
f"areas or had listed/heritage status"
)
print(f"{(special_buildings['conservation_status'] == 1).sum()} were in a conservation area")
print(f"{special_buildings['is_listed_building'].sum()} were listed buildings")
print(f"{special_buildings['is_heritage_building'].sum()} were heritage buildings")
answer_q1()
# Question 2: For each property in the old portfolio, why was it lost?
def answer_q2():
# We read in the previous 50m portfolio
previous_portfolio = pd.read_excel("investment_50m_properties 28th May.xlsx") # 39 owners
new_matched_addresses = pd.read_excel("combined_aggregate - pre filter 28th July.xlsx")
new_portfolio = pd.read_excel("investment_50m_properties 28th July.xlsx") # 69 owners
# dropped units
dropped_units = previous_portfolio[
~previous_portfolio["UPRN"].isin(new_portfolio["UPRN"].values)
]
# Lots of properties are missed out - why
# 1) What was dropped, but was in the matched addresses and therefore was maybe filtered out
dropped_units_matched = dropped_units[
dropped_units["UPRN"].isin(new_matched_addresses["UPRN"])
].copy()
dropped_units_matched = dropped_units_matched.merge(
new_matched_addresses[
["UPRN", 'transaction_id', 'price', 'date_of_transfer', 'sold_recently', 'sale_lodged_recently']
],
how="left", on="UPRN"
)
# 97 units here - how mant were sold
of_which_sold = dropped_units_matched[
dropped_units_matched["sold_recently"]
]
n_sold = of_which_sold.shape[0]
print(f"{n_sold} sold recently ({n_sold / previous_portfolio.shape[0] * 100})%")
of_which_have_sale_epc_but_not_sold = dropped_units_matched[
~dropped_units_matched["sold_recently"] & dropped_units_matched["sale_lodged_recently"]
]
n_with_sale_epc_but_not_yet_sold = of_which_have_sale_epc_but_not_sold.shape[0]
print(
f"{n_with_sale_epc_but_not_yet_sold} have a sale EPC but have not sold yet ("
f"{n_with_sale_epc_but_not_yet_sold / previous_portfolio.shape[0] * 100})%"
)
# What about things that haven't sold or don't look likely to sell
not_sold = dropped_units_matched[
~dropped_units_matched["sold_recently"] & ~dropped_units_matched["sale_lodged_recently"]
]
new_owner_sizes = new_portfolio.groupby(
["Company Registration No. (1)"]
).size().reset_index().rename(columns={0: "Number of Properties"})
new_owner_sizes = new_owner_sizes.sort_values("Number of Properties", ascending=False)
previous_owner_sizes = previous_portfolio.groupby(
["Company Registration No. (1)"]
).size().reset_index().rename(columns={0: "Number of Properties"})
previous_owner_sizes = previous_owner_sizes.sort_values("Number of Properties", ascending=False)
# Let's just confirm that we took in a bigger owner, as we see this unit was still matched
owner_too_small = []
owner_big_enough = []
for _, property in not_sold.iterrows():
owner_reg_id = property["Company Registration No. (1)"]
old_portfolio_owner_size = previous_owner_sizes[
previous_owner_sizes["Company Registration No. (1)"] == owner_reg_id
]
# We make sure that the number of properties is smaller than the new smallest number
if (
old_portfolio_owner_size["Number of Properties"].values[0] >
new_owner_sizes["Number of Properties"].min()
):
owner_big_enough.append(property.to_dict())
continue
owner_too_small.append(property.to_dict())
n_owner_too_small = len(owner_too_small)
owner_big_enough = pd.DataFrame(owner_big_enough)
summary = []
for _, record in owner_big_enough.iterrows():
# Do we have this new owner?
new_owner = new_portfolio[
new_portfolio["Company Registration No. (1)"] == record["Company Registration No. (1)"]
]
if new_owner.empty:
# Why don't we have this new owner
new_owner_data = new_matched_addresses[
new_matched_addresses["Company Registration No. (1)"] == record["Company Registration No. (1)"]
]
new_owner_data_filtered = new_owner_data[
~new_owner_data["sold_recently"] & ~new_owner_data["sale_lodged_recently"]
]
summary.append(
{
"Owner Name": record["Proprietor Name (1)"],
"Owner reg id": record["Company Registration No. (1)"],
"N properties in new portfolio before filtering": new_owner_data.shape[0],
"N properties in new portfolio after filtering": new_owner_data_filtered.shape[0],
}
)
continue
raise Exception("something went wrong")
summary = pd.DataFrame(summary)
not_accounted_for = summary[
(
summary["N properties in new portfolio before filtering"] <
previous_owner_sizes["Number of Properties"].min()
)
]
# We have two owners not accounted for:
# ALLMID LIMITED, 01959058
# CORAL RACING LIMITED, 541600
# What happened to these owners?
new_epc = pd.read_excel("EPC F & G Properties - V2.xlsx")
allmid = previous_portfolio[previous_portfolio["Company Registration No. (1)"] == "01959058"].copy()
# Check if any of the properties are not in the new EPC data
allmid["not_in_new_epc"] = ~allmid["UPRN"].isin(new_epc["UPRN"])
allmid["not_in_matched_pre_filtered"] = ~allmid["UPRN"].isin(new_matched_addresses["UPRN"])
# In the previous portfolio, Allmid had 4 properties and in the re-build, it has just 2. Why?
# Firstly, one of their properties was re-surveyed not at an F/G
# Secondly, one of their properties is no longer owned by them:
# https://www.zoopla.co.uk/property/uprn/100070553074/
# So as an owner, they fell out of the ranking
coral_racing = previous_portfolio[previous_portfolio["Company Registration No. (1)"] == "541600"].copy()
coral_racing["not_in_new_epc"] = ~coral_racing["UPRN"].isin(new_epc["UPRN"])
coral_racing["not_in_matched_pre_filtered"] = ~coral_racing["UPRN"].isin(new_matched_addresses["UPRN"])
# Coral goes down from 4 -> 1 on refresh, so what happened?
# 1) 2 properties had new EPCs and re-scored higher
# 2) 1 property, 85A Market Street, Church Gresley, Swadlincote, DE11 9PN is no longer matched to the ownership
# data, which is correct
# Why were these units lost?
# There's just 1 owner, who is BARHAM PROPERTY LTD
owner_too_big_ids = owner_big_enough["Company Registration No. (1)"].unique()
owner_too_big_names = owner_big_enough["Proprietor Name (1)"].unique()
previous_owner_size = previous_owner_sizes[
previous_owner_sizes["Company Registration No. (1)"].isin(owner_too_big_ids)
]
new_owner_size = new_matched_addresses[
new_matched_addresses["Company Registration No. (1)"].isin(owner_too_big_ids) |
new_matched_addresses["Proprietor Name (1)"].isin(owner_too_big_names)
]
n_unsold = new_owner_size[~new_owner_size["sold_recently"] & ~new_owner_size["sale_lodged_recently"]].shape
# Happy with the justification to this point
assert (
(n_sold + n_with_sale_epc_but_not_yet_sold + n_owner_too_small + len(owner_big_enough)) ==
dropped_units_matched.shape[0]
)
# We now have a list of properties that were lost from the previous iteration to the next that were not matched
dropped_units_unmatched = dropped_units[
~dropped_units["UPRN"].isin(new_matched_addresses["UPRN"])
].copy()
# A few possibilities: They aren't in the EPC data?
new_epc = pd.read_excel("EPC F & G Properties - V2.xlsx")
unmatched_not_in_epc = dropped_units_unmatched[
~dropped_units_unmatched["UPRN"].isin(new_epc["UPRN"])
]
# There are 17 units that have had new EPCs above a G
# Who were the owners? - various, nothing particularly remarkable
(
previous_portfolio[
previous_portfolio["UPRN"].isin(unmatched_not_in_epc["UPRN"])
]["Proprietor Name (1)"].value_counts()
)
# 22 final units to be accounted for...!
unmatched_in_epc = dropped_units_unmatched[
dropped_units_unmatched["UPRN"].isin(new_epc["UPRN"])
]
# Some of them will be due to ownership
# TODO: Read in freehold/leashold data and see how many of these were non-exact matches!
leasehold_matching_lookup = pd.read_excel("leasehold_matching_lookup V2.xlsx")
freehold_matching_lookup = pd.read_excel("freehold_matching_lookup V2.xlsx")
combined_matching_lookup = pd.concat([leasehold_matching_lookup, freehold_matching_lookup])
# THis is 13 matches, all of them approximate
weak_matches = unmatched_in_epc.merge(combined_matching_lookup, how="inner", on="UPRN")
# These have been lost due to ownership updates. This has been checked manually for every unit and there has
# been sale activity for each one, justifying the change in ownership data
remaining_matches = unmatched_in_epc[
~unmatched_in_epc["UPRN"].isin(weak_matches["UPRN"])
]
assert dropped_units.shape[0] == (
(n_sold + n_with_sale_epc_but_not_yet_sold + n_owner_too_small + len(owner_big_enough)) + len(
weak_matches) + unmatched_not_in_epc.shape[0]
)

View file

@ -0,0 +1,90 @@
import inspect
import pandas as pd
from tqdm import tqdm
from pathlib import Path
src_file_path = inspect.getfile(lambda: None)
EPC_DIRECTORY = Path(src_file_path).parent / "local_data" / "all-domestic-certificates"
def app():
# For EPCs lodged from 2020 onwards, this collects data on the energy efficiency categories for wall insulation
# so that when we simulate, we know what the resulting energy efficiency category will be
epc_directories = [entry for entry in EPC_DIRECTORY.iterdir() if entry.is_dir()]
date_cutoff = "2020-01-01"
walls_data = []
ashp_data = []
for i, directory in tqdm(enumerate(epc_directories), total=len(epc_directories)):
data = pd.read_csv(directory / "certificates.csv", low_memory=False)
# Rename the columns to the same format as the api returns
data.columns = [c.replace("_", "-").lower() for c in data.columns]
insulated_walls = data[
data["walls-description"].isin(
[
"Cavity wall, filled cavity",
"Solid brick, with internal insulation",
"Solid brick, with external insulation",
]
)
]
insulated_walls = insulated_walls[~pd.isnull(insulated_walls["uprn"])]
insulated_walls = insulated_walls[
pd.to_datetime(insulated_walls["lodgement-date"]) >= date_cutoff
]
ashp = data[
data["mainheat-description"] == "Air source heat pump, radiators, electric"
]
ashp = ashp[~pd.isnull(ashp["uprn"])]
ashp = ashp[
pd.to_datetime(ashp["lodgement-date"]) >= date_cutoff
]
walls_data.append(insulated_walls)
ashp_data.append(ashp)
walls_df = pd.concat(walls_data)
ashp_df = pd.concat(ashp_data)
ashp_agg = (
ashp_df.
groupby(
["construction-age-band", "mainheat-description", "mainheatcont-description", "mainheat-energy-eff",
"mainheatc-energy-eff"]
)
.size()
.reset_index()
)
ashp_agg = ashp_agg[
ashp_agg["mainheatcont-description"].isin(
["Programmer, TRVs and bypass", "Time and temperature zone control"]
)
]
aggregations = {}
for description in [
"Cavity wall, filled cavity", "Solid brick, with internal insulation", "Solid brick, with external insulation"
]:
aggregation = walls_df[
walls_df["walls-description"] == description
].groupby(
["construction-age-band", "walls-energy-eff"]
).size().reset_index().rename(columns={0: "count"})
# For each grouping of age band, we use the most populus energy efficiency category
aggregation_deduped = aggregation.sort_values("count", ascending=False).drop_duplicates("construction-age-band")
aggregations[description] = aggregation_deduped
# Since these tables are small, we just convert them to python dictionaries
# This data is just held in the wall_energy_efficiency_values script, rather than s3
df1 = aggregations["Cavity wall, filled cavity"]
df2 = aggregations["Solid brick, with internal insulation"]
df3 = aggregations["Solid brick, with external insulation"]
df1.to_dict("records")
df2.to_dict("records")
df3.to_dict("records")

View file

@ -27,7 +27,7 @@ SCENARIOS = {
"already_installed_file_path": "",
"patches_file_path": "",
"non_invasive_recommendations_file_path": "",
"exclusions": ["floor_insulation", "fireplace", "solar_pv", "heating"],
"exclusions": ["floor_insulation", "fireplace", "solar_pv", "heating", 'lighting'],
"budget": None,
"scenario_name": "Low Hanging Fruit",
"multi_plan": True,
@ -42,7 +42,7 @@ SCENARIOS = {
"already_installed_file_path": "",
"patches_file_path": "",
"non_invasive_recommendations_file_path": "",
"exclusions": ["floor_insulation", "fireplace"],
"exclusions": ["floor_insulation", "fireplace", 'lighting'],
"budget": None,
"scenario_name": "Deep Retrofit",
"multi_plan": True,
@ -57,7 +57,7 @@ SCENARIOS = {
"already_installed_file_path": "",
"patches_file_path": "",
"non_invasive_recommendations_file_path": "",
"exclusions": ["fireplace"],
"exclusions": ["fireplace", 'lighting'],
"budget": None,
"scenario_name": "Whole House Retrofit",
"multi_plan": True,

View file

@ -64,6 +64,8 @@ SMART_APPLIANCE_THERMOSTAT_COST = 400
PROGRAMMER_COST = 120
ROOM_THERMOSTAT_COST = 150
TRVS_COST = 35
BYPASS_COST = 350 # Based on desktop research for a complex installation
# https://www.checkatrade.com/blog/cost-guides/cost-install-water-shut-off-valve/
# Cost for TTZC
# Smart thermostat based on checkatrade https://www.checkatrade.com/blog/cost-guides/cost-smart-thermostat/
@ -1254,6 +1256,34 @@ class Costs:
"labour_days": labour_days,
}
def programmer_trvs_bypass(self, number_heated_rooms, has_programmer, has_trvs, has_bypass):
total_cost = 0
labour_hours = 0
if not has_programmer:
total_cost += PROGRAMMER_COST
labour_hours += 1
if not has_trvs:
total_cost += TRVS_COST * number_heated_rooms
labour_hours += 0.25 * number_heated_rooms
if not has_bypass:
total_cost += BYPASS_COST
labour_hours += 0.5
subtotal_before_vat = total_cost / (1 + self.VAT_RATE)
vat = total_cost - subtotal_before_vat
return {
"total": total_cost,
"subtotal": subtotal_before_vat,
"vat": vat,
"labour_hours": labour_hours,
"labour_days": 1,
}
def heater_removal(self, n_rooms):
"""
Estimates the costs of removal of heaters, including the redecoration costs of the space behind the heater

View file

@ -40,7 +40,10 @@ class HeatingControlRecommender:
return
if heating_description in ["Air source heat pump, radiators, electric"]:
# For an ASHP, we can recommend time and temperature zone controls, as well as programmer, trvs and a bypass
# which are common configurations for ASHPs
self.recommend_time_temperature_zone_controls()
self.recommend_programmer_trvs_bypass()
def recommend_room_heaters_electric_controls(self):
"""
@ -279,3 +282,55 @@ class HeatingControlRecommender:
"description_simulation": description_simulation
}
)
def recommend_programmer_trvs_bypass(self):
# We don't perform any checks here - this is likely to be used in conjunction with an ASHP recommendation
new_controls_description = "Programmer, TRVs and bypass"
ending_config = MainheatControlAttributes(new_controls_description).process()
simulation_config = check_simulation_difference(
new_config=ending_config, old_config=self.property.main_heating_controls
)
# Only adjust if the current system is below good
if self.property.data["mainheatc-energy-eff"] in ["Poor", "Very Poor"]:
simulation_config["mainheatc_energy_eff_ending"] = "Average"
else:
simulation_config["mainheatc_energy_eff_ending"] = self.property.data["mainheatc-energy-eff"]
description_simulation = {
"mainheatcont-description": new_controls_description,
"mainheatc-energy-eff": simulation_config["mainheatc_energy_eff_ending"]
}
has_programmer = self.property.main_heating_controls["switch_system"] == "programmer"
has_trvs = self.property.main_heating_controls["trvs"] is not None
has_bypass = self.property.main_heating_controls["auxiliary_systems"] == "bypass"
cost_result = self.costs.programmer_trvs_bypass(
number_heated_rooms=int(self.property.data["number-heated-rooms"]),
has_trvs=has_trvs,
has_programmer=has_programmer,
has_bypass=has_bypass
)
description = "Install a Bypass valve, TRVs and a Programmer"
already_installed = "heating_control" in self.property.already_installed
if already_installed:
cost_result = override_costs(cost_result)
description = "Heating controls have already been upgraded, no further action needed."
self.recommendation.append(
{
"type": "heating_control",
"parts": [],
"description": description,
**cost_result,
"starting_u_value": None,
"new_u_value": None,
"sap_points": None,
"already_installed": already_installed,
"simulation_config": simulation_config,
"description_simulation": description_simulation
}
)

View file

@ -204,107 +204,138 @@ class HeatingRecommender:
ashp_costs[key] += controls_recommender.recommendation[0][key]
already_installed = "air_source_heat_pump" in self.property.already_installed
controls_recommendations = controls_recommender.recommendation
if already_installed or not controls_recommendations:
# We set an empty object, so we just produce one recommendation
controls_recommendations = [None]
if already_installed:
ashp_costs = override_costs(ashp_costs)
description = "The property already has an air source heat pump, no further action needed."
else:
if controls_recommender.recommendation:
description = ("Install an air source heat pump, and upgrade heating controls to Smart Thermostats, "
"room sensors and smart radiator valves (time & temperature zone control).")
else:
# This is a map from the heating controls description to the description of the air source heat pump set up
ashp_descriptions = {
"Time and temperature zone control": (
"Install an air source heat pump, and upgrade heating controls to Smart Thermostats, "
"room sensors and smart radiator valves (time & temperature zone control)."
),
"Programmer, TRVs and bypass": (
"Install an air source heat pump, with programmer, TRVs and a Bypass valve."
),
}
new_heating_description = "Air source heat pump, radiators, electric"
new_hot_water_description = "From main system"
ashp_recommendations = []
for controls_rec in controls_recommendations:
ashp_costs_with_controls = ashp_costs.copy()
if controls_rec:
for key in ashp_costs_with_controls:
ashp_costs_with_controls[key] += controls_rec[key]
if controls_rec is None:
description = "Install an air source heat pump."
elif already_installed:
description = "The property already has an air source heat pump, no further action needed."
else:
description = ashp_descriptions[controls_rec["description_simulation"]["mainheatcont-description"]]
# If the property does not have existing cavity and loft insulation, we include a note that the cost
# includes the boiler upgrade scheme and that the cavity and loft need to be treated, to ensure access
# to the funding
if has_cavity_or_loft_recommendations:
description = description + (f" The cost includes the £"
f"{BOILER_UPGRADE_SCHEME_ASHP_VALUE} boiler upgrade scheme grant. "
f"You must ensure that the property has an insulated cavity and "
f"270mm+ loft insulation to qualify for the grant")
description = description + (
f" The cost includes the £"
f"{BOILER_UPGRADE_SCHEME_ASHP_VALUE} boiler upgrade scheme grant. "
f"You must ensure that the property has an insulated cavity and "
f"270mm+ loft insulation to qualify for the grant"
)
else:
description = description + (f" The cost includes the £"
f"{BOILER_UPGRADE_SCHEME_ASHP_VALUE} boiler upgrade scheme grant")
description = description + (
f" The cost includes the £{BOILER_UPGRADE_SCHEME_ASHP_VALUE} boiler upgrade scheme grant"
)
new_heating_description = "Air source heat pump, radiators, electric"
new_hot_water_description = "From main system"
simulation_config = {
"mainheat_energy_eff_ending": "Good",
"hot_water_energy_eff_ending": "Good"
}
description_simulation = {
"mainheat-description": new_heating_description,
"mainheat-energy-eff": simulation_config["mainheat_energy_eff_ending"],
"hot-water-energy-eff": simulation_config["hot_water_energy_eff_ending"],
"hotwater-description": new_hot_water_description,
}
# Installation of a boiler improves the hot water system so we need to reflect this in
# the outcome of the recommendation
heating_ending_config = MainHeatAttributes(new_heating_description).process()
hotwater_ending_config = HotWaterAttributes(new_hot_water_description).process()
# If the property does not currently have electric main fuel, we'll simulate the change
fuel_ending_config = {}
if self.property.main_fuel["fuel_type"] != "electricity":
new_fuel_description = "electricity (not community)"
fuel_ending_config = MainFuelAttributes(new_fuel_description).process()
description_simulation = {
**description_simulation,
"main-fuel": new_fuel_description
print("TEMP UPDATED FOR 77 Perryn!!!!!")
simulation_config = {
"mainheat_energy_eff_ending": "Good",
"hot_water_energy_eff_ending": "Good"
}
description_simulation = {
"mainheat-description": new_heating_description,
"mainheat-energy-eff": simulation_config["mainheat_energy_eff_ending"],
"hot-water-energy-eff": simulation_config["hot_water_energy_eff_ending"],
"hotwater-description": new_hot_water_description,
}
# Installation of a boiler improves the hot water system so we need to reflect this in
# the outcome of the recommendation
heating_ending_config = MainHeatAttributes(new_heating_description).process()
hotwater_ending_config = HotWaterAttributes(new_hot_water_description).process()
# Check the simulation differences
heating_simulation_config = check_simulation_difference(
new_config=heating_ending_config, old_config=self.property.main_heating
)
hotwater_simulation_config = check_simulation_difference(
new_config=hotwater_ending_config, old_config=self.property.hotwater
)
fuel_simulation_config = check_simulation_difference(
new_config=fuel_ending_config, old_config=self.property.main_fuel
)
# If the property does not currently have electric main fuel, we'll simulate the change
fuel_ending_config = {}
if self.property.main_fuel["fuel_type"] != "electricity":
new_fuel_description = "electricity (not community)"
fuel_ending_config = MainFuelAttributes(new_fuel_description).process()
description_simulation = {
**description_simulation,
"main-fuel": new_fuel_description
}
simulation_config = {
**simulation_config,
**heating_simulation_config,
**hotwater_simulation_config,
**fuel_simulation_config,
}
# Check the simulation differences
heating_simulation_config = check_simulation_difference(
new_config=heating_ending_config, old_config=self.property.main_heating
)
hotwater_simulation_config = check_simulation_difference(
new_config=hotwater_ending_config, old_config=self.property.hotwater
)
fuel_simulation_config = check_simulation_difference(
new_config=fuel_ending_config, old_config=self.property.main_fuel
)
if controls_recommender.recommendation:
# We should have just the single recommendation for heat controls, which is time
# and temperature zone controls
if len(controls_recommender.recommendation) != 1:
raise NotImplementedError("More than one heat controls recommendation for air source heat pump")
simulation_config = {
**simulation_config,
**controls_recommender.recommendation[0]["simulation_config"]
**heating_simulation_config,
**hotwater_simulation_config,
**fuel_simulation_config,
}
description_simulation = {
**description_simulation,
**controls_recommender.recommendation[0]["description_simulation"]
if controls_rec is not None:
# We should have just the single recommendation for heat controls, which is time
# and temperature zone controls
simulation_config = {
**simulation_config,
**controls_rec["simulation_config"]
}
description_simulation = {
**description_simulation,
**controls_rec["description_simulation"]
}
ashp_recommendation = {
"phase": phase,
"parts": [
# TODO
],
"type": "heating",
"description": description,
"starting_u_value": None,
"new_u_value": None,
"sap_points": None,
"already_installed": already_installed,
"simulation_config": simulation_config,
"description_simulation": description_simulation,
**ashp_costs_with_controls
}
ashp_recommendation = {
"phase": phase,
"parts": [
# TODO
],
"type": "heating",
"description": description,
"starting_u_value": None,
"new_u_value": None,
"sap_points": None,
"already_installed": already_installed,
"simulation_config": simulation_config,
"description_simulation": description_simulation,
**ashp_costs
}
ashp_recommendations.append(ashp_recommendation)
if _return:
return [ashp_recommendation]
self.heating_recommendations.append(ashp_recommendation)
return [ashp_recommendations]
self.heating_recommendations.extend(ashp_recommendations)
@staticmethod
def check_simulation_difference(old_config, new_config):

View file

@ -230,6 +230,16 @@ class Recommendations:
# When check if these recommendations have two different types, such as solid wall insulation
# If we have multiple types, we group by type and then select the best recommendation for each type
# If we have a heating and heating control recommendation, we use JUST the heating reommendation
has_both_heating_types = all(
x in [rec["type"] for rec in recommendations_by_type] for x in ["heating", "heating_control"]
)
if has_both_heating_types:
# Take just heating
recommendations_by_type = [
rec for rec in recommendations_by_type if rec["type"] == "heating"
]
recommendations_by_type = sorted(recommendations_by_type, key=lambda x: x["type"])
representative_recommendations = []
for _type, recommendations in groupby(recommendations_by_type, key=lambda x: x["type"]):

View file

@ -13,6 +13,7 @@ from recommendations.recommendation_utils import (
)
from recommendations.config import PARTIALLY_FILLED_PERCENTAGE_ASSUMPTION
from recommendations.Costs import Costs
from recommendations.wall_energy_efficiency_values import cavity_wall_energy_eff, iwi_energy_eff, ewi_energy_eff
from utils.logger import setup_logger
logger = setup_logger()
@ -404,11 +405,28 @@ class WallRecommendations(Definitions):
simulation_config = {}
if self.property.data["walls-energy-eff"] not in ["Good", "Very Good"]:
if wall_ending_config["is_cavity_wall"]:
efficiency_data = [
x for x in cavity_wall_energy_eff if
x["construction-age-band"] == self.property.construction_age_band
][0]
elif wall_ending_config["internal_insulation"]:
efficiency_data = [
x for x in iwi_energy_eff if
x["construction-age-band"] == self.property.construction_age_band
][0]
else:
efficiency_data = [
x for x in ewi_energy_eff if
x["construction-age-band"] == self.property.construction_age_band
][0]
simulation_config = {
"walls_energy_eff_ending": "Good"
"walls_energy_eff_ending": efficiency_data["walls-energy-eff"]
}
# We check if we have double insulation in any instances
# TODO: We should pull the energy efficiency categories on double insulation instances, though it's quite rate
double_insulation = (
(wall_ending_config["is_filled_cavity"] and wall_ending_config["external_insulation"]) or
(wall_ending_config["is_filled_cavity"] and wall_ending_config["internal_insulation"]) or

View file

@ -0,0 +1,56 @@
cavity_wall_energy_eff = [
{'construction-age-band': 'England and Wales: 1950-1966', 'walls-energy-eff': 'Average', 'count': 605820},
{'construction-age-band': 'England and Wales: 1967-1975', 'walls-energy-eff': 'Average', 'count': 410998},
{'construction-age-band': 'England and Wales: 1930-1949', 'walls-energy-eff': 'Average', 'count': 263575},
{'construction-age-band': 'England and Wales: 1976-1982', 'walls-energy-eff': 'Good', 'count': 206654},
{'construction-age-band': 'England and Wales: 1983-1990', 'walls-energy-eff': 'Good', 'count': 106489},
{'construction-age-band': 'England and Wales: 1900-1929', 'walls-energy-eff': 'Average', 'count': 58399},
{'construction-age-band': 'England and Wales: 1991-1995', 'walls-energy-eff': 'Good', 'count': 58252},
{'construction-age-band': 'England and Wales: 1996-2002', 'walls-energy-eff': 'Good', 'count': 35141},
{'construction-age-band': 'England and Wales: 2003-2006', 'walls-energy-eff': 'Good', 'count': 7194},
{'construction-age-band': 'England and Wales: 2007-2011', 'walls-energy-eff': 'Good', 'count': 2639},
{'construction-age-band': 'England and Wales: before 1900', 'walls-energy-eff': 'Average', 'count': 2495},
{'construction-age-band': 'England and Wales: 2012 onwards', 'walls-energy-eff': 'Very Good', 'count': 1158},
{'construction-age-band': 'England and Wales: 2007 onwards', 'walls-energy-eff': 'Good', 'count': 357},
{'construction-age-band': 'INVALID!', 'walls-energy-eff': 'Very Good', 'count': 88}
]
iwi_energy_eff = [
{'construction-age-band': 'England and Wales: 1900-1929', 'walls-energy-eff': 'Good', 'count': 22415},
{'construction-age-band': 'England and Wales: before 1900', 'walls-energy-eff': 'Good',
'count': 13422},
{'construction-age-band': 'England and Wales: 1930-1949', 'walls-energy-eff': 'Good', 'count': 6640},
{'construction-age-band': 'England and Wales: 1950-1966', 'walls-energy-eff': 'Good', 'count': 1391},
{'construction-age-band': 'England and Wales: 1967-1975', 'walls-energy-eff': 'Good', 'count': 663},
{'construction-age-band': 'England and Wales: 2003-2006', 'walls-energy-eff': 'Very Good',
'count': 516},
{'construction-age-band': 'England and Wales: 2007-2011', 'walls-energy-eff': 'Very Good',
'count': 463},
{'construction-age-band': 'England and Wales: 2012 onwards', 'walls-energy-eff': 'Very Good',
'count': 353},
{'construction-age-band': 'England and Wales: 1996-2002', 'walls-energy-eff': 'Good', 'count': 218},
{'construction-age-band': 'England and Wales: 1983-1990', 'walls-energy-eff': 'Very Good',
'count': 166},
{'construction-age-band': 'England and Wales: 1976-1982', 'walls-energy-eff': 'Very Good',
'count': 121},
{'construction-age-band': 'England and Wales: 1991-1995', 'walls-energy-eff': 'Good', 'count': 104},
{'construction-age-band': 'England and Wales: 2007 onwards', 'walls-energy-eff': 'Very Good',
'count': 74}, {'construction-age-band': 'INVALID!', 'walls-energy-eff': 'Very Good', 'count': 26}
]
ewi_energy_eff = [
{'construction-age-band': 'England and Wales: 1900-1929', 'walls-energy-eff': 'Good', 'count': 18427},
{'construction-age-band': 'England and Wales: 1930-1949', 'walls-energy-eff': 'Good', 'count': 17803},
{'construction-age-band': 'England and Wales: 1950-1966', 'walls-energy-eff': 'Good', 'count': 4306},
{'construction-age-band': 'England and Wales: before 1900', 'walls-energy-eff': 'Good', 'count': 2955},
{'construction-age-band': 'England and Wales: 1967-1975', 'walls-energy-eff': 'Good', 'count': 647},
{'construction-age-band': 'England and Wales: 1976-1982', 'walls-energy-eff': 'Very Good', 'count': 188},
{'construction-age-band': 'England and Wales: 2007-2011', 'walls-energy-eff': 'Very Good', 'count': 73},
{'construction-age-band': 'England and Wales: 2003-2006', 'walls-energy-eff': 'Very Good', 'count': 49},
{'construction-age-band': 'England and Wales: 2012 onwards', 'walls-energy-eff': 'Very Good', 'count': 37},
{'construction-age-band': 'England and Wales: 1983-1990', 'walls-energy-eff': 'Good', 'count': 31},
{'construction-age-band': 'England and Wales: 1996-2002', 'walls-energy-eff': 'Very Good', 'count': 21},
{'construction-age-band': 'England and Wales: 1991-1995', 'walls-energy-eff': 'Good', 'count': 14},
{'construction-age-band': 'England and Wales: 2007 onwards', 'walls-energy-eff': 'Very Good', 'count': 8},
{'construction-age-band': 'INVALID!', 'walls-energy-eff': 'Very Good', 'count': 4}
]