fixing new loading

This commit is contained in:
Khalim Conn-Kowlessar 2026-03-25 23:29:12 +00:00
parent e946b7254a
commit 2da419df71
3 changed files with 146 additions and 137 deletions

View file

@ -34,7 +34,9 @@ class Addresses:
for row in plan_input:
addresses.append(row_parser(row, body))
return cls(addresses)
addresses = cls(addresses)
addresses.validate_uprns()
return addresses
def get_uprns(self):
    """Return the UPRN of every held address that has one, in original order."""
    found = []
    for entry in self._addresses:
        # Addresses without a UPRN are left out rather than reported as None.
        if entry.uprn is not None:
            found.append(entry.uprn)
    return found
@ -53,6 +55,12 @@ class Addresses:
def get_property_requests(self):
    """Return the ``request_data`` attribute of every held address, in order."""
    requests = []
    for entry in self._addresses:
        requests.append(entry.request_data)
    return requests
def validate_uprns(self):
    """Raise ValueError if any address has a non-int UPRN.

    Addresses whose UPRN is ``None`` are skipped. Booleans are rejected
    even though ``bool`` is a subclass of ``int``.

    Raises:
        ValueError: on the first address whose UPRN is set but is not a
            plain integer.
    """
    for addr in self._addresses:
        uprn = addr.uprn
        if uprn is None:
            continue
        # isinstance(True, int) is True, so booleans need their own check or
        # a True/False UPRN would slip through as "integer".
        if isinstance(uprn, bool) or not isinstance(uprn, int):
            raise ValueError(f"Address with non-integer UPRN detected: {addr.uprn} in {addr}")
@staticmethod
def parse_ara_row(row: dict, body) -> Address:
"""
@ -113,6 +121,8 @@ class Addresses:
return None
uprn = clean_uprn(row.get("uprn"))
if uprn is None:
raise ValueError(f"Invalid or missing UPRN in row: {row}")
address = row.get("address")
if not address and body.file_format == "domna_asset_list":
@ -128,12 +138,15 @@ class Addresses:
postcode = str(row["postcode"]).strip().upper()
address_1 = str(address).strip() if address else ""
full_address = str(full_address).strip() if full_address else ""
landlord_property_id = str(row["landlord_property_id"]) if row.get("landlord_property_id") else ""
return Address(
uprn=uprn,
landlord_property_id=str(row["landlord_property_id"])
if row.get("landlord_property_id") else None,
address_1=str(address).strip() if address else None,
full_address=str(full_address).strip() if full_address else None,
landlord_property_id=landlord_property_id,
address_1=address_1,
full_address=full_address,
postcode=postcode,
landlord_property_type=row.get("property_type"),
landlord_built_form=row.get("built_form"),

View file

@ -0,0 +1,109 @@
import logging
import numpy as np
import pandas as pd
from backend.addresses.Addresses import Addresses
from backend.app.config import get_settings
from utils.s3 import read_csv_from_s3, read_excel_from_s3
class PlanInputProcessor:
    """Load a plan-trigger file from S3 and convert it into ``Addresses``.

    The request ``body`` supplies ``file_type``, ``file_format``,
    ``trigger_file_path`` and ``sheet_name``, plus optional ``index_start`` /
    ``index_end`` used to slice the loaded rows.

    After ``process()`` has run:
      * ``plan_input`` holds the (optionally sliced) rows that were parsed;
      * ``valuation_data`` holds ``{"uprn", "valuation"}`` dicts taken from a
        ``domna_valuation`` column, when the format is ``domna_asset_list``
        and that column exists.
    """

    # Landlord-supplied values are remapped to match the EPC expected formats.
    # Values that are not keys of these maps are kept unchanged.
    _PROPERTY_TYPE_MAP = {
        "house": "House",
        "flat": "Flat",
        "maisonette": "Maisonette",
        "bungalow": "Bungalow",
        "block house": "House",
        "coach house": "House",
        "bedsit": "Flat",
    }
    _BUILT_FORM_MAP = {
        "mid-terrace": "Mid-Terrace",
        "end-terrace": "End-Terrace",
        "semi-detached": "Semi-Detached",
        "detached": "Detached",
        "enclosed end-terrace": "Enclosed End-Terrace",
        "enclosed mid-terrace": "Enclosed Mid-Terrace",
    }

    def __init__(self, body):
        """Store the request and initialise empty results.

        ``index_start`` / ``index_end`` are read with ``getattr`` so a body
        that lacks them is treated as "no slicing".
        """
        self.body = body
        self.logger = logging.getLogger(__name__)
        self.plan_input = None
        self.valuation_data = []
        self.index_start = getattr(body, 'index_start', None)
        self.index_end = getattr(body, 'index_end', None)

    def process(self):
        """Read, normalise, slice and parse the plan input.

        Returns:
            The result of ``to_addresses()`` for the loaded rows.

        Raises:
            ValueError: if ``file_type`` is ``"xlsx"`` and ``file_format`` is
                neither ``domna_asset_list`` nor ``ara_property_list``.
        """
        if self.body.file_type == "xlsx":
            self.logger.info("Getting the plan input")
            self.plan_input = read_excel_from_s3(
                bucket_name=get_settings().PLAN_TRIGGER_BUCKET,
                file_key=self.body.trigger_file_path,
                sheet_name=self.body.sheet_name,
                header_row=0,
            )
            self.logger.info("Got the plan input from excel")
            if self.body.file_format == "domna_asset_list":
                self._process_domna_asset_list()
            elif self.body.file_format == "ara_property_list":
                self._process_ara_property_list()
            else:
                raise ValueError("Other formats not yet supported")
        else:
            # Any other file_type is read as CSV and used as returned.
            # NOTE(review): assumes read_csv_from_s3 returns a list of row
            # dicts (it is sliced and iterated as rows below) - confirm.
            self.logger.info("Getting the plan input from csv")
            self.plan_input = read_csv_from_s3(
                bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=self.body.trigger_file_path
            )
            self.logger.info("Got the plan input from csv")

        # Slice only when both bounds were provided.
        if self.index_start is not None and self.index_end is not None:
            self.plan_input = self.plan_input[self.index_start:self.index_end]

        # Valuations are taken from the sliced rows so they match the addresses.
        self._extract_valuation_data()
        return self.to_addresses()

    def _extract_valuation_data(self):
        """Populate ``valuation_data`` from a ``domna_valuation`` column.

        Only applies to ``domna_asset_list`` input. Rows whose valuation is
        missing are skipped; ``valuation_data`` is left untouched when the
        column is absent or there are no rows.
        """
        if self.body.file_format != "domna_asset_list" or not self.plan_input:
            return
        # Rows come from one DataFrame, so the first row's keys are used to
        # detect whether the column exists.
        if "domna_valuation" not in self.plan_input[0]:
            return
        # Blank spreadsheet cells arrive as NaN (not None) after
        # DataFrame.to_dict("records"), so test with pd.notnull rather than
        # "is not None" to keep NaN valuations out of the result.
        self.valuation_data = [
            {"uprn": row.get("uprn"), "valuation": row.get("domna_valuation")}
            for row in self.plan_input
            if pd.notnull(row.get("domna_valuation"))
        ]
        # Could add more formats here in future

    def _process_domna_asset_list(self):
        """Normalise a Domna standardised asset list into row dicts.

        Expects ``self.plan_input`` to be a DataFrame and replaces it with
        ``df.to_dict("records")``.
        """
        df = self.plan_input
        # Rename the columns to match the expected format.
        df = df.rename(
            columns={"domna_address_1": "address", "domna_postcode": "postcode", "epc_os_uprn": "uprn"}
        )

        # Where the EPC has been estimated, that is because a UPRN wasn't
        # available, so a negative or missing UPRN is replaced by None.
        if "estimated" not in df.columns:
            df["estimated"] = False
        df["uprn"] = np.where(
            df["estimated"].isin([1, True]) & ((df["uprn"] < 0) | pd.isnull(df["uprn"])), None, df["uprn"]
        )

        # Start from the landlord-supplied property type and built form.
        df["property_type"] = df["landlord_property_type"].copy()
        if "landlord_built_form" in df.columns:
            df["built_form"] = df["landlord_built_form"].copy()
        else:
            df["built_form"] = None

        # Where the landlord value is "unknown", fall back to the EPC value.
        if "epc_property_type" not in df.columns:
            df["epc_property_type"] = None
        df["property_type"] = np.where(
            df["property_type"] == "unknown", df["epc_property_type"], df["property_type"]
        )
        if "epc_archetype" not in df.columns:
            df["epc_archetype"] = None
        df["built_form"] = np.where(
            df["built_form"] == "unknown", df["epc_archetype"], df["built_form"]
        )

        # map() yields NaN for unmapped values; fillna() with the original
        # column retains any value that did not get mapped.
        df["property_type"] = df["property_type"].map(self._PROPERTY_TYPE_MAP).fillna(df["property_type"])
        df["built_form"] = df["built_form"].map(self._BUILT_FORM_MAP).fillna(df["built_form"])
        self.plan_input = df.to_dict("records")

    def _process_ara_property_list(self):
        """Convert an ARA property list DataFrame to row dicts unchanged."""
        df = self.plan_input
        self.plan_input = df.to_dict("records")

    def to_addresses(self):
        """Parse ``plan_input`` via ``Addresses.from_plan_input``."""
        return Addresses.from_plan_input(self.plan_input, self.body)

View file

@ -3,7 +3,6 @@ import json
from copy import deepcopy
from datetime import datetime
import pandas as pd
import numpy as np
from uuid import UUID
from tqdm import tqdm
@ -44,7 +43,9 @@ from etl.spatial.OpenUprnClient import OpenUprnClient
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
from utils.logger import setup_logger
from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3, read_excel_from_s3
from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3
from backend.app.plan.plan_input_processor import PlanInputProcessor
logger = setup_logger()
@ -296,7 +297,7 @@ def create_epc_records(epc_searcher: SearchEpc, energy_assessment: dict):
"old_data": epc_searcher.older_epcs.copy()
}, energy_assessment_is_newer
# In this case, our EPC is older than the newest publically avaible one, but is not contained in
# In this case, our EPC is older than the newest publicly available one, but is not contained in
# the historicals, so it can't have been lodged, so we include it in the old data
return {
'original_epc': newest_epc,
@ -478,132 +479,22 @@ async def model_engine(body: PlanTriggerRequest):
try:
logger.info("Getting the inputs")
if body.file_type == "xlsx":
logger.info("Getting the plan input")
plan_input = read_excel_from_s3(
bucket_name=get_settings().PLAN_TRIGGER_BUCKET,
file_key=body.trigger_file_path,
sheet_name=body.sheet_name,
header_row=0,
)
logger.info("Got the plan input from excel")
# We now handle the case where the input data is a Domna standardised assset list
if body.file_format == "domna_asset_list":
# We rename the columns to match the expected format
plan_input = plan_input.rename(
columns={"domna_address_1": "address", "domna_postcode": "postcode", "epc_os_uprn": "uprn"}
)
# Where the EPC has been estimated, that is because a UPRN wasn't avaialble and so we remove UPRN
# This will be reflexted
if "estimated" not in plan_input.columns:
plan_input["estimated"] = False
plan_input["uprn"] = np.where(
plan_input["estimated"].isin([1, True]) & (
(plan_input["uprn"] < 0) | pd.isnull(plan_input["uprn"])
), None, plan_input["uprn"]
)
# We handle the landlord property type and built form
plan_input["property_type"] = plan_input["landlord_property_type"].copy()
if "landlord_built_form" in plan_input.columns:
plan_input["built_form"] = plan_input["landlord_built_form"].copy()
else:
plan_input["built_form"] = None
if "epc_property_type" not in plan_input.columns:
plan_input["epc_property_type"] = None
plan_input["property_type"] = np.where(
plan_input["property_type"] == "unknown",
plan_input["epc_property_type"],
plan_input["property_type"]
)
if "epc_archetype" not in plan_input.columns:
plan_input["epc_archetype"] = None
plan_input["built_form"] = np.where(
plan_input["built_form"] == "unknown", plan_input["epc_archetype"], plan_input["built_form"]
)
property_type_map = {
"house": "House",
"flat": "Flat",
"maisonette": "Maisonette",
"bungalow": "Bungalow",
"block house": "House",
"coach house": "House",
"bedsit": "Flat",
}
built_form_map = {
"mid-terrace": "Mid-Terrace",
"end-terrace": "End-Terrace",
"semi-detached": "Semi-Detached",
"detached": "Detached",
"enclosed end-terrace": "Enclosed End-Terrace",
"enclosed mid-terrace": "Enclosed Mid-Terrace",
}
# We remap the values to match the EPC expected formats
# This syntax will actually retain any original values, if they don't get mapped
plan_input["property_type"] = (
plan_input["property_type"]
.map(property_type_map)
.fillna(plan_input["property_type"])
)
plan_input["built_form"] = (
plan_input["built_form"]
.map(built_form_map)
.fillna(plan_input["built_form"])
)
plan_input = plan_input.to_dict("records")
else:
raise ValueError("Other formats not yet supported")
else:
logger.info("Getting the plan input from csv")
plan_input = read_csv_from_s3(
bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path
)
logger.info("Got the plan input from csv")
# TODO: New onboarding process
if body.file_format == "ara_property_list":
plan_input = read_excel_from_s3(
bucket_name=get_settings().DATA_BUCKET, file_key=body.trigger_file_path, sheet_name=body.sheet_name,
header_row=0
)
plan_input = plan_input.to_dict('records')
# We then slide it on the indexes if they are provided
if body.index_start is not None and body.index_end is not None:
plan_input = plan_input[body.index_start:body.index_end]
# Use PlanInputProcessor for all plan input processing
plan_input_processor = PlanInputProcessor(body)
addresses = plan_input_processor.process()
valuation_data = plan_input_processor.valuation_data
# Confirm no duplicate UPRNS
check_duplicate_uprns(plan_input)
check_duplicate_uprns([a.uprn for a in addresses])
# If we have patches or overrides, we should read them in here
patches, already_installed, non_invasive_recommendations, valuation_data = get_request_property_data(body)
if body.file_type == "xlsx" and body.file_format == "domna_asset_list":
# We check if we have valution data
if not valuation_data and body.valuation_file_path in [None, ""]:
# We check plan_input
if "domna_valuation" in plan_input[0]:
valuation_data = [{"uprn": x["uprn"], "valuation": x["domna_valuation"]} for x in plan_input]
patches, already_installed, non_invasive_recommendations, _ = get_request_property_data(body)
logger.info("Getting cleaning_data")
cleaning_data = read_dataframe_from_s3_parquet(
bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet",
)
# Prepare input data
addresses = Addresses.from_plan_input(plan_input, body)
logger.info("Checking database for existing properties")
uprns = addresses.get_uprns()
@ -662,8 +553,8 @@ async def model_engine(body: PlanTriggerRequest):
logger.info("Processing each property for model input preparation")
input_properties, inspections_map, eco_packages, epc_upserts = [], {}, {}, []
for addr, config in tqdm(
zip(addresses, plan_input),
for addr in tqdm(
addresses,
total=len(addresses),
desc="Processing properties",
):
@ -684,7 +575,7 @@ async def model_engine(body: PlanTriggerRequest):
property_already_installed = list(already_installed_by_uprn[addr.uprn])
epc_searcher = SearchEpc(
address1=addr.address_1,
address1=addr.address1,
postcode=addr.postcode,
uprn=addr.uprn,
auth_token=get_settings().EPC_AUTH_TOKEN,
@ -693,16 +584,15 @@ async def model_engine(body: PlanTriggerRequest):
heating_system=addr.landlord_heating_system,
associated_uprns=associated_uprns
)
epc_searcher.ordnance_survey_client.built_form = addr.landlord_built_form
epc_searcher.ordnance_survey_client.property_type = addr.landlord_property_type
epc_searcher.ordnance_survey_client.built_form = addr.built_form
epc_searcher.ordnance_survey_client.property_type = addr.property_type
# For the moment, our OS API access is unavailable, so we skip and interpolate
epc_searcher.find_property(skip_os=True, api_data=epc_api_data, overwrite_sap05=True)
epc_searcher.set_uprn_source(file_format=body.file_format)
lookup_key = (
("uprn", addr.uprn) if addr.uprn is not None
else ("landlord_property_id", addr.landlord_property_id)
("uprn", addr.uprn) if addr.uprn is not None else ("landlord_property_id", addr.landlord_property_id)
)
property_id = property_lookup[lookup_key]
@ -744,16 +634,14 @@ async def model_engine(body: PlanTriggerRequest):
epc_page=epc_page,
rrn=rrn,
cleaned_address=epc_searcher.address_clean,
config_address=addr.address_1,
config_address=addr.address,
address_postal_town=epc_searcher.address_postal_town
)
)
epc_records = patch_epc(patch, epc_records)
prepared_epc = EPCRecord(
epc_records=epc_records, run_mode="newdata", cleaning_data=cleaning_data, address_metadata=addr
)
prepared_epc = EPCRecord(epc_records=epc_records, run_mode="newdata", cleaning_data=cleaning_data)
input_properties.append(
Property(
@ -763,13 +651,12 @@ async def model_engine(body: PlanTriggerRequest):
address=epc_searcher.address_clean,
postcode=epc_searcher.postcode_clean,
epc_record=prepared_epc,
already_installed=property_already_installed,
already_installed=property_already_installed + eco_packages.get(property_id)[3],
find_my_epc_components=find_my_epc_components,
property_valuation=req_data.valuation,
non_invasive_recommendations=property_non_invasive_recommendations,
energy_assessment=energy_assessment,
inspections=inspections_map.get(property_id),
**Property.extract_kwargs(config), # TODO: Depraecate this
)
)