Merge pull request #378 from Hestia-Homes/boreham-wood-sample

Boreham wood sample
This commit is contained in:
KhalimCK 2025-03-03 14:38:32 +00:00 committed by GitHub
commit 41f3998c1d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
37 changed files with 4700 additions and 639 deletions

6
.idea/terraform.xml generated Normal file
View file

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="TerraformProjectSettings">
<option name="toolPath" value="/opt/homebrew/bin/terraform" />
</component>
</project>

1518
asset_list/AssetList.py Normal file

File diff suppressed because it is too large Load diff

480
asset_list/app.py Normal file
View file

@ -0,0 +1,480 @@
import os
import time
import json
import pandas as pd
import numpy as np
from tqdm import tqdm
from pprint import pprint
import msgpack
from utils.s3 import read_from_s3
from asset_list.AssetList import AssetList
from asset_list.mappings.property_type import PROPERTY_MAPPING
from asset_list.mappings.walls import WALL_CONSTRUCTION_MAPPINGS
from asset_list.mappings.heating_systems import HEATING_MAPPINGS
from asset_list.mappings.exising_pv import EXISTING_PV_MAPPINGS
from dotenv import load_dotenv
from backend.SearchEpc import SearchEpc
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
def get_data(
    df, manual_uprn_map, epc_api_only=False, row_id_name="row_id"
):
    """
    Retrieve EPC data for every property (row) in a standardised asset list.

    For each row an EPC is searched for via ``SearchEpc``. When an EPC is found and
    ``epc_api_only`` is False, the EPC recommendations and the FindMyEPC data for the
    property are fetched as well.

    :param df: DataFrame holding the standardised asset list columns (the ``AssetList.STANDARD_*``
        columns) plus the ``row_id_name`` column
    :param manual_uprn_map: dict mapping a (stripped) full address to a UPRN; takes precedence over
        the UPRN column of ``df``
    :param epc_api_only: if True, only the newest EPC record is stored for each property and the
        recommendations / FindMyEPC lookups are skipped
    :param row_id_name: name of the column in ``df`` that identifies a row
    :return: tuple ``(epc_data, errors, no_epc)`` where ``epc_data`` is a list of dicts (one per
        property an EPC was found for), ``errors`` is the list of row ids that raised an exception
        and ``no_epc`` is the list of row ids for which no EPC could be retrieved
    """
    uprn_column = AssetList.STANDARD_UPRN
    fulladdress_column = AssetList.STANDARD_FULL_ADDRESS
    address1_column = AssetList.STANDARD_ADDRESS_1
    postcode_column = AssetList.STANDARD_POSTCODE

    # These re-map the standard property types to forms accepted by the EPC api, so we can predict EPCs
    property_type_map = {
        "house": "House",
        "flat": "Flat",
        "maisonette": "Maisonette",
        "bungalow": "Bungalow",
        "block house": "House",
        "coach house": "House",
        "bedsit": "Flat"
    }

    epc_data = []
    errors = []
    no_epc = []
    for _, home in tqdm(df.iterrows(), total=len(df)):
        try:
            # If we have a block of flats, we cannot retrieve this data
            if home[AssetList.STANDARD_PROPERTY_TYPE] == "block of flats":
                no_epc.append(home[row_id_name])
                continue

            postcode = home[postcode_column]
            house_number = str(home[address1_column]).strip()
            full_address = home[fulladdress_column].strip()

            # Parsed once and reused below; None means no house number could be extracted
            parsed_house_no = SearchEpc.get_house_number(address=str(house_number), postcode=postcode)
            house_no = house_number if parsed_house_no is None else parsed_house_no

            # A manually supplied UPRN wins over the one in the asset list
            uprn = manual_uprn_map.get(full_address, None)
            if uprn is None and home.get(uprn_column):
                uprn = home[uprn_column]
                if pd.isnull(uprn):
                    uprn = None

            property_type = property_type_map.get(home[AssetList.STANDARD_PROPERTY_TYPE], None)

            searcher = SearchEpc(
                address1=str(house_no),
                postcode=postcode,
                auth_token=EPC_AUTH_TOKEN,
                os_api_key="",
                property_type=None,
                fast=True,
                full_address=full_address,
                max_retries=5,
                uprn=uprn
            )
            # Force the skipping of estimating the EPC
            searcher.ordnance_survey_client.property_type = None
            searcher.ordnance_survey_client.built_form = None
            searcher.find_property(skip_os=True)

            # Check if we have a flat or apartment
            if searcher.newest_epc is None and uprn is None:
                # Try again with a different first address line
                if parsed_house_no is None:
                    # Backup: use the second comma separated part of the full address
                    add1 = full_address.split(",")
                    if len(add1) > 1:
                        add1 = add1[1].strip()
                    else:
                        # Try splitting on space
                        add1 = full_address.split(" ")[0].strip()
                else:
                    add1 = str(house_number)

                searcher = SearchEpc(
                    address1=add1,
                    postcode=postcode,
                    auth_token=EPC_AUTH_TOKEN,
                    os_api_key="",
                    property_type=None,
                    fast=True,
                    full_address=full_address,
                    max_retries=5
                )
                if (
                    "flat" in house_number.lower() or "apartment" in house_number.lower() or "apt" in
                    house_number.lower()
                ):
                    searcher.ordnance_survey_client.property_type = "Flat"
                searcher.find_property(skip_os=True)

            # As a final resort, we estimate the EPC
            if property_type is not None and searcher.newest_epc is None:
                searcher.ordnance_survey_client.property_type = property_type
                searcher.find_property(skip_os=True)

            if searcher.newest_epc is None:
                no_epc.append(home[row_id_name])
                continue

            if epc_api_only:
                epc = {
                    row_id_name: home[row_id_name],
                    **searcher.newest_epc.copy()
                }
                epc_data.append(epc)
                continue

            # Look for EPC recommendations; a failure here is treated as "no recommendations"
            try:
                property_recommendations = searcher.client.domestic.recommendations(searcher.newest_epc["lmk-key"])
            except Exception:
                property_recommendations = {"rows": []}

            # Retrieve data from FindMyEPC. Start from an empty result so that a failed lookup can
            # never leave the previous property's data in place
            find_epc_data = {}
            try:
                find_epc_searcher = RetrieveFindMyEpc(
                    address=searcher.newest_epc["address"], postcode=searcher.newest_epc["postcode"]
                )
                find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
            except ValueError as e:
                if "No EPC found" in str(e) and "address1" in searcher.newest_epc:
                    # Retry with just the first address line
                    try:
                        find_epc_searcher = RetrieveFindMyEpc(
                            address=searcher.newest_epc["address1"], postcode=searcher.newest_epc["postcode"]
                        )
                        find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
                    except ValueError:
                        find_epc_data = {}
            except Exception as e:
                raise Exception(f"Error retrieving FindMyEPC data: {e}") from e

            # Random pause between properties to avoid hammering the remote services
            time.sleep(np.random.uniform(0.1, 1))

            epc = {
                row_id_name: home[row_id_name],
                **searcher.newest_epc.copy(),
                "recommendations": property_recommendations["rows"],
                "find_my_epc_data": find_epc_data,
            }
            epc_data.append(epc)
        except Exception:
            # Best effort: record the failing row so the caller can retry it, then back off
            errors.append(home[row_id_name])
            time.sleep(5)

    return epc_data, errors, no_epc
def extract_address1(asset_list, full_address_col, postcode_col, method="first_two_words"):
    """
    Add an ``address1_extracted`` column to ``asset_list``, derived from the full address.

    The DataFrame is modified in place and also returned.

    :param asset_list: DataFrame containing the address columns
    :param full_address_col: name of the column holding the full address
    :param postcode_col: name of the column holding the postcode (only used by the
        ``house_number_extraction`` method)
    :param method: one of ``first_two_words``, ``first_word`` or ``house_number_extraction``
    :return: ``asset_list`` with the ``address1_extracted`` column added
    :raises ValueError: if ``method`` is not recognised
    """
    if method in ("first_two_words", "first_word"):
        # Split on any run of whitespace so that leading or repeated spaces do not produce empty
        # "words" (e.g. " 12 High St" used to give "" as the first word)
        words = asset_list[full_address_col].str.split()
        n_words = 2 if method == "first_two_words" else 1
        asset_list["address1_extracted"] = words.str[:n_words].str.join(" ")
        return asset_list

    if method == "house_number_extraction":
        asset_list["address1_extracted"] = asset_list.apply(
            lambda x: SearchEpc.get_house_number(address=x[full_address_col], postcode=x[postcode_col]),
            axis=1
        )
        return asset_list

    raise ValueError(f"Method {method} not recognized")
def app():
    """
    Pull EPC data for the properties in a landlord's asset list and write a standardised workbook.

    Originally written for properties owned by Livewest; the paths configured below currently point
    at the Colchester Borough Homes data. The asset list is standardised, EPC data is retrieved in
    chunks (each chunk is cached as a csv in a ``Chunks`` folder next to the source file), merged
    onto the asset list, work types are identified and the result is written to an Excel workbook.

    Data request contents:
    Date of last EPC
    Reason for EPC
    SAP score on register
    Property Type
    Property Area
    Property Age
    Any Dimensions (HLP,PW,RH)
    Property Wall Construction
    Heating Type
    Secondary Heating
    Loft Insulation Depth
    Additional if possible:
    Heat loss calculations
    EPC recommendations
    Property UPRN
    """
    # TODO:
    # For cavity work:
    # - Flag any entries that have a different wall type between non-intrusive data against EPC
    # - Worth double checking entries that have a difference in wall construction
    # - Look at anything that is flagged as an empty cavity but the EPC data says its a filled cavity
    # - Look at the current EPC scores - Anything that is C75 or above, especially if its assumed no insulation
    # - By postcode, we can try and deduce if all of the addresses are a flats and then estimate if 50% of the flats
    # are less than C75
    # - Flag anything pre SAP2012
    # - Flag anything over 5 years old
    # - Look at year built vs age band
    #
    # For Solar:
    # - Discount any that have solar PV - based on non-intrusives and from the inspections team
    # - In the heating, discount anything that isnt ashp, ghsp, hhrs, electric storage - possibly homes with
    # electric room heaters but it might need to be an EPC E
    # - Fabric - check the floor, wall and roof:
    # - Filled or empty cavity is good
    # - Insulated solid/timber/system built is good
    # - SCIS/CEG needs solid floors
    # - JJC dont care
    # - Anything with a loft 200 or below
    # - Anything C75 and above wont qualify
    # - Insulated loft = 200mm
    # - We want: fully insulated property (all wall types), EPC D or below (floors should be solid)
    # - Or the insulation required is loft/cavity (floors should be solid)
    data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Colchester"
    data_filename = "Warmfront data- Colchester Borough Homes (Complete).xlsx"
    sheet_name = "Sheet1"
    postcode_column = 'Full Address.1'
    fulladdress_column = "Full Address"
    address1_column = None
    address1_method = "first_word"
    address_cols_to_concat = []
    missing_postcodes_method = None
    landlord_year_built = "Build Date"
    landlord_os_uprn = None
    landlord_property_type = "Property Type"
    landlord_wall_construction = "Wallinsul"
    landlord_heating_system = "HeatSorc"
    landlord_existing_pv = None
    landlord_property_id = "Property Reference"

    # For Westward
    # data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Westward"
    # data_filename = "WESTWARD - completed list..xlsx"
    # sheet_name = "Sheet1"
    # postcode_column = "WFT EDIT Postcode"
    # fulladdress_column = "Address"
    # address1_column = None
    # address1_method = "house_number_extraction"
    # address_cols_to_concat = []
    # missing_postcodes_method = None
    # landlord_year_built = "Build date"
    # landlord_os_uprn = "UPRN"
    # landlord_property_type = "Location type"
    # landlord_wall_construction = "Wall Construction (EPC)"
    # landlord_heating_system = "Heat Source"
    # landlord_existing_pv = "PV (Y/N)"
    # landlord_property_id = "Place ref"

    # Maps addresses to uprn in problematic cases
    manual_uprn_map = {}

    asset_list = AssetList(
        local_filepath=os.path.join(data_folder, data_filename),
        header=0,
        sheet_name=sheet_name,
        address1_colname=address1_column,
        postcode_colname=postcode_column,
        landlord_property_id=landlord_property_id,
        full_address_colname=fulladdress_column,
        full_address_cols_to_concat=address_cols_to_concat,
        missing_postcodes_method=missing_postcodes_method,
        address1_extraction_method=address1_method,
        landlord_year_built=landlord_year_built,
        landlord_uprn=landlord_os_uprn,
        landlord_property_type=landlord_property_type,
        landlord_wall_construction=landlord_wall_construction,
        landlord_heating_system=landlord_heating_system,
        landlord_existing_pv=landlord_existing_pv
    )
    asset_list.init_standardise()

    # We produce the new maps, which can be saved for future usage.
    # NOTE: dict.update() returns None, so the base mapping and the landlord specific mapping are
    # merged into a new dict instead of chaining .copy().update(...)
    new_property_type_map = {
        **PROPERTY_MAPPING,
        **(asset_list.variable_mappings[asset_list.landlord_property_type]
           if asset_list.landlord_property_type else {}),
    }
    new_wall_map = {
        **WALL_CONSTRUCTION_MAPPINGS,
        **(asset_list.variable_mappings[asset_list.landlord_wall_construction]
           if asset_list.landlord_wall_construction else {}),
    }
    new_heating_map = {
        **HEATING_MAPPINGS,
        **(asset_list.variable_mappings[asset_list.landlord_heating_system]
           if asset_list.landlord_heating_system else {}),
    }
    new_existing_pv_map = {
        **EXISTING_PV_MAPPINGS,
        **(asset_list.variable_mappings[asset_list.landlord_existing_pv]
           if asset_list.landlord_existing_pv else {}),
    }

    asset_list.apply_standardiation()

    ### We retrieve the EPC data
    # We chunk up this data into 5000 rows at a time
    # Create the chunks directory
    force_retrieve_data = False
    skip = None  # Used to skip already completed chunks
    chunk_size = 5000
    filename = "Chunk {i}.csv"
    download_folder = os.path.join(data_folder, "Chunks")
    if not os.path.exists(download_folder):
        os.makedirs(download_folder)

    chunk_indexes = list(range(0, len(asset_list.standardised_asset_list), chunk_size))
    downloaded_files = {filename.format(i=i) for i in chunk_indexes}

    # We check if we have files associated to these chunks already and if we do, and we do not want to force the
    # fetching of the data, we skip. The emptiness check avoids calling max() on an empty list
    folder_contents = os.listdir(download_folder)
    if chunk_indexes and all(x in folder_contents for x in downloaded_files):
        skip = max(chunk_indexes)

    for i in chunk_indexes:
        print(f"Processing chunk {i} to {i + chunk_size}")
        if skip is not None and not force_retrieve_data:
            if i <= skip:
                continue

        chunk = asset_list.standardised_asset_list[i:i + chunk_size]
        epc_data_chunk, errors_chunk, no_epc_chunk = get_data(
            df=chunk,
            row_id_name=asset_list.DOMNA_PROPERTY_ID,
            manual_uprn_map=manual_uprn_map,
        )

        # We now retry any failed properties and append what was retrieved to the main data.
        # NOTE(review): errors_chunk still lists every property that failed on the first pass,
        # including those recovered by this retry
        chunk_failed = chunk[chunk[asset_list.DOMNA_PROPERTY_ID].isin(errors_chunk)]
        epc_data_failed, _, _ = get_data(
            df=chunk_failed,
            row_id_name=asset_list.DOMNA_PROPERTY_ID,
            manual_uprn_map=manual_uprn_map,
            epc_api_only=False
        )
        epc_data_chunk.extend(epc_data_failed)

        # Store the chunk locally as a csv
        pd.DataFrame(epc_data_chunk).to_csv(os.path.join(data_folder, f"Chunks/Chunk {i}.csv"), index=False)

        # Store the errors and no-data locally (both are written as JSON, whatever the extension)
        with open(os.path.join(data_folder, f"Chunks/Chunk {i} errors.json"), "w") as f:
            json.dump(errors_chunk, f)
        with open(os.path.join(data_folder, f"Chunks/Chunk {i} nodata.csv"), "w") as f:
            json.dump(no_epc_chunk, f)

    # We read in and concatenate the created chunks
    epc_data = []
    for file in downloaded_files:
        csv_data = pd.read_csv(os.path.join(download_folder, file))
        # We need to convert the recommendations back to a list.
        # SECURITY NOTE: eval() executes whatever expression is stored in the csv. These files are
        # written by this script, but they must never come from an untrusted source;
        # ast.literal_eval would be the safe replacement if the stored values are plain literals
        csv_data["recommendations"] = csv_data["recommendations"].apply(eval)
        csv_data["find_my_epc_data"] = csv_data["find_my_epc_data"].apply(eval)
        epc_data.append(csv_data)

    # ignore_index gives the combined frame a unique 0..n-1 index. Every chunk csv is read with
    # its own 0-based index, and the index based join against json_normalize() further down would
    # pair rows from different chunks if those duplicate labels were kept
    epc_df = pd.concat(epc_data, ignore_index=True)
    epc_df["estimated"] = epc_df["estimated"].fillna(False)

    # We expand out the recommendations
    recommendations_df = epc_df[[asset_list.DOMNA_PROPERTY_ID, "recommendations"]]
    unique_recommendations = set()
    for _, row in recommendations_df.iterrows():
        unique_recommendations.update([rec["improvement-summary-text"] for rec in row["recommendations"]])

    columns = [asset_list.DOMNA_PROPERTY_ID] + list(unique_recommendations)
    transformed_data = []
    for _, row in recommendations_df.iterrows():
        # Initialize a dictionary for this row with False for all recommendations
        row_data = {col: False for col in columns}
        row_data[asset_list.DOMNA_PROPERTY_ID] = row[asset_list.DOMNA_PROPERTY_ID]
        # Set True for each recommendation present in this row
        for rec in row["recommendations"]:
            recommendation_text = rec["improvement-summary-text"]
            row_data[recommendation_text] = True
        # Append the row data to transformed_data
        transformed_data.append(row_data)

    # A floor recommendation that no property has does not exist as a column, so reindex (rather
    # than select) and treat the missing ones as False
    floor_recommendation_columns = [
        "Floor insulation (solid floor)", "Floor insulation", "Floor insulation (suspended floor)"
    ]
    transformed_df = pd.DataFrame(transformed_data).reindex(
        columns=[asset_list.DOMNA_PROPERTY_ID] + floor_recommendation_columns,
        fill_value=False
    )
    transformed_df["epc_has_floor_recommendation"] = (
        transformed_df["Floor insulation (solid floor)"] | transformed_df["Floor insulation"] |
        transformed_df["Floor insulation (suspended floor)"]
    )

    # Get the find my epc data
    find_my_epc_data = epc_df[[asset_list.DOMNA_PROPERTY_ID, "find_my_epc_data"]].drop(
        columns=["find_my_epc_data"]).join(
        pd.json_normalize(epc_df["find_my_epc_data"])
    )
    find_my_epc_data = find_my_epc_data.merge(
        transformed_df[[asset_list.DOMNA_PROPERTY_ID, "epc_has_floor_recommendation"]],
        how="left", on=asset_list.DOMNA_PROPERTY_ID
    )

    # We check if we get the solar pv column:
    if "Solar photovoltaics" not in find_my_epc_data.columns:
        find_my_epc_data["Solar photovoltaics"] = False

    # Retrieve just the data we need
    epc_df = epc_df[
        [asset_list.DOMNA_PROPERTY_ID] + list(asset_list.EPC_API_DATA_NAMES.keys())
    ].rename(
        columns=asset_list.EPC_API_DATA_NAMES
    )
    epc_df = epc_df.merge(
        find_my_epc_data[
            [asset_list.DOMNA_PROPERTY_ID, "epc_has_floor_recommendation"] + list(asset_list.FIND_EPC_DATA_NAMES.keys())
        ]
        .rename(columns=asset_list.FIND_EPC_DATA_NAMES),
        how="left",
        on=asset_list.DOMNA_PROPERTY_ID
    )

    asset_list.merge_data(epc_df)
    asset_list.extract_attributes()

    cleaned = read_from_s3(
        s3_file_name="cleaned_epc_data/cleaned.bson",
        bucket_name="retrofit-data-dev"
    )
    cleaned = msgpack.unpackb(cleaned, raw=False)

    # TODO: We should break out the identification of work types to flag blocks of flats specifically
    asset_list.identify_worktypes(cleaned)
    pprint(asset_list.work_type_figures)
    asset_list.flat_analysis()

    # Store as an excel
    filename = os.path.join(data_folder, ".".join(data_filename.split(".")[:-1])) + " - Standardised.xlsx"
    # Store the data in two tabs. One for the asset list with the EPC data and the second with the flat data
    with pd.ExcelWriter(filename) as writer:
        asset_list.standardised_asset_list.to_excel(writer, sheet_name="Standardised Asset List", index=False)
        asset_list.flat_data.to_excel(writer, sheet_name="Flat Data", index=False)

View file

@ -0,0 +1,12 @@
# The standard categories used to describe whether a property already has solar PV.
# Every value of EXISTING_PV_MAPPINGS below is a member of this set.
STANDARD_EXISTING_PV = {
"already has PV", "no PV", "unknown"
}
# Basic mapping from raw landlord values to the standard categories above.
# Nothing maps to "unknown" here.
EXISTING_PV_MAPPINGS = {
"NO": "no PV",
"YES": "already has PV",
"no": "no PV",
"yes": "already has PV",
# Boolean flags are accepted as well as text
True: "already has PV",
False: "no PV",
}

View file

@ -0,0 +1,67 @@
import numpy as np
# The standard categories used to describe a property's heating system.
# Every value of HEATING_MAPPINGS below is a member of this set; no key currently maps to
# "high heat retention storage heaters".
STANDARD_HEATING_SYSTEMS = {
"gas combi boiler",
"electric storage heaters",
"district heating",
"gas condensing boiler",
"oil boiler",
"gas condensing combi",
"air source heat pump",
"boiler - other fuel",
"ground source heat pump",
"electric radiators",
"other",
"electric boiler",
"unknown",
"communal gas boiler",
"high heat retention storage heaters",
}
# Basic mapping from raw landlord heating descriptions to the standard categories above.
# Keys are matched exactly, so the spelling (including "Air Source Source Heat Pump") must stay
# as it is - presumably this is how the values appear in the landlord data; confirm before
# "correcting" any key.
HEATING_MAPPINGS = {
# Descriptive values, original casing
"Combi - GAS": "gas combi boiler",
"E7 Storage Heaters": "electric storage heaters",
"District heating system": "district heating",
"Condensing Boiler - GAS": "gas condensing boiler",
"Boiler Oil/other": "oil boiler",
"Condensing Combi - Gas": "gas condensing combi",
"Air Source Source Heat Pump": "air source heat pump",
"Biomass Boiler": "boiler - other fuel",
"Ground Source Heat Pump": "ground source heat pump",
"Electric Oil filled radiators": "electric radiators",
"Solid Fuel": "other",
"LPG Boiler": "boiler - other fuel",
"Electric Boiler": "electric boiler",
"No data": "unknown",
"Boiler Communal/Commercial - GAS": "communal gas boiler",
"Eco Electric Radiators": "electric radiators",
"Gas fire": "other",
"Backboiler - Solid fuel": "other",
# The same descriptive values, lower-cased
'combi - gas': 'gas combi boiler',
'e7 storage heaters': 'electric storage heaters',
'district heating system': 'district heating',
'condensing boiler - gas': 'gas condensing boiler',
'boiler oil/other': 'oil boiler',
'condensing combi - gas': 'gas condensing combi',
'air source source heat pump': 'air source heat pump',
'biomass boiler': 'boiler - other fuel',
'ground source heat pump': 'ground source heat pump',
'electric oil filled radiators': 'electric radiators',
'solid fuel': 'other',
'lpg boiler': 'boiler - other fuel',
'electric boiler': 'electric boiler',
'no data': 'unknown', 'boiler communal/commercial - gas': 'communal gas boiler',
'eco electric radiators': 'electric radiators',
'gas fire': 'other', 'backboiler - solid fuel': 'other',
# Short upper-case codes
# NOTE(review): 'SOLIDFUEL' maps to 'boiler - other fuel' while "Solid Fuel" above maps to
# 'other' - confirm this difference is intended
'ASHP': 'air source heat pump',
'COMMHEAT': 'communal gas boiler',
'GBB': 'gas combi boiler',
'GFS': 'gas condensing boiler',
'GWA': 'gas condensing boiler',
'GWM': 'gas condensing combi',
'HDU': 'district heating',
'OILBLR': 'oil boiler',
'SOLIDFUEL': 'boiler - other fuel',
'STORHTR': 'electric storage heaters',
# Missing values are treated as unknown.
# NOTE(review): NaN never compares equal to itself, so a plain dict lookup only finds this key
# for the np.nan object itself - verify that the code applying this mapping handles NaN
np.nan: 'unknown',
}

View file

@ -0,0 +1,25 @@
# These are the standard categories for property types.
# Every value of PROPERTY_MAPPING below is a member of this set; no key currently maps to
# "park home".
STANDARD_PROPERTY_TYPES = {
"house", "flat", "maisonette", "bungalow", "park home", "block house", "bedsit", "coach house",
"unknown", "other", "block of flats"
}
# This is a basic mapping that we use to map values that we've seen commonly to standard values.
# Keys are matched exactly, so some values appear in more than one casing.
PROPERTY_MAPPING = {
# Upper-case codes (a few with a lower-case twin)
"HOUSE": "house",
"FLAT": "flat",
"MAISONET": "maisonette",
"BUNGALOW": "bungalow",
"BLKHOUS": "block house",
"blkhous": "block house",
"BEDSIT": "bedsit",
"COACHSE": "coach house",
"coachse": "coach house",
# Title-case values
'Admin Unit Type': 'unknown',
'Block': 'block of flats',
'Bungalow': 'bungalow',
'Flat': 'flat',
'House': 'house',
'Maisonette': 'maisonette',
'Stairwell': 'other'
}

View file

@ -0,0 +1,92 @@
# The standard categories used to describe a property's wall construction.
# Every value of WALL_CONSTRUCTION_MAPPINGS below is a member of this set; no key currently maps
# to "cob" or "other".
STANDARD_WALL_CONSTRUCTIONS = {
"uninsulated cavity", "filled cavity", "partial insulated cavity", "cavity unknown insulation",
"uninsulated solid brick", "insulated solid brick", "solid brick unknown insulation",
"timber frame",
"system built", "granite or whinstone", "other", "unknown", "sandstone or limestone",
"cob",
"new build - average thermal transmittance",
}
# Basic mapping from raw wall descriptions to the standard categories above.
# Keys are matched exactly. The 'W/m?K', 'W/m-¦K' and 'W/m&#0178;K' endings look like mis-encoded
# forms of the same unit suffix - presumably this is how the values arrive in the source data, so
# they must not be "repaired" here (verify against the data before changing any key).
WALL_CONSTRUCTION_MAPPINGS = {
# Descriptions in their original casing
"New Build - Average Thermal Transmittance": "new build - average thermal transmittance",
'Average thermal transmittance 0.25 W/m?K': 'unknown',
'Cavity wall, as built, insulated (assumed)': 'filled cavity',
'Average thermal transmittance 0.31 W/m?K': 'unknown',
'Cavity wall, as built, no insulation (assumed)': 'uninsulated cavity',
'Average thermal transmittance 0.30 W/m?K': 'unknown', 'Average thermal transmittance 0.28 W/m-¦K': 'unknown',
'Average thermal transmittance 0.25 W/m-¦K': 'unknown', 'Average thermal transmittance 0.21 W/m-¦K': 'unknown',
'Average thermal transmittance 0.20 W/m-¦K': 'unknown', 'Average thermal transmittance 0.29 W/m?K': 'unknown',
'Average thermal transmittance 0.16 W/m?K': 'unknown',
'Average thermal transmittance 0.27 W/m&#0178;K': 'unknown',
'Average thermal transmittance 0.15 W/m-¦K': 'unknown', 'Average thermal transmittance 0.23 W/m-¦K': 'unknown',
'Average thermal transmittance 0.18 W/m?K': 'unknown',
'Granite or whin, with internal insulation': 'granite or whinstone',
"Granite or whinstone, as built, insulated (assumed)": "granite or whinstone",
'Average thermal transmittance 0.22 W/m-¦K': 'unknown', 'Average thermal transmittance 0.24 W/m?K': 'unknown',
'Average thermal transmittance 0.16 W/m-¦K': 'unknown', 'Average thermal transmittance 0.35 W/m?K': 'unknown',
'Average thermal transmittance 0.26 W/m-¦K': 'unknown', 'Average thermal transmittance 0.62 W/m?K': 'unknown',
'Average thermal transmittance 0.64 W/m?K': 'unknown', 'Average thermal transmittance 0.61 W/m?K': 'unknown',
'Sandstone or limestone, as built, no insulation (assumed)': 'sandstone or limestone',
'Average thermal transmittance 0.33 W/m?K': 'unknown',
'Cavity wall,': "cavity unknown insulation",
'Cavity wall, as built, partial insulation (assumed)': 'partial insulated cavity',
'Average thermal transmittance 0.29 W/m-¦K': 'unknown', 'Average thermal transmittance 0.32 W/m-¦K': 'unknown',
'Average thermal transmittance 0.19 W/m-¦K': 'unknown', 'Average thermal transmittance 0.27 W/m?K': 'unknown',
'Average thermal transmittance 0.22 W/m?K': 'unknown', 'Average thermal transmittance 0.38 W/m?K': 'unknown',
'Average thermal transmittance 0.26 W/m?K': 'unknown', 'Average thermal transmittance 0.27 W/m-¦K': 'unknown',
'Average thermal transmittance 0.18 W/m-¦K': 'unknown', 'Average thermal transmittance = 0.27 W/m?K': 'unknown',
'Cavity wall, with external insulation': 'filled cavity', 'Average thermal transmittance 0.21 W/m?K': 'unknown',
'Average thermal transmittance 0.23 W/m?K': 'unknown', 'Average thermal transmittance 0.20 W/m?K': 'unknown',
'Average thermal transmittance 0.32 W/m?K': 'unknown', 'Average thermal transmittance 0.24 W/m-¦K': 'unknown',
'Cavity wall, with internal insulation': 'filled cavity',
'Average thermal transmittance 0.17 W/m-¦K': 'unknown', 'Average thermal transmittance 0.28 W/m?K': 'unknown',
# Lower-cased forms of the descriptions above
'new build - average thermal transmittance': 'new build - average thermal transmittance',
'average thermal transmittance 0.25 w/m?k': 'unknown',
'cavity wall, as built, insulated (assumed)': 'filled cavity',
'average thermal transmittance 0.31 w/m?k': 'unknown',
'cavity wall, as built, no insulation (assumed)': 'uninsulated cavity',
'average thermal transmittance 0.30 w/m?k': 'unknown', 'average thermal transmittance 0.28 w/m-¦k': 'unknown',
'average thermal transmittance 0.25 w/m-¦k': 'unknown', 'average thermal transmittance 0.21 w/m-¦k': 'unknown',
'average thermal transmittance 0.20 w/m-¦k': 'unknown', 'average thermal transmittance 0.29 w/m?k': 'unknown',
'average thermal transmittance 0.16 w/m?k': 'unknown', 'average thermal transmittance 0.27 w/m&#0178;k': 'unknown',
'average thermal transmittance 0.15 w/m-¦k': 'unknown', 'average thermal transmittance 0.23 w/m-¦k': 'unknown',
'average thermal transmittance 0.18 w/m?k': 'unknown',
'granite or whin, with internal insulation': 'granite or whinstone',
'average thermal transmittance 0.22 w/m-¦k': 'unknown', 'average thermal transmittance 0.24 w/m?k': 'unknown',
'average thermal transmittance 0.16 w/m-¦k': 'unknown', 'average thermal transmittance 0.35 w/m?k': 'unknown',
'average thermal transmittance 0.26 w/m-¦k': 'unknown', 'average thermal transmittance 0.62 w/m?k': 'unknown',
'average thermal transmittance 0.64 w/m?k': 'unknown', 'average thermal transmittance 0.61 w/m?k': 'unknown',
'sandstone or limestone, as built, no insulation (assumed)': 'sandstone or limestone',
'average thermal transmittance 0.33 w/m?k': 'unknown', 'cavity wall,': "cavity unknown insulation",
'cavity wall, as built, partial insulation (assumed)': 'partial insulated cavity',
'average thermal transmittance 0.29 w/m-¦k': 'unknown', 'average thermal transmittance 0.32 w/m-¦k': 'unknown',
'average thermal transmittance 0.19 w/m-¦k': 'unknown', 'average thermal transmittance 0.27 w/m?k': 'unknown',
'average thermal transmittance 0.22 w/m?k': 'unknown', 'average thermal transmittance 0.38 w/m?k': 'unknown',
'average thermal transmittance 0.26 w/m?k': 'unknown', 'average thermal transmittance 0.27 w/m-¦k': 'unknown',
'average thermal transmittance 0.18 w/m-¦k': 'unknown', 'average thermal transmittance = 0.27 w/m?k': 'unknown',
'cavity wall, with external insulation': 'filled cavity', 'average thermal transmittance 0.21 w/m?k': 'unknown',
'average thermal transmittance 0.23 w/m?k': 'unknown', 'average thermal transmittance 0.20 w/m?k': 'unknown',
'average thermal transmittance 0.32 w/m?k': 'unknown', 'average thermal transmittance 0.24 w/m-¦k': 'unknown',
'cavity wall, with internal insulation': 'filled cavity', 'average thermal transmittance 0.17 w/m-¦k': 'unknown',
'average thermal transmittance 0.28 w/m?k': 'unknown',
# Further descriptions (original casing only)
'Cavity wall, filled cavity': 'filled cavity',
'Cavity wall, filled cavity and external insulation': 'filled cavity',
'Granite or whinstone, as built, no insulation (assumed)': 'granite or whinstone',
'Solid brick, as built, insulated (assumed)': 'insulated solid brick',
'Solid brick, as built, no insulation (assumed)': 'uninsulated solid brick',
'Solid brick, with external insulation': 'insulated solid brick',
'Solid brick, with internal insulation': 'insulated solid brick',
'System built, as built, insulated (assumed)': 'system built',
'System built, as built, no insulation (assumed)': 'system built',
'System built, with external insulation': 'system built',
'System built, with internal insulation': 'system built',
'Timber frame, as built, insulated (assumed)': 'timber frame',
'Timber frame, as built, no insulation (assumed)': 'timber frame',
'Timber frame, as built, partial insulation (assumed)': 'timber frame',
'Timber frame, with additional insulation': 'timber frame',
# Short upper-case codes
'CAVITY': 'cavity unknown insulation',
'COMB': 'unknown',
'NONE': 'unknown',
'NOTKNOWN': 'unknown',
'SOLID': 'solid brick unknown insulation',
}

View file

@ -0,0 +1,12 @@
postal
pandas
usaddress
pydantic-settings==2.6.0
epc-api-python==1.0.2
fuzzywuzzy
boto3
openpyxl
openai
tiktoken
msgpack
beautifulsoup4

View file

@ -0,0 +1,5 @@
from asset_list.AssetList import AssetList
def test_multi_unit_address_flagging():
    """An address naming a block with a range of rooms should be identified as a multi address."""
    block_address = 'Block (Rooms 1-4), 23 Clifton Hill, Newtown, Exeter, EX1 2DL'
    flagged = AssetList._identify_multi_address(block_address)
    assert flagged

View file

@ -149,7 +149,8 @@ class Funding:
:return:
"""
measure_table = pd.DataFrame([
m for m in self.recommendations if m in measures and m["default"]
m for m in self.recommendations if
(m["type"] in measures) or (m["measure_type"] in measures) and m["default"]
])
measure_table["post_install_sap"] = measure_table["sap_points"] + self.starting_sap
@ -180,13 +181,10 @@ class Funding:
measure_table["cost_minus_funding"] = measure_table["total"] - measure_table["estimated_funding"]
measure_table["cost_minus_funding_per_sap"] = measure_table["cost_minus_funding"] / measure_table["sap_points"]
measure_table = measure_table.sort_values(["cost_minus_funding_per_sap", "total"], ascending=[True, False])
# Recommend the measure, with estimated funding amount
recommended_measure = measure_table.head(1)
return {
"measure_type": recommended_measure["measure_type"],
"estimated_funding": recommended_measure["estimated_funding"]
}
return measure_table[
["type", "measure_type", "Cost Savings", "estimated_funding"]
].rename(columns={"Cost Savings": "project_score"}).to_dict("records")
def sap_to_eco_band(self, sap_points):
"""

View file

@ -395,6 +395,7 @@ class Property:
primary_recommendation_id=rec["recommendation_id"],
non_invasive_recommendations=self.non_invasive_recommendations,
)
self.recommendations_scoring_data.append(scoring_dict)
simulation_epc = self.epc_record.prepared_epc.copy()
@ -1258,6 +1259,12 @@ class Property:
if (self.building_id is not None) and (self.solar_panel_configuration is not None):
return True
# If the property is in a conservation area, is listed or is a heritage building, solar panels
# become a difficult measure to generally get through planning restrictions and so we do not recommend
# solar panels
if self.restricted_measures:
return False
is_valid_property_type = self.data["property-type"] in ["House", "Bungalow", "Maisonette"]
is_valid_roof_type = (
self.roof["is_flat"] or self.roof["is_pitched"] or self.roof["is_roof_room"]

View file

@ -208,9 +208,14 @@ class SearchEpc:
try:
# Updated regex to catch house numbers including alphanumeric ones
pattern = r'(?i)(?:flat|apartment)\s*(\d+\w*)|^\s*(\d+\w*)'
match = re.search(pattern, address)
if match:
return next(g for g in match.groups() if g is not None)
match1 = re.search(pattern, address)
if match1:
return next(g for g in match1.groups() if g is not None)
pattern2 = r'(?i)(flat|apartment)\s*([a-zA-Z]?\d+[a-zA-Z]?)'
match2 = re.search(pattern2, address)
if match2:
return match2.group(2)
parsed = usaddress.parse(address)
# First, try to get the 'OccupancyIdentifier' if 'OccupancyType' is detected
@ -221,7 +226,8 @@ class SearchEpc:
continue
if part == postcode.split(" ")[1]:
continue
return part # This assumes the first 'OccupancyIdentifier' after 'OccupancyType' is the primary
return part.rstrip(
",") # This assumes the first 'OccupancyIdentifier' after 'OccupancyType' is the primary
# number
# Fallback to 'AddressNumber' if no 'OccupancyIdentifier' is found
@ -331,6 +337,9 @@ class SearchEpc:
if row["lmk-key"] not in seen and not seen.add(row["lmk-key"])
]
if data["rows"]:
api_response["msg"] = self.SUCCESS
return api_response["msg"]
def filter_rows(self, rows, property_type=None, address=None):

View file

@ -54,4 +54,5 @@ DESCRIPTIONS_TO_FUEL_TYPES = {
"Gas instantaneous at point of use": {"fuel": "Natural Gas", "cop": 0.85},
"Room heaters, wood logs": {"fuel": "Wood Logs", "cop": 1},
"Boiler and radiators, coal": {"fuel": "Coal", "cop": 0.85},
"From main system, no cylinderstat": {"fuel": "Natural Gas", "cop": 0.85},
}

View file

@ -338,7 +338,7 @@ def extract_property_request_data(
# Because we have some non-invasive recommendations that match on address and postcode, but not UPRN
# we need to check existence of uprn
has_uprn = "uprn" in non_invasive_recommendations[0] if non_invasive_recommendations else True
has_uprn = "uprn" in non_invasive_recommendations[0] if non_invasive_recommendations else False
if has_uprn:
has_uprn = non_invasive_recommendations[0]["uprn"] not in ["", None]
@ -370,7 +370,7 @@ def extract_property_request_data(
property_non_invasive_recommendations["recommendations"] = str(transformed)
# Check if the valuation data has uprn
valuation_has_uprn = "uprn" in valuation_data[0] if valuation_data else True
valuation_has_uprn = "uprn" in valuation_data[0] if valuation_data else False
if valuation_has_uprn:
valuation_has_uprn = valuation_data[0]["uprn"] not in ["", None]
@ -639,8 +639,10 @@ async def trigger_plan(body: PlanTriggerRequest):
recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data)
recommendations_scoring_data = recommendations_scoring_data.drop(
columns=["rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending",
"carbon_ending"]
columns=[
"rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending",
"carbon_ending"
]
)
all_predictions = await model_api.async_paginated_predictions(
@ -692,7 +694,8 @@ async def trigger_plan(body: PlanTriggerRequest):
Recommendations.calculate_recommendation_tenant_savings(
property_instance=property_instance,
kwh_simulation_predictions=kwh_simulation_predictions,
property_recommendations=property_recommendations
property_recommendations=property_recommendations,
ashp_cop=body.ashp_cop
)
)
property_instance.current_energy_bill = property_current_energy_bill
@ -822,7 +825,7 @@ async def trigger_plan(body: PlanTriggerRequest):
property_recommendations=recommendations[p.id],
project_scores_matrix=eco_project_scores_matrix,
whlg_eligible_postcodes=whlg_eligible_postcodes,
gbis_abs_rate=20,
gbis_abs_rate=15,
eco4_abs_rate=15,
)
funding_calulator.check_eligibiltiy()

View file

@ -80,3 +80,5 @@ class PlanTriggerRequest(BaseModel):
multi_plan: Optional[bool] = False
optimise: Optional[bool] = True
default_u_values: Optional[bool] = True
ashp_cop: Optional[float] = 2.8

View file

@ -1,5 +1,4 @@
import numpy as np
from scipy.constants import value
class PropertyValuation:
@ -216,6 +215,30 @@ class PropertyValuation:
cls.UPRN_VALUE_LOOKUP.get(property_instance.uprn)
)
current_epc = property_instance.data["current-energy-rating"]
if not current_value:
return {
"current_value": 0,
"lower_bound_increased_value": 0,
"upper_bound_increased_value": 0,
"average_increased_value": 0,
"average_increase": 0
}
return cls.estimate_valuation_improvement(current_value, current_epc, target_epc, total_cost)
@classmethod
def estimate_valuation_improvement(cls, current_value, current_epc, target_epc, total_cost=None):
"""
This function estimates the value of a property based on the current EPC rating and the target EPC rating
:param current_value:
:param current_epc:
:param target_epc:
:param total_cost:
:return:
"""
if not current_value:
return {
"current_value": 0,
@ -225,7 +248,6 @@ class PropertyValuation:
"average_increase": 0
}
current_epc = property_instance.data["current-energy-rating"]
# We get the spectrum of ratings between the current and target EPC
epc_band_range = cls.EPC_BANDS[cls.EPC_BANDS.index(current_epc): cls.EPC_BANDS.index(target_epc) + 1]

View file

@ -48,3 +48,12 @@ class TestSearchEpcIntegration:
assert epc_searcher.newest_epc["lmk-key"] == lmk_key
assert epc_searcher.newest_epc["uprn"] == uprn
assert len(epc_searcher.older_epcs) == n_old_epcs
def test_search_housenumber(self):
    """
    Check that SearchEpc.get_house_number returns the flat identifier (e.g. "A11") for flat-style addresses,
    both without and with a trailing postcode in the address string.
    """
    cases = [
        ('Flat A11, Mortimer House, Grendon Road, Exeter', "A11"),
        ('Flat A9, Mortimer House, Grendon Road, Exeter, EX1 2NL', "A9"),
    ]
    for full_address, expected in cases:
        # The postcode argument is deliberately None in both cases
        assert SearchEpc.get_house_number(full_address, None) == expected

View file

@ -132,7 +132,7 @@ def get_data(portfolio_id, scenario_ids):
return properties_data, plans_data, recommendations_data
properties_data, plans_data, recommendations_data = get_data(portfolio_id=124, scenario_ids=[199])
properties_data, plans_data, recommendations_data = get_data(portfolio_id=124, scenario_ids=[205])
properties_df = pd.DataFrame(properties_data)
plans_df = pd.DataFrame(plans_data)
@ -240,4 +240,7 @@ df["Predicted Post Works SAP"] = df["Current SAP Points"] + df["sap_points"]
df["Predicted Post Works SAP"] = df["Predicted Post Works SAP"].round()
df["Predicted Post Works EPC"] = df["Predicted Post Works SAP"].apply(lambda x: sap_to_epc(x))
df["Recommendation: Air Source Heat Pump"].sum()
df["Cost: Air Source Heat Pump"].sum()
df.to_csv("/Users/khalimconn-kowlessar/Documents/hestia/Customers/L&G/Basildon Data Export - 2.csv", index=False)

View file

@ -0,0 +1,23 @@
import pandas as pd
# Exploratory look at the outcomes recorded in the Lambeth re-knocks workbook.
# The free-text notes column has a very long header, so it is named once here.
NOTES_COLUMN = (
    "Notes (If 'no answer' under outcomes, have you checked around the property for access issues where "
    "possible?)"
)
# One specific note, pulled out below as a worked example
REAR_ACCESS_NOTE = (
    'Access to rear of property only through number 10. Overgrown athe rear of property '
    'installer wont be able to access'
)

data = pd.read_excel(
    "/Users/khalimconn-kowlessar/Downloads/Lambeth Reknocks.xlsx",
    sheet_name="Possible Route",
    header=1,
)

# Distribution of outcomes (only useful when run interactively)
data["Outcomes"].value_counts()

# Strip out: No
df = data[data["Outcomes"] == "See notes"]

# Frequency table of the free-text notes for the "See notes" rows
notes_df = df[NOTES_COLUMN].value_counts().to_frame()

example = df[df[NOTES_COLUMN] == REAR_ACCESS_NOTE]
# 18 did not attend
#

View file

@ -0,0 +1,61 @@
import os
import pandas as pd
from dotenv import load_dotenv
from etl.spatial.OpenUprnClient import OpenUprnClient
from etl.route_march_data_pull.app import get_data
# Ad-hoc script: look up EPC data for a fixed list of addresses, attach spatial data keyed on UPRN, and write the
# combined table to a local CSV.
load_dotenv(dotenv_path="backend/.env")
# Read from the environment; not referenced again within this script
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
# The properties to look up: first line of address plus postcode
addresses = [
{"address": "3 Willis Road", "postcode": "CB1 2AQ"},
{"address": "22 Catharine Street", "postcode": "CB1 3AW"},
{"address": "332 Mill Road", "postcode": "CB1 3NN"},
{"address": "330 Mill Road", "postcode": "CB1 3NN"},
{"address": "328 Mill Road", "postcode": "CB1 3NN"},
{"address": "71 Mill Road", "postcode": "CB1 2AS"},
{"address": "78 Argyle Street", "postcode": "CB1 3LZ"},
{"address": "9 Graham Road", "postcode": "CB4 2ZE"},
{"address": "217 Mill Road", "postcode": "CB1 3BE"},
{"address": "374 Mill Road", "postcode": "CB1 3NN"},
{"address": "174 Thoday Street", "postcode": "CB1 3AX"},
{"address": "37 Abbey Road", "postcode": "CB5 8HH"},
{"address": "18 Upper Gwydir Street", "postcode": "CB1 2LR"},
{"address": "21 Fulbourn Road Fulbourn", "postcode": "CB1 9JL"},
{"address": "108 Argyle Street", "postcode": "CB1 3LS"},
{"address": "115 Victoria Road", "postcode": "CB4 3BS"},
{"address": "55 Ross Street", "postcode": "CB1 3BP"},
{"address": "16 Kingston Street", "postcode": "CB1 2NU"},
{"address": "13 Thoday Street", "postcode": "CB1 3AS"},
{"address": "103 York Street", "postcode": "CB1 2PZ"},
]
asset_list = pd.DataFrame(addresses)
# row_id is the key used further down to merge the EPC results back onto the asset list
asset_list["row_id"] = asset_list.index
# The same column ("address") is passed as both the full address and the first address line.
# NOTE(review): the keyword names used here (asset_list, fulladdress_column, ..., epc_api_only) must all be
# accepted by the imported get_data - confirm against its current signature before running.
epc_data, _, _ = get_data(
asset_list=asset_list, fulladdress_column="address", postcode_column="postcode", address1_column="address",
manual_uprn_map={}, epc_api_only=True
)
epc_df = pd.DataFrame(epc_data)
# No-op outside an interactive session
epc_df.shape
asset_list = asset_list.merge(
epc_df, how="left", on="row_id"
)
# Presumably the EPC rows carry their own address/postcode columns, so the merge suffixes the asset list's
# originals with _x - TODO confirm
asset_list = asset_list.rename(columns={"address_x": "Address", "postcode_x": "Postcode"})
# Both sides of the spatial merge are cast to str so the UPRN keys compare as strings
asset_list["uprn"] = asset_list["uprn"].astype(str)
spatial_data = OpenUprnClient.get_spatial_data([x["uprn"] for x in epc_data], bucket_name="retrofit-data-dev")
spatial_data["UPRN"] = spatial_data["UPRN"].astype(str)
asset_list = asset_list.merge(
spatial_data, how="left", left_on="uprn", right_on="UPRN"
)
asset_list.to_csv("/Users/khalimconn-kowlessar/Documents/hestia/Customers/Panacap/Acquisitions EPC Data.csv",
index=False)

View file

@ -4,7 +4,7 @@ from dotenv import load_dotenv
from utils.s3 import save_csv_to_s3
from etl.find_my_epc.AssetListEpcData import AssetListEpcData
PORTFOLIO_ID = 126
PORTFOLIO_ID = 134
USER_ID = 8
load_dotenv(dotenv_path="backend/.env")
@ -19,22 +19,24 @@ def app():
asset_list = [
{
"address": "Garden Flat, 48 Bedminster Parade",
"postcode": "BS3 4HS",
"building_id": 1,
"uprn": 308249,
"address": "Flat 2, 42 Malden Road, London NW5 3HG",
"postcode": "NW5 3HG",
"uprn": 5117165,
},
{
"address": "Top Floor Flat, 48 Bedminster Parade",
"postcode": "BS3 4HS",
"building_id": 1,
"uprn": 308251
"address": "15 Bournville Lane",
"postcode": "B30 2JY",
"uprn": 100070301128
},
{
"address": "First Floor Flat, 48 Bedminster Parade",
"postcode": "BS3 4HS",
"building_id": 1,
"uprn": 308250,
"address": "34 Bournville Lane",
"postcode": "B30 2LN",
"uprn": 100070301140
},
{
"address": "36 Bournville Lane",
"postcode": "B30 2LN",
"uprn": 100070301142
}
]
asset_list = pd.DataFrame(asset_list)
@ -65,20 +67,21 @@ def app():
valuation_data = [
{
"address": "Garden Flat, 48 Bedminster Parade",
"postcode": "BS3 4HS",
"valuation": 337_000
"uprn": 5117165,
"valuation": 467_000
},
{
"addresss": "Top Floor Flat, 48 Bedminster Parade",
"postcode": "BS3 4HS",
"valuation": 337_000
"uprn": 100070301128,
"valuation": 335_000
},
{
"address": "First Floor Flat, 48 Bedminster Parade",
"postcode": "BS3 4HS",
"valuation": 337_000
}
"uprn": 100070301140,
"valuation": 276_000
},
{
"uprn": 100070301142,
"valuation": 276_000
},
]
# Store valuation data to s3
valuation_filename = f"{USER_ID}/{PORTFOLIO_ID}/valuation.csv"

File diff suppressed because it is too large Load diff

View file

@ -1,6 +1,7 @@
import os
import shutil
from tqdm import tqdm
from etl.access_reporting.app import SharePointClient
def delete_large_files():
@ -66,13 +67,17 @@ def delete_large_files():
def download_data_from_sharepoint():
# Given a sharepoint location, this function will download the retrofit assessment folders from the locations
# specified in the sharepoint location
from etl.access_reporting.app import SharePointClient
SHAREPOINT_CLIENT_ID = os.getenv("SHAREPOINT_CLIENT_ID", None)
SHAREPOINT_CLIENT_SECRET = os.getenv("SHAREPOINT_CLIENT_SECRET", None)
SHAREPOINT_TENANT_ID = os.getenv("SHAREPOINT_TENANT_ID", None)
OSMOSIS_SHAREPOINT_SITE_ID = os.getenv("OSMOSIS_SHAREPOINT_SITE_ID", None)
sharepoint_client = SharePointClient(
tenant_id="10d5af8b-2cfd-4882-9ccd-b96e4812dacf",
client_id="6832a4c5-fb8c-4082-a746-4f51e1020f0d",
client_secret="xpC8Q~Frww48SM1V-D8lGy5iOY7P_cJ7FF3jgarQ",
site_id="bc925a9a-ad0b-4de9-9a3c-e61014cc7489"
tenant_id=SHAREPOINT_TENANT_ID,
client_id=SHAREPOINT_CLIENT_ID,
client_secret=SHAREPOINT_CLIENT_SECRET,
site_id=OSMOSIS_SHAREPOINT_SITE_ID
)
# Retrieve the data from Sharepoint and write to local machine
@ -81,9 +86,14 @@ def download_data_from_sharepoint():
folder_path="Osmosis ACD/Osmosis ACD Projects/Stonewater/Stonewater Property ID Folders"
)
len(contents["value"])
folders_to_keep = [
"1. Herefordshire", "2. Bedfordshire", "3. Wiltshire", "4. Bournemouth",
"5. Coventry", "6. West Sussex", "7. Dorset", "8. Cambridgeshire",
"9. Guildford", "10. Little Island", "11. CCS Dorset",
]
folders_to_pull = [
folder for folder in contents["value"] if folder["name"] in ["3. Wiltshire", "4. Bournemouth", "5. Coventry"]
folder for folder in contents["value"] if folder["name"] in folders_to_keep
]
for folder_to_pull in folders_to_pull:
# Get the contents
@ -103,35 +113,42 @@ def download_data_from_sharepoint():
folder_path="Osmosis ACD/Osmosis ACD Projects/Stonewater/Stonewater Property ID Folders" + "/" +
folder_to_pull["name"] + "/" + property_folder["name"]
)
# We look for the retrofit assessment folder:
if not property_folder_contents.get("value"):
continue
# We look for the retrofit assessment folder or mtp folders:
property_sub_folders = [
f for f in property_folder_contents["value"] if "ra coordinator info" in f["name"].lower()
f for f in property_folder_contents["value"] if
"ra coordinator info" in f["name"].lower() or
"retrofit assessment" in f["name"].lower() or
"ra info" in f["name"].lower() or
"mtp" in f["name"].lower() or
"mid-term" in f["name"].lower()
]
if not property_sub_folders:
continue
# if we have this, we download the folder and store it on my laptop!
property_sub_folder = property_sub_folders[0]
for property_sub_folder in property_sub_folders:
# if we have this, we download the folder and store it on my laptop!
property_folder_path = os.path.join(
"Osmosis ACD/Osmosis ACD Projects/Stonewater/Stonewater Property ID Folders",
folder_to_pull["name"],
property_folder["name"],
property_sub_folder["name"]
)
property_folder_path = os.path.join(
"Osmosis ACD/Osmosis ACD Projects/Stonewater/Stonewater Property ID Folders",
folder_to_pull["name"],
property_folder["name"],
property_sub_folder["name"]
)
download_dir = os.path.join(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Wave 2.1 Surveys",
folder_to_pull["name"],
property_folder["name"],
property_sub_folder["name"]
)
download_dir = os.path.join(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Wave 2.1 Surveys - 2",
folder_to_pull["name"],
property_folder["name"],
property_sub_folder["name"]
)
# We download the folder
sharepoint_client.download_sharepoint_folder(
drive_id=sharepoint_client.document_drive["id"],
folder_path=property_folder_path,
download_dir=download_dir,
excluded_file_types=["MOV"]
)
# We download the folder
sharepoint_client.download_sharepoint_folder(
drive_id=sharepoint_client.document_drive["id"],
folder_path=property_folder_path,
download_dir=download_dir,
excluded_file_types=["MOV", "jpg"]
)

View file

@ -217,78 +217,7 @@ def app():
)
)
# We get the EPC data
# epc_data = json.loads(
# read_from_s3(
# bucket_name="retrofit-data-dev",
# s3_file_name="customers/Stonewater/clustering/epc_data.json"
# )
# )
# epc_data = pd.DataFrame(epc_data)
#
# epc_data["uprn"] = np.where(
# epc_data["internal_id"] == 1091,
# 83143766,
# epc_data["uprn"]
# )
#
# epc_data_batch_2 = read_pickle_from_s3(
# s3_file_name="customers/Stonewater/clustering/epc_data_batch_2.pkl",
# bucket_name="retrofit-data-dev"
# )
# epc_data_batch_2 = pd.DataFrame(epc_data_batch_2)
#
# complete_epcs = pd.concat([epc_data, epc_data_batch_2])
#
# epcs_to_merge = complete_epcs[
# [
# "uprn",
# "address",
# "postcode",
# "property-type",
# "built-form",
# "inspection-date",
# "current-energy-rating",
# "current-energy-efficiency",
# "roof-description",
# "walls-description",
# "transaction-type",
# "secondheat-description",
# "total-floor-area",
# "construction-age-band",
# "floor-height",
# "number-habitable-rooms",
# "mainheat-description",
# "energy-consumption-current"
# ]
# ].rename(
# columns={
# "address": "Address",
# "postcode": "Postcode",
# "inspection-date": "Date of last EPC",
# "current-energy-efficiency": "SAP score on register",
# "current-energy-rating": "EPC rating on register",
# "property-type": "Property Type",
# "built-form": "Archetype",
# "total-floor-area": "Property Floor Area",
# "construction-age-band": "Property Age Band",
# "floor-height": "Property Floor Height",
# "number-habitable-rooms": "Number of Habitable Rooms",
# "walls-description": "Wall Construction",
# "roof-description": "Roof Construction",
# "mainheat-description": "Heating Type",
# "secondheat-description": "Secondary Heating",
# "transaction-type": "Reason for last EPC",
# "energy-consumption-current": "Heat Demand (kWh/m2)",
# }
# )
# # We de-dupe, taking the newest on the date the EPC was lod
# epcs_to_merge["Date of last EPC"] = pd.to_datetime(epcs_to_merge["Date of last EPC"])
# epcs_to_merge = epcs_to_merge.sort_values("Date of last EPC", ascending=False)
# epcs_to_merge = epcs_to_merge.drop_duplicates(subset="uprn")
stonewater_cavity_properties["UPRN"] = stonewater_cavity_properties["UPRN"].astype("Int64").astype(str)
stonewater_cavity_properties["Reason Included"].value_counts()
# Find the postcodes where an Osmosis survey revealed a need for CWI
postcodes_found_needing_cwi = stonewater_cavity_properties[
stonewater_cavity_properties["Reason Included"].isin(
@ -339,12 +268,7 @@ def app():
"Renewables": "Parity - Renewables",
"Total Floor Area": "Parity - Total Floor Area"
}
) # .merge(
# epcs_to_merge,
# how="left",
# left_on="UPRN",
# right_on="uprn"
# )
)
# We now flag the additional properties in the as built list
@ -434,20 +358,20 @@ def app():
additional_properties["Suspected Needs CWI - not surveyed"] = (
(
additional_properties["Postcode"].isin(postcodes_found_needing_cwi)
additional_properties["Postcode"].isin(postcodes_found_needing_cwi) &
~additional_properties["Installed under ECO3"]
)
)
additional_properties["Same Postcode as Installed under ECO3"].value_counts()
# We drop Full Address
additional_properties = additional_properties.drop(columns=["Full Address"])
additional_properties2 = additional_properties[[
"Address", "Postcode", "Address ID", "SAP", "SAP Band", "Property Type", "Walls", "Roofs", "Glazing",
"Heating", "Main Fuel", "Hot Water", "Renewables", "Total Floor Area", 'Installed under ECO3',
'Same Postcode as Installed under ECO3'
'Same Postcode as Installed under ECO3', "Organisation Reference",
]].rename(
columns={
"Organisation Reference": "Org. ref.",
"SAP": "Parity - Predicted SAP",
"SAP Band": "Parity - Predicted SAP Band",
"Age": "Parity - Build Age",
@ -461,65 +385,62 @@ def app():
"Renewables": "Parity - Renewables",
"Total Floor Area": "Parity - Total Floor Area"
}
) # .merge(
# pd.DataFrame(additional_properties_epcs)[
# [
# "row_id",
# "property-type",
# "built-form",
# "inspection-date",
# "current-energy-rating",
# "current-energy-efficiency",
# "roof-description",
# "walls-description",
# "transaction-type",
# "secondheat-description",
# "total-floor-area",
# "construction-age-band",
# "floor-height",
# "number-habitable-rooms",
# "mainheat-description",
# "energy-consumption-current"
# ]
# ].rename(
# columns={
# "inspection-date": "Date of last EPC",
# "current-energy-efficiency": "SAP score on register",
# "current-energy-rating": "EPC rating on register",
# "property-type": "Property Type",
# "built-form": "Archetype",
# "total-floor-area": "Property Floor Area",
# "construction-age-band": "Property Age Band",
# "floor-height": "Property Floor Height",
# "number-habitable-rooms": "Number of Habitable Rooms",
# "walls-description": "Wall Construction",
# "roof-description": "Roof Construction",
# "mainheat-description": "Heating Type",
# "secondheat-description": "Secondary Heating",
# "transaction-type": "Reason for last EPC",
# "energy-consumption-current": "Heat Demand (kWh/m2)",
# }
# ),
# how="left",
# on="row_id"
# )
)
# Combine the data:
stonewater_cavity_properties2 = stonewater_cavity_properties.merge(
features[["Address", "Organisation Reference"]], how="left", on="Organisation Reference"
)
full_dataset = pd.concat([stonewater_cavity_properties2, additional_properties2])
full_dataset = full_dataset.drop(columns=['Osm. ID'])
# We now define the priority list for non-intrusives
full_dataset["Postal Region"] = full_dataset["Postcode"].str.split(" ").str[0].str[0:2]
full_dataset["Postal Region 2"] = full_dataset["Postcode"].str.split(" ").str[0]
# Strip out anything we definitely don't want
full_dataset = full_dataset[~full_dataset["Installed under ECO3"]]
areas = full_dataset[full_dataset["Suspected Needs CWI - not surveyed"] == True]["Postal Region 2"].unique()
priorities = full_dataset[
full_dataset["Postal Region 2"].isin(areas)
]
region_prevalance = priorities["Postal Region 2"].value_counts().to_frame().reset_index()
region_prevalance = region_prevalance[region_prevalance["count"] > 100]
df = priorities[priorities["Postal Region 2"].isin(region_prevalance["Postal Region 2"].values)]
df["Postal Region"].value_counts()
df["Postal Region 2"].value_counts()
if df["Installed under ECO3"].sum():
raise ValueError("There are properties in the priority list that were installed under ECO3")
df.to_csv(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Non-intrusives/10022025 Non-Intrusives - "
"revised list.csv",
index=False
)
# We save the data locally
stonewater_cavity_properties.to_csv(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater Cavity Properties - priority "
"postcodes.csv",
index=False
)
additional_properties2.to_csv(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater Additional Cavity Properties - "
"non-priority postcodes.csv",
index=False
)
# Save the survey findings
needs_cwi.to_csv(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater Properties Needing CWI - WIP.csv",
index=False
)
# stonewater_cavity_properties.to_csv(
# "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater Cavity Properties - priority "
# "postcodes.csv",
# index=False
# )
# additional_properties2.to_csv(
# "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater Additional Cavity Properties - "
# "non-priority postcodes.csv",
# index=False
# )
# # Save the survey findings
# needs_cwi.to_csv(
# "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater Properties Needing CWI -
# WIP.csv",
# index=False
# )
def cross_reference_epc_programme():
@ -528,6 +449,12 @@ def cross_reference_epc_programme():
"SURVEYED - ECO3 NOT COMPLETED.xlsx"
)
for _, x in eco3_fallout.iterrows():
house_no = SearchEpc.get_house_number(x["ADDRESS"], "")
if house_no is None:
house_no = x["ADDRESS"].split(",")[0]
x["house_number"] = house_no
eco3_fallout["house_number"] = eco3_fallout.apply(
lambda x: SearchEpc.get_house_number(x["ADDRESS"], ""), axis=1
)
@ -558,3 +485,58 @@ def cross_reference_epc_programme():
stonewater_modelled_above_c["Address"].apply(lambda x: fuzz.ratio(x, property["ADDRESS"]) > 90)
]
match.head()
def finalise_list_for_non_intrusives():
    """
    Produce the final list of properties for non-intrusive surveys and write it to an Excel workbook.

    Steps, in order:
      1. Load the candidate list and drop rows flagged "Installed under ECO3".
      2. Flag, then drop, rows whose "Address ID" appears in the "Modelled Packages" sheet.
      3. Left-join "Organisation Reference" from the master features sheet on "Address ID".
      4. Keep only rows whose "Postcode" matches a postcode of some row flagged "Include in non-intrusives".

    All inputs and the output are hard-coded local file paths; nothing is returned.
    """
    candidates = pd.read_excel(
        "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Non-intrusives/20250207 Stonewater "
        "Non-Intrusives.xlsx"
    )

    # Remove anything installed under ECO3
    eco3_installed = candidates["Installed under ECO3"]
    candidates = candidates[~eco3_installed]

    # We mark any properties that were surveyed by Osmosis
    osmosis_packages = pd.read_excel(
        "/Users/khalimconn-kowlessar/Downloads/Stonewater - Bid Packages WIP 14.11.20 V2 "
        "(1).xlsx",
        sheet_name="Modelled Packages",
        header=13,
    )
    surveyed_address_ids = osmosis_packages["Address ID"].values
    candidates["Surveyed by Osmosis"] = candidates["Address ID"].isin(surveyed_address_ids)

    # Removed 54 addresses
    shortlist = candidates[~candidates["Surveyed by Osmosis"]]

    parity_features = pd.read_csv(
        "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Osmosis Reviewed - Parity Download 18.7 - "
        "master sheet.csv",
        encoding='latin1'
    )

    # Add on the organisation reference
    org_refs = parity_features[["Organisation Reference", "Address ID"]]
    shortlist = shortlist.merge(org_refs, how="left", on="Address ID")

    # First two characters of the part of the postcode before the space
    outward_code = shortlist["Postcode"].str.split(" ").str[0]
    shortlist["Postal Region"] = outward_code.str[0:2]

    # NOTE(review): "Postal Region" is derived above, but the selection below matches on the full "Postcode" -
    # confirm that postcode-level (rather than region-level) matching is intended.
    flagged_for_inclusion = shortlist["Include in non-intrusives"]
    selected_regions = shortlist[flagged_for_inclusion]["Postcode"].unique()
    shortlist["Is in region"] = shortlist["Postcode"].isin(selected_regions)

    # Filter down:
    shortlist = shortlist[shortlist["Is in region"]]

    shortlist.to_excel(
        "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Non-intrusives/10022025 Non-Intrusives "
        "List - final.xlsx")

View file

@ -72,12 +72,20 @@ class AssetListEpcData:
epc_searcher.find_property(skip_os=True)
if epc_searcher.newest_epc is None:
continue
find_epc_searcher = RetrieveFindMyEpc(
address=epc_searcher.newest_epc["address1"],
postcode=epc_searcher.newest_epc["postcode"]
)
find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
# Attempt both methods:
try:
find_epc_searcher = RetrieveFindMyEpc(
address=epc_searcher.newest_epc["address1"] + ", " + epc_searcher.newest_epc["address2"],
postcode=epc_searcher.newest_epc["postcode"]
)
find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
except Exception as e:
logger.error(f"Error retrieving find my epc data: {e}")
find_epc_searcher = RetrieveFindMyEpc(
address=epc_searcher.newest_epc["address1"],
postcode=epc_searcher.newest_epc["postcode"]
)
find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
time.sleep(0.5)
# We need uprn

View file

@ -25,6 +25,7 @@ class RetrieveFindMyEpc:
self.postcode = postcode
self.address_cleaned = self.address.replace(",", "").replace(" ", "").lower()
self.walls = []
@staticmethod
def extract_low_carbon_sources(soup):
@ -102,6 +103,8 @@ class RetrieveFindMyEpc:
# 2) Bills estimates
# 3) Recommendations and SAP points
# 4) Low and zero carbon energy sources
# 5) The wall types of the property - used for determining if we have an extension wall insulation
# recommendation
ratings = address_res.find('desc', {'id': 'svg-desc'}).text
current_rating = ratings.split(".")[0]
@ -208,6 +211,17 @@ class RetrieveFindMyEpc:
if key not in assessment_data:
raise ValueError(f"Missing key: {key}")
# The wall types of the property
property_features_table = address_res.find("tbody", class_="govuk-table__body")
property_features_table = property_features_table.find_all("tr")
# Extract wall types
self.walls = []
for row in property_features_table:
cells = row.find_all("td")
if row.find("th").text.strip() == "Wall":
self.walls.append(cells[0].text.strip())
# Finally, we format the recommendations
recommendations = self.format_recommendations(recommendations, assessment_data, sap_2012_date)
@ -229,8 +243,7 @@ class RetrieveFindMyEpc:
return resulting_data
@staticmethod
def format_recommendations(recommendations, assessment_data, sap_2012_date=None):
def format_recommendations(self, recommendations, assessment_data, sap_2012_date=None):
"""
This function converts the recommendations to a format that we can use in the engine as a non-intrusive survey
:param recommendations: The recommendations from the EPC
@ -317,7 +330,8 @@ class RetrieveFindMyEpc:
"roomstat_programmer_trvs", "time_temperature_zone_control"
],
"Replacement warm air unit": [],
"Secondary glazing": ["secondary_glazing"]
"Secondary glazing": ["secondary_glazing"],
"Condensing heating unit": ["boiler_upgrade"],
}
survey = True
@ -330,6 +344,8 @@ class RetrieveFindMyEpc:
for rec in recommendations:
mapped = measure_map[rec["measure"]]
for measure in mapped:
if measure == "cavity_wall_insulation" and "solid brick" in self.walls[0].lower():
measure = "extension_cavity_wall_insulation"
to_append = {
"type": measure,
"sap_points": rec["sap_points"],

View file

@ -1,396 +0,0 @@
import os
import time
import pandas as pd
import numpy as np
from tqdm import tqdm
from dotenv import load_dotenv
from backend.SearchEpc import SearchEpc
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes
from recommendations.recommendation_utils import (
estimate_perimeter,
estimate_external_wall_area,
estimate_number_of_floors
)
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
def get_data(asset_list, fulladdress_column, address1_column, postcode_column, manual_uprn_map):
    """
    Look up the newest EPC, its recommendations and the FindMyEPC data for every row of an asset list.

    :param asset_list: DataFrame with one property per row; must contain a "row_id" column plus the three
        address columns named below
    :param fulladdress_column: name of the column holding the full address string
    :param address1_column: name of the column holding the first address line / house number
    :param postcode_column: name of the column holding the postcode
    :param manual_uprn_map: dict mapping a (stripped) full address to a UPRN, used to force the match for
        problematic addresses
    :return: tuple (epc_data, errors, no_epc) where epc_data is a list of dicts (the newest EPC plus "row_id",
        "recommendations" and "find_my_epc_data"), errors is the list of row_ids whose processing raised, and
        no_epc is the list of row_ids for which no EPC could be found
    """
    epc_data = []
    errors = []
    no_epc = []

    for _, home in tqdm(asset_list.iterrows(), total=len(asset_list)):
        try:
            postcode = home[postcode_column]
            house_number = home[address1_column].strip()
            full_address = home[fulladdress_column].strip()

            # Parse once; the result is reused below to decide how to build the retry search
            parsed_house_no = SearchEpc.get_house_number(address=str(house_number), postcode=postcode)
            house_no = house_number if parsed_house_no is None else parsed_house_no

            uprn = manual_uprn_map.get(full_address)

            searcher = SearchEpc(
                address1=str(house_no),
                postcode=postcode,
                auth_token=EPC_AUTH_TOKEN,
                os_api_key="",
                property_type=None,
                fast=True,
                full_address=full_address,
                max_retries=5,
                uprn=uprn
            )
            # Force the skipping of estimating the EPC
            searcher.ordnance_survey_client.property_type = None
            searcher.ordnance_survey_client.built_form = None

            searcher.find_property(skip_os=True)

            # Nothing found and no manual UPRN to rely on: retry with a differently derived address1
            if searcher.newest_epc is None and uprn is None:
                if parsed_house_no is None:
                    # Backup: second comma-separated component, else the first space-separated word
                    address_parts = full_address.split(",")
                    if len(address_parts) > 1:
                        add1 = address_parts[1].strip()
                    else:
                        add1 = full_address.split(" ")[0].strip()
                else:
                    add1 = str(house_number)

                searcher = SearchEpc(
                    address1=add1,
                    postcode=postcode,
                    auth_token=EPC_AUTH_TOKEN,
                    os_api_key="",
                    property_type=None,
                    fast=True,
                    full_address=full_address,
                    max_retries=5
                )
                # Check if we have a flat or apartment
                lowered = house_number.lower()
                if any(tag in lowered for tag in ("flat", "apartment", "apt")):
                    searcher.ordnance_survey_client.property_type = "Flat"

                searcher.find_property(skip_os=True)

            if searcher.newest_epc is None:
                no_epc.append(home["row_id"])
                continue

            # Look for EPC recommendations; a failure here is non-fatal and yields an empty list.
            # Exception (not a bare except) so that Ctrl-C / SystemExit still stop a long-running pull.
            try:
                property_recommendations = searcher.client.domestic.recommendations(searcher.newest_epc["lmk-key"])
            except Exception:
                property_recommendations = {"rows": []}

            # Retrieve data from FindMyEPC
            try:
                find_epc_searcher = RetrieveFindMyEpc(
                    address=searcher.newest_epc["address"], postcode=searcher.newest_epc["postcode"]
                )
                find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
            except ValueError as e:
                if "No EPC found" in str(e) and "address1" in searcher.newest_epc:
                    # Second attempt using only the first address line
                    find_epc_searcher = RetrieveFindMyEpc(
                        address=searcher.newest_epc["address1"], postcode=searcher.newest_epc["postcode"]
                    )
                    find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
                else:
                    find_epc_data = {}
            except Exception as e:
                # Keep the original traceback attached; the outer handler records the row as an error
                raise Exception(f"Error retrieving FindMyEPC data: {e}") from e

            # Randomised pause between rows
            time.sleep(np.random.uniform(0.1, 1))

            epc_data.append(
                {
                    "row_id": home["row_id"],
                    **searcher.newest_epc.copy(),
                    "recommendations": property_recommendations["rows"],
                    "find_my_epc_data": find_epc_data,
                }
            )
        except Exception:
            # Record the failing row and back off before moving on to the next one
            errors.append(home["row_id"])
            time.sleep(5)

    return epc_data, errors, no_epc
def extract_address1(asset_list, full_address_col, method="first_two_words"):
    """
    Derive an "address1_extracted" column from the leading word(s) of the full address.

    :param asset_list: DataFrame to update; the new column is written onto this same object
    :param full_address_col: name of the string column holding the full address
    :param method: "first_two_words" joins the first two space-separated words with a space;
        "first_word" keeps only the first
    :return: the same DataFrame, with "address1_extracted" added
    :raises ValueError: if method is not one of the two recognised values
    """
    if method not in ("first_two_words", "first_word"):
        raise ValueError(f"Method {method} not recognized")

    words = asset_list[full_address_col].str.split(" ")
    if method == "first_word":
        extracted = words.str[0]
    else:
        extracted = words.str[:2].str.join(" ")

    asset_list["address1_extracted"] = extracted
    return asset_list
def app():
    """
    Pulls EPC data for the properties in a customer asset list and writes the enriched list to Excel.

    The asset list is read from a local Excel workbook, cleaned and de-duplicated, and then each property is looked
    up via ``get_data``. Properties that errored on the first pass are retried once. The EPC fields, the
    recommendations (one boolean column per distinct recommendation) and the FindMyEPC fields are then merged back
    onto the asset list, some derived estimates are added, and the result is saved next to the input workbook.

    Data request contents:
    Date of last EPC
    Reason for EPC
    SAP score on register
    Property Type
    Property Area
    Property Age
    Any Dimensions (HLP,PW,RH)
    Property Wall Construction
    Heating Type
    Secondary Heating
    Loft Insulation Depth

    Additional if possible:
    Heat loss calculations
    EPC recommendations
    Property UPRN
    """
    DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Southern"
    DATA_FILENAME = "January 2025 Additions Query.xlsx"
    SHEET_NAME = "Jan 2025 additions"
    POSTCODE_COLUMN = "Post Code"
    FULLADDRESS_COLUMN = "Street / Block Name"
    ADDRESS1_COLUMN = None
    ADDRESS1_METHOD = "first_word"
    ADDRESS_COLS_TO_CONCAT = []
    # Maps addresses to uprn in problematic cases
    MANUAL_UPRN_MAP = {
        "Ardelagh Ardelagh Faris Lane Woodham Addlestone KT15 3DJ": 100061484560
    }

    asset_list = pd.read_excel(os.path.join(DATA_FOLDER, DATA_FILENAME), header=0, sheet_name=SHEET_NAME)
    # reset_index() keeps the original index as an "index" column, which is dropped again before saving
    asset_list = asset_list[~pd.isnull(asset_list[POSTCODE_COLUMN])].reset_index()
    asset_list["row_id"] = asset_list.index

    # We clean up potential non-breaking spaces, and collapse runs of repeated spaces into a single space
    for col in [c for c in [POSTCODE_COLUMN, FULLADDRESS_COLUMN, ADDRESS1_COLUMN] if c is not None]:
        asset_list[col] = asset_list[col].astype(str)
        asset_list[col] = asset_list[col].str.replace('\xa0', ' ', regex=False)
        asset_list[col] = asset_list[col].str.replace(r" {2,}", " ", regex=True)

    # The full address must exist before address1 can be extracted from it, so it is built first
    if FULLADDRESS_COLUMN is None:
        FULLADDRESS_COLUMN = "fulladdress_extracted"
        # We concatenate the columns in ADDRESS_COLS_TO_CONCAT, on commas
        asset_list[FULLADDRESS_COLUMN] = asset_list[ADDRESS_COLS_TO_CONCAT].apply(lambda x: ", ".join(x), axis=1)

    if ADDRESS1_COLUMN is None:
        ADDRESS1_COLUMN = "address1_extracted"
        asset_list = extract_address1(
            asset_list=asset_list, full_address_col=FULLADDRESS_COLUMN, method=ADDRESS1_METHOD
        )

    # We check for duplicated addresses
    asset_list["deduper"] = asset_list[FULLADDRESS_COLUMN] + asset_list[POSTCODE_COLUMN]
    if asset_list["deduper"].duplicated().sum():
        # Drop the dupes
        print(f"There are {asset_list['deduper'].duplicated().sum()} duplicated addresses - dropping")
        asset_list = asset_list[~asset_list["deduper"].duplicated()]
    asset_list = asset_list.drop(columns=["deduper"])

    epc_data, errors, no_epc = get_data(
        asset_list=asset_list,
        fulladdress_column=FULLADDRESS_COLUMN,
        address1_column=ADDRESS1_COLUMN,
        postcode_column=POSTCODE_COLUMN,
        manual_uprn_map=MANUAL_UPRN_MAP
    )

    # We now retrieve any failed properties (a single retry; anything failing again is not reported)
    asset_list_failed = asset_list[asset_list["row_id"].isin(errors)]
    epc_data_failed, _, _ = get_data(
        asset_list=asset_list_failed,
        fulladdress_column=FULLADDRESS_COLUMN,
        address1_column=ADDRESS1_COLUMN,
        postcode_column=POSTCODE_COLUMN,
        manual_uprn_map=MANUAL_UPRN_MAP
    )

    no_data = asset_list[asset_list["row_id"].isin(no_epc)]
    print(no_data[[FULLADDRESS_COLUMN, POSTCODE_COLUMN]])

    # Append the failed data to the main data
    epc_data.extend(epc_data_failed)
    epc_df = pd.DataFrame(epc_data)

    # We expand out the recommendations
    recommendations_df = epc_df[["row_id", "recommendations"]]
    unique_recommendations = set()
    for _, row in recommendations_df.iterrows():
        unique_recommendations.update([rec["improvement-summary-text"] for rec in row["recommendations"]])
    columns = ["row_id"] + list(unique_recommendations)

    transformed_data = []
    for _, row in recommendations_df.iterrows():
        # Initialize a dictionary for this row with False for all recommendations
        row_data = {col: False for col in columns}
        row_data["row_id"] = row["row_id"]
        # Set True for each recommendation present in this row
        for rec in row["recommendations"]:
            recommendation_text = rec["improvement-summary-text"]
            row_data[recommendation_text] = True
        # Append the row data to transformed_data
        transformed_data.append(row_data)
    transformed_df = pd.DataFrame(transformed_data)

    # Drop the column that is ""
    if "" in transformed_df.columns:
        transformed_df = transformed_df.drop(columns=[""])

    # Get the find my epc data. The join is positional (on the index), which relies on epc_df having the default
    # index created by pd.DataFrame above
    find_my_epc_data = epc_df[["row_id", "find_my_epc_data"]].drop(columns=["find_my_epc_data"]).join(
        pd.json_normalize(epc_df["find_my_epc_data"])
    )

    # We check if we get the solar pv column:
    if "Solar photovoltaics" not in find_my_epc_data.columns:
        find_my_epc_data["Solar photovoltaics"] = False

    # Retrieve just the data we need
    epc_df = epc_df[
        [
            "row_id",
            "uprn",
            "address1",
            "address",
            "postcode",
            "property-type",
            "built-form",
            "inspection-date",
            "current-energy-rating",
            "current-energy-efficiency",
            "roof-description",
            "walls-description",
            "floor-description",
            "transaction-type",
            # New fields needed
            "secondheat-description",
            "total-floor-area",
            "construction-age-band",
            "floor-height",
            "number-habitable-rooms",
            "mainheat-description",
            #
            "energy-consumption-current",  # kwh/m2
            "photo-supply",
        ]
    ].rename(columns={"address1": "Address1 on EPC", "address": "Address on EPC", "postcode": "Postcode on EPC"})

    asset_list = asset_list.merge(
        epc_df,
        how="left",
        on="row_id"
    ).merge(
        find_my_epc_data[
            [
                "row_id", "heating_text", "hot_water_text", 'Assessors name',
                "Assessor's Telephone", "Assessor's Email", "Accreditation scheme",
                "Assessors ID", "Solar photovoltaics"
            ]
        ].rename(
            columns={
                "Solar photovoltaics": "Has Solar PV",
                "heating_text": "Heating Estimated kWh",
                "hot_water_text": "Hot Water Estimated kWh",
            }
        ),
        how="left",
        on="row_id"
    )

    # A property has PV if FindMyEPC says so, or if the EPC reports a non-zero photo-supply
    asset_list["Has Solar PV"] = asset_list["Has Solar PV"] | ~asset_list["photo-supply"].isin(["0.0", 0, None, ""])
    asset_list = asset_list.drop(columns=["photo-supply"])

    # Rename the columns
    asset_list = asset_list.rename(columns={
        "inspection-date": "Date of last EPC",
        "current-energy-efficiency": "SAP score on register",
        "current-energy-rating": "EPC rating on register",
        "property-type": "Property Type",
        "built-form": "Archetype",
        "total-floor-area": "Property Floor Area",
        "construction-age-band": "Property Age Band",
        "floor-height": "Property Floor Height",
        "number-habitable-rooms": "Number of Habitable Rooms",
        "walls-description": "Wall Construction",
        "roof-description": "Roof Construction",
        "floor-description": "Floor Construction",
        "mainheat-description": "Heating Type",
        "secondheat-description": "Secondary Heating",
        "transaction-type": "Reason for last EPC",
        "energy-consumption-current": "Heat Demand (kWh/m2)",
    })

    asset_list["Estimated Number of Floors"] = asset_list.apply(
        lambda x: estimate_number_of_floors(property_type=x["Property Type"]) if not pd.isnull(
            x["Property Type"]) else None, axis=1
    )
    asset_list["Property Floor Area"] = asset_list["Property Floor Area"].astype(float)
    # Replace "" value with None
    asset_list["Number of Habitable Rooms"] = asset_list["Number of Habitable Rooms"].replace("", None)
    asset_list["Number of Habitable Rooms"] = asset_list["Number of Habitable Rooms"].astype(float)

    # The perimeter is estimated per floor, so area and room count are divided by the estimated floor count
    asset_list["Estimated Perimeter (m)"] = asset_list.apply(
        lambda x: estimate_perimeter(
            floor_area=x["Property Floor Area"] / x["Estimated Number of Floors"],
            num_rooms=x["Number of Habitable Rooms"] / x["Estimated Number of Floors"],
        ), axis=1
    )
    asset_list["Estimated Heat Loss Perimeter (m2)"] = asset_list.apply(
        lambda x: estimate_external_wall_area(
            num_floors=x["Estimated Number of Floors"],
            # Falls back to 2.5 when the EPC has no floor height recorded (empty value)
            floor_height=float(x["Property Floor Height"]) if x["Property Floor Height"] else 2.5,
            perimeter=x["Estimated Perimeter (m)"],
            built_form=x["Archetype"]
        ),
        axis=1
    )
    asset_list["Roof Insulation Thickness"] = asset_list.apply(
        lambda x: RoofAttributes(description=x["Roof Construction"]).process()["insulation_thickness"] if not pd.isnull(
            x["Roof Construction"]) else None,
        axis=1
    )

    # For all of the columns in transformed_df (other than row_id), prefix with "Recommendation: "
    transformed_df = transformed_df.rename(
        columns={col: f"Recommendation: {col}" for col in transformed_df.columns if col != "row_id"}
    )
    asset_list = asset_list.merge(
        transformed_df,
        how="left",
        on="row_id"
    )
    asset_list = asset_list.drop(columns=["row_id", "index"])

    # Store as an excel
    filename = os.path.join(DATA_FOLDER, ".".join(DATA_FILENAME.split(".")[:-1])) + " EPC Data Pull - Main.xlsx"
    asset_list.to_excel(filename, index=False)

    # NOTE(review): matches_review is built but not used in the visible part of this function - presumably it is
    # for manually checking the asset list addresses against the EPC addresses; confirm
    matches_review = asset_list[
        [FULLADDRESS_COLUMN, ADDRESS1_COLUMN, POSTCODE_COLUMN, "Address on EPC", "Postcode on EPC"]
    ]

View file

@ -852,6 +852,8 @@ class HeatingRecommender:
else:
heating_simulation_config["mainheat_energy_eff_ending"] = self.property.data["mainheat-energy-eff"]
# TODO:We possibly shouldn't touch the hot water energy efficiency if we aren't recommending dual immersion
# we'll keep this for the moment though
if self.property.data["hot-water-energy-eff"] in ["Very Poor", "Poor"]:
heating_simulation_config["hot_water_energy_eff_ending"] = "Average"
else:
@ -993,7 +995,7 @@ class HeatingRecommender:
# We check if there's a mains connection and the hot water is inefficient, as this will improve with a boiler
has_inefficient_water = (
self.property.data["mains-gas-flag"] and
self.property.data["hot-water-energy-eff"] in ["Very Poor", "Poor", "Average"]
self.property.data["hot-water-energy-eff"] in ["Very Poor", "Poor"]
)
non_invasive_recommendation = next((

View file

@ -503,7 +503,9 @@ class Recommendations:
impact_summary.append(
{
"phase": rec["phase"],
"representative": rec["recommendation_id"] in representative_ids,
"recommendation_id": rec["recommendation_id"],
"measure_type": rec["measure_type"],
"sap": sap + rec["sap_points"],
"carbon": carbon - rec["co2_equivalent_savings"],
"heat_demand": heat_demand - rec["heat_demand"],
@ -621,6 +623,13 @@ class Recommendations:
if li_sap_limit is not None:
property_phase_impact["sap"] = min(property_phase_impact["sap"], li_sap_limit)
if rec["type"] == "solar_pv":
# We use the SAP points in the recommendation as a minimum
property_phase_impact["sap"] = (
rec["sap_points"] if property_phase_impact["sap"] < rec["sap_points"] else
property_phase_impact["sap"]
)
# Insert this information into the recommendation.
if not rec.get("survey", False):
rec["sap_points"] = property_phase_impact["sap"]
@ -647,7 +656,9 @@ class Recommendations:
return property_recommendations, impact_summary
@staticmethod
def map_descriptions_to_fuel(heating_description, hotwater_description, main_fuel_description):
def map_descriptions_to_fuel(
heating_description, hotwater_description, main_fuel_description, descriptions_to_fuel_types
):
# Handle the case of community schemes
if (heating_description == "Community scheme") or (hotwater_description == "Community scheme"):
@ -660,7 +671,7 @@ class Recommendations:
}
raise NotImplementedError("Handle this case")
mapped = assumptions.DESCRIPTIONS_TO_FUEL_TYPES[heating_description]
mapped = descriptions_to_fuel_types[heating_description]
heating_fuel = mapped["fuel"]
if hotwater_description in [
@ -680,7 +691,7 @@ class Recommendations:
"heating_cop": mapped["cop"], "hotwater_cop": 1
}
mapped_hotwater = assumptions.DESCRIPTIONS_TO_FUEL_TYPES[hotwater_description]
mapped_hotwater = descriptions_to_fuel_types[hotwater_description]
return {
"heating_fuel_type": heating_fuel, "hotwater_fuel_type": mapped_hotwater["fuel"],
@ -689,7 +700,7 @@ class Recommendations:
@classmethod
def calculate_recommendation_tenant_savings(
cls, property_instance, kwh_simulation_predictions, property_recommendations
cls, property_instance, kwh_simulation_predictions, property_recommendations, ashp_cop=None
):
"""
This method inserts the kwh savings and the bill savings that the customer will make from the recommendations
@ -701,9 +712,12 @@ class Recommendations:
:param property_instance: Instance of the Property class, for the home associated to property_id
:param kwh_simulation_predictions: dictionary of predictions from the model apis
:param property_recommendations: dictionary of recommendations for the property
:param ashp_cop: The coefficient of performance for the air source heat pump.
:return:
"""
ashp_cop = ashp_cop if ashp_cop else assumptions.AVERAGE_ASHP_EFFICIENCY
kwh_impact_table = kwh_simulation_predictions["heating_kwh_predictions"][
kwh_simulation_predictions["heating_kwh_predictions"]["property_id"] == str(property_instance.id)
].merge(
@ -772,12 +786,19 @@ class Recommendations:
if kwh_impact_table.loc[i, col] > previous_phase[col].max():
kwh_impact_table.loc[i, col] = previous_phase[col].max()
descriptions_to_fuel_types = assumptions.DESCRIPTIONS_TO_FUEL_TYPES
# We overwrite the air source heat pump efficiencies (COP) with the supplied ashp_cop
ashp_keys = [k for k in descriptions_to_fuel_types.keys() if "air source heat pump" in k.lower()]
for k in ashp_keys:
descriptions_to_fuel_types[k]["cop"] = ashp_cop
# For heating system recommendations, this could result in a fuel type change so we reflect that
fuel_mapping = pd.DataFrame([
{
"id": epc["id"],
**cls.map_descriptions_to_fuel(
epc["mainheat-description"], epc["hotwater-description"], epc["main-fuel"]
epc["mainheat-description"], epc["hotwater-description"], epc["main-fuel"],
descriptions_to_fuel_types
)
} for epc in property_instance.updated_simulation_epcs
])
@ -791,7 +812,8 @@ class Recommendations:
**cls.map_descriptions_to_fuel(
property_instance.data["mainheat-description"],
property_instance.data["hotwater-description"],
property_instance.data["main-fuel"]
property_instance.data["main-fuel"],
descriptions_to_fuel_types
)
}
]

View file

@ -14,11 +14,16 @@ class SolarPvRecommendations:
# This was previously set to 250w, but has been upped to 400 based on the systems used by Cotswold Energy Group
SOLAR_PANEL_WATTAGE = 400
# For domestic properties, we don't recommend a solar PV system with wattage outside of these
# bounds
MAX_SYSTEM_WATTAGE = 6000
MIN_SYSTEM_WATTAGE = 1000
# The maximum area of roof we allow to be covered in solar panels for our recommendations.
MAX_ROOF_AREA_PERCENTAGE = 0.7
SAP_POINTS_PER_5_PERCENT_ROOF_COVERAGE = 1
def __init__(self, property_instance):
"""
:param property_instance: Instance of the Property class, for the home associated to property_id
@ -212,6 +217,20 @@ class SolarPvRecommendations:
roof_coverage_percent = round(recommendation_config["panneled_roof_area"] / roof_area * 100)
# We round up to the nearest 5
roof_coverage_percent = np.ceil(roof_coverage_percent / 5) * 5
# Typically, we've observed that every 5% of additional roof coverage will result in at least
# an additional 1 SAP points (though often 2 points) Given this, we can add a reasonable minimum
# for the number of SAP points we might expect. We've observed that for some cases where properties
# are hitting the higher SAP scores (e.g. EPC A and above), the model can sometimes under-predict
# the number of SAP points. This appears to be due to a relatively small number of properties
# actually achieving the upper echelons of EPC rating. This can be the case if we're simulating a
# whole house retrofit where the home is getting complete insulation, a heat pump and solar panels.
# Because panels are the final recommendation, they are often the measure that takes the home
# into the medium to high EPC A ranges and so because of a lack of training data, this means that
# we might sometime under-predict. This minimum is intended to try and reduce the negative impact
# of this. This minimum is used in Recommendations.calculate_recommendation_impact
minimum_sap_points = (roof_coverage_percent / 5) * self.SAP_POINTS_PER_5_PERCENT_ROOF_COVERAGE
for has_battery in [False, True]:
cost_result = self.costs.solar_pv(
has_battery=has_battery,
@ -240,7 +259,7 @@ class SolarPvRecommendations:
"description": description,
"starting_u_value": None,
"new_u_value": None,
"sap_points": None,
"sap_points": minimum_sap_points,
"already_installed": already_installed,
**cost_result,
# This is required for simulating the SAP impact. solar_pv_percentage is between 0 & 1 so we

View file

@ -215,21 +215,29 @@ class WindowsRecommendations:
"glazed-type": glazed_type_ending,
}
measure_type = "double_glazing" if not is_secondary_glazing else "secondary_glazing"
non_invasive_recommendation = next(
(r for r in self.property.non_invasive_recommendations if r["type"] in ["windows_glazing", measure_type]),
{}
)
self.recommendation = [
{
"phase": phase,
"parts": [],
"type": "windows_glazing",
"measure_type": "double_glazing" if not is_secondary_glazing else "secondary_glazing",
"measure_type": measure_type,
"description": description,
"starting_u_value": None,
"new_u_value": None,
"sap_points": None,
"sap_points": non_invasive_recommendation.get("sap_points", None),
"already_installed": already_installed,
**cost_result,
"is_secondary_glazing": is_secondary_glazing,
"description_simulation": description_simulation,
"simulation_config": simulation_config,
"survey": non_invasive_recommendation.get("survey", None),
}
]

View file

@ -205,7 +205,7 @@ def get_wall_u_value(
mapped_value = wall_uvalues_df[
wall_uvalues_df["Wall_type"] == mapped_description
][age_band].values[0]
][age_band].values[0]
if pd.isnull(mapped_value) and "Park home" in mapped_description:
# We don't know enough in this case so we default to 0
@ -428,6 +428,9 @@ def estimate_number_of_floors(property_type):
Using the property type, we estimate the number of floors in the property
"""
if property_type is None:
return None
if property_type == "House":
number_of_floors = 2
elif property_type in ["Flat", "Bungalow"]:
@ -560,7 +563,7 @@ def get_floor_u_value(
insulation_lookup = s11[
s11["Age_band"].str.contains(age_band) & s11["Floor_construction"]
== floor_type
]
]
if insulation_lookup.empty:
insulation_thickness = 0
else:

270
survey_report/app.py Normal file
View file

@ -0,0 +1,270 @@
import os
import requests
import PyPDF2
from string import Template
import pandas as pd
from survey_report.extraction.detect_report_type import detect_report_type
from survey_report.extraction.quidos import SiteNotesExtractor, EPRExtractor
def generate_html_report(template_path, output_path, data):
    """
    Renders an HTML report by filling the ``$placeholder`` fields of a template file.

    Placeholders that have no matching key in ``data`` are left untouched in the output rather than raising.

    Args:
    - template_path (str): Path to the HTML template file.
    - output_path (str): Path to save the generated HTML file.
    - data (dict): Dictionary containing dynamic values for the report.
    """
    with open(template_path, "r", encoding="utf-8") as template_file:
        raw_template = template_file.read()

    # safe_substitute leaves unknown placeholders in place instead of raising a KeyError
    rendered_html = Template(raw_template).safe_substitute(data)

    with open(output_path, "w", encoding="utf-8") as report_file:
        report_file.write(rendered_html)

    print(f"HTML report generated successfully: {output_path}")
def stringify_number(num: int, rounding: bool = True) -> str:
    """
    Formats a number for display in the report.

    Values that display with five figures or fewer are shown in full with thousands separators (e.g. "1,300");
    larger values are shown in thousands with a "k" suffix (e.g. "250k").

    :param num: the number to format
    :param rounding: when True, round up to the next 100 (small values) or the next 1,000 (large values);
        when False no rounding is applied, although the "k" form still truncates to whole thousands
    :return: the formatted string
    """
    if num < 100000:  # 5 figures or fewer
        rounded_num = ((num + 99) // 100) * 100 if rounding else num
        if rounded_num < 100000:
            return f"{rounded_num:,}"
        # Rounding up has pushed the value to six figures (e.g. 99,950 -> 100,000), so it must be shown in
        # the "k" form like every other six figure value
        num = rounded_num

    # More than 5 figures
    rounded_num = ((num + 999) // 1000) * 1000 if rounding else num
    return f"{rounded_num // 1000}k"
class PlacidApi:
    """
    Minimal client for the Placid REST API, used to render a PDF from a Placid template and download it.
    """

    # Errors as defined by docs: https://placid.app/docs/2.0/rest/errors
    ERROR_CODES = {
        400: "Bad request",
        401: "Unauthorized",
        404: "Template Not found",
        422: "Validation error",
        429: "Rate limit exceeded",
        500: "Internal server error",
    }

    # Seconds to wait on any single HTTP request, so a stalled connection cannot hang the script forever
    REQUEST_TIMEOUT = 30

    def __init__(self, api_key):
        """
        :param api_key: Placid API token, sent as a bearer token on every request
        """
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        }

    def _raise_for_error(self, response):
        """
        Raises a descriptive error when Placid responds with one of the documented error statuses.
        """
        if response.status_code in self.ERROR_CODES:
            raise requests.HTTPError(
                f"Placid API error {response.status_code}: {self.ERROR_CODES[response.status_code]}",
                response=response,
            )

    def create_pdf(
            self,
            template_uuid: str,
            current_epc_rating: str,
            current_epc_rating_colour: str,
            post_retrofit_epc_rating: str,
            post_retrofit_epc_rating_colour: str,
    ):
        """
        Requests a single page PDF from the given template, filling the two EPC rating text layers.

        :param template_uuid: UUID of the Placid template to render
        :param current_epc_rating: text for the "current_epc_rating" layer
        :param current_epc_rating_colour: text colour for the "current_epc_rating" layer
        :param post_retrofit_epc_rating: text for the "post_retrofit_epc_rating" layer
        :param post_retrofit_epc_rating_colour: text colour for the "post_retrofit_epc_rating" layer
        :return: the decoded JSON response; its "id" is what get_pdf expects
        :raises requests.HTTPError: if Placid responds with a documented error status
        """
        url = "https://api.placid.app/api/rest/pdfs"
        body = {
            "webhook_success": None,
            "passthrough": None,
            "pages": [
                {
                    "template_uuid": template_uuid,
                    "layers": {
                        "current_epc_rating": {
                            "text": current_epc_rating,
                            "text_color": current_epc_rating_colour,
                        },
                        "post_retrofit_epc_rating": {
                            "text": post_retrofit_epc_rating,
                            "text_color": post_retrofit_epc_rating_colour,
                        }
                    },
                },
            ]
        }
        response = requests.post(
            url,
            headers=self.headers,
            json=body,
            timeout=self.REQUEST_TIMEOUT
        )
        self._raise_for_error(response)
        response_body = response.json()
        return response_body

    def get_pdf(
            self,
            pdf_id: str,
            output_path: str = "survey_report/example_data/output.pdf",
            poll_interval: float = 5,
            max_attempts: int = 60,
    ):
        """
        Poll the API every ``poll_interval`` seconds until the PDF is ready, then download it.

        A freshly created PDF is queued and has no "pdf_url" yet, so the status endpoint has to be polled until
        a URL is available before anything can be downloaded.

        :param pdf_id: id returned by create_pdf
        :param output_path: where the downloaded PDF is written
        :param poll_interval: seconds to wait between status checks
        :param max_attempts: number of status checks before giving up
        :return: output_path
        :raises requests.HTTPError: if Placid responds with a documented error status, or the download fails
        :raises RuntimeError: if Placid reports that rendering failed
        :raises TimeoutError: if the PDF is not ready after max_attempts checks
        """
        # Imported locally as the module does not otherwise need it
        import time

        url = f"https://api.placid.app/api/rest/pdfs/{pdf_id}"
        pdf_url = None
        for _ in range(max_attempts):
            response = requests.get(
                url,
                headers=self.headers,
                timeout=self.REQUEST_TIMEOUT
            )
            self._raise_for_error(response)
            response_body = response.json()

            # NOTE(review): "error" is assumed to be the failure status reported by Placid - confirm against docs
            if response_body.get("status") == "error":
                raise RuntimeError(f"Placid failed to render PDF {pdf_id}")

            pdf_url = response_body.get("pdf_url")
            if pdf_url:
                break
            time.sleep(poll_interval)
        else:
            raise TimeoutError(f"PDF {pdf_id} was not ready after {max_attempts} attempts")

        # Download the PDF from this url
        pdf_download = requests.get(pdf_url, timeout=self.REQUEST_TIMEOUT)
        pdf_download.raise_for_status()
        with open(output_path, "wb") as f:
            f.write(pdf_download.content)
        return output_path
def handler():
    """
    Performs the data extraction process for the survey report.

    For each property (a set of pre/post retrofit Quidos PDFs) this reads the PDF text, extracts the site notes,
    the EPR and the post-retrofit scenario site notes, estimates the valuation uplift, and requests a PDF from
    Placid using the current and post-retrofit EPC bands. Finally an HTML report is generated from a template.

    The Placid API key is read from the PLACID_API_KEY environment variable.

    :return:
    :raises RuntimeError: if PLACID_API_KEY is not set
    """
    # The key is read from the environment so that the secret does not live in source control
    PLACID_API_KEY = os.getenv("PLACID_API_KEY")
    if not PLACID_API_KEY:
        raise RuntimeError("PLACID_API_KEY environment variable is not set")
    TEMPLATE_UUID = "5bst9mh1q9lk9"

    # Kept as a function level import, but done once rather than on every loop iteration
    from backend.ml_models.Valuation import PropertyValuation

    placid_api = PlacidApi(PLACID_API_KEY)
    current_property_value = 250000  # Needs to be an input
    EPC_COLOURS = {
        "A": "#117d58",
        "B": "#2da55c",
        "C": "#8dbd40",
        "D": "#f7cd14",
        "E": "#f3a96a",
        "F": "#ef8026",
        "G": "#e41e3b",
    }
    folders = [
        {
            "site_notes": "/Users/khalimconn-kowlessar/Documents/hestia/Model/survey_report/example_data/Flat 1/3 "
                          "WILLIS ROAD FLAT 1 PRE EPR SITE NOTES.pdf",
            "epr": "/Users/khalimconn-kowlessar/Documents/hestia/Model/survey_report/example_data/Flat 1/3 WILLIS "
                   "ROAD FLAT 1 PRE EPR PDF.pdf",
            "scenario_site_notes": "/Users/khalimconn-kowlessar/Documents/hestia/Model/survey_report/example_data"
                                   "/Flat 1/3 WILLIS ROAD FLAT 1 POST EPR SITE NOTES.pdf"
        },
        {
            "site_notes": "/Users/khalimconn-kowlessar/Documents/hestia/Model/survey_report/example_data/Flat 2/3 "
                          "WILLIS ROAD FLAT 2 PRE EPR SITE NOTES.pdf",
            "epr": "/Users/khalimconn-kowlessar/Documents/hestia/Model/survey_report/example_data/Flat 2/3 WILLIS "
                   "ROAD FLAT 2 PRE EPR PDF.pdf",
            "scenario_site_notes": "/Users/khalimconn-kowlessar/Documents/hestia/Model/survey_report/example_data"
                                   "/Flat 2/3 WILLIS ROAD FLAT 2 POST EPR SITE NOTES.pdf"
        },
        {
            "site_notes": "/Users/khalimconn-kowlessar/Documents/hestia/Model/survey_report/example_data/Flat 3/3 "
                          "WILLIS ROAD FLAT 3 PRE EPR SITE NOTES.pdf",
            "epr": "/Users/khalimconn-kowlessar/Documents/hestia/Model/survey_report/example_data/Flat 3/3 WILLIS "
                   "ROAD FLAT 3 PRE EPR PDF.pdf",
            "scenario_site_notes": "/Users/khalimconn-kowlessar/Documents/hestia/Model/survey_report/example_data"
                                   "/Flat 3/3 WILLIS ROAD FLAT 3 POST EPR SITE NOTES.pdf"
        },
    ]

    data = []
    for data_config in folders:
        # Maps the role of each file (site_notes / epr / scenario_site_notes) to its full extracted text
        file_mapping = {}
        for filename, filepath in data_config.items():
            with open(filepath, "rb") as f:
                pdf = PyPDF2.PdfReader(f)
                first_page = pdf.pages[0].extract_text()
                text = "".join(page.extract_text() for page in pdf.pages)

            # Check the report type; files that are not recognised are left out of the mapping
            report_type = detect_report_type(first_page)
            if report_type is not None:
                file_mapping[filename] = text

        # This is only set up to work with quido site notes so we must have it
        site_notes_extractor = SiteNotesExtractor(file_mapping["site_notes"])
        site_notes = site_notes_extractor.extract_all()

        # We also must have an EPR
        epr_extractor = EPRExtractor(file_mapping["epr"])
        epr = epr_extractor.extract_all()

        # Valuation simulation
        scenario_site_notes_extractor = SiteNotesExtractor(file_mapping["scenario_site_notes"])
        scenario_site_notes = scenario_site_notes_extractor.extract_all()

        valuation_uplift = PropertyValuation.estimate_valuation_improvement(
            current_value=current_property_value,
            current_epc=site_notes["Current EPC Band"],
            target_epc=scenario_site_notes["Current EPC Band"],
        )
        # TODO - should convert this, when it's more than 5 figures and we should certainly stringify this
        valuation_difference = round(valuation_uplift["average_increased_value"] - current_property_value)

        # Prepare the data for output
        bill_savings = round(
            site_notes['Estimated Annual Energy Cost (£)'] - scenario_site_notes['Estimated Annual Energy Cost (£)']
        )
        carbon_savings = round(
            site_notes["Current Carbon Emissions (TCO2)"] - scenario_site_notes["Current Carbon Emissions (TCO2)"],
            2
        )

        # NOTE(review): the payback period is not implemented yet, so this always raises on the first property
        # and nothing below this point currently runs
        payback_period = None
        if payback_period is None:
            raise NotImplementedError("Implement me")

        # We extract the measures from the site notes
        report_data = {
            "current_epc_rating": site_notes["Current EPC Band"],
            "current_epc_rating_colour": EPC_COLOURS[site_notes["Current EPC Band"]],
            "post_retrofit_epc_rating": scenario_site_notes["Current EPC Band"],
            "post_retrofit_epc_rating_colour": EPC_COLOURS[scenario_site_notes["Current EPC Band"]],
            "bill_savings": stringify_number(bill_savings),
            "valuation_improvement": stringify_number(valuation_difference),
            "carbon_savings": carbon_savings,
        }

        # We now produce the combined data sheet which is the starting figure:
        # data_sheet = {**epr, **site_notes}
        # del data_sheet['Building Dimensions']
        # # We unnest the Total Building Dimensions
        # data_sheet["Total Building Floor Area (m2)"] = data_sheet["Total Building Dimensions"]["floor_area"]
        # data_sheet["Total Building Heat Loss Area (m2)"] = data_sheet["Total Building Dimensions"]["heat_loss_area"]
        # del data_sheet["Total Building Dimensions"]

        # NOTE(review): report_data carries bill_savings, valuation_improvement and carbon_savings, which
        # create_pdf does not accept as keyword arguments - confirm which fields the template needs
        create_pdf_response = placid_api.create_pdf(
            template_uuid=TEMPLATE_UUID, **report_data
        )
        # {'id': 769832, 'type': 'pdf', 'status': 'queued', 'pdf_url': None, 'transfer_url': None, 'passthrough': None}

        # Download locally
        placid_api.get_pdf(create_pdf_response["id"])

    data = pd.DataFrame(data)

    # Generate the HTML report
    # Placeholder locations
    template_path = "/Users/khalimconn-kowlessar/Documents/hestia/Model/survey_report/template.html"
    output_path = "/Users/khalimconn-kowlessar/Documents/hestia/Model/survey_report/output/report.html"
    logo_path = "/Users/khalimconn-kowlessar/Documents/hestia/Model/survey_report/assets/logo.png"

    # NOTE(review): data_sheet is only defined in the commented-out block above, so this would raise a
    # NameError if reached - reinstate the data sheet before relying on the HTML report
    generate_html_report(
        template_path, output_path,
        data={
            "address": data_sheet["Address"],
            "logo_path": logo_path,
            "current_epc": data_sheet["Current EPC Band"],
            "current_sap": data_sheet["Current SAP Rating"],
            "potential_epc": "A",  # TODO PLACEHOLDER
            "potential_sap": 91,  # TODO PLACEHOLDER
        }
    )

View file

@ -0,0 +1,22 @@
import re
def detect_report_type(first_page):
    """
    Works out which kind of report a PDF is from the text of its first page.

    :param first_page: extracted text of the first page of the report
    :return: "quidos_site_notes", "quidos_epr", or None when the page matches neither
    """
    # Set up for the minute to handle quidos files. We have the Elmhurst logic so we can introduce
    # this when we need.
    # Each entry is (report type, matching function, pattern) and entries are tried in order. The site notes
    # header must be at the very start of the page, whereas the EPR marker can appear anywhere in it.
    detectors = (
        (
            "quidos_site_notes",
            re.match,
            r"^Created \d{2}/\d{2}/\d{4} for Quidos Ltd using Argyle software BRE approved calculator",
        ),
        (
            "quidos_epr",
            re.search,
            r"\nIQ-Energy\nEnergy Performance Report\nPage 1 of 1",
        ),
    )
    for report_type, matcher, pattern in detectors:
        if matcher(pattern, first_page):
            return report_type
    return None

View file

@ -0,0 +1,256 @@
import re
class SiteNotesExtractor:
    """
    Pulls the SAP rating, carbon emissions, energy costs, building dimensions and wall details out of the
    text of a site notes PDF. Extracted values are collected in ``self.data``.
    """

    def __init__(self, pdf_text):
        """
        :param pdf_text: the full text extracted from the site notes PDF
        """
        self.text = pdf_text
        self.data = {}

    def extract_sap_rating(self):
        """
        Reads the current and potential EPC band and SAP score into ``self.data``.

        :raises ValueError: if the SAP rating line cannot be found
        """
        sap_match = re.search(
            r"Current SAP rating\s*([A-G])\s*(\d+)\s*Potential SAP rating\s*([A-G])\s*(\d+)", self.text
        )
        if sap_match is None:
            raise ValueError("No SAP rating found in the report")

        current_band, current_score, potential_band, potential_score = sap_match.groups()
        self.data["Current EPC Band"] = current_band
        self.data["Current SAP Rating"] = int(current_score)
        self.data["Potential EPC Band"] = potential_band
        self.data["Potential SAP Rating"] = int(potential_score)

    def extract_carbon_emissions(self):
        """
        Reads the current annual carbon emissions (TCO2) into ``self.data``.

        :raises ValueError: if the emissions line cannot be found
        """
        emissions_match = re.search(r"Current annual emissions\s*([\d.]+)\s*\(TCO2\)", self.text)
        if emissions_match is None:
            raise ValueError("No carbon emissions found in the report")

        self.data["Current Carbon Emissions (TCO2)"] = float(emissions_match.group(1))

    def extract_building_dimensions(self):
        """
        Reads the dimensions of every building part (Main Property and any extensions) into ``self.data``,
        together with the total floor area and total heat loss area across all parts.

        :raises ValueError: if the dimensions section is missing or contains no building parts
        """
        # The dimensions table sits between its header row and the "5.0 Conservatory" heading
        section_match = re.search(
            r"Dimension Type (?:internal|external)\nPart Floor Area \(m2\) Room Height \(m\) Loss Perimeter \(m\) "
            r"Party Wall "
            r"Length \(m\)\n"
            r"(.*?)\n5\.0 Conservatory", self.text, re.DOTALL
        )
        if section_match is None:
            raise ValueError("Failed to locate the dimensions section in the text.")

        # One row per building part: name followed by floor area, room height, loss perimeter, party wall length
        row_pattern = re.compile(
            r"(Main Property|Extension \d+)\s*(?:Property)?\s*([\d.]+)\s+([\d.]+)\s+([\d.]+)\s+([\d.]+)"
        )

        parts = []
        for name, floor_area, room_height, loss_perimeter, party_wall in row_pattern.findall(
                section_match.group(1)
        ):
            part = {
                "Building Part": name.strip(),
                "Part Floor Area (m2)": float(floor_area),
                "Room Height (m)": float(room_height),
                "Loss Perimeter (m)": float(loss_perimeter),
                "Party Wall Length (m)": float(party_wall),
            }
            # Heat loss area is the exposed perimeter multiplied by the room height
            part["Heat Loss Area (m2)"] = part["Loss Perimeter (m)"] * part["Room Height (m)"]
            parts.append(part)

        if len(parts) == 0:
            raise ValueError("No building dimensions found in the report")

        self.data["Building Dimensions"] = parts
        # Totals across every building part
        self.data["Total Building Dimensions"] = {
            "floor_area": sum(part["Part Floor Area (m2)"] for part in parts),
            "heat_loss_area": sum(part["Heat Loss Area (m2)"] for part in parts),
        }

    def extract_bills_estimate(self):
        """
        Reads the current annual energy costs (£) into ``self.data``.

        :raises ValueError: if the energy costs line cannot be found
        """
        costs_match = re.search(r"Current annual energy costs £\s*([\d,.]+)", self.text)
        if costs_match is None:
            raise ValueError("No bills estimate found in the report")

        # Thousands separators are stripped before converting
        raw_cost = costs_match.group(1).replace(",", "")
        self.data["Estimated Annual Energy Cost (£)"] = float(raw_cost)

    def extract_all(self):
        """
        Runs the SAP rating, carbon emissions, bills and building dimensions extractions.

        :return: dictionary of everything extracted so far
        """
        for extraction in (
                self.extract_sap_rating,
                self.extract_carbon_emissions,
                self.extract_bills_estimate,
                self.extract_building_dimensions,
        ):
            extraction()

        # Still to extract - the specific measures:
        # Primary wall
        # Secondary wall
        # Roof
        # Floor
        # Heating system
        # Hot water system
        # Windows
        # Doors
        # Lighting
        # Ventilation
        # Solar
        return self.data

    def extract_walls(self):
        """
        Reads the wall construction, insulation and thickness details for each building part from the
        "7.0 Walls" section of the text.

        :return: list with one dictionary per building part found
        :raises ValueError: if the walls section cannot be located
        """
        # The walls section runs from its heading up to the "8.0 Roofs" heading
        section_match = re.search(r"7\.0 Walls\n(.*?)\n8\.0 Roofs", self.text, re.DOTALL)
        if section_match is None:
            raise ValueError("Failed to locate the walls section in the text.")
        wall_section = section_match.group(1)

        # A building part heading followed by its (individually optional) wall attribute lines
        wall_pattern = re.compile(
            r"(?P<section>Main Property(?: Alternative)?|Extension \d+)\s*\n"
            r"(?:Construction\s*(?P<construction>[^\n]*)\n)?"
            r"(?:Insulation\s*(?P<insulation>[^\n]*)\n)?"
            r"(?:Insulation Thickness\(mm\)\s*(?P<insulation_thickness>[^\n]*)\n)?"
            r"(?:Wall Thickness Measured\?\s*(?P<thickness_measured>[^\n]*)\n)?"
            r"(?:Wall Thickness\(mm\)\s*(?P<thickness>\d+))?",
            re.MULTILINE
        )

        # TODO: We aren't effectively picking up alternative walls, so the "Alternative ..." fields below are
        # always None for now
        walls = []
        for wall_match in wall_pattern.finditer(wall_section):
            fields = wall_match.groupdict()
            section_name = fields["section"]
            # "Main Property Alternative" is reported under the main property
            if "Main Property" in section_name:
                section_name = "Main Property"

            thickness = fields["thickness"]
            walls.append({
                "Building Part": section_name,
                "Wall Type": fields["construction"] or "Unknown",
                "Wall Insulation": fields["insulation"] or "Unknown",
                "Insulation Thickness (mm)": fields["insulation_thickness"] or "Unknown",
                "Wall Thickness Measured": fields["thickness_measured"] or "Unknown",
                "Wall Thickness (mm)": int(thickness) if thickness and thickness.isdigit() else None,
                "Alternative Wall Type": None,
                "Alternative Wall Insulation": None,
                "Alternative Insulation Thickness (mm)": None,
                "Alternative Wall Thickness Measured": None,
                "Alternative Wall Thickness (mm)": None,
            })
        return walls
class EPRExtractor:
    """
    Reads the address and the annual space/water heating figures out of the
    plain text of an Energy Performance Report (EPR).

    Results accumulate in ``self.data`` as the individual ``extract_*``
    methods are called.
    """

    def __init__(self, pdf_text):
        """
        Keep the report text and start with an empty result dictionary.

        :param pdf_text: Text previously extracted from the report PDF.
        """
        self.text = pdf_text
        self.data = {}

    def extract_heating_consumption(self):
        """
        Parse the space heating and water heating values into ``self.data``.

        The figures may contain thousands separators (``12,345``); these are
        removed before conversion to ``int``.

        :raises ValueError: If the two heating lines cannot be found.
        """
        found = re.search(
            r"Space Heating\(KWH\)\s*([\d,]+).*?\nWater Heating\(KWH\)\s*([\d,]+)",
            self.text,
            re.DOTALL
        )
        if found is None:
            raise ValueError("No heating data found in the report")

        # Convert both figures before touching self.data so that a conversion
        # failure leaves the stored results unchanged.
        space_raw, water_raw = found.group(1), found.group(2)
        space_kwh = int(space_raw.replace(",", ""))
        water_kwh = int(water_raw.replace(",", ""))

        self.data["Space Heating (KWH)"] = space_kwh
        self.data["Water Heating (KWH)"] = water_kwh

    def extract_address(self):
        """
        Parse the address into ``self.data["Address"]``.

        The stored value is the text between the ``Address`` label and the
        following ``Town`` line, stripped of surrounding whitespace. The town
        itself is matched by the pattern but not stored.

        :raises ValueError: If no ``Address`` / ``Town`` pair is found.
        """
        found = re.search(
            r"Address\s*(.*?)\nTown\s*(.*?)\n",
            self.text,
            re.DOTALL
        )
        if found is None:
            raise ValueError("No address found in the report")

        self.data["Address"] = found.group(1).strip()

    def extract_all(self):
        """
        Run every extraction step (address first, then heating) and return
        the dictionary of extracted values.

        :return: ``self.data`` after both steps have populated it.
        :raises ValueError: Propagated from whichever step fails first.
        """
        for step in (self.extract_address, self.extract_heating_consumption):
            step()
        return self.data

123
survey_report/template.html Normal file
View file

@ -0,0 +1,123 @@
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Domna Energy Report</title>
    <!--
        Report page layout: a header showing the property address and logo,
        followed by two cards with the current and potential EPC rating and
        the matching SAP score.
        Placeholders used below: address, logo_path, current_epc, current_sap,
        potential_epc, potential_sap.
    -->
    <style>
        body {
            font-family: Arial, sans-serif;
            background-color: #ffffff;
            color: #333;
            margin: 0;
            padding: 0;
            display: flex;
            justify-content: center;
        }

        .container {
            width: 100%;
            max-width: 1300px;
            margin: 20px auto;
        }

        .header {
            background-color: #1B1F3B;
            color: white;
            padding: 30px;
            display: flex;
            justify-content: space-between;
            align-items: center;
            border-radius: 12px;
        }

        .header h1 {
            /* A non-zero length needs a unit; a bare "5" makes the whole
               declaration invalid and it is ignored in standards mode. */
            margin: 5px;
            font-size: 24px;
        }

        .header p {
            margin: 5px 0 0;
            font-size: 16px;
            color: #d1d5db;
        }

        .logo img {
            height: 60px;
        }

        /* EPC Rating Cards */
        .epc-container {
            display: flex;
            justify-content: space-between;
            gap: 20px;
            margin-top: 30px;
        }

        .epc-card {
            background-color: white;
            border: 2px solid #ccc;
            border-radius: 10px;
            padding: 20px;
            flex: 1;
            display: flex;
            flex-direction: column;
            justify-content: space-between; /* Spreads the in-flow children (title, rating) vertically */
            align-items: center;
            text-align: center;
            box-shadow: 2px 2px 10px rgba(0, 0, 0, 0.1);
            position: relative; /* Positioning anchor for .sap-rating */
            height: 160px;
        }

        .epc-title {
            font-size: 18px;
            font-weight: bold;
            color: #666;
        }

        .epc-rating {
            font-size: 50px;
            font-weight: bold;
        }

        /* Absolutely positioned, so it sits in the card's bottom-right corner
           and takes no part in the card's flex layout. */
        .sap-rating {
            font-size: 18px;
            color: #555;
            position: absolute;
            bottom: 10px;
            right: 20px;
        }

        .before .epc-rating {
            color: #1B1F3B; /* Dark navy, same as the header background */
        }

        .after .epc-rating {
            color: #D4AF37; /* Gold */
        }
    </style>
</head>
<body>
    <div class="container">
        <!-- Header Section -->
        <div class="header">
            <div>
                <h1>Domna Energy Report</h1>
                <p>${address}</p> <!-- Address Placeholder -->
            </div>
            <div class="logo">
                <img src="${logo_path}" alt="Domna Logo">
            </div>
        </div>
        <!-- EPC Rating Cards -->
        <div class="epc-container">
            <div class="epc-card before">
                <div class="epc-title">Current EPC Rating</div>
                <div class="epc-rating">${current_epc}</div>
                <div class="sap-rating">SAP ${current_sap}</div>
            </div>
            <div class="epc-card after">
                <div class="epc-title">Potential EPC Rating</div>
                <div class="epc-rating">${potential_epc}</div>
                <div class="sap-rating">SAP ${potential_sap}</div>
            </div>
        </div>
    </div>
</body>
</html>