# Mirror of https://github.com/Hestia-Homes/Model.git
# (synced 2026-06-08 11:17:27 +00:00; 329 lines, 10 KiB, Python)
import os
|
|
import sys
|
|
|
|
# Variable names containing any of these fragments (case-insensitive) are
# treated as secrets when the environment is dumped at startup.
SENSITIVE_ENV_MARKERS = (
    "SECRET",
    "TOKEN",
    "PASSWORD",
    "PASSWD",
    "KEY",
    "CREDENTIAL",
    "SESSION",
    "AUTH",
)


def redact_env_value(key: str, value: str) -> str:
    """Return *value* in a form that is safe to log.

    The value is replaced with ``***REDACTED***`` when:

    - the variable name contains one of ``SENSITIVE_ENV_MARKERS``
      (case-insensitive), or
    - the value looks like a URL with embedded credentials
      (``scheme://user:pass@host/...``).

    Otherwise the value is returned unchanged.
    """
    upper_key = key.upper()
    if any(marker in upper_key for marker in SENSITIVE_ENV_MARKERS):
        return "***REDACTED***"
    if "://" in value:
        # Only inspect the authority part (before the first '/') for '@'.
        authority = value.split("://", 1)[1].split("/", 1)[0]
        if "@" in authority:
            return "***REDACTED***"
    return value


# Dump the environment once at import time to help debug deployment config.
# Values are redacted: the Lambda runtime exposes AWS credentials as
# environment variables, and everything printed here ends up in the logs.
print("=" * 60)
print("ENVIRONMENT AT STARTUP:")
print("=" * 60)
for _env_key, _env_value in sorted(os.environ.items()):
    print(f"{_env_key}={redact_env_value(_env_key, _env_value)}")
print("=" * 60)
|
|
|
|
try:
|
|
import json
|
|
|
|
print("✓ json imported")
|
|
import pandas as pd
|
|
|
|
print("✓ pandas imported")
|
|
import requests
|
|
|
|
print("✓ requests imported")
|
|
from uuid import UUID
|
|
|
|
print("✓ UUID imported")
|
|
from urllib.parse import unquote
|
|
|
|
print("✓ urllib.parse imported")
|
|
from utils.s3 import read_csv_from_s3 as read_csv_from_s3_dict
|
|
|
|
print("✓ utils.s3 imported")
|
|
from tqdm import tqdm
|
|
|
|
print("✓ tqdm imported")
|
|
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
|
|
|
|
print("✓ SubTaskInterface imported")
|
|
from backend.address2UPRN.main import (
|
|
resolve_uprns_for_postcode_group,
|
|
get_epc_data_with_postcode,
|
|
)
|
|
|
|
print("✓ backend.address2UPRN imported")
|
|
except Exception as e:
|
|
print(f"✗ IMPORT ERROR: {type(e).__name__}: {e}")
|
|
import traceback
|
|
|
|
traceback.print_exc()
|
|
raise
|
|
|
|
|
|
def parse_s3_console_url(s3_uri: str) -> tuple[str, str]:
    """
    Parse an S3 location into ``(bucket, key)``.

    Accepted formats:

    - AWS console object URLs, e.g.
      https://account-id-hash.region.console.aws.amazon.com/s3/object/bucket?region=...&prefix=path
      ``prefix`` may appear anywhere in the query string (it usually follows
      ``region``). It is form-decoded, so ``+`` becomes a space and ``%2F``
      becomes ``/``.
    - Plain ``s3://bucket/key`` URIs.

    Raises:
        ValueError: if a non-empty bucket and key cannot be extracted.
    """
    from urllib.parse import parse_qs, urlsplit

    # s3://bucket/key: split by hand so keys containing '?' or '#' survive.
    if s3_uri.startswith("s3://"):
        bucket, _, key = s3_uri[len("s3://"):].partition("/")
        if bucket and key:
            return bucket, key
        raise ValueError(f"Could not parse S3 URI: {s3_uri}")

    parts = urlsplit(s3_uri)
    if "console.aws.amazon.com" in parts.netloc and "/s3/object/" in parts.path:
        # The bucket is the single path segment right after /s3/object/.
        bucket = parts.path.split("/s3/object/", 1)[1].strip("/").split("/", 1)[0]
        # parse_qs copes with any parameter order and with '=' inside values.
        prefix_values = parse_qs(parts.query).get("prefix", [])
        key = prefix_values[0] if prefix_values else ""
        if bucket and key:
            return bucket, key

    raise ValueError(f"Could not parse S3 URI: {s3_uri}")
|
|
|
|
|
|
def sanitise_postcode(postcode: str) -> str | None:
|
|
"""
|
|
Normalise postcode for grouping.
|
|
|
|
- Uppercase
|
|
- Remove all whitespace
|
|
"""
|
|
if pd.isna(postcode):
|
|
return None
|
|
|
|
return postcode.upper().replace(" ", "")
|
|
|
|
|
|
def is_valid_postcode(postcode_clean: str, *, timeout: float = 5) -> bool:
    """
    Validate a postcode using postcodes.io.

    Expects a sanitised postcode (e.g. E84SQ).

    Args:
        postcode_clean: Postcode to check. Empty or None returns False
            without making a request.
        timeout: Seconds to wait for postcodes.io (default 5).

    Returns:
        True only if postcodes.io answers with ``"result": true``. Otherwise
        it returns False on a best-effort basis, including for network
        errors, HTTP errors (e.g. rate limiting) and unparseable bodies.
    """
    from urllib.parse import quote

    postcodes_io_validate_url = "https://api.postcodes.io/postcodes/{postcode}/validate"

    if not postcode_clean:
        return False

    try:
        resp = requests.get(
            # Percent-encode so a stray '/', '?' or '#' cannot alter the path.
            postcodes_io_validate_url.format(postcode=quote(postcode_clean, safe="")),
            timeout=timeout,
        )
        resp.raise_for_status()
        payload = resp.json()
    except (requests.RequestException, ValueError):
        # Network issues, rate limits, non-JSON responses, etc.
        return False

    return isinstance(payload, dict) and payload.get("result") is True
|
|
|
|
|
|
def _unresolved_rows(group_df, status, error=None):
    """Return a copy of *group_df* with every row tagged as unresolved."""
    tmp = group_df.copy()
    tmp["found_uprn"] = None
    tmp["status"] = status
    if error is not None:
        tmp["error"] = error
    return tmp


def _resolve_postcode_group(postcode, group_df):
    """Resolve UPRNs for one postcode's rows; failures are recorded, not raised."""
    try:
        epc_df = get_epc_data_with_postcode(postcode)
        if epc_df.empty:
            return _unresolved_rows(group_df, "no_epc_results")
        return resolve_uprns_for_postcode_group(group_df=group_df, epc_df=epc_df)
    except Exception as e:
        # Best effort: one failing postcode must not abort the whole run.
        return _unresolved_rows(group_df, "exception", error=str(e))


def main(input_path="hackney.xlsx", sheet_name="Sustainability", limit=500):
    """
    Resolve UPRNs for the addresses in an Excel sheet, grouped by postcode.

    Steps:

    1. Read *sheet_name* from *input_path* and keep the first *limit* rows
       (all rows if *limit* is None).
    2. Sanitise the ``Postcode`` column and validate each distinct postcode
       once against postcodes.io.
    3. For each valid postcode, fetch EPC data and resolve UPRNs for that
       group. Some rows cannot be resolved: the postcode is missing or
       invalid, the postcode has no EPC results, or resolution raised. These
       rows are kept and carry a ``status`` column saying why.
    4. Print the rows with a positive ``best_match_lexiscore`` for review.

    Returns:
        The combined DataFrame of all processed rows (empty if there were none).
    """
    df = pd.read_excel(input_path, sheet_name=sheet_name)
    if limit is not None:
        df = df.head(limit)

    # Sanitise postcodes
    df["postcode_clean"] = df["Postcode"].apply(sanitise_postcode)

    # Validate each distinct, non-null postcode once, before grouping, to
    # save API calls.
    unique_postcodes = df["postcode_clean"].dropna().unique()
    postcode_validity = {
        pc: is_valid_postcode(pc)
        for pc in tqdm(unique_postcodes, desc="Validating postcodes")
    }

    # Missing postcodes map to NaN. eq(True) turns the column into a clean
    # boolean mask (NaN -> False), because boolean indexing with NaN raises.
    df["postcode_valid"] = df["postcode_clean"].map(postcode_validity).eq(True)

    results = []
    grouped = df[df["postcode_valid"]].groupby("postcode_clean")
    for postcode, group_df in tqdm(
        grouped,
        total=grouped.ngroups,
        desc="Resolving UPRNs by postcode",
    ):
        results.append(_resolve_postcode_group(postcode, group_df))

    # Keep rows with missing/invalid postcodes instead of silently dropping them.
    invalid_df = df[~df["postcode_valid"]]
    if not invalid_df.empty:
        results.append(_unresolved_rows(invalid_df, "invalid_postcode"))

    if not results:
        print("No rows to process")
        return pd.DataFrame()

    final_df = pd.concat(results, ignore_index=True)

    review_columns = [
        "best_match_lexiscore",
        "Address 1",
        "best_match_address",
        "Postcode",
        "UPRN",
        "best_match_uprn",
    ]
    if "best_match_lexiscore" in final_df.columns:
        # Coerce so None/strings from unresolved rows compare as NaN (-> False).
        scores = pd.to_numeric(final_df["best_match_lexiscore"], errors="coerce")
        matched = final_df[scores > 0]
        # Show only the review columns that the resolver actually produced.
        present = [col for col in review_columns if col in matched.columns]
        print(f"{len(matched)} of {len(final_df)} rows matched with a positive lexiscore")
        print(matched[present])
    else:
        print("No rows were matched (no best_match_lexiscore column)")

    return final_df
|
|
|
|
|
|
def _parse_record_body(record):
    """Return one record's payload as a dict.

    SQS records carry the payload as a JSON string under ``body``. A direct
    invocation passes the payload itself, so the record is used when ``body``
    is absent.

    Raises:
        json.JSONDecodeError: if ``body`` is a string that is not valid JSON.
        ValueError: if the payload is not a JSON object.
    """
    body = record.get("body", record)
    if isinstance(body, str):
        body = json.loads(body)
    if not isinstance(body, dict):
        raise ValueError(f"Expected a JSON object body, got {type(body).__name__}")
    return body


def _mark_subtask_failed(subtask_interface, subtask_id, error):
    """Mark *subtask_id* failed if one exists; log, but don't raise, DB errors."""
    if not subtask_id:
        return
    try:
        subtask_interface.update_subtask_status(
            subtask_id, "failed", outputs={"error": error}
        )
    except Exception as db_error:
        print(f"Failed to update subtask status: {db_error}")


def handler(event, context):
    """
    Lambda entry point for the postcode splitter.

    Accepts an SQS-style batch (``event["Records"]``, each record's ``body``
    a JSON string) or a single directly-invoked payload. Each payload needs:

    - ``task_id``: a UUID.
    - ``s3_uri``: an S3 location understood by ``parse_s3_console_url``.

    For each valid payload the handler creates a subtask, reads the CSV from
    S3, and then marks the subtask complete. If anything fails after the
    subtask is created, the subtask is marked failed.

    Returns:
        ``statusCode`` 500 with ``{"errors": [...]}`` if no record succeeded
        and at least one failed. Otherwise 200 with
        ``{"processed": [...], "errors": [...] or None}``.

    NOTE(review): for SQS triggers this return value does not trigger
    redelivery. Failed messages are deleted unless the function raises or
    reports ``batchItemFailures`` (with ReportBatchItemFailures enabled).
    """
    print("=" * 60)
    print("HANDLER INVOKED")
    print("=" * 60)
    print(f"Function: {context.function_name}")
    print(f"Request ID: {context.aws_request_id}")
    print(f"Event received: {type(event)}")
    print(f"Event keys: {event.keys() if isinstance(event, dict) else 'N/A'}")

    # Example SQS message for testing (copy and paste into SQS):
    # {
    #   "task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917",
    #   "s3_uri": "https://337213553626-7ovirzjr.eu-west-2.console.aws.amazon.com/s3/object/retrofit-data-dev?region=eu-west-2&prefix=ara_raw_inputs/peabody/2025_11_11+-+Peabody+-+Data+Extracts+for+Domna_transformed.csv",
    # }

    # Handle both single event and batch events (SQS, etc.)
    print("Extracting records from event...")
    records = event.get("Records", [event])
    print(f"Found {len(records)} record(s) to process")
    results = []
    errors = []

    print("Initializing SubTaskInterface...")
    subtask_interface = SubTaskInterface()
    print("✓ SubTaskInterface initialized")

    for record in records:
        print("Processing record...")
        subtask_id = None
        try:
            print("Parsing body from record...")
            body = _parse_record_body(record)
            print(f"Body parsed: {body}")

            # Validate required fields
            task_id = body.get("task_id")
            s3_uri = body.get("s3_uri")
            print(f"task_id: {task_id}, s3_uri: {s3_uri}")

            if not task_id:
                errors.append({"error": "Missing required field: task_id"})
                continue

            if not s3_uri:
                errors.append({"error": "Missing required field: s3_uri"})
                continue

            # Convert task_id to UUID. Non-string values (e.g. JSON numbers)
            # are validated too, instead of being passed through unchecked.
            print("Converting task_id to UUID...")
            try:
                task_id = task_id if isinstance(task_id, UUID) else UUID(str(task_id))
                print(f"UUID conversion successful: {task_id}")
            except ValueError as e:
                errors.append({"error": f"Invalid UUID format for task_id: {str(e)}"})
                continue

            # Create a new subtask for this postcode splitter invocation
            print(f"Creating subtask for task {task_id}...")
            subtask_id = subtask_interface.create_subtask(
                task_id=task_id, inputs={"s3_uri": s3_uri}
            )
            print(f"Created subtask {subtask_id} for task {task_id}")

            print(f"Processing task_id: {task_id}")
            print(f"Processing s3_uri: {s3_uri}")

            # Read CSV from S3
            print("Parsing S3 URI...")
            bucket, key = parse_s3_console_url(s3_uri)
            print(f"Bucket: {bucket}, Key: {key}")

            print("Fetching CSV from S3...")
            csv_data = read_csv_from_s3_dict(bucket, key)
            print(f"CSV fetched: {len(csv_data)} rows")

            print("Creating DataFrame...")
            df = pd.DataFrame(csv_data)
            print(f"DataFrame created: {len(df)} rows, {len(df.columns)} columns")

            # Get head for demo
            print("Getting DataFrame head...")
            print("DataFrame head:")
            print(df.head())

            # Mark the subtask complete *before* recording success. If this
            # update fails, the record is reported once, as an error, rather
            # than as both processed and failed.
            print("Updating subtask status to complete...")
            subtask_interface.update_subtask_status(
                subtask_id,
                "complete",
                outputs={
                    "status": "processing_complete",
                    "s3_uri": s3_uri,
                    "rows_processed": len(df),
                },
            )
            print(f"Subtask {subtask_id} marked as complete")

            print("Appending result...")
            results.append(
                {
                    "message": "Postcode splitter processing started",
                    "task_id": str(task_id),
                    "s3_uri": s3_uri,
                    "subtask_id": str(subtask_id),
                }
            )
            print("Result appended")

        except json.JSONDecodeError as e:
            errors.append({"error": "Invalid JSON in request body", "details": str(e)})
            _mark_subtask_failed(subtask_interface, subtask_id, str(e))
        except Exception as e:
            print(f"Unexpected error processing record: {e}")
            errors.append({"error": "Unexpected error", "details": str(e)})
            _mark_subtask_failed(subtask_interface, subtask_id, str(e))

    # Return error if all records failed
    if errors and not results:
        return {"statusCode": 500, "body": json.dumps({"errors": errors})}

    return {
        "statusCode": 200,
        "body": json.dumps(
            {"processed": results, "errors": errors if errors else None}
        ),
    }
|
|
|
|
|
|
# Script entry point: run the local Excel-based UPRN resolution in main().
# When the module is imported instead (presumably by the Lambda runtime, with
# ``handler`` configured as the entry point), nothing below runs.
if __name__ == "__main__":
    main()
|