mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
Merge pull request #560 from Hestia-Homes/eco-eligiblity-bug
added cloudwatch url to logs
This commit is contained in:
commit
a8da2d9d1c
3 changed files with 66 additions and 17 deletions
|
|
@ -55,13 +55,14 @@ class SubTaskInterface:
|
|||
# UPDATE STATUS (in progress, complete, failed)
|
||||
# --------------------------------------------------------
|
||||
def update_subtask_status(
|
||||
self, subtask_id: UUID, status: str, outputs=None
|
||||
self, subtask_id: UUID, status: str, outputs=None, cloud_logs_url=None
|
||||
):
|
||||
"""
|
||||
Update the status of a subtask, and recalculate the parent task progress.
|
||||
:param subtask_id: UUID of the subtask to update
|
||||
:param status: New status (in progress, complete, failed)
|
||||
:param outputs: Optional outputs to set
|
||||
:param cloud_logs_url: Optional cloud logs URL to set
|
||||
:return:
|
||||
"""
|
||||
now = datetime.now(timezone.utc)
|
||||
|
|
@ -86,6 +87,9 @@ class SubTaskInterface:
|
|||
if outputs is not None:
|
||||
subtask.outputs = json.dumps(outputs)
|
||||
|
||||
if cloud_logs_url is not None:
|
||||
subtask.cloud_logs_url = cloud_logs_url
|
||||
|
||||
session.add(subtask)
|
||||
session.commit()
|
||||
|
||||
|
|
|
|||
|
|
@ -1,3 +1,5 @@
|
|||
import os
|
||||
import time
|
||||
import msgpack
|
||||
from uuid import UUID
|
||||
from typing import Any
|
||||
|
|
@ -213,12 +215,40 @@ def parse_eco_packages(config: dict[str, Any], prepared_epc) -> tuple[list[str],
|
|||
return measures, mapped["target_sap"], mapped["plan_type"], already_installed
|
||||
|
||||
|
||||
def handle_error(session, msg, e, subtask_id, status=500):
|
||||
def build_cloudwatch_log_url(start_ms: int, end_ms: int) -> str:
|
||||
"""
|
||||
Build a CloudWatch Logs URL for the current Lambda invocation,
|
||||
including timestamp window from start_ms to end_ms (epoch ms).
|
||||
"""
|
||||
region = os.environ["AWS_REGION"]
|
||||
log_group = os.environ["AWS_LAMBDA_LOG_GROUP_NAME"]
|
||||
log_stream = os.environ["AWS_LAMBDA_LOG_STREAM_NAME"]
|
||||
|
||||
# CloudWatch console requires / encoded as $252F
|
||||
encoded_group = log_group.replace("/", "$252F")
|
||||
encoded_stream = log_stream.replace("/", "$252F")
|
||||
|
||||
# Return the full URL with time range
|
||||
return (
|
||||
f"https://console.aws.amazon.com/cloudwatch/home?"
|
||||
f"region={region}"
|
||||
f"#logsV2:log-groups/log-group/{encoded_group}"
|
||||
f"/log-events/{encoded_stream}"
|
||||
f"$3Fstart={start_ms}"
|
||||
f"$26end={end_ms}"
|
||||
)
|
||||
|
||||
|
||||
def handle_error(session, msg, e, subtask_id, status=500, start_ms=None):
|
||||
# When the pipeline fails, handles error process
|
||||
end_ms = int(time.time() * 1000)
|
||||
cloud_logs_url = build_cloudwatch_log_url(start_ms, end_ms)
|
||||
|
||||
SubTaskInterface().update_subtask_status(
|
||||
subtask_id=UUID(subtask_id),
|
||||
status="failed",
|
||||
outputs=str(e)
|
||||
outputs=str(e),
|
||||
cloud_logs_url=cloud_logs_url
|
||||
)
|
||||
logger.error(msg, exc_info=True)
|
||||
session.rollback()
|
||||
|
|
|
|||
|
|
@ -1,3 +1,5 @@
|
|||
import os
|
||||
import time
|
||||
import json
|
||||
from copy import deepcopy
|
||||
from datetime import datetime
|
||||
|
|
@ -23,7 +25,7 @@ from backend.app.db.functions.tasks.Tasks import SubTaskInterface
|
|||
from backend.app.db.models.portfolio import rating_lookup
|
||||
from backend.app.plan.schemas import PlanTriggerRequest, WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES
|
||||
from backend.app.plan.utils import (
|
||||
get_cleaned, patch_epc, extract_property_request_data, parse_eco_packages, handle_error
|
||||
get_cleaned, patch_epc, extract_property_request_data, parse_eco_packages, handle_error, build_cloudwatch_log_url
|
||||
)
|
||||
from backend.app.utils import sap_to_epc
|
||||
import backend.app.assumptions as assumptions
|
||||
|
|
@ -409,6 +411,7 @@ async def model_engine(body: PlanTriggerRequest):
|
|||
logger.info("Connecting to db")
|
||||
session = sessionmaker(bind=db_engine)()
|
||||
created_at = datetime.now().isoformat()
|
||||
start_ms = int(time.time() * 1000)
|
||||
|
||||
# TODO: if the measure is already installed, it should actually be the very first phase
|
||||
|
||||
|
|
@ -619,19 +622,28 @@ async def model_engine(body: PlanTriggerRequest):
|
|||
# if we have a remote assment data type, we pull the additional data and include it
|
||||
epc_page_source = {}
|
||||
if (body.event_type == "remote_assessment") and not (epc_searcher.newest_epc.get("estimated")):
|
||||
logger.info("Retrieving find my epc data")
|
||||
try:
|
||||
property_non_invasive_recommendations, patch, epc_page_source = RetrieveFindMyEpc.get_from_epc(
|
||||
epc_searcher.newest_epc, epc_page, rrn=rrn
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to retrieve without cleaning address {e}")
|
||||
for k in ["address", "address1"]:
|
||||
epc_searcher.newest_epc[k] = epc_searcher.address_clean
|
||||
property_non_invasive_recommendations, patch, epc_page_source = RetrieveFindMyEpc.get_from_epc(
|
||||
epc_searcher.newest_epc, epc_page, rrn=rrn
|
||||
)
|
||||
|
||||
try:
|
||||
epc_to_use = deepcopy(epc_searcher.newest_epc)
|
||||
for k in ["address", "address1"]:
|
||||
epc_to_use[k] = epc_searcher.address_clean
|
||||
property_non_invasive_recommendations, patch, epc_page_source = RetrieveFindMyEpc.get_from_epc(
|
||||
epc_to_use, epc_page, rrn=rrn
|
||||
)
|
||||
except Exception as e:
|
||||
# Final attempt
|
||||
logger.error(f"Failed to retrieve without cleaning address {e}")
|
||||
epc_to_use = deepcopy(epc_searcher.newest_epc)
|
||||
for k in ["address", "address1"]:
|
||||
epc_to_use[k] = config["address"]
|
||||
property_non_invasive_recommendations, patch, epc_page_source = RetrieveFindMyEpc.get_from_epc(
|
||||
epc_to_use, epc_page, rrn=rrn
|
||||
)
|
||||
# If we have a property type, this means when we pull the epc data, we might need to make a patch
|
||||
|
||||
epc_records = patch_epc(patch, epc_records)
|
||||
|
|
@ -1267,20 +1279,23 @@ async def model_engine(body: PlanTriggerRequest):
|
|||
|
||||
# Commit final changes
|
||||
session.commit()
|
||||
|
||||
except IntegrityError as e:
|
||||
return handle_error(session, "Database integrity error.", e, body.subtask_id, 500)
|
||||
return handle_error(session, "Database integrity error.", e, body.subtask_id, 500, start_ms)
|
||||
except OperationalError as e:
|
||||
return handle_error(session, "Database operational error.", e, body.subtask_id, 500)
|
||||
return handle_error(session, "Database operational error.", e, body.subtask_id, 500, start_ms)
|
||||
except ValueError as e:
|
||||
return handle_error(session, "Bad request: malformed data.", e, body.subtask_id, 400)
|
||||
return handle_error(session, "Bad request: malformed data.", e, body.subtask_id, 400, start_ms)
|
||||
except Exception as e: # General exception handling
|
||||
return handle_error(session, "An unexpected error occurred.", e, body.subtask_id, 500)
|
||||
return handle_error(session, "An unexpected error occurred.", e, body.subtask_id, 500, start_ms)
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
end_ms = int(time.time() * 1000)
|
||||
cloud_logs_url = build_cloudwatch_log_url(start_ms, end_ms)
|
||||
# Mark the subtask as successful
|
||||
SubTaskInterface().update_subtask_status(subtask_id=UUID(body.subtask_id), status="failed")
|
||||
SubTaskInterface().update_subtask_status(
|
||||
subtask_id=UUID(body.subtask_id), status="complete", cloud_logs_url=cloud_logs_url
|
||||
)
|
||||
|
||||
logger.info("Model Engine completed successfully")
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue