handling already installed ashp and removing loft only eco4 packaged

This commit is contained in:
Khalim Conn-Kowlessar 2025-11-03 22:18:17 +00:00
parent 42e447e416
commit 8a8389a4bb
8 changed files with 77 additions and 17 deletions

View file

@ -107,7 +107,7 @@ class Property:
# of the non-invasive surveys. We reflect that this has been installed in the recommendations, but remove the # of the non-invasive surveys. We reflect that this has been installed in the recommendations, but remove the
# cost and instead, provide a message that the measure has already been installed # cost and instead, provide a message that the measure has already been installed
self.already_installed = ast.literal_eval(already_installed['already_installed']) if already_installed else [] self.already_installed = already_installed
self.non_invasive_recommendations = ( self.non_invasive_recommendations = (
non_invasive_recommendations['recommendations'] if non_invasive_recommendations['recommendations'] if
non_invasive_recommendations else [] non_invasive_recommendations else []

View file

@ -8,6 +8,7 @@ from backend.app.db.models.portfolio import (
PropertyModel, PropertyTargetsModel, PropertyDetailsEpcModel PropertyModel, PropertyTargetsModel, PropertyDetailsEpcModel
) )
from backend.app.db.models.funding import FundingPackageMeasures, FundingPackage from backend.app.db.models.funding import FundingPackageMeasures, FundingPackage
from backend.app.db.models.inspections import InspectionModel
def create_plan(session: Session, plan): def create_plan(session: Session, plan):
@ -210,6 +211,14 @@ def clear_portfolio(session: Session, portfolio_id: int):
# Delete all Recommendations associated with the properties # Delete all Recommendations associated with the properties
session.execute(delete(Recommendation).where(Recommendation.property_id.in_(property_ids))) session.execute(delete(Recommendation).where(Recommendation.property_id.in_(property_ids)))
session.execute(
delete(InspectionModel)
.where(InspectionModel.property_id.in_(
session.query(PropertyModel.id).filter(PropertyModel.portfolio_id == portfolio_id)
))
.execution_options(synchronize_session=False)
)
# Now, delete the PropertyModels and related details # Now, delete the PropertyModels and related details
# Delete PropertyTargetsModel, PropertyDetailsMeter, PropertyDetailsEpcModel, and PropertyModel # Delete PropertyTargetsModel, PropertyDetailsMeter, PropertyDetailsEpcModel, and PropertyModel
session.execute(delete(PropertyTargetsModel).where(PropertyTargetsModel.portfolio_id == portfolio_id)) session.execute(delete(PropertyTargetsModel).where(PropertyTargetsModel.portfolio_id == portfolio_id))

View file

@ -5,6 +5,6 @@ from typing import Any, Optional
@dataclass @dataclass
class PropertyRequestData: class PropertyRequestData:
patch: dict patch: dict
already_installed: dict already_installed: list
non_invasive_recommendations: dict non_invasive_recommendations: dict
valuation: Optional[float] valuation: Optional[float]

View file

@ -3,6 +3,10 @@ from utils.s3 import read_from_s3
from backend.app.config import get_settings from backend.app.config import get_settings
from backend.app.plan.data_classes import PropertyRequestData from backend.app.plan.data_classes import PropertyRequestData
from typing import Any from typing import Any
from starlette.responses import Response
from utils.logger import setup_logger
logger = setup_logger()
def get_cleaned(): def get_cleaned():
@ -59,7 +63,7 @@ def extract_property_request_data(
property_already_installed = next(( property_already_installed = next((
x for x in already_installed if x for x in already_installed if
(x["address"] == config["address"]) and (x["postcode"] == config["postcode"]) (x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
), {}) ), [])
# Because we have some non-invasive recommendations that match on address and postcode, but not UPRN # Because we have some non-invasive recommendations that match on address and postcode, but not UPRN
# we need to check existence of uprn # we need to check existence of uprn
@ -118,11 +122,16 @@ def extract_property_request_data(
) )
def parse_eco_packages(config: dict[str, Any]) -> tuple[list[str], int, str] | tuple[None, None, None]: def parse_eco_packages(config: dict[str, Any], prepared_epc) -> tuple[list[str], int, str, list[str]] | tuple[
None, None, None, list]:
solar_identification = config.get("solar_reason", None) solar_identification = config.get("solar_reason", None)
cavity_identification = config.get("cavity_reason", None) cavity_identification = config.get("cavity_reason", None)
if not solar_identification and not cavity_identification: if not solar_identification and not cavity_identification:
return None, None, None return None, None, None, []
landlord_heating_system = config["landlord_heating_system"]
# This is the initial version of tackling "already installed" measures
already_installed = ["air_source_heat_pump"] if landlord_heating_system == "air source heat pump" else []
# We map the categories to the desired measures and upgrade targets # We map the categories to the desired measures and upgrade targets
# We note that the categories are placeholder until we move the standardised asset list # We note that the categories are placeholder until we move the standardised asset list
@ -180,7 +189,23 @@ def parse_eco_packages(config: dict[str, Any]) -> tuple[list[str], int, str] | t
_key = cavity_identification.split(":")[0] _key = cavity_identification.split(":")[0]
mapped = identification_map[_key] mapped = identification_map[_key]
return mapped["measures"], mapped["target_sap"], mapped["plan_type"] measures = mapped["measures"]
# If we have already installed an ASHP, we adjust the measures
if "air_source_heat_pump" in already_installed:
if "high_heat_retention_storage_heater" in measures:
# If we have a HHRSH already, we remove it
measures.remove("high_heat_retention_storage_heater")
# Add in ASHP (replacing HHRSH if already had)
measures.append("air_source_heat_pump")
current_sap = prepared_epc.current_energy_efficiency
# If we have a solar package, and the property is a D or above, we don't need to do lofts
if "solar_eco4" in mapped["plan_type"] and current_sap >= 55:
if "loft_insulation" in measures:
measures.remove("loft_insulation")
return measures, mapped["target_sap"], mapped["plan_type"], already_installed
def handle_error(session, msg, status=500): def handle_error(session, msg, status=500):

View file

@ -594,6 +594,9 @@ async def model_engine(body: PlanTriggerRequest):
cleaning_data=cleaning_data, cleaning_data=cleaning_data,
) )
# If we have an ECO project, we parse the cavity/solar reasons
eco_packages[property_id] = parse_eco_packages(config, prepared_epc)
input_properties.append( input_properties.append(
Property( Property(
id=property_id, id=property_id,
@ -601,7 +604,7 @@ async def model_engine(body: PlanTriggerRequest):
address=epc_searcher.address_clean, address=epc_searcher.address_clean,
postcode=epc_searcher.postcode_clean, postcode=epc_searcher.postcode_clean,
epc_record=prepared_epc, epc_record=prepared_epc,
already_installed=req_data.already_installed, already_installed=req_data.already_installed + eco_packages[property_id][3],
property_valuation=req_data.valuation, property_valuation=req_data.valuation,
non_invasive_recommendations=property_non_invasive_recommendations, non_invasive_recommendations=property_non_invasive_recommendations,
energy_assessment=energy_assessment, energy_assessment=energy_assessment,
@ -609,9 +612,6 @@ async def model_engine(body: PlanTriggerRequest):
) )
) )
# If we have an ECO project, we parse the cavity/solar reasons
eco_packages[property_id] = parse_eco_packages(config)
# Final step - extract inspections data, if we have it # Final step - extract inspections data, if we have it
property_inspections = extract_inspection_data(config) property_inspections = extract_inspection_data(config)
if property_inspections: if property_inspections:
@ -890,6 +890,13 @@ async def model_engine(body: PlanTriggerRequest):
mainheat_energy_eff=p.data["mainheat-energy-eff"], mainheat_energy_eff=p.data["mainheat-energy-eff"],
) )
if r["already_installed"]:
# if already installed, we zero out the uplift and funding
(r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"],
r["uplift_project_score"]) = (
0, 0, 0, 0
)
input_measures = optimiser_functions.prepare_input_measures( input_measures = optimiser_functions.prepare_input_measures(
measures_to_optimise_with_uplift, body.goal, needs_ventilation, funding=True measures_to_optimise_with_uplift, body.goal, needs_ventilation, funding=True
) )

View file

@ -34,11 +34,11 @@ class CostOptimiser:
if min_gain == 0: if min_gain == 0:
return min_gain return min_gain
elif min_gain <= 5: elif min_gain <= 5:
return min_gain + 0.5 return min_gain + 0.25
elif min_gain <= 20: elif min_gain <= 20:
return min_gain + 1.5 return min_gain + 0.5
else: else:
return min_gain + 2 return min_gain + 0.75
def setup(self): def setup(self):
# Initialize Model # Initialize Model

View file

@ -222,7 +222,8 @@ def optimise_with_funding_paths(p, input_measures, housing_type, funding: Fundin
"path": {"reference": "unfunded:all"}, "path": {"reference": "unfunded:all"},
"scheme": "none", "scheme": "none",
"is_eligible": False, # no funding scheme applied "is_eligible": False, # no funding scheme applied
"unfunded_items": [] "unfunded_items": [],
"already_installed_gain": sum([x["gain"] for x in picked if x["already_installed"]])
}) })
# This function will filter down on innovation measures if we are social EPC D # This function will filter down on innovation measures if we are social EPC D
@ -264,6 +265,11 @@ def optimise_with_funding_paths(p, input_measures, housing_type, funding: Fundin
if not sub_measures: if not sub_measures:
continue continue
# If the only measure is loft insulation, we skip this because you cannot do a minor measure only (LI)
# under ECO4
if len(sub_measures) == 1 and sub_measures[0][0]["type"] in ["loft_insulation"]:
continue
picked, sub_cost, sub_gain = run_optimizer( picked, sub_cost, sub_gain = run_optimizer(
sub_measures, sub_measures,
budget=budget, # no fixed items; budget unchanged budget=budget, # no fixed items; budget unchanged
@ -275,6 +281,9 @@ def optimise_with_funding_paths(p, input_measures, housing_type, funding: Fundin
scheme = _path_scheme([path_spec]) scheme = _path_scheme([path_spec])
# We sum of gain, for already installed measures
already_installed_gain = sum([x["gain"] for x in picked if x["already_installed"]])
solutions.append( solutions.append(
{ {
"fixed_ids": [], "fixed_ids": [],
@ -283,8 +292,11 @@ def optimise_with_funding_paths(p, input_measures, housing_type, funding: Fundin
"total_gain": sub_gain, "total_gain": sub_gain,
"path": path_spec, "path": path_spec,
"scheme": scheme, "scheme": scheme,
"is_eligible": _is_eligible_funding_package(scheme, p.data["current-energy-efficiency"], sub_gain), "is_eligible": _is_eligible_funding_package(
"unfunded_items": [] scheme, float(p.data["current-energy-efficiency"]), sub_gain
),
"unfunded_items": [],
"already_installed_gain": already_installed_gain
} }
) )
@ -409,6 +421,9 @@ def optimise_with_funding_paths(p, input_measures, housing_type, funding: Fundin
total_cost += unfunded_cost total_cost += unfunded_cost
total_gain += unfunded_gain total_gain += unfunded_gain
# We now grab the "already installed gain"
already_installed_gain = sum([x["gain"] for x in total_picks if x["already_installed"]])
solutions.append({ solutions.append({
"fixed_ids": fixed_ids, "fixed_ids": fixed_ids,
"items": total_picks, "items": total_picks,
@ -420,6 +435,7 @@ def optimise_with_funding_paths(p, input_measures, housing_type, funding: Fundin
scheme, int(p.data["current-energy-efficiency"]), total_gain scheme, int(p.data["current-energy-efficiency"]), total_gain
), ),
"unfunded_items": unfunded_picked, "unfunded_items": unfunded_picked,
"already_installed_gain": already_installed_gain
}) })
solutions = pd.DataFrame(solutions) solutions = pd.DataFrame(solutions)
@ -437,7 +453,9 @@ def optimise_with_funding_paths(p, input_measures, housing_type, funding: Fundin
solutions["starting_sap"] = int(p.data["current-energy-efficiency"]) solutions["starting_sap"] = int(p.data["current-energy-efficiency"])
solutions["floor_area"] = p.floor_area solutions["floor_area"] = p.floor_area
solutions["ending_sap"] = solutions["starting_sap"] + solutions["total_gain"] solutions["ending_sap"] = solutions["starting_sap"] + solutions["total_gain"]
solutions["starting_band"] = solutions["starting_sap"].apply(funding.get_sap_band) solutions["starting_band"] = (solutions["starting_sap"] + solutions["already_installed_gain"]).apply(
funding.get_sap_band
)
solutions["ending_band"] = solutions["ending_sap"].apply(funding.get_sap_band) solutions["ending_band"] = solutions["ending_sap"].apply(funding.get_sap_band)
solutions["floor_area_band"] = solutions["floor_area"].apply(funding.get_floor_area_band) solutions["floor_area_band"] = solutions["floor_area"].apply(funding.get_floor_area_band)
solutions["project_score"] = solutions.apply( solutions["project_score"] = solutions.apply(

View file

@ -120,6 +120,7 @@ def prepare_input_measures(property_recommendations, goal, needs_ventilation, fu
"partial_project_funding": rec["partial_project_funding"], "partial_project_funding": rec["partial_project_funding"],
"partial_project_score": rec["partial_project_score"], "partial_project_score": rec["partial_project_score"],
"uplift_project_score": rec["uplift_project_score"], "uplift_project_score": rec["uplift_project_score"],
"already_installed": rec.get("already_installed", False),
} }
) )