mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
224 lines
7.8 KiB
Python
224 lines
7.8 KiB
Python
from typing import Any, Optional
|
|
import json
|
|
from utils.logger import setup_logger
|
|
import logging
|
|
from backend.utils.subtasks import subtask_handler
|
|
from utils.s3 import (
|
|
save_csv_to_s3,
|
|
read_csv_from_s3 as read_csv_from_s3_dict,
|
|
parse_s3_uri,
|
|
)
|
|
from backend.utils.addressMatch import AddressMatch
|
|
from backend.app.db.connection import get_db_session
|
|
from backend.app.db.models.postcode_search import PostcodeSearchModel
|
|
from backend.utils.ordnance_survey import (
|
|
lookup_os_places,
|
|
os_places_results_to_dataframe,
|
|
)
|
|
from backend.app.config import get_settings
|
|
from sqlalchemy import select
|
|
from datetime import datetime
|
|
import uuid
|
|
import os
|
|
|
|
import pandas as pd
|
|
|
|
# Module-level logger returned by utils.logger.setup_logger(); shared by the
# functions in this file for info/warning/error output.
logger: logging.Logger = setup_logger()
|
|
|
|
|
|
def check_if_post_code_exists_in_db_cache(postcode):
    """Return OS Places results for ``postcode`` as a DataFrame, using a DB cache.

    Despite the name, this does more than check: it first looks the postcode up
    in the ``PostcodeSearchModel`` cache table and, on a cache miss, queries the
    OS Places API and stores a successful response in the cache.

    The cache read and the cache write use separate, short-lived DB sessions so
    that no session/connection is held open while the external HTTP request is
    in flight.

    :param postcode: The postcode to look up (used verbatim as the cache key).
    :return: DataFrame built by ``os_places_results_to_dataframe``; an empty
        ``pd.DataFrame()`` if the OS Places API call did not succeed.
    """
    from sqlalchemy.exc import SQLAlchemyError

    with get_db_session() as session:
        cached = (
            session.execute(
                select(PostcodeSearchModel).where(
                    PostcodeSearchModel.postcode == postcode
                )
            )
            .scalars()
            .first()
        )
        if cached is not None:
            # Convert while the session is still open so ORM attribute access
            # cannot hit a detached/expired instance.
            return os_places_results_to_dataframe(cached.result_data)

    # Cache miss — fetch from OS Places API (deliberately outside any DB session).
    api_key = get_settings().ORDNANCE_SURVEY_API_KEY
    response = lookup_os_places(postcode, api_key)

    if response.get("status") != 200 or "data" not in response:
        logger.error(f"OS Places API failed for {postcode}: {response}")
        return pd.DataFrame()

    data = response["data"]

    # Best-effort cache write: failing to cache (e.g. a duplicate-key error if
    # another worker cached the same postcode concurrently — depends on the
    # table's constraints) must not discard API data we already have.
    with get_db_session() as session:
        session.add(PostcodeSearchModel(postcode=postcode, result_data=data))
        try:
            session.commit()
        except SQLAlchemyError:
            session.rollback()
            logger.exception(f"Failed to cache OS Places results for {postcode}")

    return os_places_results_to_dataframe(data)
|
|
|
|
|
|
def get_ordance_survey_record(row, cache=None):
    """Match one input row against the OS Places addresses for its postcode.

    :param row: Mapping-like row (``dict`` or ``pd.Series``) with optional
        ``"Address 1"``, ``"Address 2"`` and ``"Address 3"`` fields, plus a
        ``"postcode_clean"`` field when ``cache`` is not supplied.
    :param cache: Pre-fetched OS Places DataFrame for the row's postcode, with
        ``"ADDRESS"`` and ``"UPRN"`` columns. When omitted it is fetched via
        ``check_if_post_code_exists_in_db_cache`` using ``row["postcode_clean"]``.
    :return: ``dict`` with keys ``ordnance_survey_address``,
        ``ordnance_survey_uprn`` and ``ordnance_survey_lexiscore`` for the
        best-scoring OS address, or ``None`` when the row has no postcode / no
        address text, or no OS data is available.
    """
    if cache is None:
        # Derive the postcode from the row (the same key the handler groups by).
        postcode = row.get("postcode_clean")
        if postcode is None or (pd.api.types.is_scalar(postcode) and pd.isna(postcode)):
            return None
        cache = check_if_post_code_exists_in_db_cache(postcode)

    if cache is None or cache.empty:
        return None

    # Build the free-text address; None/NaN fields become "" rather than the
    # literal strings "None"/"nan".
    parts = []
    for column in ("Address 1", "Address 2", "Address 3"):
        value = row.get(column, "")
        if value is None or (pd.api.types.is_scalar(value) and pd.isna(value)):
            value = ""
        parts.append(str(value).strip())
    user_input = " ".join(parts).strip()

    if not user_input:
        return None

    # Score the input against every OS Places address and keep the best one.
    scores = cache["ADDRESS"].apply(lambda addr: AddressMatch.score(user_input, addr))
    best_idx = scores.idxmax()
    return {
        "ordnance_survey_address": cache.at[best_idx, "ADDRESS"],
        "ordnance_survey_uprn": cache.at[best_idx, "UPRN"],
        "ordnance_survey_lexiscore": scores[best_idx],
    }
|
|
|
|
|
|
def save_results_to_s3(
    results_df: pd.DataFrame,
    task_id: str,
    sub_task_id: str,
    bucket_name: Optional[str] = None,
) -> bool:
    """
    Save results DataFrame to S3 as CSV in a parent folder structure.

    The object key has the form
    ``ara_ordnance_survey_outputs/<task_id>/<sub_task_id>/ordnanceSurvey/<iso-timestamp>_<8 hex chars>.csv``.

    :param results_df: The DataFrame containing results
    :param task_id: The task ID (used as a folder in the key)
    :param sub_task_id: The subtask ID (used as a folder in the key)
    :param bucket_name: The S3 bucket name; when None, falls back to the
        ``S3_BUCKET_NAME`` environment variable
    :return: True if ``save_csv_to_s3`` reported success, False otherwise.
        Errors are logged (with traceback) and never raised.
    """
    if bucket_name is None:
        bucket_name = os.getenv("S3_BUCKET_NAME")

    if not bucket_name:
        logger.error(
            "S3 bucket name not provided and S3_BUCKET_NAME environment variable not set"
        )
        return False

    # Timestamp plus a short random suffix makes key collisions between
    # concurrent writes for the same sub-task unlikely.
    file_name = f"{datetime.now().isoformat()}_{str(uuid.uuid4())[:8]}"
    file_key = (
        f"ara_ordnance_survey_outputs/{task_id}/{sub_task_id}"
        f"/ordnanceSurvey/{file_name}.csv"
    )
    s3_location = f"s3://{bucket_name}/{file_key}"

    try:
        success = save_csv_to_s3(results_df, bucket_name, file_key)
    except Exception:
        # Boundary of this best-effort helper: keep the traceback in the logs
        # and report failure through the return value.
        logger.exception(f"Error saving results to {s3_location}")
        return False

    if not success:
        logger.error(f"Failed to save results to {s3_location}")
        return False

    logger.info(f"Successfully saved results to {s3_location}")
    return True
|
|
|
|
|
|
# Marker written to all three OS output columns when a postcode cannot be matched.
_OS_NOT_FOUND = "postcode not found in ordnance survey"

# Output columns populated by ``handler``.
_OS_OUTPUT_COLUMNS = (
    "ordnance_survey_address",
    "ordnance_survey_uprn",
    "ordnance_survey_lexiscore",
)


def _mark_rows_not_found(df: pd.DataFrame, index) -> None:
    """Write the not-found marker into every OS output column for ``index`` rows."""
    for column in _OS_OUTPUT_COLUMNS:
        df.loc[index, column] = _OS_NOT_FOUND


def _build_user_address(row) -> str:
    """Join the row's "Address 1".."Address 3" fields into one search string.

    Fields are stripped and joined with single spaces (empty fields included,
    then the result is stripped). Missing, None and NaN fields count as empty
    instead of being rendered as the literal text "None"/"nan".
    """
    parts = []
    for column in ("Address 1", "Address 2", "Address 3"):
        value = row.get(column, "")
        if value is None or (pd.api.types.is_scalar(value) and pd.isna(value)):
            value = ""
        parts.append(str(value).strip())
    return " ".join(parts).strip()


@subtask_handler()  # This assumes task_id and subtask_id is defined in event.Records.body
def handler(body: dict[str, Any], context: Any, local: bool = False) -> None:
    """Match low-confidence address rows against Ordnance Survey (OS Places).

    Reads the CSV at ``body["s3_uri"]`` (assumed to be an address2uprn output).
    It then selects the rows whose ``lexiscore_column`` value is missing or below
    ``lexiscore_threshold``; all rows are selected when no usable column is given.
    For each ``postcode_clean`` group, it scores every row's address against the
    OS Places addresses for that postcode. The best match goes into the
    ``ordnance_survey_address`` / ``ordnance_survey_uprn`` /
    ``ordnance_survey_lexiscore`` columns, and the full table is saved to S3.

    :param body: Message body with ``s3_uri`` (required), ``task_id``,
        ``sub_task_id``, ``lexiscore_column`` and ``lexiscore_threshold``
        (default 0.5).
    :param context: Invocation context; not used in this function body.
    :param local: When True, ``body`` is replaced by a hard-coded dev message
        and results are also written to ``ordnance_survey_results.csv``.
    :raises RuntimeError: if ``s3_uri`` is missing, or if the S3 upload fails.
    """
    # Example SQS message for testing (copy and paste into SQS):
    if local is True:
        body = {
            "task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917",
            "sub_task_id": "8673913b-1a88-42d7-8578-0449123d94b0",
            "s3_uri": "s3://retrofit-data-dev/ara_raw_outputs/e31f2f21-175b-4a91-a3ec-a6baa325e917/6a427b6e-1ece-4983-b1e5-9bffccc53d1d/2026-03-04T16:48:22.339995_634c88fc.csv",
            "lexiscore_column": "address2uprn_lexiscore",
        }

    s3_uri: str = body.get("s3_uri", "")
    lexiscore_threshold: float = float(body.get("lexiscore_threshold", 0.5))
    lexiscore_column: Optional[str] = body.get("lexiscore_column", None)
    task_id: str = body.get("task_id", "")
    sub_task_id: str = body.get("sub_task_id", "")

    if s3_uri == "":
        raise RuntimeError("Missing s3_uri in message body")

    bucket, key = parse_s3_uri(s3_uri)

    # Assumes address2uprn has already been run on this file.
    df = pd.DataFrame(read_csv_from_s3_dict(bucket, key))

    # Rows needing an OS lookup: lexiscore missing/unparseable or below the
    # threshold, or every row when no usable lexiscore column is available.
    if lexiscore_column and lexiscore_column in df.columns:
        df[lexiscore_column] = pd.to_numeric(df[lexiscore_column], errors="coerce")
        lexiscores = df[lexiscore_column]
        needs_processing = df[lexiscores.isna() | (lexiscores < lexiscore_threshold)]
    else:
        if lexiscore_column:
            logger.warning(
                f"lexiscore_column {lexiscore_column!r} not found in input; "
                "processing all rows"
            )
        needs_processing = df

    # Initialise output columns; rows that are never processed keep None.
    for column in _OS_OUTPUT_COLUMNS:
        df[column] = None

    # An empty input (e.g. a header-only CSV) may have no "postcode_clean"
    # column to group on, so only group when there is something to process.
    groups = needs_processing.groupby("postcode_clean") if not needs_processing.empty else ()

    # Process one postcode group at a time so each postcode is looked up once.
    for postcode, group in groups:
        logger.info(f"Processing postcode: {postcode} ({len(group)} rows)")

        if not AddressMatch.is_valid_postcode(postcode):
            logger.warning(f"Postcode {postcode} is invalid, skipping")
            _mark_rows_not_found(df, group.index)
            continue

        postcode_cache = check_if_post_code_exists_in_db_cache(postcode)
        if postcode_cache.empty:
            logger.warning(f"No OS Places data for {postcode}")
            _mark_rows_not_found(df, group.index)
            continue

        os_addresses = postcode_cache["ADDRESS"]
        for idx, row in group.iterrows():
            user_address = _build_user_address(row)
            if not user_address:
                continue

            # Score against OS Places addresses and keep the best match.
            scores = os_addresses.apply(
                lambda addr: AddressMatch.score(user_address, addr)
            )
            best_idx = scores.idxmax()

            df.at[idx, "ordnance_survey_address"] = postcode_cache.at[best_idx, "ADDRESS"]
            df.at[idx, "ordnance_survey_uprn"] = postcode_cache.at[best_idx, "UPRN"]
            df.at[idx, "ordnance_survey_lexiscore"] = scores[best_idx]

    # Save results locally
    if local:
        df.to_csv("ordnance_survey_results.csv", index=False)
        logger.info(f"Results saved to ordnance_survey_results.csv ({len(df)} rows)")

    # Save results to S3; a failed upload must fail the subtask instead of
    # reporting success with no output written.
    if task_id and sub_task_id:
        if not save_results_to_s3(df, task_id, sub_task_id):
            raise RuntimeError(
                f"Failed to save Ordnance Survey results for task {task_id} "
                f"/ sub-task {sub_task_id}"
            )
    else:
        logger.warning("task_id or sub_task_id missing; results were not saved to S3")
|