Merge pull request #556 from Hestia-Homes/eco-eligiblity-bug

Temp clearing additional dependencies from whlg endpoint
This commit is contained in:
KhalimCK 2025-11-16 08:27:21 +00:00 committed by GitHub
commit ce52d06c8f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 771 additions and 128 deletions

View file

@ -1,6 +1,8 @@
import time
import random
import pandas as pd
from adhoc.investigation import newest_epc
from backend.SearchEpc import SearchEpc
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
from tqdm import tqdm
@ -9,6 +11,132 @@ from utils.logger import setup_logger
logger = setup_logger()
def get_data_for_property(
address1: str,
postcode: str,
full_address: str,
property_type: [str | None],
built_form: [str | None],
uprn: [str | float | None],
epc_auth_token: str,
find_my_epc_return_page: bool
):
"""
Utility function that will fetch the data for a single property
:return:
"""
if property_type == "block of flats":
return None
house_number = str(address1).strip()
full_address = full_address.strip()
house_no = SearchEpc.get_house_number(address=str(house_number), postcode=postcode)
if house_no is None:
house_no = house_number
if pd.isnull(uprn):
uprn = None
searcher = SearchEpc(
address1=str(house_no),
postcode=postcode,
auth_token=epc_auth_token,
os_api_key="",
property_type=None,
fast=True,
full_address=full_address,
max_retries=5,
uprn=uprn
)
# Force the skipping of estimating the EPC
# We check if the property was split
searcher.ordnance_survey_client.property_type = property_type
searcher.ordnance_survey_client.built_form = built_form
searcher.find_property(skip_os=True)
# Check if we have a flat or appartment
if searcher.newest_epc is None and uprn is None:
# Try again:
if SearchEpc.get_house_number(address=str(house_number), postcode=postcode) is None:
# Backup
add1 = full_address.split(",")
if len(add1) > 1:
add1 = add1[1].strip()
else:
# Try splitting on space
add1 = full_address.split(" ")[0].strip()
else:
add1 = str(house_number)
searcher = SearchEpc(
address1=add1,
postcode=postcode,
auth_token=epc_auth_token,
os_api_key="",
property_type=None,
fast=True,
full_address=full_address,
max_retries=5
)
if (
"flat" in house_number.lower() or "apartment" in house_number.lower() or "apt" in
house_number.lower()
):
searcher.ordnance_survey_client.property_type = "Flat"
searcher.find_property(skip_os=True)
# As a final resort, we estimate the EPC
if property_type is not None and searcher.newest_epc is None:
searcher.ordnance_survey_client.property_type = property_type
searcher.ordnance_survey_client.built_form = built_form
searcher.find_property(skip_os=True)
if searcher.newest_epc is None:
return None
# Retrieve data from FindMyEPC
try:
find_epc_searcher = RetrieveFindMyEpc(
address=searcher.newest_epc["address"],
postcode=searcher.newest_epc["postcode"]
)
find_epc_response = find_epc_searcher.retrieve_newest_find_my_epc_data(
return_page=find_my_epc_return_page
)
except ValueError as e:
if "No EPC found" in str(e) and "address1" in searcher.newest_epc:
try:
find_epc_searcher = RetrieveFindMyEpc(
address=searcher.newest_epc["address1"], postcode=searcher.newest_epc["postcode"]
)
find_epc_response = find_epc_searcher.retrieve_newest_find_my_epc_data()
except ValueError as e:
if "No EPC found" in str(e):
find_epc_response = ({}, None) if find_my_epc_return_page else ({})
else:
logger.error(f"Error retrieving FindMyEPC data: {e}")
raise Exception(f"Error retrieving FindMyEPC data: {e}")
else:
find_epc_response = ({}, None) if find_my_epc_return_page else ({})
except Exception as e:
raise Exception(f"Error retrieving FindMyEPC data: {e}")
newest_epc = searcher.newest_epc
older_epcs = searcher.older_epcs
find_my_epc_page = None
if find_my_epc_return_page:
find_my_epc_data, find_my_epc_page = find_epc_response
else:
find_my_epc_data = find_epc_response
return newest_epc, older_epcs, find_my_epc_data, find_my_epc_page
def get_data(
df,
manual_uprn_map,

View file

@ -542,6 +542,8 @@ class Funding:
pps = filtered_pps_matrix[filtered_pps_matrix["Measure_Type"] == measure_code]
if pps.shape[0] != 1:
if pps.empty and self.starting_sap_band in ["Low_C", "High_C", "Low_B", "High_B", "Low_A", "High_A"]:
return 0
raise ValueError(f"Invalid IWI category: {measure_code}")
return pps.squeeze()["Cost Savings"]
@ -554,6 +556,8 @@ class Funding:
pps = filtered_pps_matrix[filtered_pps_matrix["Measure_Type"] == measure_code]
if pps.shape[0] != 1:
if pps.empty and self.starting_sap_band in ["Low_C", "High_C", "Low_B", "High_B", "Low_A", "High_A"]:
return 0
raise ValueError(f"Invalid EWI category: {measure_code}")
return pps.squeeze()["Cost Savings"]
@ -562,6 +566,8 @@ class Funding:
pps = filtered_pps_matrix[filtered_pps_matrix["Measure_Type"] == measure_code]
if pps.shape[0] != 1:
if pps.empty and self.starting_sap_band in ["Low_C", "High_C", "Low_B", "High_B", "Low_A", "High_A"]:
return 0
raise ValueError(f"Invalid CWI category: {measure_code}")
return pps.squeeze()["Cost Savings"]
@ -597,6 +603,8 @@ class Funding:
code = "RIRI_res_unin"
pps = filtered_pps_matrix[filtered_pps_matrix["Measure_Type"] == code]
if pps.shape[0] != 1:
if pps.empty and self.starting_sap_band in ["Low_C", "High_C", "Low_B", "High_B", "Low_A", "High_A"]:
return 0
raise ValueError(f"Invalid RIRI category: {code}")
return pps.squeeze()["Cost Savings"]

View file

@ -1221,11 +1221,13 @@ class Property:
None: "Natural Gas (Community Scheme)",
"mains gas": "Natural Gas (Community Scheme)",
"biomass": "Smokeless Fuel",
"electricity": "Electricity",
"biogas": "Smokeless Fuel",
}
if self.main_fuel["fuel_type"] in fuel_map: # We assume when None as it's unknown
self.heating_energy_source = fuel_map[self.main_fuel["fuel_type"]]
else:
raise Exception("Implement me")
raise NotImplementedError(f"Unhandled fuel {self.main_fuel['fuel_type']}")
if self.hotwater["heater_type"] is not None:
self.hot_water_energy_source = heater_type_to_fuel[self.hotwater["heater_type"]]
@ -1247,7 +1249,7 @@ class Property:
secondary_heating = self.data["secondheat-description"]
self.hot_water_energy_source = assumptions.DESCRIPTIONS_TO_FUEL_TYPES[secondary_heating]["fuel"]
else:
raise Exception("Investiage me")
raise NotImplementedError(f"Investiage me - unhandled hot water fuel {fuel}")
else:
self.hot_water_energy_source = hotwater_appliance_to_fuel[self.hotwater["appliance"]]

View file

@ -80,6 +80,12 @@ DESCRIPTIONS_TO_FUEL_TYPES = {
},
"Electric heat pump for water heating only": {"fuel": "Electricity", "cop": 1},
"Ground source heat pump, warm air, electric": {"fuel": "Electricity", "cop": AVERAGE_ASHP_EFFICIENCY / 100},
"Room heaters, mains gas, Electric storage heaters": {"fuel": "Natural Gas", "cop": 0.85},
"Water source heat pump, radiators, electric": {"fuel": "Electricity", "cop": AVERAGE_ASHP_EFFICIENCY / 100},
"Air source heat pump, Systems with radiators, electric": {"fuel": "Electricity",
"cop": AVERAGE_ASHP_EFFICIENCY / 100},
"Ground source heat pump, underfloor, electric": {"fuel": "Electricity", "cop": AVERAGE_ASHP_EFFICIENCY / 100},
"Electric ceiling heating": {"fuel": "Electricity", "cop": 1},
}
# These are the measure types where if there is a ventilation recommendation, we force the inclusion of it

View file

@ -1,31 +1,30 @@
import boto3
import json
import math
import asyncio
import random
from datetime import datetime
# import boto3
# import json
# import math
# import asyncio
# import random
#
# from datetime import datetime
from fastapi import APIRouter, Depends
from backend.app.dependencies import validate_token
from backend.app.plan.schemas import PlanTriggerRequest
# from backend.app.plan.schemas import PlanTriggerRequest
from backend.app.config import get_settings
from sqlalchemy.orm import sessionmaker
# from sqlalchemy.orm import sessionmaker
from utils.logger import setup_logger
from backend.app.db.connection import db_engine
from backend.app.db.functions.recommendations_functions import create_scenario
import pandas as pd
# from backend.app.db.connection import db_engine
# from backend.app.db.functions.recommendations_functions import create_scenario
# import pandas as pd
from backend.app.whlg.schema import WHLGElligibilityRequest
from utils.s3 import read_csv_from_s3
from sqlalchemy.dialects.postgresql import insert
from backend.app.db.connection import get_db_session
from backend.app.db.models.whlg import Whlg
from backend.app.db.functions.whlg_functions import upsert_whlg_postcode
# from utils.s3 import read_csv_from_s3
# from sqlalchemy.dialects.postgresql import insert
# from backend.app.db.connection import get_db_session
# from backend.app.db.models.whlg import Whlg
# from backend.app.db.functions.whlg_functions import upsert_whlg_postcode
logger = setup_logger()
if get_settings().ENVIRONMENT == "local":
router = APIRouter(
prefix="/whlg",
@ -40,6 +39,7 @@ else:
responses={404: {"description": "Not found"}}
)
@router.get("/")
async def whlg_entrypoint():
# body needs to include postcode, UPRN [task ID?]
@ -62,17 +62,16 @@ async def whlg_entrypoint():
@router.post("/eligible")
async def eligiable(body: WHLGElligibilityRequest):
postcode = body.postcode or ""
postcode = postcode.lower().replace(" ", "")
whlg_eligible_postcodes = read_csv_from_s3(
bucket_name=get_settings().DATA_BUCKET,
filepath="funding/whlg eligible postcodes.csv",
)
whlg_eligible_postcodes = pd.DataFrame(whlg_eligible_postcodes)
whlg_eligible_postcodes['Postcode'] = whlg_eligible_postcodes['Postcode'].str.replace(' ', '', regex=False)
is_eligible = postcode in whlg_eligible_postcodes['Postcode'].values
return {"whlg_eligible": is_eligible}
# postcode = body.postcode or ""
# postcode = postcode.lower().replace(" ", "")
#
# whlg_eligible_postcodes = read_csv_from_s3(
# bucket_name=get_settings().DATA_BUCKET,
# filepath="funding/whlg eligible postcodes.csv",
# )
# whlg_eligible_postcodes = pd.DataFrame(whlg_eligible_postcodes)
# whlg_eligible_postcodes['Postcode'] = whlg_eligible_postcodes['Postcode'].str.replace(' ', '', regex=False)
#
# is_eligible = postcode in whlg_eligible_postcodes['Postcode'].values
# return {"whlg_eligible": is_eligible}
return None

View file

@ -263,7 +263,8 @@ class AnnualBillSavings:
if fuel == "Electricity":
return (kwh / cop) * cls.ELECTRICITY_PRICE_CAP
if fuel in ["Natural Gas", "Natural Gas (Community Scheme)"]:
# We handle "Unmapped" in a similar fashion to gas
if fuel in ["Natural Gas", "Natural Gas (Community Scheme)", "Unmapped"]:
return (kwh / cop) * cls.GAS_PRICE_CAP
if fuel == "LPG":
@ -285,7 +286,7 @@ class AnnualBillSavings:
# The solar thermal covers a % of the heating kwh, so we need to adjust the cost
return (kwh / cop) * assumptions.SOLAR_CONSUMPTION_PROPORTION * cls.GAS_PRICE_CAP
if fuel == "Electricity + Solar Thermal":
if fuel in ["Electricity + Solar Thermal", 'Unmapped + Solar Thermal']:
# The solar thermal covers a % of the heating kwh, so we need to adjust the cost
return (kwh / cop) * assumptions.SOLAR_CONSUMPTION_PROPORTION * cls.ELECTRICITY_PRICE_CAP

View file

@ -1,36 +1,36 @@
import ast
import json
# import ast
# import json
from copy import deepcopy
from dataclasses import replace
from datetime import datetime
# from dataclasses import replace
# from datetime import datetime
import random
from tqdm import tqdm
import pandas as pd
# import pandas as pd
import numpy as np
from etl.epc.Record import EPCRecord
from backend.SearchEpc import SearchEpc
from sqlalchemy.exc import IntegrityError, OperationalError
from sqlalchemy.orm import sessionmaker
from starlette.responses import Response
# from backend.SearchEpc import SearchEpc
# from sqlalchemy.exc import IntegrityError, OperationalError
# from sqlalchemy.orm import sessionmaker
# from starlette.responses import Response
from backend.app.config import get_settings, get_prediction_buckets
from backend.app.db.connection import db_engine
from backend.app.db.functions.materials_functions import get_materials
from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations
from backend.app.db.functions.property_functions import (
create_property, create_property_details_epc, create_property_targets, update_property_data,
update_or_create_property_spatial_details
)
from backend.app.db.functions.recommendations_functions import (
create_plan, upload_recommendations, create_scenario
)
from backend.app.db.functions.funding_functions import upload_funding
from backend.app.db.functions.energy_assessment_functions import get_latest_assessment_by_uprn
from backend.app.db.models.portfolio import rating_lookup
# from backend.app.config import get_settings, get_prediction_buckets
# from backend.app.db.connection import db_engine
# from backend.app.db.functions.materials_functions import get_materials
# from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations
# from backend.app.db.functions.property_functions import (
# create_property, create_property_details_epc, create_property_targets, update_property_data,
# update_or_create_property_spatial_details
# )
# from backend.app.db.functions.recommendations_functions import (
# create_plan, upload_recommendations, create_scenario
# )
# from backend.app.db.functions.funding_functions import upload_funding
# from backend.app.db.functions.energy_assessment_functions import get_latest_assessment_by_uprn
# from backend.app.db.models.portfolio import rating_lookup
from backend.app.plan.schemas import PlanTriggerRequest, WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES
from backend.app.plan.utils import get_cleaned
from backend.app.utils import sap_to_epc
# from backend.app.plan.utils import get_cleaned
# from backend.app.utils import sap_to_epc
import backend.app.assumptions as assumptions
from backend.ml_models.api import ModelApi
@ -41,13 +41,13 @@ from recommendations.optimiser.CostOptimiser import CostOptimiser
from recommendations.optimiser.GainOptimiser import GainOptimiser
import recommendations.optimiser.optimiser_functions as optimiser_functions
from recommendations.Recommendations import Recommendations
from utils.logger import setup_logger
from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3, read_excel_from_s3
from backend.ml_models.Valuation import PropertyValuation
from etl.bill_savings.KwhData import KwhData
from etl.spatial.OpenUprnClient import OpenUprnClient
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
# from utils.logger import setup_logger
# from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3, read_excel_from_s3
# from backend.ml_models.Valuation import PropertyValuation
#
# from etl.bill_savings.KwhData import KwhData
# from etl.spatial.OpenUprnClient import OpenUprnClient
# from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
from backend.Funding import Funding
from recommendations.optimiser.funding_optimiser import optimise_with_funding_paths
@ -90,8 +90,10 @@ costs_by_floor_area = costs_by_floor_area.groupby("current-energy-efficiency")[
["lighting-cost-current_scaled", "heating-cost-current_scaled", "hot-water-cost-current_scaled"]
].mean().reset_index()
epc_data = epc_data[~pd.isnull(epc_data["UPRN"])]
sample_epc_data = epc_data[pd.to_datetime(epc_data["LODGEMENT_DATE"]) >= "2015-01-01"].drop_duplicates("UPRN").sample(
10000).reset_index(drop=True)
5000).reset_index(drop=True)
# TODO: In Property find_energy_sources, sort out biomass community heating - what fuel type
# TODO: We might be able to remove find_energy_sources entirely and remove estimate_electrical_consumption. It's used
@ -302,6 +304,11 @@ body = PlanTriggerRequest(
'sheet_name': None, 'sheet_count': None, 'index_start': None, 'index_end': None}
)
eco_packages = {}
# For testing
for p in input_properties:
eco_packages[p.id] = (None, None, None)
for p in tqdm(input_properties):
if not recommendations.get(p.id):
continue
@ -327,16 +334,16 @@ for p in tqdm(input_properties):
fixed_gain = optimiser_functions.calculate_fixed_gain(
property_required_measures, recommendations, p, needs_ventilation
)
gain = optimiser_functions.calculate_gain(body=body, p=p, fixed_gain=fixed_gain)
gain = optimiser_functions.calculate_gain(body=body, p=p, fixed_gain=fixed_gain, eco_packages=eco_packages)
funding = Funding(
tenure="Social",
tenure=body.housing_type,
project_scores_matrix=project_scores_matrix,
partial_project_scores_matrix=partial_project_scores_matrix,
whlg_eligible_postcodes=whlg_eligible_postcodes,
eco4_social_cavity_abs_rate=12.5,
eco4_social_cavity_abs_rate=13,
eco4_social_solid_abs_rate=17,
eco4_private_cavity_abs_rate=12.5,
eco4_private_cavity_abs_rate=13,
eco4_private_solid_abs_rate=17,
gbis_social_cavity_abs_rate=21,
gbis_social_solid_abs_rate=25,
@ -380,7 +387,7 @@ for p in tqdm(input_properties):
r["uplift_project_score"]
) = funding.get_innovation_uplift(
measure=r,
starting_sap=p.data["current-energy-efficiency"],
starting_sap=int(p.data["current-energy-efficiency"]),
floor_area=p.floor_area,
is_cavity=p.walls["is_cavity_wall"],
current_wall_uvalue=current_wall_u_value,
@ -391,8 +398,16 @@ for p in tqdm(input_properties):
mainheat_energy_eff=p.data["mainheat-energy-eff"],
)
if r["already_installed"]:
# if already installed, we zero out the uplift and funding
(r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"],
r["uplift_project_score"]) = (
0, 0, 0, 0
)
input_measures = optimiser_functions.prepare_input_measures(
measures_to_optimise_with_uplift, body.goal, needs_ventilation, funding=True
measures_to_optimise_with_uplift, body.goal, needs_ventilation, funding=True,
property_eco_packages=eco_packages.get(p.id)
)
# When the goal is Increasing EPC, we can run the funding optimiser
@ -404,20 +419,14 @@ for p in tqdm(input_properties):
housing_type=body.housing_type,
budget=body.budget,
target_gain=gain,
funding=funding
funding=funding,
work_package=eco_packages[p.id][2]
)
# Given the solutions we select the optimal one
solutions["cost_less_full_project_funding"] = np.where(
solutions["scheme"] == "eco4",
solutions["total_cost"] - solutions["full_project_funding"] - solutions["total_uplift"],
solutions["total_cost"] - solutions["partial_project_funding"] - solutions["total_uplift"]
)
solutions["cost_less_full_project_funding"] = (
solutions["total_cost"] - solutions["full_project_funding"] - solutions["total_uplift"]
)
solutions = solutions.sort_values("cost_less_full_project_funding", ascending=True)
# If the solution isn't eligible, we can't really consider it
solutions = solutions[
(solutions["is_eligible"] & (solutions["scheme"] != "none")) | (solutions["scheme"] == "none")
]
if solutions["meets_upgrade_target"].any():
# If we have a solution that meets the upgrade target, we select that one
@ -428,9 +437,13 @@ for p in tqdm(input_properties):
# This is the list of measures that we will recommend
scheme = optimal_solution["scheme"]
funded_measures = optimal_solution["items"] if scheme != "none" else []
solution = optimal_solution["items"] + optimal_solution["unfunded_items"]
# This is the total amount of funding that the project will produce (including uplifts) (£)
# We create this full list of selected measures, which is used in the next section for setting
# default measures
solution = deepcopy(optimal_solution["items"]) + deepcopy(optimal_solution["unfunded_items"])
funded_measures = deepcopy(optimal_solution["items"]) if scheme != "none" else []
# This is the total amount of funding that the project will produce (EXCLUDING uplifts) (£)
project_funding = optimal_solution["full_project_funding"] if scheme == "eco4" else \
optimal_solution["partial_project_funding"]
# This is the total amount of funding associated to the uplift (£)
@ -470,8 +483,8 @@ for p in tqdm(input_properties):
funding.check_funding(
measures=solution,
starting_sap=p.data["current-energy-efficiency"],
ending_sap=p.data["current-energy-efficiency"] + sum([x["gain"] for x in solution]),
starting_sap=int(p.data["current-energy-efficiency"]),
ending_sap=int(p.data["current-energy-efficiency"]) + sum([x["gain"] for x in solution]),
floor_area=p.floor_area,
mainheat_description=p.main_heating["clean_description"],
heating_control_description=p.main_heating_controls["clean_description"],
@ -510,10 +523,10 @@ for p in tqdm(input_properties):
# Add best practice measures (ventilation/trickle vents)
selected = optimiser_functions.add_best_practice_measures(p.id, solution, recommendations, selected)
# Final flattening - Don't do this!
# recommendations[p.id] = optimiser_functions.flatten_recommendations_with_defaults(
# p.id, recommendations, selected
# )
# Final flattening
recommendations[p.id] = optimiser_functions.flatten_recommendations_with_defaults(
p.id, recommendations, selected
)
# TODO: functionise
for measure in funded_measures:
@ -529,3 +542,231 @@ for p in tqdm(input_properties):
partial_project_score=partial_project_score,
uplift_project_score=uplift_project_score
)
# for p in tqdm(input_properties):
# if not recommendations.get(p.id):
# continue
#
# # we need to double unlist because we have a list of lists
# property_measure_types = {rec["type"] for recs in recommendations[p.id] for rec in recs}
# property_required_measures = [m for m in recommendations[p.id] if m[0]["type"] in body.required_measures]
# measures_to_optimise = [m for m in recommendations[p.id] if m[0]["type"] not in body.required_measures]
#
# # If a measure requiring ventilation is selected, and the property does not have ventilation, we enfore
# # its inclusion
# needs_ventilation = any(
# x in property_measure_types for x in assumptions.measures_needing_ventilation
# ) and not p.has_ventilation
#
# if not measures_to_optimise:
# # Nothing to do, we just reshape the recommendations
# recommendations[p.id] = optimiser_functions.flatten_recommendations_with_defaults(
# p.id, recommendations, set()
# )
# continue
#
# fixed_gain = optimiser_functions.calculate_fixed_gain(
# property_required_measures, recommendations, p, needs_ventilation
# )
# gain = optimiser_functions.calculate_gain(body=body, p=p, fixed_gain=fixed_gain)
#
# funding = Funding(
# tenure="Social",
# project_scores_matrix=project_scores_matrix,
# partial_project_scores_matrix=partial_project_scores_matrix,
# whlg_eligible_postcodes=whlg_eligible_postcodes,
# eco4_social_cavity_abs_rate=12.5,
# eco4_social_solid_abs_rate=17,
# eco4_private_cavity_abs_rate=12.5,
# eco4_private_solid_abs_rate=17,
# gbis_social_cavity_abs_rate=21,
# gbis_social_solid_abs_rate=25,
# gbis_private_cavity_abs_rate=21,
# gbis_private_solid_abs_rate=28,
# )
#
# li_thickness = convert_thickness_to_numeric(
# p.roof["insulation_thickness"], p.roof["is_pitched"], p.roof["is_flat"]
# )
# current_wall_u_value = p.walls["thermal_transmittance"]
# if current_wall_u_value is None:
# current_wall_u_value = get_wall_u_value(
# clean_description=p.walls["clean_description"],
# age_band=p.age_band,
# is_granite_or_whinstone=p.walls["is_granite_or_whinstone"],
# is_sandstone_or_limestone=p.walls["is_sandstone_or_limestone"],
# )
#
# # We insert the innovation uplift
# measures_to_optimise_with_uplift = deepcopy(measures_to_optimise)
#
# # TODO: Turn this into a function and store the innovaiton uplift
# for group in measures_to_optimise_with_uplift:
# for r in group:
#
# if r["type"] in ["mechanical_ventilation", "low_energy_lighting", "secondary_heating",
# "extension_cavity_wall_insulation", "draught_proofing", "sealing_open_fireplace"]:
# (
# r["partial_project_score"],
# r["partial_project_funding"],
# r["innovation_uplift"],
# r["uplift_project_score"],
# ) = (
# 0, 0, 0, 0
# )
# continue
#
# (
# r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"],
# r["uplift_project_score"]
# ) = funding.get_innovation_uplift(
# measure=r,
# starting_sap=p.data["current-energy-efficiency"],
# floor_area=p.floor_area,
# is_cavity=p.walls["is_cavity_wall"],
# current_wall_uvalue=current_wall_u_value,
# is_partial="partial" in p.walls["clean_description"].lower(),
# existing_li_thickness=li_thickness,
# mainheating=p.main_heating,
# main_fuel=p.main_fuel,
# mainheat_energy_eff=p.data["mainheat-energy-eff"],
# )
#
# input_measures = optimiser_functions.prepare_input_measures(
# measures_to_optimise_with_uplift, body.goal, needs_ventilation, funding=True
# )
#
# # When the goal is Increasing EPC, we can run the funding optimiser
# if body.goal == "Increasing EPC":
#
# solutions = optimise_with_funding_paths(
# p=p,
# input_measures=input_measures,
# housing_type=body.housing_type,
# budget=body.budget,
# target_gain=gain,
# funding=funding
# )
#
# # Given the solutions we select the optimal one
# solutions["cost_less_full_project_funding"] = np.where(
# solutions["scheme"] == "eco4",
# solutions["total_cost"] - solutions["full_project_funding"] - solutions["total_uplift"],
# solutions["total_cost"] - solutions["partial_project_funding"] - solutions["total_uplift"]
# )
#
# solutions["cost_less_full_project_funding"] = (
# solutions["total_cost"] - solutions["full_project_funding"] - solutions["total_uplift"]
# )
# solutions = solutions.sort_values("cost_less_full_project_funding", ascending=True)
#
# if solutions["meets_upgrade_target"].any():
# # If we have a solution that meets the upgrade target, we select that one
# optimal_solution = solutions[solutions["meets_upgrade_target"]].iloc[0]
# else:
# # Pick the cheapest
# optimal_solution = solutions.iloc[0]
#
# # This is the list of measures that we will recommend
# scheme = optimal_solution["scheme"]
# funded_measures = optimal_solution["items"] if scheme != "none" else []
# solution = optimal_solution["items"] + optimal_solution["unfunded_items"]
# # This is the total amount of funding that the project will produce (including uplifts) (£)
# project_funding = optimal_solution["full_project_funding"] if scheme == "eco4" else \
# optimal_solution["partial_project_funding"]
# # This is the total amount of funding associated to the uplift (£)
# total_uplift = optimal_solution["total_uplift"]
# # This is the funding scheme selected
# # This is the full project ABS
# full_project_score = optimal_solution["project_score"]
# # This is the partial project ABS
# partial_project_score = optimal_solution["partial_project_score"]
# # This is the uplift score ABS
# uplift_project_score = optimal_solution["total_uplift_score"]
# else:
# # We optimise and then we determine eligibility for funding, based on the measures selected
# optimiser = (
# GainOptimiser(
# input_measures, max_cost=body.budget, max_gain=gain, allow_slack=False
# ) if body.budget else CostOptimiser(input_measures, min_gain=gain)
# )
# optimiser.setup()
# optimiser.solve()
# solution = optimiser.solution
#
# recommendation_types = []
# for measures in input_measures:
# for measure in measures:
# recommendation_types.append(measure["type"])
# recommendation_types = set(recommendation_types)
#
# has_wall_insulation_recommendation = any(
# (m in recommendation_types or "+".join([m, "mechanical_ventilation"])) for m in
# WALL_INSULATION_MEASURES
# )
# has_roof_insulation_recommendation = any(
# (m in recommendation_types or "+".join([m, "mechanical_ventilation"])) for m in
# ROOF_INSULATION_MEASURES
# )
#
# funding.check_funding(
# measures=solution,
# starting_sap=p.data["current-energy-efficiency"],
# ending_sap=p.data["current-energy-efficiency"] + sum([x["gain"] for x in solution]),
# floor_area=p.floor_area,
# mainheat_description=p.main_heating["clean_description"],
# heating_control_description=p.main_heating_controls["clean_description"],
# is_cavity=p.walls["is_cavity_wall"],
# current_wall_uvalue=current_wall_u_value,
# is_partial="partial" in p.walls["clean_description"].lower(),
# existing_li_thickness=li_thickness,
# mainheating=p.main_heating,
# main_fuel=p.main_fuel,
# mainheat_energy_eff=p.data["mainheat-energy-eff"],
# has_wall_insulation_recommendation=has_wall_insulation_recommendation,
# has_roof_insulation_recommendation=has_roof_insulation_recommendation,
# )
#
# # Determine the scheme
# scheme = "none"
# if funding.eco4_eligible:
# scheme = "eco4"
# if scheme == "none" and funding.gbis_eligible:
# scheme = "gbis"
#
# funded_measures = solution if scheme in ["gbis", "eco4"] else []
# project_funding = 0 if funding.full_project_abs is not None else funding.full_project_abs
# total_uplift = funding.eco4_uplift
# full_project_score = 0 if funding.full_project_abs is not None else funding.full_project_abs
# partial_project_score = funding.partial_project_abs
# uplift_project_score = funding.eco4_uplift if scheme == "eco4" else funding.gbis_uplift
#
# selected = {r["id"] for r in solution}
#
# if property_required_measures:
# solution = optimiser_functions.add_required_measures(
# property_id=p.id, property_required_measures=property_required_measures,
# recommendations=recommendations, selected=selected,
# )
#
# # Add best practice measures (ventilation/trickle vents)
# selected = optimiser_functions.add_best_practice_measures(p.id, solution, recommendations, selected)
# # Final flattening - Don't do this!
# # recommendations[p.id] = optimiser_functions.flatten_recommendations_with_defaults(
# # p.id, recommendations, selected
# # )
#
# # TODO: functionise
# for measure in funded_measures:
# if "+mechanical_ventilation" in measure["type"]:
# measure["type"] = measure["type"].split("+mechanical_ventilation")[0]
#
# p.insert_funding(
# scheme=scheme,
# funded_measures=funded_measures,
# project_funding=project_funding,
# total_uplift=total_uplift,
# full_project_score=full_project_score,
# partial_project_score=partial_project_score,
# uplift_project_score=uplift_project_score
# )

View file

@ -0,0 +1,145 @@
"""
This scipt prepares the raw data that was sent over by Peabody for production of
a standardised asset list
They have sent over just short of 100,000 properties and so, to make this easier, we will do the following
1) Break the data up into subsets of 25,000
2) Combine the data provided into a single list
"""
import json
import time
import os
import pandas as pd
from tqdm import tqdm
from dotenv import load_dotenv
from asset_list.utils import get_data_for_property
from utils.logger import setup_logger
logger = setup_logger()
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
property_list = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting Project/2025_11_11 - Peabody "
"- Data Extracts for Domna.xlsx",
sheet_name="Properties"
)
sustainability_data = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting Project/2025_11_11 - Peabody "
"- Data Extracts for Domna.xlsx",
sheet_name="Sustainability"
)
# Basic overview:
# 1) We have 10,634 postcodes. If we needed to make requests to the ordnance survey API for
# all of these postcodes, it would cost at least £106, not accounting for double requests for postcodes
# where we have more than 100 properties (WE DONT!)
# 2) This is on average 9.36 properties per postcode
# 3) The UPRN in the property_list matches to the Org Ref in the sustainability data. These
# is an additional UPRN column in sustainability data which appears to be the ordnance survey UPRN
# 4) There appears to be some anomalous records, e.g. a flat with 543 m2 floor area and another flat
# with 6m2 floor area
# 5) Based on the residential indicator, all properties appear to be resi
# 6) We should do some quick calcs on how much it might cost to fetch all of the solar API data
# 7) We have 8785 missing UPRNS, which we should potentially try and fill
# 8) In the backend, we should probably start storing the raw EPC input data to allow for much quicker
# re-runs. All we really need to do is store the find my EPC data, perhaps against UPRN and RRN, as well
# as the raw EPC data, against uprn. This will be useful for scenario re-builds and will be much much
# quicker, as a starting point. Do we store in the database vs s3? TBC
n_postcodes = property_list["Post Code"].nunique()
postcode_summary = property_list.groupby("Post Code")["UPRN"].count().reset_index()
postcode_summary["UPRN"].mean()
test_match = property_list.merge(sustainability_data, left_on="UPRN", right_on="Org Ref")
def classify_floor_area(x):
if x <= 72:
return "0-72"
if x <= 97:
return "73-97"
if x <= 199:
return "98-199"
return "200+"
sustainability_data["Postal Region"] = sustainability_data["Postcode"].str.split(" ").str[0]
sustainability_data["Floor Area Band"] = sustainability_data["Total Floor Area (m2)"].apply(
lambda x: classify_floor_area(x)
)
archetypes = sustainability_data[
["Type", "Attachment", "Construction Years", "Wall Construction", "Wall Insulation",
"Roof Construction", "Roof Insulation", "Floor Construction", "Floor Insulation",
"Glazing", "Heating", "Boiler Efficiency", "Main Fuel", "Controls Adequacy",
"Floor Area Band"]
].drop_duplicates()
# Maps the property types to the format recognised by the EPC api
property_type_map = {}
# Maps the build form to the format recognised by the OS api
built_form_map = {}
# Proposed data fetching
# 1) grab propeties with UPRN and fetch the assocated EPC data & find my EPC data
# Some thoughts:
# S3 is quite cheap to query however we may incur some cost if we're making hundreds of thousands of calls
# to S3 to fetch data out of it. It's cheap to fetch data, if we aren't taking data out of S3, but we
# should consider this. This may influence whether or not we want to store each record individually
# against UPRN, or store against the 10,641 postcodes. We can fetch the data and store in a single
# large dump and then determine later if we want to split it up
# TODO: Handle properties without uprn
# TODO: I think we can json dump all of this, but check if we can load and re-use the page source
# TODO: Create batches?
batch_size = 500
batch_indexes = list(range(0, len(sustainability_data), batch_size))
# TODO: SET
working_directory = ""
download_contents = os.listdir(working_directory)
for i in range(0, len(sustainability_data.standardised_asset_list), batch_size):
batch_name = f"batch_{i}_to_{i + batch_size}"
# TODO: Check this
if batch_name in download_contents:
# Means we already have the data downloaded
continue
batch_data = {}
for _, property_data in tqdm(sustainability_data.iterrows(), total=len(sustainability_data)):
os_uprn = property_data["UPRN"]
address1 = property_data["Address 1"]
postcode = property_data["Postcode"]
full_address_components = [
x for x in [property_data["Address 1"], property_data["Address 2"], property_data["Address 3"]]
if not pd.isnull(x)
]
full_address = ", ".join(full_address_components)
fetched_data = get_data_for_property(
address1=address1,
postcode=postcode,
full_address=full_address,
property_type=property_type_map[property_data["Type"]],
built_form=built_form_map[property_data["Attachment"]],
uprn=property_data["UPRN"],
epc_auth_token=EPC_AUTH_TOKEN,
find_my_epc_return_page=True
)
batch_data[property_data["Org Ref"]] = fetched_data
# TODO: We likely want to do something like this: to slow down
# TODO: We also perhaps store the data in batches
if len(batch_data) % 50 == 0 and len(batch_data) > 0:
logger.info("Sleeping for 10 seconds to avoid hitting API rate limit")
time.sleep(10)
# Store the batch data in the wd
with open(os.path.join(working_directory, batch_name), "wb") as f:
json.dump(batch_data, f)

View file

@ -371,7 +371,7 @@ class RetrieveFindMyEpc:
return all_find_my_epc_data
def retrieve_newest_find_my_epc_data(self, sap_2012_date=None):
def retrieve_newest_find_my_epc_data(self, sap_2012_date=None, return_page=False):
"""
For a post code and address, we pull out all the required data from the find my epc website
"""
@ -577,6 +577,10 @@ class RetrieveFindMyEpc:
**low_carbon_energy_sources,
}
if return_page:
# We return the page text as well, which can be parsed again later
return resulting_data, postcode_response.text
return resulting_data
def format_recommendations(self, recommendations, assessment_data, sap_2012_date=None):

View file

@ -751,7 +751,9 @@ class Costs:
# Adjust total radiator needs based on built form
form_factor = {
'Enclosed Mid-Terrace': 0.9,
'Mid-Terrace': 0.95,
'Enclosed End-Terrace': 0.95,
'Semi-Detached': 1.05,
'Detached': 1.25,
'End-Terrace': 1.05

View file

@ -10,6 +10,9 @@ from etl.epc_clean.epc_attributes.MainheatAttributes import MainHeatAttributes
from etl.epc_clean.epc_attributes.HotWaterAttributes import HotWaterAttributes
from etl.epc_clean.epc_attributes.MainFuelAttributes import MainFuelAttributes
from recommendations.HeatingControlRecommender import HeatingControlRecommender
from utils.logger import setup_logger
logger = setup_logger()
class HeatingRecommender:
@ -44,6 +47,22 @@ class HeatingRecommender:
]
}
},
"Boiler and radiators, mains gas, electric underfloor heating": {
"boiler": {
"mainheating_description": "Boiler and radiators, mains gas, electric underfloor heating",
"recommendation_description": "Upgrade the existing boiler to a new, more efficient condensing "
"boiler. ",
"controls_suffix": "Manual charge controls"
},
# These are the heating types we need to produce a dual heating recommendation
"dual": {
"recommendation_description": "Upgrade the existing boiler to a new condensing boiler",
"types": [
# type 1
"boiler_upgrade",
]
}
},
"Portable electric heaters assumed for most rooms, room heaters, electric": {
"hhr": {
"mainheating_description": "Electric storage heaters, radiators",
@ -127,7 +146,7 @@ class HeatingRecommender:
n_trues += 1
if n_trues > 2 or n_trues == 0:
raise Exception("Implement me")
raise NotImplementedError("Implement me, zero or more than two heating systemss")
if n_trues == 1:
return False
@ -917,9 +936,11 @@ class HeatingRecommender:
if self.property.main_heating_controls["clean_description"] != self.high_heat_retention_contols_desc:
if self.dual_heating:
controls_prefix = self.DUAL_HEATING_DESCRIPTIONS[
self.property.main_heating["clean_description"]
]["hhr"]["controls_prefix"]
controls_prefix = self._map_dual_heating_description(
backup_map_to_description="current_controls",
output_type="controls_prefix",
recommendation_type="hhr"
)
if controls_prefix == "current_controls":
description_prefix = self.property.main_heating_controls["clean_description"]
@ -951,9 +972,11 @@ class HeatingRecommender:
# We check if the property has dual heating in place with a boiler and storage heaters
if self.dual_heating:
new_heating_description = self.DUAL_HEATING_DESCRIPTIONS[
self.property.main_heating["clean_description"]
]["hhr"]["mainheating_description"]
new_heating_description = self._map_dual_heating_description(
backup_map_to_description="Electric storage heaters",
output_type="mainheating_description",
recommendation_type="hhr"
)
new_hot_water_description = self.property.hotwater["clean_description"] # We keep the hot water system
else:
new_heating_description = "Electric storage heaters"
@ -1010,10 +1033,12 @@ class HeatingRecommender:
product=hhrsh_product
)
if self.dual_heating:
description = self.DUAL_HEATING_DESCRIPTIONS[
self.property.main_heating["clean_description"]
]["hhr"]["recommendation_description"]
description = self._map_dual_heating_description(
backup_map_to_description="Install high heat retention electric storage heaters with an appropriate "
"off-peak tariff.",
output_type="recommendation_description",
recommendation_type="hhr"
)
else:
description = "Install high heat retention electric storage heaters with an appropriate off-peak tariff."
@ -1102,6 +1127,61 @@ class HeatingRecommender:
return max(num_heated_rooms * 1.5, 6)
def _map_dual_heating_description(
self, backup_map_to_description, output_type, recommendation_type
):
"""
Utility function to handle dual heating systems
:param backup_map_to_description:
:return:
"""
if backup_map_to_description not in [
# Recommendation descriptions - these are the textual descriptions shown in the front end
"Upgrade to a new condensing boiler.",
"Install high heat retention electric storage heaters with an appropriate off-peak tariff.",
# Simulation descriptions - this is the new EPC description we simulate with in the case
# of single heating
"Boiler and radiators, mains gas",
"Electric storage heaters",
# Suffixes allowed
"",
# Controls prefixes
"current_controls"
]:
raise ValueError(f"Invalid backup_map_to_description, given {backup_map_to_description}")
if output_type not in [
"recommendation_description",
"mainheating_description",
"controls_suffix",
"controls_prefix",
]:
raise ValueError(f"Invalid output_type, given {output_type}")
if recommendation_type not in [
"boiler",
"hhr",
]:
raise ValueError(f"Given invalid recommendation type {recommendation_type}")
# "Upgrade to a new condensing boiler."
if self.dual_heating:
# We check if we have a mapped description
if self.property.main_heating["clean_description"] not in self.DUAL_HEATING_DESCRIPTIONS:
logger.warning(
f"We have a dual heating system that hasn't been mapped, defaulting to single "
f"{self.property.main_heating['clean_description']}"
)
return backup_map_to_description
return self.DUAL_HEATING_DESCRIPTIONS[
self.property.main_heating["clean_description"]
][recommendation_type][output_type]
return backup_map_to_description
def recommend_boiler_upgrades(self, phase, system_change, exising_room_heaters):
"""
This boiler recommendation will only recommend a like-for-like upgrade, since changing the system
@ -1137,12 +1217,11 @@ class HeatingRecommender:
if has_inefficient_space_heating or has_inefficient_water:
if self.dual_heating:
description = self.DUAL_HEATING_DESCRIPTIONS[
self.property.main_heating["clean_description"]
]["boiler"]["recommendation_description"]
else:
description = "Upgrade to a new condensing boiler."
description = self._map_dual_heating_description(
backup_map_to_description="Upgrade to a new condensing boiler.",
output_type="recommendation_description",
recommendation_type="boiler"
)
new_heating_eff = (
"Good" if self.property.data["mainheat-energy-eff"] in ["Very Poor", "Poor", "Average"]
@ -1167,13 +1246,12 @@ class HeatingRecommender:
if system_change:
# Installation of a boiler improves the hot water system so we need to reflect this in
# the outcome of the recommendation
if self.dual_heating:
new_heating_description = self.DUAL_HEATING_DESCRIPTIONS[
self.property.main_heating["clean_description"]
]["boiler"]["mainheating_description"]
else:
new_heating_description = "Boiler and radiators, mains gas"
new_heating_description = self._map_dual_heating_description(
backup_map_to_description="Boiler and radiators, mains gas",
output_type="mainheating_description",
recommendation_type="boiler"
)
new_hotwater_description = "From main system"
new_fuel_description = "mains gas (not community)"
@ -1239,9 +1317,11 @@ class HeatingRecommender:
# If the property did not previously have a boiler, we combine
controls_recommender = HeatingControlRecommender(self.property)
if self.dual_heating:
description_suffix = self.DUAL_HEATING_DESCRIPTIONS[
self.property.main_heating["clean_description"]
]["boiler"]["controls_suffix"]
description_suffix = self._map_dual_heating_description(
backup_map_to_description="",
output_type="controls_suffix",
recommendation_type="boiler"
)
else:
description_suffix = ""
controls_recommender.recommend(

View file

@ -681,7 +681,9 @@ class Recommendations:
):
# Handle the case of community schemes
if (heating_description == "Community scheme") or (hotwater_description == "Community scheme"):
if (heating_description == "Community scheme") or (hotwater_description == "Community scheme") and (
"not community" not in main_fuel_description
):
if main_fuel_description in ["mains gas (community)", "UNKNOWN"]:
return {
"heating_fuel_type": "Natural Gas (Community Scheme)",
@ -689,9 +691,27 @@ class Recommendations:
"heating_cop": 1,
"hotwater_cop": 1
}
raise NotImplementedError("Handle this case")
if main_fuel_description in ['biogas (community)']:
return {
"heating_fuel_type": "Smokeless Fuel",
"hotwater_fuel_type": "Smokeless Fuel",
"heating_cop": 0.85,
"hotwater_cop": 0.85
}
logger.warning(
"Unhandled community fuel."
f"Fuel: {main_fuel_description}"
f"Heating: {heating_description}"
f"Heating: {hotwater_description}"
)
return {
"heating_fuel_type": "Unmapped",
"hotwater_fuel_type": "Unmapped",
"heating_cop": 0.9,
"hotwater_cop": 0.9
}
mapped = descriptions_to_fuel_types.get(heating_description, None)
mapped = descriptions_to_fuel_types.get(heating_description.strip(), None)
if mapped is None:
# TODO: This is a non-ideal placeholder but we put something in place for a process that falls over
# fairly regularly. A task has been added to planner to refactor this
@ -702,7 +722,7 @@ class Recommendations:
if hotwater_description in [
"From main system", "From main system, no cylinder thermostat",
'From main system, waste water heat recovery'
'From main system, waste water heat recovery',
]:
return {
"heating_fuel_type": heating_fuel, "hotwater_fuel_type": heating_fuel,
@ -718,7 +738,14 @@ class Recommendations:
"heating_cop": mapped["cop"], "hotwater_cop": 1
}
mapped_hotwater = descriptions_to_fuel_types[hotwater_description]
mapped_hotwater = descriptions_to_fuel_types.get(hotwater_description.strip())
if mapped_hotwater is None:
# TODO: This is a non-ideal placeholder but we put something in place for a process that falls over
# fairly regularly. A task has been added to planner to refactor this
# We have observed an edge case where the fuel is described as not being community
# but the hot water is. We handle as such
logger.warning("Hot water description not mapped: %s", heating_description)
mapped_hotwater = {"fuel": 'Unmapped', "cop": 0.9}
return {
"heating_fuel_type": heating_fuel, "hotwater_fuel_type": mapped_hotwater["fuel"],