From d20725d12b91a4fd0fd496139a2b7fa0ea0e0eaf Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Wed, 26 Nov 2025 21:42:09 +0000 Subject: [PATCH] adding tasks to engine --- backend/app/db/functions/tasks/Tasks.py | 13 ++++++++++++- backend/app/db/models/materials.py | 1 + backend/app/plan/router.py | 9 ++++----- backend/app/plan/utils.py | 13 ++++++++++--- backend/engine/engine.py | 24 ++++++++++++++---------- 5 files changed, 41 insertions(+), 19 deletions(-) diff --git a/backend/app/db/functions/tasks/Tasks.py b/backend/app/db/functions/tasks/Tasks.py index 7508ab2e..5a3ae699 100644 --- a/backend/app/db/functions/tasks/Tasks.py +++ b/backend/app/db/functions/tasks/Tasks.py @@ -51,7 +51,16 @@ class SubTaskInterface: # -------------------------------------------------------- # UPDATE STATUS (in progress, complete, failed) # -------------------------------------------------------- - def update_subtask_status(self, subtask_id: UUID, status: str): + def update_subtask_status( + self, subtask_id: UUID, status: str, outputs=None + ): + """ + Update the status of a subtask, and recalculate the parent task progress. + :param subtask_id: UUID of the subtask to update + :param status: New status (in progress, complete, failed) + :param outputs: Optional outputs to set + :return: + """ now = datetime.now(timezone.utc) with get_db_session() as session: @@ -71,6 +80,8 @@ class SubTaskInterface: subtask.status = normalized subtask.updated_at = now + if outputs is not None: + subtask.outputs = json.dumps(outputs) session.add(subtask) session.commit() diff --git a/backend/app/db/models/materials.py b/backend/app/db/models/materials.py index 9b38addd..99759438 100644 --- a/backend/app/db/models/materials.py +++ b/backend/app/db/models/materials.py @@ -20,6 +20,7 @@ class MaterialType(enum.Enum): room_roof_insulation = "room_roof_insulation" windows_glazing = "windows_glazing" secondary_glazing = "secondary_glazing" + double_glazing = "double_glazing" cavity_wall_extraction = "cavity_wall_extraction" iwi_wall_demolition = "iwi_wall_demolition" diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index 2b2306ee..d143dc95 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -15,6 +15,7 @@ from utils.logger import setup_logger from backend.app.db.connection import db_engine from backend.app.db.functions.recommendations_functions import create_scenario +from backend.app.db.functions.tasks.Tasks import TasksInterface, SubTaskInterface logger = setup_logger() @@ -81,10 +82,8 @@ async def trigger_plan_entrypoint(body: PlanTriggerRequest): # Insert the scenario ID into the data payload data["scenario_id"] = scenario_id - # Create a task, and associated sub-tasks - from backend.app.db.functions.tasks.Tasks import TasksInterface, SubTaskInterface # Create a main task - task_id = TasksInterface.create_task( + task_id, _ = TasksInterface.create_task( task_source="backend/plan/router.py:trigger_plan_entrypoint", service="plan_engine", inputs=data, @@ -92,7 +91,6 @@ async def trigger_plan_entrypoint(body: PlanTriggerRequest): ) subtask_interface = SubTaskInterface() - for i in range(total_chunks): # Create an entry in the request logs table index_start = i * chunk_size @@ -101,7 +99,6 @@ async def trigger_plan_entrypoint(body: PlanTriggerRequest): message_payload = { **data, "index_start": index_start, "index_end": index_end, } - message_body = json.dumps(message_payload) # Create a subtask for this chunk subtask_id = subtask_interface.create_subtask( @@ -113,6 +110,8 @@ async def trigger_plan_entrypoint(body: PlanTriggerRequest): message_payload["task_id"] = str(task_id) message_payload["subtask_id"] = str(subtask_id) + message_body = json.dumps(message_payload) + response = sqs_client.send_message( QueueUrl=settings.ENGINE_SQS_URL, MessageBody=message_body diff --git a/backend/app/plan/utils.py b/backend/app/plan/utils.py index ea328d5b..3f2e0e1f 100644 --- a/backend/app/plan/utils.py +++ b/backend/app/plan/utils.py @@ -1,8 +1,10 @@ import msgpack +from uuid import UUID +from typing import Any from utils.s3 import read_from_s3 from backend.app.config import get_settings from backend.app.plan.data_classes import PropertyRequestData -from typing import Any +from backend.app.db.functions.tasks.Tasks import SubTaskInterface from starlette.responses import Response from utils.logger import setup_logger @@ -64,7 +66,7 @@ def extract_property_request_data( x for x in already_installed if (x["address"] == config["address"]) and (x["postcode"] == config["postcode"]) ), []) - + # Because we have some non-invasive recommendations that match on address and postcode, but not UPRN # we need to check existence of uprn has_uprn = "uprn" in non_invasive_recommendations[0] if non_invasive_recommendations else False @@ -211,8 +213,13 @@ def parse_eco_packages(config: dict[str, Any], prepared_epc) -> tuple[list[str], return measures, mapped["target_sap"], mapped["plan_type"], already_installed -def handle_error(session, msg, status=500): +def handle_error(session, msg, e, subtask_id, status=500): # When the pipeline fails, handles error process + SubTaskInterface().update_subtask_status( + subtask_id=UUID(subtask_id), + status="failed", + outputs=str(e) + ) logger.error(msg, exc_info=True) session.rollback() return Response(status_code=status, content=msg) diff --git a/backend/engine/engine.py b/backend/engine/engine.py index a7743b90..a172972a 100644 --- a/backend/engine/engine.py +++ b/backend/engine/engine.py @@ -430,6 +430,9 @@ async def model_engine(body: PlanTriggerRequest): ) # Where the EPC has been estimated, that is because a UPRN wasn't avaialble and so we remove UPRN # This will be reflexted + if "estimated" not in plan_input.columns: + plan_input["estimated"] = False + plan_input["uprn"] = np.where( plan_input["estimated"].isin([1, True]) & ( (plan_input["uprn"] < 0) | pd.isnull(plan_input["uprn"]) @@ -527,7 +530,7 @@ async def model_engine(body: PlanTriggerRequest): address1 = config.get("address", None) # Handle domna address list format if pd.isnull(address1) and body.file_format == "domna_asset_list": - address1 = config.get("domna_full_address", None) + address1 = config.get("domna_address_1", None) address1 = str(int(address1)) if isinstance(address1, float) else str(address1) full_address = config["domna_full_address"] if body.file_format == "domna_asset_list" else None @@ -1179,8 +1182,9 @@ async def model_engine(body: PlanTriggerRequest): ) db_funcs.property_functions.create_property_details_epc(session, property_details_epc) - db_funcs.property_functions.update_or_create_property_spatial_details(session, p.uprn, - p.spatial) + db_funcs.property_functions.update_or_create_property_spatial_details( + session, p.uprn, p.spatial + ) property_data = p.get_full_property_data(current_valuation=valuations["current_value"]) @@ -1262,14 +1266,14 @@ async def model_engine(body: PlanTriggerRequest): # Commit final changes session.commit() - except IntegrityError: - return handle_error(session, "Database integrity error.", 500) - except OperationalError: - return handle_error(session, "Database operational error.", 500) - except ValueError: - return handle_error(session, "Bad request: malformed data.", 400) + except IntegrityError as e: + return handle_error(session, "Database integrity error.", e, body.subtask_id, 500) + except OperationalError as e: + return handle_error(session, "Database operational error.", e, body.subtask_id, 500) + except ValueError as e: + return handle_error(session, "Bad request: malformed data.", e, body.subtask_id, 400) except Exception as e: # General exception handling - return handle_error(session, "An unexpected error occurred.", 500) + return handle_error(session, "An unexpected error occurred.", e, body.subtask_id, 500) finally: session.close()