created basic sqlalchemy functions

This commit is contained in:
Khalim Conn-Kowlessar 2024-09-04 16:40:51 +01:00
parent d8e337e55d
commit 9bac1e7132
2 changed files with 114 additions and 9 deletions

View file

@ -1,7 +1,9 @@
from backend.app.db.models.energy_assessments import EnergyAssessment
from backend.app.db.models.energy_assessments import (
EnergyAssessment, EnergyAssessmentScenarios, EnergyAssessmentDocuments
)
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
from typing import Optional
from typing import Optional, List
from sqlalchemy import desc
@ -60,3 +62,75 @@ def get_latest_assessment_by_uprn(session: Session, uprn: int) -> Optional[Energ
except Exception as e:
print(f"An error occurred: {e}")
return None
def create_energy_assessment_scenario(session: Session, data_list: List[dict], energy_assessment_id: int):
"""
This function creates the necessary energy assessment scenarios if they don't already exist.
:param session: The SQLAlchemy session.
:param data_list: A list of dictionaries containing document data with scenario information.
:param energy_assessment_id: The ID of the energy assessment.
"""
try:
# Extract unique scenario names from the data
scenario_names = {item['scenario_id'] for item in data_list if item['scenario_id'] is not None}
for scenario_name in scenario_names:
# Check if the scenario already exists in the database
existing_scenario = session.query(EnergyAssessmentScenarios).filter_by(scenario_name=scenario_name).first()
if not existing_scenario:
# Create a new scenario
new_scenario = EnergyAssessmentScenarios(
scenario_name=scenario_name, energy_assessment_id=energy_assessment_id
)
session.add(new_scenario)
# Commit all scenario creations
session.commit()
print("Scenarios created successfully.")
except IntegrityError as e:
session.rollback()
print(f"Error occurred: {e}")
def create_scenario_documents(session: Session, data_list: List[dict]):
"""
This function creates documents in the energy_assessment_documents table, linking them to scenarios if applicable.
For usage in the energy assessment upload router
:param session: The SQLAlchemy session.
:param data_list: A list of dictionaries containing document data.
"""
try:
for data in data_list:
scenario_name = data.get('scenario_id')
if scenario_name:
# Get the scenario ID from the scenario name
scenario = session.query(EnergyAssessmentScenarios).filter_by(scenario_name=scenario_name).first()
if scenario:
data['scenario_id'] = scenario.id
else:
print(f"Scenario '{scenario_name}' not found. Skipping document.")
# Create the new document
new_document = EnergyAssessmentDocuments(
uprn=data['uprn'],
document_type=data['document_type'],
document_location=data['document_location'],
scenario_id=data['scenario_id'] # Might be None
)
session.add(new_document)
# Commit all document insertions
session.commit()
print("Documents created successfully.")
except IntegrityError as e:
session.rollback()
print(f"Error occurred: {e}")

View file

@ -106,8 +106,7 @@ async def upload(body: EnergyAssessmentUploadPayload):
scenario_folders = [
folder for folder in scenario_folders if "scenario" in folder.rstrip("/").split("/")[-1].lower()
]
scenario_site_notes = []
scenario_draft_epcs = []
scenario_documents = []
for sf in scenario_folders:
scenario_files = list_files_in_s3_folder(
bucket_name=get_settings().ENERGY_ASSESSMENTS_BUCKET,
@ -118,8 +117,13 @@ async def upload(body: EnergyAssessmentUploadPayload):
]
# This should be the leftovers
draft_epc = [file for file in scenario_files if file not in notes]
scenario_site_notes.extend(notes)
scenario_draft_epcs.extend(draft_epc)
scenario_documents.append(
{
"identifier": sf.rstrip("/").split("/")[-1],
"Scenario Site Notes": notes,
"Scenario Draft EPC": draft_epc
}
)
uprn = int(assessment.rstrip("/").split("/")[-1])
assessments_map[uprn] = {
@ -129,8 +133,7 @@ async def upload(body: EnergyAssessmentUploadPayload):
"Evidence Report": evidence_reports,
"Summary Information": summary_reports,
"Floor Plan": floor_plans,
"Scenario Site Notes": scenario_site_notes,
"Scenario Draft EPC": scenario_draft_epcs
"scenario_documents": scenario_documents
}
logger.info("Extracted energy assessment data and storing file locations to database")
@ -143,12 +146,40 @@ async def upload(body: EnergyAssessmentUploadPayload):
if doc_type == "xmls":
continue
if doc_type == "scenario_documents":
for doc in doc_files:
# This scenario id is put in as a placeholder means os associating the scenario documents with
# the correct scenario
scenario_id = doc["identifier"]
for sn in doc["Scenario Site Notes"]:
property_ea_docs.append(
{
"uprn": uprn,
"document_type": "Scenario Site Notes",
"document_location": sn,
"scenario_id": scenario_id
}
)
for d_epc in doc["Scenario Draft EPC"]:
property_ea_docs.append(
{
"uprn": uprn,
"document_type": "Scenario Draft EPC",
"document_location": d_epc,
"scenario_id": scenario_id
}
)
continue
for doc in doc_files:
property_ea_docs.append(
{
"uprn": uprn,
"document_type": doc_type,
"document_location": doc
"document_location": doc,
"scenario_id": None
}
)
energy_assessment_documents.extend(property_ea_docs)