diff --git a/backend/address2UPRN/main.py b/backend/address2UPRN/main.py index 777dde0e..0f735f2a 100644 --- a/backend/address2UPRN/main.py +++ b/backend/address2UPRN/main.py @@ -9,6 +9,8 @@ import re from typing import Set import json import requests +from uuid import UUID +from backend.app.db.functions.tasks.Tasks import SubTaskInterface logger = setup_logger() @@ -323,32 +325,41 @@ def get_uprn_candidates( ) -def get_uprn( +def get_uprn_with_epc_df( user_inputed_address: str, - postcode: str, + epc_df: pd.DataFrame, return_address=False, return_EPC=False, return_score=True, ): """ - Return uprn (str) - Return False if failed to find a sensible matching epc - Return Nons when epc found but no UPRN - """ - df = get_epc_data_with_postcode(postcode=postcode) + Return uprn (str) using a pre-fetched EPC dataframe. + This avoids calling the API multiple times for the same postcode. - if df.empty: + Args: + user_inputed_address: The user's address string + epc_df: Pre-fetched EPC data for the postcode + return_address: Whether to return the matched address + return_EPC: Whether to return the EPC rating + return_score: Whether to return the lexiscore + + Returns: + uprn (str), or tuple if return_address/return_EPC/return_score are True + Returns None if no match found, lexiscore < 0.7, or UPRN is empty + """ + if epc_df.empty: return None scored_df = get_uprn_candidates( - df, + epc_df, user_address=user_inputed_address, ) # Best score best_score = scored_df.iloc[0]["lexiscore"] - if best_score <= 0: + # Return None if score is below threshold + if best_score < 0.7: return None # All rank-1 rows (possible draw) @@ -386,6 +397,32 @@ def get_uprn( return found_uprn +def get_uprn( + user_inputed_address: str, + postcode: str, + return_address=False, + return_EPC=False, + return_score=True, +): + """ + Return uprn (str) + Return False if failed to find a sensible matching epc + Return None when epc found but no UPRN + + This function fetches EPC data via API for a single 
def _local_test_event():
    """Return the canned SQS-style event that ``handler`` substitutes when ``local=True``."""
    sample_rows = [
        {
            "landlord_property_id": "00000002POR",
            "UPRN": "766019911",
            "Address 1": "9 Redland Way",
            "Address 2": "Aylesbury Vale",
            "postcode": "HP21 9RJ",
            "landlord_property_type": "House",
            "postcode_clean": "HP219RJ",
        },
        {
            "landlord_property_id": "00000003MTR",
            "UPRN": "100120781544",
            "Address 1": "16 Lime Crescent",
            "Address 2": "BICESTER",
            "postcode": "OX26 3XJ",
            "landlord_property_type": "House",
            "postcode_clean": "OX263XJ",
        },
        {
            "landlord_property_id": "00000004HBY",
            "UPRN": "14033542",
            "Address 1": "14 Dunbar Drive",
            "Address 2": "Woodley",
            "postcode": "RG5 4HA",
            "landlord_property_type": "House",
            "postcode_clean": "RG54HA",
        },
    ]
    payload = {
        "task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917",
        "rows": sample_rows,
    }
    return {"Records": [{"body": json.dumps(payload)}]}


def _unmatched_result(row, error=None):
    """Return ``row`` plus empty match columns, and an ``error`` column when given.

    Uses the same column names as a successful match so every result row
    shares one schema.
    """
    result = {
        **row,  # Include all original data
        "found_uprn": None,
        "found_address": None,
        "epc_rating": None,
        "lexiscore": None,
    }
    if error is not None:
        result["error"] = error
    return result


def _mark_subtask_failed(subtask_interface, subtask_id, error):
    """Best-effort: flag the subtask as failed; a DB error here is logged, not raised."""
    if not subtask_id:
        # Nothing to update: the failure happened before a subtask was created.
        return
    try:
        subtask_interface.update_subtask_status(
            subtask_id, "failed", outputs={"error": str(error)}
        )
    except Exception as db_error:
        logger.error(f"Failed to update subtask status: {db_error}")


def _match_rows(rows):
    """Look up a UPRN for each row, fetching EPC data once per postcode.

    Args:
        rows: list of dicts; the code indexes the "Address 1", "Address 2"
            and "postcode_clean" keys, so a batch lacking any of them raises
            ``KeyError`` to the caller.

    Returns:
        ``(results_data, stats)`` where ``results_data`` is a list of row
        dicts extended with ``found_uprn``/``found_address``/``epc_rating``/
        ``lexiscore`` (plus ``error`` on failure) and ``stats`` holds the
        ``postcodes_processed``, ``addresses_processed`` and ``uprns_found``
        counters.
    """
    df = pd.DataFrame(rows)

    # Create user_input column by concatenating Address 1 and Address 2
    df["user_input"] = (
        df["Address 1"].fillna("") + " " + df["Address 2"].fillna("")
    ).str.strip()
    logger.info("Created user_input column from Address 1 and Address 2")

    # Rows without a cleaned postcode cannot be grouped, so they are dropped here.
    clean_df = df.dropna(subset=["postcode_clean"])
    postcode_to_addresses = {
        postcode: group.to_dict(orient="records")
        for postcode, group in clean_df.groupby("postcode_clean", sort=False)
    }
    logger.info(f"Total postcodes: {len(postcode_to_addresses)}")

    stats = {"postcodes_processed": 0, "addresses_processed": 0, "uprns_found": 0}
    results_data = []

    for postcode, postcode_rows in postcode_to_addresses.items():
        logger.info(f"Processing postcode: {postcode} with {len(postcode_rows)} rows")

        # Validate postcode before processing
        if not is_valid_postcode(postcode):
            logger.warning(f"Postcode {postcode} is invalid, skipping")
            continue

        # Fetch EPC data once per postcode
        try:
            epc_df = get_epc_data_with_postcode(postcode=postcode)
            logger.info(f"Fetched {len(epc_df)} EPC records for postcode {postcode}")
        except Exception as e:
            logger.error(f"Failed to fetch EPC data for postcode {postcode}: {e}")
            continue

        # Process each address in this postcode with the same EPC data
        for row in postcode_rows:
            try:
                user_input = row.get("user_input", "")
                if not user_input:
                    logger.warning(
                        f"Skipping row with missing user_input for postcode {postcode}"
                    )
                    continue

                result = get_uprn_with_epc_df(
                    user_inputed_address=user_input,
                    epc_df=epc_df,
                    return_address=True,
                    return_EPC=True,
                    return_score=True,
                )

                if result:
                    uprn, found_address, epc, score = result
                    stats["uprns_found"] += 1
                    logger.info(
                        f"Found UPRN for {user_input} in {postcode}: {uprn} (score: {score})"
                    )
                    results_data.append(
                        {
                            **row,  # Include all original data
                            "found_uprn": uprn,
                            "found_address": found_address,
                            "epc_rating": epc,
                            "lexiscore": score,
                        }
                    )
                else:
                    logger.warning(f"No UPRN found for {user_input} in {postcode}")
                    results_data.append(_unmatched_result(row))

                stats["addresses_processed"] += 1

            except Exception as e:
                logger.error(
                    f"Error processing address {row.get('user_input', 'unknown')}: {e}"
                )
                # Still add the row, with error markers, so it is not lost.
                results_data.append(_unmatched_result(row, error=str(e)))

        stats["postcodes_processed"] += 1

    return results_data, stats


def handler(event, context, local=False):
    """Lambda entry point: resolve UPRNs for batches of address rows.

    Each record in ``event["Records"]`` (or ``event`` itself when there is no
    ``Records`` key) carries a ``body`` that is either a JSON string or a
    dict with a ``task_id`` and a list of ``rows``. For every valid record a
    subtask is created, the rows are matched against EPC data postcode by
    postcode, and the subtask is marked completed or failed.

    Args:
        event: the invocation event; replaced by a canned sample when
            ``local`` is ``True``.
        context: the Lambda context; only ``function_name`` and
            ``aws_request_id`` are read, and only for logging, so an object
            lacking them (e.g. ``None`` in a local run) is tolerated.
        local: pass ``True`` to run against the built-in sample event.

    Returns:
        ``{"statusCode": 500, "body": ...}`` with the collected errors when
        every record failed, otherwise ``{"statusCode": 200, "body": ...}``
        whose JSON body lists the processed subtasks and any errors.
    """
    print("=== Address2UPRN Lambda Handler ===")
    # getattr keeps the banner from crashing a run that has no real context object.
    print(f"Function: {getattr(context, 'function_name', None)}")
    print(f"Request ID: {getattr(context, 'aws_request_id', None)}")

    # Handle local testing
    if local is True:
        event = _local_test_event()

    print(f"Event: {json.dumps(event, indent=2, default=str)}")
    print("===================================")

    # Handle both single event and batch events (SQS, etc.)
    records = event.get("Records", [event])
    results = []
    errors = []
    subtask_interface = SubTaskInterface()

    for record in records:
        subtask_id = None
        try:
            # Parse body (inputs)
            if isinstance(record.get("body"), str):
                body = json.loads(record["body"])
            else:
                body = record.get("body", {})

            # Validate required fields
            task_id = body.get("task_id")
            rows = body.get("rows", [])

            if not task_id:
                errors.append({"error": "Missing required field: task_id"})
                continue

            if not rows:
                errors.append({"error": "Missing or empty rows data"})
                continue

            # Convert task_id to UUID
            try:
                task_id = UUID(task_id) if isinstance(task_id, str) else task_id
            except ValueError as e:
                errors.append({"error": f"Invalid UUID format for task_id: {str(e)}"})
                continue

            # Create a subtask for this batch
            subtask_id = subtask_interface.create_subtask(
                task_id=task_id, inputs={"row_count": len(rows)}
            )
            logger.info(
                f"Created subtask {subtask_id} for task {task_id} with {len(rows)} rows"
            )
            logger.info(f"Processing {len(rows)} rows for task {task_id}")

            results_data, stats = _match_rows(rows)

            # NOTE: the frame is only counted here; nothing in this function persists it.
            result_df = pd.DataFrame(results_data)
            logger.info(f"Created results DataFrame with {len(result_df)} rows")

            results.append(
                {
                    "subtask_id": str(subtask_id),
                    "rows_processed": len(rows),
                    "postcodes_processed": stats["postcodes_processed"],
                    "addresses_processed": stats["addresses_processed"],
                    "uprns_found": stats["uprns_found"],
                    "status": "processed",
                }
            )

            # Mark subtask as completed; a DB failure here must not discard the result.
            try:
                subtask_interface.update_subtask_status(
                    subtask_id, "completed", outputs={"rows_processed": len(rows)}
                )
                logger.info(f"Marked subtask {subtask_id} as completed")
            except Exception as db_error:
                logger.error(f"Failed to mark subtask as completed: {db_error}")

        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON in request body: {e}")
            errors.append({"error": "Invalid JSON in request body", "details": str(e)})
            _mark_subtask_failed(subtask_interface, subtask_id, e)
        except Exception as e:
            logger.error(f"Unexpected error processing record: {e}", exc_info=True)
            errors.append({"error": "Unexpected error", "details": str(e)})
            _mark_subtask_failed(subtask_interface, subtask_id, e)

    # Return error if all records failed
    if errors and not results:
        return {"statusCode": 500, "body": json.dumps({"errors": errors})}

    return {
        "statusCode": 200,
        "body": json.dumps({"processed": results, "errors": errors or None}),
    }
1841cf3f..9470710d 100644 --- a/sfr/principal_pitch/2_export_data.py +++ b/sfr/principal_pitch/2_export_data.py @@ -28,12 +28,12 @@ from sqlalchemy import func # PORTFOLIO_ID = 206 # SCENARIOS = [389] -PORTFOLIO_ID = 544 +PORTFOLIO_ID = 476 SCENARIOS = [ - 1027, + 953, ] scenario_names = { - 1027: "EPC C", + 953: "All Properties, Most Economic", } project_name = "manchester" @@ -330,6 +330,30 @@ for scenario_id in SCENARIOS: df[df["predicted_post_works_sap"] == ""] + # Expected columns list + expected_columns = [ + "suspended_floor_insulation", + "solid_floor_insulation", + "external_wall_insulation", + "internal_wall_insulation", + "cavity_wall_insulation", + "loft_insulation", + "flat_roof_insulation", + "room_roof_insulation", + "secondary_glazing", + "double_glazing", + "solar_pv", + "high_heat_retention_storage_heaters", + "air_source_heat_pump", + "boiler_upgrade", + "roomstat_programmer_trvs", + "time_temperature_zone_control", + ] + # Add missing columns with default values + for col in expected_columns: + if col not in df.columns: + df[col] = "" + # Create excel to store to filename = f"{scenario_names[scenario_id]} - {project_name}.xlsx" with pd.ExcelWriter(filename) as writer: