From 9c34e202bc0797b23881e2177aa1f2d7ee982ebd Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Fri, 19 Dec 2025 09:43:13 +0800 Subject: [PATCH] refactoring db requests to run in bulk --- backend/addresses/Address.py | 2 + backend/app/db/functions/address_functions.py | 6 +- .../app/db/functions/inspections_functions.py | 2 - .../db/functions/recommendations_functions.py | 34 ++++----- backend/app/plan/utils.py | 25 ++++--- backend/engine/engine.py | 72 +++++++++---------- 6 files changed, 66 insertions(+), 75 deletions(-) diff --git a/backend/addresses/Address.py b/backend/addresses/Address.py index d6a00407..9b95f5e0 100644 --- a/backend/addresses/Address.py +++ b/backend/addresses/Address.py @@ -17,6 +17,8 @@ class Address: domna_full_address: Optional[str] domna_address_1: Optional[str] landlord_heating_system: Optional[str] = None + solar_reason: Optional[str] = None + cavity_reason: Optional[str] = None @property def address1(self): diff --git a/backend/app/db/functions/address_functions.py b/backend/app/db/functions/address_functions.py index 3074b02a..4b8ad5f2 100644 --- a/backend/app/db/functions/address_functions.py +++ b/backend/app/db/functions/address_functions.py @@ -20,7 +20,7 @@ def _get_associated_records(results, uprn, uprn_key="UPRN"): return matched_record -def get_associated_uprns(postcode_search: PostcodeSearch, uprn: str): +def get_associated_uprns(postcode_search: PostcodeSearch, uprn: str | int): """ Given a postcode and UPRN, for a remote assessment, fetch all associated UPRNs, based on parent UPRN. This will be properties in the same building @@ -36,6 +36,10 @@ def get_associated_uprns(postcode_search: PostcodeSearch, uprn: str): if not postcode_search: return [] + if isinstance(uprn, int): + # For this, coerce to string + uprn = str(uprn) + matched_record = _get_associated_records(results=postcode_search.result_data["results"], uprn=uprn) if len(matched_record) != 1: diff --git a/backend/app/db/functions/inspections_functions.py b/backend/app/db/functions/inspections_functions.py index d66154cb..b1c1eeb5 100644 --- a/backend/app/db/functions/inspections_functions.py +++ b/backend/app/db/functions/inspections_functions.py @@ -2,7 +2,6 @@ import re from dataclasses import dataclass, asdict from typing import Optional, Dict, Any, Type, TypeVar from sqlalchemy.orm import Session -from datetime import timezone from enum import Enum from datetime import datetime, timedelta @@ -24,7 +23,6 @@ from backend.app.db.models.inspections import ( InspectionsCladding, InspectionsAccessIssues, ) -from sqlalchemy.dialects.postgresql import insert NON_INTRUSIVE_PREFIX = "non-intrusives:" diff --git a/backend/app/db/functions/recommendations_functions.py b/backend/app/db/functions/recommendations_functions.py index 14596749..b70111da 100644 --- a/backend/app/db/functions/recommendations_functions.py +++ b/backend/app/db/functions/recommendations_functions.py @@ -96,27 +96,23 @@ def create_plan(session: Session, plan): raise e -def create_scenario(session: Session, scenario): - """ - This function will create a record for the scenario in the database if it does not exist. - :param session: The database session - :param scenario: dictionary of data representing a scenario to be created - """ - try: +def create_scenario(session: Session, scenario: dict) -> int: + existing_scenario = ( + session.query(Scenario) + .filter_by(portfolio_id=scenario["portfolio_id"]) + .first() + ) - # Before creating a new scenario, we check if there is a scenario for this portfolio id already - # If there is, it means that any new scnario created will NOT be the default scenario - existing_scenario = session.query(Scenario).filter_by(portfolio_id=scenario["portfolio_id"]).first() - scenario["is_default"] = True if not existing_scenario else False + scenario["is_default"] = not bool(existing_scenario) - new_scenario = Scenario(**scenario) - session.add(new_scenario) - session.flush() - session.commit() - return new_scenario - except SQLAlchemyError as e: - session.rollback() - raise e + new_scenario = Scenario(**scenario) + session.add(new_scenario) + session.flush() # ensures ID is populated + + scenario_id = new_scenario.id + session.commit() + + return scenario_id def create_recommendation(session: Session, recommendation): diff --git a/backend/app/plan/utils.py b/backend/app/plan/utils.py index 717638cf..52e2b0c4 100644 --- a/backend/app/plan/utils.py +++ b/backend/app/plan/utils.py @@ -1,9 +1,9 @@ +import ast import os -import time import msgpack from uuid import UUID -from typing import Any from utils.s3 import read_from_s3 +from backend.addresses.Address import Address from backend.app.config import get_settings from backend.app.plan.data_classes import PropertyRequestData from backend.app.db.functions.tasks.Tasks import SubTaskInterface @@ -52,21 +52,20 @@ def patch_epc(patch, epc_records): def extract_property_request_data( - config, patches, already_installed, non_invasive_recommendations, valuation_data, uprn + address: Address, patches, already_installed, non_invasive_recommendations, valuation_data, uprn ): patch_has_uprn = "uprn" in patches[0] if patches else True if patch_has_uprn: patch = next(( - x for x in patches if str(x["uprn"]) == str(config["uprn"]) + x for x in patches if str(x["uprn"]) == str(address.uprn) ), {}) else: patch = next(( - x for x in patches if (x["address"] == config["address"]) and (x["postcode"] == config["postcode"]) + x for x in patches if (x["address"] == address.address) and (x["postcode"] == address.postcode) ), {}) property_already_installed = next(( - x for x in already_installed if - (x["address"] == config["address"]) and (x["postcode"] == config["postcode"]) + x for x in already_installed if (x["address"] == address.address) and (x["postcode"] == address.postcode) ), []) # Because we have some non-invasive recommendations that match on address and postcode, but not UPRN @@ -85,7 +84,7 @@ def extract_property_request_data( else: property_non_invasive_recommendations = next(( x for x in non_invasive_recommendations if - (x["address"] == config["address"]) and (x["postcode"] == config["postcode"]) + (x["address"] == address.address) and (x["postcode"] == address.postcode) ), {}) if isinstance(property_non_invasive_recommendations.get("recommendations"), str): @@ -114,7 +113,7 @@ def extract_property_request_data( else: property_valuation = next(( float(x["valuation"]) for x in valuation_data if - (x["address"] == config["address"]) and (x["postcode"] == config["postcode"]) + (x["address"] == address.address) and (x["postcode"] == address.postcode) ), None) # Return data class to give a structured format @@ -126,14 +125,14 @@ def extract_property_request_data( ) -def parse_eco_packages(config: dict[str, Any], prepared_epc) -> tuple[list[str], int, str, list[str]] | tuple[ +def parse_eco_packages(addr: Address, prepared_epc) -> tuple[list[str], int, str, list[str]] | tuple[ None, None, None, list]: - solar_identification = config.get("solar_reason", None) - cavity_identification = config.get("cavity_reason", None) + solar_identification = addr.solar_reason + cavity_identification = addr.cavity_reason if not solar_identification and not cavity_identification: return None, None, None, [] - landlord_heating_system = config["landlord_heating_system"] + landlord_heating_system = addr.landlord_heating_system # This is the initial version of tackling "already installed" measures already_installed = [] if landlord_heating_system == "air source heat pump": diff --git a/backend/engine/engine.py b/backend/engine/engine.py index 46490289..c19cf28f 100644 --- a/backend/engine/engine.py +++ b/backend/engine/engine.py @@ -672,6 +672,7 @@ async def model_engine(body: PlanTriggerRequest): landlord_ids = addresses.get_landlord_ids() postcodes = addresses.get_postcodes_for_flats() + # Check if we've seen these properties before with db_read_session() as session: existing_properties = db_funcs.property_functions.get_existing_properties( session, body.portfolio_id, uprns, landlord_ids @@ -699,32 +700,31 @@ async def model_engine(body: PlanTriggerRequest): ) # If we have properties that need to be created, we cerate them in bulk + new_property_ids = set() if to_create: with db_session() as session: inserted = db_funcs.property_functions.bulk_create_properties( session, body, to_create, energy_assessments_by_uprn ) + for prop_id, uprn, landlord_property_id in inserted: + new_property_ids.add(prop_id) + # We append the newly created properties to property_lookup for prop_id, uprn, landlord_property_id in inserted: if uprn is not None: property_lookup[("uprn", uprn)] = prop_id if landlord_property_id: property_lookup[("landlord_property_id", landlord_property_id)] = prop_id - # We append the newly created properties to property_lookup - input_properties, inspections_map, eco_packages = [], {}, {} - for addr in tqdm(addresses): - - # Identity data - uprn = addr.uprn - address1 = addr.address1 - postcode = addr.postcode - full_address = addr.full_address - heating_system = addr.heating_system + for addr, config in tqdm( + zip(addresses, plan_input), + total=len(addresses), + desc="Processing properties", + ): # ---------- 1) filter fetched data ---------- - epc_cache = epc_cache_by_uprn[uprn] + epc_cache = epc_cache_by_uprn[addr.uprn] epc_api_data, epc_page, rrn, = epc_cache["epc_api"], epc_cache["epc_page"], epc_cache["epc_page_rrn"] # Extract from EPC cache if epc_cache.get("status") == db_funcs.epc_functions.EpcStoreService.FRESH: @@ -732,19 +732,19 @@ async def model_engine(body: PlanTriggerRequest): # Extract associated UPRNs from the database response associated_uprns = db_funcs.address_functions.get_associated_uprns( - postcode_searches.get(postcode.upper()), uprn=uprn + postcode_searches.get(addr.postcode.upper()), uprn=addr.uprn ) - energy_assessment = energy_assessments_by_uprn.get(uprn) + energy_assessment = energy_assessments_by_uprn.get(addr.uprn) epc_searcher = SearchEpc( - address1=address1, - postcode=postcode, - uprn=uprn, + address1=addr.address1, + postcode=addr.postcode, + uprn=addr.uprn, auth_token=get_settings().EPC_AUTH_TOKEN, os_api_key="", - full_address=full_address, - heating_system=heating_system, + full_address=addr.full_address, + heating_system=addr.heating_system, associated_uprns=associated_uprns ) epc_searcher.ordnance_survey_client.built_form = addr.built_form @@ -754,26 +754,19 @@ async def model_engine(body: PlanTriggerRequest): epc_searcher.find_property(skip_os=True, api_data=epc_api_data, overwrite_sap05=True) epc_searcher.set_uprn_source(file_format=body.file_format) - # ---------- 2) ensure property exists ---------- - with db_session() as session: - property_id, is_new = db_funcs.property_functions.ensure_property_exists( - session, body, epc_searcher, energy_assessment, landlord_property_id=addr.landlord_property_id - ) + lookup_key = ( + ("uprn", addr.uprn) if addr.uprn is not None else ("landlord_property_id", addr.landlord_property_id) + ) + property_id = property_lookup[lookup_key] - if not property_id or (not is_new and not body.multi_plan): + if not property_id: + logger.error("Could not find property ID for address: %s", addr.request_data) + # Should not happen unless input data is inconsistent continue - if is_new: - # TODO: We can probably make these queries in bulk at the front end and use a placeholder - # property ID, and then inject this information afterwards - with db_session() as session: - db_funcs.property_functions.create_property_targets( - session, - property_id=property_id, - portfolio_id=body.portfolio_id, - epc_target=body.goal_value, - heat_demand_target=None - ) + is_new = property_id in new_property_ids + if not is_new and not body.multi_plan: + continue # If we have an energy assessment in place, that is newer than all of the previous EPCs, we use that. # Otherwise, we use the newest EPC @@ -784,7 +777,7 @@ async def model_engine(body: PlanTriggerRequest): ) req_data = extract_property_request_data( - config=config, + address=addr, patches=patches, already_installed=already_installed, non_invasive_recommendations=non_invasive_recommendations, @@ -803,7 +796,7 @@ async def model_engine(body: PlanTriggerRequest): epc_page=epc_page, rrn=rrn, cleaned_address=epc_searcher.address_clean, - config_address=config["address"], + config_address=addr.address, address_postal_town=epc_searcher.address_postal_town ) ) @@ -817,7 +810,7 @@ async def model_engine(body: PlanTriggerRequest): prepared_epc = averages_cleaning(prepared_epc, cleaning_data) # If we have an ECO project, we parse the cavity/solar reasons - eco_packages[property_id] = parse_eco_packages(config, prepared_epc) + eco_packages[property_id] = parse_eco_packages(addr, prepared_epc) # Final step - extract inspections data, if we have it - we inject into property for usage property_inspections = db_funcs.inspections_functions.extract_inspection_data(config) @@ -1332,7 +1325,7 @@ async def model_engine(body: PlanTriggerRequest): scenario_id = body.scenario_id else: with db_session() as session: - engine_scenario = db_funcs.recommendations_functions.create_scenario( + scenario_id = db_funcs.recommendations_functions.create_scenario( session=session, scenario={ "name": body.scenario_name, @@ -1350,7 +1343,6 @@ async def model_engine(body: PlanTriggerRequest): "multi_plan": body.multi_plan } ) - scenario_id = engine_scenario.id for i in tqdm( range(0, len(input_properties), BATCH_SIZE), total=int(np.ceil(len(input_properties) / BATCH_SIZE))