Added serialization method

This commit is contained in:
Khalim Conn-Kowlessar 2024-07-26 14:03:24 +01:00
parent c90c6d860b
commit b42d2c7750
4 changed files with 36 additions and 4 deletions

View file

@ -1,6 +1,8 @@
from backend.app.db.models.energy_assessments import EnergyAssessment
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
from typing import Optional
from sqlalchemy import desc
def bulk_insert_energy_assessments(session: Session, data_list):
@ -13,12 +15,12 @@ def bulk_insert_energy_assessments(session: Session, data_list):
try:
for data in data_list:
uprn = data.get('uprn')
lodgement_date = data.get('lodgement_date')
inspection_date = data.get('inspection_date')
# Check if a record with the same uprn and lodgement_date exists
# Check if a record with the same uprn and inspection_date exists
existing_record = session.query(EnergyAssessment).filter_by(
uprn=uprn,
lodgement_date=lodgement_date
inspection_date=inspection_date
).first()
if existing_record:
@ -39,3 +41,21 @@ def bulk_insert_energy_assessments(session: Session, data_list):
# Rollback the session in case of error
session.rollback()
print(f"Error occurred: {e}")
def get_latest_assessment_by_uprn(session: Session, uprn: int) -> Optional[EnergyAssessment]:
"""
Retrieve the latest energy assessment for a given UPRN based on the inspection date.
:param session: The database session
:param uprn: The unique property reference number
:return: The latest EnergyAssessment object or None if not found
"""
try:
# Query the EnergyAssessment model, filter by uprn, order by inspection_date in descending order
latest_assessment = session.query(EnergyAssessment).filter_by(uprn=uprn).order_by(
desc(EnergyAssessment.inspection_date)).first()
return latest_assessment.to_dict() if latest_assessment else {}
except Exception as e:
print(f"An error occurred: {e}")
return None

View file

@ -119,3 +119,9 @@ class EnergyAssessment(Base):
cylinder_insulation_type = Column(Text)
cylinder_insulation_thickness = Column(Integer)
cylinder_thermostat = Column(Boolean)
def to_dict(self):
"""
Convert the SQLAlchemy object to a dictionary.
"""
return {column.name: getattr(self, column.name) for column in self.__table__.columns}

View file

@ -21,6 +21,7 @@ from backend.app.db.functions.property_functions import (
from backend.app.db.functions.recommendations_functions import (
create_plan, create_plan_recommendations, upload_recommendations
)
from backend.app.db.functions.energy_assessment_functions import get_latest_assessment_by_uprn
from backend.app.db.models.portfolio import rating_lookup
from backend.app.dependencies import validate_token
from backend.app.plan.schemas import PlanTriggerRequest, MdsRequest
@ -265,6 +266,7 @@ async def trigger_plan(body: PlanTriggerRequest):
input_properties = []
for config in tqdm(plan_input):
# We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
uprn = config.get("uprn", None)
if uprn:
@ -281,6 +283,10 @@ async def trigger_plan(body: PlanTriggerRequest):
epc_searcher.ordnance_survey_client.property_type = config.get("property_type", None)
# For the moment, our OS API access is unavailable, so we skip and interpolate
epc_searcher.find_property(skip_os=True)
# We check for an energy assessment we have performed on this property:
energy_assessment = get_latest_assessment_by_uprn(session, uprn)
# Create a record in db
property_id, is_new = create_property(
session, body.portfolio_id, epc_searcher.address_clean, epc_searcher.postcode_clean, epc_searcher.uprn

View file

@ -107,7 +107,7 @@ def main():
"already_installed_file_path": "",
"patches_file_path": "",
"non_invasive_recommendations_file_path": "",
"exclusions": "",
# "exclusions": [],
"budget": None,
}
print(body)