Model/backend/app/plan/utils.py
2025-12-14 21:54:50 +08:00

252 lines
9.3 KiB
Python

import os
import time
import msgpack
from uuid import UUID
from typing import Any
from utils.s3 import read_from_s3
from backend.app.config import get_settings
from backend.app.plan.data_classes import PropertyRequestData
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
from starlette.responses import Response
from utils.logger import setup_logger
# Module-level logger; handle_error() uses it to report pipeline failures.
logger = setup_logger()
def get_cleaned():
    """
    Fetch the cleaned EPC description dataset from S3 and decode it.

    The object is read from the environment-specific ``retrofit-data-*`` bucket
    (the environment name comes from the application settings). Although the key
    ends in ``.bson``, the payload is decoded as MessagePack.

    :return: the decoded dataset, exactly as produced by ``msgpack.unpackb``
    """
    bucket = "retrofit-data-{environment}".format(environment=get_settings().ENVIRONMENT)
    packed = read_from_s3(
        s3_file_name="cleaned_epc_data/cleaned.bson",
        bucket_name=bucket,
    )
    # raw=False makes msgpack decode byte strings into ``str``.
    return msgpack.unpackb(packed, raw=False)
def patch_epc(patch, epc_records):
    """
    Overlay customer-supplied values onto the original EPC record.

    Only keys that already exist in ``epc_records["original_epc"]`` are
    overwritten. The ``address`` and ``postcode`` keys are never patched, and
    blank patch values (``""`` or ``None``) are ignored.

    :param patch: mapping of EPC field name to the customer-supplied value
    :param epc_records: mapping holding the EPC under the ``"original_epc"`` key;
        it is updated in place
    :return: the same ``epc_records`` object that was passed in
    """
    for field, value in patch.items():
        # Identifying fields and blank values never override the EPC.
        skip = field in ("address", "postcode") or value in ("", None)
        if not skip and field in epc_records["original_epc"]:
            epc_records["original_epc"][field] = value
    return epc_records
def _same_address(record, config):
    """Return True when *record* has the same address and postcode as *config*."""
    return (record["address"] == config["address"]) and (record["postcode"] == config["postcode"])


def _first_record_has_uprn(records):
    """Return True when the first record carries a non-blank ``uprn`` value."""
    if not records:
        return False
    first = records[0]
    return "uprn" in first and first["uprn"] not in ("", None)


def extract_property_request_data(
    config, patches, already_installed, non_invasive_recommendations, valuation_data, uprn
):
    """
    Pick out, from the request-wide collections, the entries for one property.

    Each collection is searched for the first record belonging to the property.
    Records are matched on UPRN where the collection carries one, otherwise on
    the ``address`` + ``postcode`` pair taken from ``config``.

    :param config: mapping describing the property; ``address`` and ``postcode``
        are read, and ``uprn`` is read when patches are matched by UPRN
    :param patches: sequence of patch records; matched on ``uprn`` when the first
        record has that key (an empty sequence also takes this branch and yields
        ``{}``), otherwise on address/postcode
    :param already_installed: sequence of records matched on address/postcode
    :param non_invasive_recommendations: sequence of records matched on ``uprn``
        when the first record has a non-blank one, otherwise on address/postcode.
        The matched record's ``recommendations`` may be a list or the string
        repr of a list (e.g. ``"['cavity_extract_and_refill']"``)
    :param valuation_data: sequence of records with a ``valuation`` field, matched
        the same way as the non-invasive recommendations
    :param uprn: UPRN of the property, compared as a string
    :return: a ``PropertyRequestData`` with ``patch`` (``{}`` if none),
        ``already_installed`` (``[]`` if none), ``non_invasive_recommendations``
        (``{}`` if none) and ``valuation`` (``float`` or ``None``)

    Note: the matched non-invasive record is updated in place, so its
    ``recommendations`` entry in the caller's list becomes the normalised list.
    """
    # Local import: the module's import block does not bring ``ast`` into scope.
    import ast

    patch_has_uprn = "uprn" in patches[0] if patches else True
    if patch_has_uprn:
        patch = next((
            x for x in patches if str(x["uprn"]) == str(config["uprn"])
        ), {})
    else:
        patch = next((x for x in patches if _same_address(x, config)), {})

    property_already_installed = next(
        (x for x in already_installed if _same_address(x, config)), []
    )

    # Some non-invasive recommendations only carry address and postcode, so the
    # UPRN is used for matching only when it is actually populated.
    if _first_record_has_uprn(non_invasive_recommendations):
        property_non_invasive_recommendations = next((
            x for x in non_invasive_recommendations if str(x["uprn"]) == str(uprn)
        ), {})
    else:
        property_non_invasive_recommendations = next((
            x for x in non_invasive_recommendations if _same_address(x, config)
        ), {})

    # Recommendations may arrive as the string repr of a list; literal_eval only
    # evaluates Python literals, so this does not execute arbitrary code.
    recommendations = property_non_invasive_recommendations.get("recommendations")
    if isinstance(recommendations, str):
        recommendations = ast.literal_eval(recommendations)
    # Skip normalisation when the property has no recommendations at all (no
    # matching record, or no "recommendations" entry) instead of raising.
    if recommendations is not None:
        # Bare measure names become {"type": name}; dict entries pass through.
        property_non_invasive_recommendations["recommendations"] = [
            {"type": rec} if isinstance(rec, str) else rec
            for rec in recommendations
        ]

    # Valuations follow the same "UPRN if populated, else address" rule.
    if _first_record_has_uprn(valuation_data):
        property_valuation = next((
            float(x["valuation"]) for x in valuation_data if str(x["uprn"]) == str(uprn)
        ), None)
    else:
        property_valuation = next((
            float(x["valuation"]) for x in valuation_data if _same_address(x, config)
        ), None)

    # Return data class to give a structured format
    return PropertyRequestData(
        patch=patch,
        already_installed=property_already_installed,
        non_invasive_recommendations=property_non_invasive_recommendations,
        valuation=property_valuation
    )
def parse_eco_packages(config: dict[str, Any], prepared_epc) -> tuple[list[str], int, str, list[str]] | tuple[
None, None, None, list]:
solar_identification = config.get("solar_reason", None)
cavity_identification = config.get("cavity_reason", None)
if not solar_identification and not cavity_identification:
return None, None, None, []
landlord_heating_system = config["landlord_heating_system"]
# This is the initial version of tackling "already installed" measures
already_installed = []
if landlord_heating_system == "air source heat pump":
already_installed.append("air_source_heat_pump")
# We map the categories to the desired measures and upgrade targets
# We note that the categories are placeholder until we move the standardised asset list
identification_map = {
"Solar Eligible": {
"measures": ["solar_pv", "loft_insulation", "mechanical_ventilation"],
"target_sap": 86, # High B
"plan_type": "solar_eco4"
},
"Solar Eligible, Solid Wall Uninsulated, EPC E or Below": {
"measures": ["solar_pv", "loft_insulation", "mechanical_ventilation"],
"target_sap": 86, # High B
"plan_type": "solar_eco4"
},
"Solar Eligible, Needs Heating Upgrade": {
"measures": ["solar_pv", "loft_insulation", "high_heat_retention_storage_heaters",
"mechanical_ventilation"],
"target_sap": 86, # High B
"plan_type": "solar_hhrsh_eco4"
},
"Non-Intrusive Data Shows Empty Cavity": {
"measures": ["cavity_wall_insulation", "mechanical_ventilation"],
"target_sap": 69, # Low C
"plan_type": "empty_cavity_eco"
},
'Non-Intrusive Data Shows Empty Cavity, built after 2002': {
"measures": ["cavity_wall_insulation", "mechanical_ventilation"],
"target_sap": 69, # Low C
"plan_type": "empty_cavity_eco"
},
"EPC Shows Empty Cavity, inspections show retro drilled": {
# EPC Indicates it's empty, so we simulate a fill
"measures": ["cavity_wall_insulation", "mechanical_ventilation"],
"target_sap": 69, # Low C
"plan_type": "extraction_eco"
},
"EPC Shows Empty Cavity, inspections show filled at build": {
# EPC Indicates it's empty, so we simulate a fill
"measures": ["cavity_wall_insulation", "mechanical_ventilation"],
"target_sap": 69, # Low C
"plan_type": "extraction_eco"
},
"EPC Shows Empty Cavity": {
# EPC Indicates it's empty, so we simulate a fill
"measures": ["cavity_wall_insulation", "mechanical_ventilation"],
"target_sap": 69, # Low C
"plan_type": "empty_cavity_eco"
}
}
# Always prioritise solar
if solar_identification:
_key = solar_identification.split(":")[0]
else:
_key = cavity_identification.split(":")[0]
mapped = identification_map[_key]
measures = mapped["measures"]
# If we have already installed an ASHP, we adjust the measures
if "air_source_heat_pump" in already_installed:
if "high_heat_retention_storage_heaters" in measures:
# If we have a HHRSH already, we remove it
measures.remove("high_heat_retention_storage_heaters")
# Add in ASHP (replacing HHRSH if already had)
measures.append("air_source_heat_pump")
current_sap = prepared_epc.current_energy_efficiency
# If we have a solar package, and the property is a D or above, we don't need to do lofts
if "solar_eco4" in mapped["plan_type"] and current_sap >= 55:
if "loft_insulation" in measures:
measures.remove("loft_insulation")
return measures, mapped["target_sap"], mapped["plan_type"], already_installed
def build_cloudwatch_log_url(start_ms: int) -> str:
"""
Build a CloudWatch Logs URL for the current Lambda invocation,
including timestamp window from start_ms to end_ms (epoch ms).
"""
region = os.environ["AWS_REGION"]
log_group = os.environ["AWS_LAMBDA_LOG_GROUP_NAME"]
log_stream = os.environ["AWS_LAMBDA_LOG_STREAM_NAME"]
# CloudWatch console requires / encoded as $252F
encoded_group = log_group.replace("/", "$252F")
encoded_stream = log_stream.replace("/", "$252F")
# Return the full URL with time range
return (
f"https://console.aws.amazon.com/cloudwatch/home?"
f"region={region}"
f"#logsV2:log-groups/log-group/{encoded_group}"
f"/log-events/{encoded_stream}"
f"$3Fstart={start_ms}"
)
def handle_error(msg, e, subtask_id, status=500, start_ms=None):
    """
    Record a pipeline failure against its subtask and build the error response.

    The failure is logged, the subtask is marked ``"failed"`` with the
    stringified exception as its output and a link to the CloudWatch logs, and
    a response carrying ``msg`` is returned.

    :param msg: message to log and to send back as the response body
    :param e: the error that caused the failure; stored as ``str(e)``
    :param subtask_id: id of the subtask, as a ``UUID`` or its string form
    :param status: HTTP status code of the returned response
    :param start_ms: epoch milliseconds the invocation started, used for the
        CloudWatch link
    :return: a ``Response`` with ``status`` and ``msg``
    """
    # Log first so the failure is recorded even if the status update below raises.
    logger.error(msg, exc_info=True)

    try:
        cloud_logs_url = build_cloudwatch_log_url(start_ms)
    except KeyError:
        # The link needs Lambda environment variables; without them (e.g. a
        # local run) the subtask must still be marked as failed.
        # NOTE(review): assumes update_subtask_status accepts None for
        # cloud_logs_url - confirm against SubTaskInterface.
        logger.warning("Could not build CloudWatch log URL: Lambda environment variables are not set")
        cloud_logs_url = None

    # Accept both UUID instances and their string form.
    subtask_uuid = subtask_id if isinstance(subtask_id, UUID) else UUID(str(subtask_id))

    SubTaskInterface().update_subtask_status(
        subtask_id=subtask_uuid,
        status="failed",
        outputs=str(e),
        cloud_logs_url=cloud_logs_url
    )
    return Response(status_code=status, content=msg)