Merge pull request #374 from Hestia-Homes/remote-assessment

Completed Stonewater Wave 3 Modelling
This commit is contained in:
KhalimCK 2024-11-21 11:47:39 +00:00 committed by GitHub
commit f6612c0cd4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 1465 additions and 43 deletions

View file

@ -52,6 +52,20 @@ aiha_wave_3_features = aiha_original_asset_data[
wall_type_breakdown = aiha_wave_3_features["Wall type"].value_counts()
property_type_breakdown = aiha_wave_3_features.groupby(["Property type", "floor"]).size().reset_index()
aiha_wave_3_features[aiha_wave_3_features["Property type"] == "Flat"][["Street address", "Postcode"]]
# 4 Yetev Lev Court  ... Semi-Detached mid - Medium
# B 86 Bethune Road ... Mid-Terrace top. - Low
# A 80 Bethune Road ... Mid-Terrace ground. - Low
# B 80 Bethune Road ... \n \n - Low
# A 9 Clapton Common ... Semi-Detached ground. - Low
# C 9 Clapton Common ... End-Terrace \n. - Low
# B 89 Manor Road ... \n \n. - Low
# A 6 Northfield Road ... Detached top. - Low
# 13 Northfield Rd ... Semi-Detached \n - Low
# A 73 Manor Road ... End-Terrace \n - Low
# B 73 Manor Road ... Detached top - Low
# Hornsey data - contained in original asset list
hornsey_asset_list = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/AIHA/SHDF - Template - EOI - Hornsey Housing "
@ -88,5 +102,5 @@ caha_epc_data = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/AIHA/caha_extracted_property_data.xlsx"
)
caha_epc_data["property_type"].value_counts()
caha_epc_data["wall_type"].value_counts()
caha_epc_data[caha_epc_data["address"] != "33 Woodhouse Road"]["property_type"].value_counts()
caha_epc_data[caha_epc_data["address"] != "33 Woodhouse Road"]["wall_type"].value_counts()

View file

@ -8,6 +8,7 @@ from tqdm import tqdm
import pandas as pd
import numpy as np
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
from etl.spatial.OpenUprnClient import OpenUprnClient
from backend.SearchEpc import SearchEpc
from utils.s3 import save_csv_to_s3
@ -60,6 +61,7 @@ def hornsey():
}
extracted_data = []
asset_list = []
hornsey_asset_list["row_id"] = hornsey_asset_list.index
for _, home in tqdm(hornsey_asset_list.iterrows(), total=len(hornsey_asset_list)):
if home["Address letter or number"] == "Flat 1 36 Haringey Park":
@ -108,12 +110,24 @@ def hornsey():
asset_list.append(
{
"uprn": newest_epc["uprn"],
"row_id": home["row_id"],
"address": home["Address letter or number"],
"postcode": home["Postcode"],
"property_type": "Flat", # They're all flats
}
)
# Get conservation area data
# uprns = [x["uprn"] for x in extracted_data]
# conservation_area_data = OpenUprnClient.get_spatial_data(uprns, "retrofit-data-dev")
#
# addresses = pd.DataFrame(asset_list)
# addresses["uprn"] = addresses["uprn"].astype(int)
# conservation_area_df = conservation_area_data.merge(addresses, how="left", right_on="uprn", left_on="UPRN")
# conservation_area_df.to_csv(
# "/Users/khalimconn-kowlessar/Documents/hestia/Customers/AIHA/hornsey_conservation_area_data.csv"
# )
# We format the extracted data so that it has the same structure as non-intrusive recommendations
# We then get the UPRNs and create the asset list
@ -213,6 +227,8 @@ def caha():
# If pattern doesn't match, return original address
return address
caha_asset_list["row_id"] = caha_asset_list.index
extracted_data = []
asset_list = []
for _, home in tqdm(caha_asset_list.iterrows(), total=len(caha_asset_list)):
@ -270,6 +286,7 @@ def caha():
asset_list.append(
{
"row_id": home["row_id"],
"uprn": uprn,
"address": address,
"postcode": home["Postcode"],
@ -280,6 +297,24 @@ def caha():
}
)
# Missing row ids
missed = [r for r in caha_asset_list["row_id"].tolist() if r not in [x["row_id"] for x in asset_list]]
no_data = [x for x in asset_list if x["uprn"] in [None, ""]]
no_data = pd.DataFrame(no_data)
# Get conservation area data
uprns = [x["uprn"] for x in extracted_data if x["uprn"] not in ["", None]]
conservation_area_data = OpenUprnClient.get_spatial_data([100022526362], "retrofit-data-dev")
addresses = pd.DataFrame(asset_list)
addresses["uprn"] = addresses["uprn"].astype(str)
conservation_area_data["UPRN"] = conservation_area_data["UPRN"].astype(str)
conservation_area_df = conservation_area_data.merge(addresses, how="left", right_on="uprn", left_on="UPRN")
conservation_area_df.to_csv(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/AIHA/caha_conservation_area_data.csv"
)
non_invasive_recommendations = [
{
"uprn": r["uprn"],

View file

@ -1,7 +1,7 @@
import pandas as pd
from utils.s3 import save_csv_to_s3
PORTFOLIO_ID = 111
PORTFOLIO_ID = 120
USER_ID = 8
@ -13,10 +13,11 @@ def app():
asset_list = [
{
"uprn": 100050770761,
"address": "12 Sheardown Street",
"postcode": "DN4 0BH"
"uprn": 100030334057,
"address": "5, Lynton Street",
"postcode": "DE22 3RW"
}
]
asset_list = pd.DataFrame(asset_list)
@ -30,11 +31,22 @@ def app():
non_invasive_recommendations = [
{
"uprn": 100050770761,
"uprn": 100030334057,
"recommendations": [
{
"type": "extension_cavity_wall_insulation",
"type": "internal_wall_insulation",
"sap_points": 9,
"survey": True
},
{
"type": "external_wall_insulation",
"sap_points": 9,
"survey": True
},
{
"type": "suspended_floor_insulation",
"sap_points": 2,
"survey": True
}
]
}
@ -49,8 +61,8 @@ def app():
valuation_data = [
{
"uprn": 100050770761,
"value": 67_000
"uprn": 100030334057,
"value": 133_000
}
]
# Store valuation data to s3

View file

@ -229,7 +229,3 @@ def app():
filename = ("/Users/khalimconn-kowlessar/Documents/hestia/Customers/southend/southend EPC Data pull - 14 Nov "
"2024.xlsx")
asset_list.to_excel(filename, index=False)
asset_list["% of the Roof with PV"].value_counts()
asset_list[asset_list["% of the Roof with PV"] == "50.0"][["Address", "Postcode"]]

File diff suppressed because it is too large Load diff

View file

@ -7,4 +7,5 @@ epc-api-python==1.0.2
usaddress==0.5.11
fuzzywuzzy==0.18.0
python-dotenv
scipy

View file

@ -26,6 +26,20 @@ class RetrieveFindMyEpc:
self.address_cleaned = self.address.replace(",", "").replace(" ", "").lower()
@staticmethod
def extract_low_carbon_sources(soup):
    """
    Extract the entries listed under the "Low and zero carbon energy sources" heading.

    :param soup: Parsed page (or element) exposing ``find``; presumably a BeautifulSoup object - confirm
        against the caller.
    :return: Dict mapping the text of each list item to True. Empty when the heading is absent, or when no
        ``<ul>`` follows the heading.
    """
    # Find the section header
    section_header = soup.find("h3", string="Low and zero carbon energy sources")
    if not section_header:
        return {}

    # Locate the list following the header. The lookup yields None when no <ul> follows, so we guard
    # against it rather than failing with an AttributeError on pages that have the heading but no list.
    energy_list = section_header.find_next("ul")
    if energy_list is None:
        return {}

    # Extract the list items
    return {item.get_text(strip=True): True for item in energy_list.find_all("li")}
def retrieve_newest_find_my_epc_data(self, sap_2012_date=None):
"""
For a post code and address, we pull out all the required data from the find my epc website
@ -112,6 +126,7 @@ class RetrieveFindMyEpc:
# Find all h3 headers for each step and extract their related information
step_headers = recommendations_div.find_all('h3', class_='govuk-heading-m')
previous_sap_score = current_sap
previous_epc = current_rating.split(' ')[-6]
for step_num, step_header in enumerate(step_headers, start=1):
# Extract the step title (the measure)
measure_title = step_header.text.strip().replace(f"Step {step_num}: ", "")
@ -124,7 +139,11 @@ class RetrieveFindMyEpc:
# Check if the potential rating div is found
if potential_rating_div:
# Extract the rating text within the SVG text element
rating_text = potential_rating_div.find('text', class_='govuk-!-font-weight-bold').text.strip()
extracted_rating_text = potential_rating_div.find('text', class_='govuk-!-font-weight-bold')
if extracted_rating_text is not None:
rating_text = extracted_rating_text.text.strip()
else:
rating_text = " ".join([str(previous_sap_score), previous_epc])
# Parse the rating text to separate the numeric rating and EPC letter
new_rating = int(rating_text.split()[0])
new_epc = rating_text.split()[1]
@ -138,6 +157,7 @@ class RetrieveFindMyEpc:
"sap_points": new_rating - previous_sap_score
})
previous_sap_score = new_rating
previous_epc = new_epc
# Search for the assessment information
assessment_information = address_res.find('div', {'id': 'information'})
@ -191,6 +211,9 @@ class RetrieveFindMyEpc:
# Finally, we format the recommendations
recommendations = self.format_recommendations(recommendations, assessment_data, sap_2012_date)
# 4) Low and zero carbon energy sources
low_carbon_energy_sources = self.extract_low_carbon_sources(address_res)
resulting_data = {
'epc_certificate': epc_certificate,
'current_epc_rating': current_rating.split(' ')[-6],
@ -200,7 +223,8 @@ class RetrieveFindMyEpc:
"heating_text": heating_text,
"hot_water_text": hot_water_text,
"recommendations": recommendations,
**assessment_data
**assessment_data,
**low_carbon_energy_sources
}
return resulting_data
@ -246,6 +270,31 @@ class RetrieveFindMyEpc:
],
"Band A condensing boiler": ["boiler_upgrade"],
"Double glazing": ["double_glazing"],
"Flue gas heat recovery device in conjunction with boiler": ["flue_gas_heat_recovery"],
"Wind turbine": ["wind_turbine"],
"Loft insulation": ["loft_insulation"],
"Solar photovoltaic (PV) panels": ["solar_pv"],
"Party wall insulation": ["party_wall_insulation"],
'Draught proofing': ["draught_proofing"],
"Roof insulation recommendation": [],
"Cavity wall insulation recommendation": [],
"Windows draught proofing": [],
"Low energy lighting for all fixed outlets": ["low_energy_lighting"],
"Cylinder thermostat recommendation": [],
"Heating controls recommendation": [],
"Replace boiler with Band A condensing boiler": [],
"Solar panel recommendation": [],
"Double glazing recommendation": [],
"Solid wall insulation recommendation": [],
"Fuel change recommendation": [],
"PV Cells recommendation": [],
"Replacement glazing units": ["double_glazing"],
"Heating controls (time and temperature zone control)": ["time_temperature_zone_control"],
"High heat retention storage heaters": ["high_heat_retention_storage_heaters"],
"Gas condensing boiler": ["boiler_upgrade"],
"Change room heaters to condensing boiler": ["boiler_upgrade"],
"Cylinder thermostat": ["cylinder_thermostat"],
"Heat recovery system for mixer showers": ["heat_recovery_shower"],
}
survey = True

View file

@ -0,0 +1,333 @@
import os
import time
from idlelib.iomenu import errors
import pandas as pd
import numpy as np
from tqdm import tqdm
from dotenv import load_dotenv
from backend.SearchEpc import SearchEpc
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes
from recommendations.recommendation_utils import (
estimate_perimeter,
estimate_external_wall_area,
estimate_number_of_floors
)
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
def get_data(asset_list, fulladdress_column, address1_column, postcode_column):
    """
    Look up the newest EPC, its recommendations and the Find My EPC data for every row of the asset list.

    :param asset_list: DataFrame of properties. Must contain a "row_id" column as well as the three columns
        named by the other arguments.
    :param fulladdress_column: Name of the column holding the full address.
    :param address1_column: Name of the column holding the house number / first address line.
    :param postcode_column: Name of the column holding the postcode.
    :return: Tuple ``(epc_data, errors, no_epc)``:
        - epc_data: list of dicts, one per property with an EPC, holding "row_id", the newest EPC's fields,
          "recommendations" and "find_my_epc_data".
        - errors: list of row ids for which processing raised an exception.
        - no_epc: list of row ids for which no EPC was found.
    """
    epc_data = []
    # Named so that it does not shadow the module-level `errors` import
    failed_row_ids = []
    no_epc = []

    for _, home in tqdm(asset_list.iterrows(), total=len(asset_list)):
        try:
            postcode = home[postcode_column]
            house_number = home[address1_column]
            full_address = home[fulladdress_column]

            searcher = SearchEpc(
                address1=str(house_number),
                postcode=postcode,
                auth_token=EPC_AUTH_TOKEN,
                os_api_key="",
                property_type=None,
                fast=True,
                full_address=full_address,
                max_retries=5
            )
            # Force the skipping of estimating the EPC
            searcher.ordnance_survey_client.property_type = None
            searcher.ordnance_survey_client.built_form = None

            searcher.find_property(skip_os=True)

            if searcher.newest_epc is None:
                no_epc.append(home["row_id"])
                continue

            # Look for EPC recommendations. A failure here is tolerated (best effort), but we only catch
            # Exception so that KeyboardInterrupt / SystemExit still stop a long-running pull.
            try:
                property_recommendations = searcher.client.domestic.recommendations(searcher.newest_epc["lmk-key"])
            except Exception:
                property_recommendations = {"rows": []}

            # Retrieve data from FindMyEPC
            try:
                find_epc_searcher = RetrieveFindMyEpc(
                    address=searcher.newest_epc["address"], postcode=searcher.newest_epc["postcode"]
                )
                find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
            except ValueError as e:
                if "No EPC found" in str(e):
                    # Retry with just the first address line. If this also fails, the error propagates to
                    # the outer handler and the row is recorded as failed.
                    find_epc_searcher = RetrieveFindMyEpc(
                        address=searcher.newest_epc["address1"], postcode=searcher.newest_epc["postcode"]
                    )
                    find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
                else:
                    find_epc_data = {}
            except Exception as e:
                # Keep the original traceback attached to the wrapped error
                raise Exception(f"Error retrieving FindMyEPC data: {e}") from e

            # Random pause between properties
            time.sleep(np.random.uniform(0.1, 1))

            epc = {
                "row_id": home["row_id"],
                **searcher.newest_epc.copy(),
                "recommendations": property_recommendations["rows"],
                "find_my_epc_data": find_epc_data,
            }
            epc_data.append(epc)
        except Exception as e:
            # Record the failure so the caller can retry, and surface the reason instead of discarding it
            print(f"Error processing row {home['row_id']}: {e}")
            failed_row_ids.append(home["row_id"])
            time.sleep(5)

    return epc_data, failed_row_ids, no_epc
def extract_address1(asset_list, full_address_col, method="first_two_words"):
    """
    Add an "address1_extracted" column derived from the full address column.

    :param asset_list: DataFrame to extend; the new column is written onto this object.
    :param full_address_col: Name of the column holding the full address strings.
    :param method: Extraction strategy. Only "first_two_words" is supported.
    :return: The DataFrame that was passed in, with the "address1_extracted" column set.
    :raises ValueError: If ``method`` is not a supported strategy.
    """
    if method != "first_two_words":
        raise ValueError(f"Method {method} not recognized")

    # Keep the first two space-separated tokens of every address and glue them back together
    address_tokens = asset_list[full_address_col].str.split(" ")
    leading_tokens = address_tokens.str[:2]
    asset_list["address1_extracted"] = leading_tokens.str.join(" ")
    return asset_list
def app():
    """
    Pull EPC data for the properties in a customer asset list and write the enriched list to Excel.

    The workbook, sheet and column names are configured by the constants at the top of the function
    (currently pointing at the Bromford programme review workbook).

    Data request contents:
    Date of last EPC
    Reason for EPC
    SAP score on register
    Property Type
    Property Area
    Property Age
    Any Dimensions (HLP,PW,RH)
    Property Wall Construction
    Heating Type
    Secondary Heating
    Loft Insulation Depth
    Additional if possible:
    Heat loss calculations
    EPC recommendations
    Property UPRN
    """
    DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Bromford/"
    DATA_FILENAME = "Bromford programme review.xlsx"
    SHEET_NAME = "Bromford"
    POSTCODE_COLUMN = "Postcode"
    FULLADDRESS_COLUMN = None
    ADDRESS1_COLUMN = "No."
    ADDRESS1_METHOD = "first_two_words"
    ADDRESS_COLS_TO_CONCAT = ["No.", "Address"]

    asset_list = pd.read_excel(os.path.join(DATA_FOLDER, DATA_FILENAME), header=0, sheet_name=SHEET_NAME)
    asset_list = asset_list[~pd.isnull(asset_list[POSTCODE_COLUMN])]
    # row_id is the key used to join every derived table back onto the asset list
    asset_list["row_id"] = asset_list.index

    # We clean up potential non-breaking spaces, and double spaces
    for col in [c for c in [POSTCODE_COLUMN, FULLADDRESS_COLUMN, ADDRESS1_COLUMN] if c is not None]:
        asset_list[col] = asset_list[col].astype(str)
        asset_list[col] = asset_list[col].str.replace('\xa0', ' ', regex=False)
        # Two spaces -> one space
        asset_list[col] = asset_list[col].str.replace('  ', ' ', regex=False)

    # Build the full address first: extracting address1 needs a full address column to read from
    if FULLADDRESS_COLUMN is None:
        FULLADDRESS_COLUMN = "fulladdress_extracted"
        # We concatenate the columns in ADDRESS_COLS_TO_CONCAT, on commas
        asset_list[FULLADDRESS_COLUMN] = asset_list[ADDRESS_COLS_TO_CONCAT].apply(lambda x: ", ".join(x), axis=1)

    if ADDRESS1_COLUMN is None:
        ADDRESS1_COLUMN = "address1_extracted"
        asset_list = extract_address1(
            asset_list=asset_list, full_address_col=FULLADDRESS_COLUMN, method=ADDRESS1_METHOD
        )

    # We check for duplicated addresses
    asset_list["deduper"] = asset_list[FULLADDRESS_COLUMN] + asset_list[POSTCODE_COLUMN]
    is_duplicate = asset_list["deduper"].duplicated()
    if is_duplicate.sum():
        # Drop the dupes
        print(f"There are {is_duplicate.sum()} duplicated addresses - dropping")
        asset_list = asset_list[~is_duplicate]

    epc_data, failed_row_ids, no_epc = get_data(
        asset_list=asset_list,
        fulladdress_column=FULLADDRESS_COLUMN,
        address1_column=ADDRESS1_COLUMN,
        postcode_column=POSTCODE_COLUMN
    )

    # We now retrieve any failed properties
    asset_list_failed = asset_list[asset_list["row_id"].isin(failed_row_ids)]
    epc_data_failed, _, _ = get_data(
        asset_list=asset_list_failed,
        fulladdress_column=FULLADDRESS_COLUMN,
        address1_column=ADDRESS1_COLUMN,
        postcode_column=POSTCODE_COLUMN
    )

    # Append the failed data to the main data
    epc_data.extend(epc_data_failed)

    epc_df = pd.DataFrame(epc_data)

    # We expand out the recommendations into one boolean column per distinct recommendation text
    recommendations_df = epc_df[["row_id", "recommendations"]]
    unique_recommendations = set()
    for _, row in recommendations_df.iterrows():
        unique_recommendations.update([rec["improvement-summary-text"] for rec in row["recommendations"]])

    # Sorted so that the output column order is the same on every run
    columns = ["row_id"] + sorted(unique_recommendations)

    transformed_data = []
    for _, row in recommendations_df.iterrows():
        # Initialize a dictionary for this row with False for all recommendations
        row_data = {col: False for col in columns}
        row_data["row_id"] = row["row_id"]

        # Set True for each recommendation present in this row
        for rec in row["recommendations"]:
            recommendation_text = rec["improvement-summary-text"]
            row_data[recommendation_text] = True

        # Append the row data to transformed_data
        transformed_data.append(row_data)

    transformed_df = pd.DataFrame(transformed_data)
    # Drop the column that is "" - it only exists when some recommendation had an empty summary text
    transformed_df = transformed_df.drop(columns=[""], errors="ignore")

    # Get the find my epc data, flattened to one column per key and aligned with epc_df by position
    find_my_epc_data = epc_df[["row_id"]].join(pd.json_normalize(epc_df["find_my_epc_data"]))

    # We check if we get the solar pv column:
    if "Solar photovoltaics" not in find_my_epc_data.columns:
        find_my_epc_data["Solar photovoltaics"] = False

    # Retrieve just the data we need
    epc_df = epc_df[
        [
            "row_id",
            "uprn",
            "property-type",
            "built-form",
            "inspection-date",
            "current-energy-rating",
            "current-energy-efficiency",
            "roof-description",
            "walls-description",
            "transaction-type",
            # New fields needed
            "secondheat-description",
            "total-floor-area",
            "construction-age-band",
            "floor-height",
            "number-habitable-rooms",
            "mainheat-description",
            #
            "energy-consumption-current",  # kwh/m2
            "photo-supply",
        ]
    ]

    asset_list = asset_list.merge(
        epc_df,
        how="left",
        on="row_id"
    ).merge(
        find_my_epc_data[
            [
                "row_id", "heating_text", "hot_water_text", 'Assessors name',
                "Assessor's Telephone", "Assessor's Email", "Accreditation scheme",
                "Assessors ID", "Solar photovoltaics"
            ]
        ].rename(
            columns={
                "Solar photovoltaics": "Has Solar PV",
                "heating_text": "Heating Estimated kWh",
                "hot_water_text": "Hot Water Estimated kWh",
            }
        ),
        how="left",
        on="row_id"
    )

    # The left merges leave NaN for properties without EPC / Find My EPC data. Missing values must count as
    # "no PV" on both sides of the OR, otherwise a property with no data at all could be flagged as having PV.
    listed_as_pv = asset_list["Has Solar PV"].fillna(False).astype(bool)
    photo_supply = asset_list["photo-supply"]
    has_pv_supply = photo_supply.notna() & ~photo_supply.isin(["0.0", 0, None, ""])
    asset_list["Has Solar PV"] = listed_as_pv | has_pv_supply
    asset_list = asset_list.drop(columns=["photo-supply"])

    # Rename the columns
    asset_list = asset_list.rename(columns={
        "inspection-date": "Date of last EPC",
        "current-energy-efficiency": "SAP score on register",
        "current-energy-rating": "EPC rating on register",
        "property-type": "Property Type",
        "built-form": "Archetype",
        "total-floor-area": "Property Floor Area",
        "construction-age-band": "Property Age Band",
        "floor-height": "Property Floor Height",
        "number-habitable-rooms": "Number of Habitable Rooms",
        "walls-description": "Wall Construction",
        "roof-description": "Roof Construction",
        "mainheat-description": "Heating Type",
        "secondheat-description": "Secondary Heating",
        "transaction-type": "Reason for last EPC",
        "energy-consumption-current": "Heat Demand (kWh/m2)",
    })

    asset_list["Estimated Number of Floors"] = asset_list.apply(
        lambda x: estimate_number_of_floors(property_type=x["Property Type"]) if not pd.isnull(
            x["Property Type"]) else None, axis=1
    )

    asset_list["Property Floor Area"] = asset_list["Property Floor Area"].astype(float)
    # Replace "" value with NaN. np.nan is used rather than None because some pandas versions treat an
    # explicit None replacement value as "not given" and pad from the previous row instead.
    asset_list["Number of Habitable Rooms"] = asset_list["Number of Habitable Rooms"].replace("", np.nan)
    asset_list["Number of Habitable Rooms"] = asset_list["Number of Habitable Rooms"].astype(float)

    # Floor area and room count are split evenly across the estimated number of floors
    asset_list["Estimated Perimeter (m)"] = asset_list.apply(
        lambda x: estimate_perimeter(
            floor_area=x["Property Floor Area"] / x["Estimated Number of Floors"],
            num_rooms=x["Number of Habitable Rooms"] / x["Estimated Number of Floors"],
        ), axis=1
    )

    asset_list["Estimated Heat Loss Perimeter (m2)"] = asset_list.apply(
        lambda x: estimate_external_wall_area(
            num_floors=x["Estimated Number of Floors"],
            # Fall back to 2.5 when the EPC has no floor height recorded
            floor_height=float(x["Property Floor Height"]) if x["Property Floor Height"] else 2.5,
            perimeter=x["Estimated Perimeter (m)"],
            built_form=x["Archetype"]
        ),
        axis=1
    )

    asset_list["Roof Insulation Thickness"] = asset_list.apply(
        lambda x: RoofAttributes(description=x["Roof Construction"]).process()["insulation_thickness"] if not pd.isnull(
            x["Roof Construction"]) else None,
        axis=1
    )

    # For all of the columns in transformed_df, prefix with "Recommendation: "
    transformed_df = transformed_df.rename(
        columns={col: f"Recommendation: {col}" for col in transformed_df.columns if col != "row_id"}
    )

    asset_list = asset_list.merge(
        transformed_df,
        how="left",
        on="row_id"
    )

    asset_list = asset_list.drop(columns=["row_id"])

    # Store as an excel, next to the input workbook
    filename = os.path.join(DATA_FOLDER, os.path.splitext(DATA_FILENAME)[0]) + " EPC Data Pull.xlsx"
    asset_list.to_excel(filename, index=False)

View file

@ -172,6 +172,11 @@ class FloorRecommendations(Definitions):
insulation_materials = pd.DataFrame(insulation_materials)
non_invasive_recs = next(
(r for r in self.property.non_invasive_recommendations if
r["type"] == insulation_materials["type"].values[0]), {}
)
lowest_selected_u_value = None
for _, insulation_material_group in insulation_materials.groupby("description"):
@ -217,6 +222,9 @@ class FloorRecommendations(Definitions):
else:
raise NotImplementedError("Implement me!")
sap_points = non_invasive_recs.get("sap_points", None)
survey = non_invasive_recs.get("survey", False)
floor_ending_config = FloorAttributes(new_description).process()
floor_simulation_config = check_simulation_difference(
new_config=floor_ending_config, old_config=self.property.floor, prefix="floor_"
@ -245,7 +253,8 @@ class FloorRecommendations(Definitions):
"description": self._make_floor_description(material),
"starting_u_value": u_value,
"new_u_value": new_u_value,
"sap_points": None,
"sap_points": sap_points,
"survey": survey,
"already_installed": already_installed,
"simulation_config": simulation_config,
"description_simulation": {

View file

@ -66,7 +66,7 @@ class HotwaterRecommendations:
(self.property.hotwater["heater_type"] in ["electric immersion"]) &
(self.property.data["hot-water-energy-eff"] == "Very Poor") &
(self.property.hotwater["no_system_present"] is None) &
len(has_tank_recommendation) == 0
(len(has_tank_recommendation) == 0)
):
self.recommend_tank_insulation(phase=phase)
return