Correct maths and prepare to trigger SQS from the API

This commit is contained in:
Daniel Roth 2026-02-26 12:58:33 +00:00
parent a962105436
commit add53a1949
7 changed files with 218 additions and 144 deletions

View file

@ -61,8 +61,12 @@ class SubTaskInterface:
# UPDATE STATUS (in progress, complete, failed) # UPDATE STATUS (in progress, complete, failed)
# -------------------------------------------------------- # --------------------------------------------------------
def update_subtask_status( def update_subtask_status(
self, subtask_id: UUID, status: str, outputs=None, cloud_logs_url=None self,
): subtask_id: UUID,
status: str,
outputs: Optional[Dict[str, str]] = None,
cloud_logs_url: Optional[str] = None,
) -> SubTask:
""" """
Update the status of a subtask, and recalculate the parent task progress. Update the status of a subtask, and recalculate the parent task progress.
:param subtask_id: UUID of the subtask to update :param subtask_id: UUID of the subtask to update

View file

@ -52,18 +52,20 @@ async def trigger_categorisation(
logger.info("API triggered with body: %s", payload) logger.info("API triggered with body: %s", payload)
property_ids: List[int] = get_property_ids(payload.portfolio_id) property_ids: list[int] = get_property_ids(payload.portfolio_id)
property_ids.sort() property_ids.sort()
num_scenarios: int = get_scenarios_count_by_portfolio_id(payload.portfolio_id) num_scenarios: int = get_scenarios_count_by_portfolio_id(payload.portfolio_id)
batch_size: int = ( total_plans_to_update: int = len(property_ids) * num_scenarios
math.ceil(1000 / num_scenarios) if num_scenarios > 1000 else num_scenarios
)
num_property_buckets: int = max(1, math.ceil(len(property_ids) / batch_size))
print("num_scenarios", num_scenarios) max_writes_per_batch: int = 1000
print("batch_size", batch_size) properties_per_batch: int = max(1, max_writes_per_batch // num_scenarios)
print("num_property_buckets", num_property_buckets)
num_property_batches: int = math.ceil(len(property_ids) / properties_per_batch)
print("total_plans_to_update", total_plans_to_update)
print("properties_per_batch", properties_per_batch)
print("num_property_buckets", num_property_batches)
# Create task # Create task
# task_id, _ = TasksInterface.create_task( # task_id, _ = TasksInterface.create_task(
@ -76,32 +78,43 @@ async def trigger_categorisation(
# Dispatch requests to lambdas # Dispatch requests to lambdas
# subtask_interface = SubTaskInterface() # subtask_interface = SubTaskInterface()
# for bucket_index in range(num_property_buckets): for batch_index in range(num_property_batches):
# bucket_property_ids: List[int] = [
# pid for pid in property_ids if pid % num_property_buckets == bucket_index
# ]
# bucket_request: CategorisationTriggerRequest = CategorisationTriggerRequest(
# portfolio_id=payload.portfolio_id,
# scenarios_to_consider=payload.scenarios_to_consider,
# scenario_priority_order=payload.scenario_priority_order,
# min_property_id=min(bucket_property_ids),
# max_property_id=max(bucket_property_ids),
# )
# # Create sub-task for each
# subtask_id: UUID = subtask_interface.create_subtask(
# task_id=task_id, inputs=bucket_request.model_dump()
# )
# response = sqs_client.send_message( start: int = batch_index * properties_per_batch
# QueueUrl="categorisation-queue-dev", end: int = start + properties_per_batch
# MessageBody=bucket_request.model_dump_json(),
# )
# logger.info( batch_property_ids: List[int] = property_ids[start:end]
# f"Chunk {bucket_index} sent to SQS. Property IDs {min(bucket_property_ids)}{max(bucket_property_ids)}. Message ID: {response.get('MessageId')}"
# )
# await asyncio.sleep(0.05) # Small delay to avoid SQS throttling if not batch_property_ids:
continue
# bucket_property_ids: List[int] = [
# pid for pid in property_ids if pid % num_buckets == bucket_index
# ]
# bucket_request: CategorisationTriggerRequest = CategorisationTriggerRequest(
# portfolio_id=payload.portfolio_id,
# scenarios_to_consider=payload.scenarios_to_consider,
# scenario_priority_order=payload.scenario_priority_order,
# min_property_id=min(bucket_property_ids),
# max_property_id=max(bucket_property_ids),
# )
# # Create sub-task for each
# subtask_id: UUID = subtask_interface.create_subtask(
# task_id=task_id, inputs=bucket_request.model_dump()
# )
# bucket_request.subtask_id = str(subtask_id)
# response = sqs_client.send_message(
# QueueUrl="categorisation-queue-dev",
# MessageBody=bucket_request.model_dump_json(),
# )
logger.info(
# f"Chunk {bucket_index} sent to SQS. Property IDs {min(bucket_property_ids)}{max(bucket_property_ids)}. Message ID: {response.get('MessageId')}"
f"Chunk {batch_index} sent to SQS. Property IDs {min(batch_property_ids)}{max(batch_property_ids)}"
)
await asyncio.sleep(0.05) # Small delay to avoid SQS throttling
return {"message": "Categorisation jobs distributed"} return {"message": "Categorisation jobs distributed"}

View file

@ -1,5 +1,6 @@
import ast import ast
import os import os
from typing import Optional
import msgpack import msgpack
from uuid import UUID from uuid import UUID
from utils.s3 import read_from_s3 from utils.s3 import read_from_s3
@ -24,7 +25,7 @@ def get_cleaned():
cleaned = read_from_s3( cleaned = read_from_s3(
s3_file_name="cleaned_epc_data/cleaned.bson", s3_file_name="cleaned_epc_data/cleaned.bson",
bucket_name=get_settings().DATA_BUCKET bucket_name=get_settings().DATA_BUCKET,
) )
cleaned = msgpack.unpackb(cleaned, raw=False) cleaned = msgpack.unpackb(cleaned, raw=False)
@ -56,32 +57,45 @@ def extract_property_request_data(
): ):
patch_has_uprn = "uprn" in patches[0] if patches else True patch_has_uprn = "uprn" in patches[0] if patches else True
if patch_has_uprn: if patch_has_uprn:
patch = next(( patch = next((x for x in patches if str(x["uprn"]) == str(address.uprn)), {})
x for x in patches if str(x["uprn"]) == str(address.uprn)
), {})
else: else:
patch = next(( patch = next(
x for x in patches if (x["address"] == address.address) and (x["postcode"] == address.postcode) (
), {}) x
for x in patches
if (x["address"] == address.address)
and (x["postcode"] == address.postcode)
),
{},
)
# Because we have some non-invasive recommendations that match on address and postcode, but not UPRN # Because we have some non-invasive recommendations that match on address and postcode, but not UPRN
# we need to check existence of uprn # we need to check existence of uprn
has_uprn = "uprn" in non_invasive_recommendations[0] if non_invasive_recommendations else False has_uprn = (
"uprn" in non_invasive_recommendations[0]
if non_invasive_recommendations
else False
)
if has_uprn: if has_uprn:
has_uprn = non_invasive_recommendations[0]["uprn"] not in ["", None] has_uprn = non_invasive_recommendations[0]["uprn"] not in ["", None]
if has_uprn: if has_uprn:
property_non_invasive_recommendations = next(( property_non_invasive_recommendations = next(
x for x in non_invasive_recommendations if (x for x in non_invasive_recommendations if (str(x["uprn"]) == str(uprn))),
(str(x["uprn"]) == str(uprn)) {},
), {}) )
# We patch the non-invasive recs that are ['cavity_extract_and_refill'] # We patch the non-invasive recs that are ['cavity_extract_and_refill']
else: else:
property_non_invasive_recommendations = next(( property_non_invasive_recommendations = next(
x for x in non_invasive_recommendations if (
(x["address"] == address.address) and (x["postcode"] == address.postcode) x
), {}) for x in non_invasive_recommendations
if (x["address"] == address.address)
and (x["postcode"] == address.postcode)
),
{},
)
if isinstance(property_non_invasive_recommendations.get("recommendations"), str): if isinstance(property_non_invasive_recommendations.get("recommendations"), str):
property_non_invasive_recommendations["recommendations"] = ast.literal_eval( property_non_invasive_recommendations["recommendations"] = ast.literal_eval(
@ -90,7 +104,11 @@ def extract_property_request_data(
transformed = [] transformed = []
for rec in property_non_invasive_recommendations["recommendations"]: for rec in property_non_invasive_recommendations["recommendations"]:
if isinstance(rec, str): if isinstance(rec, str):
transformed.append({"type": rec, }) transformed.append(
{
"type": rec,
}
)
else: else:
transformed.append(rec) transformed.append(rec)
@ -102,26 +120,36 @@ def extract_property_request_data(
valuation_has_uprn = valuation_data[0]["uprn"] not in ["", None] valuation_has_uprn = valuation_data[0]["uprn"] not in ["", None]
if valuation_has_uprn: if valuation_has_uprn:
property_valuation = next(( property_valuation = next(
float(x["valuation"]) for x in valuation_data if (
(str(x["uprn"]) == str(uprn)) float(x["valuation"])
), None) for x in valuation_data
if (str(x["uprn"]) == str(uprn))
),
None,
)
else: else:
property_valuation = next(( property_valuation = next(
float(x["valuation"]) for x in valuation_data if (
(x["address"] == address.address) and (x["postcode"] == address.postcode) float(x["valuation"])
), None) for x in valuation_data
if (x["address"] == address.address)
and (x["postcode"] == address.postcode)
),
None,
)
# Return data class to give a structured format # Return data class to give a structured format
return PropertyRequestData( return PropertyRequestData(
patch=patch, patch=patch,
non_invasive_recommendations=property_non_invasive_recommendations, non_invasive_recommendations=property_non_invasive_recommendations,
valuation=property_valuation valuation=property_valuation,
) )
def parse_eco_packages(addr: Address, prepared_epc) -> tuple[list[str], int, str, list[str]] | tuple[ def parse_eco_packages(
None, None, None, list]: addr: Address, prepared_epc
) -> tuple[list[str], int, str, list[str]] | tuple[None, None, None, list]:
solar_identification = addr.solar_reason solar_identification = addr.solar_reason
cavity_identification = addr.cavity_reason cavity_identification = addr.cavity_reason
if not solar_identification and not cavity_identification: if not solar_identification and not cavity_identification:
@ -140,47 +168,51 @@ def parse_eco_packages(addr: Address, prepared_epc) -> tuple[list[str], int, str
"Solar Eligible": { "Solar Eligible": {
"measures": ["solar_pv", "loft_insulation", "mechanical_ventilation"], "measures": ["solar_pv", "loft_insulation", "mechanical_ventilation"],
"target_sap": 86, # High B "target_sap": 86, # High B
"plan_type": "solar_eco4" "plan_type": "solar_eco4",
}, },
"Solar Eligible, Solid Wall Uninsulated, EPC E or Below": { "Solar Eligible, Solid Wall Uninsulated, EPC E or Below": {
"measures": ["solar_pv", "loft_insulation", "mechanical_ventilation"], "measures": ["solar_pv", "loft_insulation", "mechanical_ventilation"],
"target_sap": 86, # High B "target_sap": 86, # High B
"plan_type": "solar_eco4" "plan_type": "solar_eco4",
}, },
"Solar Eligible, Needs Heating Upgrade": { "Solar Eligible, Needs Heating Upgrade": {
"measures": ["solar_pv", "loft_insulation", "high_heat_retention_storage_heaters", "measures": [
"mechanical_ventilation"], "solar_pv",
"loft_insulation",
"high_heat_retention_storage_heaters",
"mechanical_ventilation",
],
"target_sap": 86, # High B "target_sap": 86, # High B
"plan_type": "solar_hhrsh_eco4" "plan_type": "solar_hhrsh_eco4",
}, },
"Non-Intrusive Data Shows Empty Cavity": { "Non-Intrusive Data Shows Empty Cavity": {
"measures": ["cavity_wall_insulation", "mechanical_ventilation"], "measures": ["cavity_wall_insulation", "mechanical_ventilation"],
"target_sap": 69, # Low C "target_sap": 69, # Low C
"plan_type": "empty_cavity_eco" "plan_type": "empty_cavity_eco",
}, },
'Non-Intrusive Data Shows Empty Cavity, built after 2002': { "Non-Intrusive Data Shows Empty Cavity, built after 2002": {
"measures": ["cavity_wall_insulation", "mechanical_ventilation"], "measures": ["cavity_wall_insulation", "mechanical_ventilation"],
"target_sap": 69, # Low C "target_sap": 69, # Low C
"plan_type": "empty_cavity_eco" "plan_type": "empty_cavity_eco",
}, },
"EPC Shows Empty Cavity, inspections show retro drilled": { "EPC Shows Empty Cavity, inspections show retro drilled": {
# EPC Indicates it's empty, so we simulate a fill # EPC Indicates it's empty, so we simulate a fill
"measures": ["cavity_wall_insulation", "mechanical_ventilation"], "measures": ["cavity_wall_insulation", "mechanical_ventilation"],
"target_sap": 69, # Low C "target_sap": 69, # Low C
"plan_type": "extraction_eco" "plan_type": "extraction_eco",
}, },
"EPC Shows Empty Cavity, inspections show filled at build": { "EPC Shows Empty Cavity, inspections show filled at build": {
# EPC Indicates it's empty, so we simulate a fill # EPC Indicates it's empty, so we simulate a fill
"measures": ["cavity_wall_insulation", "mechanical_ventilation"], "measures": ["cavity_wall_insulation", "mechanical_ventilation"],
"target_sap": 69, # Low C "target_sap": 69, # Low C
"plan_type": "extraction_eco" "plan_type": "extraction_eco",
}, },
"EPC Shows Empty Cavity": { "EPC Shows Empty Cavity": {
# EPC Indicates it's empty, so we simulate a fill # EPC Indicates it's empty, so we simulate a fill
"measures": ["cavity_wall_insulation", "mechanical_ventilation"], "measures": ["cavity_wall_insulation", "mechanical_ventilation"],
"target_sap": 69, # Low C "target_sap": 69, # Low C
"plan_type": "empty_cavity_eco" "plan_type": "empty_cavity_eco",
} },
} }
# Always prioritise solar # Always prioritise solar
@ -232,15 +264,21 @@ def build_cloudwatch_log_url(start_ms: int) -> str:
) )
def handle_error(msg, e, subtask_id, status=500, start_ms=None): def handle_error(
msg: str,
exception: Exception,
subtask_id: str,
status_code: int = 500,
start_ms: Optional[int] = None,
):
# When the pipeline fails, handles error process # When the pipeline fails, handles error process
cloud_logs_url = build_cloudwatch_log_url(start_ms) cloud_logs_url = build_cloudwatch_log_url(start_ms)
SubTaskInterface().update_subtask_status( SubTaskInterface().update_subtask_status(
subtask_id=UUID(subtask_id), subtask_id=UUID(subtask_id),
status="failed", status="failed",
outputs=str(e), outputs=str(exception),
cloud_logs_url=cloud_logs_url cloud_logs_url=cloud_logs_url,
) )
logger.error(msg, exc_info=True) logger.error(msg, exc_info=True)
return Response(status_code=status, content=msg) return Response(status_code=status_code, content=msg)

View file

@ -11,7 +11,6 @@ class CategorisationTriggerRequest(BaseModel):
min_property_id: Optional[int] = None min_property_id: Optional[int] = None
max_property_id: Optional[int] = None max_property_id: Optional[int] = None
task_id: Optional[str] = None
subtask_id: Optional[str] = None subtask_id: Optional[str] = None

View file

@ -1,6 +1,9 @@
import json import json
import time
from typing import Any, Mapping from typing import Any, Mapping
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
from backend.app.plan.utils import build_cloudwatch_log_url
from backend.categorisation.categorisation_trigger_request import ( from backend.categorisation.categorisation_trigger_request import (
CategorisationTriggerRequest, CategorisationTriggerRequest,
) )
@ -26,7 +29,6 @@ def handler(event: Mapping[str, Any], context: Any) -> None:
logger.debug("Successfully validated request body") logger.debug("Successfully validated request body")
process_portfolio(payload) process_portfolio(payload)
except Exception as e: except Exception as e:
logger.info("Handler exception") logger.info("Handler exception")
logger.error(f"Failed to process record: {e}") logger.error(f"Failed to process record: {e}")

View file

@ -1,10 +1,10 @@
sqlmodel sqlmodel
pydantic-settings pydantic-settings
psycopg2-binary==2.9.10 psycopg2-binary==2.9.10
starlette
# Not used but needed to satisfy imports # Not used but needed to satisfy imports
pytz==2024.2 pytz==2024.2
msgpack==1.1.0 msgpack==1.1.0
numpy<2 numpy<2
pandas==2.2.3 pandas==2.2.3
starlette

View file

@ -2,6 +2,7 @@ import time
from collections import defaultdict from collections import defaultdict
from typing import Dict, List, Optional from typing import Dict, List, Optional
from uuid import UUID from uuid import UUID
from starlette.responses import Response
from backend.app.db.functions.recommendations_functions import ( from backend.app.db.functions.recommendations_functions import (
bulk_update_plans, bulk_update_plans,
@ -14,7 +15,7 @@ from backend.app.db.functions.tasks.Tasks import SubTaskInterface
from backend.app.db.models.recommendations import PlanModel, ScenarioModel from backend.app.db.models.recommendations import PlanModel, ScenarioModel
from backend.app.domain.classes.plan import Plan from backend.app.domain.classes.plan import Plan
from backend.app.domain.classes.scenario import Scenario from backend.app.domain.classes.scenario import Scenario
from backend.app.plan.utils import build_cloudwatch_log_url from backend.app.plan.utils import build_cloudwatch_log_url, handle_error
from backend.categorisation.categorisation_trigger_request import ( from backend.categorisation.categorisation_trigger_request import (
CategorisationTriggerRequest, CategorisationTriggerRequest,
) )
@ -25,7 +26,7 @@ logger = setup_logger()
def process_portfolio( def process_portfolio(
body: CategorisationTriggerRequest, body: CategorisationTriggerRequest,
) -> None: # TODO: make this a class ) -> Response: # TODO: make this a class
portfolio_id: int = body.portfolio_id portfolio_id: int = body.portfolio_id
scenarios_to_consider: Optional[List[int]] = body.scenarios_to_consider scenarios_to_consider: Optional[List[int]] = body.scenarios_to_consider
scenario_priority_order: Optional[List[int]] = body.scenario_priority_order scenario_priority_order: Optional[List[int]] = body.scenario_priority_order
@ -36,74 +37,91 @@ def process_portfolio(
logger.info(f"Processing portfolio {portfolio_id}") logger.info(f"Processing portfolio {portfolio_id}")
start_ms = int(time.time() * 1000) start_ms = int(time.time() * 1000)
all_scenarios: List[Scenario] = _load_scenarios_for_portfolio(portfolio_id) try:
plans_by_id: Dict[int, Plan] = {} # TODO: make this an in-memory repository class
if scenarios_to_consider: all_scenarios: List[Scenario] = _load_scenarios_for_portfolio(portfolio_id)
if len(scenarios_to_consider) < 2: plans_by_id: Dict[int, Plan] = (
raise ValueError( {}
"Cannot run auto categorisation for fewer than 2 scenarios" ) # TODO: make this an in-memory repository class
)
# first get all plans that we're interested in if scenarios_to_consider:
plans_for_consideration: List[Plan] = _load_plans_for_portfolio( if len(scenarios_to_consider) < 2:
portfolio_id, raise ValueError(
all_scenarios, "Cannot run auto categorisation for fewer than 2 scenarios"
scenarios_to_consider, )
min_property_id,
max_property_id,
)
for plan in plans_for_consideration:
if plan.id is not None: # just in case
plans_by_id[plan.id] = plan
# then unset existing defaults on domain objects regardless of whether they're under consideration or not # first get all plans that we're interested in
default_plans: List[Plan] = _get_default_plans( plans_for_consideration: List[Plan] = _load_plans_for_portfolio(
portfolio_id, all_scenarios, min_property_id, max_property_id portfolio_id,
) all_scenarios,
for plan in default_plans: scenarios_to_consider,
plan.set_default(False) min_property_id,
if plan.id is not None: # just in case max_property_id,
plans_by_id[plan.id] = plan )
for plan in plans_for_consideration:
logger.info(f"Successfully unset {len(default_plans)} default plan(s)")
# then set new defaults on domain objects under consideration
plans_for_consideration_by_property: Dict[int, List[Plan]] = (
_group_plans_by_property(plans_for_consideration)
)
for property_id, property_plans in plans_for_consideration_by_property.items():
if not property_plans:
raise ValueError(f"No plans for property {property_id}")
try:
cheapest_plan = choose_cheapest_relevant_plan(
property_plans, scenario_priority_order
)
except Exception:
logger.error(f"Failed to find cheapest plan for property {property_id}")
raise
property_plans = _update_plan_objects(property_plans, cheapest_plan)
for plan in property_plans:
if plan.id is not None: # just in case if plan.id is not None: # just in case
plans_by_id[plan.id] = plan plans_by_id[plan.id] = plan
logger.info("Successfully set defaults on Plan objects in memory") # then unset existing defaults on domain objects regardless of whether they're under consideration or not
default_plans: List[Plan] = _get_default_plans(
# then pass all domain objects to database to update (regardless of whether they've changed) portfolio_id, all_scenarios, min_property_id, max_property_id
_update_plans_in_db(list(plans_by_id.values()))
logger.info(f"Successfully updated {len(plans_by_id)} Plans in database")
# Mark the subtask as successful
if subtask_id:
cloud_logs_url = build_cloudwatch_log_url(start_ms)
SubTaskInterface().update_subtask_status(
subtask_id=UUID(subtask_id),
status="complete",
cloud_logs_url=cloud_logs_url,
) )
for plan in default_plans:
plan.set_default(False)
if plan.id is not None: # just in case
plans_by_id[plan.id] = plan
logger.info(f"Successfully unset {len(default_plans)} default plan(s)")
# then set new defaults on domain objects under consideration
plans_for_consideration_by_property: Dict[int, List[Plan]] = (
_group_plans_by_property(plans_for_consideration)
)
for property_id, property_plans in plans_for_consideration_by_property.items():
if not property_plans:
raise ValueError(f"No plans for property {property_id}")
try:
cheapest_plan = choose_cheapest_relevant_plan(
property_plans, scenario_priority_order
)
except Exception:
logger.error(f"Failed to find cheapest plan for property {property_id}")
raise
property_plans = _update_plan_objects(property_plans, cheapest_plan)
for plan in property_plans:
if plan.id is not None: # just in case
plans_by_id[plan.id] = plan
logger.info("Successfully set defaults on Plan objects in memory")
# then pass all domain objects to database to update (regardless of whether they've changed)
_update_plans_in_db(list(plans_by_id.values()))
# Mark the subtask as successful
if body.subtask_id:
cloud_logs_url = build_cloudwatch_log_url(start_ms)
SubTaskInterface().update_subtask_status(
subtask_id=UUID(subtask_id),
status="complete",
cloud_logs_url=cloud_logs_url,
)
logger.info(f"Successfully updated {len(plans_by_id)} Plans in database")
return Response(status_code=200)
except Exception as e:
if subtask_id:
return handle_error(
"Exception during Categorisation processing.",
e,
subtask_id,
500,
start_ms,
)
raise
def choose_cheapest_relevant_plan( def choose_cheapest_relevant_plan(