run this end to end

This commit is contained in:
Jun-te Kim 2026-02-11 16:37:55 +00:00
parent 655d7dbd6f
commit 9b414924d0
2 changed files with 309 additions and 22 deletions

View file

@ -9,6 +9,8 @@ import re
from typing import Set
import json
import requests
from uuid import UUID
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
logger = setup_logger()
@ -323,32 +325,41 @@ def get_uprn_candidates(
)
def get_uprn(
def get_uprn_with_epc_df(
user_inputed_address: str,
postcode: str,
epc_df: pd.DataFrame,
return_address=False,
return_EPC=False,
return_score=True,
):
"""
Return uprn (str)
Return False if failed to find a sensible matching epc
Return Nons when epc found but no UPRN
"""
df = get_epc_data_with_postcode(postcode=postcode)
Return uprn (str) using a pre-fetched EPC dataframe.
This avoids calling the API multiple times for the same postcode.
if df.empty:
Args:
user_inputed_address: The user's address string
epc_df: Pre-fetched EPC data for the postcode
return_address: Whether to return the matched address
return_EPC: Whether to return the EPC rating
return_score: Whether to return the lexiscore
Returns:
uprn (str), or tuple if return_address/return_EPC/return_score are True
Returns None if no match found, lexiscore < 0.7, or UPRN is empty
"""
if epc_df.empty:
return None
scored_df = get_uprn_candidates(
df,
epc_df,
user_address=user_inputed_address,
)
# Best score
best_score = scored_df.iloc[0]["lexiscore"]
if best_score <= 0:
# Return None if score is below threshold
if best_score < 0.7:
return None
# All rank-1 rows (possible draw)
@ -386,6 +397,32 @@ def get_uprn(
return found_uprn
def get_uprn(
user_inputed_address: str,
postcode: str,
return_address=False,
return_EPC=False,
return_score=True,
):
"""
Return uprn (str)
Return False if failed to find a sensible matching epc
Return None when epc found but no UPRN
This function fetches EPC data via API for a single postcode.
For processing multiple addresses in the same postcode, use get_uprn_with_epc_df instead.
"""
df = get_epc_data_with_postcode(postcode=postcode)
return get_uprn_with_epc_df(
user_inputed_address=user_inputed_address,
epc_df=df,
return_address=return_address,
return_EPC=return_EPC,
return_score=return_score,
)
def resolve_uprns_for_postcode_group(
group_df: pd.DataFrame,
epc_df: pd.DataFrame,
@ -508,20 +545,246 @@ def run_all_test():
)
def handler(event, context):
def handler(event, context, local=False):
print("=== Address2UPRN Lambda Handler ===")
print(f"Function: {context.function_name}")
print(f"Request ID: {context.aws_request_id}")
# Handle local testing
if local is True:
event = {
"Records": [
{
"body": json.dumps({
"task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917",
"rows": [
{
"landlord_property_id": "00000002POR",
"UPRN": "766019911",
"Address 1": "9 Redland Way",
"Address 2": "Aylesbury Vale",
"postcode": "HP21 9RJ",
"landlord_property_type": "House",
"postcode_clean": "HP219RJ"
},
{
"landlord_property_id": "00000003MTR",
"UPRN": "100120781544",
"Address 1": "16 Lime Crescent",
"Address 2": "BICESTER",
"postcode": "OX26 3XJ",
"landlord_property_type": "House",
"postcode_clean": "OX263XJ"
},
{
"landlord_property_id": "00000004HBY",
"UPRN": "14033542",
"Address 1": "14 Dunbar Drive",
"Address 2": "Woodley",
"postcode": "RG5 4HA",
"landlord_property_type": "House",
"postcode_clean": "RG54HA"
}
]
})
}
]
}
print(f"Event: {json.dumps(event, indent=2, default=str)}")
print(f"Context: {context}")
print("===================================")
return {"statusCode": 200, "body": "hello world"}
# Handle both single event and batch events (SQS, etc.)
records = event.get("Records", [event])
results = []
errors = []
subtask_interface = SubTaskInterface()
# TO do function dispatcher,
for record in records:
task_id = None
subtask_id = None
try:
# Parse body (inputs)
if isinstance(record.get("body"), str):
body = json.loads(record["body"])
else:
body = record.get("body", {})
# get_uprn_candidates(get_epc_data_with_postcode("E9 5NH"),"Flat 1, 5 Semley Gate" and Flat 5, 1 Semley Gate)
# fix that
# Look again at flat 1
# pandas reader the seperate postcode_splitter
# dump into s3
# Validate required fields
task_id = body.get("task_id")
rows = body.get("rows", [])
if not task_id:
errors.append({"error": "Missing required field: task_id"})
continue
if not rows:
errors.append({"error": "Missing or empty rows data"})
continue
# Convert task_id to UUID
try:
task_id = UUID(task_id) if isinstance(task_id, str) else task_id
except ValueError as e:
errors.append({"error": f"Invalid UUID format for task_id: {str(e)}"})
continue
# Create a subtask for this batch
subtask_id = subtask_interface.create_subtask(
task_id=task_id, inputs={"row_count": len(rows)}
)
logger.info(f"Created subtask {subtask_id} for task {task_id} with {len(rows)} rows")
# Process the rows
logger.info(f"Processing {len(rows)} rows for task {task_id}")
# Convert rows to DataFrame
df = pd.DataFrame(rows)
# Create user_input column by concatenating Address 1 and Address 2
df["user_input"] = (df["Address 1"].fillna("") + " " + df["Address 2"].fillna("")).str.strip()
logger.info(f"Created user_input column from Address 1 and Address 2")
clean_df = df.dropna(subset=["postcode_clean"])
postcode_to_addresses = {
postcode: group.to_dict(orient="records")
for postcode, group in clean_df.groupby("postcode_clean", sort=False)
}
logger.info(f"Total postcodes: {len(postcode_to_addresses)}")
# Process each postcode group
postcodes_processed = 0
addresses_processed = 0
uprns_found = 0
results_data = []
for postcode, postcode_rows in postcode_to_addresses.items():
logger.info(f"Processing postcode: {postcode} with {len(postcode_rows)} rows")
# Validate postcode before processing
if not is_valid_postcode(postcode):
logger.warning(f"Postcode {postcode} is invalid, skipping")
continue
# Fetch EPC data once per postcode
try:
epc_df = get_epc_data_with_postcode(postcode=postcode)
logger.info(f"Fetched {len(epc_df)} EPC records for postcode {postcode}")
except Exception as e:
logger.error(f"Failed to fetch EPC data for postcode {postcode}: {e}")
continue
# Process each address in this postcode with the same EPC data
for row in postcode_rows:
try:
user_input = row.get("user_input", "")
if not user_input:
logger.warning(f"Skipping row with missing user_input for postcode {postcode}")
continue
# Get UPRN using the pre-fetched EPC data with all return options
result = get_uprn_with_epc_df(
user_inputed_address=user_input,
epc_df=epc_df,
return_address=True,
return_EPC=True,
return_score=True
)
# Parse result tuple if successful
if result:
uprn, found_address, epc, score = result
uprns_found += 1
logger.info(f"Found UPRN for {user_input} in {postcode}: {uprn} (score: {score})")
results_data.append({
**row, # Include all original data
"found_uprn": uprn,
"found_address": found_address,
"epc_rating": epc,
"lexiscore": score
})
else:
logger.warning(f"No UPRN found for {user_input} in {postcode}")
results_data.append({
**row, # Include all original data
"found_uprn": None,
"found_address": None,
"epc_rating": None,
"lexiscore": None
})
addresses_processed += 1
except Exception as e:
logger.error(f"Error processing address {row.get('user_input', 'unknown')}: {e}")
# Still add the row with error markers
results_data.append({
**row,
"found_uprn": None,
"found_address": None,
"epc_rating": None,
"score": None,
"error": str(e)
})
continue
postcodes_processed += 1
# Create results DataFrame
result_df = pd.DataFrame(results_data)
logger.info(f"Created results DataFrame with {len(result_df)} rows")
results.append({
"subtask_id": str(subtask_id),
"rows_processed": len(rows),
"postcodes_processed": postcodes_processed,
"addresses_processed": addresses_processed,
"uprns_found": uprns_found,
"status": "processed"
})
# Mark subtask as completed
try:
subtask_interface.update_subtask_status(
subtask_id, "completed", outputs={"rows_processed": len(rows)}
)
logger.info(f"Marked subtask {subtask_id} as completed")
except Exception as db_error:
logger.error(f"Failed to mark subtask as completed: {db_error}")
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON in request body: {e}")
errors.append({"error": "Invalid JSON in request body", "details": str(e)})
# Mark subtask as failed if we have one
if subtask_id:
try:
subtask_interface.update_subtask_status(
subtask_id, "failed", outputs={"error": str(e)}
)
except Exception as db_error:
logger.error(f"Failed to update subtask status: {db_error}")
except Exception as e:
logger.error(f"Unexpected error processing record: {e}", exc_info=True)
errors.append({"error": "Unexpected error", "details": str(e)})
# Mark subtask as failed if we have one
if subtask_id:
try:
subtask_interface.update_subtask_status(
subtask_id, "failed", outputs={"error": str(e)}
)
except Exception as db_error:
logger.error(f"Failed to update subtask status: {db_error}")
# Return error if all records failed
if errors and not results:
return {"statusCode": 500, "body": json.dumps({"errors": errors})}
return {
"statusCode": 200,
"body": json.dumps(
{"processed": results, "errors": errors if errors else None}
),
}

View file

@ -28,12 +28,12 @@ from sqlalchemy import func
# PORTFOLIO_ID = 206
# SCENARIOS = [389]
PORTFOLIO_ID = 544
PORTFOLIO_ID = 476
SCENARIOS = [
1027,
953,
]
scenario_names = {
1027: "EPC C",
953: "All Properties, Most Economic",
}
project_name = "manchester"
@ -330,6 +330,30 @@ for scenario_id in SCENARIOS:
df[df["predicted_post_works_sap"] == ""]
# Expected columns list
expected_columns = [
"suspended_floor_insulation",
"solid_floor_insulation",
"external_wall_insulation",
"internal_wall_insulation",
"cavity_wall_insulation",
"loft_insulation",
"flat_roof_insulation",
"room_roof_insulation",
"secondary_glazing",
"double_glazing",
"solar_pv",
"high_heat_retention_storage_heaters",
"air_source_heat_pump",
"boiler_upgrade",
"roomstat_programmer_trvs",
"time_temperature_zone_control",
]
# Add missing columns with default values
for col in expected_columns:
if col not in df.columns:
df[col] = ""
# Create excel to store to
filename = f"{scenario_names[scenario_id]} - {project_name}.xlsx"
with pd.ExcelWriter(filename) as writer: