From 3933942cd8e1c5de27ee6b27176c3aa6b1caf855 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Thu, 27 Nov 2025 12:51:25 +0000 Subject: [PATCH] added cloudwatch url --- backend/app/db/functions/tasks/Tasks.py | 6 +++- backend/app/plan/utils.py | 34 +++++++++++++++++-- backend/engine/engine.py | 43 +++++++++++++++++-------- 3 files changed, 66 insertions(+), 17 deletions(-) diff --git a/backend/app/db/functions/tasks/Tasks.py b/backend/app/db/functions/tasks/Tasks.py index 12a2e51b..30acf370 100644 --- a/backend/app/db/functions/tasks/Tasks.py +++ b/backend/app/db/functions/tasks/Tasks.py @@ -55,13 +55,14 @@ class SubTaskInterface: # UPDATE STATUS (in progress, complete, failed) # -------------------------------------------------------- def update_subtask_status( - self, subtask_id: UUID, status: str, outputs=None + self, subtask_id: UUID, status: str, outputs=None, cloud_logs_url=None ): """ Update the status of a subtask, and recalculate the parent task progress. :param subtask_id: UUID of the subtask to update :param status: New status (in progress, complete, failed) :param outputs: Optional outputs to set + :param cloud_logs_url: Optional cloud logs URL to set :return: """ now = datetime.now(timezone.utc) @@ -86,6 +87,9 @@ class SubTaskInterface: if outputs is not None: subtask.outputs = json.dumps(outputs) + if cloud_logs_url is not None: + subtask.cloud_logs_url = cloud_logs_url + session.add(subtask) session.commit() diff --git a/backend/app/plan/utils.py b/backend/app/plan/utils.py index 3f2e0e1f..569eafd1 100644 --- a/backend/app/plan/utils.py +++ b/backend/app/plan/utils.py @@ -1,3 +1,5 @@ +import os +import time import msgpack from uuid import UUID from typing import Any @@ -213,12 +215,40 @@ def parse_eco_packages(config: dict[str, Any], prepared_epc) -> tuple[list[str], return measures, mapped["target_sap"], mapped["plan_type"], already_installed -def handle_error(session, msg, e, subtask_id, status=500): +def build_cloudwatch_log_url(start_ms: int, end_ms: int) -> str: + """ + Build a CloudWatch Logs URL for the current Lambda invocation, + including timestamp window from start_ms to end_ms (epoch ms). + """ + region = os.environ["AWS_REGION"] + log_group = os.environ["AWS_LAMBDA_LOG_GROUP_NAME"] + log_stream = os.environ["AWS_LAMBDA_LOG_STREAM_NAME"] + + # CloudWatch console requires / encoded as $252F + encoded_group = log_group.replace("/", "$252F") + encoded_stream = log_stream.replace("/", "$252F") + + # Return the full URL with time range + return ( + f"https://console.aws.amazon.com/cloudwatch/home?" + f"region={region}" + f"#logsV2:log-groups/log-group/{encoded_group}" + f"/log-events/{encoded_stream}" + f"$3Fstart={start_ms}" + f"$26end={end_ms}" + ) + + +def handle_error(session, msg, e, subtask_id, status=500, start_ms=None): # When the pipeline fails, handles error process + end_ms = int(time.time() * 1000) + cloud_logs_url = build_cloudwatch_log_url(start_ms, end_ms) + SubTaskInterface().update_subtask_status( subtask_id=UUID(subtask_id), status="failed", - outputs=str(e) + outputs=str(e), + cloud_logs_url=cloud_logs_url ) logger.error(msg, exc_info=True) session.rollback() diff --git a/backend/engine/engine.py b/backend/engine/engine.py index be770d8e..ce0505e2 100644 --- a/backend/engine/engine.py +++ b/backend/engine/engine.py @@ -1,3 +1,5 @@ +import os +import time import json from copy import deepcopy from datetime import datetime @@ -23,7 +25,7 @@ from backend.app.db.functions.tasks.Tasks import SubTaskInterface from backend.app.db.models.portfolio import rating_lookup from backend.app.plan.schemas import PlanTriggerRequest, WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES from backend.app.plan.utils import ( - get_cleaned, patch_epc, extract_property_request_data, parse_eco_packages, handle_error + get_cleaned, patch_epc, extract_property_request_data, parse_eco_packages, handle_error, build_cloudwatch_log_url ) from backend.app.utils import sap_to_epc import backend.app.assumptions as assumptions @@ -409,6 +411,7 @@ async def model_engine(body: PlanTriggerRequest): logger.info("Connecting to db") session = sessionmaker(bind=db_engine)() created_at = datetime.now().isoformat() + start_ms = int(time.time() * 1000) # TODO: if the measure is already installed, it should actually be the very first phase @@ -619,19 +622,28 @@ async def model_engine(body: PlanTriggerRequest): # if we have a remote assment data type, we pull the additional data and include it epc_page_source = {} if (body.event_type == "remote_assessment") and not (epc_searcher.newest_epc.get("estimated")): - logger.info("Retrieving find my epc data") try: property_non_invasive_recommendations, patch, epc_page_source = RetrieveFindMyEpc.get_from_epc( epc_searcher.newest_epc, epc_page, rrn=rrn ) except Exception as e: logger.error(f"Failed to retrieve without cleaning address {e}") - for k in ["address", "address1"]: - epc_searcher.newest_epc[k] = epc_searcher.address_clean - property_non_invasive_recommendations, patch, epc_page_source = RetrieveFindMyEpc.get_from_epc( - epc_searcher.newest_epc, epc_page, rrn=rrn - ) - + try: + epc_to_use = deepcopy(epc_searcher.newest_epc) + for k in ["address", "address1"]: + epc_to_use[k] = epc_searcher.address_clean + property_non_invasive_recommendations, patch, epc_page_source = RetrieveFindMyEpc.get_from_epc( + epc_to_use, epc_page, rrn=rrn + ) + except Exception as e: + # Final attempt + logger.error(f"Failed to retrieve without cleaning address {e}") + epc_to_use = deepcopy(epc_searcher.newest_epc) + for k in ["address", "address1"]: + epc_to_use[k] = config["address"] + property_non_invasive_recommendations, patch, epc_page_source = RetrieveFindMyEpc.get_from_epc( + epc_to_use, epc_page, rrn=rrn + ) # If we have a property type, this means when we pull the epc data, we might need to make a patch epc_records = patch_epc(patch, epc_records) @@ -1267,20 +1279,23 @@ async def model_engine(body: PlanTriggerRequest): # Commit final changes session.commit() - except IntegrityError as e: - return handle_error(session, "Database integrity error.", e, body.subtask_id, 500) + return handle_error(session, "Database integrity error.", e, body.subtask_id, 500, start_ms) except OperationalError as e: - return handle_error(session, "Database operational error.", e, body.subtask_id, 500) + return handle_error(session, "Database operational error.", e, body.subtask_id, 500, start_ms) except ValueError as e: - return handle_error(session, "Bad request: malformed data.", e, body.subtask_id, 400) + return handle_error(session, "Bad request: malformed data.", e, body.subtask_id, 400, start_ms) except Exception as e: # General exception handling - return handle_error(session, "An unexpected error occurred.", e, body.subtask_id, 500) + return handle_error(session, "An unexpected error occurred.", e, body.subtask_id, 500, start_ms) finally: session.close() + end_ms = int(time.time() * 1000) + cloud_logs_url = build_cloudwatch_log_url(start_ms, end_ms) # Mark the subtask as successful - SubTaskInterface().update_subtask_status(subtask_id=UUID(body.subtask_id), status="failed") + SubTaskInterface().update_subtask_status( + subtask_id=UUID(body.subtask_id), status="complete", cloud_logs_url=cloud_logs_url + ) logger.info("Model Engine completed successfully")