Merge pull request #373 from Hestia-Homes/caha-model

Caha model
This commit is contained in:
KhalimCK 2024-11-15 15:00:43 +00:00 committed by GitHub
commit 9b038a8dcb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 830 additions and 22 deletions

View file

@ -792,9 +792,14 @@ class GoogleSolarApi:
property_instance = [p for p in input_properties if p.id == unit["property_id"]][0]
# At this level, we check if the property is suitable for solar and, if not, skip
# Or if we have a solar non-invasive recommendation
non_invasive_rec = next(
(r for r in property_instance.non_invasive_recommendations if r["type"] == "solar_pv"), {}
).get("array_wattage")
if (
(not property_instance.is_solar_pv_valid()) or
[r for r in property_instance.non_invasive_recommendations if r["type"] == "solar_pv"]
non_invasive_rec is not None
):
continue

View file

@ -394,7 +394,7 @@ async def trigger_plan(body: PlanTriggerRequest):
logger.info("Getting the inputs")
plan_input = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path)
# Check for duplicate UPRNS
input_uprns = [x.get("uprn") for x in plan_input if "uprn" in x]
input_uprns = [x.get("uprn") for x in plan_input if "uprn" in x and x.get("uprn")]
if input_uprns:
# Check for dupes
if len(input_uprns) != len(set(input_uprns)):

View file

@ -0,0 +1,92 @@
"""
This is an adhoc script, used to pull together some of the figures that are being included in the
Warm Homes: Social Housing Wave 3 funding application
"""
import pandas as pd
import numpy as np
# Workbook that holds both the full AIHA property list and the modelled measures sheets.
measures_workbook = "/Users/khalimconn-kowlessar/Downloads/AIHA Measures Packages 2024_11_13.xlsx"
aiha_all_units = pd.read_excel(
    measures_workbook,
    sheet_name="All Properties - AIHA",
    header=2
)
modelled_units = pd.read_excel(
    measures_workbook,
    sheet_name="Modelled Properties - Measures",
    header=5
)
aiha_all_units = aiha_all_units.drop(columns=['Unnamed: 0', 'Unnamed: 1'])
aiha_extracted_property_data = pd.read_csv(
    "/Users/khalimconn-kowlessar/Documents/hestia/Customers/AIHA/extracted_property_data.csv"
)

# Units in the Wave 3 bid are the ones with a positive expected package cost.
aiha_wave_3_units = aiha_all_units[aiha_all_units["Expected Package Cost"].astype(float) > 0]
# TODO: The EPC C property isn't a C!
aiha_epc_breakdown = aiha_wave_3_units["Expected EPC Rating"].replace({"D or E": "E"}).value_counts()

# For CAHA
caha_epc_breakdown = modelled_units[
    modelled_units['Survey Key'].str.contains("CAHA")
]['Current EPC Rating'].value_counts()

# For Hornsey
hornsey_epc_breakdown = modelled_units[
    modelled_units['Survey Key'].str.contains("HORNSEY")
]['Current EPC Rating'].value_counts()

aiha_original_asset_data = pd.read_excel(
    "/Users/khalimconn-kowlessar/Documents/hestia/Customers/AIHA/240924- KSQ & Domna Info Merge - AIHA - SHDF Wave 3 "
    "bid - Supplementary information.xlsx",
    sheet_name="Archetyping Data",
    header=2
)

# Get the units in the bid: the inner merge keeps only asset rows whose address also appears
# in the Wave 3 units.
address_key_columns = ["Address letter or number", "Street address", "Postcode"]
aiha_wave_3_features = aiha_original_asset_data[
    ['Address letter or number', 'Street address', 'Postcode', "Wall type",
     "Property type", "built-form", "floor"]
].merge(
    aiha_wave_3_units[address_key_columns],
    how="inner",
    on=address_key_columns
)
wall_type_breakdown = aiha_wave_3_features["Wall type"].value_counts()
property_type_breakdown = aiha_wave_3_features.groupby(["Property type", "floor"]).size().reset_index()

# Hornsey data - contained in original asset list
hornsey_asset_list = pd.read_excel(
    "/Users/khalimconn-kowlessar/Documents/hestia/Customers/AIHA/SHDF - Template - EOI - Hornsey Housing "
    "Trust.xlsx",
    sheet_name="Ksquared-All units information",
    header=3
)
# We don't need the first row
hornsey_asset_list = hornsey_asset_list.iloc[1:]
# Fill NA values with empty strings
hornsey_asset_list = hornsey_asset_list.fillna("")
for col in address_key_columns:
    hornsey_asset_list[col] = hornsey_asset_list[col].astype(str).str.strip()
# Collapse runs of spaces into one. The previous call replaced a single space with a single
# space, which changed nothing.
for col in address_key_columns:
    hornsey_asset_list[col] = hornsey_asset_list[col].str.replace(r" {2,}", " ", regex=True)
hornsey_asset_list = hornsey_asset_list[hornsey_asset_list["Address letter or number"] != ""]
hornsey_asset_list["Wall Type Cleaned"] = np.where(
    hornsey_asset_list["Wall type"].str.contains("Cavity"),
    "Cavity",
    "Solid"
)
# Keep the breakdowns in named variables so they can be inspected, like the ones above.
hornsey_property_type_breakdown = hornsey_asset_list["Property type"].value_counts()

# CAHA
caha_epc_data = pd.read_excel(
    "/Users/khalimconn-kowlessar/Documents/hestia/Customers/AIHA/caha_extracted_property_data.xlsx"
)
caha_property_type_breakdown = caha_epc_data["property_type"].value_counts()
caha_wall_type_breakdown = caha_epc_data["wall_type"].value_counts()

View file

@ -92,9 +92,13 @@ def main():
# This is the data we need for the AIHA project
measures_data = extracted_surveys[
["survey_key", "address", "postcode", "current-energy-efficiency", "current-energy-rating", "number_of_floors"]
["survey_key", "address", "postcode", "current-energy-efficiency", "current-energy-rating",
"number_of_floors", "walls-description", "property-type", "built-form"]
]
measures_data = measures_data.sort_values("survey_key", ascending=True)
measures_data.to_csv(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/AIHA/extracted_property_data.csv",
)
# Note:
# The properties will still have "Very poor" ratings for their hot water

View file

@ -1,9 +1,12 @@
import os
import time
import re
from etl.epc.settings import EARLIEST_EPC_DATE
from dotenv import load_dotenv
from tqdm import tqdm
import pandas as pd
import numpy as np
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
from backend.SearchEpc import SearchEpc
from utils.s3 import save_csv_to_s3
@ -12,9 +15,10 @@ load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
USER_ID = 8
PORTFOLIO_ID = 117
CAHA_PORTFOLIO_ID = 118
def app():
def hornsey():
"""
This script prepares the asset lists for the additional housing associations, CAHA and Hornsey Housing Trust,
that are forming a consortium led by AIHA
@ -43,6 +47,12 @@ def app():
hornsey_asset_list = hornsey_asset_list[hornsey_asset_list["Address letter or number"] != ""]
# `"Cavity" in <Series>` tests the Series index labels, not its values, so the condition was a
# single False and every row was labelled "Solid". Match the text of each row instead.
hornsey_asset_list["Wall Type Cleaned"] = np.where(
    hornsey_asset_list["Wall type"].astype(str).str.contains("Cavity", na=False),
    "Cavity",
    "Solid"
)
missed_uprns = {
"Flat 13A Stowell House": 100021213098,
"Flat 24 Stowell House": 100021213110,
@ -156,3 +166,225 @@ def app():
"exclusions": ["boiler_upgrade"]
}
print(body)
def caha():
    """
    Prepare the CAHA asset list and non-invasive recommendations and upload them to S3.

    The CAHA workbook is read from a hard-coded local path. Each remaining address is looked up
    with ``RetrieveFindMyEpc`` (for the recommendations) and ``SearchEpc`` (for the UPRN and
    fabric details). The resulting CSVs are written to the ``retrofit-plan-inputs-dev`` bucket
    and the plan-trigger request bodies are printed.
    """
    address_columns = ["Address letter or number", "Street address", "Postcode"]

    caha_asset_list = pd.read_excel(
        "/Users/khalimconn-kowlessar/Documents/hestia/Customers/AIHA/Copy of AIHA - WHSHF Wave 3 bid - Consortium "
        "member properties - CAHA.xlsx",
        sheet_name="Ksquared-All units information",
        header=3
    )
    # Drop the first row below the header (presumably not a property record - confirm against the sheet)
    caha_asset_list = caha_asset_list.iloc[1:]
    # Fill NA values with empty strings
    caha_asset_list = caha_asset_list.fillna("")
    caha_asset_list["Address letter or number"] = caha_asset_list["Address letter or number"].astype(
        str
    ).str.strip()
    # The Postcode column wasn't populated, so rebuild it from the last two space-separated
    # tokens of the street address
    caha_asset_list["Street address"] = caha_asset_list["Street address"].str.strip()
    caha_asset_list["Postcode"] = caha_asset_list["Street address"].str.split(" ").str[-2:].str.join(" ")
    # Take just the columns we need
    caha_asset_list = caha_asset_list[address_columns]
    # Collapse runs of spaces into one. The previous call replaced a single space with a single
    # space, which changed nothing.
    for col in address_columns:
        caha_asset_list[col] = caha_asset_list[col].str.replace(r" {2,}", " ", regex=True)

    # Addresses that need a hand-written form before searching
    remap = {
        "Flat A, 50 Talbot Road N6 4QP": "50a Talbot Road",
        "Flat A, 51 First Avenue EN1 1BN": "51a, First Avenue",
        "Flat B, 51 First Avenue EN1 1BN": "51b, First Avenue"
    }

    def remap_address(address):
        """Rewrite 'Flat A, 30 Grove Park Road' as '30A Grove Park Road'; return other addresses unchanged."""
        match = re.match(r'Flat (\w), (\d+) (.+)', address)
        if match:
            flat_letter = match.group(1)  # e.g., 'A'
            number = match.group(2)  # e.g., '30'
            rest_of_address = match.group(3)  # e.g., 'Grove Park Road'
            return f"{number}{flat_letter} {rest_of_address}"
        return address

    def plan_request_body(portfolio_id, trigger_file_path, non_invasive_path):
        """Build the plan-trigger request body; only these three values differ between requests."""
        return {
            "portfolio_id": str(portfolio_id),
            "housing_type": "Social",
            "goal": "Increasing EPC",
            "goal_value": "C",
            "trigger_file_path": trigger_file_path,
            "already_installed_file_path": "",
            "patches_file_path": "",
            "non_invasive_recommendations_file_path": non_invasive_path,
            "valuation_file_path": "",
            "scenario_name": "Wave 3 Packages",
            "multi_plan": True,
            "budget": None,
            "exclusions": ["boiler_upgrade"]
        }

    # (street address, unit) pairs that are left out of the run
    skipped_units = {
        ("35 Stanford road N11 3HY", ""),
        ("29 Victoria Avenue N3 1BD", ""),
        ("11 Victoria Avenue N3 1BD", "Flat A"),
        ("11 Victoria Avenue N3 1BD", "Flat C"),
        ("10 Forest Gardens N17 6XA", "Flat C"),
        ("219 Cann Hall Road E11 3NJ", "Flat B"),
    }

    extracted_data = []
    asset_list = []
    for _, home in tqdm(caha_asset_list.iterrows(), total=len(caha_asset_list)):
        unit_number = home["Address letter or number"]
        street = home["Street address"]
        postcode = home["Postcode"]
        if (street, unit_number) in skipped_units:
            continue

        address = ", ".join([x for x in [unit_number, street] if x])
        address = remap.get(address, address)
        address = address.replace(postcode, "").strip()
        # Victoria Avenue addresses keep their 'Flat X, ...' form
        if "Victoria Avenue" not in address:
            address = remap_address(address)

        # Pull the data from find my epc
        find_epc_searcher = RetrieveFindMyEpc(address=address, postcode=postcode)
        find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data(sap_2012_date=EARLIEST_EPC_DATE)
        time.sleep(0.5)

        # We need uprn
        searcher = SearchEpc(
            address1=address,
            postcode=postcode,
            auth_token=EPC_AUTH_TOKEN,
            os_api_key="",
            full_address=address,
        )
        searcher.find_property(skip_os=True)
        newest_epc = searcher.newest_epc
        uprn = newest_epc["uprn"]
        # The UPRN found for these two flats is deliberately not used
        if address in ["Flat D, 11 Victoria Avenue", "Flat B, 11 Victoria Avenue"]:
            uprn = None

        extracted_data.append(
            {
                "uprn": uprn,
                **find_epc_data,
            }
        )
        asset_list.append(
            {
                "uprn": uprn,
                "address": address,
                "postcode": home["Postcode"],
                "property_type": newest_epc["property-type"],
                "wall_type": newest_epc["walls-description"],
                "built_form": newest_epc["built-form"],
                "flat_storey_count": newest_epc['flat-storey-count'],
            }
        )

    non_invasive_recommendations = [
        {
            "uprn": r["uprn"],
            "recommendations": r["recommendations"]
        } for r in extracted_data
    ]

    # Flat D, 11 Victoria Avenue is modelled on its own, so it gets a separate asset list
    asset_list = pd.DataFrame(asset_list)
    asset_list1 = asset_list[asset_list["address"] != "Flat D, 11 Victoria Avenue"]
    asset_list2 = asset_list[asset_list["address"] == "Flat D, 11 Victoria Avenue"]

    # Store the asset lists in s3
    filename = f"{USER_ID}/{CAHA_PORTFOLIO_ID}/asset_list1.csv"
    save_csv_to_s3(
        dataframe=asset_list1,
        bucket_name="retrofit-plan-inputs-dev",
        file_name=filename
    )
    filename2 = f"{USER_ID}/{CAHA_PORTFOLIO_ID}/asset_list2.csv"
    save_csv_to_s3(
        dataframe=asset_list2,
        bucket_name="retrofit-plan-inputs-dev",
        file_name=filename2
    )

    # Store the non-invasive recommendations in s3
    non_invasive_recommendations_filename = f"{USER_ID}/{CAHA_PORTFOLIO_ID}/non_invasive_recommendations.csv"
    save_csv_to_s3(
        dataframe=pd.DataFrame(non_invasive_recommendations),
        bucket_name="retrofit-plan-inputs-dev",
        file_name=non_invasive_recommendations_filename
    )

    body = plan_request_body(CAHA_PORTFOLIO_ID, filename, non_invasive_recommendations_filename)
    print(body)
    body2 = plan_request_body(CAHA_PORTFOLIO_ID, filename2, non_invasive_recommendations_filename)
    print(body2)

    # One further property is submitted separately, without non-invasive recommendations.
    # NOTE(review): the file is stored under CAHA_PORTFOLIO_ID but the request uses portfolio 119 - confirm.
    asset_list3 = [
        {
            "address": "10b Forest Gardens", "postcode": "N17 6XA", "uprn": 100021180197
        }
    ]
    filename3 = f"{USER_ID}/{CAHA_PORTFOLIO_ID}/asset_list3.csv"
    save_csv_to_s3(
        dataframe=pd.DataFrame(asset_list3),
        bucket_name="retrofit-plan-inputs-dev",
        file_name=filename3
    )
    body3 = plan_request_body(119, filename3, "")
    print(body3)

View file

@ -0,0 +1,235 @@
import os
import time
import pandas as pd
from tqdm import tqdm
from dotenv import load_dotenv
from utils.s3 import read_excel_from_s3
from backend.SearchEpc import SearchEpc
from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes
from recommendations.recommendation_utils import (
estimate_perimeter,
estimate_external_wall_area,
estimate_number_of_floors
)
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
def get_data(asset_list):
    """
    Look up the newest EPC, and its recommendations, for every row of ``asset_list``.

    :param asset_list: DataFrame with "Postcode", "address1", "Address" and "row_id" columns
    :return: ``(epc_data, errors)`` - a list of EPC dicts (each with "row_id" and "recommendations"
        added) and a list of the row ids whose lookup raised
    """
    epc_data = []
    errors = []
    for _, home in tqdm(asset_list.iterrows(), total=len(asset_list)):
        try:
            postcode = home["Postcode"]
            address1 = home["address1"].split(",")[0]
            full_address = home["Address"]
            searcher = SearchEpc(
                address1=str(address1),
                postcode=postcode,
                auth_token=EPC_AUTH_TOKEN,
                os_api_key="",
                property_type=None,
                fast=True,
                full_address=full_address,
                max_retries=5
            )
            # Force the skipping of estimating the EPC
            searcher.ordnance_survey_client.property_type = None
            searcher.ordnance_survey_client.built_form = None
            searcher.find_property(skip_os=True)
            if searcher.newest_epc is None:
                continue

            # Look for EPC recommendations; a failed lookup is treated as "no recommendations".
            # `except Exception` (not a bare `except`) so Ctrl-C still stops a long run.
            try:
                property_recommendations = searcher.client.domestic.recommendations(searcher.newest_epc["lmk-key"])
            except Exception:
                property_recommendations = {"rows": []}

            epc = {
                "row_id": home["row_id"],
                **searcher.newest_epc.copy(),
                "recommendations": property_recommendations["rows"]
            }
            epc_data.append(epc)
        except Exception:
            # Record the row so the caller can retry it, then back off before the next lookup
            errors.append(home["row_id"])
            time.sleep(5)
    return epc_data, errors
def app():
    """
    Pull EPC data for the properties in the Southend planned programme workbook and export it to Excel.

    Data request contents:
    Date of last EPC
    Reason for EPC
    SAP score on register
    Property Type
    Property Area
    Property Age
    Any Dimensions (HLP,PW,RH)
    Property Wall Construction
    Heating Type
    Secondary Heating
    Loft Insulation Depth
    Additional if possible:
    Heat loss calculations
    EPC recommendations
    Property UPRN
    """
    asset_list = pd.read_excel(
        "/Users/khalimconn-kowlessar/Documents/hestia/Customers/southend/Southend Planned programme.xlsx",
        header=0,
        sheet_name="Planned RM"
    )
    # row_id ties every derived table back to the original row
    asset_list["row_id"] = asset_list.index
    asset_list["address1"] = asset_list["Address"].str.split(",").str[0]

    epc_data, errors = get_data(asset_list)
    # Retry, once, any property whose lookup raised the first time
    asset_list_failed = asset_list[asset_list["row_id"].isin(errors)]
    epc_data_failed, _ = get_data(asset_list_failed)
    epc_data.extend(epc_data_failed)
    epc_df = pd.DataFrame(epc_data)

    # Expand the recommendations into one boolean column per distinct recommendation text
    recommendations_df = epc_df[["row_id", "recommendations"]]
    unique_recommendations = set()
    for _, row in recommendations_df.iterrows():
        unique_recommendations.update(rec["improvement-summary-text"] for rec in row["recommendations"])
    columns = ["row_id"] + list(unique_recommendations)

    transformed_data = []
    for _, row in recommendations_df.iterrows():
        # Start with False for every recommendation, then flag the ones present on this row
        row_data = dict.fromkeys(columns, False)
        row_data["row_id"] = row["row_id"]
        for rec in row["recommendations"]:
            row_data[rec["improvement-summary-text"]] = True
        transformed_data.append(row_data)
    transformed_df = pd.DataFrame(transformed_data)
    # Drop the column for recommendations with an empty summary. errors="ignore" because that
    # column only exists when at least one recommendation had an empty summary.
    transformed_df = transformed_df.drop(columns=[""], errors="ignore")

    # Retrieve just the data we need
    epc_df = epc_df[
        [
            "row_id",
            "uprn",
            "property-type",
            "built-form",
            "inspection-date",
            "current-energy-rating",
            "current-energy-efficiency",
            "roof-description",
            "walls-description",
            "transaction-type",
            # New fields needed
            "secondheat-description",
            "total-floor-area",
            "construction-age-band",
            "floor-height",
            "number-habitable-rooms",
            "mainheat-description",
            #
            "energy-consumption-current",  # kwh/m2
            "photo-supply",
        ]
    ]
    asset_list = asset_list.merge(
        epc_df,
        how="left",
        on="row_id"
    ).merge(
        transformed_df,
        how="left",
        on="row_id"
    )
    asset_list = asset_list.drop(columns=["row_id"])

    # Rename the columns
    asset_list = asset_list.rename(columns={
        "inspection-date": "Date of last EPC",
        "current-energy-efficiency": "SAP score on register",
        "current-energy-rating": "EPC rating on register",
        "property-type": "Property Type",
        "built-form": "Archetype",
        "total-floor-area": "Property Floor Area",
        "construction-age-band": "Property Age Band",
        "floor-height": "Property Floor Height",
        "number-habitable-rooms": "Number of Habitable Rooms",
        "walls-description": "Wall Construction",
        "roof-description": "Roof Construction",
        "mainheat-description": "Heating Type",
        "secondheat-description": "Secondary Heating",
        "transaction-type": "Reason for last EPC",
        "energy-consumption-current": "Heat Demand (kWh/m2)",
        "photo-supply": "% of the Roof with PV"
    })

    asset_list["Estimated Number of Floors"] = asset_list.apply(
        lambda x: estimate_number_of_floors(property_type=x["Property Type"]) if not pd.isnull(
            x["Property Type"]) else None, axis=1
    )
    asset_list["Property Floor Area"] = asset_list["Property Floor Area"].astype(float)
    # Replace "" value with None so the column can be cast to float
    asset_list["Number of Habitable Rooms"] = asset_list["Number of Habitable Rooms"].replace("", None)
    asset_list["Number of Habitable Rooms"] = asset_list["Number of Habitable Rooms"].astype(float)

    # The perimeter is estimated per floor, so area and rooms are divided by the floor count
    asset_list["Estimated Perimeter (m)"] = asset_list.apply(
        lambda x: estimate_perimeter(
            floor_area=x["Property Floor Area"] / x["Estimated Number of Floors"],
            num_rooms=x["Number of Habitable Rooms"] / x["Estimated Number of Floors"],
        ), axis=1
    )
    asset_list["Estimated Heat Loss Perimeter (m2)"] = asset_list.apply(
        lambda x: estimate_external_wall_area(
            num_floors=x["Estimated Number of Floors"],
            # Fall back to 2.5 when the EPC has no floor height
            floor_height=float(x["Property Floor Height"]) if x["Property Floor Height"] else 2.5,
            perimeter=x["Estimated Perimeter (m)"],
            built_form=x["Archetype"]
        ),
        axis=1
    )
    asset_list["Roof Insulation Thickness"] = asset_list.apply(
        lambda x: RoofAttributes(description=x["Roof Construction"]).process()["insulation_thickness"] if not pd.isnull(
            x["Roof Construction"]) else None,
        axis=1
    )

    # Store as an excel
    filename = ("/Users/khalimconn-kowlessar/Documents/hestia/Customers/southend/southend EPC Data pull - 14 Nov "
                "2024.xlsx")
    asset_list.to_excel(filename, index=False)

View file

@ -37,6 +37,78 @@ def sap_to_epc(sap_points: int | float):
return "G"
def extract_wall_details_summary(text):
    """
    Extracts wall type, insulation, dry-lining, and thickness for each building part,
    including any alternative wall details within the 7.0 Walls section of the summary PDF text.

    :param text: Full text of the summary PDF
    :return: List with one dict per building part found; empty when the text has no
        "7.0 Walls:" ... "8.0 Roofs:" section
    """
    # Locate the entire 7.0 Walls section; without it there is nothing to extract
    section_match = re.search(r"7\.0 Walls:\n(.*?)\n8\.0 Roofs:", text, re.DOTALL)
    if section_match is None:
        return []
    wall_section = section_match.group(1)

    # Define pattern to match each building part's wall entry within the section
    building_part_pattern = re.compile(
        r"(Main Property|1st Extension|2nd Extension|[\w\s]+)\n"  # Matches each building part label
        r"Type\s+(.*?)\n"  # Matches main wall Type
        r"Insulation\s+(.*?)\n"  # Matches main wall Insulation
        r"(Dry-lining\s+(.*?)\n)?"  # Optional main wall Dry-lining
        r"Wall Thickness Unknown\s+(.*?)\n"  # Matches main wall Thickness Unknown
        r"Wall Thickness \[mm\]\s+(\d+)",  # Matches main wall Thickness
        re.DOTALL
    )

    # Define pattern to capture alternative wall details, if present
    alternative_wall_pattern = re.compile(
        r"Alternative Wall Area.*?\n"  # Matches start of alternative wall section
        r"Alternative Type\s+(.*?)\n"  # Matches alternative wall Type
        r"Alternative Insulation\s+(.*?)\n"  # Matches alternative wall Insulation
        r"(Alternative Dry-lining\s+(.*?)\n)?"  # Optional Alternative Dry-lining
        r"Alternative Wall Thickness Unknown\s+(.*?)\n"  # Matches alternative wall Thickness Unknown
        r"Alternative Wall Thickness\s+(\d+)",  # Matches alternative wall Thickness
        re.DOTALL
    )

    part_matches = list(building_part_pattern.finditer(wall_section))
    wall_data = []
    for index, match in enumerate(part_matches):
        # An alternative wall belongs to this part only if it appears before the next part's
        # own "Type" line. Searching to the end of the section would hand a later part's
        # alternative wall to a part that has none. The bound is the end of the next label
        # (group 1) because the loose label pattern can swallow the lines in between.
        if index + 1 < len(part_matches):
            search_end = part_matches[index + 1].end(1)
        else:
            search_end = len(wall_section)

        wall_entry = {
            "Building Part": match.group(1).strip(),
            "Wall Type": match.group(2).strip(),
            "Wall Insulation": match.group(3).strip(),
            "Wall Dry-lining": match.group(5).strip() if match.group(5) else "N/A",
            "Wall Thickness Unknown": match.group(6).strip(),
            "Wall Thickness (mm)": int(match.group(7)),
            "Alternative Wall Type": None,
            "Alternative Wall Insulation": None,
            "Alternative Wall Dry-lining": "N/A",
            "Alternative Wall Thickness Unknown": None,
            "Alternative Wall Thickness (mm)": None,
        }

        alt_match = alternative_wall_pattern.search(wall_section, match.end(), search_end)
        if alt_match:
            wall_entry["Alternative Wall Type"] = alt_match.group(1).strip()
            wall_entry["Alternative Wall Insulation"] = alt_match.group(2).strip()
            wall_entry["Alternative Wall Dry-lining"] = alt_match.group(4).strip() if alt_match.group(4) else "N/A"
            wall_entry["Alternative Wall Thickness Unknown"] = alt_match.group(5).strip()
            wall_entry["Alternative Wall Thickness (mm)"] = int(alt_match.group(6))

        wall_data.append(wall_entry)
    return wall_data
def extract_summary_report(pdf_path):
"""
Extracts specific data from the provided PDF file.
@ -45,6 +117,7 @@ def extract_summary_report(pdf_path):
- Fuel Bill
- Address
"""
data = {
"Address": None,
"Postcode": None,
@ -80,6 +153,14 @@ def extract_summary_report(pdf_path):
"Main Roof Type": None,
"Main Roof Insulation": None,
"Main Roof Insulation Thickness": None,
"Main Wall Type": None,
"Main Wall Insulation": None,
"Main Wall Dry-lining": None,
"Main Wall Thickness": None,
"Main Building Alternative Wall Type": None,
"Main Building Alternative Wall Insulation": None,
"Main Building Alternative Wall Dry-lining": None,
"Main Building Alternative Wall Thickness": None,
}
with (open(pdf_path, "rb") as file):
@ -229,6 +310,18 @@ def extract_summary_report(pdf_path):
insulation_thickness_match.strip() if insulation_thickness_match else None
)
walls_data = extract_wall_details_summary(text)
# Get the main building wall data
main_building_walls = [wall for wall in walls_data if "Main" in wall["Building Part"]][0]
data["Main Wall Type"] = main_building_walls["Wall Type"]
data["Main Wall Insulation"] = main_building_walls["Wall Insulation"]
data["Main Wall Dry-lining"] = main_building_walls["Wall Dry-lining"]
data["Main Wall Thickness"] = main_building_walls["Wall Thickness (mm)"]
data["Main Building Alternative Wall Type"] = main_building_walls["Alternative Wall Type"]
data["Main Building Alternative Wall Insulation"] = main_building_walls["Alternative Wall Insulation"]
data["Main Building Alternative Wall Dry-lining"] = main_building_walls["Alternative Wall Dry-lining"]
data["Main Building Alternative Wall Thickness"] = main_building_walls["Alternative Wall Thickness (mm)"]
return data
@ -498,10 +591,64 @@ def extract_roof_details_epr(text):
return roof_data
def extract_wall_details_epr(text):
    """
    Extracts wall type, insulation, dry-lining, and thickness for each building part
    in the provided EPR PDF text.

    :param text: Full text of the EPR PDF
    :return: List with one dict per "Construction details: Building part:" section; a field
        that is not present (or, for thickness, not numeric) is None
    """
    # Locate each building part section
    building_part_pattern = re.compile(
        r"Construction details: Building part: (.*?)\n(.*?)(?=Conservatory|Construction details|$)",
        re.DOTALL
    )

    def find_value(details, label, value=r"(.*?)", alternative=False):
        """Return the stripped value following '<label>:' in details, or None when absent."""
        # Every "Alternative Wall X:" line contains "Wall X:", so the main-wall lookup must
        # refuse matches preceded by "Alternative ". Otherwise a missing or non-numeric main
        # value is silently replaced by the alternative wall's value.
        prefix = "Alternative " if alternative else r"(?<!Alternative )"
        found = re.search(prefix + label + r":\s*" + value + r"(?=\n|$)", details)
        return found.group(1).strip() if found else None

    def to_int(value):
        """Convert a matched digit string to int, passing None through."""
        return int(value) if value is not None else None

    wall_data = []
    for match in building_part_pattern.finditer(text):
        # Clean up the building part name
        cleaned_part_name = re.sub(
            r" - built in.*|Room\(s\) in Roof area:.*", "", match.group(1).strip()
        ).strip()
        part_details = match.group(2)

        wall_data.append({
            "Building Part": cleaned_part_name,
            "Wall Type": find_value(part_details, "Wall Type"),
            "Wall Insulation": find_value(part_details, "Wall Insulation"),
            "Wall Dry-lining": find_value(part_details, "Wall Dry-lining"),
            "Wall Thickness": to_int(find_value(part_details, "Wall Thickness", value=r"(\d+)")),
            "Alternative Wall Type": find_value(part_details, "Wall Type", alternative=True),
            "Alternative Wall Insulation": find_value(part_details, "Wall Insulation", alternative=True),
            "Alternative Wall Dry-lining": find_value(part_details, "Wall Dry-lining", alternative=True),
            "Alternative Wall Thickness": to_int(
                find_value(part_details, "Wall Thickness", value=r"(\d+)", alternative=True)
            ),
        })
    return wall_data
def extract_epr(pdf_path):
"""
Extracts specific data from an Energy Report (EPR) PDF file.
"""
data = {
"Address": None,
"Postcode": None,
@ -539,6 +686,14 @@ def extract_epr(pdf_path):
"Main Roof Type": None,
"Main Roof Insulation": None,
"Main Roof Insulation Thickness": None,
"Main Wall Type": None,
"Main Wall Insulation": None,
"Main Wall Dry-lining": None,
"Main Wall Thickness": None,
"Main Building Alternative Wall Type": None,
"Main Building Alternative Wall Insulation": None,
"Main Building Alternative Wall Dry-lining": None,
"Main Building Alternative Wall Thickness": None,
}
with open(pdf_path, "rb") as file:
@ -664,6 +819,17 @@ def extract_epr(pdf_path):
data["Main Roof Insulation"] = main_roof_details[0]["Roof Insulation"]
data["Main Roof Insulation Thickness"] = main_roof_details[0]["Roof Insulation Thickness"]
wall_details = extract_wall_details_epr(text)
main_wall_details = [w for w in wall_details if "Main" in w["Building Part"]][0]
data["Main Wall Type"] = main_wall_details["Wall Type"]
data["Main Wall Insulation"] = main_wall_details["Wall Insulation"]
data["Main Wall Dry-lining"] = main_wall_details["Wall Dry-lining"]
data["Main Wall Thickness"] = main_wall_details["Wall Thickness"]
data["Main Building Alternative Wall Type"] = main_wall_details["Alternative Wall Type"]
data["Main Building Alternative Wall Insulation"] = main_wall_details["Alternative Wall Insulation"]
data["Main Building Alternative Wall Dry-lining"] = main_wall_details["Alternative Wall Dry-lining"]
data["Main Building Alternative Wall Thickness"] = main_wall_details["Alternative Wall Thickness"]
return data
@ -1411,5 +1577,46 @@ def find_remaining_surveys():
assert needed.shape[0] + costed.shape[0] == surveyed.shape[0]
def append_stonewater_id():
    """
    This completes an adhoc request from Stonewater to add in their organisation Reference onto the model.

    Reads the modelled packages and the original archetype workbook from hard-coded local paths,
    left-joins the "Org. ref." column onto the modelled sample by "Address ID" and writes the
    result to an Excel file.

    :raises ValueError: if any modelled row has no matching "Org. ref."
    :return: None
    """
    model_proposed_sample = pd.read_excel(
        "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater - Bid Packages WIP 13.11.24.xlsx",
        sheet_name="Modelled Packages",
        header=13
    )
    # Rows without an Address ID cannot be matched; the remaining ids are cast to int so both
    # sides of the merge use the same key type
    model_proposed_sample = model_proposed_sample[model_proposed_sample["Address ID"].notna()]
    model_proposed_sample["Address ID"] = model_proposed_sample["Address ID"].astype(int)

    original_archetypes = pd.read_excel(
        "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater SHDF_3_0_Board Triage 22.05.24 "
        "- Archetyped V3.1.xlsx",
        header=4
    )
    original_archetypes = original_archetypes[original_archetypes["Address ID"].notna()]
    # Drop rows that repeat the header text inside the data
    original_archetypes = original_archetypes[original_archetypes["Address ID"] != "Address ID"]
    original_archetypes["Address ID"] = original_archetypes["Address ID"].astype(int)

    matched = model_proposed_sample.merge(
        original_archetypes[["Address ID", 'Org. ref.']],
        on="Address ID",
        how="left"
    )
    if matched["Org. ref."].isna().any():
        raise ValueError("Something went wrong")

    # Save as an Excel workbook
    matched.to_excel(
        "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater IDs.xlsx",
        sheet_name="Proposed Wave 3 Sample",
        index=False
    )
# if __name__ == "__main__":
# main()

View file

@ -1,3 +1,4 @@
import pandas as pd
import requests
from bs4 import BeautifulSoup
from datetime import datetime
@ -25,7 +26,7 @@ class RetrieveFindMyEpc:
self.address_cleaned = self.address.replace(",", "").replace(" ", "").lower()
def retrieve_newest_find_my_epc_data(self):
def retrieve_newest_find_my_epc_data(self, sap_2012_date=None):
"""
For a post code and address, we pull out all the required data from the find my epc website
"""
@ -188,7 +189,7 @@ class RetrieveFindMyEpc:
raise ValueError(f"Missing key: {key}")
# Finally, we format the recommendations
recommendations = self.format_recommendations(recommendations)
recommendations = self.format_recommendations(recommendations, assessment_data, sap_2012_date)
resulting_data = {
'epc_certificate': epc_certificate,
@ -204,11 +205,13 @@ class RetrieveFindMyEpc:
return resulting_data
def format_recommendations(self, recommendations):
@staticmethod
def format_recommendations(recommendations, assessment_data, sap_2012_date=None):
"""
This function converts the recommendations to a format that we can use in the engine as a non-intrusive survey
:param recommendations:
:return:
:param recommendations: The recommendations from the EPC
:param assessment_data: The assessment data from the EPC
:param sap_2012_date: The date of the SAP 2012 update
"""
measure_map = {
@ -217,6 +220,7 @@ class RetrieveFindMyEpc:
"Hot water cylinder thermostat": ["cylinder_thermostat"],
"High performance external doors": ["insulated_doors"],
"Floor insulation (solid floor)": ["solid_floor_insulation"],
"Floor insulation (suspended floor)": ["suspended_floor_insulation"],
"Double glazed windows": ["double_glazing"],
"Cavity wall insulation": ["cavity_wall_insulation"],
"Replace boiler with new condensing boiler": ["boiler_upgrade"],
@ -225,19 +229,42 @@ class RetrieveFindMyEpc:
"roomstat_programmer_trvs", "time_temperature_zone_control"
],
"Low energy lighting": ["low_energy_lighting"],
"Increase loft insulation to 270 mm": ["loft_insulation"],
"Heating controls (thermostatic radiator valves)": [
"roomstat_programmer_trvs", "time_temperature_zone_control"
],
"Solar water heating": ["solar_water_heating"],
"Solar photovoltaic panels, 2.5 kWp": ["solar_pv"],
"Heating controls (room thermostat and TRVs)": [
"roomstat_programmer_trvs", "time_temperature_zone_control"
],
"Change heating to gas condensing boiler": ["boiler_upgrade"],
"Fan assisted storage heaters and dual immersion cylinder": ["high_heat_retention_storage_heaters"],
"Flat roof or sloping ceiling insulation": ["flat_roof_insulation"],
"Heating controls (room thermostat)": [
"roomstat_programmer_trvs", "time_temperature_zone_control"
],
"Band A condensing boiler": ["boiler_upgrade"],
"Double glazing": ["double_glazing"],
}
survey = True
if sap_2012_date is not None:
certificate_date = datetime.strptime(assessment_data["Date of certificate"], "%d %B %Y")
if certificate_date < pd.to_datetime(sap_2012_date):
survey = False
formatted_recommendations = []
for rec in recommendations:
mapped = measure_map[rec["measure"]]
for measure in mapped:
formatted_recommendations.append(
{
"type": measure,
"sap_points": rec["sap_points"],
"survey": True
}
)
to_append = {
"type": measure,
"sap_points": rec["sap_points"],
"survey": survey,
}
if measure == "solar_pv":
to_append["suitable"] = True
formatted_recommendations.append(to_append)
return formatted_recommendations

View file

@ -60,15 +60,21 @@ class HotwaterRecommendations:
# If there is no system present, but access to the mains, we
has_tank_recommendation = [r for r in self.recommendations if r["type"] == "hot_water_tank_insulation"]
if (
(self.property.hotwater["heater_type"] in ["electric immersion"]) &
(self.property.data["hot-water-energy-eff"] == "Very Poor") &
(self.property.hotwater["no_system_present"] is None)
(self.property.hotwater["no_system_present"] is None) &
len(has_tank_recommendation) == 0
):
self.recommend_tank_insulation(phase=phase)
return
if self.property.hotwater["clean_description"] == "From main system, no cylinder thermostat":
has_cylinder_recommendation = [r for r in self.recommendations if r["type"] == "cylinder_thermostat"]
if ((self.property.hotwater["clean_description"] == "From main system, no cylinder thermostat") &
(len(has_cylinder_recommendation) == 0)):
self.recommend_cylinder_thermostat(phase=phase)
return

View file

@ -10,8 +10,8 @@ class SecondaryHeating:
"""
# The list of existing heating systems that are accepted
ACCEPTED_MAINHEAT_DESCRIPTIONS = ["Boiler and radiators, mains gas"]
ACCEPTED_SECONDHEAT_DESCRIPTIONS = ["Room heaters, electric"]
ACCEPTED_MAINHEAT_DESCRIPTIONS = ["Boiler and radiators, mains gas", "Electric storage heaters"]
ACCEPTED_SECONDHEAT_DESCRIPTIONS = ["Room heaters, electric", 'Portable electric heaters (assumed)']
# These are the heaters where works are required to remove them
FIXED_HEATER_DESCRIPTIONS = ["Room heaters, electric"]
@ -34,7 +34,7 @@ class SecondaryHeating:
if self.property.data['secondheat-description'] in self.FIXED_HEATER_DESCRIPTIONS:
# We have an associated cost otherwise, there is no cost
n_rooms = self.property.data['number-heated-rooms']
n_rooms = self.property.data['number-habitable-rooms'] - self.property.data['number-heated-rooms']
else:
n_rooms = 0