added implementation for associated uprn filtering

This commit is contained in:
Khalim Conn-Kowlessar 2025-11-10 22:01:21 +00:00
parent 4151b58dea
commit cce58e0152
4 changed files with 142 additions and 9 deletions

View file

@ -156,7 +156,8 @@ class SearchEpc:
size=None,
property_type=None,
fast=False,
heating_system: [str, None] = None
heating_system: [str, None] = None,
associated_uprns: [List[int] | None] = None
):
"""
Address lines 1 and postcode are mandatory fields. The other address lines are optional
@ -172,6 +173,11 @@ class SearchEpc:
:param size: int, optional, the number of results to return. If not provided, defaults to 25 which is the api's
default
:param property_type: str, optional, the property type of the property, if known before hand
:param fast: bool, optional, if true, the extract_epc_data method will skip some processing to return
results faster
:param heating_system: str, optional, the heating system of the property, if known before hand
:param associated_uprns: list of int, optional, list of associated uprns for the property. E.g. other
units in a block of flats
"""
self.address1 = address1
@ -180,6 +186,7 @@ class SearchEpc:
self.uprn = uprn
self.house_number = self.get_house_number(self.address1)
self.numeric_house_number = self.extract_numeric_housenumber_part(self.house_number)
self.associated_uprns = associated_uprns if associated_uprns is not None else []
# property attributes
self.heating_system = heating_system
@ -576,7 +583,8 @@ class SearchEpc:
built_form: str = "",
property_type: str = "",
exclude_old: bool = False,
heating_system: [str, None] = None
heating_system: [str, None] = None,
associated_uprns: [List[int] | None] = None
):
"""
Fetches and processes EPC data for a given initial postcode, applying successive trimming
@ -597,9 +605,12 @@ class SearchEpc:
:param property_type: The 'property-type' value to be used for filtering the EPC data.
:param exclude_old: Flag to exclude EPC data older than 10 years.
:param heating_system: Optional heating system type for additional filtering.
:param associated_uprns: Optional list of associated UPRNs for additional filtering.
:return:
"""
associated_uprns_to_apply = [] if associated_uprns is None else associated_uprns.copy()
property_type_api_map = {
"Bungalow": "bungalow",
"Flat": "flat",
@ -701,7 +712,16 @@ class SearchEpc:
has_missing_built_form = not estimation_built_form
if is_maisonette_with_bad_built_form or is_park_home_without_built_form or has_missing_built_form:
# If we have associated UPRNS, we just filter as such, otherwise
# we filter with built form and property type
if any(str(x) in epc_data["uprn"].astype(str).values for x in associated_uprns_to_apply):
# We check at least one UPRN is in the data
epc_data = epc_data[epc_data["uprn"].isin(associated_uprns_to_apply)]
# After we run this, we empty associated_uprns_to_apply.
# That ensures we don't keep re-applying this filter if we shorten the postcode again
# since we'll keep ending up in the same results
associated_uprns_to_apply = []
elif is_maisonette_with_bad_built_form or is_park_home_without_built_form or has_missing_built_form:
epc_data = epc_data[epc_data["property-type"] == estimation_property_type]
else:
epc_data = epc_data[
@ -723,7 +743,10 @@ class SearchEpc:
# If loop finishes without a valid response, raise an exception
raise Exception("Unable to find postcode data after trimming - investigate me")
def estimate_epc(self, property_type, built_form, lmks_to_drop=None, exclude_old=False, heating_system=None):
def estimate_epc(
self, property_type, built_form, lmks_to_drop=None, exclude_old=False, heating_system=None,
associated_uprns=None
):
"""
For a property that does not have an EPC, we retrieve the EPC data for the closest properties
and estimate the EPC for the property in question.
@ -739,6 +762,7 @@ class SearchEpc:
:param exclude_old: Used to drop any expired EPCs (more than 10 years old)
:param heating_system: The heating system of the property we are estimating, if known. Will aim to filter EPCs
to matching heating systems
:param associated_uprns: List of associated UPRNs for the property. E.g. other units in a block of flats
:return:
"""
@ -750,7 +774,8 @@ class SearchEpc:
built_form=built_form,
property_type=property_type,
exclude_old=exclude_old,
heating_system=heating_system
heating_system=heating_system,
associated_uprns=associated_uprns
)
# Check if it's a new build EPC. A property that doesn't have an EPC is not going to be a new build
@ -921,7 +946,8 @@ class SearchEpc:
estimated_epc = self.estimate_epc(
property_type=self.ordnance_survey_client.property_type,
built_form=self.ordnance_survey_client.built_form,
heating_system=self.heating_system
heating_system=self.heating_system,
associated_uprns=self.associated_uprns
)
self.newest_epc = estimated_epc
self.older_epcs = []

View file

@ -0,0 +1,64 @@
from sqlalchemy.orm import Session
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import func
from backend.app.db.models.addresses import PostcodeSearch
from utils.logger import setup_logger
logger = setup_logger()
def _get_associated_records(results, uprn, uprn_key="UPRN"):
matched_record = []
for x in results:
if "DPA" in x:
if x["DPA"].get(uprn_key) == str(uprn):
matched_record.append(x["DPA"])
else:
if x["LPI"].get(uprn_key) == str(uprn):
matched_record.append(x["LPI"])
return matched_record
def get_associated_uprns(session: Session, postcode: str, uprn: str):
"""
Given a postcode and UPRN, for a remote assessment, fetch all associated UPRNs, based
on parent UPRN. This will be properties in the same building
Parent UPRN is referenced in the following docs:
https://static.geoplace.co.uk/downloads/GeoPlace-Data-Entry-Conventions-Best-Practice-for-Addresses.pdf
:param session: The database session
:param postcode: The postcode string to search for
:param uprn: The UPRN string to match
:return: The matching PostcodeSearch record, or None if not found
"""
try:
record = (
session.query(PostcodeSearch)
.filter(func.upper(PostcodeSearch.postcode) == postcode)
.first()
)
matched_record = _get_associated_records(results=record.result_data["results"], uprn=uprn)
if len(matched_record) != 1:
logger.error("Something went wrong, about to return nothing")
return []
if not matched_record[0].get("PARENT_UPRN"):
logger.info("No parent UPRN found, cannot get associated records")
return []
associated_records = _get_associated_records(
results=record.result_data["results"], uprn=matched_record[0]["PARENT_UPRN"], uprn_key="PARENT_UPRN"
)
# We now fetch all UPRNS with the same parent UPRN
associated_uprns = [int(x["UPRN"]) for x in associated_records if x["UPRN"] != str(uprn)]
return associated_uprns
except SQLAlchemyError as e:
session.rollback()
raise e

View file

@ -0,0 +1,34 @@
from sqlalchemy import (
Column,
Integer,
String,
JSON,
TIMESTAMP,
func,
UniqueConstraint,
)
from sqlalchemy.orm import declarative_base
Base = declarative_base()
class PostcodeSearch(Base):
__tablename__ = "postcode_search"
id = Column(Integer, primary_key=True, autoincrement=True)
# Normalized postcode (uppercase, no spaces)
postcode = Column(String, nullable=False, unique=True)
# Full OS Places API response (stored as JSONB)
result_data = Column(JSON, nullable=False)
# Timestamp for when the entry was first created
created_at = Column(TIMESTAMP(timezone=False), server_default=func.now(), nullable=False)
__table_args__ = (
UniqueConstraint("postcode", name="uq_postcode_search_postcode"),
)
def __repr__(self):
return f"<PostcodeSearch(id={self.id}, postcode='{self.postcode}')>"

View file

@ -1,9 +1,7 @@
import ast
import json
from copy import deepcopy
from datetime import datetime
from sqlalchemy import Nullable
from tqdm import tqdm
import pandas as pd
import numpy as np
@ -26,6 +24,8 @@ from backend.app.db.functions.recommendations_functions import (
)
from backend.app.db.functions.funding_functions import upload_funding
from backend.app.db.functions.energy_assessment_functions import get_latest_assessment_by_uprn
from backend.app.db.functions.address_functions import get_associated_uprns
from backend.app.db.models.portfolio import rating_lookup
from backend.app.plan.schemas import PlanTriggerRequest, WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES
from backend.app.plan.utils import (
@ -524,6 +524,14 @@ async def model_engine(body: PlanTriggerRequest):
full_address = config["domna_full_address"] if body.file_format == "domna_asset_list" else None
heating_system = parse_heating_system(config)
associated_uprns = []
if (body.event_type == "remote_assessment") and config.get("property_type") == "Flat":
# We're running a remote assessment for a flat - we go and grab the associated
# UPRNS for other units in the same building
associated_uprns = get_associated_uprns(
session, postcode=config["postcode"], uprn=uprn
)
epc_searcher = SearchEpc(
address1=address1,
postcode=config["postcode"],
@ -531,7 +539,8 @@ async def model_engine(body: PlanTriggerRequest):
auth_token=get_settings().EPC_AUTH_TOKEN,
os_api_key="",
full_address=full_address,
heating_system=heating_system
heating_system=heating_system,
associated_uprns=associated_uprns
)
epc_searcher.ordnance_survey_client.built_form = config.get("built_form", None)
epc_searcher.ordnance_survey_client.property_type = config.get("property_type", None)