Model/backend/app/db/functions/energy_assessment_functions.py
2024-09-04 19:39:31 +01:00

158 lines
6 KiB
Python

from backend.app.db.models.energy_assessments import (
EnergyAssessment, EnergyAssessmentScenarios, EnergyAssessmentDocuments, DocumentTypeEnum
)
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
from typing import Optional, List, Dict
from sqlalchemy import desc
from utils.logger import setup_logger
logger = setup_logger()
def bulk_insert_energy_assessments(session: Session, data_list: List[dict]) -> Dict[int, int]:
    """
    Insert or update multiple energy assessment records and return a mapping of
    uprn to energy_assessment_id.

    A record is matched on the (uprn, inspection_date) pair. When a match exists, every
    key in the supplied dictionary is written onto the existing record; otherwise a new
    EnergyAssessment is created from the dictionary. All changes are committed together
    at the end of the batch.

    :param session: The SQLAlchemy session.
    :param data_list: A list of dictionaries containing energy assessment data. The
        'uprn' and 'inspection_date' keys are used for matching; all keys are passed to
        the EnergyAssessment model.
    :return: A dictionary mapping each uprn to its corresponding energy_assessment_id.
        If a uprn appears more than once in data_list, the last entry wins. An empty
        dictionary is returned when an IntegrityError forces a rollback, because the
        IDs collected up to that point may refer to rows that were never persisted.
    """
    uprn_to_assessment_id: Dict[int, int] = {}
    try:
        for data in data_list:
            uprn = data.get('uprn')
            inspection_date = data.get('inspection_date')

            # Look for a record with the same uprn and inspection_date.
            existing_record = (
                session.query(EnergyAssessment)
                .filter_by(uprn=uprn, inspection_date=inspection_date)
                .first()
            )

            if existing_record:
                # The record was loaded through this session, so it is already tracked;
                # changing its attributes is enough for the update to be committed.
                for key, value in data.items():
                    setattr(existing_record, key, value)
                uprn_to_assessment_id[uprn] = existing_record.id
            else:
                new_assessment = EnergyAssessment(**data)
                session.add(new_assessment)
                # Flush so the database assigns the primary key before commit.
                session.flush()
                uprn_to_assessment_id[uprn] = new_assessment.id

        session.commit()
        logger.info("All records inserted or updated successfully.")
    except IntegrityError as e:
        # Nothing from this batch was persisted, so do not hand back IDs that
        # callers might use as foreign keys.
        session.rollback()
        logger.exception(f"Error occurred: {e}")
        return {}
    return uprn_to_assessment_id
def get_latest_assessment_by_uprn(session: Session, uprn: int) -> Optional[dict]:
    """
    Retrieve the latest energy assessment for a given UPRN based on the inspection date.

    :param session: The database session
    :param uprn: The unique property reference number
    :return: The result of ``to_dict()`` on the most recent EnergyAssessment for the
        UPRN, or ``EnergyAssessment.empty_response()`` when no assessment exists
        (presumably also a dict - confirm against the model). ``None`` is returned
        only when an exception occurred while querying or serialising.
    """
    try:
        # Newest inspection first; .first() yields None when the UPRN has no assessments.
        latest_assessment = (
            session.query(EnergyAssessment)
            .filter_by(uprn=uprn)
            .order_by(desc(EnergyAssessment.inspection_date))
            .first()
        )
        if latest_assessment:
            return latest_assessment.to_dict()
        return EnergyAssessment.empty_response()
    except Exception as e:
        # Best-effort lookup: callers receive None on failure, but the traceback is
        # logged so the failure is not silently lost.
        logger.exception(f"An error occurred: {e}")
        return None
def create_scenarios_for_documents(session: Session, document_list: List[dict], uprn_to_assessment_id: dict):
    """
    Create scenarios for documents by UPRN and link them to the energy assessments.

    On entry, each document's 'scenario_id' key holds a scenario *name* (or a falsy
    value when the document has no scenario). For every named scenario, the matching
    EnergyAssessmentScenarios row is looked up by (scenario_name, energy_assessment_id)
    and created if missing; the document's 'scenario_id' is then overwritten in place
    with that row's database ID.

    :param session: The SQLAlchemy session.
    :param document_list: A list of dictionaries containing document data. The
        dictionaries are mutated in place.
    :param uprn_to_assessment_id: A dictionary mapping UPRN to energy_assessment_id.
    :return: None. If an IntegrityError occurs, the transaction is rolled back and every
        document touched by this call gets its original 'scenario_id' value restored,
        so the call can be retried with the same input.
    """
    # (document, original scenario name) pairs, kept so a rollback can undo the
    # in-place mutation of the caller's dictionaries.
    rewritten_documents = []
    try:
        for document in document_list:
            scenario_name = document.get('scenario_id')
            if not scenario_name:
                continue

            # NOTE(review): a UPRN missing from the mapping yields None here, and the
            # scenario is then looked up / created with energy_assessment_id=None.
            energy_assessment_id = uprn_to_assessment_id.get(document.get('uprn'))

            scenario = (
                session.query(EnergyAssessmentScenarios)
                .filter_by(
                    scenario_name=scenario_name,
                    energy_assessment_id=energy_assessment_id,
                )
                .first()
            )
            if not scenario:
                scenario = EnergyAssessmentScenarios(
                    scenario_name=scenario_name,
                    energy_assessment_id=energy_assessment_id,
                )
                session.add(scenario)
                # Flush so the new scenario's ID is available before commit.
                session.flush()

            rewritten_documents.append((document, scenario_name))
            document['scenario_id'] = scenario.id

        session.commit()
        logger.info("Scenarios created successfully.")
    except IntegrityError as e:
        session.rollback()
        # Scenarios created in this call no longer exist; put the names back so the
        # documents do not reference rolled-back IDs.
        for document, scenario_name in rewritten_documents:
            document['scenario_id'] = scenario_name
        logger.exception(f"Error occurred: {e}")
def create_documents(session: Session, document_list: List[dict]):
    """
    Insert documents into the energy_assessment_documents table, linking them to
    scenarios and assessments.

    All document objects are built and validated before anything is added to the
    session, so a malformed entry raises without leaving earlier documents pending
    in the session.

    :param session: The SQLAlchemy session.
    :param document_list: A list of dictionaries containing document data. Each must
        provide 'uprn', 'document_type', 'document_location' and
        'energy_assessment_id'; 'scenario_id' is optional.
    :return: None. An IntegrityError during insert is logged and the transaction is
        rolled back.
    :raises KeyError: If a document is missing one of the required keys.
    :raises ValueError: If 'document_type' is not accepted by DocumentTypeEnum
        (assuming DocumentTypeEnum is a standard Enum - confirm against the model).
    """
    new_documents = [
        EnergyAssessmentDocuments(
            uprn=document['uprn'],
            # Validate against the enum, then store the enum's underlying value.
            document_type=DocumentTypeEnum(document['document_type']).value,
            document_location=document['document_location'],
            energy_assessment_id=document['energy_assessment_id'],
            scenario_id=document.get('scenario_id'),  # Might be None if no scenario
        )
        for document in document_list
    ]

    try:
        session.add_all(new_documents)
        session.commit()
        logger.info("Documents created successfully.")
    except IntegrityError as e:
        session.rollback()
        logger.exception(f"Error occurred: {e}")