mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
184 lines
6.2 KiB
Python
184 lines
6.2 KiB
Python
from typing import Iterable
|
|
from backend.app.db.models.energy_assessments import (
|
|
EnergyAssessment, EnergyAssessmentScenarios, EnergyAssessmentDocuments, DocumentTypeEnum
|
|
)
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy.exc import IntegrityError
|
|
from typing import Optional, List, Dict
|
|
from sqlalchemy import desc
|
|
from utils.logger import setup_logger
|
|
|
|
logger = setup_logger()
|
|
|
|
|
|
def bulk_insert_energy_assessments(session: Session, data_list: List[dict]) -> Dict[int, int]:
|
|
"""
|
|
This function inserts or updates multiple energy assessment records into the database and returns a mapping of
|
|
uprn to energy_assessment_id.
|
|
|
|
:param session: The SQLAlchemy session.
|
|
:param data_list: A list of dictionaries containing energy assessment data.
|
|
:return: A dictionary mapping each uprn to its corresponding energy_assessment_id.
|
|
"""
|
|
uprn_to_assessment_id = {}
|
|
|
|
try:
|
|
for data in data_list:
|
|
uprn = data.get('uprn')
|
|
inspection_date = data.get('inspection_date')
|
|
|
|
# Check if a record with the same uprn and inspection_date exists
|
|
existing_record = session.query(EnergyAssessment).filter_by(
|
|
uprn=uprn,
|
|
inspection_date=inspection_date
|
|
).first()
|
|
|
|
if existing_record:
|
|
# Update the existing record with new data
|
|
for key, value in data.items():
|
|
setattr(existing_record, key, value)
|
|
session.add(existing_record)
|
|
|
|
# Map the uprn to the existing record's ID
|
|
uprn_to_assessment_id[uprn] = existing_record.id
|
|
else:
|
|
# Insert a new record
|
|
new_assessment = EnergyAssessment(**data)
|
|
session.add(new_assessment)
|
|
|
|
# Flush the session to get the newly created ID before commit
|
|
session.flush()
|
|
|
|
# Map the uprn to the new record's ID
|
|
uprn_to_assessment_id[uprn] = new_assessment.id
|
|
|
|
# Commit the transaction
|
|
session.commit()
|
|
logger.info("All records inserted or updated successfully.")
|
|
|
|
except IntegrityError as e:
|
|
# Rollback the session in case of error
|
|
session.rollback()
|
|
logger.info(f"Error occurred: {e}")
|
|
|
|
return uprn_to_assessment_id
|
|
|
|
|
|
def get_latest_assessments_for_uprns(
|
|
session: Session,
|
|
uprns: Iterable[int],
|
|
) -> dict[int, dict]:
|
|
"""
|
|
Fetch the latest energy assessment per UPRN in a single query.
|
|
|
|
Returns a dict:
|
|
uprn -> assessment_dict | empty_response
|
|
"""
|
|
|
|
uprns = [u for u in uprns if u]
|
|
if not uprns:
|
|
return {}
|
|
|
|
# DISTINCT ON requires matching ORDER BY
|
|
records = (
|
|
session.query(EnergyAssessment)
|
|
.filter(EnergyAssessment.uprn.in_(uprns))
|
|
.order_by(
|
|
EnergyAssessment.uprn,
|
|
desc(EnergyAssessment.inspection_date),
|
|
)
|
|
.distinct(EnergyAssessment.uprn)
|
|
.all()
|
|
)
|
|
|
|
result: dict[int, dict] = {}
|
|
|
|
for record in records:
|
|
result[record.uprn] = record.to_dict()
|
|
|
|
# Fill missing uprns with empty response
|
|
uprn_set = set(uprns)
|
|
found_set = set(result.keys())
|
|
|
|
missing_uprns = uprn_set - found_set
|
|
|
|
for uprn in missing_uprns:
|
|
result[uprn] = EnergyAssessment.empty_response()
|
|
|
|
return result
|
|
|
|
|
|
def create_scenarios_for_documents(session: Session, document_list: List[dict], uprn_to_assessment_id: dict):
|
|
"""
|
|
Creates scenarios for documents by UPRN and links them to the energy assessments.
|
|
|
|
:param session: The SQLAlchemy session.
|
|
:param document_list: A list of dictionaries containing document data.
|
|
:param uprn_to_assessment_id: A dictionary mapping UPRN to energy_assessment_id.
|
|
"""
|
|
try:
|
|
for document in document_list:
|
|
uprn = document.get('uprn')
|
|
scenario_name = document.get('scenario_id')
|
|
|
|
if scenario_name:
|
|
# Get the associated energy_assessment_id for the UPRN
|
|
energy_assessment_id = uprn_to_assessment_id.get(uprn)
|
|
|
|
# Check if the scenario already exists
|
|
existing_scenario = session.query(EnergyAssessmentScenarios).filter_by(
|
|
scenario_name=scenario_name,
|
|
energy_assessment_id=energy_assessment_id
|
|
).first()
|
|
|
|
if not existing_scenario:
|
|
# Create the scenario
|
|
new_scenario = EnergyAssessmentScenarios(
|
|
scenario_name=scenario_name,
|
|
energy_assessment_id=energy_assessment_id
|
|
)
|
|
session.add(new_scenario)
|
|
session.flush() # Get the new scenario ID
|
|
|
|
# Update document with new scenario ID
|
|
document['scenario_id'] = new_scenario.id
|
|
else:
|
|
# If the scenario already exists, just use its ID
|
|
document['scenario_id'] = existing_scenario.id
|
|
|
|
# Commit the scenarios
|
|
session.commit()
|
|
logger.info("Scenarios created successfully.")
|
|
|
|
except IntegrityError as e:
|
|
session.rollback()
|
|
logger.info(f"Error occurred: {e}")
|
|
|
|
|
|
def create_documents(session: Session, document_list: List[dict]):
|
|
"""
|
|
Inserts documents into the energy_assessment_documents table, linking them to scenarios and assessments.
|
|
|
|
:param session: The SQLAlchemy session.
|
|
:param document_list: A list of dictionaries containing document data.
|
|
"""
|
|
try:
|
|
for document in document_list:
|
|
# Ensure the document_type is cast to Enum
|
|
new_document = EnergyAssessmentDocuments(
|
|
uprn=document['uprn'],
|
|
document_type=DocumentTypeEnum(document['document_type']).value,
|
|
document_location=document['document_location'],
|
|
energy_assessment_id=document['energy_assessment_id'],
|
|
scenario_id=document.get('scenario_id') # Might be None if no scenario
|
|
)
|
|
|
|
session.add(new_document)
|
|
|
|
# Commit all document insertions
|
|
session.commit()
|
|
logger.info("Documents created successfully.")
|
|
|
|
except IntegrityError as e:
|
|
session.rollback()
|
|
logger.info(f"Error occurred: {e}")
|