run this end to end

This commit is contained in:
Jun-te Kim 2026-02-11 16:44:08 +00:00
parent 9b414924d0
commit 762dccde01
2 changed files with 109 additions and 73 deletions

View file

@ -555,38 +555,40 @@ def handler(event, context, local=False):
event = {
"Records": [
{
"body": json.dumps({
"task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917",
"rows": [
{
"landlord_property_id": "00000002POR",
"UPRN": "766019911",
"Address 1": "9 Redland Way",
"Address 2": "Aylesbury Vale",
"postcode": "HP21 9RJ",
"landlord_property_type": "House",
"postcode_clean": "HP219RJ"
},
{
"landlord_property_id": "00000003MTR",
"UPRN": "100120781544",
"Address 1": "16 Lime Crescent",
"Address 2": "BICESTER",
"postcode": "OX26 3XJ",
"landlord_property_type": "House",
"postcode_clean": "OX263XJ"
},
{
"landlord_property_id": "00000004HBY",
"UPRN": "14033542",
"Address 1": "14 Dunbar Drive",
"Address 2": "Woodley",
"postcode": "RG5 4HA",
"landlord_property_type": "House",
"postcode_clean": "RG54HA"
}
]
})
"body": json.dumps(
{
"task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917",
"rows": [
{
"landlord_property_id": "00000002POR",
"UPRN": "766019911",
"Address 1": "9 Redland Way",
"Address 2": "Aylesbury Vale",
"postcode": "HP21 9RJ",
"landlord_property_type": "House",
"postcode_clean": "HP219RJ",
},
{
"landlord_property_id": "00000003MTR",
"UPRN": "100120781544",
"Address 1": "16 Lime Crescent",
"Address 2": "BICESTER",
"postcode": "OX26 3XJ",
"landlord_property_type": "House",
"postcode_clean": "OX263XJ",
},
{
"landlord_property_id": "00000004HBY",
"UPRN": "14033542",
"Address 1": "14 Dunbar Drive",
"Address 2": "Woodley",
"postcode": "RG5 4HA",
"landlord_property_type": "House",
"postcode_clean": "RG54HA",
},
],
}
)
}
]
}
@ -633,7 +635,9 @@ def handler(event, context, local=False):
subtask_id = subtask_interface.create_subtask(
task_id=task_id, inputs={"row_count": len(rows)}
)
logger.info(f"Created subtask {subtask_id} for task {task_id} with {len(rows)} rows")
logger.info(
f"Created subtask {subtask_id} for task {task_id} with {len(rows)} rows"
)
# Process the rows
logger.info(f"Processing {len(rows)} rows for task {task_id}")
@ -642,11 +646,13 @@ def handler(event, context, local=False):
df = pd.DataFrame(rows)
# Create user_input column by concatenating Address 1 and Address 2
df["user_input"] = (df["Address 1"].fillna("") + " " + df["Address 2"].fillna("")).str.strip()
df["user_input"] = (
df["Address 1"].fillna("") + " " + df["Address 2"].fillna("")
).str.strip()
logger.info(f"Created user_input column from Address 1 and Address 2")
clean_df = df.dropna(subset=["postcode_clean"])
postcode_to_addresses = {
postcode: group.to_dict(orient="records")
for postcode, group in clean_df.groupby("postcode_clean", sort=False)
@ -661,7 +667,9 @@ def handler(event, context, local=False):
results_data = []
for postcode, postcode_rows in postcode_to_addresses.items():
logger.info(f"Processing postcode: {postcode} with {len(postcode_rows)} rows")
logger.info(
f"Processing postcode: {postcode} with {len(postcode_rows)} rows"
)
# Validate postcode before processing
if not is_valid_postcode(postcode):
@ -671,9 +679,13 @@ def handler(event, context, local=False):
# Fetch EPC data once per postcode
try:
epc_df = get_epc_data_with_postcode(postcode=postcode)
logger.info(f"Fetched {len(epc_df)} EPC records for postcode {postcode}")
logger.info(
f"Fetched {len(epc_df)} EPC records for postcode {postcode}"
)
except Exception as e:
logger.error(f"Failed to fetch EPC data for postcode {postcode}: {e}")
logger.error(
f"Failed to fetch EPC data for postcode {postcode}: {e}"
)
continue
# Process each address in this postcode with the same EPC data
@ -681,7 +693,9 @@ def handler(event, context, local=False):
try:
user_input = row.get("user_input", "")
if not user_input:
logger.warning(f"Skipping row with missing user_input for postcode {postcode}")
logger.warning(
f"Skipping row with missing user_input for postcode {postcode}"
)
continue
# Get UPRN using the pre-fetched EPC data with all return options
@ -690,45 +704,57 @@ def handler(event, context, local=False):
epc_df=epc_df,
return_address=True,
return_EPC=True,
return_score=True
return_score=True,
)
# Parse result tuple if successful
if result:
uprn, found_address, epc, score = result
uprns_found += 1
logger.info(f"Found UPRN for {user_input} in {postcode}: {uprn} (score: {score})")
logger.info(
f"Found UPRN for {user_input} in {postcode}: {uprn} (score: {score})"
)
results_data.append({
**row, # Include all original data
"found_uprn": uprn,
"found_address": found_address,
"epc_rating": epc,
"lexiscore": score
})
results_data.append(
{
**row, # Include all original data
"found_uprn": uprn,
"found_address": found_address,
"epc_rating": epc,
"lexiscore": score,
}
)
else:
logger.warning(f"No UPRN found for {user_input} in {postcode}")
results_data.append({
**row, # Include all original data
"found_uprn": None,
"found_address": None,
"epc_rating": None,
"lexiscore": None
})
logger.warning(
f"No UPRN found for {user_input} in {postcode}"
)
results_data.append(
{
**row, # Include all original data
"found_uprn": None,
"found_address": None,
"epc_rating": None,
"lexiscore": None,
}
)
addresses_processed += 1
except Exception as e:
logger.error(f"Error processing address {row.get('user_input', 'unknown')}: {e}")
logger.error(
f"Error processing address {row.get('user_input', 'unknown')}: {e}"
)
# Still add the row with error markers
results_data.append({
**row,
"found_uprn": None,
"found_address": None,
"epc_rating": None,
"score": None,
"error": str(e)
})
results_data.append(
{
**row,
"found_uprn": None,
"found_address": None,
"epc_rating": None,
"score": None,
"error": str(e),
}
)
continue
postcodes_processed += 1
@ -737,14 +763,16 @@ def handler(event, context, local=False):
result_df = pd.DataFrame(results_data)
logger.info(f"Created results DataFrame with {len(result_df)} rows")
results.append({
"subtask_id": str(subtask_id),
"rows_processed": len(rows),
"postcodes_processed": postcodes_processed,
"addresses_processed": addresses_processed,
"uprns_found": uprns_found,
"status": "processed"
})
results.append(
{
"subtask_id": str(subtask_id),
"rows_processed": len(rows),
"postcodes_processed": postcodes_processed,
"addresses_processed": addresses_processed,
"uprns_found": uprns_found,
"status": "processed",
}
)
# Mark subtask as completed
try:
@ -788,3 +816,8 @@ def handler(event, context, local=False):
{"processed": results, "errors": errors if errors else None}
),
}
# TODO:
# Don't add results to return messages as its too verbose
# capture the exepection as e, into s3, to find the logs go to s3

View file

@ -37,3 +37,6 @@ variable "tags" {
type = map(string)
default = {}
}