diff --git a/backend/app/db/functions/tasks/Tasks.py b/backend/app/db/functions/tasks/Tasks.py index 13229447..0f987f3b 100644 --- a/backend/app/db/functions/tasks/Tasks.py +++ b/backend/app/db/functions/tasks/Tasks.py @@ -61,8 +61,12 @@ class SubTaskInterface: # UPDATE STATUS (in progress, complete, failed) # -------------------------------------------------------- def update_subtask_status( - self, subtask_id: UUID, status: str, outputs=None, cloud_logs_url=None - ): + self, + subtask_id: UUID, + status: str, + outputs: Optional[Dict[str, str]] = None, + cloud_logs_url: Optional[str] = None, + ) -> SubTask: """ Update the status of a subtask, and recalculate the parent task progress. :param subtask_id: UUID of the subtask to update diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index f45daea3..1ecd1f40 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -52,18 +52,20 @@ async def trigger_categorisation( logger.info("API triggered with body: %s", payload) - property_ids: List[int] = get_property_ids(payload.portfolio_id) + property_ids: list[int] = get_property_ids(payload.portfolio_id) property_ids.sort() num_scenarios: int = get_scenarios_count_by_portfolio_id(payload.portfolio_id) - batch_size: int = ( - math.ceil(1000 / num_scenarios) if num_scenarios > 1000 else num_scenarios - ) - num_property_buckets: int = max(1, math.ceil(len(property_ids) / batch_size)) + total_plans_to_update: int = len(property_ids) * num_scenarios - print("num_scenarios", num_scenarios) - print("batch_size", batch_size) - print("num_property_buckets", num_property_buckets) + max_writes_per_batch: int = 1000 + properties_per_batch: int = max(1, max_writes_per_batch // num_scenarios) + + num_property_batches: int = math.ceil(len(property_ids) / properties_per_batch) + + print("total_plans_to_update", total_plans_to_update) + print("properties_per_batch", properties_per_batch) + print("num_property_buckets", num_property_batches) # Create task # task_id, _ = TasksInterface.create_task( @@ -76,32 +78,43 @@ async def trigger_categorisation( # Dispatch requests to lambdas # subtask_interface = SubTaskInterface() - # for bucket_index in range(num_property_buckets): - # bucket_property_ids: List[int] = [ - # pid for pid in property_ids if pid % num_property_buckets == bucket_index - # ] - # bucket_request: CategorisationTriggerRequest = CategorisationTriggerRequest( - # portfolio_id=payload.portfolio_id, - # scenarios_to_consider=payload.scenarios_to_consider, - # scenario_priority_order=payload.scenario_priority_order, - # min_property_id=min(bucket_property_ids), - # max_property_id=max(bucket_property_ids), - # ) - # # Create sub-task for each - # subtask_id: UUID = subtask_interface.create_subtask( - # task_id=task_id, inputs=bucket_request.model_dump() - # ) + for batch_index in range(num_property_batches): - # response = sqs_client.send_message( - # QueueUrl="categorisation-queue-dev", - # MessageBody=bucket_request.model_dump_json(), - # ) + start: int = batch_index * properties_per_batch + end: int = start + properties_per_batch - # logger.info( - # f"Chunk {bucket_index} sent to SQS. Property IDs {min(bucket_property_ids)}–{max(bucket_property_ids)}. Message ID: {response.get('MessageId')}" - # ) + batch_property_ids: List[int] = property_ids[start:end] - # await asyncio.sleep(0.05) # Small delay to avoid SQS throttling + if not batch_property_ids: + continue + + # bucket_property_ids: List[int] = [ + # pid for pid in property_ids if pid % num_buckets == bucket_index + # ] + # bucket_request: CategorisationTriggerRequest = CategorisationTriggerRequest( + # portfolio_id=payload.portfolio_id, + # scenarios_to_consider=payload.scenarios_to_consider, + # scenario_priority_order=payload.scenario_priority_order, + # min_property_id=min(bucket_property_ids), + # max_property_id=max(bucket_property_ids), + # ) + # # Create sub-task for each + # subtask_id: UUID = subtask_interface.create_subtask( + # task_id=task_id, inputs=bucket_request.model_dump() + # ) + # bucket_request.subtask_id = str(subtask_id) + + # response = sqs_client.send_message( + # QueueUrl="categorisation-queue-dev", + # MessageBody=bucket_request.model_dump_json(), + # ) + + logger.info( + # f"Chunk {bucket_index} sent to SQS. Property IDs {min(bucket_property_ids)}–{max(bucket_property_ids)}. Message ID: {response.get('MessageId')}" + f"Chunk {batch_index} sent to SQS. Property IDs {min(batch_property_ids)}–{max(batch_property_ids)}" + ) + + await asyncio.sleep(0.05) # Small delay to avoid SQS throttling return {"message": "Categorisation jobs distributed"} diff --git a/backend/app/plan/utils.py b/backend/app/plan/utils.py index 10d7fb06..2237c38e 100644 --- a/backend/app/plan/utils.py +++ b/backend/app/plan/utils.py @@ -1,5 +1,6 @@ import ast import os +from typing import Optional import msgpack from uuid import UUID from utils.s3 import read_from_s3 @@ -24,7 +25,7 @@ def get_cleaned(): cleaned = read_from_s3( s3_file_name="cleaned_epc_data/cleaned.bson", - bucket_name=get_settings().DATA_BUCKET + bucket_name=get_settings().DATA_BUCKET, ) cleaned = msgpack.unpackb(cleaned, raw=False) @@ -56,32 +57,45 @@ def extract_property_request_data( ): patch_has_uprn = "uprn" in patches[0] if patches else True if patch_has_uprn: - patch = next(( - x for x in patches if str(x["uprn"]) == str(address.uprn) - ), {}) + patch = next((x for x in patches if str(x["uprn"]) == str(address.uprn)), {}) else: - patch = next(( - x for x in patches if (x["address"] == address.address) and (x["postcode"] == address.postcode) - ), {}) + patch = next( + ( + x + for x in patches + if (x["address"] == address.address) + and (x["postcode"] == address.postcode) + ), + {}, + ) # Because we have some non-invasive recommendations that match on address and postcode, but not UPRN # we need to check existence of uprn - has_uprn = "uprn" in non_invasive_recommendations[0] if non_invasive_recommendations else False + has_uprn = ( + "uprn" in non_invasive_recommendations[0] + if non_invasive_recommendations + else False + ) if has_uprn: has_uprn = non_invasive_recommendations[0]["uprn"] not in ["", None] if has_uprn: - property_non_invasive_recommendations = next(( - x for x in non_invasive_recommendations if - (str(x["uprn"]) == str(uprn)) - ), {}) + property_non_invasive_recommendations = next( + (x for x in non_invasive_recommendations if (str(x["uprn"]) == str(uprn))), + {}, + ) # We patch the non-invasive recs that are ['cavity_extract_and_refill'] else: - property_non_invasive_recommendations = next(( - x for x in non_invasive_recommendations if - (x["address"] == address.address) and (x["postcode"] == address.postcode) - ), {}) + property_non_invasive_recommendations = next( + ( + x + for x in non_invasive_recommendations + if (x["address"] == address.address) + and (x["postcode"] == address.postcode) + ), + {}, + ) if isinstance(property_non_invasive_recommendations.get("recommendations"), str): property_non_invasive_recommendations["recommendations"] = ast.literal_eval( @@ -90,7 +104,11 @@ def extract_property_request_data( transformed = [] for rec in property_non_invasive_recommendations["recommendations"]: if isinstance(rec, str): - transformed.append({"type": rec, }) + transformed.append( + { + "type": rec, + } + ) else: transformed.append(rec) @@ -102,26 +120,36 @@ def extract_property_request_data( valuation_has_uprn = valuation_data[0]["uprn"] not in ["", None] if valuation_has_uprn: - property_valuation = next(( - float(x["valuation"]) for x in valuation_data if - (str(x["uprn"]) == str(uprn)) - ), None) + property_valuation = next( + ( + float(x["valuation"]) + for x in valuation_data + if (str(x["uprn"]) == str(uprn)) + ), + None, + ) else: - property_valuation = next(( - float(x["valuation"]) for x in valuation_data if - (x["address"] == address.address) and (x["postcode"] == address.postcode) - ), None) + property_valuation = next( + ( + float(x["valuation"]) + for x in valuation_data + if (x["address"] == address.address) + and (x["postcode"] == address.postcode) + ), + None, + ) # Return data class to give a structured format return PropertyRequestData( patch=patch, non_invasive_recommendations=property_non_invasive_recommendations, - valuation=property_valuation + valuation=property_valuation, ) -def parse_eco_packages(addr: Address, prepared_epc) -> tuple[list[str], int, str, list[str]] | tuple[ - None, None, None, list]: +def parse_eco_packages( + addr: Address, prepared_epc +) -> tuple[list[str], int, str, list[str]] | tuple[None, None, None, list]: solar_identification = addr.solar_reason cavity_identification = addr.cavity_reason if not solar_identification and not cavity_identification: @@ -140,47 +168,51 @@ def parse_eco_packages(addr: Address, prepared_epc) -> tuple[list[str], int, str "Solar Eligible": { "measures": ["solar_pv", "loft_insulation", "mechanical_ventilation"], "target_sap": 86, # High B - "plan_type": "solar_eco4" + "plan_type": "solar_eco4", }, "Solar Eligible, Solid Wall Uninsulated, EPC E or Below": { "measures": ["solar_pv", "loft_insulation", "mechanical_ventilation"], "target_sap": 86, # High B - "plan_type": "solar_eco4" + "plan_type": "solar_eco4", }, "Solar Eligible, Needs Heating Upgrade": { - "measures": ["solar_pv", "loft_insulation", "high_heat_retention_storage_heaters", - "mechanical_ventilation"], + "measures": [ + "solar_pv", + "loft_insulation", + "high_heat_retention_storage_heaters", + "mechanical_ventilation", + ], "target_sap": 86, # High B - "plan_type": "solar_hhrsh_eco4" + "plan_type": "solar_hhrsh_eco4", }, "Non-Intrusive Data Shows Empty Cavity": { "measures": ["cavity_wall_insulation", "mechanical_ventilation"], "target_sap": 69, # Low C - "plan_type": "empty_cavity_eco" + "plan_type": "empty_cavity_eco", }, - 'Non-Intrusive Data Shows Empty Cavity, built after 2002': { + "Non-Intrusive Data Shows Empty Cavity, built after 2002": { "measures": ["cavity_wall_insulation", "mechanical_ventilation"], "target_sap": 69, # Low C - "plan_type": "empty_cavity_eco" + "plan_type": "empty_cavity_eco", }, "EPC Shows Empty Cavity, inspections show retro drilled": { # EPC Indicates it's empty, so we simulate a fill "measures": ["cavity_wall_insulation", "mechanical_ventilation"], "target_sap": 69, # Low C - "plan_type": "extraction_eco" + "plan_type": "extraction_eco", }, "EPC Shows Empty Cavity, inspections show filled at build": { # EPC Indicates it's empty, so we simulate a fill "measures": ["cavity_wall_insulation", "mechanical_ventilation"], "target_sap": 69, # Low C - "plan_type": "extraction_eco" + "plan_type": "extraction_eco", }, "EPC Shows Empty Cavity": { # EPC Indicates it's empty, so we simulate a fill "measures": ["cavity_wall_insulation", "mechanical_ventilation"], "target_sap": 69, # Low C - "plan_type": "empty_cavity_eco" - } + "plan_type": "empty_cavity_eco", + }, } # Always prioritise solar @@ -232,15 +264,21 @@ def build_cloudwatch_log_url(start_ms: int) -> str: ) -def handle_error(msg, e, subtask_id, status=500, start_ms=None): +def handle_error( + msg: str, + exception: Exception, + subtask_id: str, + status_code: int = 500, + start_ms: Optional[int] = None, +): # When the pipeline fails, handles error process cloud_logs_url = build_cloudwatch_log_url(start_ms) SubTaskInterface().update_subtask_status( subtask_id=UUID(subtask_id), status="failed", - outputs=str(e), - cloud_logs_url=cloud_logs_url + outputs=str(exception), + cloud_logs_url=cloud_logs_url, ) logger.error(msg, exc_info=True) - return Response(status_code=status, content=msg) + return Response(status_code=status_code, content=msg) diff --git a/backend/categorisation/categorisation_trigger_request.py b/backend/categorisation/categorisation_trigger_request.py index 17a5d916..62879b5d 100644 --- a/backend/categorisation/categorisation_trigger_request.py +++ b/backend/categorisation/categorisation_trigger_request.py @@ -11,7 +11,6 @@ class CategorisationTriggerRequest(BaseModel): min_property_id: Optional[int] = None max_property_id: Optional[int] = None - task_id: Optional[str] = None subtask_id: Optional[str] = None diff --git a/backend/categorisation/handler/handler.py b/backend/categorisation/handler/handler.py index eb532624..a1f69ea6 100644 --- a/backend/categorisation/handler/handler.py +++ b/backend/categorisation/handler/handler.py @@ -1,6 +1,9 @@ import json +import time from typing import Any, Mapping +from backend.app.db.functions.tasks.Tasks import SubTaskInterface +from backend.app.plan.utils import build_cloudwatch_log_url from backend.categorisation.categorisation_trigger_request import ( CategorisationTriggerRequest, ) @@ -26,7 +29,6 @@ def handler(event: Mapping[str, Any], context: Any) -> None: logger.debug("Successfully validated request body") process_portfolio(payload) - except Exception as e: logger.info("Handler exception") logger.error(f"Failed to process record: {e}") diff --git a/backend/categorisation/handler/requirements.txt b/backend/categorisation/handler/requirements.txt index cbc2687a..6e737772 100644 --- a/backend/categorisation/handler/requirements.txt +++ b/backend/categorisation/handler/requirements.txt @@ -1,10 +1,10 @@ sqlmodel pydantic-settings psycopg2-binary==2.9.10 +starlette # Not used but needed to satisfy imports pytz==2024.2 msgpack==1.1.0 numpy<2 -pandas==2.2.3 -starlette \ No newline at end of file +pandas==2.2.3 \ No newline at end of file diff --git a/backend/categorisation/processor.py b/backend/categorisation/processor.py index 7a7d48ca..a212aac9 100644 --- a/backend/categorisation/processor.py +++ b/backend/categorisation/processor.py @@ -2,6 +2,7 @@ import time from collections import defaultdict from typing import Dict, List, Optional from uuid import UUID +from starlette.responses import Response from backend.app.db.functions.recommendations_functions import ( bulk_update_plans, @@ -14,7 +15,7 @@ from backend.app.db.functions.tasks.Tasks import SubTaskInterface from backend.app.db.models.recommendations import PlanModel, ScenarioModel from backend.app.domain.classes.plan import Plan from backend.app.domain.classes.scenario import Scenario -from backend.app.plan.utils import build_cloudwatch_log_url +from backend.app.plan.utils import build_cloudwatch_log_url, handle_error from backend.categorisation.categorisation_trigger_request import ( CategorisationTriggerRequest, ) @@ -25,7 +26,7 @@ logger = setup_logger() def process_portfolio( body: CategorisationTriggerRequest, -) -> None: # TODO: make this a class +) -> Response: # TODO: make this a class portfolio_id: int = body.portfolio_id scenarios_to_consider: Optional[List[int]] = body.scenarios_to_consider scenario_priority_order: Optional[List[int]] = body.scenario_priority_order @@ -36,74 +37,91 @@ def process_portfolio( logger.info(f"Processing portfolio {portfolio_id}") start_ms = int(time.time() * 1000) - all_scenarios: List[Scenario] = _load_scenarios_for_portfolio(portfolio_id) - plans_by_id: Dict[int, Plan] = {} # TODO: make this an in-memory repository class + try: - if scenarios_to_consider: - if len(scenarios_to_consider) < 2: - raise ValueError( - "Cannot run auto categorisation for fewer than 2 scenarios" - ) + all_scenarios: List[Scenario] = _load_scenarios_for_portfolio(portfolio_id) + plans_by_id: Dict[int, Plan] = ( + {} + ) # TODO: make this an in-memory repository class - # first get all plans that we're interested in - plans_for_consideration: List[Plan] = _load_plans_for_portfolio( - portfolio_id, - all_scenarios, - scenarios_to_consider, - min_property_id, - max_property_id, - ) - for plan in plans_for_consideration: - if plan.id is not None: # just in case - plans_by_id[plan.id] = plan + if scenarios_to_consider: + if len(scenarios_to_consider) < 2: + raise ValueError( + "Cannot run auto categorisation for fewer than 2 scenarios" + ) - # then unset existing defaults on domain objects regardless of whether they're under consideration or not - default_plans: List[Plan] = _get_default_plans( - portfolio_id, all_scenarios, min_property_id, max_property_id - ) - for plan in default_plans: - plan.set_default(False) - if plan.id is not None: # just in case - plans_by_id[plan.id] = plan - - logger.info(f"Successfully unset {len(default_plans)} default plan(s)") - - # then set new defaults on domain objects under consideration - plans_for_consideration_by_property: Dict[int, List[Plan]] = ( - _group_plans_by_property(plans_for_consideration) - ) - - for property_id, property_plans in plans_for_consideration_by_property.items(): - if not property_plans: - raise ValueError(f"No plans for property {property_id}") - - try: - cheapest_plan = choose_cheapest_relevant_plan( - property_plans, scenario_priority_order - ) - except Exception: - logger.error(f"Failed to find cheapest plan for property {property_id}") - raise - - property_plans = _update_plan_objects(property_plans, cheapest_plan) - for plan in property_plans: + # first get all plans that we're interested in + plans_for_consideration: List[Plan] = _load_plans_for_portfolio( + portfolio_id, + all_scenarios, + scenarios_to_consider, + min_property_id, + max_property_id, + ) + for plan in plans_for_consideration: if plan.id is not None: # just in case plans_by_id[plan.id] = plan - logger.info("Successfully set defaults on Plan objects in memory") - - # then pass all domain objects to database to update (regardless of whether they've changed) - _update_plans_in_db(list(plans_by_id.values())) - logger.info(f"Successfully updated {len(plans_by_id)} Plans in database") - - # Mark the subtask as successful - if subtask_id: - cloud_logs_url = build_cloudwatch_log_url(start_ms) - SubTaskInterface().update_subtask_status( - subtask_id=UUID(subtask_id), - status="complete", - cloud_logs_url=cloud_logs_url, + # then unset existing defaults on domain objects regardless of whether they're under consideration or not + default_plans: List[Plan] = _get_default_plans( + portfolio_id, all_scenarios, min_property_id, max_property_id ) + for plan in default_plans: + plan.set_default(False) + if plan.id is not None: # just in case + plans_by_id[plan.id] = plan + + logger.info(f"Successfully unset {len(default_plans)} default plan(s)") + + # then set new defaults on domain objects under consideration + plans_for_consideration_by_property: Dict[int, List[Plan]] = ( + _group_plans_by_property(plans_for_consideration) + ) + + for property_id, property_plans in plans_for_consideration_by_property.items(): + if not property_plans: + raise ValueError(f"No plans for property {property_id}") + + try: + cheapest_plan = choose_cheapest_relevant_plan( + property_plans, scenario_priority_order + ) + except Exception: + logger.error(f"Failed to find cheapest plan for property {property_id}") + raise + + property_plans = _update_plan_objects(property_plans, cheapest_plan) + for plan in property_plans: + if plan.id is not None: # just in case + plans_by_id[plan.id] = plan + + logger.info("Successfully set defaults on Plan objects in memory") + + # then pass all domain objects to database to update (regardless of whether they've changed) + _update_plans_in_db(list(plans_by_id.values())) + + # Mark the subtask as successful + if body.subtask_id: + cloud_logs_url = build_cloudwatch_log_url(start_ms) + SubTaskInterface().update_subtask_status( + subtask_id=UUID(subtask_id), + status="complete", + cloud_logs_url=cloud_logs_url, + ) + logger.info(f"Successfully updated {len(plans_by_id)} Plans in database") + + return Response(status_code=200) + except Exception as e: + if subtask_id: + return handle_error( + "Exception during Categorisation processing.", + e, + subtask_id, + 500, + start_ms, + ) + + raise def choose_cheapest_relevant_plan(