mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
225 lines
8.5 KiB
Python
225 lines
8.5 KiB
Python
import msgpack
|
|
from uuid import UUID
|
|
from typing import Any
|
|
from utils.s3 import read_from_s3
|
|
from backend.app.config import get_settings
|
|
from backend.app.plan.data_classes import PropertyRequestData
|
|
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
|
|
from starlette.responses import Response
|
|
from utils.logger import setup_logger
|
|
|
|
logger = setup_logger()
|
|
|
|
|
|
def get_cleaned():
|
|
"""
|
|
This function will retrieve the cleaned dataset from s3 which has the cleaned
|
|
descriptions for the epc dataset
|
|
|
|
This data is stored in MessagePack format and therefore needs to be decoded
|
|
:return:
|
|
"""
|
|
|
|
cleaned = read_from_s3(
|
|
s3_file_name="cleaned_epc_data/cleaned.bson",
|
|
bucket_name="retrofit-data-{environment}".format(environment=get_settings().ENVIRONMENT)
|
|
)
|
|
|
|
cleaned = msgpack.unpackb(cleaned, raw=False)
|
|
|
|
return cleaned
|
|
|
|
|
|
def patch_epc(patch, epc_records):
|
|
"""
|
|
This utility function is useful to patch the epc data if we have data from the customer
|
|
:return:
|
|
"""
|
|
|
|
for patch_variable, patch_value in patch.items():
|
|
|
|
if patch_variable in ["address", "postcode"]:
|
|
continue
|
|
|
|
if patch_value == "":
|
|
continue
|
|
if patch_variable in epc_records["original_epc"]:
|
|
epc_records["original_epc"][patch_variable] = patch_value
|
|
|
|
return epc_records
|
|
|
|
|
|
def extract_property_request_data(
|
|
config, patches, already_installed, non_invasive_recommendations, valuation_data, uprn
|
|
):
|
|
patch_has_uprn = "uprn" in patches[0] if patches else True
|
|
if patch_has_uprn:
|
|
patch = next((
|
|
x for x in patches if str(x["uprn"]) == str(config["uprn"])
|
|
), {})
|
|
else:
|
|
patch = next((
|
|
x for x in patches if (x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
|
|
), {})
|
|
|
|
property_already_installed = next((
|
|
x for x in already_installed if
|
|
(x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
|
|
), [])
|
|
|
|
# Because we have some non-invasive recommendations that match on address and postcode, but not UPRN
|
|
# we need to check existence of uprn
|
|
has_uprn = "uprn" in non_invasive_recommendations[0] if non_invasive_recommendations else False
|
|
if has_uprn:
|
|
has_uprn = non_invasive_recommendations[0]["uprn"] not in ["", None]
|
|
|
|
if has_uprn:
|
|
property_non_invasive_recommendations = next((
|
|
x for x in non_invasive_recommendations if
|
|
(str(x["uprn"]) == str(uprn))
|
|
), {})
|
|
|
|
# We patch the non-invasive recs that are ['cavity_extract_and_refill']
|
|
else:
|
|
property_non_invasive_recommendations = next((
|
|
x for x in non_invasive_recommendations if
|
|
(x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
|
|
), {})
|
|
|
|
if isinstance(property_non_invasive_recommendations.get("recommendations"), str):
|
|
property_non_invasive_recommendations["recommendations"] = ast.literal_eval(
|
|
property_non_invasive_recommendations["recommendations"]
|
|
)
|
|
transformed = []
|
|
for rec in property_non_invasive_recommendations["recommendations"]:
|
|
if isinstance(rec, str):
|
|
transformed.append({"type": rec, })
|
|
else:
|
|
transformed.append(rec)
|
|
|
|
property_non_invasive_recommendations["recommendations"] = transformed
|
|
|
|
# Check if the valuation data has uprn
|
|
valuation_has_uprn = "uprn" in valuation_data[0] if valuation_data else False
|
|
if valuation_has_uprn:
|
|
valuation_has_uprn = valuation_data[0]["uprn"] not in ["", None]
|
|
|
|
if valuation_has_uprn:
|
|
property_valuation = next((
|
|
float(x["valuation"]) for x in valuation_data if
|
|
(str(x["uprn"]) == str(uprn))
|
|
), None)
|
|
else:
|
|
property_valuation = next((
|
|
float(x["valuation"]) for x in valuation_data if
|
|
(x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
|
|
), None)
|
|
|
|
# Return data class to give a structured format
|
|
return PropertyRequestData(
|
|
patch=patch,
|
|
already_installed=property_already_installed,
|
|
non_invasive_recommendations=property_non_invasive_recommendations,
|
|
valuation=property_valuation
|
|
)
|
|
|
|
|
|
def parse_eco_packages(config: dict[str, Any], prepared_epc) -> tuple[list[str], int, str, list[str]] | tuple[
|
|
None, None, None, list]:
|
|
solar_identification = config.get("solar_reason", None)
|
|
cavity_identification = config.get("cavity_reason", None)
|
|
if not solar_identification and not cavity_identification:
|
|
return None, None, None, []
|
|
|
|
landlord_heating_system = config["landlord_heating_system"]
|
|
# This is the initial version of tackling "already installed" measures
|
|
already_installed = []
|
|
if landlord_heating_system == "air source heat pump":
|
|
already_installed.append("air_source_heat_pump")
|
|
|
|
# We map the categories to the desired measures and upgrade targets
|
|
# We note that the categories are placeholder until we move the standardised asset list
|
|
|
|
identification_map = {
|
|
"Solar Eligible": {
|
|
"measures": ["solar_pv", "loft_insulation", "mechanical_ventilation"],
|
|
"target_sap": 86, # High B
|
|
"plan_type": "solar_eco4"
|
|
},
|
|
"Solar Eligible, Solid Wall Uninsulated, EPC E or Below": {
|
|
"measures": ["solar_pv", "loft_insulation", "mechanical_ventilation"],
|
|
"target_sap": 86, # High B
|
|
"plan_type": "solar_eco4"
|
|
},
|
|
"Solar Eligible, Needs Heating Upgrade": {
|
|
"measures": ["solar_pv", "loft_insulation", "high_heat_retention_storage_heaters",
|
|
"mechanical_ventilation"],
|
|
"target_sap": 86, # High B
|
|
"plan_type": "solar_hhrsh_eco4"
|
|
},
|
|
"Non-Intrusive Data Shows Empty Cavity": {
|
|
"measures": ["cavity_wall_insulation", "mechanical_ventilation"],
|
|
"target_sap": 69, # Low C
|
|
"plan_type": "empty_cavity_eco"
|
|
},
|
|
'Non-Intrusive Data Shows Empty Cavity, built after 2002': {
|
|
"measures": ["cavity_wall_insulation", "mechanical_ventilation"],
|
|
"target_sap": 69, # Low C
|
|
"plan_type": "empty_cavity_eco"
|
|
},
|
|
"EPC Shows Empty Cavity, inspections show retro drilled": {
|
|
# EPC Indicates it's empty, so we simulate a fill
|
|
"measures": ["cavity_wall_insulation", "mechanical_ventilation"],
|
|
"target_sap": 69, # Low C
|
|
"plan_type": "extraction_eco"
|
|
},
|
|
"EPC Shows Empty Cavity, inspections show filled at build": {
|
|
# EPC Indicates it's empty, so we simulate a fill
|
|
"measures": ["cavity_wall_insulation", "mechanical_ventilation"],
|
|
"target_sap": 69, # Low C
|
|
"plan_type": "extraction_eco"
|
|
},
|
|
"EPC Shows Empty Cavity": {
|
|
# EPC Indicates it's empty, so we simulate a fill
|
|
"measures": ["cavity_wall_insulation", "mechanical_ventilation"],
|
|
"target_sap": 69, # Low C
|
|
"plan_type": "empty_cavity_eco"
|
|
}
|
|
}
|
|
|
|
# Always prioritise solar
|
|
if solar_identification:
|
|
_key = solar_identification.split(":")[0]
|
|
else:
|
|
_key = cavity_identification.split(":")[0]
|
|
|
|
mapped = identification_map[_key]
|
|
measures = mapped["measures"]
|
|
|
|
# If we have already installed an ASHP, we adjust the measures
|
|
if "air_source_heat_pump" in already_installed:
|
|
if "high_heat_retention_storage_heaters" in measures:
|
|
# If we have a HHRSH already, we remove it
|
|
measures.remove("high_heat_retention_storage_heaters")
|
|
# Add in ASHP (replacing HHRSH if already had)
|
|
measures.append("air_source_heat_pump")
|
|
|
|
current_sap = prepared_epc.current_energy_efficiency
|
|
# If we have a solar package, and the property is a D or above, we don't need to do lofts
|
|
if "solar_eco4" in mapped["plan_type"] and current_sap >= 55:
|
|
if "loft_insulation" in measures:
|
|
measures.remove("loft_insulation")
|
|
|
|
return measures, mapped["target_sap"], mapped["plan_type"], already_installed
|
|
|
|
|
|
def handle_error(session, msg, e, subtask_id, status=500):
|
|
# When the pipeline fails, handles error process
|
|
SubTaskInterface().update_subtask_status(
|
|
subtask_id=UUID(subtask_id),
|
|
status="failed",
|
|
outputs=str(e)
|
|
)
|
|
logger.error(msg, exc_info=True)
|
|
session.rollback()
|
|
return Response(status_code=status, content=msg)
|