From 2bc80b4d508db6287b0c3317c162d9ebdb5be717 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Mon, 22 Dec 2025 16:09:31 +0800 Subject: [PATCH] fixing uprn bug --- backend/Property.py | 41 +++------- .../app/db/functions/property_functions.py | 82 ++++++++++++++++++- backend/engine/engine.py | 71 +++++++++++++--- recommendations/WallRecommendations.py | 2 +- 4 files changed, 151 insertions(+), 45 deletions(-) diff --git a/backend/Property.py b/backend/Property.py index b9830b93..5d4922a3 100644 --- a/backend/Property.py +++ b/backend/Property.py @@ -23,6 +23,7 @@ from recommendations.recommendation_utils import ( from backend.ml_models.AnnualBillSavings import AnnualBillSavings from backend.app.utils import sap_to_epc import backend.app.assumptions as assumptions +from backend.app.db.models.portfolio import rating_lookup ENVIRONMENT = os.environ.get("ENVIRONMENT", "dev") DATA_BUCKET = os.environ.get( @@ -828,7 +829,7 @@ class Property: return property_data @classmethod - def _prepare_rating_field(cls, field, rating_lookup): + def _prepare_rating_field(cls, field): """ Utility function for usage in the lambda, for preparing the _rating fields """ @@ -838,7 +839,7 @@ class Property: else None ) - def get_property_details_epc(self, portfolio_id: int, rating_lookup): + def get_property_details_epc(self, portfolio_id: int): if self.current_energy_bill is None: raise ValueError("Current energy bill has not been set") @@ -869,37 +870,21 @@ class Property: "full_address": self.data["address"], "total_floor_area": float(self.data["total-floor-area"]), "walls": self.walls["clean_description"], - "walls_rating": self._prepare_rating_field( - self.data["walls-energy-eff"], rating_lookup - ), + "walls_rating": self._prepare_rating_field(self.data["walls-energy-eff"]), "roof": self.roof["clean_description"], - "roof_rating": self._prepare_rating_field( - self.data["roof-energy-eff"], rating_lookup - ), + "roof_rating": 
self._prepare_rating_field(self.data["roof-energy-eff"]), "floor": self.floor["clean_description"], - "floor_rating": self._prepare_rating_field( - self.data["floor-energy-eff"], rating_lookup - ), + "floor_rating": self._prepare_rating_field(self.data["floor-energy-eff"]), "windows": self.windows["clean_description"], - "windows_rating": self._prepare_rating_field( - self.data["windows-energy-eff"], rating_lookup - ), + "windows_rating": self._prepare_rating_field(self.data["windows-energy-eff"]), "heating": self.main_heating["clean_description"], - "heating_rating": self._prepare_rating_field( - self.data["mainheat-energy-eff"], rating_lookup - ), + "heating_rating": self._prepare_rating_field(self.data["mainheat-energy-eff"]), "heating_controls": self.main_heating_controls["clean_description"], - "heating_controls_rating": self._prepare_rating_field( - self.data["mainheatc-energy-eff"], rating_lookup - ), + "heating_controls_rating": self._prepare_rating_field(self.data["mainheatc-energy-eff"]), "hot_water": self.hotwater["clean_description"], - "hot_water_rating": self._prepare_rating_field( - self.data["hot-water-energy-eff"], rating_lookup - ), + "hot_water_rating": self._prepare_rating_field(self.data["hot-water-energy-eff"]), "lighting": self.lighting["clean_description"], - "lighting_rating": self._prepare_rating_field( - self.data["lighting-energy-eff"], rating_lookup - ), + "lighting_rating": self._prepare_rating_field(self.data["lighting-energy-eff"]), "mainfuel": self.main_fuel["clean_description"], "ventilation": self.ventilation["ventilation"], "solar_pv": self.solar_pv["solar_pv"], @@ -908,9 +893,7 @@ class Property: "floor_height": self.floor_height, "heat_loss_corridor": self.heat_loss_corridor["heat_loss_corridor_boolean"], "unheated_corridor_length": self.heat_loss_corridor["length"], - "number_of_open_fireplaces": self.number_of_open_fireplaces[ - "number_of_open_fireplaces" - ], + "number_of_open_fireplaces": 
self.number_of_open_fireplaces["number_of_open_fireplaces"], "number_of_extensions": self.number_of_extensions["number_of_extensions"], "number_of_storeys": self.number_of_storeys["number_of_storeys"], "mains_gas": self.mains_gas, diff --git a/backend/app/db/functions/property_functions.py b/backend/app/db/functions/property_functions.py index 32cd9a25..3bf4a912 100644 --- a/backend/app/db/functions/property_functions.py +++ b/backend/app/db/functions/property_functions.py @@ -1,10 +1,9 @@ ### # This script contains methods for interacting with the property table in the database ### -from typing import List import datetime import pytz -from sqlalchemy import select, or_ +from sqlalchemy import select, or_, bindparam, update from sqlalchemy.orm import Session from sqlalchemy.orm.exc import NoResultFound from sqlalchemy.dialects.postgresql import insert @@ -272,7 +271,8 @@ def bulk_create_properties( insert(PropertyModel) .values(rows) .on_conflict_do_nothing( - index_elements=["portfolio_id", "uprn"] + index_elements=["portfolio_id", "uprn"], + index_where=PropertyModel.uprn.isnot(None), ) .returning( PropertyModel.id, @@ -285,3 +285,79 @@ def bulk_create_properties( session.flush() return result.fetchall() + + +def bulk_update_properties(session: Session, property_updates: list[dict]): + if not property_updates: + return + + now = datetime.datetime.now(pytz.utc) + + stmt = ( + update(PropertyModel) + .where( + PropertyModel.id == bindparam("b_property_id"), + PropertyModel.portfolio_id == bindparam("b_portfolio_id"), + ) + .values( + **{k: bindparam(k) for k in property_updates[0]["data"].keys()}, + updated_at=now, + ) + ) + + payload = [] + for row in property_updates: + payload.append({ + "b_property_id": row["property_id"], + "b_portfolio_id": row["portfolio_id"], + **row["data"], + }) + + session.connection().execute(stmt, payload) + + +def bulk_upsert_property_details_epc(session: Session, rows: list[dict]): + if not rows: + return + + insert_stmt = 
insert(PropertyDetailsEpcModel).values(rows) + + update_cols = { + col.name: insert_stmt.excluded[col.name] + for col in PropertyDetailsEpcModel.__table__.columns + if col.name not in ("id",) + } + + stmt = insert_stmt.on_conflict_do_update( + index_elements=["portfolio_id", "property_id"], + set_=update_cols, + ) + + session.execute(stmt) + + +def bulk_upsert_property_spatial(session: Session, rows: list[dict]): + if not rows: + return + + values = [] + for row in rows: + values.append({ + "uprn": row["uprn"], + **row["data"], + }) + + insert_stmt = insert(PropertyDetailsSpatial).values(values) + + update_cols = { + col.name: insert_stmt.excluded[col.name] + for col in PropertyDetailsSpatial.__table__.columns + if col.name not in ("id", "uprn") + } + + stmt = insert_stmt.on_conflict_do_update( + index_elements=["uprn"], + set_=update_cols, + ) + + session.execute(stmt) diff --git a/backend/engine/engine.py b/backend/engine/engine.py index 9b22feee..27020607 100644 --- a/backend/engine/engine.py +++ b/backend/engine/engine.py @@ -23,7 +23,6 @@ from backend.app.db.connection import db_engine import backend.app.db.functions as db_funcs from backend.app.db.functions.tasks.Tasks import SubTaskInterface -from backend.app.db.models.portfolio import rating_lookup from backend.app.plan.schemas import PlanTriggerRequest, WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES from backend.app.plan.utils import ( get_cleaned, patch_epc, extract_property_request_data, parse_eco_packages, handle_error, build_cloudwatch_log_url @@ -702,6 +701,7 @@ async def model_engine(body: PlanTriggerRequest): # If we have properties that need to be created, we cerate them in bulk new_property_ids = set() if to_create: + logger.info("Creating %d new properties", len(to_create)) with db_session() as session: inserted = db_funcs.property_functions.bulk_create_properties( session, body, to_create, energy_assessments_by_uprn @@ -722,7 +722,6 @@ async def model_engine(body: PlanTriggerRequest): 
total=len(addresses), desc="Processing properties", ): - # ---------- 1) filter fetched data ---------- epc_cache = epc_cache_by_uprn[addr.uprn] epc_api_data, epc_page, rrn, = epc_cache["epc_api"], epc_cache["epc_page"], epc_cache["epc_page_rrn"] @@ -838,11 +837,12 @@ async def model_engine(body: PlanTriggerRequest): # 2) A real EPC # 3) A UPRN (meaning that a UPRN could be fetched against that property) # We store this data + uprn_to_check_against = addr.uprn if addr.uprn is not None else epc_searcher.uprn # Until we enforce uprn if db_funcs.epc_functions.EpcStoreService.check_insert_needed( - epc_cache, epc_searcher.newest_epc.get("estimated"), epc_searcher.uprn, + epc_cache, epc_searcher.newest_epc.get("estimated"), uprn_to_check_against, ): epc_upserts.append({ - "uprn": epc_searcher.uprn, + "uprn": uprn_to_check_against, "epc_api": epc_searcher.data, "epc_page": epc_page_source.get("page_source"), "epc_page_rrn": epc_page_source.get("rrn"), @@ -853,16 +853,15 @@ async def model_engine(body: PlanTriggerRequest): check_duplicate_property_ids(input_properties) + logger.info("Inserting property data") # We now bulk upload all of the EPC data with db_session() as session: db_funcs.epc_functions.EpcStoreService.bulk_upsert_epc_data(session, epc_upserts) # We check if we have inspections data and store it in the database if so. 
We'll update or create # aginst each property if - if inspections_map: - logger.info("Inserting inspections data") - with db_session() as session: - db_funcs.inspections_functions.bulk_upsert_inspections_pg(session, inspections_map) + with db_session() as session: + db_funcs.inspections_functions.bulk_upsert_inspections_pg(session, inspections_map) # Set up model api and warm up the lambdas model_api = ModelApi( @@ -899,7 +898,7 @@ async def model_engine(body: PlanTriggerRequest): # Insert the spatial data logger.info("Getting spatial data") input_properties = OpenUprnClient.set_spatial_data(input_properties, bucket_name=get_settings().DATA_BUCKET) - + [p.set_features(cleaned=cleaned, kwh_client=kwh_client, kwh_predictions=kwh_preds) for p in input_properties] logger.info("Performing solar analysis") @@ -1344,6 +1343,56 @@ async def model_engine(body: PlanTriggerRequest): } ) + # TODO: New + property_updates, property_epc_details, property_spatial_updates = [], [], [] + # plans_to_create = [{property_id, plan_data}] + # recommendations_to_create = [{plan_ref, recommendation_data}] + # funding_to_create = [{plan_ref, funding_data}] + plans_to_create, recommendations_to_create, funding_to_create = [], [], [] + + # Prepare the data that will need to be uploaded in bulk + for p in input_properties: + recommendations_for_property = recommendations.get(p.id, []) + default_recommendations = [r for r in recommendations_for_property if r["default"]] + total_sap_points = sum([r["sap_points"] for r in default_recommendations]) + new_sap_points = float(p.data["current-energy-efficiency"]) + total_sap_points + new_epc = sap_to_epc(new_sap_points) + total_cost = sum([r["total"] for r in default_recommendations]) + valuations = PropertyValuation.estimate(property_instance=p, target_epc=new_epc, total_cost=total_cost) + + # --- property-level updates (always) --- + property_updates.append({ + "property_id": p.id, + "portfolio_id": body.portfolio_id, + "data": 
p.get_full_property_data(current_valuation=valuations["current_value"]) + }) + + property_epc_details.append(p.get_property_details_epc(portfolio_id=body.portfolio_id)) + + property_spatial_updates.append({"uprn": p.uprn, "data": p.spatial}) + + # --- skip plan creation if no recommendations --- + if not recommendations_for_property: + continue + + plan_data = db_funcs.recommendations_functions.prepare_plan_data( + p, body, scenario_id, eco_packages, valuations, new_sap_points, new_epc, default_recommendations + ) + plans_to_create.append({"property_id": p.id, "plan_data": plan_data}) + + # store recommendations keyed by property + for r in recommendations_for_property: + recommendations_to_create.append({"property_id": p.id, "data": r}) + + # Bulk upload property data + logger.info("Uploading property data in bulk") + with db_session() as session: + db_funcs.property_functions.bulk_update_properties(session, property_updates) + db_funcs.property_functions.bulk_upsert_property_details_epc(session, property_epc_details) + db_funcs.property_functions.bulk_upsert_property_spatial(session, property_spatial_updates) + + # TODO: End New + for i in tqdm( range(0, len(input_properties), BATCH_SIZE), total=int(np.ceil(len(input_properties) / BATCH_SIZE)) ): @@ -1369,9 +1418,7 @@ async def model_engine(body: PlanTriggerRequest): default_recommendations ) - property_details_epc = p.get_property_details_epc( - portfolio_id=body.portfolio_id, rating_lookup=rating_lookup, - ) + property_details_epc = p.get_property_details_epc(portfolio_id=body.portfolio_id) property_data = p.get_full_property_data(current_valuation=valuations["current_value"]) db_funcs.property_functions.create_property_details_epc(session, property_details_epc) diff --git a/recommendations/WallRecommendations.py b/recommendations/WallRecommendations.py index c54c00d9..328f1ab8 100644 --- a/recommendations/WallRecommendations.py +++ b/recommendations/WallRecommendations.py @@ -168,7 +168,7 @@ class 
WallRecommendations(Definitions): ): return - if u_value: + if u_value is not None: if self.property.walls["thermal_transmittance_unit"] != self.U_VALUE_UNIT: raise NotImplementedError(