refactoring db requests to run in bulk

This commit is contained in:
Khalim Conn-Kowlessar 2025-12-19 09:43:13 +08:00
parent d7b9803090
commit 9c34e202bc
6 changed files with 66 additions and 75 deletions

View file

@ -17,6 +17,8 @@ class Address:
domna_full_address: Optional[str]
domna_address_1: Optional[str]
landlord_heating_system: Optional[str] = None
solar_reason: Optional[str] = None
cavity_reason: Optional[str] = None
@property
def address1(self):

View file

@ -20,7 +20,7 @@ def _get_associated_records(results, uprn, uprn_key="UPRN"):
return matched_record
def get_associated_uprns(postcode_search: PostcodeSearch, uprn: str):
def get_associated_uprns(postcode_search: PostcodeSearch, uprn: str | int):
"""
Given a postcode and UPRN, for a remote assessment, fetch all associated UPRNs, based
on parent UPRN. This will be properties in the same building
@ -36,6 +36,10 @@ def get_associated_uprns(postcode_search: PostcodeSearch, uprn: str):
if not postcode_search:
return []
if isinstance(uprn, int):
# For this, coerce to string
uprn = str(uprn)
matched_record = _get_associated_records(results=postcode_search.result_data["results"], uprn=uprn)
if len(matched_record) != 1:

View file

@ -2,7 +2,6 @@ import re
from dataclasses import dataclass, asdict
from typing import Optional, Dict, Any, Type, TypeVar
from sqlalchemy.orm import Session
from datetime import timezone
from enum import Enum
from datetime import datetime, timedelta
@ -24,7 +23,6 @@ from backend.app.db.models.inspections import (
InspectionsCladding,
InspectionsAccessIssues,
)
from sqlalchemy.dialects.postgresql import insert
NON_INTRUSIVE_PREFIX = "non-intrusives:"

View file

@ -96,27 +96,23 @@ def create_plan(session: Session, plan):
raise e
def create_scenario(session: Session, scenario):
"""
This function will create a record for the scenario in the database if it does not exist.
:param session: The database session
:param scenario: dictionary of data representing a scenario to be created
"""
try:
def create_scenario(session: Session, scenario: dict) -> int:
existing_scenario = (
session.query(Scenario)
.filter_by(portfolio_id=scenario["portfolio_id"])
.first()
)
# Before creating a new scenario, we check if there is a scenario for this portfolio id already
# If there is, it means that any new scnario created will NOT be the default scenario
existing_scenario = session.query(Scenario).filter_by(portfolio_id=scenario["portfolio_id"]).first()
scenario["is_default"] = True if not existing_scenario else False
scenario["is_default"] = not bool(existing_scenario)
new_scenario = Scenario(**scenario)
session.add(new_scenario)
session.flush()
session.commit()
return new_scenario
except SQLAlchemyError as e:
session.rollback()
raise e
new_scenario = Scenario(**scenario)
session.add(new_scenario)
session.flush() # ensures ID is populated
scenario_id = new_scenario.id
session.commit()
return scenario_id
def create_recommendation(session: Session, recommendation):

View file

@ -1,9 +1,9 @@
import ast
import os
import time
import msgpack
from uuid import UUID
from typing import Any
from utils.s3 import read_from_s3
from backend.addresses.Address import Address
from backend.app.config import get_settings
from backend.app.plan.data_classes import PropertyRequestData
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
@ -52,21 +52,20 @@ def patch_epc(patch, epc_records):
def extract_property_request_data(
config, patches, already_installed, non_invasive_recommendations, valuation_data, uprn
address: Address, patches, already_installed, non_invasive_recommendations, valuation_data, uprn
):
patch_has_uprn = "uprn" in patches[0] if patches else True
if patch_has_uprn:
patch = next((
x for x in patches if str(x["uprn"]) == str(config["uprn"])
x for x in patches if str(x["uprn"]) == str(address.uprn)
), {})
else:
patch = next((
x for x in patches if (x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
x for x in patches if (x["address"] == address.address) and (x["postcode"] == address.postcode)
), {})
property_already_installed = next((
x for x in already_installed if
(x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
x for x in already_installed if (x["address"] == address.address) and (x["postcode"] == address.postcode)
), [])
# Because we have some non-invasive recommendations that match on address and postcode, but not UPRN
@ -85,7 +84,7 @@ def extract_property_request_data(
else:
property_non_invasive_recommendations = next((
x for x in non_invasive_recommendations if
(x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
(x["address"] == address.address) and (x["postcode"] == address.postcode)
), {})
if isinstance(property_non_invasive_recommendations.get("recommendations"), str):
@ -114,7 +113,7 @@ def extract_property_request_data(
else:
property_valuation = next((
float(x["valuation"]) for x in valuation_data if
(x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
(x["address"] == address.address) and (x["postcode"] == address.postcode)
), None)
# Return data class to give a structured format
@ -126,14 +125,14 @@ def extract_property_request_data(
)
def parse_eco_packages(config: dict[str, Any], prepared_epc) -> tuple[list[str], int, str, list[str]] | tuple[
def parse_eco_packages(addr: Address, prepared_epc) -> tuple[list[str], int, str, list[str]] | tuple[
None, None, None, list]:
solar_identification = config.get("solar_reason", None)
cavity_identification = config.get("cavity_reason", None)
solar_identification = addr.solar_reason
cavity_identification = addr.cavity_reason
if not solar_identification and not cavity_identification:
return None, None, None, []
landlord_heating_system = config["landlord_heating_system"]
landlord_heating_system = addr.landlord_heating_system
# This is the initial version of tackling "already installed" measures
already_installed = []
if landlord_heating_system == "air source heat pump":

View file

@ -672,6 +672,7 @@ async def model_engine(body: PlanTriggerRequest):
landlord_ids = addresses.get_landlord_ids()
postcodes = addresses.get_postcodes_for_flats()
# Check if we've seen these properties before
with db_read_session() as session:
existing_properties = db_funcs.property_functions.get_existing_properties(
session, body.portfolio_id, uprns, landlord_ids
@ -699,32 +700,31 @@ async def model_engine(body: PlanTriggerRequest):
)
# If we have properties that need to be created, we cerate them in bulk
new_property_ids = set()
if to_create:
with db_session() as session:
inserted = db_funcs.property_functions.bulk_create_properties(
session, body, to_create, energy_assessments_by_uprn
)
for prop_id, uprn, landlord_property_id in inserted:
new_property_ids.add(prop_id)
# We append the newly created properties to property_lookup
for prop_id, uprn, landlord_property_id in inserted:
if uprn is not None:
property_lookup[("uprn", uprn)] = prop_id
if landlord_property_id:
property_lookup[("landlord_property_id", landlord_property_id)] = prop_id
# We append the newly created properties to property_lookup
input_properties, inspections_map, eco_packages = [], {}, {}
for addr in tqdm(addresses):
# Identity data
uprn = addr.uprn
address1 = addr.address1
postcode = addr.postcode
full_address = addr.full_address
heating_system = addr.heating_system
for addr, config in tqdm(
zip(addresses, plan_input),
total=len(addresses),
desc="Processing properties",
):
# ---------- 1) filter fetched data ----------
epc_cache = epc_cache_by_uprn[uprn]
epc_cache = epc_cache_by_uprn[addr.uprn]
epc_api_data, epc_page, rrn, = epc_cache["epc_api"], epc_cache["epc_page"], epc_cache["epc_page_rrn"]
# Extract from EPC cache
if epc_cache.get("status") == db_funcs.epc_functions.EpcStoreService.FRESH:
@ -732,19 +732,19 @@ async def model_engine(body: PlanTriggerRequest):
# Extract associated UPRNs from the database response
associated_uprns = db_funcs.address_functions.get_associated_uprns(
postcode_searches.get(postcode.upper()), uprn=uprn
postcode_searches.get(addr.postcode.upper()), uprn=addr.uprn
)
energy_assessment = energy_assessments_by_uprn.get(uprn)
energy_assessment = energy_assessments_by_uprn.get(addr.uprn)
epc_searcher = SearchEpc(
address1=address1,
postcode=postcode,
uprn=uprn,
address1=addr.address1,
postcode=addr.postcode,
uprn=addr.uprn,
auth_token=get_settings().EPC_AUTH_TOKEN,
os_api_key="",
full_address=full_address,
heating_system=heating_system,
full_address=addr.full_address,
heating_system=addr.heating_system,
associated_uprns=associated_uprns
)
epc_searcher.ordnance_survey_client.built_form = addr.built_form
@ -754,26 +754,19 @@ async def model_engine(body: PlanTriggerRequest):
epc_searcher.find_property(skip_os=True, api_data=epc_api_data, overwrite_sap05=True)
epc_searcher.set_uprn_source(file_format=body.file_format)
# ---------- 2) ensure property exists ----------
with db_session() as session:
property_id, is_new = db_funcs.property_functions.ensure_property_exists(
session, body, epc_searcher, energy_assessment, landlord_property_id=addr.landlord_property_id
)
lookup_key = (
("uprn", addr.uprn) if addr.uprn is not None else ("landlord_property_id", addr.landlord_property_id)
)
property_id = property_lookup[lookup_key]
if not property_id or (not is_new and not body.multi_plan):
if not property_id:
logger.error("Could not find property ID for address: %s", addr.request_data)
# Should not happen unless input data is inconsistent
continue
if is_new:
# TODO: We can probably make these queries in bulk at the front end and use a placeholder
# property ID, and then inject this information afterwards
with db_session() as session:
db_funcs.property_functions.create_property_targets(
session,
property_id=property_id,
portfolio_id=body.portfolio_id,
epc_target=body.goal_value,
heat_demand_target=None
)
is_new = property_id in new_property_ids
if not is_new and not body.multi_plan:
continue
# If we have an energy assessment in place, that is newer than all of the previous EPCs, we use that.
# Otherwise, we use the newest EPC
@ -784,7 +777,7 @@ async def model_engine(body: PlanTriggerRequest):
)
req_data = extract_property_request_data(
config=config,
address=addr,
patches=patches,
already_installed=already_installed,
non_invasive_recommendations=non_invasive_recommendations,
@ -803,7 +796,7 @@ async def model_engine(body: PlanTriggerRequest):
epc_page=epc_page,
rrn=rrn,
cleaned_address=epc_searcher.address_clean,
config_address=config["address"],
config_address=addr.address,
address_postal_town=epc_searcher.address_postal_town
)
)
@ -817,7 +810,7 @@ async def model_engine(body: PlanTriggerRequest):
prepared_epc = averages_cleaning(prepared_epc, cleaning_data)
# If we have an ECO project, we parse the cavity/solar reasons
eco_packages[property_id] = parse_eco_packages(config, prepared_epc)
eco_packages[property_id] = parse_eco_packages(addr, prepared_epc)
# Final step - extract inspections data, if we have it - we inject into property for usage
property_inspections = db_funcs.inspections_functions.extract_inspection_data(config)
@ -1332,7 +1325,7 @@ async def model_engine(body: PlanTriggerRequest):
scenario_id = body.scenario_id
else:
with db_session() as session:
engine_scenario = db_funcs.recommendations_functions.create_scenario(
scenario_id = db_funcs.recommendations_functions.create_scenario(
session=session,
scenario={
"name": body.scenario_name,
@ -1350,7 +1343,6 @@ async def model_engine(body: PlanTriggerRequest):
"multi_plan": body.multi_plan
}
)
scenario_id = engine_scenario.id
for i in tqdm(
range(0, len(input_properties), BATCH_SIZE), total=int(np.ceil(len(input_properties) / BATCH_SIZE))