diff --git a/backend/SearchEpc.py b/backend/SearchEpc.py index 82899a81..eb2b0b23 100644 --- a/backend/SearchEpc.py +++ b/backend/SearchEpc.py @@ -156,7 +156,8 @@ class SearchEpc: size=None, property_type=None, fast=False, - heating_system: [str, None] = None + heating_system: [str, None] = None, + associated_uprns: [List[int] | None] = None ): """ Address lines 1 and postcode are mandatory fields. The other address lines are optional @@ -172,6 +173,11 @@ class SearchEpc: :param size: int, optional, the number of results to return. If not provided, defaults to 25 which is the api's default :param property_type: str, optional, the property type of the property, if known before hand + :param fast: bool, optional, if true, the extract_epc_data method will skip some processing to return + results faster + :param heating_system: str, optional, the heating system of the property, if known before hand + :param associated_uprns: list of int, optional, list of associated uprns for the property. E.g. other + units in a block of flats """ self.address1 = address1 @@ -180,6 +186,7 @@ class SearchEpc: self.uprn = uprn self.house_number = self.get_house_number(self.address1) self.numeric_house_number = self.extract_numeric_housenumber_part(self.house_number) + self.associated_uprns = associated_uprns if associated_uprns is not None else [] # property attributes self.heating_system = heating_system @@ -576,7 +583,8 @@ class SearchEpc: built_form: str = "", property_type: str = "", exclude_old: bool = False, - heating_system: [str, None] = None + heating_system: [str, None] = None, + associated_uprns: [List[int] | None] = None ): """ Fetches and processes EPC data for a given initial postcode, applying successive trimming @@ -597,9 +605,12 @@ class SearchEpc: :param property_type: The 'property-type' value to be used for filtering the EPC data. :param exclude_old: Flag to exclude EPC data older than 10 years. :param heating_system: Optional heating system type for additional filtering. + :param associated_uprns: Optional list of associated UPRNs for additional filtering. :return: """ + associated_uprns_to_apply = [] if associated_uprns is None else associated_uprns.copy() + property_type_api_map = { "Bungalow": "bungalow", "Flat": "flat", @@ -701,7 +712,16 @@ class SearchEpc: has_missing_built_form = not estimation_built_form - if is_maisonette_with_bad_built_form or is_park_home_without_built_form or has_missing_built_form: + # If we have associated UPRNS, we just filter as such, otherwise + # we filter with built form and property type + if any(str(x) in epc_data["uprn"].astype(str).values for x in associated_uprns_to_apply): + # We check at least one UPRN is in the data + epc_data = epc_data[epc_data["uprn"].isin(associated_uprns_to_apply)] + # After we run this, we empty associated_uprns_to_apply. + # That ensures we don't keep re-applying this filter if we shorten the postcode again + # since we'll keep ending up in the same results + associated_uprns_to_apply = [] + elif is_maisonette_with_bad_built_form or is_park_home_without_built_form or has_missing_built_form: epc_data = epc_data[epc_data["property-type"] == estimation_property_type] else: epc_data = epc_data[ @@ -723,7 +743,10 @@ class SearchEpc: # If loop finishes without a valid response, raise an exception raise Exception("Unable to find postcode data after trimming - investigate me") - def estimate_epc(self, property_type, built_form, lmks_to_drop=None, exclude_old=False, heating_system=None): + def estimate_epc( + self, property_type, built_form, lmks_to_drop=None, exclude_old=False, heating_system=None, + associated_uprns=None + ): """ For a property that does not have an EPC, we retrieve the EPC data for the closest properties and estimate the EPC for the property in question. @@ -739,6 +762,7 @@ class SearchEpc: :param exclude_old: Used to drop any expired EPCs (more than 10 years old) :param heating_system: The heating system of the property we are estimating, if known. Will aim to filter EPCs to matching heating systems + :param associated_uprns: List of associated UPRNs for the property. E.g. other units in a block of flats :return: """ @@ -750,7 +774,8 @@ class SearchEpc: built_form=built_form, property_type=property_type, exclude_old=exclude_old, - heating_system=heating_system + heating_system=heating_system, + associated_uprns=associated_uprns ) # Check if it's a new build EPC. A property that doesn't have an EPC is not going to be a new build @@ -921,7 +946,8 @@ class SearchEpc: estimated_epc = self.estimate_epc( property_type=self.ordnance_survey_client.property_type, built_form=self.ordnance_survey_client.built_form, - heating_system=self.heating_system + heating_system=self.heating_system, + associated_uprns=self.associated_uprns ) self.newest_epc = estimated_epc self.older_epcs = [] diff --git a/backend/app/db/functions/address_functions.py b/backend/app/db/functions/address_functions.py new file mode 100644 index 00000000..34dc48c7 --- /dev/null +++ b/backend/app/db/functions/address_functions.py @@ -0,0 +1,64 @@ +from sqlalchemy.orm import Session +from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy import func +from backend.app.db.models.addresses import PostcodeSearch +from utils.logger import setup_logger + +logger = setup_logger() + + +def _get_associated_records(results, uprn, uprn_key="UPRN"): + matched_record = [] + for x in results: + if "DPA" in x: + if x["DPA"].get(uprn_key) == str(uprn): + matched_record.append(x["DPA"]) + else: + if x["LPI"].get(uprn_key) == str(uprn): + matched_record.append(x["LPI"]) + + return matched_record + + +def get_associated_uprns(session: Session, postcode: str, uprn: str): + """ + Given a postcode and UPRN, for a remote assessment, fetch all associated UPRNs, based + on parent UPRN. This will be properties in the same building + + Parent UPRN is referenced in the following docs: + https://static.geoplace.co.uk/downloads/GeoPlace-Data-Entry-Conventions-Best-Practice-for-Addresses.pdf + + :param session: The database session + :param postcode: The postcode string to search for + :param uprn: The UPRN string to match + :return: The matching PostcodeSearch record, or None if not found + """ + try: + + record = ( + session.query(PostcodeSearch) + .filter(func.upper(PostcodeSearch.postcode) == postcode) + .first() + ) + + matched_record = _get_associated_records(results=record.result_data["results"], uprn=uprn) + + if len(matched_record) != 1: + logger.error("Something went wrong, about to return nothing") + return [] + + if not matched_record[0].get("PARENT_UPRN"): + logger.info("No parent UPRN found, cannot get associated records") + return [] + + associated_records = _get_associated_records( + results=record.result_data["results"], uprn=matched_record[0]["PARENT_UPRN"], uprn_key="PARENT_UPRN" + ) + # We now fetch all UPRNS with the same parent UPRN + associated_uprns = [int(x["UPRN"]) for x in associated_records if x["UPRN"] != str(uprn)] + + return associated_uprns + + except SQLAlchemyError as e: + session.rollback() + raise e diff --git a/backend/app/db/models/addresses.py b/backend/app/db/models/addresses.py new file mode 100644 index 00000000..51e9540f --- /dev/null +++ b/backend/app/db/models/addresses.py @@ -0,0 +1,34 @@ +from sqlalchemy import ( + Column, + Integer, + String, + JSON, + TIMESTAMP, + func, + UniqueConstraint, +) +from sqlalchemy.orm import declarative_base + +Base = declarative_base() + + +class PostcodeSearch(Base): + __tablename__ = "postcode_search" + + id = Column(Integer, primary_key=True, autoincrement=True) + + # Normalized postcode (uppercase, no spaces) + postcode = Column(String, nullable=False, unique=True) + + # Full OS Places API response (stored as JSONB) + result_data = Column(JSON, nullable=False) + + # Timestamp for when the entry was first created + created_at = Column(TIMESTAMP(timezone=False), server_default=func.now(), nullable=False) + + __table_args__ = ( + UniqueConstraint("postcode", name="uq_postcode_search_postcode"), + ) + + def __repr__(self): + return f"" diff --git a/backend/engine/engine.py b/backend/engine/engine.py index 1a1e75b8..1cd379b9 100644 --- a/backend/engine/engine.py +++ b/backend/engine/engine.py @@ -1,9 +1,7 @@ -import ast import json from copy import deepcopy from datetime import datetime -from sqlalchemy import Nullable from tqdm import tqdm import pandas as pd import numpy as np @@ -26,6 +24,8 @@ from backend.app.db.functions.recommendations_functions import ( ) from backend.app.db.functions.funding_functions import upload_funding from backend.app.db.functions.energy_assessment_functions import get_latest_assessment_by_uprn +from backend.app.db.functions.address_functions import get_associated_uprns + from backend.app.db.models.portfolio import rating_lookup from backend.app.plan.schemas import PlanTriggerRequest, WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES from backend.app.plan.utils import ( @@ -524,6 +524,14 @@ async def model_engine(body: PlanTriggerRequest): full_address = config["domna_full_address"] if body.file_format == "domna_asset_list" else None heating_system = parse_heating_system(config) + associated_uprns = [] + if (body.event_type == "remote_assessment") and config.get("property_type") == "Flat": + # We're running a remote assessment for a flat - we go and grab the associated + # UPRNS for other units in the same building + associated_uprns = get_associated_uprns( + session, postcode=config["postcode"], uprn=uprn + ) + epc_searcher = SearchEpc( address1=address1, postcode=config["postcode"], @@ -531,7 +539,8 @@ async def model_engine(body: PlanTriggerRequest): auth_token=get_settings().EPC_AUTH_TOKEN, os_api_key="", full_address=full_address, - heating_system=heating_system + heating_system=heating_system, + associated_uprns=associated_uprns ) epc_searcher.ordnance_survey_client.built_form = config.get("built_form", None) epc_searcher.ordnance_survey_client.property_type = config.get("property_type", None)