refactoring database fetches

This commit is contained in:
Khalim Conn-Kowlessar 2025-12-19 08:41:59 +08:00
parent de145f5970
commit d7b9803090
11 changed files with 646 additions and 98 deletions

View file

@ -997,7 +997,15 @@ class AssetList:
# Keep a record of duplicates
self.duplicated_addresses = self.standardised_asset_list[
self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated()
][[self.DOMNA_PROPERTY_ID, self.address1_colname, self.postcode_colname]].copy()
][[self.DOMNA_PROPERTY_ID, self.full_address_colname, self.address1_colname, self.postcode_colname]].copy()
df = self.standardised_asset_list[
self.standardised_asset_list[self.DOMNA_PROPERTY_ID].isin(
self.duplicated_addresses[self.DOMNA_PROPERTY_ID])
][[self.landlord_property_id, self.DOMNA_PROPERTY_ID, self.full_address_colname, self.address1_colname,
self.postcode_colname]].copy()
df = df.sort_values(by=[self.DOMNA_PROPERTY_ID])
self.standardised_asset_list = self.standardised_asset_list[
~self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated()

View file

@ -59,6 +59,39 @@ def app():
Property UPRN
"""
data_folder = ("/Users/khalimconn-kowlessar/Documents/hestia/Warmfront/SCIS")
data_filename = "SCIS_Historic_Deemed_Combined_Workings.xlsx"
sheet_name = "SCIS"
postcode_column = 'POSTCODE'
address1_column = "NO"
address1_method = None
fulladdress_column = None
address_cols_to_concat = ["NO", "Street / Block Name", "Town/Area"]
missing_postcodes_method = None
landlord_year_built = None
landlord_os_uprn = None
landlord_property_type = "PROPERTY TYPE As per table emailed"
landlord_built_form = "PROPERTY TYPE As per table emailed"
landlord_wall_construction = None
landlord_roof_construction = None
landlord_heating_system = None
landlord_existing_pv = None
landlord_property_id = "Row ID"
landlord_sap = None
outcomes_filename = None
outcomes_sheetname = None
outcomes_postcode = None
outcomes_houseno = None
outcomes_id = None
outcomes_address = None
master_filepaths = []
master_id_colnames = []
master_to_asset_list_filepath = None
phase = False
ecosurv_landlords = None
asset_list_header = 0
landlord_block_reference = None
# Peabody data for cleaning
data_folder = ("/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting "
"Project/data_validation")

View file

@ -464,6 +464,60 @@ BUILT_FORM_MAPPINGS = {
'EnclosedEndTerrace': 'enclosed end-terrace',
'EndTerrace': 'end-terrace',
'SemiDetached': 'semi-detached',
'MidTerrace': 'mid-terrace'
'MidTerrace': 'mid-terrace',
'1st FLOOR FLAT': 'mid-floor',
'END TERRACE HOUSE': 'end-terrace',
'BUNGALOW-END TERRACE': 'end-terrace',
'BUNGALOW END TERRACE': 'end-terrace',
'END-TERRACE': 'end-terrace',
'SEMI DETACHED': 'semi-detached',
'Mid flat Ground Floor': 'ground floor',
'MID TERRACED': 'mid-terrace',
'Mid Terrace bungalow': 'mid-terrace',
'BUNGLAOW SEMI DETACHED': 'detached',
'Bungalow ENd Terrace': 'end-terrace',
'Bungalow Semi detached': 'detached',
'BUNGALOW - SEMI DETACHED': 'detached',
'Bungalow mid terrace': 'mid-terrace',
'BUNGALOW - MID TERRACED': 'mid-terrace',
'BUNGALOW - MID TERRACE': 'mid-terrace',
'Bungalow end terrace': 'end-terrace',
'BUNGALOW SEMI-DETACHED': 'detached',
'MID TERR': 'mid-terrace',
'Bungalow - mid terrace': 'mid-terrace',
'MID-TERRACE': 'mid-terrace',
'Bunagalow Semi Detached': 'semi-detached',
'SEMI DETACHED BUNGALOW': 'semi-detached',
'MID TERRACE HOUSE': 'mid-terrace',
'END - TERRACE': 'end-terrace',
'BUNGALOW-SEMI DETACHED': 'semi-detached',
'Semi-Detached': 'semi-detached',
'End-Terrace house': 'end-terrace',
'BUNGALOW MID TERRACE': 'mid-terrace',
'SEMI DETACHED HOUSE': 'semi-detached',
'BUNGALOW SEMI DETACHED': 'detached',
'MID - TERRACE': 'mid-terrace',
'3 EXT WALL FLAT': 'end-terrace',
'3 Ext wall flat': 'end-terrace',
'3 EX WALL FLAT': 'end-terrace',
'2 ext wall flats': 'mid-terrace',
'2 EXT WALLS': 'mid-terrace',
'3.EXT.WALL FLAT': 'end-terrace',
'FLAT 3 WALLS': 'end-terrace',
'2 Ext Wall flat': 'mid-terrace',
'DETATCHED HOUSE': 'detached',
'3 EXT. WALL FLAT': 'end-terrace',
'3 ext wall flat': 'end-terrace',
'3 EXT WALLS': 'end-terrace',
'3 EXT WALL - NOW 2 EXT': 'unknown',
'3 EXT-WALL FLAT': 'end-terrace',
'FLAT 2 WALLS': 'mid-terrace',
'3 EX WALL MAISONETTE': 'end-terrace',
'3 Ext Wall Flat': 'end-terrace',
'Semi Bungalow': 'semi-detached',
'2 EXT WALL FLAT': 'mid-terrace',
'2.EXT.WALL FLAT': 'mid-terrace',
'2 EXT. WALL FLAT': 'mid-terrace',
}

View file

@ -362,6 +362,71 @@ PROPERTY_MAPPING = {
'Maisonette: Semi Detached: Mid Floor': 'maisonette',
'Maisonette: Detached: Mid Floor': 'maisonette',
'House: EnclosedMidTerrace': 'house'
'House: EnclosedMidTerrace': 'house',
'3 EXT WALL FLAT': 'flat',
'1st FLOOR FLAT': 'flat',
'3 Ext wall flat': 'flat',
'3 EX WALL FLAT': 'flat',
'END TERRACE HOUSE': 'house',
'BUNGALOW-END TERRACE': 'bungalow',
'BUNGALOW END TERRACE': 'bungalow',
'2 ext wall flats': 'flat',
'Mid flat Ground Floor': 'flat',
'3.EXT.WALL FLAT': 'flat',
'FLAT 3 WALLS': 'flat',
'Mid Terrace bungalow': 'bungalow',
'Bungalow ENd Terrace': 'bungalow',
'2 Ext Wall flat': 'flat',
'DETATCHED HOUSE': 'house',
'Bungalow Semi detached': 'bungalow',
'BUNGALOW - SEMI DETACHED': 'bungalow',
'Bungalow mid terrace': 'bungalow',
'BUNGALOW - MID TERRACED': 'bungalow',
'BUNGALOW - MID TERRACE': 'bungalow',
'Bungalow end terrace': 'bungalow',
'3 EXT. WALL FLAT': 'flat',
'3 ext wall flat': 'flat',
'BUNGALOW SEMI-DETACHED': 'bungalow',
'3 EXT-WALL FLAT': 'flat',
'Bungalow - mid terrace': 'bungalow',
'SEMI DETACHED BUNGALOW': 'bungalow',
'FLAT 2 WALLS': 'flat',
'MID TERRACE HOUSE': 'house',
'3 EX WALL MAISONETTE': 'maisonette',
'BUNGALOW-SEMI DETACHED': 'bungalow',
'3 Ext Wall Flat': 'flat',
'Semi Bungalow': 'bungalow',
'End-Terrace house': 'house',
'BUNGALOW MID TERRACE': 'bungalow',
'Mid-terrace house': 'house',
'SEMI DETACHED HOUSE': 'house',
'Semi-detached house': 'house',
'2 EXT WALL FLAT': 'flat',
'2.EXT.WALL FLAT': 'flat',
'BUNGALOW SEMI DETACHED': 'bungalow',
'2 EXT. WALL FLAT': 'flat',
'END-TERRACE': 'unknown',
'SEMI DETACHED': 'unknown',
'2 EXT WALLS': 'unknown',
'MID TERRACED': 'unknown',
'BUNGLAOW SEMI DETACHED': 'bungalow',
'END TERRACE': 'unknown',
'3 EXT WALLS': 'unknown',
'Mid Terrace': 'unknown',
'3 EXT WALL - NOW 2 EXT': 'unknown',
'MID TERR': 'unknown',
'DETACHED': 'unknown',
'MID-TERRACE': 'unknown',
'Bunagalow Semi Detached': 'bungalow',
'End-terrace': 'unknown',
'END - TERRACE': 'unknown',
'SEMI-DETACHED': 'unknown',
'Semi-Detached': 'unknown',
'MID TERRACE': 'unknown',
'End Terrace': 'unknown',
'Detached': 'unknown',
'Mid-terrace': 'unknown',
'MID - TERRACE': 'unknown'
}

View file

@ -0,0 +1,65 @@
from dataclasses import dataclass
from typing import Optional
@dataclass(slots=True)
class Address:
uprn: Optional[int]
landlord_property_id: Optional[str]
address: Optional[str]
full_address: Optional[str]
postcode: str
property_type: Optional[str]
built_form: Optional[str]
estimated: bool
# Additional address data, associated to a standardised asset list
domna_full_address: Optional[str]
domna_address_1: Optional[str]
landlord_heating_system: Optional[str] = None
@property
def address1(self):
if self.domna_address_1 is not None:
address1 = self.domna_address_1
else:
address1 = self.address
# Format
address1 = str(int(address1)) if isinstance(address1, float) else str(address1)
return address1
@property
def request_data(self) -> dict[str, Optional[str]]:
"""
Canonical request payload for downstream services.
"""
data = {
"uprn": self.uprn,
"landlord_property_id": self.landlord_property_id,
"postcode": self.postcode,
"address1": self.address1,
"full_address": self.full_address,
}
# Drop nulls
return {k: v for k, v in data.items() if v is not None}
@property
def heating_system(self):
"""
Helper function to extract a heating system, which can be used to estimate EPC. This is a very limited,
placeholder function to cover some initial immediate cases.
:return:
"""
ll_heating = self.landlord_property_id
if not ll_heating:
return None
if ll_heating == "electric storage heaters":
# Return with the same format at the EPC
return "Electric storage heaters"
return None

View file

@ -0,0 +1,84 @@
from backend.addresses.Address import Address
class Addresses:
    """
    An ordered collection of Address objects with bulk accessors for the identifiers needed by batched
    database lookups.
    """

    def __init__(self, addresses: list[Address]):
        self._addresses = addresses
        # self._identity_index = self._build_identity_index()

    def __getitem__(self, index: int) -> Address:
        return self._addresses[index]

    def __len__(self) -> int:
        return len(self._addresses)

    def __iter__(self):
        # Explicit iterator so `for addr in addresses` does not rely on the legacy __getitem__ protocol
        return iter(self._addresses)

    @classmethod
    def from_plan_input(cls, plan_input: list[dict], body) -> "Addresses":
        """
        Build the collection from raw plan input rows.

        :param plan_input: One dict per property
        :param body: The request body; only its ``file_format`` attribute is read
        :return: A new Addresses instance, in the same order as the input rows
        """
        return cls([cls._parse_row(row, body) for row in plan_input])

    def get_uprns(self):
        """UPRNs of every address that has one."""
        return [x.uprn for x in self._addresses if x.uprn is not None]

    def get_landlord_ids(self):
        """Landlord property IDs of every address that has one."""
        return [x.landlord_property_id for x in self._addresses if x.landlord_property_id is not None]

    def get_unique_postcodes(self):
        """De-duplicated postcodes across all addresses (order not guaranteed)."""
        return list({x.postcode for x in self._addresses})

    def get_postcodes_for_flats(self):
        # Method to extract all of the postcodes associated to a flat, which is used for remote assessments
        # on flats
        return [x.postcode for x in self._addresses if x.property_type in ["Flat", "flat"]]

    def get_property_requests(self):
        """The request payload of every address."""
        return [x.request_data for x in self._addresses]

    @staticmethod
    def _parse_row(row: dict, body) -> Address:
        """
        Convert one plan input row into an Address.

        :param row: The raw input row; must contain a "postcode" key
        :param body: The request body; only its ``file_format`` attribute is read
        :return: The parsed Address
        """

        def clean_uprn(v):
            # UPRNs may arrive as ints, floats or numeric strings; anything unparseable (including NaN and
            # infinity) becomes None
            try:
                return int(float(v))
            except (TypeError, ValueError, OverflowError):
                return None

        is_domna_asset_list = body.file_format == "domna_asset_list"

        uprn = clean_uprn(row.get("uprn"))

        address = row.get("address")
        if not address and is_domna_asset_list:
            address = row.get("domna_address_1")

        full_address = row.get("domna_full_address") if is_domna_asset_list else None
        if not isinstance(full_address, str):
            full_address = None

        postcode = str(row["postcode"]).strip().upper()

        return Address(
            uprn=uprn,
            landlord_property_id=str(row["landlord_property_id"])
            if row.get("landlord_property_id") else None,
            address=str(address).strip() if address else None,
            full_address=str(full_address).strip() if full_address else None,
            postcode=postcode,
            property_type=row.get("property_type"),
            built_form=row.get("built_form"),
            estimated=bool(row.get("estimated", False)),
            domna_full_address=row.get("domna_full_address"),
            domna_address_1=row.get("domna_address_1"),
            # Carry the landlord's heating system through so it is not lost when rows become Address objects
            landlord_heating_system=row.get("landlord_heating_system"),
        )

    # def _build_identity_index(self) -> dict:
    #     index = {}
    #     for addr in self._addresses:
    #         key = addr.identity_key()
    #         if key in index:
    #             raise ValueError(f"Duplicate address identity detected: {key}")
    #         index[key] = addr
    #     return index

View file

@ -20,7 +20,7 @@ def _get_associated_records(results, uprn, uprn_key="UPRN"):
return matched_record
def get_associated_uprns(session: Session, postcode: str, uprn: str):
def get_associated_uprns(postcode_search: PostcodeSearch, uprn: str):
"""
Given a postcode and UPRN, for a remote assessment, fetch all associated UPRNs, based
on parent UPRN. This will be properties in the same building
@ -28,40 +28,83 @@ def get_associated_uprns(session: Session, postcode: str, uprn: str):
Parent UPRN is referenced in the following docs:
https://static.geoplace.co.uk/downloads/GeoPlace-Data-Entry-Conventions-Best-Practice-for-Addresses.pdf
:param session: The database session
:param postcode: The postcode string to search for
:param PostcodeSearch postcode_search: The postcode search record
:param uprn: The UPRN string to match
:return: The matching PostcodeSearch record, or None if not found
"""
try:
record = (
session.query(PostcodeSearch)
.filter(func.upper(PostcodeSearch.postcode) == postcode)
.first()
)
if not record:
# No record found for this postcode
return []
if not postcode_search:
return []
matched_record = _get_associated_records(results=record.result_data["results"], uprn=uprn)
matched_record = _get_associated_records(results=postcode_search.result_data["results"], uprn=uprn)
if len(matched_record) != 1:
logger.error("Something went wrong, about to return nothing")
return []
if len(matched_record) != 1:
return []
if not matched_record[0].get("PARENT_UPRN"):
logger.info("No parent UPRN found, cannot get associated records")
return []
if not matched_record[0].get("PARENT_UPRN"):
logger.info("No parent UPRN found, cannot get associated records")
return []
associated_records = _get_associated_records(
results=record.result_data["results"], uprn=matched_record[0]["PARENT_UPRN"], uprn_key="PARENT_UPRN"
)
# We now fetch all UPRNS with the same parent UPRN
associated_uprns = [int(x["UPRN"]) for x in associated_records if x["UPRN"] != str(uprn)]
associated_records = _get_associated_records(
results=postcode_search.result_data["results"], uprn=matched_record[0]["PARENT_UPRN"], uprn_key="PARENT_UPRN"
)
# We now fetch all UPRNS with the same parent UPRN
associated_uprns = [int(x["UPRN"]) for x in associated_records if x["UPRN"] != str(uprn)]
return associated_uprns
return associated_uprns
except SQLAlchemyError as e:
session.rollback()
raise e
def get_by_postcodes(session: Session, postcodes: list[str]) -> dict[str, PostcodeSearch]:
    """
    Bulk-fetch records from the PostcodeSearch table for a list of postcodes.

    :param session: The database session
    :param postcodes: Postcodes to look up; matching is case-insensitive and falsy entries are ignored
    :return: Dict mapping each upper-cased postcode to its PostcodeSearch record. Postcodes without a stored
        record are absent from the dict
    """
    if not postcodes:
        return {}

    wanted = {postcode.upper() for postcode in postcodes if postcode}
    query = session.query(PostcodeSearch).filter(func.upper(PostcodeSearch.postcode).in_(wanted))

    by_postcode = {}
    for row in query.all():
        by_postcode[row.postcode.upper()] = row
    return by_postcode
def get_associated_uprns_from_record(record: PostcodeSearch, uprn: str) -> list[int]:
    """
    Given a postcode search record and a UPRN, return the other UPRNs in that record which share the same
    parent UPRN.

    :param record: The postcode search record; a falsy value yields an empty list
    :param uprn: The UPRN to find associated UPRNs for
    :return: The associated UPRNs as ints, excluding ``uprn`` itself. Empty when the UPRN does not match
        exactly one result or has no parent UPRN
    """
    if not record:
        return []

    results = record.result_data["results"]

    own_matches = _get_associated_records(results=results, uprn=uprn)
    if len(own_matches) != 1:
        return []

    parent_uprn = own_matches[0].get("PARENT_UPRN")
    if not parent_uprn:
        return []

    # Everything under the same parent UPRN
    siblings = _get_associated_records(results=results, uprn=parent_uprn, uprn_key="PARENT_UPRN")

    associated = []
    for sibling in siblings:
        if sibling["UPRN"] != str(uprn):
            associated.append(int(sibling["UPRN"]))
    return associated

View file

@ -1,3 +1,4 @@
from typing import Iterable
from backend.app.db.models.energy_assessments import (
EnergyAssessment, EnergyAssessmentScenarios, EnergyAssessmentDocuments, DocumentTypeEnum
)
@ -63,27 +64,48 @@ def bulk_insert_energy_assessments(session: Session, data_list: List[dict]) -> D
return uprn_to_assessment_id
def get_latest_assessment_by_uprn(session: Session, uprn: int) -> Optional[EnergyAssessment]:
def get_latest_assessments_for_uprns(
session: Session,
uprns: Iterable[int],
) -> dict[int, dict]:
"""
Retrieve the latest energy assessment for a given UPRN based on the inspection date.
Fetch the latest energy assessment per UPRN in a single query.
:param session: The database session
:param uprn: The unique property reference number
:return: The latest EnergyAssessment object or None if not found
Returns a dict:
uprn -> assessment_dict | empty_response
"""
if not uprn:
return EnergyAssessment.empty_response()
uprns = [u for u in uprns if u]
if not uprns:
return {}
try:
# Query the EnergyAssessment model, filter by uprn, order by inspection_date in descending order
latest_assessment = session.query(EnergyAssessment).filter_by(uprn=uprn).order_by(
desc(EnergyAssessment.inspection_date)).first()
# DISTINCT ON requires matching ORDER BY
records = (
session.query(EnergyAssessment)
.filter(EnergyAssessment.uprn.in_(uprns))
.order_by(
EnergyAssessment.uprn,
desc(EnergyAssessment.inspection_date),
)
.distinct(EnergyAssessment.uprn)
.all()
)
return latest_assessment.to_dict() if latest_assessment else EnergyAssessment.empty_response()
except Exception as e:
logger.info(f"An error occurred: {e}")
return None
result: dict[int, dict] = {}
for record in records:
result[record.uprn] = record.to_dict()
# Fill missing uprns with empty response
uprn_set = set(uprns)
found_set = set(result.keys())
missing_uprns = uprn_set - found_set
for uprn in missing_uprns:
result[uprn] = EnergyAssessment.empty_response()
return result
def create_scenarios_for_documents(session: Session, document_list: List[dict], uprn_to_assessment_id: dict):

View file

@ -1,3 +1,4 @@
from typing import List
from datetime import datetime, timedelta, timezone
from sqlalchemy.orm import Session
from sqlalchemy.exc import SQLAlchemyError
@ -50,6 +51,77 @@ class EpcStoreService:
"epc_page_created_at": record.epc_page_created_at,
}
@classmethod
def get_epcs_for_uprns(cls, session: Session, uprns: List[int]) -> dict[int, dict]:
    """
    Bulk lookup of stored EPC data: for every requested uprn, return its status and content.

    :param session: The database session
    :param uprns: The uprns to look up
    :return: Dict keyed by uprn. Each value has the keys status, epc_api, epc_page, epc_page_rrn,
        epc_api_created_at and epc_page_created_at; everything except status is None unless the status
        is FRESH
    """
    if not uprns:
        return {}

    def _without_content(status):
        # Payload used for every non-fresh outcome: only the status is populated
        return {
            "status": status,
            "epc_api": None,
            "epc_page": None,
            "epc_page_rrn": None,
            "epc_api_created_at": None,
            "epc_page_created_at": None,
        }

    cutoff = datetime.now(timezone.utc) - timedelta(days=cls.FRESHNESS_DAYS)
    stored = session.query(EpcStore).filter(EpcStore.uprn.in_(uprns)).all()

    epc_by_uprn: dict[int, dict] = {}
    for row in stored:
        if not row.epc_api_created_at:
            epc_by_uprn[row.uprn] = _without_content(cls.MISSING)
        elif row.epc_api_created_at.date() < cutoff.date():
            # We only expose epc_page when epc_api is fresh.
            epc_by_uprn[row.uprn] = _without_content(cls.EXPIRED)
        else:
            epc_by_uprn[row.uprn] = {
                "status": cls.FRESH,
                "epc_api": row.epc_api,
                "epc_page": row.epc_page,
                "epc_page_rrn": row.epc_page_rrn,
                "epc_api_created_at": row.epc_api_created_at,
                "epc_page_created_at": row.epc_page_created_at,
            }

    # Requested uprns with no stored row at all are reported as missing
    for uprn in set(uprns) - set(epc_by_uprn.keys()):
        epc_by_uprn[uprn] = _without_content(cls.MISSING)

    return epc_by_uprn
@classmethod
def check_insert_needed(cls, epc_cache, epc_estimated, uprn):
"""
@ -115,11 +187,7 @@ class EpcStoreService:
)
session.add(record)
session.flush()
session.commit()
return record
except SQLAlchemyError as e:
session.rollback()
raise e

View file

@ -1,14 +1,19 @@
###
# This script contains methods for interacting with the property table in the database
###
from typing import List
import datetime
import pytz
from sqlalchemy import select, or_
from sqlalchemy.orm import Session
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.dialects.postgresql import insert
from backend.addresses.Address import Address
from backend.app.db.models.portfolio import (
PropertyModel, PropertyCreationStatus, PortfolioStatus, PropertyTargetsModel, PropertyDetailsEpcModel,
PropertyDetailsSpatial
)
from sqlalchemy.orm.exc import NoResultFound
def create_property(session: Session, portfolio_id: int, address: str, postcode: str, uprn: str,
@ -203,3 +208,80 @@ def update_or_create_property_spatial_details(session: Session, uprn: int, prope
session.flush()
return True
def get_existing_properties(session, portfolio_id, uprns, landlord_ids):
    """
    Bulk method for checking for existing properties.

    :param session: The database session
    :param portfolio_id: Only properties in this portfolio are considered
    :param uprns: UPRNs to match against
    :param landlord_ids: Landlord property IDs to match against
    :return: List of PropertyModel rows in the portfolio matching any of the given UPRNs or landlord
        property IDs
    """
    # Nothing to match against, so skip the round trip to the database
    if not uprns and not landlord_ids:
        return []

    stmt = (
        select(PropertyModel)
        .where(PropertyModel.portfolio_id == portfolio_id)
        .where(
            or_(
                PropertyModel.uprn.in_(uprns),
                PropertyModel.landlord_property_id.in_(landlord_ids),
            )
        )
    )
    # Session.execute() runs the select (sqlalchemy's Session has no exec()); scalars() unwraps each
    # single-entity row into the PropertyModel instance
    return session.execute(stmt).scalars().all()
def bulk_create_properties(
    session,
    body,
    addresses: list[Address],  # these are *new* addresses
    energy_assessment_by_uprn: dict[int, dict],
):
    """
    Insert a property row for each of the given addresses with a single INSERT statement.

    :param session: The database session
    :param body: The request body; its ``portfolio_id`` is written onto every row
    :param addresses: The addresses to create properties for
    :param energy_assessment_by_uprn: Latest energy assessment per uprn; an assessment with a truthy "epc"
        entry puts the property in SURVEY status, otherwise ASSESSMENT
    :return: (id, uprn, landlord_property_id) rows returned by the insert, or an empty list when there are
        no addresses. Rows skipped by the (portfolio_id, uprn) conflict rule are not included
    """
    new_rows = []
    for address in addresses:
        assessment = energy_assessment_by_uprn.get(address.uprn, {})
        if assessment.get("epc"):
            initial_status = PortfolioStatus.SURVEY.value
        else:
            initial_status = PortfolioStatus.ASSESSMENT.value

        new_rows.append(
            {
                "address": address.address1,
                "postcode": address.postcode,
                "portfolio_id": body.portfolio_id,
                "uprn": address.uprn,
                "landlord_property_id": address.landlord_property_id,
                "creation_status": PropertyCreationStatus.LOADING,
                "status": initial_status,
                "has_pre_condition_report": False,
                "has_recommendations": False,
            }
        )

    if not new_rows:
        return []

    insert_stmt = insert(PropertyModel).values(new_rows)
    # Leave already-present (portfolio_id, uprn) pairs untouched rather than failing the whole batch
    insert_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["portfolio_id", "uprn"])
    insert_stmt = insert_stmt.returning(
        PropertyModel.id,
        PropertyModel.uprn,
        PropertyModel.landlord_property_id,
    )

    inserted = session.execute(insert_stmt)
    session.flush()
    return inserted.fetchall()

View file

@ -34,6 +34,7 @@ import backend.app.assumptions as assumptions
from backend.ml_models.api import ModelApi
from backend.Property import Property
from backend.apis.GoogleSolarApi import GoogleSolarApi
from backend.addresses.Addresses import Addresses
from recommendations.optimiser.CostOptimiser import CostOptimiser
from recommendations.optimiser.GainOptimiser import GainOptimiser
@ -368,24 +369,6 @@ def get_funding_data():
return project_scores_matrix, partial_project_scores_matrix, whlg_eligible_postcodes
def parse_heating_system(config):
"""
Helper function to extract a heating system, which can be used to estimate EPC. This is a very limited,
placeholder function to cover some initial immediate cases.
:return:
"""
ll_heating = config.get("landlord_heating_system", None)
if not ll_heating:
return None
if ll_heating == "electric storage heaters":
# Return with the same format at the EPC
return "Electric storage heaters"
return None
def check_duplicate_uprns(plan_input):
"""
Simple function to check if the input data contains duplicated UPRNS.
@ -682,38 +665,81 @@ async def model_engine(body: PlanTriggerRequest):
bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet",
)
input_properties, inspections_map, eco_packages = [], {}, {}
for config in tqdm(plan_input):
# Prepare input data
addresses = Addresses.from_plan_input(plan_input, body)
uprn, address1, full_address = extract_address_data(config, body)
heating_system = parse_heating_system(config)
uprns = addresses.get_uprns()
landlord_ids = addresses.get_landlord_ids()
postcodes = addresses.get_postcodes_for_flats()
# ---------- 1) fetch data ----------
epc_api_data, epc_page, rrn, epc_cache = None, None, None, {}
with db_read_session() as session:
epc_cache = {}
if uprn:
epc_cache = db_funcs.epc_functions.EpcStoreService.get_epc_for_uprn(session, uprn)
with db_read_session() as session:
existing_properties = db_funcs.property_functions.get_existing_properties(
session, body.portfolio_id, uprns, landlord_ids
)
property_lookup = {}
for prop in existing_properties:
if prop.uprn:
property_lookup[("uprn", prop.uprn)] = prop.id
if prop.landlord_property_id:
property_lookup[("landlord_property_id", prop.landlord_property_id)] = prop.id
# For remote assessments of flats, we get associated UPRNs
associated_uprns = []
if body.event_type == "remote_assessment" and config.get("property_type") == "Flat":
associated_uprns = db_funcs.address_functions.get_associated_uprns(
session, postcode=config["postcode"], uprn=uprn
)
# List of properties that need to be created in the db
to_create = []
for addr in addresses:
key = ("uprn", addr.uprn) if addr.uprn else ("landlord_property_id", addr.landlord_property_id)
if key not in property_lookup:
to_create.append(addr)
# We check for an energy assessment we have performed on this property:
energy_assessment = db_funcs.energy_assessment_functions.get_latest_assessment_by_uprn(
session, uprn
# Pre-requests to the db
with db_read_session() as session:
epc_cache_by_uprn = db_funcs.epc_functions.EpcStoreService.get_epcs_for_uprns(session, uprns)
postcode_searches = db_funcs.address_functions.get_by_postcodes(session, list(postcodes))
energy_assessments_by_uprn = db_funcs.energy_assessment_functions.get_latest_assessments_for_uprns(
session, uprns
)
# If we have properties that need to be created, we create them in bulk
if to_create:
with db_session() as session:
inserted = db_funcs.property_functions.bulk_create_properties(
session, body, to_create, energy_assessments_by_uprn
)
for prop_id, uprn, landlord_property_id in inserted:
if uprn is not None:
property_lookup[("uprn", uprn)] = prop_id
if landlord_property_id:
property_lookup[("landlord_property_id", landlord_property_id)] = prop_id
# We append the newly created properties to property_lookup
input_properties, inspections_map, eco_packages = [], {}, {}
for addr in tqdm(addresses):
# Identity data
uprn = addr.uprn
address1 = addr.address1
postcode = addr.postcode
full_address = addr.full_address
heating_system = addr.heating_system
# ---------- 1) filter fetched data ----------
epc_cache = epc_cache_by_uprn[uprn]
epc_api_data, epc_page, rrn, = epc_cache["epc_api"], epc_cache["epc_page"], epc_cache["epc_page_rrn"]
# Extract from EPC cache
if epc_cache.get("status") == db_funcs.epc_functions.EpcStoreService.FRESH:
epc_api_data, epc_page, rrn = epc_cache["epc_api"], epc_cache["epc_page"], epc_cache["epc_page_rrn"]
# Extract associated UPRNs from the database response
associated_uprns = db_funcs.address_functions.get_associated_uprns(
postcode_searches.get(postcode.upper()), uprn=uprn
)
energy_assessment = energy_assessments_by_uprn.get(uprn)
epc_searcher = SearchEpc(
address1=address1,
postcode=config["postcode"],
postcode=postcode,
uprn=uprn,
auth_token=get_settings().EPC_AUTH_TOKEN,
os_api_key="",
@ -721,8 +747,8 @@ async def model_engine(body: PlanTriggerRequest):
heating_system=heating_system,
associated_uprns=associated_uprns
)
epc_searcher.ordnance_survey_client.built_form = config.get("built_form", None)
epc_searcher.ordnance_survey_client.property_type = config.get("property_type", None)
epc_searcher.ordnance_survey_client.built_form = addr.built_form
epc_searcher.ordnance_survey_client.property_type = addr.property_type
# For the moment, our OS API access is unavailable, so we skip and interpolate
epc_searcher.find_property(skip_os=True, api_data=epc_api_data, overwrite_sap05=True)
@ -731,14 +757,15 @@ async def model_engine(body: PlanTriggerRequest):
# ---------- 2) ensure property exists ----------
with db_session() as session:
property_id, is_new = db_funcs.property_functions.ensure_property_exists(
session, body, epc_searcher, energy_assessment,
landlord_property_id=config.get("landlord_property_id")
session, body, epc_searcher, energy_assessment, landlord_property_id=addr.landlord_property_id
)
if not property_id or (not is_new and not body.multi_plan):
continue
if is_new:
# TODO: We can probably make these queries in bulk at the front end and use a placeholder
# property ID, and then inject this information afterwards
with db_session() as session:
db_funcs.property_functions.create_property_targets(
session,
@ -783,11 +810,7 @@ async def model_engine(body: PlanTriggerRequest):
epc_records = patch_epc(patch, epc_records)
prepared_epc = EPCRecord(
epc_records=epc_records,
run_mode="newdata",
cleaning_data=cleaning_data,
)
prepared_epc = EPCRecord(epc_records=epc_records, run_mode="newdata", cleaning_data=cleaning_data)
# TODO: This is a temp function to handle a specific edge case with Peabody. We should
# factor this into EPCRecord as part of the cleaning however we need some more testing
@ -822,6 +845,7 @@ async def model_engine(body: PlanTriggerRequest):
# 2) A real EPC
# 3) A UPRN (meaning that a UPRN could be fetched against that property)
# We store this data
# TODO: Upload in bulk
with db_session() as session:
if db_funcs.epc_functions.EpcStoreService.check_insert_needed(
epc_cache, epc_searcher.newest_epc.get("estimated"), epc_searcher.uprn