overide the cleaned lookup from boolean false to None. merge in commits

This commit is contained in:
Michael Duong 2025-09-05 14:22:26 +01:00
commit c8c6453002
29 changed files with 904 additions and 148 deletions

View file

@ -1,55 +1,14 @@
from etl.epc.settings import DATA_ANOMALY_MATCHES as data_anon_matches
from etl.epc.settings import DATA_ANOMALY_MATCHES as data_anon_matches
class Definitions:
"""
This class contains some base attributes which are used across multiple other classes
"""
# Anomalies described here: https://epc.opendatacommunities.org/docs/guidance#glossary
DATA_ANOMALY_MATCHES = {
# Invalid reports are where the value provided is out of bounds, e.g. a negative energy rating of -1199 or a
# non-integer, there is no valid energy band for this, so it is marked as INVALID!
"INVALID",
"INVALID!",
# When the energy certificate was first lodged on the register there was no requirement to lodge this data
# item, i.e. a non-mandatory item.
"NO DATA!",
"NODATA!",
# When the energy certificate was first lodged on the register there was no requirement to lodge this data item,
# i.e.a non - mandatory item.
"N/A",
# A value generated by the register to account for a data item that was not mandatory when the lodgement of
# the energy certificate occurred. When the data item became mandatory the register operator, for backwards
# compatibility purposes, populated the data field with a value of not recorded to ensure that the energy
# certificate retrieval process is successfully completed. Mandatory data items cannot be applied
# retrospectively to energy certificates lodged before the date of the change.
"Not recorded",
# The data also contains DECs with an operational rating of 9999 (a default DEC). The production of a
# default DEC value was allowed to enable building occupiers, with poor quality or no energy data,
# the opportunity to comply with the regulations. From April 2011 the ability to lodge a default DEC was no
# longer allowed.
"9999",
# The Building Emission Rate (BER) data field for non-domestic buildings may contain a blank value. The BER
# was only lodged on the register from 7 March 2010.
"Blank"
# There are currently just over 8,600 records where the local authority identifier is null. This is due to
# the Register Operator not being able to match the building address in the Markermap Ordinance Survey (GB)
# lookup tables or OS MasterMap Address Layer 2 data. The majority of these addresses have been requested
# manually by energy assessors for inclusion by the Register Operator in the registers (e.g. new builds,
# etc). These records are being published for completeness. An ongoing process to manage these manually added
# addresses will take time to develop to deal with these and future anomalies.
#
# There are several fields within the lodged data where it is possible to enter multiple entries to cater for
# different data_types of build within a single property, i.e. extensions. This results in multiple entries for
# the description fields for floor, roof and wall. For the purposes of this data release only the information
# contained within the first of these multiple entries is being provided. As there are no restrictions on the
# value in this first field it means that sometimes the first field in a multiple entry description field may
# contain a null value. A resolution to correct these anomalies will be considered for future data releases.
"NULL",
# We sometimes see fields populated with just an empty string.
"",
# An older value which rarely shows up but has been seen in the data.
"UNKNOWN",
"Unknown",
}
DATA_ANOMALY_MATCHES = data_anon_matches
DATA_ANOMALY_SUBSTRINGS = {
# Where values in a pick list that have been superseded by another value. For example, where a value for

View file

@ -1376,10 +1376,16 @@ class AssetList:
# 3) We don't remove anything that haas access issues yet
if self.non_intrusives_present:
non_intrusives_wall_filter = (
(self.standardised_asset_list['non-intrusives: Construction'] == "CAVITY") &
self.standardised_asset_list['non-intrusives: Insulated'].isin(["EMPTY", "PARTIAL"])
)
if self.new_format_non_insturives_present_v2:
non_intrusives_wall_filter = (
(self.standardised_asset_list['non-intrusives: Construction'] == "CAVITY") &
self.standardised_asset_list['non-intrusives: Insulated'].isin(["EMPTY", "PARTIAL", "EMPTY CAVITY"])
)
else:
non_intrusives_wall_filter = (
(self.standardised_asset_list['non-intrusives: Construction'] == "CAVITY") &
self.standardised_asset_list['non-intrusives: Insulated'].isin(["EMPTY", "PARTIAL"])
)
elif self.old_format_non_intrusives_present:
non_intrusives_wall_filter = (
self.standardised_asset_list['non-intrusives: WFT Findings'].str.lower().str.strip().isin(

View file

@ -59,25 +59,25 @@ def app():
Property UPRN
"""
# Colchester
data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Colchester/Aug2025 202 inspections"
data_filename = "Colchester Borough Homes - Inspections - Additional 202 Addresses JW 280725 copy.xlsx"
sheet_name = "Extra 202 Colchester Addresses"
postcode_column = 'domna_postcode'
address1_column = "domna_address_1"
# Lambeth
data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Lambeth"
data_filename = "LAMBETH Asset List ( Incomplete).xlsx"
sheet_name = "Green properties"
postcode_column = 'SX3 Postcode'
address1_column = "SX3 Short Address"
address1_method = None
fulladdress_column = "domna_full_address"
address_cols_to_concat = []
fulladdress_column = None
address_cols_to_concat = ["SX3 Short Address"]
missing_postcodes_method = None
landlord_year_built = None
landlord_os_uprn = None
landlord_property_type = "landlord_property_type"
landlord_built_form = "landlord_built_form"
landlord_property_type = "Property Type"
landlord_built_form = None
landlord_wall_construction = None
landlord_roof_construction = None
landlord_heating_system = None
landlord_existing_pv = None
landlord_property_id = "landlord_property_id"
landlord_property_id = "row_id"
landlord_sap = None
outcomes_filename = None
outcomes_sheetname = None
@ -91,7 +91,41 @@ def app():
phase = False
ecosurv_landlords = None
asset_list_header = 0
landlord_block_reference = "landlord_block_reference"
landlord_block_reference = None
# # Colchester
# data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Colchester/Aug2025 202 inspections"
# data_filename = "Colchester Borough Homes - Inspections - Additional 202 Addresses JW 280725 copy.xlsx"
# sheet_name = "Extra 202 Colchester Addresses"
# postcode_column = 'domna_postcode'
# address1_column = "domna_address_1"
# address1_method = None
# fulladdress_column = "domna_full_address"
# address_cols_to_concat = []
# missing_postcodes_method = None
# landlord_year_built = None
# landlord_os_uprn = None
# landlord_property_type = "landlord_property_type"
# landlord_built_form = "landlord_built_form"
# landlord_wall_construction = None
# landlord_roof_construction = None
# landlord_heating_system = None
# landlord_existing_pv = None
# landlord_property_id = "landlord_property_id"
# landlord_sap = None
# outcomes_filename = None
# outcomes_sheetname = None
# outcomes_postcode = None
# outcomes_houseno = None
# outcomes_id = None
# outcomes_address = None
# master_filepaths = []
# master_id_colnames = []
# master_to_asset_list_filepath = None
# phase = False
# ecosurv_landlords = None
# asset_list_header = 0
# landlord_block_reference = "landlord_block_reference"
# # Abri
# data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Abri/Post Inspections"
@ -128,38 +162,38 @@ def app():
# landlord_block_reference = None
# Freebridge
# data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Freebridge"
# data_filename = "Domna - FCH property data May 25 copy.xlsx"
# sheet_name = "EPC Data"
# postcode_column = 'Post Code'
# address1_column = "Address 1"
# address1_method = None
# fulladdress_column = None
# address_cols_to_concat = ["Address 1", "Address 4"]
# missing_postcodes_method = None
# landlord_year_built = "Build Date"
# landlord_os_uprn = None
# landlord_property_type = "Property Type"
# landlord_built_form = None
# landlord_wall_construction = "Walls Description"
# landlord_heating_system = "Heating Type"
# landlord_existing_pv = None
# landlord_property_id = "Place Ref"
# landlord_roof_construction = "Roof Description"
# landlord_sap = "Current SAP"
# outcomes_filename = []
# outcomes_sheetname = []
# outcomes_postcode = []
# outcomes_houseno = []
# outcomes_address = []
# outcomes_id = []
# master_filepaths = []
# master_to_asset_list_filepath = None
# asset_list_header = 0
# landlord_block_reference = None
# master_id_colnames = []
# phase = True # Inspections not complete, produce a partial view
# ecosurv_landlords = None
data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Freebridge/Aug2025 programme"
data_filename = "Domna - FCH property data May 25 copy.xlsx"
sheet_name = "EPC Data"
postcode_column = 'Post Code'
address1_column = "Address 1"
address1_method = None
fulladdress_column = None
address_cols_to_concat = ["Address 1", "Address 4"]
missing_postcodes_method = None
landlord_year_built = "Build Date"
landlord_os_uprn = None
landlord_property_type = "Property Type"
landlord_built_form = None
landlord_wall_construction = "Walls Description"
landlord_heating_system = "Heating Type"
landlord_existing_pv = None
landlord_property_id = "Place Ref"
landlord_roof_construction = "Roof Description"
landlord_sap = "Current SAP"
outcomes_filename = []
outcomes_sheetname = []
outcomes_postcode = []
outcomes_houseno = []
outcomes_address = []
outcomes_id = []
master_filepaths = []
master_to_asset_list_filepath = None
asset_list_header = 0
landlord_block_reference = None
master_id_colnames = []
phase = False # Inspections not complete, produce a partial view
ecosurv_landlords = None
# data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Broadlands"
# data_filename = "Broadlands Asset List.xlsx"

View file

@ -341,5 +341,7 @@ PROPERTY_MAPPING = {
'house': 'house',
'block of flats': 'block of flats',
'bungalow': 'bungalow',
'flat': 'flat'
'flat': 'flat',
'FLA': 'flat',
'HOU': 'house'
}

View file

@ -323,7 +323,9 @@ class Funding:
def _calculate_full_project_abs(self, floor_area_band: str, starting_sap_band: str, ending_sap_band: str):
if starting_sap_band == ending_sap_band:
if (starting_sap_band == ending_sap_band) or (
starting_sap_band in ["Low_C", "High_C", "Low_B", "High_B", "Low_A", "High_A"]
):
return 0
data = self.project_scores_matrix[
@ -356,11 +358,13 @@ class Funding:
# Format the starting U-value according to special rules
if closest_starting == 0.45:
starting_str = "2"
starting_str = "0.45"
elif closest_starting == 2.00:
starting_str = "2.0"
elif closest_starting == 1.70:
starting_str = "1.7"
elif closest_starting == 1:
starting_str = "1.0"
else:
starting_str = f"{closest_starting:.2f}"
@ -519,6 +523,7 @@ class Funding:
current_wall_uvalue: float = None,
is_partial: bool = False,
existing_li_thickness: float = None,
has_no_system: bool = False,
):
"""
Calculate the partial project ABS score for a single measure.
@ -564,6 +569,10 @@ class Funding:
measure_code = "LI_lessequal100" if existing_li_thickness <= 100 else "LI_greater100"
pps = filtered_pps_matrix[filtered_pps_matrix["Measure_Type"] == measure_code]
# There's no funding for EPC C or above
if self.starting_sap_band in ["Low_C", "High_C", "Low_B", "High_B", "Low_A", "High_A"]:
return 0
if pps.shape[0] != 1:
raise ValueError(f"Invalid LI category: {measure_code}")
return pps.squeeze()["Cost Savings"]
@ -584,12 +593,21 @@ class Funding:
return pps.squeeze()["Cost Savings"]
if measure_type == "suspended_floor_insulation":
if self.starting_sap_band in ["Low_C", "High_C", "Low_B", "High_B", "Low_A", "High_A"]:
# We don't fund SFI for properties starting at C or above
return 0
pps = filtered_pps_matrix[filtered_pps_matrix["Measure_Type"] == "UFI"]
if pps.shape[0] != 1:
raise ValueError("Invalid UFI category")
return pps.squeeze()["Cost Savings"]
if measure_type == "solid_floor_insulation":
if self.starting_sap_band in ["Low_C", "High_C", "Low_B", "High_B", "Low_A", "High_A"]:
# We don't fund SFI for properties starting at C or above
return 0
pps = filtered_pps_matrix[filtered_pps_matrix["Measure_Type"] == "SFI"]
if pps.shape[0] != 1:
raise ValueError("Invalid SFI category")
@ -600,9 +618,20 @@ class Funding:
(filtered_pps_matrix["Measure_Type"] == "Solar_PV") &
(filtered_pps_matrix["Pre_Main_Heating_Source"] == pre_heating_system)
]
if solar_pps_df.empty and self.starting_sap_band in [
"Low_C", "High_C", "Low_B", "High_B", "Low_B", "High_A", "Low_A"
]:
# No funding for EPC C or above
return 0
return solar_pps_df.squeeze()["Cost Savings"]
if measure_type == "air_source_heat_pump":
# No funding for EPC C or above
if self.starting_sap_band in ["Low_C", "High_C", "Low_B", "High_B", "Low_A", "High_A"]:
return 0
pps = filtered_pps_matrix[
(filtered_pps_matrix["Pre_Main_Heating_Source"] == pre_heating_system) &
(filtered_pps_matrix["Post_Main_Heating_Source"] == "Air to Water ASHP") &
@ -643,6 +672,9 @@ class Funding:
return 0
if measure_type in ["double_glazing", "secondary_glazing"]:
# No funding for EPC C or above
if self.starting_sap_band in ["Low_C", "High_C", "Low_B", "High_B", "Low_A", "High_A"]:
return 0
# pps is under the WG_singletodouble Measure_Type
pps = filtered_pps_matrix[
filtered_pps_matrix["Measure_Type"] == "WG_singletodouble"
@ -674,6 +706,36 @@ class Funding:
# If we don't have a pre heating system, we assume the measure is not applicable
return 0
if measure_type == "boiler_upgrade":
# We don't have funding for a gas to gas boiler upgrade unless it's first time central heating
if pre_heating_system == "Condensing Gas Boiler":
return 0
if has_no_system:
pps = filtered_pps_matrix[
(filtered_pps_matrix["Pre_Main_Heating_Source"] == pre_heating_system) &
(filtered_pps_matrix["Post_Main_Heating_Source"] == "Condensing Gas Boiler") &
(filtered_pps_matrix["Measure_Type"] == "B_First_Time_CH")
]
else:
pps = filtered_pps_matrix[
(filtered_pps_matrix["Pre_Main_Heating_Source"] == pre_heating_system) &
(filtered_pps_matrix["Post_Main_Heating_Source"] == "Condensing Gas Boiler")
# (filtered_pps_matrix["Measure_Type"] == "B_Upgrade_preHCs")
]
# Depending on different systems, e.g. room heaters, we take the best options
if pps.shape[0] > 1:
pps = pps[pps["Cost Savings"] == min(pps["Cost Savings"])].head(1)
# No funding for EPC C or above
if self.starting_sap_band in ["Low_C", "High_C", "Low_B", "High_B", "Low_A", "High_A"] or pps.empty:
return 0
if pps.shape[0] != 1:
raise ValueError("something went wrong, more than one pps for boiler upgrade")
return pps.squeeze()["Cost Savings"]
raise ValueError(f"Invalid measure type for partial project ABS calculation: {measure_type}")
# -----------------------
@ -1105,6 +1167,8 @@ class Funding:
pre_heating_system = self._map_to_pre_main_heating(mainheating, main_fuel, mainheat_energy_eff)
has_no_system = mainheating["has_no_system_present"]
measure_type = measure["measure_type"]
pps = self.calculate_partial_project_abs(
@ -1113,7 +1177,8 @@ class Funding:
is_partial=is_partial,
existing_li_thickness=existing_li_thickness,
filtered_pps_matrix=filtered_pps_matrix,
pre_heating_system=pre_heating_system
pre_heating_system=pre_heating_system,
has_no_system=has_no_system
)
innovation_uplift = pps * measure["innovation_rate"]

View file

@ -624,7 +624,10 @@ class Property:
if len(attributes) == 0:
# We attempt to perform the clean on the fly
cleaner_cls = all_cleaner_map[description]
cleaner_cls = cleaner_cls(self.data[description])
if description == "lighting-description":
cleaner_cls = cleaner_cls(self.data[description], averages=None)
else:
cleaner_cls = cleaner_cls(self.data[description])
processed = {
"original_description": self.data[description],
"clean_description": cleaner_cls.description.replace(
@ -1165,7 +1168,8 @@ class Property:
'heat pump': 'Electricity',
'solid fuel boiler': 'Solid Fuel',
'solid fuel range cooker': 'Solid Fuel',
'room heaters': 'Varied' # Could be any fuel, further specifics needed based on context
'room heaters': 'Varied', # Could be any fuel, further specifics needed based on context
"single-point gas": "Natural Gas"
}
# Define a mapping from system types to general categories or modifications of fuel types
@ -1176,6 +1180,11 @@ class Property:
'community scheme': 'Community Scheme'
}
hotwater_appliance_to_fuel = {
'gas range cooker': 'Natural Gas',
'oil range cooker': 'Oil'
}
self.heating_energy_source = list({
fuel for key, fuel in heating_fuel_mapping.items() if self.main_heating.get(key, False)
})
@ -1202,8 +1211,13 @@ class Property:
self.heating_energy_source = self.heating_energy_source[0]
if self.heating_energy_source == "Varied (Community Scheme)":
if self.main_fuel["fuel_type"] in ["mains gas", None]: # We assume when None as it's unknown
self.heating_energy_source = "Natural Gas (Community Scheme)"
fuel_map = {
None: "Natural Gas (Community Scheme)",
"mains gas": "Natural Gas (Community Scheme)",
"biomass": "Smokeless Fuel",
}
if self.main_fuel["fuel_type"] in fuel_map: # We assume when None as it's unknown
self.heating_energy_source = fuel_map[self.main_fuel["fuel_type"]]
else:
raise Exception("Implement me")
@ -1213,8 +1227,7 @@ class Property:
if self.hotwater["extra_features"] == "plus solar":
self.hot_water_energy_source = self.heating_energy_source + " + Solar Thermal"
return
else:
elif self.hotwater["system_type"] is not None:
fuel = system_type_modification[self.hotwater["system_type"]]
if self.hotwater["extra_features"] == "plus solar":
@ -1229,6 +1242,8 @@ class Property:
self.hot_water_energy_source = assumptions.DESCRIPTIONS_TO_FUEL_TYPES[secondary_heating]["fuel"]
else:
raise Exception("Investiage me")
else:
self.hot_water_energy_source = hotwater_appliance_to_fuel[self.hotwater["appliance"]]
def is_ashp_valid(self, measures):

View file

@ -900,8 +900,7 @@ class GoogleSolarApi:
return input_properties
@classmethod
def default_panel_performance(cls, property_instance):
def default_panel_performance(self, property_instance):
"""
In a small number of cases, where properties have simulated uprns, we do not have a longitude and latitude
value and therefore we just return a default panel performance
@ -911,6 +910,20 @@ class GoogleSolarApi:
cost_instance = Costs(property_instance=property_instance)
material_1_6 = next(
(m for m in self.solar_materials if m["type"] == "solar_pv" and
abs(m["size"] - 1.6) < 0.1 and not m["includes_battery"]),
None
)
material_3_2 = next(
(m for m in self.solar_materials if m["type"] == "solar_pv" and
abs(m["size"] - 3.2) < 0.1 and not m["includes_battery"]),
None
)
if material_1_6 is None or material_3_2 is None:
raise ValueError("No suitable solar product found for the default configuration.")
# We return a 1.6 and 3.2 kwp system
panel_performance = pd.DataFrame(
[
@ -918,7 +931,12 @@ class GoogleSolarApi:
'n_panels': 8,
'yearly_dc_energy': 3200 * assumptions.MEDIAN_WATTAGE_TO_DC,
'total_cost': cost_instance.solar_pv(
n_panels=8, has_battery=False, n_floors=property_instance.number_of_floors
solar_product=material_1_6,
scaffolding_options=[
{"total_cost": 1000, "size": property_instance.number_of_floors},
{"total_cost": 1000, "size": 3}
],
n_floors=property_instance.number_of_floors
)["total"],
'weighted_ratio': None,
'panneled_roof_area': 8 * assumptions.RDSAP_AREA_PER_PANEL,
@ -938,7 +956,12 @@ class GoogleSolarApi:
'n_panels': 4,
'yearly_dc_energy': 1600 * assumptions.MEDIAN_WATTAGE_TO_DC,
'total_cost': cost_instance.solar_pv(
n_panels=6, has_battery=False, n_floors=property_instance.number_of_floors
solar_product=material_3_2,
scaffolding_options=[
{"total_cost": 1000, "size": property_instance.number_of_floors},
{"total_cost": 1000, "size": 3}
],
n_floors=property_instance.number_of_floors
)["total"],
'weighted_ratio': None,
'panneled_roof_area': 4 * assumptions.RDSAP_AREA_PER_PANEL,

View file

@ -73,6 +73,11 @@ DESCRIPTIONS_TO_FUEL_TYPES = {
"Electric storage heaters, Room heaters, electric": {"fuel": "Electricity", "cop": 1},
'Boiler and underfloor heating, oil': {"fuel": "Oil", "cop": 0.85},
"Boiler and radiators, smokeless fuel": {"fuel": "Smokeless Fuel", "cop": 0.85},
"Boiler and radiators, mains gas, Boiler and underfloor heating, mains gas": {"fuel": "Natural Gas", "cop": 0.85},
"Electric ceiling heating, electric": {"fuel": "Electricity", "cop": 1},
"Air source heat pump, warm air, electric": {
"fuel": "Electricity", "cop": AVERAGE_ASHP_EFFICIENCY / 100
}
}
# These are the measure types where if there is a ventilation recommendation, we force the inclusion of it

View file

@ -40,6 +40,9 @@ def upload_funding(session: Session, p, plan_id, recommendations_to_upload):
part_type = "cavity_wall_insulation"
if part_type == "sealing_open_fireplace":
part_type = "sealing_fireplace"
if part == "low_energy_lighting":
part_type = "low_energy_lighting_installation"
funding_measures_data.append({
"funding_package_id": funding_package_id,
"measure": part_type,

View file

@ -894,6 +894,7 @@ async def model_engine(body: PlanTriggerRequest):
0, 0, 0, 0
)
continue
(
r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"],
r["uplift_project_score"]
@ -942,6 +943,7 @@ async def model_engine(body: PlanTriggerRequest):
# If we have a solution that meets the upgrade target, we select that one
optimal_solution = solutions[solutions["meets_upgrade_target"]].iloc[0]
else:
# Pick the cheapest
optimal_solution = solutions.iloc[0]
# This is the list of measures that we will recommend
@ -949,7 +951,8 @@ async def model_engine(body: PlanTriggerRequest):
funded_measures = optimal_solution["items"] if scheme != "none" else []
solution = optimal_solution["items"] + optimal_solution["unfunded_items"]
# This is the total amount of funding that the project will produce (including uplifts) (£)
project_funding = optimal_solution["full_project_funding"]
project_funding = optimal_solution["full_project_funding"] if scheme == "eco4" else \
optimal_solution["partial_project_funding"]
# This is the total amount of funding associated to the uplift (£)
total_uplift = optimal_solution["total_uplift"]
# This is the funding scheme selected

View file

@ -0,0 +1,531 @@
import ast
import json
from copy import deepcopy
from dataclasses import replace
from datetime import datetime
import random
from tqdm import tqdm
import pandas as pd
import numpy as np
from etl.epc.Record import EPCRecord
from backend.SearchEpc import SearchEpc
from sqlalchemy.exc import IntegrityError, OperationalError
from sqlalchemy.orm import sessionmaker
from starlette.responses import Response
from backend.app.config import get_settings, get_prediction_buckets
from backend.app.db.connection import db_engine
from backend.app.db.functions.materials_functions import get_materials
from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations
from backend.app.db.functions.property_functions import (
create_property, create_property_details_epc, create_property_targets, update_property_data,
update_or_create_property_spatial_details
)
from backend.app.db.functions.recommendations_functions import (
create_plan, upload_recommendations, create_scenario
)
from backend.app.db.functions.funding_functions import upload_funding
from backend.app.db.functions.energy_assessment_functions import get_latest_assessment_by_uprn
from backend.app.db.models.portfolio import rating_lookup
from backend.app.plan.schemas import PlanTriggerRequest, WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES
from backend.app.plan.utils import get_cleaned
from backend.app.utils import sap_to_epc
import backend.app.assumptions as assumptions
from backend.ml_models.api import ModelApi
from backend.Property import Property
from backend.apis.GoogleSolarApi import GoogleSolarApi
from recommendations.optimiser.CostOptimiser import CostOptimiser
from recommendations.optimiser.GainOptimiser import GainOptimiser
import recommendations.optimiser.optimiser_functions as optimiser_functions
from recommendations.Recommendations import Recommendations
from utils.logger import setup_logger
from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3, read_excel_from_s3
from backend.ml_models.Valuation import PropertyValuation
from etl.bill_savings.KwhData import KwhData
from etl.spatial.OpenUprnClient import OpenUprnClient
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
from backend.Funding import Funding
from recommendations.optimiser.funding_optimiser import optimise_with_funding_paths
from recommendations.recommendation_utils import convert_thickness_to_numeric, get_wall_u_value
# Input data (temp)
import pickle
import pandas as pd
with open("local_data_for_deletion.pkl", 'rb') as f:
local_data = pickle.load(f)
cleaning_data = local_data["cleaning_data"]
materials = local_data["materials"]
cleaned = local_data["cleaned"]
project_scores_matrix = local_data["project_scores_matrix"]
partial_project_scores_matrix = local_data["partial_project_scores_matrix"]
whlg_eligible_postcodes = local_data["whlg_eligible_postcodes"]
with open("kwh_client_for_deletion.pkl", "rb") as f:
kwh_client = pickle.load(f)
epc_data = pd.read_csv(
"/Users/khalimconn-kowlessar/Downloads/all-domestic-certificates/domestic-E06000002-Middlesbrough/certificates.csv",
low_memory=False
)
# TODO: Store this for cleaning
costs_by_floor_area = epc_data[
pd.to_datetime(epc_data["LODGEMENT_DATE"]) >= "2024-01-01"
][["TOTAL_FLOOR_AREA", "CURRENT_ENERGY_EFFICIENCY", "LIGHTING_COST_CURRENT", "HEATING_COST_CURRENT",
"HOT_WATER_COST_CURRENT"]].copy()
costs_by_floor_area.columns = [c.lower().replace("_", "-") for c in costs_by_floor_area.columns]
for c in ["lighting-cost-current", "heating-cost-current", "hot-water-cost-current"]:
costs_by_floor_area[c + "_scaled"] = costs_by_floor_area[c] / costs_by_floor_area["total-floor-area"]
costs_by_floor_area = costs_by_floor_area.groupby("current-energy-efficiency")[
["lighting-cost-current_scaled", "heating-cost-current_scaled", "hot-water-cost-current_scaled"]
].mean().reset_index()
sample_epc_data = epc_data[pd.to_datetime(epc_data["LODGEMENT_DATE"]) >= "2015-01-01"].drop_duplicates("UPRN").sample(
1000).reset_index(drop=True)
# TODO: In Property find_energy_sources, sort out biomass community heating - what fuel type
# TODO: We might be able to remove find_energy_sources entirely and remove estimate_electrical_consumption. It's used
# in the google solar api but is it really needed? I don't think it's super accurate. It might be better to
# just use an average energy consumption by floor area for UK households?
# Load the input properties
input_properties = []
for row_id, config in tqdm(sample_epc_data.iterrows(), total=len(sample_epc_data)):
epc = {
k.lower().replace("_", "-"): v if not pd.isnull(v) else None for k, v in config.items()
}
# Avoid the data load inside of EPCRecord - something we should pull out
for x in ["number-habitable-rooms", "floor-height", "number-heated-rooms"]:
if pd.isnull(epc[x]):
if x == "floor-height":
epc[x] = 2.4
if x == "number-habitable-rooms":
epc[x] = 3
if x == "number-heated-rooms":
epc[x] = 3
epc_records = {'original_epc': epc, 'full_sap_epc': {}, 'old_data': []}
prepared_epc = EPCRecord(
epc_records=epc_records,
run_mode="newdata",
cleaning_data=cleaning_data,
)
input_properties.append(
Property(
id=row_id,
is_new=True,
address=epc["address"],
postcode=epc["postcode"],
epc_record=prepared_epc,
already_installed={},
property_valuation={},
non_invasive_recommendations=[],
energy_assessment=None,
**Property.extract_kwargs(config), # TODO: Depraecate this
)
)
# For each property, insert the default solar configuration
for p in tqdm(input_properties):
solar_api = GoogleSolarApi(
api_key=None, solar_materials=[m for m in materials if m["type"] == "solar_pv"], max_retries=5
)
panel_performance = solar_api.default_panel_performance(property_instance=p)
p.set_solar_panel_configuration(
solar_panel_configuration={
"insights_data": None, "panel_performance": panel_performance, "unit_share_of_energy": 1
},
)
# We mock kwh preds
mocked_kwh_predictions = {"heating_kwh_predictions": [], "hotwater_kwh_predictions": []}
for p in tqdm(input_properties):
mocked_kwh_predictions["heating_kwh_predictions"].append({
"id": p.uprn, "predictions": random.sample(range(100, 3000), 1)[0]
})
mocked_kwh_predictions["hotwater_kwh_predictions"].append({
"id": p.uprn, "predictions": random.sample(range(100, 3000), 1)[0]
})
mocked_kwh_predictions["heating_kwh_predictions"] = pd.DataFrame(mocked_kwh_predictions["heating_kwh_predictions"])
mocked_kwh_predictions["hotwater_kwh_predictions"] = pd.DataFrame(mocked_kwh_predictions["hotwater_kwh_predictions"])
# TODO: We might want to implement this generally, via an ETL process
for p in input_properties:
for col in ["lighting-cost-current", "heating-cost-current", "hot-water-cost-current"]:
if pd.isnull(p.data[col]):
min_diff = abs(
(costs_by_floor_area["current-energy-efficiency"] - p.data["current-energy-efficiency"])
).min()
df = costs_by_floor_area[
abs((costs_by_floor_area["current-energy-efficiency"] - p.data[
"current-energy-efficiency"])) == min_diff
]
if df.shape[0] > 1:
df = df.head(1)
p.data[col] = (df[col + "_scaled"] * p.data["total-floor-area"]).values[0]
[
p.set_features(cleaned=cleaned, kwh_client=kwh_client, kwh_predictions=mocked_kwh_predictions) for p in
input_properties
]
# for p in input_properties:
# p.set_features(cleaned=cleaned, kwh_client=kwh_client, kwh_predictions=mocked_kwh_predictions)
# Run the recommendations
recommendations = {}
recommendations_scoring_data = []
representative_recommendations = {}
for p in tqdm(input_properties):
if p.data["property-type"] == "House" and pd.isnull(p.data["built-form"]):
p.data["built-form"] = "Semi-Detached"
recommender = Recommendations(
property_instance=p,
materials=materials,
exclusions=[],
inclusions=[],
default_u_values=True
)
property_recommendations, property_representative_recommendations = recommender.recommend()
if not property_recommendations:
continue
recommendations[p.id] = property_recommendations
representative_recommendations[p.id] = property_representative_recommendations
p.create_base_difference_epc_record(cleaned_lookup=cleaned)
p.adjust_difference_record_with_recommendations(
property_recommendations, property_representative_recommendations
)
recommendations_scoring_data.extend(p.recommendations_scoring_data)
recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data)
recommendations_scoring_data = recommendations_scoring_data.drop(
columns=[
"rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending",
"carbon_ending"
]
)
model_predictions_mocked = {
"sap_change_predictions": None,
"heat_demand_predictions": None,
"carbon_change_predictions": None,
"heating_kwh_predictions": None,
"hotwater_kwh_predictions": None,
}
for k in model_predictions_mocked.keys():
model_predictions_mocked[k] = recommendations_scoring_data[["id"]].copy()
model_predictions_mocked[k][['property_id', 'recommendation_id']] = (
model_predictions_mocked[k]['id'].str.split('+', expand=True)
)
model_predictions_mocked[k]['phase'] = model_predictions_mocked[k]['recommendation_id'].apply(
ModelApi.extract_phase)
if k in ["heating_kwh_predictions", "hotwater_kwh_predictions"]:
model_predictions_mocked[k]["predictions"] = random.choices(range(100, 3000),
k=len(recommendations_scoring_data))
continue
model_predictions_mocked[k] = model_predictions_mocked[k].sort_values(["property_id", "phase"], ascending=True)
preds = []
for p_id in model_predictions_mocked[k]["property_id"].unique():
# We add some amount each time
p = [p for p in input_properties if str(p.id) == p_id][0]
if k == "sap_change_predictions":
start = p.data["current-energy-efficiency"]
elif k == "heat_demand_predictions":
start = p.data["energy-consumption-current"]
else:
start = p.data["co2-emissions-current"]
df = model_predictions_mocked[k][model_predictions_mocked[k]["property_id"] == p_id].copy()
# Add some amount each time
to_add = random.choices(range(0, 15), k=len(df))
to_add = np.cumsum(to_add)
df["predictions"] = start + to_add
preds.append(df)
preds = pd.concat(preds)
model_predictions_mocked[k] = preds
for property_id in tqdm(recommendations.keys(), total=len(recommendations)):
property_instance = [p for p in input_properties if p.id == property_id][0]
recommendations_with_impact, impact_summary = (
Recommendations.calculate_recommendation_impact(
property_instance=property_instance,
all_predictions=model_predictions_mocked,
recommendations=recommendations,
representative_recommendations=representative_recommendations
)
)
# We use the impact_summary to update the simulation_epcs with the new SAP, heat demand, carbon, cost etc
# at each phase
property_instance.update_simulation_epcs(impact_summary)
recommendations[property_id] = recommendations_with_impact
for property_id in tqdm([p.id for p in input_properties]):
property_recommendations = recommendations.get(property_id, [])
property_instance = [p for p in input_properties if p.id == property_id][0]
property_current_energy_bill = (
Recommendations.calculate_recommendation_tenant_savings(
property_instance=property_instance,
kwh_simulation_predictions=model_predictions_mocked,
property_recommendations=property_recommendations,
ashp_cop=2.8
)
)
property_instance.current_energy_bill = property_current_energy_bill
body = PlanTriggerRequest(
**{'budget': None, 'goal': 'Increasing EPC', 'housing_type': 'Social', 'goal_value': 'B', 'portfolio_id': 0,
'trigger_file_path': '', 'already_installed_file_path': '',
'patches_file_path': None, 'non_invasive_recommendations_file_path': None,
'valuation_file_path': '',
'required_measures': [], 'scenario_name': 'EPC B', 'scenario_id': None,
'multi_plan': True, 'optimise': True, 'default_u_values': True, 'ashp_cop': 2.8,
'event_type': 'remote_assessment', 'simulate_sap_10': False, 'file_type': None, 'file_format': None,
'sheet_name': None, 'sheet_count': None, 'index_start': None, 'index_end': None}
)
for p in tqdm(input_properties):
if not recommendations.get(p.id):
continue
# we need to double unlist because we have a list of lists
property_measure_types = {rec["type"] for recs in recommendations[p.id] for rec in recs}
property_required_measures = [m for m in recommendations[p.id] if m[0]["type"] in body.required_measures]
measures_to_optimise = [m for m in recommendations[p.id] if m[0]["type"] not in body.required_measures]
# If a measure requiring ventilation is selected, and the property does not have ventilation, we enfore
# its inclusion
needs_ventilation = any(
x in property_measure_types for x in assumptions.measures_needing_ventilation
) and not p.has_ventilation
if not measures_to_optimise:
# Nothing to do, we just reshape the recommendations
recommendations[p.id] = optimiser_functions.flatten_recommendations_with_defaults(
p.id, recommendations, set()
)
continue
fixed_gain = optimiser_functions.calculate_fixed_gain(
property_required_measures, recommendations, p, needs_ventilation
)
gain = optimiser_functions.calculate_gain(body=body, p=p, fixed_gain=fixed_gain)
funding = Funding(
tenure="Social",
project_scores_matrix=project_scores_matrix,
partial_project_scores_matrix=partial_project_scores_matrix,
whlg_eligible_postcodes=whlg_eligible_postcodes,
eco4_social_cavity_abs_rate=12.5,
eco4_social_solid_abs_rate=17,
eco4_private_cavity_abs_rate=12.5,
eco4_private_solid_abs_rate=17,
gbis_social_cavity_abs_rate=21,
gbis_social_solid_abs_rate=25,
gbis_private_cavity_abs_rate=21,
gbis_private_solid_abs_rate=28,
)
li_thickness = convert_thickness_to_numeric(
p.roof["insulation_thickness"], p.roof["is_pitched"], p.roof["is_flat"]
)
current_wall_u_value = p.walls["thermal_transmittance"]
if current_wall_u_value is None:
current_wall_u_value = get_wall_u_value(
clean_description=p.walls["clean_description"],
age_band=p.age_band,
is_granite_or_whinstone=p.walls["is_granite_or_whinstone"],
is_sandstone_or_limestone=p.walls["is_sandstone_or_limestone"],
)
# We insert the innovation uplift
measures_to_optimise_with_uplift = deepcopy(measures_to_optimise)
# TODO: Turn this into a function and store the innovaiton uplift
for group in measures_to_optimise_with_uplift:
for r in group:
if r["type"] in ["mechanical_ventilation", "low_energy_lighting", "secondary_heating",
"extension_cavity_wall_insulation", "draught_proofing", "sealing_open_fireplace"]:
(
r["partial_project_score"],
r["partial_project_funding"],
r["innovation_uplift"],
r["uplift_project_score"],
) = (
0, 0, 0, 0
)
continue
(
r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"],
r["uplift_project_score"]
) = funding.get_innovation_uplift(
measure=r,
starting_sap=p.data["current-energy-efficiency"],
floor_area=p.floor_area,
is_cavity=p.walls["is_cavity_wall"],
current_wall_uvalue=current_wall_u_value,
is_partial="partial" in p.walls["clean_description"].lower(),
existing_li_thickness=li_thickness,
mainheating=p.main_heating,
main_fuel=p.main_fuel,
mainheat_energy_eff=p.data["mainheat-energy-eff"],
)
input_measures = optimiser_functions.prepare_input_measures(
measures_to_optimise_with_uplift, body.goal, needs_ventilation, funding=True
)
# When the goal is Increasing EPC, we can run the funding optimiser
if body.goal == "Increasing EPC":
solutions = optimise_with_funding_paths(
p=p,
input_measures=input_measures,
housing_type=body.housing_type,
budget=body.budget,
target_gain=gain,
funding=funding
)
# Given the solutions we select the optimal one
solutions["cost_less_full_project_funding"] = np.where(
solutions["scheme"] == "eco4",
solutions["total_cost"] - solutions["full_project_funding"] - solutions["total_uplift"],
solutions["total_cost"] - solutions["partial_project_funding"] - solutions["total_uplift"]
)
solutions["cost_less_full_project_funding"] = (
solutions["total_cost"] - solutions["full_project_funding"] - solutions["total_uplift"]
)
solutions = solutions.sort_values("cost_less_full_project_funding", ascending=True)
if solutions["meets_upgrade_target"].any():
# If we have a solution that meets the upgrade target, we select that one
optimal_solution = solutions[solutions["meets_upgrade_target"]].iloc[0]
else:
# Pick the cheapest
optimal_solution = solutions.iloc[0]
# This is the list of measures that we will recommend
scheme = optimal_solution["scheme"]
funded_measures = optimal_solution["items"] if scheme != "none" else []
solution = optimal_solution["items"] + optimal_solution["unfunded_items"]
# This is the total amount of funding that the project will produce (including uplifts) (£)
project_funding = optimal_solution["full_project_funding"] if scheme == "eco4" else \
optimal_solution["partial_project_funding"]
# This is the total amount of funding associated to the uplift (£)
total_uplift = optimal_solution["total_uplift"]
# This is the funding scheme selected
# This is the full project ABS
full_project_score = optimal_solution["project_score"]
# This is the partial project ABS
partial_project_score = optimal_solution["partial_project_score"]
# This is the uplift score ABS
uplift_project_score = optimal_solution["total_uplift_score"]
else:
# We optimise and then we determine eligibility for funding, based on the measures selected
optimiser = (
GainOptimiser(
input_measures, max_cost=body.budget, max_gain=gain, allow_slack=False
) if body.budget else CostOptimiser(input_measures, min_gain=gain)
)
optimiser.setup()
optimiser.solve()
solution = optimiser.solution
recommendation_types = []
for measures in input_measures:
for measure in measures:
recommendation_types.append(measure["type"])
recommendation_types = set(recommendation_types)
has_wall_insulation_recommendation = any(
(m in recommendation_types or "+".join([m, "mechanical_ventilation"])) for m in
WALL_INSULATION_MEASURES
)
has_roof_insulation_recommendation = any(
(m in recommendation_types or "+".join([m, "mechanical_ventilation"])) for m in
ROOF_INSULATION_MEASURES
)
funding.check_funding(
measures=solution,
starting_sap=p.data["current-energy-efficiency"],
ending_sap=p.data["current-energy-efficiency"] + sum([x["gain"] for x in solution]),
floor_area=p.floor_area,
mainheat_description=p.main_heating["clean_description"],
heating_control_description=p.main_heating_controls["clean_description"],
is_cavity=p.walls["is_cavity_wall"],
current_wall_uvalue=current_wall_u_value,
is_partial="partial" in p.walls["clean_description"].lower(),
existing_li_thickness=li_thickness,
mainheating=p.main_heating,
main_fuel=p.main_fuel,
mainheat_energy_eff=p.data["mainheat-energy-eff"],
has_wall_insulation_recommendation=has_wall_insulation_recommendation,
has_roof_insulation_recommendation=has_roof_insulation_recommendation,
)
# Determine the scheme
scheme = "none"
if funding.eco4_eligible:
scheme = "eco4"
if scheme == "none" and funding.gbis_eligible:
scheme = "gbis"
funded_measures = solution if scheme in ["gbis", "eco4"] else []
project_funding = 0 if funding.full_project_abs is not None else funding.full_project_abs
total_uplift = funding.eco4_uplift
full_project_score = 0 if funding.full_project_abs is not None else funding.full_project_abs
partial_project_score = funding.partial_project_abs
uplift_project_score = funding.eco4_uplift if scheme == "eco4" else funding.gbis_uplift
selected = {r["id"] for r in solution}
if property_required_measures:
solution = optimiser_functions.add_required_measures(
property_id=p.id, property_required_measures=property_required_measures,
recommendations=recommendations, selected=selected,
)
# Add best practice measures (ventilation/trickle vents)
selected = optimiser_functions.add_best_practice_measures(p.id, solution, recommendations, selected)
# Final flattening - Don't do this!
# recommendations[p.id] = optimiser_functions.flatten_recommendations_with_defaults(
# p.id, recommendations, selected
# )
# TODO: functionise
for measure in funded_measures:
if "+mechanical_ventilation" in measure["type"]:
measure["type"] = measure["type"].split("+mechanical_ventilation")[0]
p.insert_funding(
scheme=scheme,
funded_measures=funded_measures,
project_funding=project_funding,
total_uplift=total_uplift,
full_project_score=full_project_score,
partial_project_score=partial_project_score,
uplift_project_score=uplift_project_score
)

View file

@ -1,7 +1,6 @@
from pathlib import Path
import numpy as np
import pandas as pd
from BaseUtility import Definitions
from etl.epc.settings import (
DATA_PROCESSOR_SETTINGS,
EARLIEST_EPC_DATE,
@ -22,6 +21,7 @@ from etl.epc.settings import (
ENDING_SUFFIX_COMPONENT_COLS,
POTENTIAL_COLUMNS,
EFFICIENCY_FEATURES,
DATA_ANOMALY_MATCHES
)
from recommendations.rdsap_tables import FLOOR_LEVEL_MAP
@ -249,8 +249,7 @@ class EPCDataProcessor:
# Map all anomaly values to None
data_anomaly_map = dict(
zip(
Definitions.DATA_ANOMALY_MATCHES,
[None] * len(Definitions.DATA_ANOMALY_MATCHES),
DATA_ANOMALY_MATCHES, [None] * len(DATA_ANOMALY_MATCHES),
)
)
@ -285,7 +284,7 @@ class EPCDataProcessor:
@staticmethod
def clean_construction_age_band(x):
# Firstly, we check if it's an error value
if x in Definitions.DATA_ANOMALY_MATCHES or x in [None, np.nan]:
if x in DATA_ANOMALY_MATCHES or x in [None, np.nan]:
return x
# Next, we check if it's a value in our map

View file

@ -713,6 +713,9 @@ class TrainingDataset(BaseDataset):
for x in missing_descriptions:
desc_cleaner = cleaner(x)
cleaned = desc_cleaner.process()
# IF NODATA, REMAP TO NONE VALUES
if all((pd.DataFrame(cleaned, index=[0]).T)[0] == False):
cleaned = {key: None for key in cleaned.keys()}
cleaned_data.append(
{
"original_description": x,

View file

@ -76,6 +76,41 @@ new_walls_description_mapping.loc[
clean_lookup["walls-description"] = new_walls_description_mapping.to_dict(
orient="records"
)
# TODO: THIS IS A TEMPORARY FIX
new_mainheatcont_mapping = pd.DataFrame(clean_lookup["mainheatcont-description"])
new_mainheatcont_mapping.loc[
new_mainheatcont_mapping["original_description"] == "SAP:Main-Heating-Controls",
[
"thermostatic_control",
"charging_system",
"switch_system",
"no_control",
"dhw_control",
"community_heating",
"multiple_room_thermostats",
"auxiliary_systems",
"trvs",
"rate_control",
],
] = None
new_mainheatcont_mapping.loc[
new_mainheatcont_mapping["original_description"] == " ",
[
"thermostatic_control",
"charging_system",
"switch_system",
"no_control",
"dhw_control",
"community_heating",
"multiple_room_thermostats",
"auxiliary_systems",
"trvs",
"rate_control",
],
] = None
clean_lookup["mainheatcont-description"] = new_mainheatcont_mapping.to_dict(
orient="records"
)
class EPCPipeline:

View file

@ -593,6 +593,15 @@ class EPCRecord:
self.prepared_epc["total-floor-area"]
)
# We handle the edge case of floor area being 0. We set it to zero and it is cleaned by
# _clean_with_data_processor
if self.prepared_epc['total-floor-area'] == 0:
print(
"Edge case of floor area being zero - will set to none and will be cleaned in "
"_clean_with_data_processor"
)
self.prepared_epc['total-floor-area'] = None
def _clean_mains_gas(self):
"""
This method will clean the mains gas, if empty or invalid
@ -668,7 +677,7 @@ class EPCRecord:
for attribute in fields:
value = self.prepared_epc[attribute]
if value in DATA_ANOMALY_MATCHES:
if value in DATA_ANOMALY_MATCHES or pd.isnull(value):
if attribute in null_attributes:
value = None
else:
@ -751,6 +760,8 @@ class EPCRecord:
if self.prepared_epc["built-form"] in DATA_ANOMALY_MATCHES:
if self.prepared_epc["property-type"] in ["Flat", "Maisonette"]:
self.prepared_epc["built-form"] = "End-Terrace"
else:
self.prepared_epc["built-form"] = "Semi-Detached"
def _clean_age_band(self):
"""

View file

@ -48,6 +48,8 @@ DATA_ANOMALY_MATCHES = {
None,
# An older value which rarely shows up but has been seen in the data.
"UNKNOWN",
#
"Unknown",
}
DATA_ANOMALY_SUBSTRINGS = {

View file

@ -75,9 +75,6 @@ class EpcClean:
]
]
# Average
filtered_data.groupby("lighting-description")["low-energy-lighting"].mean().reset_index()
# Convert low-energy-lighting to float
for row in filtered_data:
row["low-energy-lighting"] = float(row["low-energy-lighting"])
@ -93,7 +90,7 @@ class EpcClean:
# Scale to between 0 and 1
averages = [{
"lighting-description": correct_spelling(description.lower()) / 100,
"lighting-description": correct_spelling(description.lower()),
"low-energy-lighting": total / counts[description] / 100
} for description, total in sums.items()]

View file

@ -20,6 +20,7 @@ class HotWaterAttributes(Definitions):
'solid fuel range cooker',
'room heaters', # Generic/unspecified category
'electric multipoint',
'single-point gas',
]
# SYSTEM_TYPES refer to the larger system within which the heater operates.
@ -29,6 +30,7 @@ class HotWaterAttributes(Definitions):
# The hot water is provided by a secondary (or supplementary) heating system in the building
'from second main heating system', # Same as 'from secondary system'
'community scheme', # The hot water is provided by a community heating system
"water heater",
]
# THERMOSTAT_CHARACTERISTICS refer to features related to temperature control in the system.

View file

@ -28,6 +28,9 @@ class LightingAttributes(Definitions):
self.nodata = (not description) or (description in self.DATA_ANOMALY_MATCHES) or (
description in self.OBSERVED_ERRORS) or (description == "SAP05:Lighting")
if description == "SAP05:Lighting":
self.description = description # Reset self.description
def welsh_translation_search(self):
"""
For welsh text describing the percentage of low energy lighting, we match the regular

View file

@ -128,6 +128,7 @@ class MainheatControlAttributes(Definitions):
]
def __init__(self, description: str):
self.description: str = clean_description(description.lower()).strip()
self.nodata = not self.description or description in self.DATA_ANOMALY_MATCHES or (
description in self.NO_DATA_DESCRIPTIONS

View file

@ -17,5 +17,5 @@ all_cleaner_map = {
'roof-description': RoofAttributes,
'walls-description': WallAttributes,
'windows-description': WindowAttributes,
'lighting-description:': LightingAttributes,
'lighting-description': LightingAttributes,
}

View file

@ -222,6 +222,11 @@ hotwater_cases = [
{'original_description': 'Electric multipoint', 'heater_type': 'electric multipoint', 'system_type': None,
'thermostat_characteristics': None,
'heating_scope': None, 'energy_recovery': None, 'tariff_type': None, 'extra_features': None, 'chp_systems': None,
'distribution_system': None, 'no_system_present': None, 'appliance': None, 'assumed': False}
'distribution_system': None, 'no_system_present': None, 'appliance': None, 'assumed': False},
{'original_description': 'Single-point gas water heater, standard tariff',
'heater_type': 'single-point gas', 'system_type': "water heater", 'thermostat_characteristics': None,
'heating_scope': None, 'energy_recovery': None, 'tariff_type': 'standard tariff', 'extra_features': None,
'chp_systems': None, 'distribution_system': None, 'no_system_present': None, 'appliance': None
}
]

View file

@ -1,5 +1,8 @@
import numpy as np
from recommendations.county_to_region import county_to_region_map
from utils.logger import setup_logger
logger = setup_logger()
# This data comes from SPONs 2023
regional_labour_variations = [
@ -224,7 +227,9 @@ class Costs:
}.get(self.property.data["local-authority-label"].lower(), None)
if self.region is None:
raise ValueError("Region not found in county map")
logger.warning("No region found for county %s, defaulting to South East England",
self.property.data["county"])
self.region = "South East England"
self.labour_adjustment_factor = [
x["Adjustment_Factor"] for x in self.regional_labour_variations if

View file

@ -82,6 +82,14 @@ class HeatingRecommender:
"controls_prefix": ""
},
"dual": None
},
'Electric storage heaters, room heaters, electric': {
"hhr": {
"mainheating_description": "Electric storage heaters, radiators",
"recommendation_description": "Install high heat retention electric storage heaters.",
"controls_prefix": ""
},
"dual": None
}
}

View file

@ -693,6 +693,7 @@ class Recommendations:
if hotwater_description in [
"From main system", "From main system, no cylinder thermostat",
'From main system, waste water heat recovery'
]:
return {
"heating_fuel_type": heating_fuel, "hotwater_fuel_type": heating_fuel,

View file

@ -581,7 +581,7 @@ class RoofRecommendations:
**cost_result,
"already_installed": already_installed,
"survey": rir_non_invasive_recommendation.get("survey", None),
"innovation_rate": material.to_dict()["innovation_rate"]
"innovation_rate": material.innovation_rate
}
)

View file

@ -66,6 +66,7 @@ class WallRecommendations(Definitions):
"Solid brick, as built, partial insulation": "Solid brick, with external insulation",
"Cob, as built": "Cob, with external insulation",
"System built, as built, no insulation": "System built, with external insulation",
'System built, as built, partial insulation': "System built, with external insulation",
"Granite or whinstone, as built, no insulation": 'Granite or whinstone, with external insulation',
"Timber frame, as built, no insulation": "Timber frame, with external insulation",
'Timber frame, as built, partial insulation': 'Timber frame, with external insulation',
@ -81,6 +82,7 @@ class WallRecommendations(Definitions):
"Solid brick, as built, partial insulation": "Solid brick, with internal insulation",
"Cob, as built": "Cob, with internal insulation",
"System built, as built, no insulation": "System built, with internal insulation",
'System built, as built, partial insulation': "System built, with internal insulation",
"Granite or whinstone, as built, no insulation": 'Granite or whinstone, with internal insulation',
"Timber frame, as built, no insulation": "Timber frame, with internal insulation",
'Timber frame, as built, partial insulation': 'Timber frame, with internal insulation',

View file

@ -110,7 +110,9 @@ county_to_region_map = {
'West Oxfordshire': 'South East England', 'West Sussex': 'South East England', 'Winchester': 'South East England',
'Windsor and Maidenhead': 'South East England', 'Woking': 'South East England', 'Wokingham': 'South East England',
'Worthing': 'South East England', 'Wycombe': 'South East England',
'Bath and North East Somerset': 'South West England', 'Bournemouth': 'South West England',
'Bath and North East Somerset': 'South West England',
'Bournemouth': 'South West England',
'Bournemouth, Christchurch and Poole': 'South West England',
'Bristol': 'South West England',
'Cheltenham': 'South West England', 'Christchurch': 'South West England',
'City of Bristol': 'South West England',
@ -164,6 +166,7 @@ county_to_region_map = {
'Wakefield': 'Yorkshire and the Humber', 'West Yorkshire': 'Yorkshire and the Humber',
'York': 'Yorkshire and the Humber',
'Westmorland': 'North West England',
'Westmorland and Furness': 'North West England',
# Additional mappings requried, based on what we find in the EPC database
'Greater London Authority': 'Inner London',

View file

@ -21,7 +21,9 @@ from backend.Funding import Funding
logger = setup_logger()
# measures we DO NOT treat as fundable in the ECO4 'funded' pass
_ECO4_EXCLUDE_TYPES = {"secondary_heating", "extension_cavity_wall_insulation", "sealing_open_fireplace"}
_ECO4_EXCLUDE_TYPES = {
"secondary_heating", "extension_cavity_wall_insulation", "sealing_open_fireplace", "low_energy_lighting"
}
def _path_scheme(path_spec):
@ -75,8 +77,12 @@ def _sum_cost_gain_with_scheme(items, scheme):
return total_cost, total_gain
def violates_min_insulation(fixed):
"""Return True if fixed selection includes a heating/PV measure but no required insulation."""
def violates_min_insulation(fixed, optimisation_input_measures):
"""
Return True if fixed selection includes a heating/PV measure but no required insulation.
It should *only* violate min insulation if the fixed selection excldes insulation but the
property needs insulation
"""
picked_types = {opt["type"] for (_, _, opt) in fixed}
def has_any(substrs):
@ -104,7 +110,22 @@ def violates_min_insulation(fixed):
"room_roof_insulation",
])
return is_heating and not has_insul
def _needs_insulation(measures, t):
return _find_measure(measures, t) and not has_any({t})
needs_insul = any(
_needs_insulation(optimisation_input_measures, t)
for t in [
"external_wall_insulation",
"internal_wall_insulation",
"cavity_wall_insulation",
"loft_insulation",
"flat_roof_insulation",
"room_roof_insulation",
]
)
return is_heating and not has_insul and needs_insul
# Treat "type" like "external_wall_insulation+mechanical_ventilation" → "external_wall_insulation"
@ -276,10 +297,11 @@ def optimise_with_funding_paths(p, input_measures, housing_type, funding: Fundin
for fixed in fixed_selections:
if violates_min_insulation(fixed):
if violates_min_insulation(fixed, optimisation_input_measures):
# We log an error and skip this - we should not see any errors but we can probably get a reasonable
# outcome for the end user without a complete termination of the process
logger.error("Skipping fixed selection due to minimum insulation violation: %s", fixed)
blah
continue
scheme = _path_scheme(path_spec)
@ -329,25 +351,31 @@ def optimise_with_funding_paths(p, input_measures, housing_type, funding: Fundin
if picked is None:
continue
scheme = _path_scheme(path_spec)
total_cost = fixed_cost + sub_cost
total_gain = fixed_gain + sub_gain
total_picks = fixed_items + picked
unfunded_picked = []
if scheme == "gbis":
# The fixed items are fundded, everything else is unfunded
total_picks = fixed_items
unfunded_picked = picked
else:
total_picks = fixed_items + picked
if housing_type == "Private":
if not _prs_solution_ok(total_picks, p, funding):
if not _prs_solution_ok(total_picks, p, funding) and scheme == "eco4":
logger.error(
"Found a solution that does not meet the PRS requirements: %s - this shouldn't be happening",
total_picks
)
continue
scheme = _path_scheme(path_spec)
unfunded_picked = []
if total_gain - target_gain < -0.1:
# In this case, we have a funded package that does not meet the target gain, so we look at the remaining
# measures and see if we can include them
picked_types = {opt["type"] for opt in total_picks}
picked_types = {opt["type"] for opt in total_picks + unfunded_picked}
# We find the indexes of the picked types
picked_group_index = {}
@ -371,11 +399,13 @@ def optimise_with_funding_paths(p, input_measures, housing_type, funding: Fundin
if remaining:
# If we have remaining measures we can optimise, we run them down an unfunded route
unfunded_picked, unfunded_cost, unfunded_gain = run_optimizer(
unfunded_picked_remaining, unfunded_cost, unfunded_gain = run_optimizer(
remaining,
budget - total_cost if budget is not None else None,
sub_target_gain=target_gain - total_gain if target_gain is not None else None
)
if unfunded_picked_remaining is not None:
unfunded_picked += unfunded_picked_remaining
total_cost += unfunded_cost
total_gain += unfunded_gain
@ -796,6 +826,7 @@ def make_funding_paths(p, input_measures, housing_type, funding: Funding):
:param p: The property object containing details about the property, including main heating and controls.
:param input_measures:
:param housing_type:
:param funding: The funding object that provides methods to check eligibility and calculate funding.
:return:
"""
# We handle the case of minimum insulation requirements. Whenever we have a heating system recommendation,
@ -855,25 +886,27 @@ def make_funding_paths(p, input_measures, housing_type, funding: Funding):
return funding_paths, input_measures_innovation
if housing_type == "Private":
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# EWI or IWI
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# 1) The package must include EWI or IWI if the property is private rental sector
# We check if we have any EWI or IWI measures available
ewi_or_iwi = [{"OR": []}]
reference_measures = []
# If we have EWI we add it in
if _find_measure(input_measures, "external_wall_insulation"):
ewi_or_iwi[0]["OR"].append("external_wall_insulation")
reference_measures.append("ewi")
# We check if we have any EWI or IWI measures available - only for EPC E or below
if p.data["current-energy-rating"] not in ["E", "F", "G"]:
ewi_or_iwi = [{"OR": []}]
reference_measures = []
# If we have EWI we add it in
if _find_measure(input_measures, "external_wall_insulation"):
ewi_or_iwi[0]["OR"].append("external_wall_insulation")
reference_measures.append("ewi")
if _find_measure(input_measures, "internal_wall_insulation"):
ewi_or_iwi[0]["OR"].append("internal_wall_insulation")
reference_measures.append("iwi")
if _find_measure(input_measures, "internal_wall_insulation"):
ewi_or_iwi[0]["OR"].append("internal_wall_insulation")
reference_measures.append("iwi")
if ewi_or_iwi[0]["OR"]:
ewi_or_iwi[0]["reference"] = "+".join(reference_measures) + ":eco4"
funding_paths.append(ewi_or_iwi)
if ewi_or_iwi[0]["OR"]:
ewi_or_iwi[0]["reference"] = "+".join(reference_measures) + ":eco4"
funding_paths.append(ewi_or_iwi)
funding_paths = _make_solar_heating_funding_paths(
p, input_measures, funding_paths, remaining_insulation_type, housing_type, funding