From 2da419df713e6e393c955856f55af0ace857a2ad Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Wed, 25 Mar 2026 23:29:12 +0000 Subject: [PATCH] fixing new loading --- backend/addresses/Addresses.py | 23 +++- backend/app/plan/plan_input_processor.py | 109 ++++++++++++++++ backend/engine/engine.py | 151 +++-------------------- 3 files changed, 146 insertions(+), 137 deletions(-) create mode 100644 backend/app/plan/plan_input_processor.py diff --git a/backend/addresses/Addresses.py b/backend/addresses/Addresses.py index 41f47d28..510de698 100644 --- a/backend/addresses/Addresses.py +++ b/backend/addresses/Addresses.py @@ -34,7 +34,9 @@ class Addresses: for row in plan_input: addresses.append(row_parser(row, body)) - return cls(addresses) + addresses = cls(addresses) + addresses.validate_uprns() + return addresses def get_uprns(self): return [x.uprn for x in self._addresses if x.uprn is not None] @@ -53,6 +55,12 @@ class Addresses: def get_property_requests(self): return [x.request_data for x in self._addresses] + def validate_uprns(self): + """Raise ValueError if any address has a non-int UPRN.""" + for addr in self._addresses: + if addr.uprn is not None and not isinstance(addr.uprn, int): + raise ValueError(f"Address with non-integer UPRN detected: {addr.uprn} in {addr}") + @staticmethod def parse_ara_row(row: dict, body) -> Address: """ @@ -113,6 +121,8 @@ class Addresses: return None uprn = clean_uprn(row.get("uprn")) + if uprn is None: + raise ValueError(f"Invalid or missing UPRN in row: {row}") address = row.get("address") if not address and body.file_format == "domna_asset_list": @@ -128,12 +138,15 @@ class Addresses: postcode = str(row["postcode"]).strip().upper() + address_1 = str(address).strip() if address else "" + full_address = str(full_address).strip() if full_address else "" + landlord_property_id = str(row["landlord_property_id"]) if row.get("landlord_property_id") else "" + return Address( uprn=uprn, - 
landlord_property_id=str(row["landlord_property_id"]) - if row.get("landlord_property_id") else None, - address_1=str(address).strip() if address else None, - full_address=str(full_address).strip() if full_address else None, + landlord_property_id=landlord_property_id, + address_1=address_1, + full_address=full_address, postcode=postcode, landlord_property_type=row.get("property_type"), landlord_built_form=row.get("built_form"), diff --git a/backend/app/plan/plan_input_processor.py b/backend/app/plan/plan_input_processor.py new file mode 100644 index 00000000..72695868 --- /dev/null +++ b/backend/app/plan/plan_input_processor.py @@ -0,0 +1,109 @@ +import logging +import numpy as np +import pandas as pd +from backend.addresses.Addresses import Addresses +from backend.app.config import get_settings +from utils.s3 import read_csv_from_s3, read_excel_from_s3 + + +class PlanInputProcessor: + def __init__(self, body): + self.body = body + self.logger = logging.getLogger(__name__) + self.plan_input = None + self.valuation_data = [] + self.index_start = getattr(body, 'index_start', None) + self.index_end = getattr(body, 'index_end', None) + + def process(self): + if self.body.file_type == "xlsx": + self.logger.info("Getting the plan input") + self.plan_input = read_excel_from_s3( + bucket_name=get_settings().PLAN_TRIGGER_BUCKET, + file_key=self.body.trigger_file_path, + sheet_name=self.body.sheet_name, + header_row=0, + ) + self.logger.info("Got the plan input from excel") + if self.body.file_format == "domna_asset_list": + self._process_domna_asset_list() + elif self.body.file_format == "ara_property_list": + self._process_ara_property_list() + else: + raise ValueError("Other formats not yet supported") + else: + self.logger.info("Getting the plan input from csv") + self.plan_input = read_csv_from_s3( + bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=self.body.trigger_file_path + ) + self.logger.info("Got the plan input from csv") + # Slice if needed + if 
self.index_start is not None and self.index_end is not None: + self.plan_input = self.plan_input[self.index_start:self.index_end] + # Extract valuation data if present + self._extract_valuation_data() + return self.to_addresses() + + def _extract_valuation_data(self): + # Only for domna_asset_list, extract domna_valuation if present + if self.body.file_format == "domna_asset_list" and self.plan_input: + first = self.plan_input[0] + if "domna_valuation" in first: + self.valuation_data = [ + {"uprn": x.get("uprn"), "valuation": x.get("domna_valuation")} + for x in self.plan_input if x.get("domna_valuation") is not None + ] + # Could add more formats here in future + + def _process_domna_asset_list(self): + df = self.plan_input + df = df.rename( + columns={"domna_address_1": "address", "domna_postcode": "postcode", "epc_os_uprn": "uprn"} + ) + if "estimated" not in df.columns: + df["estimated"] = False + df["uprn"] = np.where( + df["estimated"].isin([1, True]) & ((df["uprn"] < 0) | pd.isnull(df["uprn"])), None, df["uprn"] + ) + df["property_type"] = df["landlord_property_type"].copy() + if "landlord_built_form" in df.columns: + df["built_form"] = df["landlord_built_form"].copy() + else: + df["built_form"] = None + if "epc_property_type" not in df.columns: + df["epc_property_type"] = None + df["property_type"] = np.where( + df["property_type"] == "unknown", df["epc_property_type"], df["property_type"] + ) + if "epc_archetype" not in df.columns: + df["epc_archetype"] = None + df["built_form"] = np.where( + df["built_form"] == "unknown", df["epc_archetype"], df["built_form"] + ) + property_type_map = { + "house": "House", + "flat": "Flat", + "maisonette": "Maisonette", + "bungalow": "Bungalow", + "block house": "House", + "coach house": "House", + "bedsit": "Flat", + } + built_form_map = { + "mid-terrace": "Mid-Terrace", + "end-terrace": "End-Terrace", + "semi-detached": "Semi-Detached", + "detached": "Detached", + "enclosed end-terrace": "Enclosed End-Terrace", + 
"enclosed mid-terrace": "Enclosed Mid-Terrace", + } + df["property_type"] = df["property_type"].map(property_type_map).fillna(df["property_type"]) + df["built_form"] = df["built_form"].map(built_form_map).fillna(df["built_form"]) + self.plan_input = df.to_dict("records") + + def _process_ara_property_list(self): + df = self.plan_input + self.plan_input = df.to_dict("records") + + def to_addresses(self): + return Addresses.from_plan_input(self.plan_input, self.body) diff --git a/backend/engine/engine.py b/backend/engine/engine.py index 043e77b7..9bcb2ccd 100644 --- a/backend/engine/engine.py +++ b/backend/engine/engine.py @@ -3,7 +3,6 @@ import json from copy import deepcopy from datetime import datetime import pandas as pd -import numpy as np from uuid import UUID from tqdm import tqdm @@ -44,7 +43,9 @@ from etl.spatial.OpenUprnClient import OpenUprnClient from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc from utils.logger import setup_logger -from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3, read_excel_from_s3 +from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3 + +from backend.app.plan.plan_input_processor import PlanInputProcessor logger = setup_logger() @@ -296,7 +297,7 @@ def create_epc_records(epc_searcher: SearchEpc, energy_assessment: dict): "old_data": epc_searcher.older_epcs.copy() }, energy_assessment_is_newer - # In this case, our EPC is older than the newest publically avaible one, but is not contained in + # In this case, our EPC is older than the newest publicly available one, but is not contained in # the historicals, so it can't have been lodged, so we include it in the old data return { 'original_epc': newest_epc, @@ -478,132 +479,22 @@ async def model_engine(body: PlanTriggerRequest): try: logger.info("Getting the inputs") - - if body.file_type == "xlsx": - logger.info("Getting the plan input") - plan_input = read_excel_from_s3( - bucket_name=get_settings().PLAN_TRIGGER_BUCKET, - 
file_key=body.trigger_file_path, - sheet_name=body.sheet_name, - header_row=0, - ) - logger.info("Got the plan input from excel") - - # We now handle the case where the input data is a Domna standardised assset list - if body.file_format == "domna_asset_list": - # We rename the columns to match the expected format - plan_input = plan_input.rename( - columns={"domna_address_1": "address", "domna_postcode": "postcode", "epc_os_uprn": "uprn"} - ) - # Where the EPC has been estimated, that is because a UPRN wasn't avaialble and so we remove UPRN - # This will be reflexted - if "estimated" not in plan_input.columns: - plan_input["estimated"] = False - - plan_input["uprn"] = np.where( - plan_input["estimated"].isin([1, True]) & ( - (plan_input["uprn"] < 0) | pd.isnull(plan_input["uprn"]) - ), None, plan_input["uprn"] - ) - # We handle the landlord property type and built form - plan_input["property_type"] = plan_input["landlord_property_type"].copy() - if "landlord_built_form" in plan_input.columns: - plan_input["built_form"] = plan_input["landlord_built_form"].copy() - else: - plan_input["built_form"] = None - - if "epc_property_type" not in plan_input.columns: - plan_input["epc_property_type"] = None - - plan_input["property_type"] = np.where( - plan_input["property_type"] == "unknown", - plan_input["epc_property_type"], - plan_input["property_type"] - ) - - if "epc_archetype" not in plan_input.columns: - plan_input["epc_archetype"] = None - - plan_input["built_form"] = np.where( - plan_input["built_form"] == "unknown", plan_input["epc_archetype"], plan_input["built_form"] - ) - property_type_map = { - "house": "House", - "flat": "Flat", - "maisonette": "Maisonette", - "bungalow": "Bungalow", - "block house": "House", - "coach house": "House", - "bedsit": "Flat", - } - - built_form_map = { - "mid-terrace": "Mid-Terrace", - "end-terrace": "End-Terrace", - "semi-detached": "Semi-Detached", - "detached": "Detached", - "enclosed end-terrace": "Enclosed End-Terrace", - 
"enclosed mid-terrace": "Enclosed Mid-Terrace", - } - # We remap the values to match the EPC expected formats - - # This syntax will actually retain any original values, if they don't get mapped - plan_input["property_type"] = ( - plan_input["property_type"] - .map(property_type_map) - .fillna(plan_input["property_type"]) - ) - - plan_input["built_form"] = ( - plan_input["built_form"] - .map(built_form_map) - .fillna(plan_input["built_form"]) - ) - - plan_input = plan_input.to_dict("records") - - else: - raise ValueError("Other formats not yet supported") - - else: - logger.info("Getting the plan input from csv") - plan_input = read_csv_from_s3( - bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path - ) - logger.info("Got the plan input from csv") - - # TODO: New onboarding process - if body.file_format == "ara_property_list": - plan_input = read_excel_from_s3( - bucket_name=get_settings().DATA_BUCKET, file_key=body.trigger_file_path, sheet_name=body.sheet_name, - header_row=0 - ) - plan_input = plan_input.to_dict('records') - - # We then slide it on the indexes if they are provided - if body.index_start is not None and body.index_end is not None: - plan_input = plan_input[body.index_start:body.index_end] + # Use PlanInputProcessor for all plan input processing + plan_input_processor = PlanInputProcessor(body) + addresses = plan_input_processor.process() + valuation_data = plan_input_processor.valuation_data # Confirm no duplicate UPRNS - check_duplicate_uprns(plan_input) + check_duplicate_uprns([a.uprn for a in addresses]) # If we have patches or overrides, we should read them in here - patches, already_installed, non_invasive_recommendations, valuation_data = get_request_property_data(body) - - if body.file_type == "xlsx" and body.file_format == "domna_asset_list": - # We check if we have valution data - if not valuation_data and body.valuation_file_path in [None, ""]: - # We check plan_input - if "domna_valuation" in plan_input[0]: - 
valuation_data = [{"uprn": x["uprn"], "valuation": x["domna_valuation"]} for x in plan_input] + patches, already_installed, non_invasive_recommendations, _ = get_request_property_data(body) logger.info("Getting cleaning_data") cleaning_data = read_dataframe_from_s3_parquet( bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet", ) - # Prepare input data - addresses = Addresses.from_plan_input(plan_input, body) logger.info("Checking database for existing properties") uprns = addresses.get_uprns() @@ -662,8 +553,8 @@ async def model_engine(body: PlanTriggerRequest): logger.info("Processing each property for model input preparation") input_properties, inspections_map, eco_packages, epc_upserts = [], {}, {}, [] - for addr, config in tqdm( - zip(addresses, plan_input), + for addr in tqdm( + addresses, total=len(addresses), desc="Processing properties", ): @@ -684,7 +575,7 @@ async def model_engine(body: PlanTriggerRequest): property_already_installed = list(already_installed_by_uprn[addr.uprn]) epc_searcher = SearchEpc( - address1=addr.address_1, + address1=addr.address1, postcode=addr.postcode, uprn=addr.uprn, auth_token=get_settings().EPC_AUTH_TOKEN, @@ -693,16 +584,15 @@ async def model_engine(body: PlanTriggerRequest): heating_system=addr.landlord_heating_system, associated_uprns=associated_uprns ) - epc_searcher.ordnance_survey_client.built_form = addr.landlord_built_form - epc_searcher.ordnance_survey_client.property_type = addr.landlord_property_type + epc_searcher.ordnance_survey_client.built_form = addr.built_form + epc_searcher.ordnance_survey_client.property_type = addr.property_type # For the moment, our OS API access is unavailable, so we skip and interpolate epc_searcher.find_property(skip_os=True, api_data=epc_api_data, overwrite_sap05=True) epc_searcher.set_uprn_source(file_format=body.file_format) lookup_key = ( - ("uprn", addr.uprn) if addr.uprn is not None - else ("landlord_property_id", addr.landlord_property_id) 
+ ("uprn", addr.uprn) if addr.uprn is not None else ("landlord_property_id", addr.landlord_property_id) ) property_id = property_lookup[lookup_key] @@ -744,16 +634,14 @@ async def model_engine(body: PlanTriggerRequest): epc_page=epc_page, rrn=rrn, cleaned_address=epc_searcher.address_clean, - config_address=addr.address_1, + config_address=addr.address, address_postal_town=epc_searcher.address_postal_town ) ) epc_records = patch_epc(patch, epc_records) - prepared_epc = EPCRecord( - epc_records=epc_records, run_mode="newdata", cleaning_data=cleaning_data, address_metadata=addr - ) + prepared_epc = EPCRecord(epc_records=epc_records, run_mode="newdata", cleaning_data=cleaning_data) input_properties.append( Property( @@ -763,13 +651,12 @@ async def model_engine(body: PlanTriggerRequest): address=epc_searcher.address_clean, postcode=epc_searcher.postcode_clean, epc_record=prepared_epc, - already_installed=property_already_installed, + already_installed=property_already_installed + eco_packages.get(property_id)[3], find_my_epc_components=find_my_epc_components, property_valuation=req_data.valuation, non_invasive_recommendations=property_non_invasive_recommendations, energy_assessment=energy_assessment, inspections=inspections_map.get(property_id), - **Property.extract_kwargs(config), # TODO: Depraecate this ) )