working on sfr updates for principal

This commit is contained in:
Khalim Conn-Kowlessar 2025-06-25 14:08:22 +01:00
parent 49b1baa4a9
commit e7eb9b7aed
12 changed files with 730 additions and 26 deletions

View file

@ -1104,7 +1104,7 @@ class AssetList:
num_floors=x[self.ATTRIBUTE_NUMBER_OF_FLOORS],
floor_height=(
float(x[self.EPC_API_DATA_NAMES["floor-height"]]) if
x[self.EPC_API_DATA_NAMES["floor-height"]] else 2.5
not pd.isnull(x[self.EPC_API_DATA_NAMES["floor-height"]]) else 2.5
),
perimeter=x[self.ATTRIBUTE_ESTIMATED_PERIMETER],
built_form=x[self.EPC_API_DATA_NAMES["built-form"]]

View file

@ -63,6 +63,7 @@ DESCRIPTIONS_TO_FUEL_TYPES = {
'Room heaters, electric, Boiler and radiators, mains gas': {"fuel": "Natural Gas", "cop": 0.85},
'Boiler and radiators, mains gas, Boiler and radiators, mains gas': {"fuel": "Natural Gas", "cop": 0.85},
'Room heaters, electric, Electric storage heaters': {"fuel": "Electricity", "cop": 1},
"Boiler and radiators, mains gas, Electric storage heaters": {"fuel": "Natural Gas", "cop": 0.85},
}
# These are the measure types where if there is a ventilation recommendation, we force the inclusion of it

View file

@ -0,0 +1,144 @@
import os
import pandas as pd
import numpy as np
from dotenv import load_dotenv
from etl.find_my_epc.AssetListEpcData import AssetListEpcData
from backend.Funding import Funding
from backend.app.utils import sap_to_epc
from recommendations.recommendation_utils import estimate_external_wall_area
# The EPC API token is read from the backend environment file
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")

# ECO4 full project scores matrix
abs_matrix = pd.read_csv(
    "/Users/khalimconn-kowlessar/Downloads/ECO4 Full Project Scores Matrix.csv"
)
# ECO4 partial project scores matrix. The header is taken from the second row of the sheet and the column names
# are stripped of surrounding whitespace
pps_matrix = pd.read_excel(
    "/Users/khalimconn-kowlessar/Downloads/ECO4 Partial Project Scores Matrix v5.xlsx",
    header=1
)
pps_matrix.columns = [column.strip() for column in pps_matrix.columns]

# The standardised asset list, keyed on address + postcode
asset_list = pd.read_excel(
    "/Users/khalimconn-kowlessar/Documents/hestia/Customers/ACIS/Solid Wall Properties - Standardised_2.xlsx",
    sheet_name="Standardised Asset List"
).rename(
    columns={"domna_address_1": "address", "domna_postcode": "postcode"}
)
asset_list["address"] = asset_list["address"].astype(str)

# Pull the find my EPC data so we can read the SAP points attached to the solid wall recommendations
asset_list_epc_client = AssetListEpcData(
    asset_list=asset_list,
    epc_auth_token=EPC_AUTH_TOKEN
)
asset_list_epc_client.get_data()
asset_list_epc_client.get_non_invasive_recommendations()

# For each property keep the first recommendation whose type mentions internal or external wall insulation.
# Properties without such a recommendation are left out here and filled with the median further down
SOLID_WALL_MEASURES = ("internal_wall_insulation", "external_wall_insulation")
solid_wall_rows = []
for property_recs in asset_list_epc_client.non_invasive_recommendations:
    wall_recs = [
        rec for rec in property_recs["recommendations"]
        if any(measure in rec["type"] for measure in SOLID_WALL_MEASURES)
    ]
    if not wall_recs:
        continue
    solid_wall_rows.append(
        {
            "address": property_recs["address"],
            "postcode": property_recs["postcode"],
            "sap_points": wall_recs[0]["sap_points"]
        }
    )
solid_wall_sap_points = pd.DataFrame(solid_wall_rows)

# Despite the name, the fallback value is the median of the observed SAP points
avg_points = solid_wall_sap_points["sap_points"].median()
asset_list = asset_list.merge(solid_wall_sap_points, how="left", on=["address", "postcode"])
asset_list["sap_points"] = asset_list["sap_points"].fillna(avg_points)

# Post works position and the bands used to look up the funding matrices
asset_list["post_works_sap"] = asset_list["epc_sap_score_on_register"] + asset_list["sap_points"]
asset_list["post_works_epc"] = asset_list["post_works_sap"].apply(sap_to_epc)
asset_list["starting_half_band"] = asset_list["epc_sap_score_on_register"].apply(Funding.get_sap_band)
asset_list["ending_half_band"] = asset_list["post_works_sap"].apply(Funding.get_sap_band)
asset_list["floor_area_band"] = asset_list["epc_total_floor_area"].apply(Funding.get_floor_area_band)

# If the works do not change the EPC letter the property is labelled GBIS, otherwise ECO4
epc_letter_unchanged = asset_list["post_works_epc"] == asset_list["epc_rating_on_register"]
asset_list["funding_scheme"] = np.where(epc_letter_unchanged, "GBIS", "ECO4")

# Merge on the ABS matrix, then discard its key columns again
abs_key_columns = ['Starting Band', 'Finishing Band', 'Floor Area Segment']
asset_list = asset_list.merge(
    abs_matrix,
    how="left",
    left_on=["starting_half_band", "ending_half_band", "floor_area_band"],
    right_on=abs_key_columns
).drop(columns=abs_key_columns)

# store for backup
# asset_list.to_csv(
#     "/Users/khalimconn-kowlessar/Documents/hestia/Customers/ACIS/Solid Wall Properties -
#     Standardised_2_with_funding.csv",
#     index=False
# )

# For GBIS, we use the PPS
# Almost all properties are gas
# Using IWI solid 1.7 -> 0.3 rates
pps_matrix = pps_matrix[
    pps_matrix["Measure_Type"].isin(["IWI_solid_1.7_0.3"])
]
pps_lookup = pps_matrix[['Starting Band', 'Total Floor Area Band', 'Cost Savings']].rename(
    columns={
        "Cost Savings": "partial_project_score",
        "Starting Band": "starting_half_band",
        "Total Floor Area Band": "floor_area_band"
    }
)
asset_list = asset_list.merge(
    pps_lookup,
    how="left",
    on=["starting_half_band", "floor_area_band"],
)

# The partial project score is blanked out for properties that start in band C
starts_in_band_c = asset_list["starting_half_band"].isin(["Low_C", "High_C"])
asset_list["partial_project_score"] = np.where(
    starts_in_band_c,
    None,
    asset_list["partial_project_score"]
)

# GBIS properties take the partial project score, everything else the full project score ("Cost Savings")
asset_list["funding_abs"] = np.where(
    asset_list["funding_scheme"] == "GBIS",
    asset_list["partial_project_score"],
    asset_list["Cost Savings"]
)


def _heat_loss_area(home):
    """
    Estimate the external wall area for one asset list row, defaulting the floor height to 2.5 when it is missing
    :param home: a row of the asset list
    :return: the result of estimate_external_wall_area for that row
    """
    return estimate_external_wall_area(
        num_floors=home["attribute_est_number_floors"],
        floor_height=(
            2.5 if pd.isnull(home["epc_floor_height"]) else float(home["epc_floor_height"])
        ),
        perimeter=home["attribute_est_perimter"],
        built_form=home["epc_archetype"]
    )


asset_list["heat_loss_area"] = asset_list.apply(_heat_loss_area, axis=1)

filename = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/ACIS/20250624 ACIS solid wall - standardised.xlsx"
with pd.ExcelWriter(filename) as writer:
    asset_list.to_excel(writer, sheet_name="Standardised Asset List", index=False)

View file

@ -27,6 +27,7 @@ class AssetListEpcData:
self.extracted_data = None
self.non_invasive_recommendations = None
self.patches = None
self.epc_data = None
@staticmethod
def check_asset_list(asset_list):
@ -74,7 +75,9 @@ class AssetListEpcData:
# Pull the additional data
extracted_data = []
epc_data = []
for _, home in tqdm(self.asset_list.iterrows(), total=len(self.asset_list)):
add1 = home["address"]
pc = home["postcode"]
# Retrieve the EPC data
@ -92,9 +95,6 @@ class AssetListEpcData:
if epc_searcher.newest_epc is None:
continue
if not pd.isnull(home.get("patch")):
epc_searcher.newest_epc["address1"] = add1
# Attempt both methods:
try:
find_epc_searcher = RetrieveFindMyEpc(
@ -104,6 +104,8 @@ class AssetListEpcData:
find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
except Exception as e:
logger.error(f"Error retrieving find my epc data: {e}")
if not pd.isnull(home.get("patch")):
epc_searcher.newest_epc["address1"] = add1
find_epc_searcher = RetrieveFindMyEpc(
address=epc_searcher.newest_epc["address1"],
postcode=epc_searcher.newest_epc["postcode"]
@ -113,7 +115,7 @@ class AssetListEpcData:
# We need uprn
to_append = {
"uprn": home.get("uprn"),
"uprn": home.get("uprn", epc_searcher.newest_epc["uprn"]),
"address": home["address"],
"postcode": home["postcode"],
**find_epc_data,
@ -128,6 +130,8 @@ class AssetListEpcData:
}
extracted_data.append(to_append)
epc_data.append(epc_searcher.newest_epc)
self.extracted_data = extracted_data
self.epc_data = epc_data
logger.info("Data Extrction complete")

View file

@ -1,3 +1,4 @@
import time
import re
import pandas as pd
import requests
@ -125,6 +126,243 @@ class RetrieveFindMyEpc:
return results
def _extract_epc_from_soup(self, soup, epc_certificate, sap_2012_date=None):
    """
    Parse one certificate page from the find my epc website into a single dictionary.

    :param soup: parsed HTML of a certificate page (the BeautifulSoup style find / find_all API is used on it)
    :param epc_certificate: certificate identifier, returned unchanged under the "epc_certificate" key
    :param sap_2012_date: passed through unchanged to self.format_recommendations
    :return: dictionary holding the current and potential rating and score, the heating and hot water bill text
        (None when the page does not list them), the formatted recommendations, the output of
        self.extract_epc_data, plus the assessor details and the output of self.extract_low_carbon_sources merged
        in at the top level
    :raises ValueError: if any of the expected assessor / accreditation scheme fields is missing from the page

    As a side effect, self.walls is replaced with the wall descriptions found in the first table body of the page.
    """
    ratings = soup.find('desc', {'id': 'svg-desc'}).text
    current_rating = ratings.split(".")[0]
    potential_rating = ratings.split(".")[1]
    # Within each sentence the last word is read as the SAP score and the sixth word from the end as the EPC letter
    current_sap = int(current_rating.split(' ')[-1])
    current_epc = current_rating.split(' ')[-6]

    # Retrieve the energy consumption.
    # Early EPCs did not have this information, so a missing section, an empty list or a list with a single entry
    # all leave the corresponding text as None rather than raising
    bills = soup.find('div', {'id': 'bills-affected'})
    bills_list = bills.find_all('li') if bills is not None else []
    heating_text = bills_list[0].text if len(bills_list) > 0 else None
    hot_water_text = bills_list[1].text if len(bills_list) > 1 else None

    # Retrieve the recommendations and SAP points
    recommendations = []
    recommendations_div = soup.find('div', class_='epb-recommended-improvements')
    if recommendations_div:
        # Each h3 header is one step; the SAP points of a step are the difference to the rating after the
        # previous step (or to the current rating for the first step)
        step_headers = recommendations_div.find_all('h3', class_='govuk-heading-m')
        previous_sap_score = current_sap
        previous_epc = current_epc
        for step_num, step_header in enumerate(step_headers, start=1):
            # Extract the step title (the measure)
            measure_title = step_header.text.strip().replace(f"Step {step_num}: ", "")

            # Find the div containing the potential rating within the same section
            potential_rating_div = step_header.find_next(
                'div', class_='epb-recommended-improvements__potential-rating'
            )
            if not potential_rating_div:
                # Without a rating there is nothing to record for this step
                continue

            # Extract the rating text within the SVG text element. If there is none, we carry the previous
            # rating forward, which results in zero SAP points for this step
            extracted_rating_text = potential_rating_div.find('text', class_='govuk-!-font-weight-bold')
            if extracted_rating_text is not None:
                rating_text = extracted_rating_text.text.strip()
            else:
                rating_text = " ".join([str(previous_sap_score), previous_epc])

            # Parse the rating text to separate the numeric rating and EPC letter
            new_rating = int(rating_text.split()[0])
            new_epc = rating_text.split()[1]

            recommendations.append({
                "step": step_num,
                "measure": measure_title,
                "new_rating": new_rating,
                "new_epc": new_epc,
                "sap_points": new_rating - previous_sap_score
            })
            previous_sap_score = new_rating
            previous_epc = new_epc

    # Search for the assessment information and parse it row by row
    assessment_information = soup.find('div', {'id': 'information'})
    rows = assessment_information.find_all('div', class_='govuk-summary-list__row')
    assessment_data = {}
    for row in rows:
        key = row.find('dt').text.strip()
        if key == "Type of assessment":
            # We dont reliably extract this
            continue

        value_tag = row.find('dd')
        # Check if value contains a link (email)
        if value_tag.find('a'):
            value = value_tag.find('a').text.strip()
        elif value_tag.find('summary'):
            value = value_tag.find('span').text.strip()
        else:
            value = value_tag.text.strip()

        # Telephone and Email appear for both the surveyor and the accreditation scheme. The first occurrence is
        # stored as the assessor's, the next one as the accreditation scheme's
        if key in ["Telephone", "Email"]:
            if "Assessor's " + key not in assessment_data:
                assessment_data["Assessor's " + key] = value
            else:
                assessment_data["Accreditation Scheme's " + key] = value
            continue

        assessment_data[key] = value

    expected_keys = [
        'Assessor’s name',
        "Assessor's Telephone",
        "Assessor's Email",
        'Assessor’s ID',
        'Accreditation scheme',
        'Assessor’s declaration',
        "Accreditation Scheme's Telephone",
        "Accreditation Scheme's Email",
        'Date of assessment',
        'Date of certificate'
    ]
    # Check we have all the expected keys
    for key in expected_keys:
        if key not in assessment_data:
            raise ValueError(f"Missing key: {key}")

    # The wall types of the property
    property_features_table = soup.find("tbody", class_="govuk-table__body")
    property_features_table = property_features_table.find_all("tr")
    self.walls = []
    for row in property_features_table:
        cells = row.find_all("td")
        if row.find("th").text.strip() == "Wall":
            self.walls.append(cells[0].text.strip())

    # Finally, we format the recommendations
    recommendations = self.format_recommendations(recommendations, assessment_data, sap_2012_date)

    # 4) Low and zero carbon energy sources
    low_carbon_energy_sources = self.extract_low_carbon_sources(soup)

    # 5) Pull out the EPC data
    epc_data = self.extract_epc_data(soup)

    resulting_data = {
        'epc_certificate': epc_certificate,
        'current_epc_rating': current_epc,
        'current_epc_efficiency': current_sap,
        'potential_epc_rating': potential_rating.split(' ')[-6],
        "potential_epc_efficiency": int(potential_rating.split(' ')[-1]),
        "heating_text": heating_text,
        "hot_water_text": hot_water_text,
        "recommendations": recommendations,
        "epc_data": epc_data,
        **assessment_data,
        **low_carbon_energy_sources,
    }
    return resulting_data
def retrieve_all_find_my_epc_data(self, sap_2012_date=None):
    """
    This is a quick function to retrieve all the data from the find my epc website for a given postcode and address.
    Using this to fulfill a short term need to retrieve all history for a property.

    The postcode search results are filtered to addresses starting with self.address_cleaned, the match with the
    most recent expiry date is opened, and every certificate linked from its "other certificates" section is
    downloaded as well.

    :param sap_2012_date: passed through unchanged to self._extract_epc_from_soup
    :return: list with one extracted dictionary per certificate, the chosen certificate first, followed by the
        other certificates in the order they are linked on the page
    :raises ValueError: if no search result matches the address
    """
    postcode_input = self.postcode.replace(" ", "+")
    postcode_search = self.SEARCH_POSTCODE_URL.format(postcode_input=postcode_input)
    postcode_response = requests.get(postcode_search, headers=self.HEADERS)
    postcode_res = BeautifulSoup(postcode_response.text, features="html.parser")

    extracted_table = []
    for row in postcode_res.find_all('tr', class_='govuk-table__row'):
        # Extract the address and URL; rows without a link are not results
        address_tag = row.find('a', class_='govuk-link')
        if address_tag is None:
            continue

        extracted_address = address_tag.text.strip()
        extracted_address_url = address_tag['href']
        extracted_address_cleaned = extracted_address.replace(",", "").replace(" ", "").lower()
        if not extracted_address_cleaned.startswith(self.address_cleaned):
            continue

        # If the address is a match, we can extract the expiry date. A row without one is kept but sorts behind
        # every dated row, instead of failing inside strptime
        expiry_date_tag = row.find('td', class_='govuk-table__cell date')
        if expiry_date_tag is not None:
            expiry_date = datetime.strptime(expiry_date_tag.parent.find('span').text.strip(), '%d %B %Y')
        else:
            expiry_date = datetime.min

        extracted_table.append(
            {
                "extracted_address": extracted_address,
                "extracted_address_url": extracted_address_url,
                "expiry_date": expiry_date,
            }
        )

    if not extracted_table:
        raise ValueError("No EPC found")

    # We take the one with the most recent expiry date (the first one listed wins on ties)
    newest_match = max(extracted_table, key=lambda x: x['expiry_date'])
    chosen_epc = self.BASE_ENERGY_URL + newest_match['extracted_address_url']
    epc_certificate = chosen_epc.split('/')[-1]
    address_response = requests.get(chosen_epc, headers=self.HEADERS)
    address_res = BeautifulSoup(address_response.text, features="html.parser")

    # We check the section on "Other certificates for this property" and collect the links in it. If the page
    # has no such section there is simply no history to add
    other_cert_section = address_res.find('div', id='other_certificates_and_reports')
    if other_cert_section is not None:
        other_cert_links = other_cert_section.select('dd.govuk-summary-list__value a')
    else:
        other_cert_links = []

    # Always include the currently selected EPC first. Each page is kept together with its own certificate
    # identifier (the last segment of its URL, as for the chosen certificate) so that historic certificates are
    # not labelled with the identifier of the chosen one
    certificate_pages = [(epc_certificate, address_res)]
    for link in other_cert_links:
        cert_url = f"https://find-energy-certificate.service.gov.uk{link['href'].strip()}"
        response = requests.get(cert_url, headers=self.HEADERS)
        # Short pause between the requests for the historic certificates
        time.sleep(0.3)
        certificate_pages.append(
            (cert_url.split('/')[-1], BeautifulSoup(response.text, features="html.parser"))
        )

    return [
        self._extract_epc_from_soup(soup, certificate, sap_2012_date)
        for certificate, soup in certificate_pages
    ]
def retrieve_newest_find_my_epc_data(self, sap_2012_date=None):
"""
For a post code and address, we pull out all the required data from the find my epc website
@ -195,9 +433,6 @@ class RetrieveFindMyEpc:
potential_rating = ratings.split(".")[1]
current_sap = int(current_rating.split(' ')[-1])
# Floor area
address_res.find()
# Retrieve the energy consumption
bills = address_res.find('div', {'id': 'bills-affected'})
bills_list = bills.find_all('li')

View file

@ -194,7 +194,7 @@ class Costs:
IWI_CONTINGENCY = 0.2
# For air source heat pumps, we inflate the assumed cost by quite a bit to account for design and installation
ASHP_CONTINGENCY = 0.35
ASHP_CONTINGENCY = 0.25
# Where there is more uncertainty, a higher contingency rate is used
HIGH_RISK_CONTINGENCY = 0.2
# When there is less uncertainty, a lower contingency rate is used

View file

@ -517,17 +517,30 @@ class HeatingRecommender:
]
# This is a map from the heating controls description to the description of the air source heat pump set up
ashp_descriptions = {
"Time and temperature zone control": (
f"Install a {ashp_size}KW air source heat pump, and upgrade heating controls to Smart Thermostats, "
"room sensors and smart radiator valves (time & temperature zone control). Ensure you have an 18 or "
"24 hour tariff"
),
"Programmer, TRVs and bypass": (
f"Install a {ashp_size}KW air source heat pump, with programmer, TRVs and a Bypass valve. Ensure you "
"have an 18 or 24 hour tariff"
),
}
if ashp_size is None:
ashp_descriptions = {
"Time and temperature zone control": (
f"Install two cascaded air source heat pumps, and upgrade heating controls to Smart Thermostats, "
"room sensors and smart radiator valves (time & temperature zone control). Ensure you have an 18 "
"or "
"24 hour tariff"
)
}
else:
ashp_descriptions = {
"Time and temperature zone control": (
f"Install a {ashp_size}KW air source heat pump, and upgrade heating controls to Smart Thermostats, "
"room sensors and smart radiator valves (time & temperature zone control). Ensure you have an 18 "
"or "
"24 hour tariff"
),
"Programmer, TRVs and bypass": (
f"Install a {ashp_size}KW air source heat pump, with programmer, TRVs and a Bypass valve. Ensure "
f"you "
"have an 18 or 24 hour tariff"
),
}
new_heating_description = "Air source heat pump, radiators, electric"
new_hot_water_description = "From main system"

View file

@ -191,11 +191,22 @@ class RoofRecommendations:
non_invasive_recommendations = self.property.non_invasive_recommendations
# We check a specific condition - which will imply loft insulation isn't appropriate but room in roof
# insulation is
# 1) We have an uninsulated loft (assumed)
# 2) We have a non-intrusive recommendation for room in roof insulation
rir_over_loft = (
self.property.roof["is_pitched"] and
self.property.roof["insulation_thickness"] == "none" and
"room_in_roof_insulation" in [x["type"] for x in non_invasive_recommendations]
)
# We firstly handle non-intrusive recommendations, which may override the normal roof insulation recommendations
if ("loft_insulation" in [x["type"] for x in non_invasive_recommendations]) or (
self.property.roof["is_pitched"] and "loft_insulation" in measures and
not self.property.roof["is_at_rafters"]
):
) and not rir_over_loft:
self.recommend_roof_insulation(
u_value=u_value,
insulation_thickness=self.insulation_thickness,
@ -223,7 +234,8 @@ class RoofRecommendations:
# There are cases where the property might have a room roof as the second roof, but we have a recommendation for
# it, so we allow this override
if self.property.roof["is_roof_room"] and ("room_roof_insulation" in measures) or (
"room_roof_insulation" in [x["type"] for x in non_invasive_recommendations]
"room_roof_insulation" in [x["type"] for x in non_invasive_recommendations] or
rir_over_loft
):
self.recommend_room_roof_insulation(u_value, phase, default_u_values)
return
@ -502,7 +514,7 @@ class RoofRecommendations:
# and the cost of the materials
rir_non_invasive_recommendation = next(
(x for x in self.property.non_invasive_recommendations if x["type"] == "room_roof_insulation"), {}
(x for x in self.property.non_invasive_recommendations if x["type"] == "room_in_roof_insulation"), {}
)
insulation_materials = pd.DataFrame(self.room_roof_insulation_materials)

View file

@ -31,7 +31,7 @@ class VentilationRecommendations(Definitions):
"""
self.property.identify_ventilation()
if self.property.has_ventilaion:
if self.property.has_ventilation:
return
if len(self.materials) != 1:

View file

@ -4,6 +4,7 @@ data, we know it will work.
"""
import pandas as pd
from utils.s3 import read_csv_from_s3
birmingham_epcs = pd.read_csv(
"/Users/khalimconn-kowlessar/Documents/hestia/sfr/Spring JV/domestic-E08000025-Birmingham/certificates.csv"
@ -17,6 +18,29 @@ birmingham_epcs = birmingham_epcs.sort_values(
ascending=[True, False]
).drop_duplicates(subset='UPRN')
birmingham_epcs["postal_region"] = birmingham_epcs["POSTCODE"].str.split(" ").str[0]
addressable_market = birmingham_epcs[
(birmingham_epcs['CURRENT_ENERGY_RATING'].isin(['F', 'G', 'E'])) &
(birmingham_epcs['LODGEMENT_DATE'] >= '2020-01-01') &
(birmingham_epcs['PROPERTY_TYPE'].isin(['House', 'Bungalow'])) &
(birmingham_epcs['TENURE'].isin(
['rental (private)', 'Rented (private)']
))
]
# We take the Spring portfolio and remove the properties in their sample
asset_list = read_csv_from_s3(bucket_name="retrofit-plan-inputs-dev", filepath='8/206/asset_list.csv')
asset_list = pd.DataFrame(asset_list)
asset_list["postal_region"] = asset_list["postcode"].str.split(" ").str[0]
addressable_market = addressable_market[
~addressable_market["UPRN"].astype(int).astype(str).isin(asset_list["uprn"].values)
]
addressable_market = addressable_market[
addressable_market["postal_region"].isin(asset_list["postal_region"].unique())
]
# Take a sample of properties, EPC F or G, EPC lodged in 2025. We focus on houses/bungalows
sample = birmingham_epcs[
(birmingham_epcs['CURRENT_ENERGY_RATING'].isin(['F', 'G'])) &

View file

@ -16,8 +16,23 @@ EPC_TARGET = "C"
# Read the input file
properties = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/sfr/Spring JV/birmingham_sample.xlsx"
"/Users/khalimconn-kowlessar/Documents/hestia/sfr/Spring JV/Birmingham_price_top300.xlsx"
)
# Keep just the D's and below
properties = properties[properties["current_energy_rating"].isin(["D", "E", "F", "G"])].copy()
# Focus on houses
properties = properties[properties["property_type_std"] != "Flat"]
properties = properties[properties["property_type"] != "flat"]
# Rename the key columns
properties = properties.rename(
columns={
"address1": "address",
"number_of_bathrooms": "n_bathrooms",
"num_beds": "n_bedrooms"
}
)
properties["patch"] = True
# Pull the non-invasive recommendations
asset_list_epc_client = AssetListEpcData(
@ -27,7 +42,39 @@ asset_list_epc_client = AssetListEpcData(
asset_list_epc_client.get_data()
asset_list_epc_client.get_non_invasive_recommendations()
asset_list_epc_client.get_patch()
# TODO; Find some new, on-market opportunities that aren't on the EPC API, so we definitely have a patch
extracted_df = pd.DataFrame(asset_list_epc_client.extracted_data)
epc_df = pd.DataFrame(asset_list_epc_client.epc_data)
# Find examples where patches are different to the api
compare_epc = []
for patch in asset_list_epc_client.patches:
extracted = extracted_df[extracted_df["uprn"] == patch["uprn"]].squeeze()
epc = epc_df[epc_df["uprn"] == patch["uprn"]].squeeze()
compare_epc.append(
{
"uprn": extracted["uprn"],
"address": extracted["address"],
"postcode": extracted["postcode"],
"api_epc": int(extracted["current_epc_efficiency"]),
"fme_epc": int(epc["current-energy-efficiency"]),
}
)
compare_epc = pd.DataFrame(compare_epc)
diff = compare_epc[compare_epc["api_epc"] != compare_epc["fme_epc"]]
# Compare matched addresses to make sure they are the same
compare_addresses = extracted_df[["address", "postcode", "uprn"]].merge(
epc_df[["uprn", "address1", "postcode"]].rename(columns={"address1": "epc_address1", "postcode": "epc_postcode"}),
how="left",
on=["uprn"]
)
# Add on uprn
properties = properties.merge(
extracted_df[["address", "postcode", "uprn"]],
how="left",
on=["address", "postcode"]
)
# Store the asset list in s3
filename = f"{USER_ID}/{PORTFOLIO_ID}/asset_list.csv"

View file

@ -0,0 +1,224 @@
"""
This script prepares the data for the financial model
"""
import pandas as pd
from backend.app.utils import sap_to_epc
from sqlalchemy.orm import sessionmaker
from backend.app.db.connection import db_engine
from backend.app.db.models.recommendations import Recommendation, Plan, PlanRecommendations
from backend.app.db.models.portfolio import PropertyModel, PropertyDetailsEpcModel
PORTFOLIO_ID = 206
SCENARIOS = [389]
def get_data(portfolio_id, scenario_ids):
    """
    Read the properties of a portfolio, the plans of the given scenarios and the default recommendations attached
    to those plans from the database.

    :param portfolio_id: only properties with this portfolio id are returned
    :param scenario_ids: only plans whose scenario id is in this collection are returned
    :return: tuple (properties_data, plans_data, recommendations_data), each a list of dictionaries keyed by
        column name. A property dictionary merges the PropertyModel and PropertyDetailsEpcModel columns (on a
        shared column name the PropertyDetailsEpcModel value wins). A recommendation dictionary additionally
        carries the scenario id of its plan under "Scenario ID"
    """
    session = sessionmaker(bind=db_engine)()
    # The session is closed in the finally block so that it is released even when one of the queries raises
    try:
        session.begin()

        # Get properties and their details for a specific portfolio
        properties_query = session.query(
            PropertyModel,
            PropertyDetailsEpcModel
        ).join(
            PropertyDetailsEpcModel, PropertyModel.id == PropertyDetailsEpcModel.property_id
        ).filter(
            PropertyModel.portfolio_id == portfolio_id  # Filter by portfolio ID
        ).all()

        # Transform properties data to include all fields dynamically
        properties_data = [
            {
                **{col.name: getattr(prop.PropertyModel, col.name) for col in PropertyModel.__table__.columns},
                **{
                    col.name: getattr(prop.PropertyDetailsEpcModel, col.name)
                    for col in PropertyDetailsEpcModel.__table__.columns
                },
            }
            for prop in properties_query
        ]

        # Get the plans of the requested scenarios. Note this is filtered on scenario only, not on the portfolio
        plans_query = session.query(Plan).filter(Plan.scenario_id.in_(scenario_ids)).all()

        # Transform plans data to include all fields dynamically
        plans_data = [
            {col.name: getattr(plan, col.name) for col in Plan.__table__.columns}
            for plan in plans_query
        ]

        # Extract plan IDs for filtering recommendations through PlanRecommendations
        plan_ids = [plan['id'] for plan in plans_data]

        # Get recommendations through PlanRecommendations for those plans and that are default
        recommendations_query = session.query(
            Recommendation,
            Plan.scenario_id
        ).join(
            PlanRecommendations, Recommendation.id == PlanRecommendations.recommendation_id
        ).join(
            Plan, Plan.id == PlanRecommendations.plan_id  # Join with Plan to access scenario_id
        ).filter(
            PlanRecommendations.plan_id.in_(plan_ids),
            # "== True" is required here: it builds the SQL comparison, "is True" would not
            Recommendation.default == True  # noqa: E712
        ).all()

        # Transform recommendations data to include all fields dynamically and include scenario_id
        recommendations_data = [
            {
                **{
                    col.name: (
                        getattr(rec.Recommendation, col.name) if hasattr(rec, 'Recommendation')
                        else getattr(rec, col.name)
                    )
                    for col in Recommendation.__table__.columns
                },
                "Scenario ID": rec.scenario_id
            }
            for rec in recommendations_query
        ]
    finally:
        session.close()

    return properties_data, plans_data, recommendations_data
properties_data, plans_data, recommendations_data = get_data(portfolio_id=PORTFOLIO_ID, scenario_ids=SCENARIOS)
properties_df = pd.DataFrame(properties_data)
plans_df = pd.DataFrame(plans_data)
recommendations_df = pd.DataFrame(recommendations_data)

# Only the default recommendations feed the cost and SAP figures below
default_recommendations = recommendations_df[recommendations_df["default"]]
recommended_measures_df = default_recommendations[["property_id", "measure_type", "estimated_cost"]]

# Sum up the sap points by property id
post_install_sap = default_recommendations.groupby("property_id")[["sap_points"]].sum().reset_index()

# One row per property, one cost column per measure type
recommendations_measures_pivot = recommended_measures_df.pivot(
    index='property_id',
    columns='measure_type',
    values='estimated_cost'
).reset_index()

# Total cost is the row sum, excluding the property_id column
measure_costs_only = recommendations_measures_pivot.drop(columns=["property_id"])
recommendations_measures_pivot["total_retrofit_cost"] = measure_costs_only.sum(axis=1)

property_columns = [
    "property_id", "uprn", "address", "postcode", "property_type", "walls", "roof", "heating", "windows",
    "current_epc_rating",
    "current_sap_points", "total_floor_area", "number_of_rooms",
]
df = (
    properties_df[property_columns]
    .merge(recommendations_measures_pivot, how="left", on="property_id")
    .merge(post_install_sap, how="left", on="property_id")
    .drop(columns=["property_id"])
)

# Properties without any default recommendation gain no SAP points
df["sap_points"] = df["sap_points"].fillna(0)
df["predicted_post_works_sap"] = (df["current_sap_points"] + df["sap_points"]).round()
df["predicted_post_works_epc"] = df["predicted_post_works_sap"].apply(sap_to_epc)

# We merge this back to the main dataframe, which will contain the bathrooms
from utils.s3 import read_csv_from_s3
asset_list = pd.DataFrame(
    read_csv_from_s3(bucket_name="retrofit-plan-inputs-dev", filepath='8/206/asset_list.csv')
)
df["uprn"] = df["uprn"].astype(str)
# These four columns are dropped from df before the merge, so the asset list keeps its own versions of them
columns_kept_from_asset_list = ["address", "postcode", "property_type", "total_floor_area"]
asset_list = asset_list.merge(
    df.drop(columns=columns_kept_from_asset_list),
    how="left",
    on="uprn"
)

condition_costs = pd.read_excel(
    "/Users/khalimconn-kowlessar/Documents/hestia/sfr/Spring JV/Condition costs.xlsx",
    sheet_name="Prices - Khalim",
    header=35
)
# Remove unnamed columns and reset index
is_unnamed_column = condition_costs.columns.str.contains('^Unnamed')
condition_costs = condition_costs.loc[:, ~is_unnamed_column].reset_index(drop=True)
# We now estimate condition cost
def simulate_condition(asset_list, condition_costs):
    """
    This function is for testing, and will simulate condition cost from 1-10 for each property to see what the
    costing array looks like.

    For every condition score the cost row of that score is scaled by the floor area of the property, its
    "Bathroom" entry is additionally multiplied by the number of bathrooms, and the entries are summed.

    :param asset_list: DataFrame with one row per property; the "uprn", "bathrooms" and "total_floor_area"
        columns are read
    :param condition_costs: DataFrame with a "Condition" column containing each score from 1 to 10, a "Bathroom"
        cost column and any further cost columns
    :return: asset_list left merged (on uprn) with the columns "Condition 10" down to "Condition 1"
    """
    condition_scores = list(reversed(range(1, 11)))
    condition_columns = ["Condition " + str(score) for score in condition_scores]

    # The cost row of a condition score does not depend on the property, so it is looked up once per score
    # rather than once per score and property
    base_costs = {
        score: condition_costs[
            condition_costs["Condition"] == score
        ].drop(columns=["Condition"]).iloc[0]
        for score in condition_scores
    }

    simulated_rows = []
    for _, row in asset_list.iterrows():
        # NOTE(review): the live calculation further down in this script reads "n_bathrooms"; confirm which
        # column name the asset list passed in here actually carries
        n_bathrooms = row["bathrooms"]
        simulated = {"uprn": row["uprn"]}
        for score in condition_scores:
            # Each cost is scaled by floor area. The multiplication yields a new Series, so the shared base
            # costs are never modified by the bathroom adjustment
            condition_cost = base_costs[score] * row["total_floor_area"]
            condition_cost["Bathroom"] = condition_cost["Bathroom"] * n_bathrooms
            simulated["Condition " + str(score)] = condition_cost.sum()
        simulated_rows.append(simulated)

    # Naming the columns explicitly keeps the merge key present even when the asset list has no rows
    condition_df = pd.DataFrame(simulated_rows, columns=["uprn", *condition_columns])
    asset_list = asset_list.merge(
        condition_df,
        how="left",
        on="uprn"
    )
    return asset_list
# asset_list = simulate_condition(asset_list, condition_costs)

# We calculate the condition cost based on the condition score recorded against each property
for _, home in asset_list.iterrows():
    condition = home["condition_score"]
    # Properties without a condition score get no condition cost
    if condition in (None, ""):
        continue
    condition = int(float(condition))

    # Cost row for this condition score
    matching_costs = condition_costs[condition_costs["Condition"] == condition]
    condition_cost = matching_costs.drop(columns=["Condition"]).iloc[0]

    # Each cost is scaled by floor area, the bathroom cost additionally by the number of bathrooms
    condition_cost = condition_cost * float(home["total_floor_area"])
    n_bathrooms = home["n_bathrooms"]
    condition_cost["Bathroom"] = condition_cost["Bathroom"] * float(n_bathrooms)
    total_condition_cost = condition_cost.sum()

    # Written back to every row that shares this uprn
    same_uprn = asset_list["uprn"] == home["uprn"]
    asset_list.loc[same_uprn, "domna_condition_cost"] = total_condition_cost

# Store output
asset_list.to_excel(
    "/Users/khalimconn-kowlessar/Documents/hestia/sfr/Spring JV/20250624_portfolio_retrofit_packages.xlsx",
    index=False
)

# Side by side view of the condition score, the decoration sums and our condition cost
comparison_columns = ["condition_score", "decoration_sum_min ", "decoration_sum_max", "domna_condition_cost"]
condition_cost_comparison = asset_list[comparison_columns]