from enum import Enum from typing import List import pandas as pd from utils.logger import setup_logger from etl.epc_clean.epc_attributes.MainheatAttributes import MainHeatAttributes from backend.app.plan.schemas import VALID_HOUSING_TYPES, WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES, \ MEASURE_MAP logger = setup_logger(__name__) class EligibilityCaveats(Enum): EPC_RATING = "epc_rating" # EPC requirements not met TENANT_ON_BENEFITS_OR_LOW_INCOME = "tenant_on_benefits_or_low_income" INNOVATION_REQUIRED = "innovation_required" SOLAR_NEEDS_HEATING = "solar_needs_heating" MINIMUM_INSULATION_PRECONDITIONS_NOT_MET = "minimum_insulation_preconditions_not_met" class Funding: """ Handles eligibility and funding calculations for ECO4 & GBIS (PRS + Social Housing). """ SOLID_FUELS = [ 'wood logs', 'manufactured smokeless fuel', 'house coal', 'smokeless coal', 'oil', 'dual fuel mineral wood', 'anthracite', 'dual fuel appliance mineral and wood', "bulk wood pellets", "wood chips", "wood pellets" ] def __init__( self, tenure: str, # 'Private' or 'Social' eco4_social_cavity_abs_rate: float, eco4_social_solid_abs_rate: float, eco4_private_cavity_abs_rate: float, eco4_private_solid_abs_rate: float, gbis_social_cavity_abs_rate: float, gbis_social_solid_abs_rate: float, gbis_private_cavity_abs_rate: float, gbis_private_solid_abs_rate: float, project_scores_matrix, partial_project_scores_matrix, whlg_eligible_postcodes ): if tenure not in VALID_HOUSING_TYPES: raise ValueError("Invalid tenure type. Must be 'Private' or 'Social'.") self.tenure = tenure self.eco4_social_cavity_abs_rate = eco4_social_cavity_abs_rate self.eco4_social_solid_abs_rate = eco4_social_solid_abs_rate self.eco4_private_cavity_abs_rate = eco4_private_cavity_abs_rate self.eco4_private_solid_abs_rate = eco4_private_solid_abs_rate self.gbis_social_cavity_abs_rate = gbis_social_cavity_abs_rate self.gbis_social_solid_abs_rate = gbis_social_solid_abs_rate self.gbis_private_cavity_abs_rate = gbis_private_cavity_abs_rate self.gbis_private_solid_abs_rate = gbis_private_solid_abs_rate self.starting_sap_band = None self.ending_sap_band = None self.floor_area_band = None self.project_scores_matrix = project_scores_matrix self.partial_project_scores_matrix = partial_project_scores_matrix self.whlg_eligible_postcodes = whlg_eligible_postcodes self.eco4_eligible = False self.eco4_eligibility_caveats = [] self.gbis_eligible = False self.gbis_eligibility_caveats = [] # Funding calculation variables self.full_project_abs = None self.gbis_funding = None self.eco4_funding = None self.eco4_uplift = 0 self.gbis_uplift = 0 self.partial_project_abs = None # ----------------------- # Utility Helpers # ----------------------- @staticmethod def get_sap_band(sap_score_number): bands = [ ("High_A", 96, float("inf")), ("Low_A", 92, 96), ("High_B", 86, 92), ("Low_B", 81, 86), ("High_C", 74.5, 81), ("Low_C", 69, 74.5), ("High_D", 61.5, 69), ("Low_D", 55, 61.5), ("High_E", 46.5, 55), ("Low_E", 39, 46.5), ("High_F", 29.5, 39), ("Low_F", 21, 29.5), ("High_G", 10.5, 21), ("Low_G", 1, 10.5), ] for band, lower, upper in bands: if lower <= sap_score_number < upper: return band return None @staticmethod def get_floor_area_band(floor_area): if floor_area <= 72: return "0-72" if floor_area <= 97: return "73-97" if floor_area <= 199: return "98-199" return "200+" @staticmethod def _split_measures(measures: List[dict]): """ Extracts measure types and flags innovation. measures: list of dicts like {"type": "solar_pv", "is_innovation": True} """ measure_types = [m["type"] for m in measures] innovation_flags = [m.get("is_innovation", False) for m in measures] uplifts = [m["innovation_uplift"] for m in measures] innovation_measures = [m["type"] for m in measures if m.get("is_innovation", False)] return measure_types, uplifts, innovation_flags, innovation_measures @staticmethod def _meets_upgrade_target(starting_sap: int, ending_sap: int) -> bool: """ ECO4 Upgrade Requirement: - EPC E/D (SAP ≥ 39): must upgrade to EPC C (SAP ≥ 69) - EPC F/G (SAP < 39): must upgrade to EPC D (SAP ≥ 55) """ if starting_sap >= 39 and ending_sap >= 69: return True if starting_sap < 39 and ending_sap >= 55: return True return False # ----------------------- # Private Rented Sector # ----------------------- def eco4_prs_eligibility( self, starting_sap: int, ending_sap: int, measure_types: List, has_solar: bool, solar_eligible: bool ): """ ECO4 PRS eligibility: - EPC E–G - Must include SWI, FTCH, renewable heating, or DHC - Tenant must be on benefits (flagged) """ meets_epc = starting_sap <= 54 # EPC E–G meets_upgrade_target = self._meets_upgrade_target(starting_sap, ending_sap) if not meets_epc or not meets_upgrade_target: self.eco4_eligible = False self.eco4_eligibility_caveats.append(EligibilityCaveats.EPC_RATING) return if has_solar and not solar_eligible: self.eco4_eligible = False self.eco4_eligibility_caveats.append(EligibilityCaveats.SOLAR_NEEDS_HEATING) return has_swi = "internal_wall_insulation" in measure_types or "external_wall_insulation" in measure_types has_renewable = "air_source_heat_pump" in measure_types or "ground_source_heat_pump" in measure_types has_ftch = "first_time_central_heating" in measure_types has_dhc = "district_heating_connection" in measure_types if meets_upgrade_target and meets_epc and ( has_swi or has_renewable or has_ftch or has_dhc or solar_eligible ): self.eco4_eligible = True self.eco4_eligibility_caveats.append(EligibilityCaveats.TENANT_ON_BENEFITS_OR_LOW_INCOME) return self.eco4_eligible = False self.eco4_eligibility_caveats = [] def gbis_prs_eligibility(self, starting_sap: int, council_tax_band: str, measure_types: List): """ GBIS PRS eligibility: - General route: Council Tax Band & EPC D–G - Low-income route: tenant on benefits (flagged) """ gbis_measures = { "general": [ # Cannot do CWI "internal_wall_insulation", "external_wall_insulation", "flat_roof_insulation", "suspended_floor_insulation", "room_roof_insulation", "solid_floor_insulation" ], "low_income": [ "internal_wall_insulation", "external_wall_insulation", "flat_roof_insulation", "suspended_floor_insulation", "room_roof_insulation", "solid_floor_insulation", "cavity_wall_insulation", "loft_insulation" ] } meets_epc = starting_sap <= 69 # EPC D–G is_single_measure = len(measure_types) == 1 if not is_single_measure or not meets_epc: self.gbis_eligible = False self.gbis_eligibility_caveats = [] return # General route if meets_epc and council_tax_band in ["A", "B", "C", "D", "E"]: if any(m in gbis_measures["general"] for m in measure_types): self.gbis_eligible = True # Low-income route if meets_epc and any(m in gbis_measures["low_income"] for m in measure_types): self.gbis_eligible = True self.gbis_eligibility_caveats.append(EligibilityCaveats.TENANT_ON_BENEFITS_OR_LOW_INCOME) # ----------------------- # Social Housing # ----------------------- def eco4_sh_eligibility( self, starting_sap: int, ending_sap: int, has_innovation: bool, has_solar: bool, solar_eligible: bool, solar_meets_mir: bool, ): """ ECO4 Social Housing eligibility. - EPC E–G: eligible, no income check. - EPC D: innovation measure required. If solar PV is the innovation measure, must also install ASHP or HHRSH. """ if has_solar and not solar_eligible: # The package contins solar PV but it doesn't meet the eligibility requirements self.eco4_eligible = False if not solar_meets_mir: self.eco4_eligibility_caveats.append(EligibilityCaveats.MINIMUM_INSULATION_PRECONDITIONS_NOT_MET) else: self.eco4_eligibility_caveats.append(EligibilityCaveats.SOLAR_NEEDS_HEATING) return meets_epc = starting_sap <= 69 meets_upgrade_target = self._meets_upgrade_target(starting_sap, ending_sap) if not meets_epc or not meets_upgrade_target: return # EPC D innovation rule if 55 <= starting_sap <= 68: # EPC D # If we don't meet the innovation requirements, we're not eligible if not has_innovation: self.eco4_eligible = False self.eco4_eligibility_caveats.append(EligibilityCaveats.INNOVATION_REQUIRED) return self.eco4_eligible = True self.eco4_eligibility_caveats = [] return self.eco4_eligible = True self.eco4_eligibility_caveats = [] def gbis_sh_eligibility(self, starting_sap: int, measure_types: List, has_innovation: bool): """ GBIS Social Housing eligibility. - EPC E–G: single insulation measure - EPC D: single insulation, innovation measure """ meets_epc = starting_sap <= 69 # EPC D–G is_single_measure = len(measure_types) == 1 # Check if has a valid measure insulation_measures = [ 'internal_wall_insulation', 'external_wall_insulation', 'cavity_wall_insulation', 'loft_insulation', 'flat_roof_insulation', 'room_roof_insulation', 'suspended_floor_insulation', 'solid_floor_insulation', ] has_valid_measures = any(m in measure_types for m in insulation_measures) if not is_single_measure or not meets_epc or not has_valid_measures: self.gbis_eligible = False self.gbis_eligibility_caveats = [] return if 55 <= starting_sap <= 68: # EPC D # Since it's single measure if has_innovation is true, the single insulation measure # must be the innovation measure if not has_innovation: self.gbis_eligible = False self.gbis_eligibility_caveats.append(EligibilityCaveats.INNOVATION_REQUIRED) return # If we don't have an innovation measure, we're not eligible self.gbis_eligible = False self.gbis_eligibility_caveats.append(EligibilityCaveats.SOLAR_NEEDS_HEATING) return self.gbis_eligible = True # ----------------------- # Score Lookup # ----------------------- def calculate_full_project_abs(self): """Look up ABS score for full projects.""" data = self.project_scores_matrix[ (self.project_scores_matrix["Floor Area Segment"] == self.floor_area_band) & (self.project_scores_matrix["Starting Band"] == self.starting_sap_band) & (self.project_scores_matrix["Finishing Band"] == self.ending_sap_band) ] if data.empty: raise ValueError("Missing ABS rate, check the project scores matrix") return data["Cost Savings"].values[0] def _calculate_full_project_abs(self, floor_area_band: str, starting_sap_band: str, ending_sap_band: str): if (starting_sap_band == ending_sap_band) or ( starting_sap_band in ["Low_C", "High_C", "Low_B", "High_B", "Low_A", "High_A"] ): return 0 data = self.project_scores_matrix[ (self.project_scores_matrix["Floor Area Segment"] == floor_area_band) & (self.project_scores_matrix["Starting Band"] == starting_sap_band) & (self.project_scores_matrix["Finishing Band"] == ending_sap_band) ] if data.empty: raise ValueError("Missing ABS rate, check the project scores matrix") return data["Cost Savings"].values[0] @staticmethod def get_starting_ending_uvalues(current_uvalue: float) -> tuple[str, str]: """ Returns the closest starting U-value and appropriate ending U-value for solid wall insulation. - If the closest starting U-value is 0.45, assume an improvement to 0.21. - Otherwise, assume improvement to 0.30. - Special formatting rules: - If closest is 0.45 → return "2" (string) - If closest is 2.00 → return "2.0" - Else: format with 2 decimal places """ possible_starting_u_values = [2.00, 1.70, 1.00, 0.60, 0.45] closest_starting = min(possible_starting_u_values, key=lambda x: abs(x - current_uvalue)) # Determine the ending U-value ending_uvalue = "0.21" if closest_starting == 0.45 else "0.3" # Format the starting U-value according to special rules if closest_starting == 0.45: starting_str = "0.45" elif closest_starting == 2.00: starting_str = "2.0" elif closest_starting == 1.70: starting_str = "1.7" elif closest_starting == 1: starting_str = "1.0" else: starting_str = f"{closest_starting:.2f}" return starting_str, ending_uvalue def _map_to_pre_main_heating(self, mainheating, main_fuel, mainheat_energy_eff): # We check most likely primary heating system. Because mixed systems are hard to break up, we # check the larger, more prominent heating systems first and then the smaller ones. We aim # to cover the case where properties have heating systems like # "boiler radiators, mains gas, electric storage heaters" so mixed systems is_solid_fuel = (main_fuel["fuel_type"] in self.SOLID_FUELS) or ( mainheating["has_dual_fuel_mineral_and_wood"] or mainheating["has_coal"] or mainheating["has_anthracite"] or mainheating["has_smokeless_fuel"] or mainheating["has_mineral_and_wood"] or mainheating["has_dual_fuel_appliance"] ) if mainheating["has_air_source_heat_pump"]: return 'Air to Water ASHP' if mainheating["has_boiler"] and (main_fuel["fuel_type"] == "biomass"): return 'Biomass Boiler' if mainheating["has_boiler"] and (main_fuel["fuel_type"] == "lpg"): return 'Bottled LPG Boiler' if mainheating["has_boiler"] and ( (main_fuel["fuel_type"] in ["mains gas", "biogas"]) or ( (main_fuel["fuel_type"] == "unknown") and (mainheating["has_mains_gas"])) ): if mainheat_energy_eff in ["Good", "Very Good"]: # Assume higher efficiency condensing boiler return 'Condensing Gas Boiler' if mainheat_energy_eff in ["Average", "Poor"]: return 'Non Condensing Gas Boiler' return 'Gas Back Boiler to Radiators' if mainheating["has_boiler"] and (main_fuel["fuel_type"] == "mains gas") and ( mainheat_energy_eff in ["Very Poor"] ) and not mainheating["has_radiators"]: # Doesnt have radiators return 'Gas Fire with Back Boiler' if mainheating["has_boiler"] and (main_fuel["fuel_type"] in ["oil", "b30k"]): # b30k - kerosene if mainheat_energy_eff in ["Good", "Very Good"]: return 'Condensing Oil Boiler' return 'Non Condensing Oil Boiler' if mainheating["has_boiler"] and (main_fuel["fuel_type"] == "lpg") and ( mainheat_energy_eff in ["Good", "Very Good"] ): return 'Condensing LPG Boiler' if mainheating["has_boiler"] and (main_fuel["fuel_type"] == "lpg") and ( mainheat_energy_eff in ["Average", "Very Poor", "Poor"] ): return 'Non Condensing LPG Boiler' if mainheating["has_boiler"] and is_solid_fuel: return 'Solid Fossil Boiler' if mainheating["has_ground_source_heat_pump"] or mainheating["has_water_source_heat_pump"]: return 'GSHP' if mainheating["has_boiler"] and (main_fuel["fuel_type"] in ["electric", "electricity"]): return 'Electric Boiler' if mainheating["has_community_scheme"] and mainheat_energy_eff in ["Good", "Very Good"]: return 'DHS CHP' if mainheating["has_community_scheme"] and ( mainheat_energy_eff in ["Average", "Very Poor", "Poor"] or pd.isnull(mainheat_energy_eff) ): return 'DHS non-CHP' if mainheating["has_electric_storage_heaters"] and ( (mainheat_energy_eff == "Very Poor") or pd.isnull(mainheat_energy_eff) ): return 'Electric Storage Heaters Responsiveness <=0.2' if mainheating["has_electric_storage_heaters"] and mainheat_energy_eff in [ "Poor", "Average", "Good", "Very Good", ]: return 'Electric Storage Heaters Responsiveness >0.2' if mainheating["has_room_heaters"] and main_fuel["fuel_type"] == "lpg": return 'Bottled LPG Room Heaters' if mainheating["has_room_heaters"] and ( (main_fuel["fuel_type"] == "electricity") or mainheating["has_electric"] ): return 'Electric Room Heaters' if mainheating["has_room_heaters"] and main_fuel["fuel_type"] == "mains gas": return 'Gas Room Heaters' if mainheating["has_room_heaters"] and is_solid_fuel: return 'Solid Fossil Room Heaters' # Handle the case of no heating system - electric heaters assumed if mainheating["has_no_system_present"] or mainheating["has_portable_electric_heaters"] or ( mainheating["has_warm_air"] and mainheating["has_electric"] and not mainheating["has_electricaire"] ) or mainheating['has_hot-water-only']: return 'Electric Room Heaters' if not any(mainheating.values()): # This means we have an unknown heating system like 'SAP05:Main-Heating' return 'Electric Room Heaters' if mainheating["has_warm_air"] and main_fuel["fuel_type"] == "mains gas": if mainheat_energy_eff in ["Good", "Very Good"]: return 'Condensing Gas Boiler' if mainheat_energy_eff in ["Average", "Poor"]: return 'Non Condensing Gas Boiler' return 'Gas Back Boiler to Radiators' if mainheating["has_electricaire"]: # Based on current understanding, electricaire is an electric warm air storage heater, using # off-peak electricity to heat a thermal store and then a fan blows the heat through ducts # into rooms if mainheat_energy_eff == "Very Poor": return "Electric Storage Heaters Responsiveness <=0.2" return "Electric Storage Heaters Responsiveness >0.2" # direct-acting electric space heating (no storage) if mainheating["has_electric_underfloor_heating"] or mainheating["has_electric_ceiling_heating"]: return "Electric Room Heaters" # Treat warm air lpg as a direct acting lpg oiler if mainheating["has_warm_air"] and main_fuel["fuel_type"] == "lpg": if mainheat_energy_eff in ["Good", "Very Good"]: return 'Condensing LPG Boiler' return 'Non Condensing LPG Boiler' # Treat warm air oil as a direct acting oil boiler if mainheating["has_warm_air"] and main_fuel["fuel_type"] == "oil": if mainheat_energy_eff in ["Good", "Very Good"]: return 'Condensing Oil Boiler' return 'Non Condensing Oil Boiler' fuels_identified = [] for fuel in MainHeatAttributes.FUEL_TYPES: fuels_identified.append(mainheating[f"has_{fuel.replace(' ', '_')}"]) unknown_fuel = main_fuel["fuel_type"] == "unknown" and not any(fuels_identified) if mainheating["has_boiler"] and unknown_fuel: return 'Non Condensing Gas Boiler' raise ValueError("Invalid pre heating system") def calculate_partial_project_abs( self, measure_type: str, filtered_pps_matrix: pd.DataFrame, pre_heating_system: str, current_wall_uvalue: float = None, is_partial: bool = False, existing_li_thickness: float = None, has_no_system: bool = False, ): """ Calculate the partial project ABS score for a single measure. """ # Filter on the starting band and floor area so we only do this once if measure_type == "internal_wall_insulation": if current_wall_uvalue is None: raise ValueError("current_wall_uvalue is required for IWI") starting_str, ending_str = self.get_starting_ending_uvalues(current_wall_uvalue) measure_code = f"IWI_solid_{starting_str}_{ending_str}" pps = filtered_pps_matrix[filtered_pps_matrix["Measure_Type"] == measure_code] if pps.shape[0] != 1: if pps.empty and self.starting_sap_band in ["Low_C", "High_C", "Low_B", "High_B", "Low_A", "High_A"]: return 0 raise ValueError(f"Invalid IWI category: {measure_code}") return pps.squeeze()["Cost Savings"] if measure_type == "external_wall_insulation": if current_wall_uvalue is None: raise ValueError("current_wall_uvalue is required for EWI") starting_str, ending_str = self.get_starting_ending_uvalues(current_wall_uvalue) measure_code = f"EWI_solid_{starting_str}_{ending_str}" pps = filtered_pps_matrix[filtered_pps_matrix["Measure_Type"] == measure_code] if pps.shape[0] != 1: if pps.empty and self.starting_sap_band in ["Low_C", "High_C", "Low_B", "High_B", "Low_A", "High_A"]: return 0 raise ValueError(f"Invalid EWI category: {measure_code}") return pps.squeeze()["Cost Savings"] if measure_type == "cavity_wall_insulation": measure_code = "CWI_partial_fill" if is_partial else "CWI_0.033" pps = filtered_pps_matrix[filtered_pps_matrix["Measure_Type"] == measure_code] if pps.shape[0] != 1: if pps.empty and self.starting_sap_band in ["Low_C", "High_C", "Low_B", "High_B", "Low_A", "High_A"]: return 0 raise ValueError(f"Invalid CWI category: {measure_code}") return pps.squeeze()["Cost Savings"] if measure_type == "loft_insulation": if existing_li_thickness is None: raise ValueError("existing_li_thickness is required for LI") measure_code = "LI_lessequal100" if existing_li_thickness <= 100 else "LI_greater100" pps = filtered_pps_matrix[filtered_pps_matrix["Measure_Type"] == measure_code] # There's no funding for EPC C or above if self.starting_sap_band in ["Low_C", "High_C", "Low_B", "High_B", "Low_A", "High_A"]: return 0 if pps.shape[0] != 1: raise ValueError(f"Invalid LI category: {measure_code}") return pps.squeeze()["Cost Savings"] if measure_type == "flat_roof_insulation": # Not funding for properties starting at C or above if self.starting_sap_band in ["Low_C", "High_C", "Low_B", "High_B", "Low_A", "High_A"]: return 0 pps = filtered_pps_matrix[filtered_pps_matrix["Measure_Type"] == "FRI"] if pps.shape[0] != 1: raise ValueError("Invalid FRI category") return pps.squeeze()["Cost Savings"] if measure_type == "room_roof_insulation": # Use the more conservative score (unin is usually lower) # code = "RIRI_res_unin" if not is_roof_insulated else "RIRI_res_in" code = "RIRI_res_unin" pps = filtered_pps_matrix[filtered_pps_matrix["Measure_Type"] == code] if pps.shape[0] != 1: if pps.empty and self.starting_sap_band in ["Low_C", "High_C", "Low_B", "High_B", "Low_A", "High_A"]: return 0 raise ValueError(f"Invalid RIRI category: {code}") return pps.squeeze()["Cost Savings"] if measure_type == "suspended_floor_insulation": if self.starting_sap_band in ["Low_C", "High_C", "Low_B", "High_B", "Low_A", "High_A"]: # We don't fund SFI for properties starting at C or above return 0 pps = filtered_pps_matrix[filtered_pps_matrix["Measure_Type"] == "UFI"] if pps.shape[0] != 1: raise ValueError("Invalid UFI category") return pps.squeeze()["Cost Savings"] if measure_type == "solid_floor_insulation": if self.starting_sap_band in ["Low_C", "High_C", "Low_B", "High_B", "Low_A", "High_A"]: # We don't fund SFI for properties starting at C or above return 0 pps = filtered_pps_matrix[filtered_pps_matrix["Measure_Type"] == "SFI"] if pps.shape[0] != 1: raise ValueError("Invalid SFI category") return pps.squeeze()["Cost Savings"] if measure_type == "solar_pv": solar_pps_df = filtered_pps_matrix[ (filtered_pps_matrix["Measure_Type"] == "Solar_PV") & (filtered_pps_matrix["Pre_Main_Heating_Source"] == pre_heating_system) ] if solar_pps_df.empty and self.starting_sap_band in [ "Low_C", "High_C", "Low_B", "High_B", "Low_B", "High_A", "Low_A" ]: # No funding for EPC C or above return 0 return solar_pps_df.squeeze()["Cost Savings"] if measure_type == "air_source_heat_pump": # No funding for EPC C or above if self.starting_sap_band in ["Low_C", "High_C", "Low_B", "High_B", "Low_A", "High_A"]: return 0 pps_data = filtered_pps_matrix[ filtered_pps_matrix["Post_Main_Heating_Source"] == "Air to Water ASHP" ] if pre_heating_system not in pps_data["Pre_Main_Heating_Source"].values: logger.info( f"No PPS data for ASHP upgrade from {pre_heating_system}, returning 0" ) return 0 pps = pps_data[ (pps_data["Pre_Main_Heating_Source"] == pre_heating_system) & (pps_data["Measure_Type"] == "B_Upgrade_nopreHCs") # We assume we'll be making a heating system upgrade ] # Not every pre heating system will result in PPS, e.g. a ground source heat pump to ASHP upgrade # won't have a PPS. if pps.shape[0] != 1: raise ValueError("something went wrong, more than one pps for ashp") return pps.squeeze()["Cost Savings"] if measure_type == "high_heat_retention_storage_heaters": pps_data = filtered_pps_matrix[ filtered_pps_matrix["Post_Main_Heating_Source"] == "High Heat Retention Storage Heaters" ] # Not every heating upgrade, that ends at HHRSH, will have a PPS. E.g. a gas boiler to HHRSH upgrade # doesn't have a PPS if pre_heating_system in pps_data["Pre_Main_Heating_Source"].values: pps = pps_data[ pps_data["Pre_Main_Heating_Source"] == pre_heating_system ] if pps.shape[0] != 1: raise ValueError("something went wrong, more than one pps for HHRSH") return pps.squeeze()["Cost Savings"] return 0 if measure_type == "time_temperature_zone_control": pps = filtered_pps_matrix[ filtered_pps_matrix["Measure_Type"] == "TTZC" ] if pre_heating_system in pps["Pre_Main_Heating_Source"].values: pps = pps[pps["Pre_Main_Heating_Source"] == pre_heating_system] if pps.shape[0] != 1: raise ValueError("something went wrong, more than one pps for TTZC") return pps.squeeze()["Cost Savings"] # If we don't have a pre heating system, we assume the measure is not applicable return 0 if measure_type in ["double_glazing", "secondary_glazing"]: # No funding for EPC C or above if self.starting_sap_band in ["Low_C", "High_C", "Low_B", "High_B", "Low_A", "High_A"]: return 0 # pps is under the WG_singletodouble Measure_Type pps = filtered_pps_matrix[ filtered_pps_matrix["Measure_Type"] == "WG_singletodouble" ] return pps.squeeze()["Cost Savings"] if measure_type == "roomstat_programmer_trvs": # We can get funding for TRVs pps = filtered_pps_matrix[ filtered_pps_matrix["Measure_Type"] == "TRV" ] if pre_heating_system in pps["Pre_Main_Heating_Source"].values: pps = pps[pps["Pre_Main_Heating_Source"] == pre_heating_system] if pps.shape[0] != 1: raise ValueError("something went wrong, more than one pps for TRV") return pps.squeeze()["Cost Savings"] # If we don't have a pre heating system, we assume the measure is not applicable return 0 if measure_type == "time_temperature_zone_control": pps = filtered_pps_matrix[ filtered_pps_matrix["Measure_Type"] == "TTZC" ] if pre_heating_system in pps["Pre_Main_Heating_Source"].values: pps = pps[pps["Pre_Main_Heating_Source"] == pre_heating_system] if pps.shape[0] != 1: raise ValueError("something went wrong, more than one pps for TTZC") return pps.squeeze()["Cost Savings"] # If we don't have a pre heating system, we assume the measure is not applicable return 0 if measure_type == "boiler_upgrade": # We don't have funding for a gas to gas boiler upgrade unless it's first time central heating if pre_heating_system == "Condensing Gas Boiler": return 0 if has_no_system: pps = filtered_pps_matrix[ (filtered_pps_matrix["Pre_Main_Heating_Source"] == pre_heating_system) & (filtered_pps_matrix["Post_Main_Heating_Source"] == "Condensing Gas Boiler") & (filtered_pps_matrix["Measure_Type"] == "B_First_Time_CH") ] else: pps = filtered_pps_matrix[ (filtered_pps_matrix["Pre_Main_Heating_Source"] == pre_heating_system) & (filtered_pps_matrix["Post_Main_Heating_Source"] == "Condensing Gas Boiler") # (filtered_pps_matrix["Measure_Type"] == "B_Upgrade_preHCs") ] # Depending on different systems, e.g. room heaters, we take the best options if pps.shape[0] > 1: pps = pps[pps["Cost Savings"] == min(pps["Cost Savings"])].head(1) # No funding for EPC C or above if self.starting_sap_band in ["Low_C", "High_C", "Low_B", "High_B", "Low_A", "High_A"] or pps.empty: return 0 if pps.shape[0] != 1: raise ValueError("something went wrong, more than one pps for boiler upgrade") return pps.squeeze()["Cost Savings"] raise ValueError(f"Invalid measure type for partial project ABS calculation: {measure_type}") # ----------------------- # Main Entry Point # ----------------------- @staticmethod def check_solar_eligible_heating_system(mainheat_description, heating_control_description): """ Checks if the main heating system is eligible for solar PV funding. :param mainheat_description: Describes the primary heating system :param heating_control_description: Heating controls associated to the primary heating system :return: """ return ( any(x in mainheat_description.lower() for x in [ "air source heat pump", "ground source heat pump", "boiler and radiators, electric" ]) or ( ("electric storage heaters" in mainheat_description) and (heating_control_description.lower() == "controls for high heat retention storage heaters") ) ) def check_solar_eligibility( self, measure_types, mainheat_description, heating_control_description, has_wall_insulation_recommendation: bool = False, has_roof_insulation_recommendation: bool = False, ): """ Because of the various pre-requisites for solar, we have a self-contained function to check for eligibility Returns a tuple of booleans (has_solar, solar_eligible, meets_mir): corresponding to: - If the package contains solar PV - If the package is eligible for solar - whether the package meets the minimum insulation requirements (MIR) """ if "solar_pv" not in measure_types: return False, False, False # 1) We check if there is an eligible heating system in place has_eligibile_heating = self.check_solar_eligible_heating_system( mainheat_description, heating_control_description ) if not has_eligibile_heating: # We check if there is a recommendation for an ASHP or HHRSH if ("air_source_heat_pump" not in measure_types) and ( "high_heat_retention_storage_heaters" not in measure_types): return True, False, True # 2) We check if there is a wall insulation measure for this property. If so, we make sure # we have a wall insulation recommendation in this package if has_wall_insulation_recommendation: # Make sure we have a wall insulation recommendation if not any(m in measure_types for m in WALL_INSULATION_MEASURES): return True, False, False # 3) We check if there is a roof insulation measure for this property. If so, we make sure # we have a roof insulation recommendation in this package if has_roof_insulation_recommendation: # Make sure we have a roof insulation recommendation if not any(m in measure_types for m in ROOF_INSULATION_MEASURES): return True, False, False return True, True, True @staticmethod def meets_innovation_requirement( starting_sap: int, measures: List[dict], has_solar: bool, solar_meets_mir: bool, ) -> bool: """ Determines if the innovation requirement is met for EPC D social housing. - All measures must be innovation, unless: - solar is present - solar meets MIR (e.g. enough insulation) - solar is innovation - all other measures are insulation (can be non-innovation) """ # The condition is: # one of the following insulation measures must be installed as part of the # same ECO4 project: # o roof insulation (flat roof, pitched roof, room-in-roof) # o exterior facing wall insulation (cavity wall, solid wall) # o party cavity wall insulation # or, # • all measures listed above must already be installed # # All Band E, F and G homes receiving any heating measure and Band D homes # receiving FTCH or a DHC must have all exterior facing cavity walls and loft # (including rafters) / roof (including flat and pitched roof or room-in-roof) area # insulated (except where insulation is not possible and exemptions are lodged, # see 5.87). The insulation of these areas can be: # • installed as part of the same ECO4 project, # • pre-existing insulation, # • subject to exemptions or # • a combination of the above if not (55 <= starting_sap <= 68): return True # Only EPC D requires innovation check # Case 1: solar + MIR met if has_solar and solar_meets_mir: for m in measures: if m["type"] == "solar_pv": if not m.get("is_innovation", False): return False # solar must be innovation elif m["type"] not in WALL_INSULATION_MEASURES + ROOF_INSULATION_MEASURES + [ "suspended_floor_insulation", "solid_floor_insulation" ]: if not m.get("is_innovation", False): return False # non-insulation, non-innovation = not eligible return True # Case 2: No solar or MIR not met — all measures must be innovation return all(m.get("is_innovation", False) for m in measures) @staticmethod def has_heating_measure(measure_types: List[str]) -> bool: """ Heating measures include: ASHP, GSHP, FTCH, DHC, HHRSH, other storage heaters, heating controls, solar PV. """ heating_measures = MEASURE_MAP["heating"] + MEASURE_MAP["heating_controls"] + [ "first_time_central_heating", "district_heating_connection", "solar_pv" ] return any(m in heating_measures for m in measure_types) @staticmethod def meets_minimum_insulation_preconditions( starting_sap: int, measure_types: List[str], has_wall_insulation_recommendation: bool, has_roof_insulation_recommendation: bool, has_ftch: bool = False, has_dhc: bool = False, ) -> bool: """ Applies ECO4 insulation guidance: - **Precondition 1**: - Applies to EPC D homes WITHOUT FTCH or DHC - Must have at least one insulation measure IF any are recommended - **Precondition 2**: - Applies to EPC E/F/G or EPC D WITH FTCH or DHC - Must include ALL *recommended* exterior wall and roof insulation (floor is exempt) """ # Normalize insulation types from MEASURE_MAP wall_measures = MEASURE_MAP["wall_insulation"] roof_measures = MEASURE_MAP["roof_insulation"] floor_measures = MEASURE_MAP["floor_insulation"] has_any_insulation_recommendation = ( has_wall_insulation_recommendation or has_roof_insulation_recommendation # Floor is exempt, so we don't check for a recommendation here ) # EPC D homes with no FTCH/DHC must include at least one insulation measure if 55 <= starting_sap <= 68 and not has_ftch and not has_dhc: if not has_any_insulation_recommendation: return True return any(m in measure_types for m in wall_measures + roof_measures + floor_measures) # EPC EFG or D with FTCH/DHC: all recommended insulation types must be in place if has_wall_insulation_recommendation and not any(m in measure_types for m in wall_measures): return False if has_roof_insulation_recommendation and not any(m in measure_types for m in roof_measures): return False # We treat floors are exempt due to payback periods # if has_floor_insulation_recommendation and not any(m in measure_types for m in floor_measures): # return False return True def calc_innovation_uplift( self, measure_types, innovation_flags, uplifts, filtered_pps_matrix, pre_heating_system, mainheating, main_fuel, mainheat_energy_eff, current_wall_uvalue, is_partial, existing_li_thickness, ): """Wrapper fundgion to calculate the innovation uplift for a project.""" project_uplifts = [] for i, measure in enumerate(measure_types): if not innovation_flags[i]: project_uplifts.append(0) continue pps = self.calculate_partial_project_abs( measure_type=measure, current_wall_uvalue=current_wall_uvalue, is_partial=is_partial, existing_li_thickness=existing_li_thickness, filtered_pps_matrix=filtered_pps_matrix, pre_heating_system=pre_heating_system ) project_uplifts.append(pps * uplifts[i]) return sum(project_uplifts) def check_funding( self, measures: List[dict], starting_sap: int, ending_sap: int, floor_area: float, mainheat_description: str, heating_control_description: str, is_cavity: bool, current_wall_uvalue: float, is_partial: False, existing_li_thickness: float, mainheating: dict, main_fuel: dict, mainheat_energy_eff: str, council_tax_band: str = None, has_wall_insulation_recommendation: bool = False, has_roof_insulation_recommendation: bool = False, ): """ Given a list of measures, check ECO4/GBIS eligibility. Because measures like solar PV are subject to the minimum insulation requirements and we can get exemptions on floor insulation recommendations, if has_wall_insulation_recommendation or has_roof_insulation_recommendation are true, we check that the measures package contain a wall or roof insulation measure otherwise solar PV isn't eligible """ # Normalize measures measure_types, uplifts, innovation_flags, innovation_measures = self._split_measures(measures) # If we have a heating measure, we check if we meet the pre conditions has_ftch = "first_time_central_heating" in measure_types has_dhc = "district_heating_connection" in measure_types has_heating = self.has_heating_measure(measure_types) if has_heating: meets_mir = self.meets_minimum_insulation_preconditions( starting_sap, measure_types, has_wall_insulation_recommendation, has_roof_insulation_recommendation, has_ftch=has_ftch, has_dhc=has_dhc, ) if not meets_mir: self.eco4_eligible = False self.eco4_eligibility_caveats.append( EligibilityCaveats.MINIMUM_INSULATION_PRECONDITIONS_NOT_MET ) return # Determine if we have a solar eligible heating system has_solar, solar_eligible, solar_meets_mir = self.check_solar_eligibility( measure_types, mainheat_description, heating_control_description, has_wall_insulation_recommendation, has_roof_insulation_recommendation, ) meets_innovation = self.meets_innovation_requirement( starting_sap, measures, has_solar, solar_meets_mir ) # Track EPC bands and floor area self.starting_sap_band = self.get_sap_band(starting_sap) self.ending_sap_band = self.get_sap_band(ending_sap) self.floor_area_band = self.get_floor_area_band(floor_area) filtered_pps_matrix = self.partial_project_scores_matrix[ (self.partial_project_scores_matrix["Total Floor Area Band"] == self.floor_area_band) & (self.partial_project_scores_matrix["Starting Band"] == self.starting_sap_band) ].copy() pre_heating_system = self._map_to_pre_main_heating(mainheating, main_fuel, mainheat_energy_eff) if self.tenure == "Private": # ECO4 PRS self.eco4_prs_eligibility(starting_sap, ending_sap, measure_types, has_solar, solar_eligible) # GBIS PRS self.gbis_prs_eligibility(starting_sap, council_tax_band or "", measure_types) if self.eco4_eligible: # Calculate the full project ABS for ECO4 self.full_project_abs = self.calculate_full_project_abs() self.eco4_uplift = self.calc_innovation_uplift( measure_types=measure_types, innovation_flags=innovation_flags, uplifts=uplifts, filtered_pps_matrix=filtered_pps_matrix, pre_heating_system=pre_heating_system, mainheating=mainheating, main_fuel=main_fuel, mainheat_energy_eff=mainheat_energy_eff, current_wall_uvalue=current_wall_uvalue, is_partial=is_partial, existing_li_thickness=existing_li_thickness, ) self.eco4_funding = (self.full_project_abs + self.eco4_uplift) * ( self.eco4_social_cavity_abs_rate if is_cavity else self.eco4_social_solid_abs_rate ) if self.gbis_eligible: self.partial_project_abs = self.calculate_partial_project_abs( measure_type=measure_types[0], current_wall_uvalue=current_wall_uvalue, is_partial=is_partial, existing_li_thickness=existing_li_thickness, filtered_pps_matrix=filtered_pps_matrix, pre_heating_system=pre_heating_system ) self.gbis_uplift = self.calc_innovation_uplift( measure_types=measure_types, innovation_flags=innovation_flags, uplifts=uplifts, filtered_pps_matrix=filtered_pps_matrix, pre_heating_system=pre_heating_system, mainheating=mainheating, main_fuel=main_fuel, mainheat_energy_eff=mainheat_energy_eff, current_wall_uvalue=current_wall_uvalue, is_partial=is_partial, existing_li_thickness=existing_li_thickness, ) self.gbis_funding = (self.partial_project_abs + self.gbis_uplift) * ( self.gbis_private_cavity_abs_rate if is_cavity else self.gbis_private_solid_abs_rate ) elif self.tenure == "Social": # ECO4 Social self.eco4_sh_eligibility( starting_sap, ending_sap, meets_innovation, has_solar, solar_eligible, solar_meets_mir ) # GBIS Social self.gbis_sh_eligibility(starting_sap, measure_types, meets_innovation) if self.eco4_eligible: # Calculate the full project ABS for ECO4 self.full_project_abs = self.calculate_full_project_abs() self.eco4_uplift = self.calc_innovation_uplift( measure_types=measure_types, innovation_flags=innovation_flags, uplifts=uplifts, filtered_pps_matrix=filtered_pps_matrix, pre_heating_system=pre_heating_system, mainheating=mainheating, main_fuel=main_fuel, mainheat_energy_eff=mainheat_energy_eff, current_wall_uvalue=current_wall_uvalue, is_partial=is_partial, existing_li_thickness=existing_li_thickness, ) self.eco4_funding = (self.full_project_abs + self.eco4_uplift) * ( self.eco4_social_cavity_abs_rate if is_cavity else self.eco4_social_solid_abs_rate ) if self.gbis_eligible: # Calculate the partial project score - this is dependent on the measure self.partial_project_abs = self.calculate_partial_project_abs( measure_type=measure_types[0], current_wall_uvalue=current_wall_uvalue, is_partial=is_partial, existing_li_thickness=existing_li_thickness, filtered_pps_matrix=filtered_pps_matrix, pre_heating_system=pre_heating_system ) self.gbis_uplift = self.calc_innovation_uplift( measure_types=measure_types, innovation_flags=innovation_flags, uplifts=uplifts, filtered_pps_matrix=filtered_pps_matrix, pre_heating_system=pre_heating_system, mainheating=mainheating, main_fuel=main_fuel, mainheat_energy_eff=mainheat_energy_eff, current_wall_uvalue=current_wall_uvalue, is_partial=is_partial, existing_li_thickness=existing_li_thickness, ) self.gbis_funding = (self.partial_project_abs + self.gbis_uplift) * ( self.gbis_social_cavity_abs_rate if is_cavity else self.gbis_social_solid_abs_rate ) else: raise NotImplementedError("Only 'Private' and 'Social' tenures are supported.") def get_innovation_uplift( self, measure, starting_sap, floor_area, current_wall_uvalue, mainheating, main_fuel, mainheat_energy_eff, is_partial, is_cavity, existing_li_thickness=None ): """ Helper function to calculate the innovation uplift for a measure based on the PPS :param measure: :param current_wall_uvalue: :return: """ self.starting_sap_band = self.get_sap_band(starting_sap) self.floor_area_band = self.get_floor_area_band(floor_area) filtered_pps_matrix = self.partial_project_scores_matrix[ (self.partial_project_scores_matrix["Total Floor Area Band"] == self.floor_area_band) & (self.partial_project_scores_matrix["Starting Band"] == self.starting_sap_band) ].copy() pre_heating_system = self._map_to_pre_main_heating(mainheating, main_fuel, mainheat_energy_eff) has_no_system = mainheating["has_no_system_present"] measure_type = measure["measure_type"] pps = self.calculate_partial_project_abs( measure_type=measure_type, current_wall_uvalue=current_wall_uvalue, is_partial=is_partial, existing_li_thickness=existing_li_thickness, filtered_pps_matrix=filtered_pps_matrix, pre_heating_system=pre_heating_system, has_no_system=has_no_system ) innovation_uplift = pps * measure["innovation_rate"] if self.tenure == "Private": # We return ECO4 rates rate = ( self.eco4_private_cavity_abs_rate if is_cavity else self.eco4_private_solid_abs_rate ) return pps, pps * rate, innovation_uplift * rate, innovation_uplift if self.tenure == "Social": # We return ECO4 rates rate = ( self.eco4_social_cavity_abs_rate if is_cavity else self.eco4_social_solid_abs_rate ) return pps, pps * rate, innovation_uplift * rate, innovation_uplift raise ValueError("Invalid tenure type for innovation uplift calculation: {}".format(self.tenure)) def get_eco4_abs_rate(self, is_cavity: bool) -> float: if self.tenure == "Social": return self.eco4_social_cavity_abs_rate if is_cavity else self.eco4_social_solid_abs_rate if self.tenure == "Private": return self.eco4_private_cavity_abs_rate if is_cavity else self.eco4_private_solid_abs_rate raise NotImplementedError( "Only 'Private' and 'Social' tenures are supported for ABS rate calculation." )