From b60112d75b2362a0bed394bba215d486c8fe9a9c Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Thu, 25 Jul 2024 19:03:57 +0100 Subject: [PATCH] setting up push to db --- .../functions/energy_assessment_functions.py | 27 +++++++++++++ etl/xml_survey_extraction/app.py | 38 +++++++++++++++++-- 2 files changed, 62 insertions(+), 3 deletions(-) create mode 100644 backend/app/db/functions/energy_assessment_functions.py diff --git a/backend/app/db/functions/energy_assessment_functions.py b/backend/app/db/functions/energy_assessment_functions.py new file mode 100644 index 00000000..8befe903 --- /dev/null +++ b/backend/app/db/functions/energy_assessment_functions.py @@ -0,0 +1,27 @@ +from backend.app.db.models.energy_assessments import EnergyAssessment +from sqlalchemy.orm import Session +from sqlalchemy.exc import IntegrityError + + +def bulk_insert_energy_assessments(session: Session, data_list): + """ + This function inserts multiple energy assessment records into the database. + + :param session: The database session + :param data_list: A list of dictionaries containing energy assessment data. + """ + + try: + # Map dictionaries to EnergyAssessment instances + assessments = [EnergyAssessment(**data) for data in data_list] + + # Add all instances to the session + session.add_all(assessments) + # Commit the transaction + session.commit() + print("All records inserted successfully.") + + except IntegrityError as e: + # Rollback the session in case of error + session.rollback() + print(f"Error occurred: {e}") diff --git a/etl/xml_survey_extraction/app.py b/etl/xml_survey_extraction/app.py index 6fe02e2d..eea030e5 100644 --- a/etl/xml_survey_extraction/app.py +++ b/etl/xml_survey_extraction/app.py @@ -1,9 +1,11 @@ +from backend.app.db.functions.energy_assessment_functions import bulk_insert_energy_assessments from sqlalchemy.orm import sessionmaker from backend.app.db.connection import db_engine from utils.s3 import read_from_s3, list_files_and_subfolders_in_s3_folder, list_xmls_in_s3_folder from utils.logger import setup_logger from etl.xml_survey_extraction.XmlParser import XmlParser import os +import pandas as pd from io import BytesIO logger = setup_logger() @@ -11,7 +13,8 @@ logger = setup_logger() SURVEYORS = "JAFFERSONS ENERGY CONSULTANTS" PROJECT_CODE = "VDE001" BUCKET = "retrofit-energy-assessments-dev" -PORTFOLIO_ID = None +PORTFOLIO_ID = 86 +USER_ID = 8 def main(): @@ -59,7 +62,8 @@ def main(): surveyor_company=SURVEYORS, ) xml_parser.run() - logger.info(f"Extracted data from {xml}") + if xml_parser.is_lig: + logger.info(f"Extracted data from {xml}") extracted_epc = xml_parser.epc extracted_additional_data = xml_parser.additional_data @@ -72,8 +76,36 @@ def main(): logger.info("Uploading data to the database") session = sessionmaker(bind=db_engine)() + bulk_insert_energy_assessments(session, database_data) + session.close() - # TODO: Set a portfolio ID, Target and Automatically upload the asset list and create the event for the portfolio + # Create the asset list + asset_list = [ + {"uprn": x["uprn"], "address": x["address1"], "postcode": x["postcode"]} for x in database_data + ] + asset_list = pd.DataFrame(asset_list) + + # Store the asset list in s3 + filename = f"{USER_ID}/{PORTFOLIO_ID}/non_intrusives.csv" + save_csv_to_s3( + dataframe=asset_list, + bucket_name="retrofit-plan-inputs-dev", + file_name=filename + ) + + body = { + "portfolio_id": str(PORTFOLIO_ID), + "housing_type": "Private", + "goal": "Increase EPC", + "goal_value": "A", + "trigger_file_path": filename, + "already_installed_file_path": "", + "patches_file_path": "", + "non_invasive_recommendations_file_path": "", + "exclusions": "", + "budget": None, + } + print(body) # TODO: In order to get the full data associated to the heating system, we need to download and parse the pcdb which # can be found here: https://www.ncm-pcdb.org.uk/pcdb/pcdb10.dat