###
# This script contains methods for interacting with the property table in the database
###
import datetime

import pytz
from sqlalchemy import select, or_, bindparam, update
from sqlalchemy.orm import Session
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.dialects.postgresql import insert

from backend.addresses.Address import Address
from backend.app.db.models.portfolio import (
    PropertyModel,
    PropertyCreationStatus,
    PortfolioStatus,
    PropertyTargetsModel,
    PropertyDetailsEpcModel,
    PropertyDetailsSpatial,
)


def create_property(
    session: Session,
    portfolio_id: int,
    address: str,
    postcode: str,
    uprn: str,
    energy_assessment: dict,
    landlord_property_id: str | None = None,
) -> tuple[int, bool]:
    """
    Create a property record if none exists for the given UPRN and portfolio.
    If one already exists, only its updated_at field is refreshed.

    :param session: The database session
    :param portfolio_id: The ID of the portfolio the property belongs to
    :param address: The address of the property
    :param postcode: The postcode of the property
    :param uprn: The UPRN of the property
    :param energy_assessment: The energy assessment data for the property. Its "epc"
        entry decides the initial status: missing/None/empty -> ASSESSMENT, otherwise SURVEY
    :param landlord_property_id: The landlord property ID if available
    :return: The ID of the property and a boolean indicating whether it was created or not
    """
    try:
        # Attempt to fetch the existing property
        existing_property = (
            session.query(PropertyModel)
            .filter_by(uprn=uprn, portfolio_id=portfolio_id)
            .one()
        )
    except NoResultFound:
        existing_property = None

    if existing_property is not None:
        # The instance was loaded by this session, so it is already tracked;
        # changing the attribute and flushing is enough (no merge required).
        existing_property.updated_at = datetime.datetime.now(pytz.utc)
        session.flush()
        return existing_property.id, False

    # Property doesn't exist, create a new one. A missing or None "epc" entry is
    # treated like an empty one, matching bulk_create_properties.
    epc = energy_assessment.get("epc")
    has_epc = epc is not None and len(epc) > 0
    status = PortfolioStatus.SURVEY.value if has_epc else PortfolioStatus.ASSESSMENT.value

    new_property = PropertyModel(
        address=address,
        postcode=postcode,
        portfolio_id=portfolio_id,
        uprn=uprn,
        landlord_property_id=landlord_property_id,
        creation_status=PropertyCreationStatus.LOADING,
        status=status,
        has_pre_condition_report=False,
        has_recommendations=False,
    )

    # Add the new property to the session; the flush populates new_property.id
    session.add(new_property)
    session.flush()
    return new_property.id, True


def ensure_property_exists(session, body, epc_searcher, energy_assessment, landlord_property_id=None):
    """
    Wrapper function around create_property which makes sure the property has a
    record in the database.

    :param session: The database session
    :param body: Request object; its portfolio_id and multi_plan attributes are read
    :param epc_searcher: Object providing address_clean, postcode_clean and uprn
    :param energy_assessment: The energy assessment data for the property
    :param landlord_property_id: The landlord property ID if available (stored as a string)
    :return: (property_id, is_new). When the property already existed and
        body.multi_plan is falsy, (None, False) is returned instead.
    """
    property_id, is_new = create_property(
        session=session,
        portfolio_id=body.portfolio_id,
        address=epc_searcher.address_clean,
        postcode=epc_searcher.postcode_clean,
        uprn=epc_searcher.uprn,
        energy_assessment=energy_assessment,
        landlord_property_id=str(landlord_property_id) if landlord_property_id is not None else None,
    )

    # An already-known property is only handed back when multi-plan is enabled
    if not is_new and not body.multi_plan:
        return None, False

    return property_id, is_new


def create_property_targets(
    session: Session,
    property_id: int,
    portfolio_id: int,
    epc_target=None,
    heat_demand_target=None,
):
    """
    Insert a new property targets record. No check for an existing record is made.

    :param session: The database session
    :param property_id: The ID of the property the targets belong to
    :param portfolio_id: The ID of the portfolio the property belongs to
    :param epc_target: Goal EPC value for the property
    :param heat_demand_target: Heat demand target for the property in kwh/m^2/year
    :return: True once the record has been flushed
    """
    new_target = PropertyTargetsModel(
        property_id=property_id,
        portfolio_id=portfolio_id,
        epc=epc_target,
        heat_demand=heat_demand_target,
    )
    session.add(new_target)
    session.flush()
    return True


def update_property_data(
    session: Session,
    property_id: int,
    portfolio_id: int,
    property_data: dict,
):
    """
    Update fields of an existing property and refresh its updated_at timestamp.

    :param session: The database session
    :param property_id: The ID of the property to update
    :param portfolio_id: The ID of the portfolio the property belongs to
    :param property_data: Mapping of attribute name -> new value, applied with setattr
    :return: True once the changes have been flushed
    :raises Exception: If no property matches property_id and portfolio_id
    """
    now = datetime.datetime.now(pytz.utc)

    try:
        # Attempt to fetch the existing property
        existing_property = (
            session.query(PropertyModel)
            .filter_by(id=property_id, portfolio_id=portfolio_id)
            .one()
        )
    except NoResultFound as err:
        # Exception type and message are kept for callers; the cause is chained
        raise Exception(
            f"Property with property_id {property_id} and portfolio_id {portfolio_id} not found"
        ) from err

    # Update the fields with the data in property_data
    for key, value in property_data.items():
        setattr(existing_property, key, value)

    # Set last so that it wins over any updated_at supplied in property_data
    existing_property.updated_at = now

    # The instance is already tracked by the session; flushing writes the changes
    session.flush()
    return True


def create_property_details_epc(
    session: Session,
    property_details_epc: dict,
):
    """
    Create or update the property details EPC record identified by the
    "portfolio_id" and "property_id" entries of property_details_epc.

    :param session: The database session
    :param property_details_epc: A dictionary containing details about the property EPC.
        Must contain "portfolio_id" and "property_id"; every entry is written to the record.
    :return: True once the record has been flushed
    """
    existing_record = (
        session.query(PropertyDetailsEpcModel)
        .filter_by(
            portfolio_id=property_details_epc["portfolio_id"],
            property_id=property_details_epc["property_id"],
        )
        .first()
    )

    if existing_record:
        # If the record exists, update its fields
        for key, value in property_details_epc.items():
            setattr(existing_record, key, value)
    else:
        # If the record doesn't exist, create a new one
        session.add(PropertyDetailsEpcModel(**property_details_epc))

    session.flush()
    return True


def update_or_create_property_spatial_details(session: Session, uprn: int, property_details_spatial: dict):
    """
    Update an existing property details record or create a new one based on the UPRN.

    :param session: The SQLAlchemy session for database interaction.
    :param uprn: The unique property reference number (UPRN) of the property.
    :param property_details_spatial: A dictionary containing the spatial property details to store or update.
    :return: True once the record has been flushed
    """
    try:
        # Attempt to fetch the existing property details
        existing_property_details = (
            session.query(PropertyDetailsSpatial)
            .filter_by(uprn=uprn)
            .one()
        )
    except NoResultFound:
        # Create a new record if not found
        session.add(PropertyDetailsSpatial(uprn=uprn, **property_details_spatial))
    else:
        # Update the fields with the data in property_details_spatial; the instance
        # is already tracked by the session, so no merge is required
        for key, value in property_details_spatial.items():
            setattr(existing_property_details, key, value)

    session.flush()
    return True


def get_existing_properties(session, portfolio_id, uprns, landlord_ids):
    """
    Bulk method for checking for existing properties.

    :param session: The database session
        NOTE(review): `exec` is not part of SQLAlchemy's own Session API; presumably a
        SQLModel session is passed in here - confirm against the callers.
    :param portfolio_id: The ID of the portfolio to search in
    :param uprns: UPRNs to look for
    :param landlord_ids: Landlord property IDs to look for
    :return: All properties of the portfolio whose uprn is in uprns or whose
        landlord_property_id is in landlord_ids
    """
    matches_identifier = or_(
        PropertyModel.uprn.in_(uprns),
        PropertyModel.landlord_property_id.in_(landlord_ids),
    )
    query = (
        select(PropertyModel)
        .where(PropertyModel.portfolio_id == portfolio_id)
        .where(matches_identifier)
    )
    return session.exec(query).scalars().all()


def bulk_create_properties(
    session,
    body,
    addresses: list[Address],  # these are *new* addresses
    energy_assessment_by_uprn: dict[int, dict],
):
    """
    Insert many properties with a single INSERT ... ON CONFLICT DO NOTHING statement.

    :param session: The database session
    :param body: Request object; its portfolio_id attribute is read
    :param addresses: The new addresses to create properties for
    :param energy_assessment_by_uprn: Energy assessment data keyed by UPRN. A missing
        entry or a falsy "epc" value gives the ASSESSMENT status, otherwise SURVEY.
    :return: Rows of (id, uprn, landlord_property_id) from RETURNING; an empty list when
        there are no addresses. With PostgreSQL's DO NOTHING, rows skipped because of a
        conflict are not part of the RETURNING output.
    """
    rows = []
    for addr in addresses:
        energy_assessment = energy_assessment_by_uprn.get(addr.uprn, {})
        status = (
            PortfolioStatus.ASSESSMENT.value
            if not energy_assessment.get("epc")
            else PortfolioStatus.SURVEY.value
        )
        rows.append(
            {
                "address": addr.address1,
                "postcode": addr.postcode,
                "portfolio_id": body.portfolio_id,
                "uprn": addr.uprn,
                "landlord_property_id": addr.landlord_property_id,
                "creation_status": PropertyCreationStatus.LOADING,
                "status": status,
                "has_pre_condition_report": False,
                "has_recommendations": False,
            }
        )

    if not rows:
        return []

    # NOTE(review): index_where targets a partial unique index on
    # (portfolio_id, uprn) WHERE uprn IS NOT NULL - presumably defined on the table; confirm.
    stmt = (
        insert(PropertyModel)
        .values(rows)
        .on_conflict_do_nothing(
            index_elements=["portfolio_id", "uprn"],
            index_where=PropertyModel.uprn.isnot(None),
        )
        .returning(
            PropertyModel.id,
            PropertyModel.uprn,
            PropertyModel.landlord_property_id,
        )
    )

    result = session.execute(stmt)
    session.flush()
    return result.fetchall()


def bulk_update_properties(session: Session, property_updates: list[dict]):
    """
    Update many properties with one executemany-style UPDATE statement.

    :param session: The database session
    :param property_updates: One dict per property with the keys "property_id",
        "portfolio_id" and "data" (column name -> new value).
        The SET clause is built from the "data" keys of the FIRST entry only, so every
        entry is expected to carry the same keys. "data" must not contain "updated_at",
        which is always set to the current UTC time here.
    :return: None
    """
    if not property_updates:
        return

    now = datetime.datetime.now(pytz.utc)

    stmt = (
        update(PropertyModel.__table__)
        .where(
            PropertyModel.id == bindparam("b_id"),
            PropertyModel.portfolio_id == bindparam("b_portfolio_id"),
        )
        .values(
            **{k: bindparam(k) for k in property_updates[0]["data"].keys()},
            updated_at=now,
        )
    )

    # The WHERE bind params use a "b_" prefix so that they do not share a name with
    # the id / portfolio_id columns of the table being updated.
    payload = [
        {
            "b_id": row["property_id"],  # renamed bind param
            "b_portfolio_id": row["portfolio_id"],
            **row["data"],
        }
        for row in property_updates
    ]

    session.execute(
        stmt,
        payload,
        execution_options={"synchronize_session": False},
    )


def bulk_upsert_property_details_epc(session: Session, rows: list[dict]):
    """
    Insert or update many property details EPC records in one statement,
    using (portfolio_id, property_id) as the conflict target.

    :param session: The database session
    :param rows: One dict of column values per record
    :return: None

    NOTE(review): on conflict, every column except "id" is overwritten from the proposed
    row - including columns that are not present in rows. Confirm that this is intended.
    """
    if not rows:
        return

    insert_stmt = insert(PropertyDetailsEpcModel).values(rows)

    # Take every column except the primary key from the row proposed for insertion
    update_cols = {
        col.name: insert_stmt.excluded[col.name]
        for col in PropertyDetailsEpcModel.__table__.columns
        if col.name not in ("id",)
    }

    stmt = insert_stmt.on_conflict_do_update(
        index_elements=["portfolio_id", "property_id"],
        set_=update_cols,
    )

    session.execute(stmt)


def bulk_upsert_property_spatial(session: Session, rows: list[dict]):
    """
    Insert or update many spatial property details records in one statement,
    using uprn as the conflict target.

    :param session: The database session
    :param rows: One dict per record with the keys "uprn" and "data"
        (column name -> value); "data" is flattened next to the uprn.
    :return: None

    NOTE(review): on conflict, every column except "id" and "uprn" is overwritten from the
    proposed row - including columns that are not present in "data". Confirm that this
    is intended.
    """
    if not rows:
        return

    values = [{"uprn": row["uprn"], **row["data"]} for row in rows]

    insert_stmt = insert(PropertyDetailsSpatial).values(values)

    # Take every column except the primary key and the conflict key from the proposed row
    update_cols = {
        col.name: insert_stmt.excluded[col.name]
        for col in PropertyDetailsSpatial.__table__.columns
        if col.name not in ("id", "uprn")
    }

    stmt = insert_stmt.on_conflict_do_update(
        index_elements=["uprn"],
        set_=update_cols,
    )

    session.execute(stmt)