survey-extraction/deployment/lambda/lambda_example/docker/app.py
2025-11-14 15:30:32 +00:00

181 lines
5.3 KiB
Python

import os
import json
import time
import requests
# ============================================================
# CONFIG
# ============================================================
API_BASE = "https://api.dev.hestia.homes/v1/tasks/" # e.g. https://api.myserver.com/tasks
API_TOKEN = "4QPwbB6hEdUloDVtbBJCUTfGBdBgWwpeavWQ7t5Z"
# ============================================================
# Low-level API wrapper
# ============================================================
def api(method, path, body=None):
"""Generic function to call your backend Task API."""
url = f"{API_BASE}{path}"
headers = {
"Authorization": f"Bearer {API_TOKEN}",
"Content-Type": "application/json",
}
print(f"[API] {method} {url}")
if body:
print(f"[API] Payload: {json.dumps(body)}")
response = requests.request(
method,
url,
headers=headers,
data=json.dumps(body) if body else None
)
print(f"[API] Response: {response.status_code}")
if response.status_code >= 400:
print(f"[API ERROR] {response.text}")
raise Exception(f"API Error {response.status_code}: {response.text}")
return response.json()
# ============================================================
# CloudWatch Logs URL generator (with time range)
# ============================================================
def build_cloudwatch_url(context, start_ms, end_ms):
region = context.invoked_function_arn.split(":")[3]
log_group = context.log_group_name
log_stream = context.log_stream_name
log_group_enc = log_group.replace("/", "$252F")
log_stream_enc = log_stream.replace("/", "$252F")
return (
f"https://{region}.console.aws.amazon.com/cloudwatch/home"
f"?region={region}#logsV2:log-groups/log-group/{log_group_enc}"
f"/log-events/{log_stream_enc}"
f"$3Fstart$3D{start_ms}$26end$3D{end_ms}"
)
# ============================================================
# Subtask lifecycle action helpers
# ============================================================
def mark_in_progress(subtask_id):
print(f"[SUBTASK] Marking {subtask_id} as IN PROGRESS")
return api("PUT", f"/subtasks/{subtask_id}/status", {"status": "in progress"})
def mark_complete(subtask_id, outputs, cloud_logs):
print(f"[SUBTASK] Marking {subtask_id} as COMPLETE")
return api("POST", f"/subtasks/{subtask_id}/finalize", {
"status": "complete",
"outputs": outputs,
"cloud_logs_url": cloud_logs
})
def mark_failed(subtask_id, error_message, cloud_logs):
print(f"[SUBTASK] Marking {subtask_id} as FAILED")
return api("POST", f"/subtasks/{subtask_id}/finalize", {
"status": "failed",
"outputs": {"error": error_message},
"cloud_logs_url": cloud_logs
})
# ============================================================
# Lambda Handler
# ============================================================
def handler(event, context):
"""
Example Lambda that:
- marks subtask in progress
- prints simulated work steps
- marks complete or failed
- attaches CloudWatch Log URL
"""
subtask_id = event["subtask_id"]
print("=====================================")
print(" 🚀 LAMBDA INVOCATION STARTED")
print(f" 🧩 Subtask ID: {subtask_id}")
print("=====================================")
start_ms = int(time.time() * 1000)
try:
# ---- Step 1: Mark subtask IN PROGRESS ----
mark_in_progress(subtask_id)
print("\n---- Simulating Work ----")
print("Step 1: Loading resources...")
time.sleep(0.5)
print("Step 2: Calling an external service...")
time.sleep(0.7)
print("Step 3: Processing data...")
time.sleep(1.0)
# UNCOMMENT TO TEST FAILURE
# raise Exception("Simulated failure for testing")
print("Step 4: Finalizing outputs...")
time.sleep(0.5)
outputs = {
"result": "success",
"calculated_value": 123,
"details": "This is example output from the handler.",
"input_params": event.get("params", {})
}
print("Work completed successfully.")
end_ms = int(time.time() * 1000)
cloud_logs_url = build_cloudwatch_url(context, start_ms, end_ms)
print(f"Generated CloudWatch URL:\n{cloud_logs_url}\n")
# ---- Step 2: Mark COMPLETE ----
mark_complete(subtask_id, outputs, cloud_logs_url)
print("=====================================")
print(" ✅ LAMBDA EXECUTION COMPLETE")
print("=====================================")
return {
"statusCode": 200,
"body": outputs,
"cloudwatch_logs": cloud_logs_url
}
except Exception as e:
print("=====================================")
print(" ❌ ERROR IN LAMBDA EXECUTION")
print("=====================================")
print(f"Error: {e}")
end_ms = int(time.time() * 1000)
cloud_logs_url = build_cloudwatch_url(context, start_ms, end_ms)
# ---- Step 3: Mark FAILED ----
mark_failed(subtask_id, str(e), cloud_logs_url)
return {
"statusCode": 500,
"error": str(e),
"cloudwatch_logs": cloud_logs_url
}