Merge pull request #658 from Hestia-Homes/portfolio-diagnostics

Implemented a better version of already installed re-baselining
This commit is contained in:
KhalimCK 2026-01-10 21:25:43 +00:00 committed by GitHub
commit 689f1a23d1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 1422 additions and 238 deletions

View file

@ -469,10 +469,8 @@ class Property:
# It means we've recommended HHR with electric immersion, and shouldn't overwrite
# the hot water description
continue
# Set the new value otherwise as it's due to already installed measures - do nothing
raise NotImplementedError(
"Already have this key in the phase_epc_transformation - implement me"
)
phase_epc_transformation[k] = v
simulation_epc.update(phase_epc_transformation)
self.simulation_epcs[rec["recommendation_id"]] = simulation_epc
@ -800,13 +798,19 @@ class Property:
to_update[k] = None
return to_update
def get_full_property_data(self, current_valuation=None):
def get_full_property_data(self, current_valuation=None, needs_rebaselining=False, rebaselining_sap=0):
"""
This method extracts the data which is pushed to the database, containing core information, from the EPC
about a property
:return:
"""
current_sap_rating = self.data["current-energy-efficiency"]
if needs_rebaselining:
current_sap_rating += rebaselining_sap
current_epc_rating = sap_to_epc(current_sap_rating)
property_data = {
"creation_status": "READY",
"uprn": int(self.data["uprn"]),
@ -823,9 +827,12 @@ class Property:
"number_of_rooms": self.number_of_rooms,
"year_built": self.year_built,
"tenure": self.data["tenure"],
"current_epc_rating": self.data["current-energy-rating"],
"current_sap_points": self.data["current-energy-efficiency"],
"current_epc_rating": current_epc_rating,
"current_sap_points": current_sap_rating,
"current_valuation": current_valuation,
"original_sap_points": self.data["current-energy-efficiency"],
"is_sap_points_adjusted_for_installed_measures": needs_rebaselining,
"installed_measures_sap_point_adjustment": rebaselining_sap,
}
property_data = self._clean_upload_data(property_data)
@ -843,7 +850,10 @@ class Property:
else None
)
def get_property_details_epc(self, portfolio_id: int):
def get_property_details_epc(
self, portfolio_id: int, needs_rebaselining: bool = False, rebaselining_carbon: float = 0,
rebaselining_heat_demand: float = 0, rebaselining_kwh: float = 0, rebaselining_bills: float = 0
):
if self.current_energy_bill is None:
raise ValueError("Current energy bill has not been set")
@ -866,6 +876,19 @@ class Property:
# We check if the lodgement date is more than 10 years old
is_expired = (datetime.now() - pd.to_datetime(lodgement_date)) > timedelta(days=3650)
# Handle re-baselining
co2_emissions = self.energy["co2_emissions"]
primary_energy_consumption = self.energy["primary_energy_consumption"]
current_kwh_demand = self.current_energy_consumption
current_kwh_heating_hotwater = self.current_energy_consumption_heating_hotwater
if needs_rebaselining:
# Carbon will be reduced
co2_emissions -= rebaselining_carbon
# Heat demand will be reduced
primary_energy_consumption -= rebaselining_heat_demand
current_kwh_demand -= rebaselining_kwh
current_kwh_heating_hotwater -= rebaselining_kwh
property_details_epc = {
"property_id": self.id,
"portfolio_id": portfolio_id,
@ -902,16 +925,25 @@ class Property:
"number_of_storeys": self.number_of_storeys["number_of_storeys"],
"mains_gas": self.mains_gas,
"energy_tariff": self.data["energy-tariff"],
"primary_energy_consumption": self.energy["primary_energy_consumption"],
"co2_emissions": self.energy["co2_emissions"],
"current_energy_demand": self.current_energy_consumption,
"current_energy_demand_heating_hotwater": self.current_energy_consumption_heating_hotwater,
"primary_energy_consumption": primary_energy_consumption,
"co2_emissions": co2_emissions,
"current_energy_demand": current_kwh_demand, # This is kwh - naming is confusing
"current_energy_demand_heating_hotwater": current_kwh_heating_hotwater, # This is kwh
"estimated": self.data.get("estimated", False),
# We indicate if we've overwritten a SAP 05 EPC
"sap_05_overwritten": sap_05_overwritten,
"sap_05_score": sap_05_score,
"sap_05_epc_rating": sap_05_epc_rating,
**self.current_energy_bill
**self.current_energy_bill,
"original_co2_emissions": self.energy["co2_emissions"],
"original_primary_energy_consumption": self.energy["primary_energy_consumption"],
"original_current_energy_demand": self.current_energy_consumption, # Bad naming, this is kwh
"original_current_energy_demand_heating_hotwater": self.current_energy_consumption_heating_hotwater, # kwh
"installed_measures_co2_adjustment": rebaselining_carbon,
"installed_measures_energy_demand_adjustment": rebaselining_kwh, # kwh
"installed_measures_total_energy_bill_adjustment": rebaselining_bills,
"installed_measures_heat_demand_adjustment": rebaselining_heat_demand,
"is_epc_adjusted_for_installed_measures": needs_rebaselining,
}
return property_details_epc

View file

@ -10,3 +10,4 @@ from .materials_functions import *
from .inspections_functions import *
from .non_intrusive_surveys import *
from .whlg_functions import *
from .already_installed_functions import *

View file

@ -0,0 +1,40 @@
from backend.app.db.models.recommendations import InstalledMeasure
from typing import Dict, List, Set
from collections import defaultdict
def get_installed_measure_types_by_uprns(
    session,
    uprns: List[int],
) -> Dict[int, Set[str]]:
    """
    Returns the active installed measure types per UPRN.

    {
        uprn: {"cavity_wall_insulation", "mechanical_ventilation", ...}
    }

    :param session: database session; only ``session.query(...)`` is used
    :param uprns: UPRNs to look up. When empty (or None) no query is issued
    :return: a ``defaultdict(set)`` keyed by UPRN. Indexing a UPRN with no active installed
        measures yields an empty set instead of raising ``KeyError`` - this holds for the
        empty-input case as well, so callers can index the result unconditionally
    """
    out: Dict[int, Set[str]] = defaultdict(set)
    if not uprns:
        # Same return type as the populated case, so direct indexing by UPRN stays safe
        return out

    rows = (
        session.query(
            InstalledMeasure.uprn,
            InstalledMeasure.measure_type,
        )
        .filter(InstalledMeasure.is_active.is_(True))
        .filter(InstalledMeasure.uprn.in_(uprns))
        .all()
    )

    for uprn, measure_type in rows:
        # measure_type may be an enum-like object (exposing ``.value``) or already a plain value
        out[uprn].add(
            measure_type.value
            if hasattr(measure_type, "value")
            else measure_type
        )
    return out

View file

@ -10,7 +10,8 @@ from backend.app.db.connection import db_session, db_read_session
def prepare_plan_data(
p, body, scenario_id, eco_packages, valuations, new_sap_points, new_epc, default_recommendations
p, body, scenario_id, eco_packages, valuations, new_sap_points, new_epc, default_recommendations,
rebaselining_carbon=0, rebaselining_heat_demand=0, rebaselining_kwh=0, rebaselining_bills=0,
):
"""
Utility function to prepare the data that goes into the production of a plan. Is a fairly rough and unstructured
@ -23,19 +24,29 @@ def prepare_plan_data(
:param new_sap_points: sap points, post default recommendations
:param new_epc: new epc rating, post default recommendations
:param default_recommendations: list of default recommendations for a property
:param rebaselining_carbon: carbon emissions adjustment for rebaselining
:param rebaselining_heat_demand: heat demand adjustment for rebaselining
:param rebaselining_kwh: kwh consumption adjustment for rebaselining
:param rebaselining_bills: energy bill adjustment for rebaselining
:return:
"""
# Plan carbon savings
co2_savings = sum([r["co2_equivalent_savings"] for r in default_recommendations])
post_co2_emissions = p.data["co2-emissions-current"] - co2_savings
co2_savings = sum(
[r["co2_equivalent_savings"] for r in default_recommendations if not r.get("already_installed", False)]
)
post_co2_emissions = p.energy["co2_emissions"] - rebaselining_carbon - co2_savings
# Plan bill savings
energy_bill_savings = sum([r["energy_cost_savings"] for r in default_recommendations])
post_energy_bill = sum(p.current_energy_bill.values()) - energy_bill_savings
energy_bill_savings = sum(
[r["energy_cost_savings"] for r in default_recommendations if not r.get("already_installed", False)]
)
post_energy_bill = sum(p.current_energy_bill.values()) - rebaselining_bills - energy_bill_savings
# energy consumption
energy_consumption_savings = sum([r["kwh_savings"] for r in default_recommendations])
post_energy_consumption = p.current_energy_consumption - energy_consumption_savings
energy_consumption_savings = sum(
[r["kwh_savings"] for r in default_recommendations if not r.get("already_installed", False)]
)
post_energy_consumption = p.current_energy_consumption - rebaselining_kwh - energy_consumption_savings
valuation_post_retrofit, valuation_increase = None, None
if valuations["current_value"]:
@ -43,8 +54,10 @@ def prepare_plan_data(
valuation_post_retrofit = valuations["average_increased_value"]
# plan costing data
cost_of_works = sum([r["total"] for r in default_recommendations])
contingency_cost = sum([r.get("contingency", 0) for r in default_recommendations])
cost_of_works = sum([r["total"] for r in default_recommendations if not r.get("already_installed", False)])
contingency_cost = sum(
[r.get("contingency", 0) for r in default_recommendations if not r.get("already_installed", False)]
)
return {
"portfolio_id": body.portfolio_id,

View file

@ -5,6 +5,5 @@ from typing import Any, Optional
@dataclass
class PropertyRequestData:
patch: dict
already_installed: list
non_invasive_recommendations: dict
valuation: Optional[float]

View file

@ -52,7 +52,7 @@ def patch_epc(patch, epc_records):
def extract_property_request_data(
address: Address, patches, already_installed, non_invasive_recommendations, valuation_data, uprn
address: Address, patches, non_invasive_recommendations, valuation_data, uprn
):
patch_has_uprn = "uprn" in patches[0] if patches else True
if patch_has_uprn:
@ -64,10 +64,6 @@ def extract_property_request_data(
x for x in patches if (x["address"] == address.address) and (x["postcode"] == address.postcode)
), {})
property_already_installed = next((
x for x in already_installed if (x["address"] == address.address) and (x["postcode"] == address.postcode)
), [])
# Because we have some non-invasive recommendations that match on address and postcode, but not UPRN
# we need to check existence of uprn
has_uprn = "uprn" in non_invasive_recommendations[0] if non_invasive_recommendations else False
@ -119,7 +115,6 @@ def extract_property_request_data(
# Return data class to give a structured format
return PropertyRequestData(
patch=patch,
already_installed=property_already_installed,
non_invasive_recommendations=property_non_invasive_recommendations,
valuation=property_valuation
)

View file

@ -525,6 +525,22 @@ def extract_address_data(config, body):
return uprn, address1, full_address
def keep_max_sap_per_measure_type(items):
    """
    Keep a single item per measure type: the one with the highest SAP points.

    :param items: iterable of dicts, each with at least "measure_type" and "sap_points" keys
    :return: list with one item per measure type, ordered by the first appearance of each
        measure type in ``items``. When several items of a type tie on SAP points, the first
        one encountered is kept. The returned items are the original objects (not copies).
    """
    best_by_type = {}
    for item in items:
        measure_type = item["measure_type"]
        # Strict ">" keeps the earliest item on ties; re-assigning an existing key does not
        # change dict ordering, so output order follows first appearance of each type
        if (
            measure_type not in best_by_type
            or item["sap_points"] > best_by_type[measure_type]["sap_points"]
        ):
            best_by_type[measure_type] = item
    return list(best_by_type.values())
async def model_engine(body: PlanTriggerRequest):
logger.info("Model Engine triggered with body: %s", json.loads(body.model_dump_json()))
@ -684,6 +700,9 @@ async def model_engine(body: PlanTriggerRequest):
energy_assessments_by_uprn = db_funcs.energy_assessment_functions.get_latest_assessments_for_uprns(
session, uprns
)
already_installed_by_uprn = db_funcs.already_installed_functions.get_installed_measure_types_by_uprns(
session, uprns
)
# If we have properties that need to be created, we create them in bulk
logger.info("Determine new properties to be created")
@ -703,7 +722,7 @@ async def model_engine(body: PlanTriggerRequest):
property_lookup[("uprn", uprn)] = prop_id
if landlord_property_id:
property_lookup[("landlord_property_id", landlord_property_id)] = prop_id
logger.info("Processing each property for model input preparation")
input_properties, inspections_map, eco_packages, epc_upserts = [], {}, {}, []
for addr, config in tqdm(
@ -725,6 +744,8 @@ async def model_engine(body: PlanTriggerRequest):
energy_assessment = energy_assessments_by_uprn.get(addr.uprn)
property_already_installed = list(already_installed_by_uprn[addr.uprn])
epc_searcher = SearchEpc(
address1=addr.address1,
postcode=addr.postcode,
@ -767,7 +788,6 @@ async def model_engine(body: PlanTriggerRequest):
req_data = extract_property_request_data(
address=addr,
patches=patches,
already_installed=already_installed,
non_invasive_recommendations=non_invasive_recommendations,
valuation_data=valuation_data,
uprn=addr.uprn,
@ -813,7 +833,7 @@ async def model_engine(body: PlanTriggerRequest):
address=epc_searcher.address_clean,
postcode=epc_searcher.postcode_clean,
epc_record=prepared_epc,
already_installed=req_data.already_installed + eco_packages.get(property_id)[3],
already_installed=property_already_installed + eco_packages.get(property_id)[3],
property_valuation=req_data.valuation,
non_invasive_recommendations=property_non_invasive_recommendations,
energy_assessment=energy_assessment,
@ -925,9 +945,7 @@ async def model_engine(body: PlanTriggerRequest):
# any panel performance, we ensure that we have a 3kWp and 4kWp option for the property
logger.info("Identifying property recommendations")
recommendations = {}
recommendations_scoring_data = []
representative_recommendations = {}
recommendations, recommendations_scoring_data, representative_recommendations = {}, [], {}
for p in tqdm(input_properties):
# We set the ECO package data, if we have it
property_eco_package = eco_packages.get(p.id, (None, None, None))
@ -961,15 +979,15 @@ async def model_engine(body: PlanTriggerRequest):
recommendations_scoring_data.extend(p.recommendations_scoring_data)
logger.info("Preparing data for scoring in sap change api")
recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data)
recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data).drop(
columns=[
"rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending",
"carbon_ending"
]
)
# Temp putting this here
recommendations_scoring_data["is_post_sap10_ending"] = True
recommendations_scoring_data = recommendations_scoring_data.drop(
columns=["rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending",
"carbon_ending"]
)
all_predictions = await model_api.async_paginated_predictions(
data=recommendations_scoring_data,
bucket=get_settings().DATA_BUCKET,
@ -1009,19 +1027,19 @@ async def model_engine(body: PlanTriggerRequest):
# We now insert kwh estimates and costs into the recommendations
logger.info("Calculating tenant savings - kwh and bills")
for property_id in tqdm([p.id for p in input_properties]):
for p in tqdm(input_properties):
property_id = p.id
property_recommendations = recommendations.get(property_id, [])
property_instance = [p for p in input_properties if p.id == property_id][0]
property_current_energy_bill = (
Recommendations.calculate_recommendation_tenant_savings(
property_instance=property_instance,
property_instance=p,
kwh_simulation_predictions=kwh_simulation_predictions,
property_recommendations=property_recommendations,
ashp_cop=body.ashp_cop
)
)
property_instance.current_energy_bill = property_current_energy_bill
p.current_energy_bill = property_current_energy_bill
# Insert the predictions into the recommendations and run the optimiser
logger.info("Optimising measures")
@ -1055,14 +1073,38 @@ async def model_engine(body: PlanTriggerRequest):
# We insert the innovation uplift
measures_to_optimise_with_uplift = deepcopy(measures_to_optimise)
# TODO: Turn this into a function and store the innovation uplift
for group in measures_to_optimise_with_uplift:
for r in group:
(r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"],
r["uplift_project_score"]) = (0, 0, 0, 0)
already_installed_measures = []
for measures in measures_to_optimise_with_uplift:
for m in measures:
# A) We're going to make the already installed measures default
# B) We need to collect the SAP points for all already installed measures to avoid double counting
if m["already_installed"]:
already_installed_measures.append(
{
"id": m["recommendation_id"],
"measure_type": m["measure_type"],
"sap_points": m["sap_points"],
}
)
# We get the ones with the highest SAP
default_already_installed = keep_max_sap_per_measure_type(already_installed_measures)
already_installed_sap = float(sum(d["sap_points"] for d in default_already_installed))
# Remove them from the optimisation pool
finalised_measures_to_optimise = []
for m in measures_to_optimise_with_uplift:
filtered = [x for x in m if not x["already_installed"]]
if filtered:
finalised_measures_to_optimise.append(filtered)
input_measures = optimiser_functions.prepare_input_measures(
measures_to_optimise_with_uplift, body.goal, needs_ventilation, funding=True,
finalised_measures_to_optimise, body.goal, needs_ventilation, funding=True,
property_eco_packages=eco_packages.get(p.id)
)
@ -1073,9 +1115,10 @@ async def model_engine(body: PlanTriggerRequest):
p=p,
input_measures=input_measures,
budget=body.budget,
target_gain=gain,
target_gain=gain - already_installed_sap,
enforce_heat_pump_insulation=True,
enforce_fabric_first=body.enforce_fabric_first
enforce_fabric_first=body.enforce_fabric_first,
already_installed_sap=already_installed_sap, # To be passed to output
)
# if handle the empty case
@ -1118,7 +1161,8 @@ async def model_engine(body: PlanTriggerRequest):
)
battery_sap_score = BatterySAPScorer.score(starting_sap=post_sap, pv_size=pv_size)
selected = {r["id"] for r in solution}
# We add the default already installed measures to the solution
selected = {r["id"] for r in solution + default_already_installed}
if property_required_measures:
solution = optimiser_functions.add_required_measures(
@ -1189,25 +1233,51 @@ async def model_engine(body: PlanTriggerRequest):
property_updates, property_epc_details, property_spatial_updates = [], [], []
plans_to_create, recommendations_to_create = [], []
# Prepare the data that will need to be uploaded in bulk
for p in input_properties:
recommendations_for_property = recommendations.get(p.id, [])
default_recommendations = [r for r in recommendations_for_property if r["default"]]
# We need to:
# Get already installed measures
already_installed_default = [r for r in default_recommendations if r["already_installed"]]
# Property should have increased SAP
needs_rebaselining = bool(len(already_installed_default))
rebaselining_sap = float(sum([r["sap_points"] for r in already_installed_default]))
rebaselining_carbon = float(sum([r["co2_equivalent_savings"] for r in already_installed_default]))
rebaselining_heat_demand = float(sum([r["heat_demand"] for r in already_installed_default]))
rebaselining_kwh = float(sum([r["kwh_savings"] for r in already_installed_default]))
rebaselining_bills = float(sum([r["energy_cost_savings"] for r in already_installed_default]))
# This will include everything, including already installed
total_sap_points = sum([r["sap_points"] for r in default_recommendations])
new_sap_points = float(p.data["current-energy-efficiency"]) + total_sap_points
new_epc = sap_to_epc(new_sap_points)
total_cost = sum([r["total"] for r in default_recommendations])
# Already installed measures do not have a cost but we remove anyway
total_cost = sum([r["total"] for r in default_recommendations if not r["already_installed"]])
valuations = PropertyValuation.estimate(property_instance=p, target_epc=new_epc, total_cost=total_cost)
# --- property-level updates (always) ---
property_updates.append({
"property_id": p.id,
"portfolio_id": body.portfolio_id,
"data": p.get_full_property_data(current_valuation=valuations["current_value"])
"data": p.get_full_property_data(
current_valuation=valuations["current_value"],
needs_rebaselining=needs_rebaselining,
rebaselining_sap=rebaselining_sap,
)
})
property_epc_details.append(p.get_property_details_epc(portfolio_id=body.portfolio_id))
property_epc_details.append(
p.get_property_details_epc(
portfolio_id=body.portfolio_id,
needs_rebaselining=needs_rebaselining,
rebaselining_carbon=rebaselining_carbon,
rebaselining_heat_demand=rebaselining_heat_demand,
rebaselining_kwh=rebaselining_kwh,
rebaselining_bills=rebaselining_bills,
)
)
property_spatial_updates.append({"uprn": p.uprn, "data": p.spatial})
@ -1216,7 +1286,18 @@ async def model_engine(body: PlanTriggerRequest):
continue
plan_data = db_funcs.recommendations_functions.prepare_plan_data(
p, body, scenario_id, eco_packages, valuations, new_sap_points, new_epc, default_recommendations
p=p,
body=body,
scenario_id=scenario_id,
eco_packages=eco_packages,
valuations=valuations,
new_sap_points=new_sap_points,
new_epc=new_epc,
default_recommendations=default_recommendations,
rebaselining_carbon=rebaselining_carbon,
rebaselining_heat_demand=rebaselining_heat_demand,
rebaselining_kwh=rebaselining_kwh,
rebaselining_bills=rebaselining_bills,
)
plans_to_create.append({"property_id": p.id, "plan_data": plan_data})

View file

@ -142,7 +142,8 @@ class ModelApi:
@staticmethod
def extract_phase(recommendation_id):
    """
    Extract the integer phase encoded in a recommendation id.

    The phase is the (optionally signed, possibly multi-digit) integer that directly follows
    the first 'phase=' marker. Surrounding whitespace and any text after the number are
    ignored.

    :param recommendation_id: recommendation id string, e.g. "...phase=2" or "...phase=-1"
    :return: the phase as an int, or None when the id contains no 'phase=' marker
    :raises ValueError: if 'phase=' is present but is not followed by an integer
    """
    import re  # local import so this block does not depend on the module's import list

    _, marker, tail = recommendation_id.partition('phase=')
    if not marker:
        return None

    # Reading only the first character breaks on negative ("-1") and multi-digit ("12") phases
    match = re.match(r"\s*([-+]?\d+)", tail)
    if match is None:
        raise ValueError(f"Could not extract phase from recommendation id: {recommendation_id!r}")
    return int(match.group(1))

View file

@ -0,0 +1,14 @@
# Prefix shared by every EPC construction age band label produced below
_EPC_AGE_BAND_PREFIX = "England and Wales: "

# Maps source construction-year labels to EPC construction age band labels.
# Note the two non-literal translations: "Before 1900" is lower-cased and
# "2012 onwards" maps to the "2012-2021" band.
party_map = {
    source_label: _EPC_AGE_BAND_PREFIX + epc_band
    for source_label, epc_band in (
        ("Before 1900", "before 1900"),
        ("1900-1929", "1900-1929"),
        ("1930-1949", "1930-1949"),
        ("1950-1966", "1950-1966"),
        ("1967-1975", "1967-1975"),
        ("1976-1982", "1976-1982"),
        ("1983-1990", "1983-1990"),
        ("1991-1995", "1991-1995"),
        ("1996-2002", "1996-2002"),
        ("2003-2006", "2003-2006"),
        ("2007-2011", "2007-2011"),
        ("2012 onwards", "2012-2021"),
    )
}

View file

@ -0,0 +1,15 @@
# (source built-form label, EPC-style built-form label) pairs
_BUILT_FORM_PAIRS = (
    ("MidTerrace", "Mid-Terrace"),
    ("EndTerrace", "End-Terrace"),
    ("Detached", "Detached"),
    ("SemiDetached", "Semi-Detached"),
    ("EnclosedMidTerrace", "Enclosed Mid-Terrace"),
    ("EnclosedEndTerrace", "Enclosed End-Terrace"),
)

# Lookup from the source's camel-cased built-form labels to hyphenated/spaced labels
parity_map = dict(_BUILT_FORM_PAIRS)
# MidTerrace 41462
# EndTerrace 20910
# Detached 16875
# SemiDetached 14725
# EnclosedMidTerrace 3176
# EnclosedEndTerrace 2393

View file

@ -0,0 +1,6 @@
# Property types map onto themselves; the mapping exists so that any label outside this
# set comes back as missing when a column is mapped through it.
parity_map = {
    property_type: property_type
    for property_type in ("Flat", "Maisonette", "Bungalow", "House")
}

View file

@ -0,0 +1,3 @@
# Placeholder mapping: no entries have been defined yet
parity_map = {}

View file

@ -0,0 +1,95 @@
import pandas as pd
from etl.epc.DataProcessor import construction_age_bounds_map
from backend.onboarders.mappings.property_type import parity_map as property_map
from backend.onboarders.mappings.age_band import party_map as age_band_map
from backend.onboarders.mappings.built_form import parity_map as built_form_map
def check_nulls(data, original_column, mapped_column):
    """
    Verify that a mapped column is only null where the original column was already null.

    :param data: DataFrame containing both columns
    :param original_column: name of the column holding the pre-mapping values
    :param mapped_column: name of the column holding the mapped values
    :return: True when every null in ``mapped_column`` corresponds to a null in
        ``original_column`` (including when there are no nulls at all)
    :raises AssertionError: if a non-null original value ended up unmapped
    """
    # We only allow nulls if the original value was null
    unmapped_rows = data[pd.isnull(data[mapped_column])]

    # Raised explicitly (rather than via ``assert``) so the check still runs under ``python -O``.
    # ``.all()`` on an empty selection is True, which covers the "no nulls" case.
    if not pd.isnull(unmapped_rows[original_column]).all():
        raise AssertionError(
            f"Some values in {mapped_column} were not mapped, but original values were not null"
        )
    return True
# Sample input data
data = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting Project/2025_11_11 - Peabody "
"- Data Extracts for Domna.xlsx",
sheet_name="Sustainability"
)
# We want to map the parity fields to standard EPC references. This will allow us to
# 1) Estimate EPCs, more accurately
# 2) Patch incorrect EPCs with ease
# 3) Indicate already installed measures
# ------------ construction_age_band ------------
# Map to EPC age bands
# def construction_date_to_band(year):
# if pd.isnull(year):
# return None
# # Get the year from the date which is numpy datetime format
# for label, ranges in construction_age_bounds_map.items():
# if ranges["l"] <= year <= ranges["u"]:
# return label
# raise NotImplementedError("year out of bounds")
#
#
# data["construction_age_band"] = pd.to_datetime(data["Construction Date"]).dt.year.apply(construction_date_to_band)
data["construction_age_band"] = data["Construction Years"].map(age_band_map)
check_nulls(data, "Construction Years", "construction_age_band")
# ------------ property_type ------------
data["property_type"] = data["Type"].map(property_map)
assert pd.isnull(data["property_type"]).sum() == 0, "Some property types were not mapped"
# ------------ built_form ------------
data["built_form"] = data["Attachment"].map(built_form_map)
assert pd.isnull(data["built_form"]).sum() == 0, "Some built forms were not mapped"
# ------------ Wall Construction ------------
data["walls_combined"] = data["Wall Construction"] + "+" + data["Wall Insulation"].fillna("Unknown Insulation")
data["Wall Insulation"].value_counts()
data["Wall Construction"].value_counts()
# Per wall-construction lookup of the age bands treated as insulated / partially insulated
# when the wall insulation is recorded as "AsBuilt" (read by map_wall_construction).
# NOTE(review): every band list is currently empty, so this is placeholder data that still
# needs to be populated.
as_built_map = {
"Cavity": {"insulated_age_bands":[], "partial_insulated_age_bands": []},
"Solid Brick": {"insulated_age_bands": [], "partial_insulated_age_bands": []},
"System": {"insulated_age_bands": [], "partial_insulated_age_bands": []},
"Timber Frame": {"insulated_age_bands": [], "partial_insulated_age_bands": []},
"Sandstone": {"insulated_age_bands": [], "partial_insulated_age_bands": []},
"Granite": {"insulated_age_bands": [], "partial_insulated_age_bands": []},
"Cob": {"insulated_age_bands": [], "partial_insulated_age_bands": []},
}
def map_wall_construction(wall_constuction, wall_insulation, construction_age_band):
    """
    Unfinished mapper for a property's wall construction / insulation.

    NOTE(review): as written, this function never returns a value and does not read
    ``construction_age_band``. For "AsBuilt" insulation it only looks the wall construction
    up in ``as_built_map`` and raises if it is unknown; any other insulation value falls
    straight through. The parameter name ``wall_constuction`` looks like a typo of
    "construction" - renaming it would change the keyword interface, so confirm callers first.

    :param wall_constuction: wall construction label, used as a key into ``as_built_map``
    :param wall_insulation: wall insulation label; only the value "AsBuilt" is handled
    :param construction_age_band: currently unused - presumably intended for the age band
        check described at the end of the body (TODO confirm)
    :raises NotImplementedError: if insulation is "AsBuilt" and the wall construction is not
        a key of ``as_built_map``
    """
    if wall_insulation == "AsBuilt":
        # Deduce based on wall construction and age band
        bands = as_built_map.get(wall_constuction, None)
        if bands is None:
            raise NotImplementedError(f"Wall construction {wall_constuction} not in as built map")
        # We check if the age band is in insulated or partial insulated, and if neither, we assume uninsulated
        # TODO: the check described above is not implemented yet - ``bands`` is never used
# Variables we want to map
'Org Ref', 'Address 1', 'Address 2', 'Address 3', 'Postcode', 'Type',
'Attachment', 'Construction Years', 'Wall Construction',
'Wall Insulation', 'Roof Construction', 'Roof Insulation',
'Floor Construction', 'Floor Insulation', 'Glazing', 'Heating',
'Boiler Efficiency', 'Main Fuel', 'Controls Adequacy', 'UPRN',
'Total Floor Area (m2)'

View file

@ -75,6 +75,10 @@ df = df.sort_values("property_id", ascending=True)
agg = df.groupby("property_id").size().reset_index(name="n_plans")
agg = agg.sort_values("n_plans", ascending=True)
agg[agg["n_plans"] == 3]
agg[agg["n_plans"] == 2].shape
agg[agg["n_plans"] != 3]
assert all(agg["n_plans"] == 3)
@ -153,4 +157,54 @@ with pd.ExcelWriter(filename) as writer:
sal.iloc[41000:61000, :].to_excel(writer, sheet_name="batch 4", index=False)
sal.iloc[61000:81000, :].to_excel(writer, sheet_name="batch 5", index=False)
sal.iloc[81000:, :].to_excel(writer, sheet_name="batch 5", index=False)
sal.iloc[81000:, :].to_excel(writer, sheet_name="batch 6", index=False)
# TODO - mistake was made when creating the final SAL
# Re-load the five batches from the previously written workbook, work out which rows of
# `sal` are in none of them, and write a corrected workbook where "batch 6" is that remainder.
final_sal_path = (
    "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting Project/Final SAL/20260101 "
    "sal.xlsx"
)
b1, b2, b3, b4, b5 = (
    pd.read_excel(final_sal_path, sheet_name=batch_sheet)
    for batch_sheet in ("batch 1", "batch 2", "batch 3", "batch 4", "batch 5")
)
loaded_batches = [b1, b2, b3, b4, b5]

# Batch 6 should be the remaining
total = pd.concat(loaded_batches)
already_batched_uprns = total["epc_os_uprn"].values
remaining = sal[~sal["epc_os_uprn"].isin(already_batched_uprns)]

# Create new output
filename = ("/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting Project/Final SAL/"
            "20260107 corrected batch 6 sal.xlsx")
# Sheets are written in this order
output_sheets = (
    ("Standardised Asset List", sal),
    ("batch 1", b1),  # Top 1000 for testing
    ("batch 2", b2),  # Batch 2 is the next 20,000
    ("batch 3", b3),  # Batch 3 is the next 20,000
    ("batch 4", b4),
    ("batch 5", b5),
    ("batch 6", remaining),
)
with pd.ExcelWriter(filename) as writer:
    for output_sheet_name, output_frame in output_sheets:
        output_frame.to_excel(writer, sheet_name=output_sheet_name, index=False)

all_together = pd.concat([*loaded_batches, remaining])

View file

@ -0,0 +1,21 @@
import pandas as pd
# Exploratory comparison of the "SAP Score" column against the "Lodged EPC Score" /
# "Lodged EPC Band" columns of a Parity data export.
# NOTE(review): several statements below are bare expressions whose results are discarded -
# presumably this is meant to be run line-by-line in an interactive session (TODO confirm).
df = pd.read_excel(
"/Users/khalimconn-kowlessar/Downloads/Parity Data 08012026.xlsx"
)
# Mean SAP score over all rows
df['SAP Score'].mean()
# Mean lodged EPC score and mean SAP score, restricted to rows that have a lodged EPC score
df[~pd.isnull(df["Lodged EPC Score"])]["Lodged EPC Score"].mean()
df[~pd.isnull(df["Lodged EPC Score"])]["SAP Score"].mean()
# Absolute gap between the two scores, then its mean over rows with a lodged EPC score
df['Difference'] = abs(df['SAP Score'] - df['Lodged EPC Score'])
df[~pd.isnull(df["Lodged EPC Score"])]["Difference"].mean()
# Band distributions as proportions
df["Lodged EPC Band"].value_counts(normalize=True)
df["SAP Band"].value_counts(normalize=True)
# Rows where the two bands disagree, counted per (lodged band, SAP band) pair
z = df[df["SAP Band"] != df["Lodged EPC Band"]]
agg = z.groupby(["Lodged EPC Band", "SAP Band"]).size().reset_index(name="count")
# Disagreeing rows whose lodged EPC band is "A"
zz = z[z["Lodged EPC Band"] == "A"]

View file

@ -0,0 +1,7 @@
import pandas as pd
# Load the "Sustainability" sheet of the Peabody data extract workbook.
# NOTE(review): the path is absolute and machine-specific, so this only runs on the
# author's machine as written.
sustainability_data = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting Project/2025_11_11 - Peabody "
"- Data Extracts for Domna.xlsx",
sheet_name="Sustainability"
)

View file

@ -1,5 +1,6 @@
import re
import backend.app.assumptions as assumptions
from etl.customers.immo.pilot.asset_list import already_installed
from recommendations.recommendation_utils import (
check_simulation_difference, override_costs, combine_recommendation_configs
)
@ -320,12 +321,6 @@ class HeatingRecommender:
measures = MEASURE_MAP["heating"] if measures is None else measures
# TODO: We could have a system flush recommendation for an existing boiler, where there is no need to replace
# the boiler, but instead flushing the system will make it run more efficiently. There is a cost for this
# in the Costs class, stored as SYSTEM_FLUSH_COST
# TODO: Right now, we don't have recommendations for electric boilers - we should probably have one
# if we have a non-invasive ashp recommendation, we get the configuration directly from the property instance
non_invasive_ashp_recommendation = next(
(r for r in self.property.non_invasive_recommendations if r["type"] == "air_source_heat_pump"),
@ -1115,6 +1110,7 @@ class HeatingRecommender:
"hot-water-energy-eff": heating_simulation_config["hot_water_energy_eff_ending"]
}
# TODO: Probably don't need to use this for HHRSH - simplify
recommendations = self.combine_heating_and_controls(
controls_recommendations=controls_recommender.recommendation,
heating_simulation_config=heating_simulation_config,
@ -1128,6 +1124,12 @@ class HeatingRecommender:
non_intrusive_recommendation=non_intrusive_recommendation,
heating_product=hhrsh_product
)
# Check if HHRSH are already installed
already_installed = "high_heat_retention_storage_heaters" in self.property.already_installed
for rec in recommendations:
rec["already_installed"] = already_installed
if _return:
return recommendations
@ -1347,7 +1349,7 @@ class HeatingRecommender:
n_rooms=self.property.number_of_rooms
)
already_installed = "heating" in self.property.already_installed
already_installed = "boiler_upgrade" in self.property.already_installed
if already_installed:
boiler_costs = override_costs(boiler_costs)
description = "Heating system has already been upgraded, no further action needed."

View file

@ -272,6 +272,36 @@ class Recommendations:
property_recommendations.append(self.solar_recommender.recommendation)
phase += 1
if self.property_instance.already_installed:
# We need to re-shuffle our measures
property_recommendations_removed_installed = []
already_installed_recs = []
for recs in property_recommendations:
phase_recs = []
phase_already_installed_recs = []
for rec in recs:
if rec["already_installed"]:
phase_already_installed_recs.append(rec)
else:
phase_recs.append(rec)
if phase_recs:
property_recommendations_removed_installed.append(phase_recs)
if phase_already_installed_recs:
already_installed_recs.append(phase_already_installed_recs)
# We re-set the phases
for i, recs in enumerate(property_recommendations_removed_installed):
for rec in recs:
rec["phase"] = i
# already installed recs get negative phasing
already_installed_phase = -len(already_installed_recs)
for recs in already_installed_recs:
for rec in recs:
rec["phase"] = already_installed_phase
already_installed_phase += 1
property_recommendations = already_installed_recs + property_recommendations_removed_installed
# We insert temporary ids into the recommendations which is important for the optimiser later
property_recommendations = self.insert_temp_recommendation_id(property_recommendations)
@ -486,6 +516,11 @@ class Recommendations:
mv_increasing_variables = ["carbon", "heat_demand"]
mv_decreasing_variables = ["sap"]
# We allow for negative phase
starting_phase = min(
rec["phase"] for recs in property_recommendations for rec in recs
)
impact_summary = []
for recommendations_by_type in property_recommendations:
for rec in recommendations_by_type:
@ -526,7 +561,7 @@ class Recommendations:
# We structure this so that depending on the phase, we capture the previous phase impacts and
# then just have one piece of code to calculate the difference
if rec["phase"] == 0:
if rec["phase"] == starting_phase:
# These are just the starting values, from the EPC. When we score the ML models,
# heating_cost_starting and heating_cost_ending are just the values in the EPC. However, with
# heating_cost_ending, we expect that the EPC will predict a heating cost based on what would happen
@ -954,6 +989,33 @@ class Recommendations:
pd.isnull(kwh_impact_table["hotwater_fuel_type"]).sum()):
raise Exception("Fuel type is missing")
# As one final adjustment, if we
# 1) have a boiler upgrade recommendation, and
# 2) have an existing heating system rated Very Poor, Poor or Average, we adjust the COP of the existing boiler down to 0.6, 0.65 or 0.7 respectively
heating_upgrades = [x for x in property_recommendations if x[0]["type"] == "heating"]
boiler_upgrade = [r for recs in heating_upgrades for r in recs if r["measure_type"] == "boiler_upgrade"]
existing_heating_efficiency = property_instance.data["mainheat-energy-eff"]
if len(boiler_upgrade) and existing_heating_efficiency in ["Very Poor", "Poor", "Average"]:
efficiency_map = {"Very Poor": 0.6, "Poor": 0.65, "Average": 0.7}
adjusted_cop = efficiency_map[existing_heating_efficiency]
boiler_phase = boiler_upgrade[0]["phase"]
heating_measure_types_to_id = [
{"recommendation_id": r["recommendation_id"], "measure_type": r["measure_type"]}
for r in heating_upgrades[0]
]
kwh_impact_table = kwh_impact_table.merge(
pd.DataFrame(heating_measure_types_to_id), how="left", on="recommendation_id"
)
for col in ["heating_cop", "hotwater_cop"]:
kwh_impact_table[col] = np.where(
(kwh_impact_table["phase"] <= boiler_phase) &
(kwh_impact_table["heating_fuel_type"] == "Natural Gas") &
(kwh_impact_table["measure_type"] != "boiler_upgrade"),
adjusted_cop, kwh_impact_table[col]
)
kwh_impact_table = kwh_impact_table.drop(columns=["measure_type"])
# We now calculate the fuel cost
for k in ["heating", "hotwater"]:
kwh_impact_table[f"{k}_cost"] = kwh_impact_table.apply(

View file

@ -8,6 +8,7 @@ from datatypes.enums import QuantityUnits
from backend.Property import Property
from backend.app.plan.schemas import MEASURE_MAP
from BaseUtility import Definitions
from etl.customers.vander_elliot.non_intrusives import already_installed
from etl.epc_clean.epc_attributes.WallAttributes import WallAttributes
from recommendations.recommendation_utils import (
r_value_per_mm_to_u_value, calculate_u_value_uplift, is_diminishing_returns, update_lowest_selected_u_value,
@ -641,8 +642,18 @@ class WallRecommendations(Definitions):
# we separate the logic for recommending them, therefore we don't
# consider diminishing returns between the two as they are considered to be separate measures
prop_already_installed = self.property.already_installed
# So, we'll end up with problems if e.g. an external wall insulation is already installed and we try and
# recommend internal wall insulation. To avoid this, we check if either measure is already installed
# and:
# 1) If EWI is installed, we don't recommend IWI
# 2) If IWI is installed, we don't recommend EWI
# We only produce the recommendation for the moment, for the purpose of re-baselining
ewi_recommendations = []
if self.ewi_valid() and "external_wall_insulation" in measures:
if self.ewi_valid() and "external_wall_insulation" in measures and (
"internal_wall_insulation" not in prop_already_installed
):
ewi_recommendations = self._find_insulation(
u_value=u_value,
insulation_materials=pd.DataFrame(
@ -653,7 +664,7 @@ class WallRecommendations(Definitions):
)
iwi_recommendations = []
if "internal_wall_insulation" in measures:
if "internal_wall_insulation" in measures and "external_wall_insulation" not in prop_already_installed:
iwi_recommendations = self._find_insulation(
u_value=u_value,
insulation_materials=pd.DataFrame(self.internal_wall_insulation_materials),

View file

@ -72,18 +72,23 @@ class WindowsRecommendations:
elif "secondary_glazing" in measures and "double_glazing" not in measures:
is_secondary_glazing = True
else:
is_secondary_glazing = self.property.restricted_measures or (
self.property.windows["glazing_type"] == "secondary"
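# If the property currently has some secondary glazing but isn't in a conservation area
# (i.e. restricted_measures is falsy - TODO confirm), we do not default to secondary glazing; it is only
# selected when measures are restricted and the existing windows are rated Poor or Very Poor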
is_secondary_glazing = self.property.restricted_measures and (
self.property.data["windows-energy-eff"] in ["Poor", "Very Poor"]
)
windows_area = self.property.windows_area
# We check if the windows are partially insulated but we're recommending double glazing as a complete
# replacement
double_glazing_replacement = (
not is_secondary_glazing and
# As defined in coverage_map in windows attributes
self.property.windows["glazing_coverage"] in ["partial", "most"]
)
if not number_of_windows:
raise ValueError("Number of windows not specified")
if windows_area is not None:
# TODO - we don't have a price for this so we can't recommend it
print("We have windows area, we should use this data for our recommendations!!!")
# We scale the number of windows based on the proportion of existing glazing
if self.property.data["multi-glaze-proportion"] != "":
@ -115,7 +120,10 @@ class WindowsRecommendations:
is_secondary_glazing=is_secondary_glazing,
)
already_installed = "windows_glazing" in self.property.already_installed
measure_type = "double_glazing" if not is_secondary_glazing else "secondary_glazing"
already_installed = measure_type in self.property.already_installed
if already_installed:
cost_result = override_costs(cost_result)
description = "The property already has double glazing installed. No further action is required."
@ -123,7 +131,7 @@ class WindowsRecommendations:
glazing_type = (
"secondary glazing" if is_secondary_glazing else "double glazing"
)
if self.property.windows["glazing_coverage"] in ["partial", "most"]:
if self.property.windows["glazing_coverage"] in ["partial", "most"] and not double_glazing_replacement:
description = f"Install {glazing_type} to the remaining windows"
else:
description = f"Install {glazing_type} to all windows"
@ -200,6 +208,8 @@ class WindowsRecommendations:
else:
glazed_type_ending = "secondary glazing"
new_windows_description = "Multiple glazing throughout"
# Windows only end up with an average efficiency
windows_energy_eff = "Average"
else:
raise ValueError("Invalid glazing type - implement me")
@ -208,7 +218,6 @@ class WindowsRecommendations:
windows_energy_eff = "Very Good"
# For post 2002 windows, the energy efficiency is "Good" and so for the simulation, we simulate with "Good"
windows_ending_config = WindowAttributes(new_windows_description).process()
windows_simulation_config = check_simulation_difference(
@ -230,8 +239,6 @@ class WindowsRecommendations:
"glazed-type": glazed_type_ending,
}
measure_type = "double_glazing" if not is_secondary_glazing else "secondary_glazing"
non_invasive_recommendation = next(
(r for r in self.property.non_invasive_recommendations if r["type"] in ["windows_glazing", measure_type]),
{}

View file

@ -643,7 +643,8 @@ def optimise_with_scenarios(
budget=None,
target_gain=None,
enforce_heat_pump_insulation=True,
enforce_fabric_first=False
enforce_fabric_first=False,
already_installed_sap=0
):
"""
Scenario-based optimiser (funding-agnostic).
@ -754,7 +755,11 @@ def optimise_with_scenarios(
heat_pump_paths = build_heat_pump_paths(remaining_wall_measures, remaining_roof_measures)
paths.extend(heat_pump_paths)
fixed_selections = expand_funding_path(optimisation_measures, paths)
fixed_selections = []
for path in paths:
result = expand_funding_path(input_measures, [path])
if result:
fixed_selections.extend(result)
for fixed in fixed_selections:
@ -825,7 +830,7 @@ def optimise_with_scenarios(
"already_installed_gain": sum([x["gain"] for x in picked if x["already_installed"]])
})
solutions_df = append_solution_metrics(solutions, target_gain, p)
solutions_df = append_solution_metrics(solutions, target_gain, p, already_installed_sap)
return solutions_df
@ -835,12 +840,14 @@ def _get_ending_sap_without_battery(x):
return float(sum(gain))
def append_solution_metrics(solutions, target_gain, p):
def append_solution_metrics(solutions, target_gain, p, already_installed_sap=0):
"""
Given a set of solutions, this function will return a dataframe, with cost metrics appended, to allow
the end user to select the optimal solution.
:param solutions:
:param target_gain:
:param p:
:param already_installed_sap:
:return:
"""
@ -852,7 +859,7 @@ def append_solution_metrics(solutions, target_gain, p):
# Given the scheme, we now check if the packages are eligible. If they *are* eligible, but they don't meet the
# final upgrade target, we then look to perform a final optimisation pass to meet the target gain.
solutions_df["meets_upgrade_target"] = solutions_df["total_gain"] >= target_gain - 0.1
solutions_df["meets_upgrade_target"] = solutions_df["total_gain"] >= target_gain
# We now can calculate the project ABS, which subtracts from the cost, but this is only relevant for ECO4
# We flag projects that are including batteries
solutions_df["has_battery"] = solutions_df["items"].apply(has_battery)
@ -863,7 +870,7 @@ def append_solution_metrics(solutions, target_gain, p):
# We need the ending SAP, but we'll need to remove the battery SAP uplift first
solutions_df["ending_sap_without_battery"] = solutions_df.apply(
lambda x: int(p.data["current-energy-efficiency"]) + _get_ending_sap_without_battery(x),
lambda x: int(p.data["current-energy-efficiency"]) + already_installed_sap + _get_ending_sap_without_battery(x),
axis=1
)
@ -1015,7 +1022,6 @@ def expand_funding_path(input_measures, path_spec):
cands = iter_and_candidates(input_measures, elem["AND"])
else:
raise ValueError("unknown path element; expected 'OR' or 'AND'")
if not cands:
return []