Merge pull request #606 from Hestia-Homes/eco-eligiblity-bug

Debugging windows edge case and handling battery SAP estimate recommendations
This commit is contained in:
KhalimCK 2025-12-10 23:40:52 +08:00 committed by GitHub
commit 2a391ec5e3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 226 additions and 29 deletions

View file

@ -301,9 +301,18 @@ class Property:
if k in fixed_data_col_names
}
difference_record = self.epc_record.create_EPCDifferenceRecord(
self.epc_record, fixed_data
)
difference_record = self.epc_record.create_EPCDifferenceRecord(self.epc_record, fixed_data)
# We have rare cases where entire description columns are missing. EpcRecords will convert this to None.
# Due to the sensitivity of the EPCDifferenceRecord creation to missing data, we will fill in these missing
# descriptions with an empty string, for the purpose of creating this scoring record
description_cols = [
x for x in difference_record.difference_record if
"_description" in x and difference_record.difference_record[x] is None
]
if description_cols:
for col in description_cols:
difference_record.difference_record[col] = ""
self.base_difference_record = TrainingDataset(datasets=[difference_record], cleaned_lookup=cleaned_lookup)
@ -1228,6 +1237,7 @@ class Property:
"biomass": "Smokeless Fuel",
"electricity": "Electricity",
"biogas": "Smokeless Fuel",
"heat network": "Natural Gas (Community Scheme)",
}
self.heating_energy_source = list({

View file

@ -0,0 +1,30 @@
import numpy as np
class BatterySAPScorer:
    """
    Lightweight production scorer with no sklearn dependency.

    Uses hard-coded coefficients discovered offline. The code for discovering the coefficients
    can be found in etl/battery_model/train.py

    We're only concerned with SAP, as we already have a method for carbon and bill savings.
    """

    INTERCEPT = 10.310168559226678
    COEF_STARTING_SAP = -0.16120648633993315
    COEF_PV_SIZE = 1.0500492005420736

    # Bounds applied to the predicted uplift before it is rounded to whole SAP points
    MIN_UPLIFT = 0
    MAX_UPLIFT = 5

    @classmethod
    def score(cls, starting_sap, pv_size):
        """
        Estimate the SAP point uplift attributable to a battery.

        :param starting_sap: SAP score the uplift is applied on top of. Any value accepted by
            ``float()`` (int, float, numeric string, Decimal) is supported.
        :param pv_size: size of the solar PV array, in the same units as the "PV Array size"
            training column (presumably kWp - confirm against the training data). Any value
            accepted by ``float()`` is supported.
        :return: whole number of SAP points, between MIN_UPLIFT and MAX_UPLIFT inclusive
        """
        # Coerce up front so numeric strings / Decimals do not fail when multiplied by a float
        sap_uplift = (
            cls.INTERCEPT
            + cls.COEF_STARTING_SAP * float(starting_sap)
            + cls.COEF_PV_SIZE * float(pv_size)
        )

        # Clamp to [MIN_UPLIFT, MAX_UPLIFT], then round to the nearest whole point
        # (np.round rounds exact halves to the nearest even integer)
        sap_uplift = int(np.round(np.clip(sap_uplift, cls.MIN_UPLIFT, cls.MAX_UPLIFT)))
        return sap_uplift

View file

@ -15,7 +15,7 @@ from etl.epc.Record import EPCRecord
from sqlalchemy.exc import IntegrityError, OperationalError
from sqlalchemy.orm import sessionmaker
from starlette.responses import Response
from backend.ml_models.AnnualBillSavings import AnnualBillSavings
from backend.app.BatterySapScorer import BatterySAPScorer
from backend.app.config import get_settings, get_prediction_buckets
from backend.app.db.connection import db_engine
@ -405,6 +405,29 @@ def check_duplicate_uprns(plan_input):
return True
def check_duplicate_property_ids(input_properties):
    """
    Simple function to check if the input data contains duplicated property IDs. This will happen in very rare
    cases where we have properties across different servers, where the input UPRN is possibly incorrect and we
    find the right property via an address search, instead of a UPRN search and so we end up with the same property
    twice.

    :param input_properties: iterable of objects exposing a hashable ``id`` attribute
    :return: True when every property ID is unique (including when the input is empty)
    :raises ValueError: if any property ID appears more than once; the message lists the duplicated IDs
    """
    # Single pass: an ID is a duplicate the second time we encounter it
    seen_ids = set()
    duplicates = set()
    for input_property in input_properties:
        property_id = input_property.id
        if property_id in seen_ids:
            duplicates.add(property_id)
        else:
            seen_ids.add(property_id)

    if duplicates:
        raise ValueError(f"Duplicate property IDs in the input data: {duplicates}")

    return True
def averages_cleaning(prepared_epc: EPCRecord, cleaning_data: pd.DataFrame):
"""
Placeholder cleaning function to handle edge cases where we have missing data for
@ -780,6 +803,8 @@ async def model_engine(body: PlanTriggerRequest):
if not input_properties:
return Response(status_code=204)
check_duplicate_property_ids(input_properties)
# We check if we have inspections data and store it in the database if so. We'll update or create
# against each property if the inspections map is non-empty
if inspections_map:
@ -1075,11 +1100,10 @@ async def model_engine(body: PlanTriggerRequest):
scheme = "none"
funded_measures, solution = [], []
(
project_funding, total_uplift, full_project_score, partial_project_score, uplift_project_score
) = 0, 0, 0, 0, 0
project_funding, total_uplift, full_project_score, partial_project_score, uplift_project_score,
battery_sap_score
) = 0, 0, 0, 0, 0, 0
else:
# If the solution isn't eligible, we can't really consider it
solutions = solutions[
(solutions["is_eligible"] & (solutions["scheme"] != "none")) | (solutions["scheme"] == "none")
]
@ -1111,6 +1135,8 @@ async def model_engine(body: PlanTriggerRequest):
partial_project_score = optimal_solution["partial_project_score"]
# This is the uplift score ABS
uplift_project_score = optimal_solution["total_uplift_score"]
# This is the SAP score associated to a battery
battery_sap_score = optimal_solution["battery_sap_uplift"]
else:
# We optimise and then we determine eligibility for funding, based on the measures selected
optimiser = (
@ -1121,6 +1147,8 @@ async def model_engine(body: PlanTriggerRequest):
optimiser.setup()
optimiser.solve()
solution = optimiser.solution
gain = optimiser.solution_gain
post_sap = int(p.data["current-energy-efficiency"]) + gain
recommendation_types = []
for measures in input_measures:
@ -1168,6 +1196,10 @@ async def model_engine(body: PlanTriggerRequest):
full_project_score = 0 if funding.full_project_abs is not None else funding.full_project_abs
partial_project_score = funding.partial_project_abs
uplift_project_score = funding.eco4_uplift if scheme == "eco4" else funding.gbis_uplift
pv_size = next(
(m["array_size"] for m in solution if m["type"] == "solar_pv"), 0
)
battery_sap_score = BatterySAPScorer.score(starting_sap=post_sap, pv_size=pv_size)
selected = {r["id"] for r in solution}
@ -1181,7 +1213,7 @@ async def model_engine(body: PlanTriggerRequest):
selected = optimiser_functions.add_best_practice_measures(p.id, solution, recommendations, selected)
# Final flattening
recommendations[p.id] = optimiser_functions.flatten_recommendations_with_defaults(
p.id, recommendations, selected
p.id, recommendations, selected, battery_sap_score
)
# TODO: functionise

View file

@ -0,0 +1,62 @@
import pandas as pd
from sklearn.linear_model import Ridge
class SAPUpliftTrainer:
    """
    Offline trainer that fits a ridge regression in order to discover the SAP uplift
    model coefficients.

    The features are the "starting SAP" and "PV Array size" columns and the target is
    the "SAP points" column of the training frame.
    """

    def __init__(self, alpha=1.0):
        # alpha is handed straight to Ridge
        self.alpha = alpha
        self.model = Ridge(alpha=self.alpha)
        self.feature_names = ["starting SAP", "PV Array size"]

    def prepare_data(self, df):
        """
        Select the feature columns and the target column from a copy of ``df``.

        :return: tuple of (features, target)
        """
        frame = df.copy()
        # NOTE: an "is_electric" feature derived from the "heating" column is currently disabled
        features = frame[self.feature_names]
        target = frame["SAP points"]
        return features, target

    def fit(self, df):
        """Fit the underlying model on the features/target extracted from ``df``."""
        features, target = self.prepare_data(df)
        self.model.fit(features, target)

    def coefficients(self):
        """
        Return the fitted intercept followed by one coefficient per feature name, as plain floats.
        """
        fitted = {"intercept": float(self.model.intercept_)}
        for feature_name, coef in zip(self.feature_names, self.model.coef_):
            fitted[feature_name] = float(coef)
        return fitted

    def export_runtime_config(self):
        """
        Returns a dict suitable for copy-pasting into the runtime scoring class.
        """
        coefs = self.coefficients()
        # runtime key -> key in the fitted coefficients ("coef_is_electric" is currently disabled)
        runtime_keys = (
            ("intercept", "intercept"),
            ("coef_starting_sap", "starting SAP"),
            ("coef_pv_size", "PV Array size"),
        )
        return {runtime_key: coefs[source_key] for runtime_key, source_key in runtime_keys}
# The training data can be found in the Domna sharepoint in Product Development > Solar Battery Recommendations
# NOTE: the path below is a developer-local download location; point it at your own copy of the export
training_data_path = "/Users/khalimconn-kowlessar/Downloads/SAP Movement data(Sheet1).csv"
df = pd.read_csv(training_data_path)

trainer = SAPUpliftTrainer(alpha=1.0)
trainer.fit(df)

# Print the raw coefficients first, then the copy-paste friendly runtime config
fitted_coefficients = trainer.coefficients()
print(fitted_coefficients)
runtime_config = trainer.export_runtime_config()
print(runtime_config)
# Last updated: 9th December 2025
# Coefficients:
# {'intercept': 10.310168559226678, 'starting SAP': -0.16120648633993315, 'PV Array size': 1.0500492005420736}
# The code for scoring with this model can be found in backend/app/BatterySapScorer.py

View file

@ -388,7 +388,7 @@ class EPCDataProcessor:
has_missings = pd.isnull(self.data[col]).sum()
while has_missings:
self.data = apply_clean(
data=self.data, matching_columns=matching_columns[0 : to_index + 1]
data=self.data, matching_columns=matching_columns[0: to_index + 1]
)
has_missings = pd.isnull(self.data[col]).sum()
@ -705,7 +705,7 @@ class EPCDataProcessor:
[
violation_uprn_missing,
violation_old_lodgment_date,
violation_invalid_transaction_type,
# violation_invalid_transaction_type,
violation_ignored_floor_level,
violation_rdsap_score_above_max,
violation_missing_windows_description,

View file

@ -840,7 +840,9 @@ class TrainingDataset(BaseDataset):
if len(missings) == 0:
return
# Make sure they are all efficiency columns
#
# Make sure they are all efficiency columns
if any(~missings.index.str.contains("energy_eff")):
raise ValueError("Non efficiency columns are missing")

View file

@ -52,6 +52,10 @@ class WindowsRecommendations:
# We don't make any recommendations in this case. The property already has outstanding glazing
return
# We handle the rare case of not having any windows data
if self.property.windows["clean_description"] is None:
return
if self.property.windows["has_glazing"] & (
self.property.windows["glazing_coverage"] == "full"
):
@ -190,7 +194,7 @@ class WindowsRecommendations:
raise ValueError("Invalid glazing type - implement me")
if self.property.data["windows-energy-eff"] == "Very Good":
raise ValueError("Very Good energy efficiency is not supported")
windows_energy_eff = "Very Good"
# For post 2002 windows, the energy efficiency is "Good" and so for the simulation, we simulate with "Good"

View file

@ -18,6 +18,7 @@ from recommendations.optimiser.CostOptimiser import CostOptimiser
from recommendations.optimiser.GainOptimiser import GainOptimiser
from utils.logger import setup_logger
from backend.Funding import Funding
from backend.app.BatterySapScorer import BatterySAPScorer
logger = setup_logger()
@ -239,6 +240,10 @@ def _move_hhrsh_to_unfunded(picked, unfunded_picked, needs_pre_eco_hhrsh_upgrade
return picked, unfunded_picked
def has_battery(items):
    """
    Return True if at least one item carries a truthy "has_battery" entry.

    Items without the key are treated as not having a battery.
    """
    for item in items:
        if item.get("has_battery", False):
            return True
    return False
def optimise_with_funding_paths(
p, input_measures, housing_type, funding: Funding, budget=None, target_gain=None, work_package=None
):
@ -519,6 +524,23 @@ def optimise_with_funding_paths(
solutions["starting_sap"] = int(p.data["current-energy-efficiency"])
solutions["floor_area"] = p.floor_area
solutions["ending_sap"] = solutions["starting_sap"] + solutions["total_gain"]
# We flag projects that are including batteries
solutions["has_battery"] = solutions["items"].apply(has_battery)
solutions["array_size"] = solutions["items"].apply(
lambda x: sum(float(y["array_size"]) for y in x if "array_size" in y)
)
# For properties that are including batteries, we need to adjust the starting SAP to include the battery SAP uplift
# Note: We score on ending sap, as the battery SAP uplift is based on the ending SAP after fabric/heat/solar
# upgrades of each package is applied
solutions["battery_sap_uplift"] = solutions.apply(
lambda x: BatterySAPScorer.score(starting_sap=x["ending_sap"], pv_size=x["array_size"])
if x["has_battery"] else 0,
axis=1
)
# We add this on to ending SAP
solutions["ending_sap"] = solutions["ending_sap"] + solutions["battery_sap_uplift"]
solutions["starting_band"] = (solutions["starting_sap"] + solutions["already_installed_gain"]).apply(
funding.get_sap_band
)

View file

@ -75,8 +75,8 @@ def prepare_input_measures(
continue
# Filter out solar PV with batteries
if recs[0]["type"] == "solar_pv":
recs = [r for r in recs if ~r["has_battery"]]
# if recs[0]["type"] == "solar_pv":
# recs = [r for r in recs if ~r["has_battery"]]
# Only include measures with non-negative cost savings
if eco_measures:
@ -123,6 +123,14 @@ def prepare_input_measures(
else rec["measure_type"]
)
array_size = 0
if rec["measure_type"] == "solar_pv":
# Grab the parts
solar_part = next(
(part for part in rec["parts"] if part["type"] == "solar_pv"),
)
array_size = solar_part["size"]
# We also include the innovation uplift
to_append.append(
{
@ -136,6 +144,8 @@ def prepare_input_measures(
"partial_project_score": rec["partial_project_score"],
"uplift_project_score": rec["uplift_project_score"],
"already_installed": rec.get("already_installed", False),
"has_battery": rec.get("has_battery", False),
"array_size": array_size,
}
)
@ -331,7 +341,7 @@ def add_best_practice_measures(property_id, solution, recommendations, selected)
return selected
def flatten_recommendations_with_defaults(property_id, recommendations, selected, battery_sap_score=0):
    """
    Flattens nested recommendation lists for a property and marks which
    recommendations were selected.

    Parameters
    ----------
    property_id : hashable
        Key of the property to look up in `recommendations`.
    recommendations : dict
        Mapping of property ID to that property's recommendations.
        Each value is a list of lists (grouped by measure type).
    selected : set
        Set of selected recommendation IDs.
    battery_sap_score: int, optional
        SAP score uplift from battery storage, if applicable. It is added to the
        `sap_points` of every recommendation flagged with `has_battery`.

    Returns
    -------
    list of dict
        A flattened list of recommendation dicts for the given property,
        each with an added `default` field. The input dicts are not mutated.
    """
    final_recommendations = []
    for recommendations_by_type in recommendations[property_id]:
        for rec in recommendations_by_type:
            # Work on a shallow copy so the caller's recommendation dicts are left untouched
            rec_copy = {**rec, "default": rec["recommendation_id"] in selected}
            if rec_copy.get("has_battery", False):
                rec_copy["sap_points"] += battery_sap_score
            final_recommendations.append(rec_copy)

    # The list is already flat at this point. Flattening it a second time would iterate
    # over each dict and return its keys instead of the recommendations themselves.
    return final_recommendations

View file

@ -11,8 +11,8 @@ from backend.app.db.models.portfolio import PropertyModel, PropertyDetailsEpcMod
# PORTFOLIO_ID = 206
# SCENARIOS = [389]
PORTFOLIO_ID = 221
SCENARIOS = [427]
PORTFOLIO_ID = 388
SCENARIOS = [803]
def get_data(portfolio_id, scenario_ids):
@ -95,6 +95,18 @@ post_install_sap = post_install_sap[post_install_sap["default"]]
# Sum up the sap points by property id
post_install_sap = post_install_sap.groupby("property_id")[["sap_points"]].sum().reset_index()
# Find dupes by property id and measure type
dupes = recommended_measures_df.duplicated(
subset=["property_id", "measure_type"], keep=False
)
dupe_df = recommended_measures_df[dupes]
if dupe_df.shape:
# Drop dupes - happened due to a funny bug
recommended_measures_df = recommended_measures_df.drop_duplicates(
subset=["property_id", "measure_type"], keep='first'
)
recommendations_measures_pivot = recommended_measures_df.pivot(
index='property_id',
columns='measure_type',
@ -131,10 +143,19 @@ from utils.s3 import read_csv_from_s3, read_excel_from_s3
# asset_list = read_csv_from_s3(bucket_name="retrofit-plan-inputs-dev", filepath='8/206/asset_list.csv')
asset_list = read_excel_from_s3(
bucket_name="retrofit-plan-inputs-dev", file_key='8/221/20250722T202328736Z/asset_list.xlsx',
header_row=0, sheet_name="320 - edited"
bucket_name="retrofit-plan-inputs-dev", file_key='2/388/20251208T203603925Z/asset_list.xlsx',
header_row=0, sheet_name="Standardised Asset List"
)
asset_list = pd.DataFrame(asset_list)
asset_list = asset_list.rename(
columns={
"postcode": "domna_postcode"
}
)
if "domna_full_address":
# For Peabody
asset_list["domna_full_address"] = asset_list["domna_address_1"]
asset_list = asset_list[["domna_full_address", "domna_postcode", "epc_os_uprn", ]].copy()
asset_list = asset_list.rename(columns={"epc_os_uprn": "uprn"})
df["uprn"] = df["uprn"].astype(str)
@ -179,9 +200,10 @@ asset_list = asset_list.merge(
on="uprn"
)
# For exporting NCHA
# For exporting
asset_list.to_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/NCHA/320 Portfolio/asset_list_epc_b.xlsx",
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting "
"Project/20251209_sample_package_data.xlsx",
index=False
)