Merge pull request #371 from Hestia-Homes/stonewater-eco-properties

Stonewater eco properties
This commit is contained in:
KhalimCK 2024-11-08 08:00:23 +00:00 committed by GitHub
commit e25d2f473f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 992 additions and 74 deletions

View file

@ -96,7 +96,7 @@ vartypes = {
'walls-env-eff': 'str',
'transaction-type': 'str',
# 'uprn': "Int64",
'current-energy-efficiency': 'float',
'current-energy-efficiency': 'Int64',
'energy-consumption-current': 'float',
'mainheat-description': 'str',
'lighting-cost-current': 'float',
@ -342,8 +342,12 @@ class SearchEpc:
rows_filtered = [r for r in rows if ", ".join([r["address"], r["posttown"]]) == best_match[0]]
else:
best_match = process.extractOne(address, [r["address"] for r in rows], score_cutoff=0)
# Get the UPRN for the best match
best_match_uprn = {r["uprn"] for r in rows if r["address"] == best_match[0]}.pop()
# Get all of the scores
rows_filtered = [r for r in rows if r["address"] == best_match[0]]
rows_filtered = [
r for r in rows if (r["address"] == best_match[0]) or (r["uprn"] == best_match_uprn)
]
if rows_filtered:
return rows_filtered
@ -642,6 +646,7 @@ class SearchEpc:
estimation_data = epc_data[[key, "weight", "lodgement-datetime"]].copy()
estimation_data = estimation_data[~pd.isnull(estimation_data[key])]
estimation_data = estimation_data[~estimation_data[key].isin(Definitions.DATA_ANOMALY_MATCHES)]
if vartype == "Int64":
# We have some edge cases where we get the error "invalid literal for int() with base 10: '1.0'"
# so this handles this
@ -653,6 +658,13 @@ class SearchEpc:
estimated_epc[key] = None
continue
if key == "floor-height":
# We speficially handle this, to avoid extreme values
# We check if we have any rows less than 3.5m
if estimation_data[estimation_data["floor-height"].astype(float) <= 3.5].shape[0] > 0:
# Perform the filter
estimation_data = estimation_data[estimation_data["floor-height"].astype(float) <= 3.5]
if vartype == "Int64":
estimated_value = self._estimate_int(estimation_data, key)
elif vartype == "float":
@ -675,6 +687,14 @@ class SearchEpc:
estimated_epc["current-energy-rating"] = sap_to_epc(estimated_epc["current-energy-efficiency"])
# Convert the cost current and potential variables - to string integers
for variable in ["heating-cost-current", "hot-water-cost-current", "lighting-cost-current",
"heating-cost-potential", "hot-water-cost-potential", "lighting-cost-potential"]:
estimated_epc[variable] = str(int(estimated_epc[variable]))
# This is a string
estimated_epc["low-energy-fixed-light-count"] = str(estimated_epc["low-energy-fixed-light-count"])
estimated_epc["postcode"] = self.postcode
estimated_epc["uprn"] = self.uprn
estimated_epc["address"] = self.full_address

View file

@ -393,6 +393,13 @@ async def trigger_plan(body: PlanTriggerRequest):
session.begin()
logger.info("Getting the inputs")
plan_input = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path)
# Check for duplicate UPRNS
input_uprns = [x.get("uprn") for x in plan_input if "uprn" in x]
if input_uprns:
# Check for dupes
if len(input_uprns) != len(set(input_uprns)):
raise ValueError("Duplicate UPRNs in the input data")
# If we have patches or overrides, we should read them in here
patches, already_installed, non_invasive_recommendations, valuation_data = get_request_property_data(body)
@ -848,6 +855,7 @@ async def trigger_plan(body: PlanTriggerRequest):
# Commit final changes
session.commit()
except IntegrityError:
logger.error("Database integrity error occurred", exc_info=True)
session.rollback()

View file

@ -701,7 +701,7 @@ def main():
"starting_sap": 53,
"recommended_measures": [
{
"measure": "Cyliner Insulation",
"measure": "Cylinder Insulation",
"description": "80mm cylinder insulation",
"sap_points": 2,
"ending_sap": 55,

View file

@ -0,0 +1,158 @@
import os
import time
from dotenv import load_dotenv
from tqdm import tqdm
import pandas as pd
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
from backend.SearchEpc import SearchEpc
from utils.s3 import save_csv_to_s3
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
USER_ID = 8
PORTFOLIO_ID = 117
def app():
"""
This script prepares the asset lists for the additional housing associations, CAHA and Hornsey Housing Trust,
that are forming a consortium led by AIHA
:return:
"""
hornsey_asset_list = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/AIHA/SHDF - Template - EOI - Hornsey Housing "
"Trust.xlsx",
sheet_name="Ksquared-All units information",
header=3
)
# We don't need the first row
hornsey_asset_list = hornsey_asset_list.iloc[1:]
# Fill NA values with empty strings
hornsey_asset_list = hornsey_asset_list.fillna("")
hornsey_asset_list["Address letter or number"] = hornsey_asset_list["Address letter or number"].astype(
str
).str.strip()
hornsey_asset_list["Postcode"] = hornsey_asset_list["Postcode"].astype(str).str.strip()
hornsey_asset_list["Street address"] = hornsey_asset_list["Street address"].astype(str).str.strip()
# Replace double spaces
for col in ["Address letter or number", "Street address", "Postcode"]:
hornsey_asset_list[col] = hornsey_asset_list[col].str.replace(" ", " ")
hornsey_asset_list = hornsey_asset_list[hornsey_asset_list["Address letter or number"] != ""]
missed_uprns = {
"Flat 13A Stowell House": 100021213098,
"Flat 24 Stowell House": 100021213110,
"Flat 1 36 Haringey Park": None
}
extracted_data = []
asset_list = []
for _, home in tqdm(hornsey_asset_list.iterrows(), total=len(hornsey_asset_list)):
if home["Address letter or number"] == "Flat 1 36 Haringey Park":
continue
# Some properties do not have an epc
if not home["Energy starting band (EPC)"]:
asset_list.append(
{
"uprn": missed_uprns[home["Address letter or number"]],
"address": home["Address letter or number"],
"postcode": home["Postcode"],
"property_type": "Flat", # They're all flats
}
)
continue
unit_number = home["Address letter or number"]
street = home["Street address"]
postcode = home["Postcode"]
address = ", ".join([x for x in [unit_number, street] if x])
find_epc_searcher = RetrieveFindMyEpc(address=address, postcode=postcode)
find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
time.sleep(0.5)
# We need uprn
searcher = SearchEpc(
address1=address,
postcode=postcode,
auth_token=EPC_AUTH_TOKEN,
os_api_key="",
full_address=address,
)
searcher.find_property(skip_os=True)
newest_epc = searcher.newest_epc
if newest_epc["current-energy-efficiency"] != home["Energy starting band (EPC)"].split("-")[1]:
raise Exception("Something went wrong with the EPC data")
extracted_data.append(
{
"uprn": newest_epc["uprn"],
**find_epc_data,
"hotwater-description": newest_epc["hotwater-description"],
}
)
asset_list.append(
{
"uprn": newest_epc["uprn"],
"address": home["Address letter or number"],
"postcode": home["Postcode"],
"property_type": "Flat", # They're all flats
}
)
# We format the extracted data so that is has the same structure as non-intrusive recommendations
# We then get the UPRNs and create the asset list
non_invasive_recommendations = [
{
"uprn": r["uprn"],
"recommendations": r["recommendations"]
} for r in extracted_data
]
for r in non_invasive_recommendations:
new_recommendations = []
extracted = [r for r in extracted_data if r["uprn"] == r["uprn"]][0]
for rec in r["recommendations"]:
if extracted["hotwater-description"] == "Gas boiler/circulator, no cylinder thermostat":
if rec["type"] in ["hot_water_tank_insulation", "cylinder_thermostat"]:
continue
rec["survey"] = False
new_recommendations.append(rec)
r["recommendations"] = new_recommendations
# Store the asset list in s3
filename = f"{USER_ID}/{PORTFOLIO_ID}/asset_list.csv"
save_csv_to_s3(
dataframe=pd.DataFrame(asset_list),
bucket_name="retrofit-plan-inputs-dev",
file_name=filename
)
# Store the non-invasive recommendations in s3
non_invasive_recommendations_filename = f"{USER_ID}/{PORTFOLIO_ID}/non_invasive_recommendations.csv"
save_csv_to_s3(
dataframe=pd.DataFrame(non_invasive_recommendations),
bucket_name="retrofit-plan-inputs-dev",
file_name=non_invasive_recommendations_filename
)
body = {
"portfolio_id": str(PORTFOLIO_ID),
"housing_type": "Social",
"goal": "Increasing EPC",
"goal_value": "C",
"trigger_file_path": filename,
"already_installed_file_path": "",
"patches_file_path": "",
"non_invasive_recommendations_file_path": non_invasive_recommendations_filename,
"valuation_file_path": "",
"scenario_name": "Wave 3 Packages",
"multi_plan": True,
"budget": None,
"exclusions": ["boiler_upgrade"]
}
print(body)

View file

@ -1,4 +1,377 @@
import os
import time
import json
import pandas as pd
import numpy as np
from tqdm import tqdm
from dotenv import load_dotenv
from backend.SearchEpc import SearchEpc
from utils.s3 import read_from_s3, read_pickle_from_s3
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
def get_data(asset_list):
epc_data = []
errors = []
for _, home in tqdm(asset_list.iterrows(), total=len(asset_list)):
try:
postcode = home["Postcode"]
house_number = home["Number"]
full_address = home["Full Address"]
searcher = SearchEpc(
address1=str(house_number),
postcode=postcode,
auth_token=EPC_AUTH_TOKEN,
os_api_key="",
property_type=None,
fast=True,
full_address=full_address,
max_retries=5
)
# Force the skipping of estimating the EPC
searcher.ordnance_survey_client.property_type = None
searcher.ordnance_survey_client.built_form = None
searcher.find_property(skip_os=True)
if searcher.newest_epc is None:
continue
# Look for EPC recommendatons
try:
property_recommendations = searcher.client.domestic.recommendations(searcher.newest_epc["lmk-key"])
except:
property_recommendations = {"rows": []}
epc = {
"row_id": home["row_id"],
**searcher.newest_epc.copy(),
"recommendations": property_recommendations["rows"]
}
epc_data.append(epc)
except Exception as e:
errors.append(home["row_id"])
time.sleep(5)
return epc_data, errors
def app():
"""
This code creates a list of cavity properties, for review
"""
archetyped_properties = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater SHDF_3_0_Board Triage 22.05.24 - "
"Archetyped V3.1.xlsx",
header=4
)
cavity_descriptions = [
"Cavity: AsBuilt (1983-1995)",
"Cavity: AsBuilt (Post 1995)",
"Cavity: AsBuilt (Pre 1976)",
"Cavity: AsBuilt (1976-1982)",
]
archetyped_properties["Is Cavity Property"] = archetyped_properties["Wall Type"].isin(cavity_descriptions)
# We also identify any properties where properties were found to need cavity wall insulation
costed_packages = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater - Costed Retrofit Packages "
"20241030 (WIP) Single Model V2.xlsx",
sheet_name="Modelled Packages",
header=13
)
needs_cwi = costed_packages[
costed_packages["Main Wall Insulation"].isin(
[
"Poss Extract CWI & Refill (issues identified)",
"CWI RdSAP Default"
]
)
][["Address ID", "Address", "Current SAP Rating", "Current EPC Band", "Postcode", "Archetype ID",
"Main Wall Insulation",
"Main Roof Type", "Main Roof Insulation", "Main Roof Insulation Thickness"]]
# We flag these properties
archetyped_properties["Survey shows CWI needed for Archetype"] = archetyped_properties["Archetype ID"].isin(
needs_cwi["Archetype ID"]
)
archetyped_properties = archetyped_properties[~pd.isnull(archetyped_properties["Address ID"])]
archetyped_properties = archetyped_properties[archetyped_properties["Address ID"] != "Address ID"]
# this is the big list!!!
features = pd.read_csv(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Osmosis Reviewed - Parity Download 18.7 - "
"master sheet.csv",
encoding='latin1'
)
features["Address ID"] = features["Address ID"].astype(str)
features_to_merge = features[
[
"Address ID", "Age", "Property Type", "Walls", "Roofs", "Glazing", "Heating", "Main Fuel", "Hot Water",
"Renewables", "Total Floor Area"
]
]
stonewater_cavity_properties = archetyped_properties[
["Name", "Postcode", "Osm. ID", "Address ID", "UPRN", "UDPRN", "Archetype ID", "House no", "Street name",
"Address line 2", "City/Town", "Is Cavity Property", "Survey shows CWI needed for Archetype"]
].merge(
features_to_merge, how="left", on="Address ID"
)
# We filter this down to the properties that are cavity properties
stonewater_cavity_properties = stonewater_cavity_properties[
stonewater_cavity_properties["Is Cavity Property"] |
stonewater_cavity_properties["Survey shows CWI needed for Archetype"]
]
stonewater_cavity_properties["Reason Included"] = "As Built Cavity Property"
stonewater_cavity_properties["Reason Included"] = np.where(
stonewater_cavity_properties["Survey shows CWI needed for Archetype"] &
~stonewater_cavity_properties["Is Cavity Property"],
"Survey revealed potential need for CWI or extract and re-fill",
stonewater_cavity_properties["Reason Included"]
)
stonewater_cavity_properties["Reason Included"] = np.where(
stonewater_cavity_properties["Survey shows CWI needed for Archetype"] &
stonewater_cavity_properties["Is Cavity Property"],
"Surveyed revealed potential need for CWI or extract and re-fill and is an as built cavity property",
stonewater_cavity_properties["Reason Included"]
)
# We indicate the exact properties that need CWI, based on survey findings
stonewater_cavity_properties["Reason Included"] = np.where(
stonewater_cavity_properties["Address ID"].isin(
needs_cwi[needs_cwi["Main Wall Insulation"] == "CWI RdSAP Default"]["Address ID"].astype(int).astype(
str).values
),
"Survey showed this property needs CWI",
stonewater_cavity_properties["Reason Included"]
)
stonewater_cavity_properties["Reason Included"] = np.where(
stonewater_cavity_properties["Address ID"].isin(
needs_cwi[needs_cwi["Main Wall Insulation"] == "Poss Extract CWI & Refill (issues identified)"][
"Address ID"].astype(int).astype(str).values
),
"Survey showed this property could need extract and re-fill",
stonewater_cavity_properties["Reason Included"]
)
# We get the EPC data
epc_data = json.loads(
read_from_s3(
bucket_name="retrofit-data-dev",
s3_file_name="customers/Stonewater/clustering/epc_data.json"
)
)
epc_data = pd.DataFrame(epc_data)
epc_data["uprn"] = np.where(
epc_data["internal_id"] == 1091,
83143766,
epc_data["uprn"]
)
epc_data_batch_2 = read_pickle_from_s3(
s3_file_name="customers/Stonewater/clustering/epc_data_batch_2.pkl",
bucket_name="retrofit-data-dev"
)
epc_data_batch_2 = pd.DataFrame(epc_data_batch_2)
complete_epcs = pd.concat([epc_data, epc_data_batch_2])
epcs_to_merge = complete_epcs[
[
"uprn",
"address",
"postcode",
"property-type",
"built-form",
"inspection-date",
"current-energy-rating",
"current-energy-efficiency",
"roof-description",
"walls-description",
"transaction-type",
"secondheat-description",
"total-floor-area",
"construction-age-band",
"floor-height",
"number-habitable-rooms",
"mainheat-description",
"energy-consumption-current"
]
].rename(
columns={
"address": "Address",
"postcode": "Postcode",
"inspection-date": "Date of last EPC",
"current-energy-efficiency": "SAP score on register",
"current-energy-rating": "EPC rating on register",
"property-type": "Property Type",
"built-form": "Archetype",
"total-floor-area": "Property Floor Area",
"construction-age-band": "Property Age Band",
"floor-height": "Property Floor Height",
"number-habitable-rooms": "Number of Habitable Rooms",
"walls-description": "Wall Construction",
"roof-description": "Roof Construction",
"mainheat-description": "Heating Type",
"secondheat-description": "Secondary Heating",
"transaction-type": "Reason for last EPC",
"energy-consumption-current": "Heat Demand (kWh/m2)",
}
)
# We de-dupe, taking the newest on the date the EPC was lod
epcs_to_merge["Date of last EPC"] = pd.to_datetime(epcs_to_merge["Date of last EPC"])
epcs_to_merge = epcs_to_merge.sort_values("Date of last EPC", ascending=False)
epcs_to_merge = epcs_to_merge.drop_duplicates(subset="uprn")
stonewater_cavity_properties["UPRN"] = stonewater_cavity_properties["UPRN"].astype("Int64").astype(str)
# Merge the EPCs on, with the data we need
stonewater_cavity_properties = stonewater_cavity_properties.rename(
columns={
"Age": "Parity - Build Age",
"Property Type": "Parity - Property Type",
"Walls": "Parity - Wall Construction",
"Roofs": "Parity - Roof Construction",
"Glazing": "Parity - Glazing Type",
"Heating": "Parity - Heating Type",
"Main Fuel": "Parity - Main Fuel",
"Hot Water": "Parity - Hot Water",
"Renewables": "Parity - Renewables",
"Total Floor Area": "Parity - Total Floor Area"
}
).merge(
epcs_to_merge,
how="left",
left_on="UPRN",
right_on="uprn"
)
# We now flag the additional properties in the as built list
additional_properties = features[
~features["Address ID"].isin(archetyped_properties["Address ID"].values)
]
# Filter on as built cavity properties
additional_properties = additional_properties[
additional_properties["Walls"].isin(cavity_descriptions)
]
additional_properties["Full Address"] = additional_properties["Address"].copy()
house_numbers = []
for _, x in tqdm(additional_properties.iterrows(), total=len(additional_properties)):
house_no = SearchEpc.get_house_number(x["Address"].split(",")[0], x["Postcode"])
if house_no is None:
house_no = x["Address"].split(",")[0]
# If we end up with a number like "01" we need to remove the leading zero
house_no = house_no.lstrip("0")
house_numbers.append(
{
"Address ID": x["Address ID"],
"Number": house_no
}
)
house_numbers = pd.DataFrame(house_numbers)
additional_properties = additional_properties.merge(house_numbers, how="left", on="Address ID")
additional_properties["row_id"] = additional_properties["Address ID"].copy()
# Pull the EPCs for these properties
additional_properties_epcs, errors = get_data(additional_properties)
# Save this data as a pickle
# import pickle
# with open("/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/additional_properties_epcs.pkl",
# "wb") as f:
# pickle.dump(additional_properties_epcs, f)
# We drop Full Address
additional_properties = additional_properties.drop(columns=["Full Address"])
additional_properties2 = additional_properties[[
"row_id", "Address", "Postcode", "Address ID", "SAP", "SAP Band", "Property Type", "Walls", "Roofs", "Glazing",
"Heating", "Main Fuel", "Hot Water", "Renewables", "Total Floor Area",
]].rename(
columns={
"SAP": "Parity - Predicted SAP",
"SAP Band": "Parity - Predicted SAP Band",
"Age": "Parity - Build Age",
"Property Type": "Parity - Property Type",
"Walls": "Parity - Wall Construction",
"Roofs": "Parity - Roof Construction",
"Glazing": "Parity - Glazing Type",
"Heating": "Parity - Heating Type",
"Main Fuel": "Parity - Main Fuel",
"Hot Water": "Parity - Hot Water",
"Renewables": "Parity - Renewables",
"Total Floor Area": "Parity - Total Floor Area"
}
).merge(
pd.DataFrame(additional_properties_epcs)[
[
"row_id",
"property-type",
"built-form",
"inspection-date",
"current-energy-rating",
"current-energy-efficiency",
"roof-description",
"walls-description",
"transaction-type",
"secondheat-description",
"total-floor-area",
"construction-age-band",
"floor-height",
"number-habitable-rooms",
"mainheat-description",
"energy-consumption-current"
]
].rename(
columns={
"inspection-date": "Date of last EPC",
"current-energy-efficiency": "SAP score on register",
"current-energy-rating": "EPC rating on register",
"property-type": "Property Type",
"built-form": "Archetype",
"total-floor-area": "Property Floor Area",
"construction-age-band": "Property Age Band",
"floor-height": "Property Floor Height",
"number-habitable-rooms": "Number of Habitable Rooms",
"walls-description": "Wall Construction",
"roof-description": "Roof Construction",
"mainheat-description": "Heating Type",
"secondheat-description": "Secondary Heating",
"transaction-type": "Reason for last EPC",
"energy-consumption-current": "Heat Demand (kWh/m2)",
}
),
how="left",
on="row_id"
)
# We save the data locally
stonewater_cavity_properties.to_csv(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater Cavity Properties.csv",
index=False
)
additional_properties2.to_csv(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater Additional Cavity Properties.csv",
index=False
)
# Save the survey findings
needs_cwi.to_csv(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater Properties Needing CWI - WIP.csv",
index=False
)

View file

@ -2,3 +2,9 @@ PyPDF2
pandas
tqdm
openpyxl
boto3
epc-api-python==1.0.2
usaddress==0.5.11
fuzzywuzzy==0.18.0
python-dotenv

View file

@ -359,6 +359,7 @@ class EPCRecord:
self._clean_property_dimensions()
self._clean_number_lighting_outlets()
self._clean_floor_level()
self._clean_floor_height()
# self._clean_potential_energy_efficiency()
# self._clean_environment_impact_potential()
@ -387,6 +388,20 @@ class EPCRecord:
return df
def _clean_floor_height(self):
""" Remaps anomalies in floor height to the average floor height for the property type """
floor_height_data = self.cleaning_data[
(self.cleaning_data["property_type"] == self.prepared_epc["property-type"]) &
(self.cleaning_data["built_form"] == self.prepared_epc["built-form"])
]
average = floor_height_data["floor_height"].mean()
sd = floor_height_data["floor_height"].std()
# If we're in the top 0.5 percentile of floor heights, we'll set it to the average
if self.prepared_epc["floor-height"] > average + 10 * sd:
self.prepared_epc["floor-height"] = average
if self.prepared_epc["floor-height"] <= 1.665:
self.prepared_epc["floor-height"] = average
def _clean_floor_level(self):
"""
This method will clean the floor level, if empty or invalid

View file

@ -0,0 +1,243 @@
import requests
from bs4 import BeautifulSoup
from datetime import datetime
class RetrieveFindMyEpc:
SEARCH_POSTCODE_URL = (
"https://find-energy-certificate.service.gov.uk/find-a-certificate/search-by-postcode?postcode={postcode_input}"
)
BASE_ENERGY_URL = "https://find-energy-certificate.service.gov.uk"
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/111.0.0.0 Safari/537.36'
}
def __init__(self, address: str, postcode: str):
"""
This class is tasked with retrieving the latest EPC data from the find my epc website
:param address: The address of the property
:param postcode: The postcode of the property
"""
self.address = address
self.postcode = postcode
self.address_cleaned = self.address.replace(",", "").replace(" ", "").lower()
def retrieve_newest_find_my_epc_data(self):
"""
For a post code and address, we pull out all the required data from the find my epc website
"""
postcode_input = self.postcode.replace(" ", "+")
postcode_search = self.SEARCH_POSTCODE_URL.format(postcode_input=postcode_input)
postcode_response = requests.get(postcode_search, headers=self.HEADERS)
postcode_res = BeautifulSoup(postcode_response.text, features="html.parser")
rows = postcode_res.find_all('tr', class_='govuk-table__row')
extracted_table = []
for row in rows:
# Extract the address and URL
address_tag = row.find('a', class_='govuk-link')
if address_tag is None:
continue
extracted_address = None
extracted_address_url = None
if address_tag:
extracted_address = address_tag.text.strip()
extracted_address_url = address_tag['href']
extracted_address_cleaned = extracted_address.replace(",", "").replace(" ", "").lower()
if not extracted_address_cleaned.startswith(self.address_cleaned):
continue
# If the address is a match, we can extract the data
# Extract the expiry date
expiry_date_tag = row.find('td', class_='govuk-table__cell date')
expiry_date = None
if expiry_date_tag is not None:
expiry_date = expiry_date_tag.parent.find('span').text.strip()
extracted_table.append(
{
"extracted_address": extracted_address,
"extracted_address_url": extracted_address_url,
"expiry_date": datetime.strptime(expiry_date, '%d %B %Y'),
}
)
if not extracted_table:
raise ValueError("No EPC found")
if len(extracted_table) > 1:
# We take the one with the most recent expiry date
extracted_table = sorted(extracted_table, key=lambda x: x['expiry_date'], reverse=True)
chosen_epc = self.BASE_ENERGY_URL + extracted_table[0]['extracted_address_url']
epc_certificate = chosen_epc.split('/')[-1]
address_response = requests.get(chosen_epc, headers=self.HEADERS)
address_res = BeautifulSoup(address_response.text, features="html.parser")
# Key data we want to retrieve:
# 1) Rating
# 2) Bills estimates
# 3) Recommendations and SAP points
# 4) Low and zero carbon energy sources
ratings = address_res.find('desc', {'id': 'svg-desc'}).text
current_rating = ratings.split(".")[0]
potential_rating = ratings.split(".")[1]
current_sap = int(current_rating.split(' ')[-1])
# Retrieve the energy consumption
bills = address_res.find('div', {'id': 'bills-affected'})
bills_list = bills.find_all('li')
if not bills_list:
# If this is the case, it's usually becaue the EPC was very old. Early EPCs did not have this information
heating_text = None
hot_water_text = None
else:
heating_text = bills_list[0].text
hot_water_text = bills_list[1].text
# Retrieve the recommendations and SAP points
recommendations = []
recommendations_div = address_res.find('div', class_='epb-recommended-improvements')
if recommendations_div:
# Find all h3 headers for each step and extract their related information
step_headers = recommendations_div.find_all('h3', class_='govuk-heading-m')
previous_sap_score = current_sap
for step_num, step_header in enumerate(step_headers, start=1):
# Extract the step title (the measure)
measure_title = step_header.text.strip().replace(f"Step {step_num}: ", "")
# Find the div containing the potential rating within the same section
potential_rating_div = step_header.find_next(
'div', class_='epb-recommended-improvements__potential-rating'
)
# Check if the potential rating div is found
if potential_rating_div:
# Extract the rating text within the SVG text element
rating_text = potential_rating_div.find('text', class_='govuk-!-font-weight-bold').text.strip()
# Parse the rating text to separate the numeric rating and EPC letter
new_rating = int(rating_text.split()[0])
new_epc = rating_text.split()[1]
# Append the information as a dictionary to the recommendations list
recommendations.append({
"step": step_num,
"measure": measure_title,
"new_rating": new_rating,
"new_epc": new_epc,
"sap_points": new_rating - previous_sap_score
})
previous_sap_score = new_rating
# Search for the assessment informaton
assessment_information = address_res.find('div', {'id': 'information'})
# Parse this information
rows = assessment_information.find_all('div', class_='govuk-summary-list__row')
# Create a dictionary to hold the parsed information
assessment_data = {}
for row in rows:
key = row.find('dt').text.strip()
if key == "Type of assessment":
# We dont reliably extract this
continue
value_tag = row.find('dd')
# Check if value contains a link (email)
if value_tag.find('a'):
value = value_tag.find('a').text.strip()
elif value_tag.find('summary'):
value = value_tag.find('span').text.strip()
else:
value = value_tag.text.strip()
# These are keys that we have for both the surveyor and the acreditation scheme. Firstly, we'll
# get the surveyor's name and email so we make that information clear
if key in ["Telephone", "Email"]:
if "Assessor's " + key not in assessment_data:
assessment_data["Assessor's " + key] = value
else:
assessment_data["Accreditation Scheme's " + key] = value
continue
assessment_data[key] = value
expected_keys = [
'Assessors name',
"Assessor's Telephone",
"Assessor's Email",
'Assessors ID',
'Accreditation scheme',
'Assessors declaration',
"Accreditation Scheme's Telephone",
"Accreditation Scheme's Email",
'Date of assessment',
'Date of certificate'
]
# Check we have all the expected keys
for key in expected_keys:
if key not in assessment_data:
raise ValueError(f"Missing key: {key}")
# Finally, we format the recommendations
recommendations = self.format_recommendations(recommendations)
resulting_data = {
'epc_certificate': epc_certificate,
'current_epc_rating': current_rating.split(' ')[-6],
'current_epc_efficiency': current_sap,
'potential_epc_rating': potential_rating.split(' ')[-6],
"potential_epc_efficiency": int(potential_rating.split(' ')[-1]),
"heating_text": heating_text,
"hot_water_text": hot_water_text,
"recommendations": recommendations,
**assessment_data
}
return resulting_data
def format_recommendations(self, recommendations):
"""
This function converts the recommendations to a format that we can use in the engine as a non-intrusive survey
:param recommendations:
:return:
"""
measure_map = {
"Internal or external wall insulation": ["internal_wall_insulation", "external_wall_insulation"],
"Hot water cylinder insulation": ["hot_water_tank_insulation"],
"Hot water cylinder thermostat": ["cylinder_thermostat"],
"High performance external doors": ["insulated_doors"],
"Floor insulation (solid floor)": ["solid_floor_insulation"],
"Double glazed windows": ["double_glazing"],
"Cavity wall insulation": ["cavity_wall_insulation"],
"Replace boiler with new condensing boiler": ["boiler_upgrade"],
"Floor insulation": ["floor_insulation"], # Recommendation typically associated to older EPCs
"Heating controls (programmer, room thermostat and TRVs)": [
"roomstat_programmer_trvs", "time_temperature_zone_control"
],
"Low energy lighting": ["low_energy_lighting"],
}
formatted_recommendations = []
for rec in recommendations:
mapped = measure_map[rec["measure"]]
for measure in mapped:
formatted_recommendations.append(
{
"type": measure,
"sap_points": rec["sap_points"],
"survey": True
}
)
return formatted_recommendations

View file

@ -0,0 +1,2 @@
pandas
beautifulsoup4

View file

@ -21,11 +21,44 @@ class HotwaterRecommendations:
"""
# Reset the recommendations
self.recommendations = []
non_invasive_recommendations = self.property.non_invasive_recommendations
if non_invasive_recommendations:
measures = [
r["type"] for r in non_invasive_recommendations if
r["type"] in ["hot_water_tank_insulation", "cylinder_thermostat"]
]
recommendations_phase = phase
for m in measures:
non_invasive_rec = [
r for r in non_invasive_recommendations if r["type"] == m
][0]
if m == "hot_water_tank_insulation":
# We need to be able to stack these recommendations
self.recommend_tank_insulation(
phase=recommendations_phase,
sap_points=non_invasive_rec["sap_points"],
survey=non_invasive_rec["survey"],
)
recommendations_phase += 1
elif m == "cylinder_thermostat":
self.recommend_cylinder_thermostat(
phase=recommendations_phase,
sap_points=non_invasive_rec["sap_points"],
survey=non_invasive_rec["survey"],
)
recommendations_phase += 1
# This first iteration of the recommender will provide very basic recommendation
# We recommend heating controls based on the main heating system
# If there is no system present, but access to the mains, we
if self.property.hotwater["clean_description"] == "Gas boiler/circulator, no cylinder thermostat":
# Handle this case specifically:
self.recommend_cylinder_thermostat_gas_boiler_circulator(phase=phase)
return
# If there is no system present, but access to the mains, we
if (
(self.property.hotwater["heater_type"] in ["electric immersion"]) &
@ -39,7 +72,7 @@ class HotwaterRecommendations:
self.recommend_cylinder_thermostat(phase=phase)
return
def recommend_tank_insulation(self, phase):
def recommend_tank_insulation(self, phase, sap_points=None, survey=False, _return=False):
"""
If the home has a very poor hot water system, this is often indicative of a lack of insulation on the hot water
tank. This is a very simple and cost effective improvement that can be made to the home. It will likely
@ -55,27 +88,30 @@ class HotwaterRecommendations:
else:
description = "Insulate hot water tank"
self.recommendations.append(
{
"phase": phase,
"parts": [],
"type": "hot_water_tank_insulation",
"measure_type": "hot_water_tank_insulation",
"description": description,
"starting_u_value": None,
"new_u_value": None,
"sap_points": None,
"already_installed": already_installed,
**recommendation_cost,
"simulation_config": {"hot_water_energy_eff_ending": "Poor"},
"description_simulation": {
"hot-water-energy-eff": "Poor"
}
}
)
to_append = {
"phase": phase,
"parts": [],
"type": "hot_water_tank_insulation",
"measure_type": "hot_water_tank_insulation",
"description": description,
"starting_u_value": None,
"new_u_value": None,
"sap_points": sap_points,
"already_installed": already_installed,
**recommendation_cost,
"simulation_config": {"hot_water_energy_eff_ending": "Poor"},
"description_simulation": {
"hot-water-energy-eff": "Poor"
},
"survey": survey
}
if _return:
return to_append
self.recommendations.append(to_append)
return
def recommend_cylinder_thermostat(self, phase):
def recommend_cylinder_thermostat(self, phase, sap_points=None, survey=False, _return=False):
"""
If the home has a very poor hot water system, this is often indicative of a lack of insulation on the hot water
tank. This is a very simple and cost effective improvement that can be made to the home.
@ -101,23 +137,86 @@ class HotwaterRecommendations:
**hotwater_simulation_config
}
self.recommendations.append(
{
"phase": phase,
"parts": [],
"type": "cylinder_thermostat",
"measure_type": "cylinder_thermostat",
"description": description,
"starting_u_value": None,
"new_u_value": None,
"sap_points": None,
"already_installed": already_installed,
**recommendation_cost,
"simulation_config": simulation_config,
"description_simulation": {
"hot-water-energy-eff": self.property.data["hot-water-energy-eff"],
"hotwater-description": new_epc_description,
}
}
)
to_append = {
"phase": phase,
"parts": [],
"type": "cylinder_thermostat",
"measure_type": "cylinder_thermostat",
"description": description,
"starting_u_value": None,
"new_u_value": None,
"sap_points": sap_points,
"already_installed": already_installed,
**recommendation_cost,
"simulation_config": simulation_config,
"description_simulation": {
"hot-water-energy-eff": self.property.data["hot-water-energy-eff"],
"hotwater-description": new_epc_description,
},
"survey": survey
}
if _return:
return to_append
self.recommendations.append(to_append)
return
def recommend_cylinder_thermostat_gas_boiler_circulator(self, phase):
"""
If the home has a very poor hot water system, this is often indicative of a lack of insulation on the
hot water
tank. This is a very simple and cost effective improvement that can be made to the home.
"""
thermostat_recommendation_cost = self.costs.cylinder_thermostat()
cylinder_recommendation_cost = self.costs.hot_water_tank_insulation()
# Add them
total_cost = {
k: thermostat_recommendation_cost[k] + cylinder_recommendation_cost[k] for k in
thermostat_recommendation_cost.keys()
}
already_installed = "cylinder_thermostat" in self.property.already_installed
if already_installed:
total_cost = override_costs(total_cost)
description = "Cylinder thermostat & insulation has already been installed, no further action required"
else:
description = "Install a smart cylinder thermostat and insulate the hot water tank with 80mm insulation"
new_epc_description = "From main system"
hotwater_ending_config = HotWaterAttributes(new_epc_description).process()
hotwater_simulation_config = check_simulation_difference(
new_config=hotwater_ending_config, old_config=self.property.hotwater
)
if self.property.data["hot-water-energy-eff"] in ["Very Poor", "Poor", "Average"]:
new_efficiency = "Good"
else:
new_efficiency = self.property.data["hot-water-energy-eff"]
simulation_config = {
"hot_water_energy_eff_ending": new_efficiency,
**hotwater_simulation_config
}
to_append = {
"phase": phase,
"parts": [],
"type": "cylinder_thermostat",
"measure_type": "cylinder_thermostat",
"description": description,
"starting_u_value": None,
"new_u_value": None,
"sap_points": None,
"already_installed": already_installed,
**total_cost,
"simulation_config": simulation_config,
"description_simulation": {
"hot-water-energy-eff": simulation_config["hot_water_energy_eff_ending"],
"hotwater-description": new_epc_description,
},
"survey": False
}
self.recommendations.append(to_append)
return

View file

@ -142,12 +142,9 @@ class Recommendations:
# Ventilation recommendations
# We only produce a ventilation recommendation if the property is recommended to have wall or roof
# insulation
# We will not attribute a SAP impact to the ventilation recommendation, since we've seen that this
# has no
# real impact on the SAP score. Therefore, we don't need to include phasing for ventilation. If we
# have any
# wall or roof recommendations, we will ensure that ventilation is included in the simulation
# insulation We will not attribute a SAP impact to the ventilation recommendation, since we've seen that this
# has no real impact on the SAP score. Therefore, we don't need to include phasing for ventilation. If we
# have any wall or roof recommendations, we will ensure that ventilation is included in the simulation
if (
(self.wall_recomender.recommendations or self.roof_recommender.recommendations) and
("ventilation" in measures)
@ -253,8 +250,13 @@ class Recommendations:
if "hot_water" in measures:
self.hotwater_recommender.recommend(phase=phase)
if self.hotwater_recommender.recommendations:
property_recommendations.append(self.hotwater_recommender.recommendations)
phase += 1
if len(self.hotwater_recommender.recommendations) > 1:
for r in self.hotwater_recommender.recommendations:
property_recommendations.append([r])
phase += 1
else:
property_recommendations.append(self.hotwater_recommender.recommendations)
phase += 1
if "secondary_heating" in measures:
self.secondary_heating_recommender.recommend(phase=phase)

View file

@ -152,6 +152,9 @@ class RoofRecommendations:
if self.is_room_roof_insulated_or_unsuitable(measures):
return
if self.property.roof["is_thatched"]:
return
# If we have a u-value already, need to implement this
if u_value:
if u_value <= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE:

View file

@ -540,15 +540,10 @@ class WallRecommendations(Definitions):
lowest_selected_u_value = None
recommendations = []
iwi_non_invasive_recommendations = next(
(r for r in self.property.non_invasive_recommendations if r["type"] == "internal_wall_insulation"), {}
non_invasive_recommendations = next(
(r for r in self.property.non_invasive_recommendations if
r["type"] == insulation_materials["type"].values[0]), {}
)
ewi_non_invasive_recommendations = next(
(r for r in self.property.non_invasive_recommendations if r["type"] == "external_wall_insulation"), {}
)
if ewi_non_invasive_recommendations:
raise NotImplementedError("Implement ewi non-invasive recommendations")
for _, insulation_material_group in insulation_materials.groupby("description"):
@ -590,31 +585,25 @@ class WallRecommendations(Definitions):
if already_installed:
cost_result = override_costs(cost_result)
if non_invasive_recommendations.get("cost") is not None:
raise NotImplementedError(
"Not handled passing costs from non-invasive recommendations for iwi"
)
if material["type"] == "internal_wall_insulation":
if iwi_non_invasive_recommendations.get("cost") is not None:
raise NotImplementedError(
"Not handled passing costs from non-invasive recommendations for iwi"
)
sap_points = iwi_non_invasive_recommendations.get("sap_points", None)
survey = iwi_non_invasive_recommendations.get("survey", False)
new_description = self.get_internal_external_wall_description(
self.INTERNALLY_INSULATED_WALL_DESCRIPTIONS, new_u_value
)
elif material["type"] == "external_wall_insulation":
sap_points = ewi_non_invasive_recommendations.get("sap_points", None)
survey = ewi_non_invasive_recommendations.get("survey", False)
new_description = self.get_internal_external_wall_description(
self.EXTERNALLY_INSULATED_WALL_DESCRIPTIONS, new_u_value
)
else:
raise ValueError("Invalid material type")
sap_points = non_invasive_recommendations.get("sap_points", None)
survey = non_invasive_recommendations.get("survey", False)
wall_ending_config = WallAttributes(new_description).process()
walls_simulation_config = check_simulation_difference(

View file

@ -257,7 +257,7 @@ epc_wall_description_map = {
"Timber frame, as built, partial insulation": "Timber frame as built",
"Timber frame, as built, no insulation": "Timber frame as built",
"Timber frame, with external insulation": "Timber frame with internal insulation",
"Timber frame, with internal insulation": "Timber frame with internal insulation",
############################
# Sandstone/limestones wall mappings
############################