Model/etl/lodgement/app.py
Khalim Conn-Kowlessar c6e02836a8 poc done for now
2024-11-29 12:10:29 +00:00

326 lines
14 KiB
Python

import os
import pandas as pd
import utils.file_data_extraction as file_extraction_tools
from utils.fullSapParser import FullSapParser
from utils.OsmosisCondtionReportParser import OsmosisConditionReportParser
# Blank row for the lodgement spreadsheet: one key per output column, in
# column order, every value None. handler() takes a copy of this per property
# and fills in whatever the extracted reports provide; anything not filled in
# stays None.
output_template = dict.fromkeys(
    (
        # Property identification
        "Property Address",
        "Osm. ID",
        "Postcode",
        "City/County",
        "District/Town",
        "Funding Stream",
        # "Risk Path",  (column currently disabled)
        "Local Authority",
        # Lodgement references
        "Trustmark Lodgement ID",
        "Certificate Number",
        "EWI UMR",
        "Loft UMR",
        "Windows UMR",
        "Doors UMR",
        "Measure Lodgement Date",
        "Full Lodgement Date",
        # People and organisations
        "Owner - Name",
        "Owner - Phone",
        "Owner - Email",
        "Tenant - Name",
        "Tenant - Phone",
        "R. Assessor - Name",
        "R. Coordinator - Name",
        "Trustmark Licence Number",
        "Retrofit Assessment Date",
        "Company Name",
        "Retrofit Designer Name",
        # Property characteristics
        "Property Type",
        "Property Detachment",
        "No. of Bedrooms",
        "Property age",
        # Pre-works figures
        "SAP Rating Pre (from IMA)",
        "Pre Heat Transfer",
        "Pre Total Floor Area",
        "Pre Heat Demand",
        "Pre Air Tightness",
        # Post-works figures
        "SAP Rating Post (from EPC)",
        "Post Heat Transfer",
        "Post Total Floor Area",
        "Post Heat Demand",
        "Post Air Tightness",
        # Works summary
        "Number of Eligible Measures Installed",
        "Total Cost of Works",
        "Annual Fuel Saving (MTP)",
    )
)
def update_dictionary_with_check(dictionary: dict, updates: dict) -> None:
    """
    Update ``dictionary`` in place with ``updates``, refusing to create new keys.

    Every key in ``updates`` is validated before anything is written, so a
    rejected call leaves ``dictionary`` exactly as it was (no partial update).

    Args:
        dictionary (dict): The dictionary to update.
        updates (dict): The key-value pairs to apply.
    Raises:
        KeyError: If a key in updates does not exist in the dictionary.
    """
    # Validate first: writing as we go would leave earlier keys modified if a
    # later key turned out to be unknown.
    for key in updates:
        if key not in dictionary:
            raise KeyError(f"Key '{key}' does not exist in the dictionary.")
    dictionary.update(updates)
def _list_entries(folder_path: str, *, want_dirs: bool) -> list:
    """Return the names directly inside ``folder_path`` that are directories or files.

    ``want_dirs=True`` keeps sub-directories, ``want_dirs=False`` keeps regular
    files. Names are returned in ``os.listdir`` order.
    """
    keep = os.path.isdir if want_dirs else os.path.isfile
    return [name for name in os.listdir(folder_path) if keep(os.path.join(folder_path, name))]


def _find_subfolder(parent_path: str, marker: str, *, ignore_case: bool = False) -> str:
    """Return the path of the first sub-directory of ``parent_path`` whose name contains ``marker``.

    Raises:
        FileNotFoundError: If no sub-directory name contains ``marker``.
    """
    wanted = marker.lower() if ignore_case else marker
    for name in _list_entries(parent_path, want_dirs=True):
        candidate = name.lower() if ignore_case else name
        if wanted in candidate:
            return os.path.join(parent_path, name)
    raise FileNotFoundError(f"No sub-folder containing '{marker}' found in {parent_path}")


def _extract_reports(folder_path: str, extractors: dict, *, include_xml: bool = False,
                     only_report_types=None) -> dict:
    """Run the registered extractor over each recognised report file in ``folder_path``.

    Args:
        folder_path: Folder whose files (not sub-folders) are scanned.
        extractors: Maps a detected report type to an extractor class, or to
            None for report types that are recognised but deliberately skipped.
        include_xml: Also process XML files; otherwise only PDFs are considered.
        only_report_types: Optional collection of report types to keep. When
            given, every other file (including ones whose type cannot be
            detected) is skipped silently. When omitted, an undetectable type
            is an error.
    Returns:
        dict: report type -> whatever that extractor's ``extract()`` returned.
        If several files share a type, the last one listed wins.
    Raises:
        ValueError: If a file's report type cannot be detected and
            ``only_report_types`` was not given.
    """
    results = {}
    for filename in _list_entries(folder_path, want_dirs=False):
        filepath = os.path.join(folder_path, filename)
        if file_extraction_tools.is_pdf(filepath):
            report_type = file_extraction_tools.detect_pdf_report_type(pdf_path=filepath)
        elif include_xml and file_extraction_tools.is_xml(filepath):
            report_type = file_extraction_tools.detect_xml_report_type(xml_path=filepath)
        else:
            continue

        if only_report_types is not None:
            if report_type not in only_report_types:
                continue
        elif report_type is None:
            raise ValueError(f"Unknown report type for {filename}")

        # .get() so that a detected type with no registered extractor is skipped
        # in the same way as one explicitly registered as None.
        extractor_cls = extractors.get(report_type)
        if extractor_cls is None:
            continue
        results[report_type] = extractor_cls(filepath).extract()
    return results


def _split_property_folder_name(folder_name: str) -> tuple:
    """Split a property folder name of the form ``(<Osm. ID>) <address>``.

    Only the first ")" is treated as the separator, so an address that itself
    contains brackets is returned whole.

    Returns:
        tuple: ``(osm_id, address)``, both stripped of surrounding whitespace.
    Raises:
        ValueError: If the folder name contains no ")".
    """
    id_part, separator, address = folder_name.partition(")")
    if not separator:
        raise ValueError(
            f"Property folder '{folder_name}' is not named '(<Osm. ID>) <address>'"
        )
    return id_part.strip().lstrip("(").strip(), address.strip()


def _total_floor_area(report: dict):
    """Sum the floor area of every building part plus the conservatory floor area."""
    return sum(
        [part["Floor Area (m2)"] for part in report["Building Parts"]]
        + [report["Conservatory"]["Conservatory Floor Area"]]
    )


def _build_output_row(property_folder: str, extracted_contents: dict, funding_stream: str) -> dict:
    """Map the extracted report data for one property onto a copy of ``output_template``.

    Sources are applied in a fixed order and later ones overwrite earlier ones
    for shared columns (the summary report overwrites the EPR, the Core Logic
    report overwrites the condition report's bedroom count).

    Columns that no source populates yet, and which therefore stay None:
    Local Authority, Trustmark Lodgement ID, Certificate Number, EWI/Loft/
    Windows/Doors UMR, Measure Lodgement Date, Full Lodgement Date, Owner and
    Tenant contact details, Trustmark Licence Number, all "Post" figures,
    SAP Rating Post (from EPC), Total Cost of Works, Annual Fuel Saving (MTP).

    Raises:
        ValueError: If ``property_folder`` is not named ``(<Osm. ID>) <address>``.
        KeyError: If a report lacks an expected field, or a column name is not
            in the template.
    """
    row = output_template.copy()

    osm_id, address = _split_property_folder_name(property_folder)
    update_dictionary_with_check(
        row,
        {
            "Funding Stream": funding_stream,
            "Property Address": address,
            "Osm. ID": osm_id,
        },
    )

    epr = extracted_contents.get("elmhurst epr")
    if epr:
        floor_area = _total_floor_area(epr)
        # NOTE(review): "Pre Heat Transfer" is filled from the Primary Energy
        # Use Intensity field - confirm that is the intended source.
        energy_intensity = epr["Primary Energy Use Intensity (kWh/m2/yr)"]
        update_dictionary_with_check(
            row,
            {
                "Postcode": epr["Postcode"],
                "City/County": epr["County"],
                "District/Town": epr["Town"],
                "Local Authority": None,
                "SAP Rating Pre (from IMA)": epr["Current SAP Rating"],
                "Pre Heat Transfer": energy_intensity,
                "Pre Total Floor Area": floor_area,
                "Pre Heat Demand": energy_intensity * floor_area,
                "R. Assessor - Name": epr["Assessor Name"],
                "Retrofit Assessment Date": epr["Assessment Date"],
            },
        )

    full_sap = extracted_contents.get("full sap xml")
    if full_sap:
        update_dictionary_with_check(
            row,
            {
                "Property Type": full_sap["Property Type"],
                "Property Detachment": full_sap["Built Form"],
                "Property age": full_sap["Age Band"],
            },
        )

    condition_report = extracted_contents.get("osmosis condition report")
    if condition_report:
        update_dictionary_with_check(
            row,
            {
                "No. of Bedrooms": condition_report["No. of Bedrooms"],
                # "Risk Path": condition_report["Risk Assessment Pathway"],
            },
        )

    summary = extracted_contents.get("elmhurst summary report")
    if summary:
        # NOTE(review): when an EPR was also extracted, this resets the
        # "Pre Heat Demand" computed above to None - confirm that is intended.
        update_dictionary_with_check(
            row,
            {
                "Postcode": summary["Postcode"],
                "City/County": summary["County"],
                "District/Town": summary["Town"],
                "SAP Rating Pre (from IMA)": summary["Current SAP Rating"],
                "Pre Heat Transfer": summary["Primary Energy Use Intensity (kWh/m2/yr)"],
                "Pre Total Floor Area": _total_floor_area(summary),
                "Pre Heat Demand": None,  # Don't have this
                "R. Assessor - Name": summary["Assessor Name"],
                "Retrofit Assessment Date": summary["Assessment Date"],
            },
        )

    pulse = extracted_contents.get("pulse air permeability")
    if pulse:
        # We extract the AP50 number: the "Extrapolated @ 50PA" value of the
        # first "Air Permeability" row in the results table.
        ap50 = [
            result["Extrapolated @ 50PA"]
            for result in pulse["Results Table"]
            if result["Metric"] == "Air Permeability"
        ][0]
        update_dictionary_with_check(row, {"Pre Air Tightness": ap50})

    handover = extracted_contents.get("elmhurst project handover")
    if handover:
        update_dictionary_with_check(
            row,
            {
                "Number of Eligible Measures Installed": len(handover["Measures Fitted"]),
                "Retrofit Designer Name": handover["Designer Name"],
                "Company Name": handover["Installer Name"],
                "R. Coordinator - Name": handover["Retrofit Coordinator Name"],
            },
        )

    pas_report = extracted_contents.get("core logic pas assessment report")
    if pas_report:
        update_dictionary_with_check(
            row,
            {"No. of Bedrooms": pas_report["Number of bedrooms"]},
        )

    return row


def handler():
    """
    Extract data from the documents uploaded for each property and write one
    lodgement-spreadsheet row per property to a CSV file.

    Each sub-folder of the source folder is treated as one property, named
    "(<Osm. ID>) <address>". Within it, reports are read from the
    "RA Coordinator Info" folder (PDF and XML), the "Air Tightness Tests"
    folder (PDF) and the "required documents" folder inside
    "TrustMark Lodgement" (project handover PDF only).

    Raises:
        FileNotFoundError: If a property is missing one of the expected folders.
        ValueError: If a report's type cannot be detected, or a property folder
            is not named as described above.
    :return:
    """
    # The source data will eventually come from Sharepoint
    source_data_path = "/Users/khalimconn-kowlessar/Documents/hestia/Lodgment Pilot"
    funding_stream = "HUG2"
    # Not consumed yet - kept from the PoC so the values are not lost.
    output_template_file = "Trustmark Details - Template REV.25.11.24.xlsx"
    customer_name = "Shropshire Council"
    customer_phone = "0345 678 9000"
    customer_email = "affordablewarmth@shropshire.gov.uk"

    # TODO: In order for this to go live, we need to use Poppler, which needs to be installed
    #  w/ brew install poppler
    #  We also need to install Tesseract: brew install tesseract

    # Detected report type -> extractor class. None marks a report type that is
    # recognised but intentionally not processed.
    extractors = {
        "elmhurst epr": file_extraction_tools.ElmhurstEprExtractor,
        "elmhurst summary report": file_extraction_tools.ElmhurstSummaryReportExtractor,
        "osmosis condition report": OsmosisConditionReportParser,
        "elmhurst evidence report": None,
        "full sap xml": FullSapParser,
        "pulse air permeability": file_extraction_tools.PulseAirPermeabilityExtractor,
        "elmhurst project handover": file_extraction_tools.ElmhurstProjectHandoverExtractor,
        "core logic pas assessment report": file_extraction_tools.CoreLogicPasAssessmentReportExtractor,
    }

    extracted = []
    for property_folder in _list_entries(source_data_path, want_dirs=True):
        property_folder_path = os.path.join(source_data_path, property_folder)
        extracted_contents = {}

        coord_folder = _find_subfolder(property_folder_path, "RA Coordinator Info")
        extracted_contents.update(_extract_reports(coord_folder, extractors, include_xml=True))

        att_folder = _find_subfolder(property_folder_path, "Air Tightness Tests")
        extracted_contents.update(_extract_reports(att_folder, extractors))

        # Within the lodgement folder, we want the required documents sub-folder
        lodgement_folder = _find_subfolder(property_folder_path, "TrustMark Lodgement")
        required_documents_folder = _find_subfolder(
            lodgement_folder, "required documents", ignore_case=True
        )
        # There are only a few file types we actually want to process in here for the moment
        extracted_contents.update(
            _extract_reports(
                required_documents_folder,
                extractors,
                only_report_types=("elmhurst project handover",),
            )
        )

        extracted.append(_build_output_row(property_folder, extracted_contents, funding_stream))

    extracted_df = pd.DataFrame(extracted)
    # Written next to the source data; the file name's spelling is left as-is
    # so anything already reading this file keeps working.
    extracted_df.to_csv(os.path.join(source_data_path, "poc-extrcted-data.csv"), index=False)