Model/etl/xml_survey_extraction/app.py
2024-09-04 10:26:39 +01:00

268 lines
12 KiB
Python

from backend.app.db.functions.energy_assessment_functions import bulk_insert_energy_assessments
from sqlalchemy.orm import sessionmaker
from backend.app.db.connection import db_engine
from utils.s3 import read_from_s3, list_files_and_subfolders_in_s3_folder, list_xmls_in_s3_folder, save_csv_to_s3
from utils.logger import setup_logger
from etl.xml_survey_extraction.XmlParser import XmlParser
import os
import pandas as pd
from io import BytesIO
# Module-wide logger, built by the project's setup_logger helper.
logger = setup_logger()
# S3 bucket that main() lists and reads the uploaded energy-assessment XMLs from.
BUCKET: str = "retrofit-energy-assessments-dev"
# Used as the leading path segment of the asset-list CSV that main() saves to S3.
USER_ID: int = 8
# Path template with literal {USER_ID} / {PORTFOLIO_ID} placeholders (a plain string, not an f-string).
# NOTE(review): not referenced anywhere in the visible code - main() writes "non_intrusives.csv"
# instead; confirm whether this template is still needed.
non_invasive_recommendations_filepath: str = "{USER_ID}/{PORTFOLIO_ID}/non_invasive_recommendations.csv"
def _portfolio_101_body(scenario_name, inclusions):
    """Build one scenario body for portfolio 101.

    Every body shares the same portfolio, housing type, EPC goal and empty file
    paths; only the display name and the list of included measures vary.
    A fresh dict (and a fresh inclusions list) is returned on every call so the
    bodies can be mutated independently later on.
    """
    return {
        "portfolio_id": str(101),
        "housing_type": "Private",
        "goal": "Increasing EPC",
        "goal_value": "A",
        "trigger_file_path": "",
        "already_installed_file_path": "",
        "patches_file_path": "",
        "non_invasive_recommendations_file_path": "",
        "inclusions": list(inclusions),
        "budget": None,
        "scenario_name": scenario_name,
        "multi_plan": True,
    }


# Scenario configuration keyed by portfolio id. Each entry names the surveyor and
# project code and lists the scenario bodies to run for that portfolio.
SCENARIOS = {
    101: {
        "project_code": "VEC001",
        "surveyor": "JAFFERSONS ENERGY CONSULTANTS",
        "bodies": [
            # Low-disruption measures
            _portfolio_101_body(
                "Quick wins - do now while tenanted",
                ["draught_proofing", "secondary_glazing", "trickle_vents", "low_energy_lighting"],
            ),
            # The same measures plus floor and internal wall insulation
            _portfolio_101_body(
                "Do when void",
                [
                    "draught_proofing",
                    "secondary_glazing",
                    "trickle_vents",
                    "low_energy_lighting",
                    "suspended_floor_insulation",
                    "internal_wall_insulation",
                ],
            ),
        ],
    },
}
def _survey_required(*measure_types):
    """Return one uncosted, survey-flagged recommendation entry per measure type."""
    return [{"type": measure, "cost": None, "survey": True} for measure in measure_types]


# TODO: These non-intrusive recommendations should be detected from the EPRs, the scenarios and the condition report?
#   For recommendations like trickle vents, we can deduce this from the condition report, depending on the
#   ventilation of the room and the presence of trickle vents.
# Hand-maintained list: one entry per property (keyed by UPRN) with the measures recommended for it.
NON_INTRUSITVE_RECOMMENDATIONS = [
    {
        # 2 Grove Mansions
        "uprn": 121016121,
        "recommendations": _survey_required(
            "draught_proofing",
            "secondary_glazing",
            "trickle_vents",
            "suspended_floor_insulation",
            "internal_wall_insulation",
        ),
    },
    {
        # 8 Grove Mansions
        "uprn": 10024087855,
        "recommendations": _survey_required(
            "draught_proofing",
            "secondary_glazing",
            "trickle_vents",
            "low_energy_lighting",
            "internal_wall_insulation",
        ),
    },
    {
        # 9 Grove Mansions
        "uprn": 121016128,
        "recommendations": [
            *_survey_required(
                "draught_proofing",
                "secondary_glazing",
                "trickle_vents",
                "low_energy_lighting",
            ),
            # NOTE(review): the only entry in the list without a "survey" key - confirm this is intentional.
            {"type": "suspended_floor_insulation", "cost": None},
            *_survey_required("internal_wall_insulation"),
        ],
    },
    {
        # 5 Grove Mansions
        "uprn": 121016124,
        "recommendations": _survey_required(
            "secondary_glazing",
            "trickle_vents",
            "low_energy_lighting",
            "internal_wall_insulation",
        ),
    },
    {
        # 14 Grove Mansions
        "uprn": 121016117,
        "recommendations": _survey_required(
            "draught_proofing",
            "secondary_glazing",
            "trickle_vents",
            "low_energy_lighting",
            "internal_wall_insulation",
        ),
    },
    {
        # 19 Grove Mansions
        # NOTE(review): same UPRN as 14 Grove Mansions above - looks like a copy-paste slip; confirm the real UPRN.
        "uprn": 121016117,
        "recommendations": _survey_required(
            "low_energy_lighting",
            "secondary_glazing",
            "internal_wall_insulation",
            "room_roof_insulation",
        ),
    },
]
def _list_assessment_xmls(scenario_config):
    """Map each UPRN under the scenario's S3 folder to the XML keys uploaded for it.

    :param scenario_config: One value of SCENARIOS; its "surveyor" and "project_code" form the S3 prefix.
    :return: dict of {uprn (int): result of list_xmls_in_s3_folder for that property's "docs & plans" folder}
    :raises ValueError: if the last path segment of an assessment folder is not an integer UPRN.
    """
    import posixpath  # S3 keys always use "/" separators, whatever OS this runs on

    surveyor = scenario_config["surveyor"]
    project_code = scenario_config["project_code"]
    energy_assessments = list_files_and_subfolders_in_s3_folder(
        bucket_name=BUCKET, folder_name=f"{surveyor}/{project_code}/"
    )
    logger.info(
        f"Found {len(energy_assessments)} energy assessments for {surveyor} and "
        f"{project_code}"
    )

    assessments_map = {}
    for assessment in energy_assessments:
        uploaded_xmls = list_xmls_in_s3_folder(
            bucket_name=BUCKET, folder_name=posixpath.join(assessment, "docs & plans")
        )
        # The folder layout is {surveyor}/{project_code}/{uprn}/, so the last segment is the UPRN
        uprn = int(assessment.rstrip("/").split("/")[-1])
        assessments_map[uprn] = uploaded_xmls
    logger.info("Exatracted XMLS for the energy assessments")
    return assessments_map


def _extract_property_data(uprn, xmls, surveyor):
    """Download and parse every XML of one property and merge the results into a single record.

    :param uprn: UPRN of the property, passed through to XmlParser.
    :param xmls: S3 keys of the XML files uploaded for the property.
    :param surveyor: Surveyor company name, passed through to XmlParser.
    :return: dict of extracted fields with hyphens in the keys replaced by underscores; empty when
        nothing was extracted.
    """
    import posixpath  # S3 keys always use "/" separators, whatever OS this runs on

    extracted_data = {}
    for xml in xmls:
        xml_data_io = BytesIO(read_from_s3(bucket_name=BUCKET, s3_file_name=xml))
        xml_parser = XmlParser(
            file=xml_data_io,
            filekey=posixpath.join(f"s3://{BUCKET}", xml),
            uprn=uprn,
            surveyor_company=surveyor,
        )
        xml_parser.run()
        # NOTE(review): the original indentation of this file was lost, so the extent of the `is_lig`
        # branch is reconstructed - here the whole extraction is skipped for files that are not
        # flagged `is_lig`. Confirm against the repository version.
        if not xml_parser.is_lig:
            continue
        logger.info(f"Extracted data from {xml}")
        merged = {**xml_parser.epc, **xml_parser.additional_data}
        # We need to update the keys to match the database schema - i.e. we replace all hyphens with
        # underscores
        extracted_data.update({key.replace("-", "_"): value for key, value in merged.items()})
    return extracted_data


def _store_energy_assessments(database_data):
    """Bulk-insert the extracted records, closing the database session even if the insert fails."""
    session = sessionmaker(bind=db_engine)()
    try:
        bulk_insert_energy_assessments(session, database_data)
    finally:
        session.close()


def main():
    """
    This function executes the main process, which will retrieve data from the specified locations, extract the data
    fields and store them to our database.

    For every scenario in SCENARIOS it lists the uploaded energy-assessment XMLs in S3, parses them, bulk-inserts
    the extracted records, saves an asset list CSV (uprn, address, postcode) to S3 and stores that CSV's path in
    the "trigger_file_path" of each scenario body, printing each body afterwards.
    :return: None
    """
    # TODO: Build solution to get this data from Onedrive and store what we need in S3
    #   In s3, we have a bucket called retrofit-energy-assessments-{stage} which contains the data we need
    #   The data is stored in a folder called {surveyors}/{project_code}/{uprn}
    #   We'll need to get the uprn from the folder name, which we can do with EpcSearcher class
    # TODO: Pull out county, as in create_epc_records in the router, we pull it from the latest EPC, but we should
    #   be able to deduce it from just the address. Same for constituency and constituency_label
    # TODO: Store the project code in the database
    for scenario_config in SCENARIOS.values():
        assessments_map = _list_assessment_xmls(scenario_config)

        # TODO: If we have many uploads, we can do them in a batch so we don't try and upload huge amounts of
        #   data to the database at once
        # TODO: We now have detailed information about primary and secondary walls, so we should use this
        #   information in our recommendations when we have it
        #   For example, for 77 Peryn Road, W3 7LT, the energy assessment has a main dwelling and two extensions,
        #   where the physical dimensions and the fabric of each building is constructed in a way as if each
        #   building is separate. We should use this information to make recommendations that are specific to
        #   each building part, though the problem here is that while the fabric and dimensions are separate,
        #   the actual SAP, CO2, etc figures span across the entire property.
        #   Idea: We can collect all of this information by building part and store it separately in the database
        #   against the uprn. We can have key data for the EPC, but then also additional data for each building
        #   part. We can then use this data to make recommendations that are specific to each building part
        #   We should probably re-think this data model, so we break up the data in a more considered fashion and
        #   produce the underlying EPC data as a summary of the building parts. Not only do we have data against
        #   the main dwelling and extensions, but we also have multiple windows with individual pieces of
        #   information that we can use to make recommendations. We should store this data in a way that we can
        #   easily access it and use it to make recommendations (e.g. we should have a Windows table)

        # For each property, we download the xmls and extract the data
        database_data = []
        for uprn, xmls in assessments_map.items():
            extracted_data = _extract_property_data(uprn, xmls, scenario_config["surveyor"])
            if not extracted_data:
                # An empty record has no "uprn"/"address1"/"postcode" and would raise a KeyError when the
                # asset list is built below, so skip it and make the gap visible in the logs instead.
                logger.warning(f"No data extracted for UPRN {uprn}; skipping")
                continue
            database_data.append(extracted_data)

        logger.info("Uploading data to the database")
        _store_energy_assessments(database_data)

        # Create the asset list
        asset_list = pd.DataFrame(
            [
                {"uprn": record["uprn"], "address": record["address1"], "postcode": record["postcode"]}
                for record in database_data
            ]
        )

        # Store the asset list in s3
        filename = f"{USER_ID}/{scenario_config['bodies'][0]['portfolio_id']}/non_intrusives.csv"
        save_csv_to_s3(
            dataframe=asset_list,
            bucket_name="retrofit-plan-inputs-dev",
            file_name=filename
        )

        # Point every scenario body at the asset list that was just uploaded
        for body in scenario_config["bodies"]:
            body["trigger_file_path"] = filename
            print(body)
# TODO: In order to get the full data associated to the heating system, we need to download and parse the pcdb which
# can be found here: https://www.ncm-pcdb.org.uk/pcdb/pcdb10.dat
# https://www.ncm-pcdb.org.uk/sap/download
# However retrieving this data is not a priority, so we can leave this for now as parsing the database
# is a non-trivial task
# TODO: The condition report contains additional data such as the number of bedrooms and the number of bathrooms
# We can extract this data and store it in the database as well. We can then update our kwargs methodology
# that is passed to the property class, where instead we store this additional data in our database (it could
# be stored in the energy assessment table, or in a separate table) and then when we're passed additional data
# we can query the database for this data and use it to update the property object, instead of storing it
# in the asset list and pulling it out of the asset list
# 1) Bathrooms
# 2) Bedrooms