mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
326 lines
14 KiB
Python
326 lines
14 KiB
Python
import os
|
|
|
|
import pandas as pd
|
|
|
|
import utils.file_data_extraction as file_extraction_tools
|
|
from utils.fullSapParser import FullSapParser
|
|
from utils.OsmosisCondtionReportParser import OsmosisConditionReportParser
|
|
|
|
# Blank row for the lodgement spreadsheet: one entry per column, in column
# order, each initialised to None until a value is written for it.
output_template = dict.fromkeys(
    (
        "Property Address",
        "Osm. ID",
        "Postcode",
        "City/County",
        "District/Town",
        "Funding Stream",
        # "Risk Path",
        "Local Authority",
        "Trustmark Lodgement ID",
        "Certificate Number",
        "EWI UMR",
        "Loft UMR",
        "Windows UMR",
        "Doors UMR",
        "Measure Lodgement Date",
        "Full Lodgement Date",
        "Owner - Name",
        "Owner - Phone",
        "Owner - Email",
        "Tenant - Name",
        "Tenant - Phone",
        "R. Assessor - Name",
        "R. Coordinator - Name",
        "Trustmark Licence Number",
        "Retrofit Assessment Date",
        "Company Name",
        "Retrofit Designer Name",
        "Property Type",
        "Property Detachment",
        "No. of Bedrooms",
        "Property age",
        "SAP Rating Pre (from IMA)",
        "Pre Heat Transfer",
        "Pre Total Floor Area",
        "Pre Heat Demand",
        "Pre Air Tightness",
        "SAP Rating Post (from EPC)",
        "Post Heat Transfer",
        "Post Total Floor Area",
        "Post Heat Demand",
        "Post Air Tightness",
        "Number of Eligible Measures Installed",
        "Total Cost of Works",
        "Annual Fuel Saving (MTP)",
    )
)
|
|
|
|
|
|
def update_dictionary_with_check(dictionary, updates):
    """
    Updates a dictionary in place, raising an error if any key does not already exist.

    Every key in ``updates`` is validated before anything is written, so a
    failed call leaves ``dictionary`` untouched instead of half-updated.

    Args:
        dictionary (dict): The dictionary to update (mutated in place).
        updates (dict): The key-value pairs to apply.

    Raises:
        KeyError: If a key in updates does not exist in the dictionary.
    """
    # Validate first: raising part-way through a write loop would leave the
    # earlier keys already modified.
    for key in updates:
        if key not in dictionary:
            raise KeyError(f"Key '{key}' does not exist in the dictionary.")

    dictionary.update(updates)
|
|
|
|
|
|
def _find_subfolder(parent, marker, *, case_insensitive=False):
    """
    Return the path of the first sub-directory of ``parent`` whose name contains ``marker``.

    Args:
        parent (str): Directory to search (not recursive).
        marker (str): Substring the sub-directory name must contain. Pass it
            lower-cased when ``case_insensitive`` is True.
        case_insensitive (bool): Lower-case each directory name before matching.

    Raises:
        FileNotFoundError: If no sub-directory of ``parent`` matches.
    """
    for name in os.listdir(parent):
        candidate = os.path.join(parent, name)
        if not os.path.isdir(candidate):
            continue
        if marker in (name.lower() if case_insensitive else name):
            return candidate
    raise FileNotFoundError(f"No sub-folder containing '{marker}' found in {parent}")


def _extract_reports(folder, extractors, *, include_xml=False, only_types=None):
    """
    Run the matching extractor over every recognised report file directly inside ``folder``.

    Args:
        folder (str): Directory whose files (not sub-directories) are scanned.
        extractors (dict): Maps a report type name to an extractor class, or to
            None for report types that are recognised but deliberately skipped.
        include_xml (bool): Also process XML files, not just PDFs.
        only_types (collection | None): When given, PDFs whose detected type is
            not in this collection (including undetected ones) are skipped
            silently instead of raising.

    Returns:
        dict: Report type name -> whatever that extractor's ``extract()`` returned.

    Raises:
        ValueError: If a file's report type cannot be detected (PDFs only when
            ``only_types`` is None).
        KeyError: If a PDF's detected type has no entry in ``extractors``.
    """
    extracted = {}
    filenames = [name for name in os.listdir(folder) if os.path.isfile(os.path.join(folder, name))]

    for filename in filenames:
        filepath = os.path.join(folder, filename)

        if file_extraction_tools.is_pdf(filepath):
            report_type = file_extraction_tools.detect_pdf_report_type(pdf_path=filepath)
            if only_types is not None:
                if report_type not in only_types:
                    continue
            elif report_type is None:
                raise ValueError(f"Unknown report type for {filename}")

            # Direct indexing on purpose: a detected PDF type that is missing
            # from the registry should fail loudly rather than be dropped.
            file_extractor = extractors[report_type]
            if file_extractor is None:
                continue

            extracted[report_type] = file_extractor(filepath).extract()

        if include_xml and file_extraction_tools.is_xml(filepath):
            xml_type = file_extraction_tools.detect_xml_report_type(xml_path=filepath)
            if xml_type is None:
                raise ValueError(f"Unknown report type for {filename}")

            # XML types without a registered extractor are skipped.
            file_extractor = extractors.get(xml_type)
            if file_extractor is None:
                continue

            extracted[xml_type] = file_extractor(filepath).extract()

    return extracted


def _split_property_folder_name(property_folder):
    """
    Split a property folder name of the form ``(<Osm. ID>) <address>`` into ``(osm_id, address)``.

    Only the first ``)`` is treated as the separator, so an address that itself
    contains a closing bracket is kept whole.

    Raises:
        ValueError: If the folder name contains no ``)``.
    """
    id_part, separator, address_part = property_folder.partition(")")
    if not separator:
        raise ValueError(
            f"Property folder '{property_folder}' is not named '(<Osm. ID>) <address>'"
        )
    return id_part.strip().lstrip("(").strip(), address_part.strip()


def _total_floor_area(report):
    """Return the summed floor area of every building part plus the conservatory."""
    part_areas = [part["Floor Area (m2)"] for part in report["Building Parts"]]
    return sum(part_areas + [report["Conservatory"]["Conservatory Floor Area"]])


def _build_output_row(property_folder, funding_stream, extracted_contents):
    """
    Build one spreadsheet row for a property from its extracted reports.

    Args:
        property_folder (str): Name of the property's folder, ``(<Osm. ID>) <address>``.
        funding_stream (str): Value written to the "Funding Stream" column.
        extracted_contents (dict): Report type name -> extracted data for that report.

    Returns:
        dict: A copy of ``output_template`` with every available field filled in.
    """
    row = output_template.copy()

    # NOTE(review): the original carried this commented-out key list at this point,
    # presumably the template fields still to be populated - confirm before relying on it:
    # dict_keys([ 'City/County', 'District/Town',
    # 'Local Authority', 'Trustmark Lodgement ID', 'Certificate Number', 'EWI UMR', 'Loft UMR', 'Windows UMR',
    # 'Doors UMR', 'Measure Lodgement Date', 'Full Lodgement Date', 'Owner - Name', 'Owner - Phone',
    # 'Owner - Email', 'Tenant - Name', 'Tenant - Phone',
    # 'Trustmark Licence Number',
    # Pre Air Tightness', 'SAP Rating Post (from EPC)', 'Post Heat
    # Transfer', 'Post Total Floor Area', 'Post Heat Demand', 'Post Air Tightness',
    # 'Total Cost of Works', 'Annual Fuel Saving (MTP)'])

    osm_id, property_address = _split_property_folder_name(property_folder)
    update_dictionary_with_check(
        row,
        {
            "Funding Stream": funding_stream,
            "Property Address": property_address,
            "Osm. ID": osm_id,
        },
    )

    epr = extracted_contents.get("elmhurst epr")
    if epr:
        total_floor_area = _total_floor_area(epr)
        energy_use_intensity = epr["Primary Energy Use Intensity (kWh/m2/yr)"]
        update_dictionary_with_check(
            row,
            {
                "Postcode": epr["Postcode"],
                "City/County": epr["County"],
                "District/Town": epr["Town"],
                "Local Authority": None,
                "SAP Rating Pre (from IMA)": epr["Current SAP Rating"],
                "Pre Heat Transfer": energy_use_intensity,
                "Pre Total Floor Area": total_floor_area,
                # Per-m2 intensity scaled up to the whole dwelling.
                "Pre Heat Demand": energy_use_intensity * total_floor_area,
                "R. Assessor - Name": epr["Assessor Name"],
                "Retrofit Assessment Date": epr["Assessment Date"],
            },
        )

    full_sap = extracted_contents.get("full sap xml")
    if full_sap:
        update_dictionary_with_check(
            row,
            {
                "Property Type": full_sap["Property Type"],
                "Property Detachment": full_sap["Built Form"],
                "Property age": full_sap["Age Band"],
            },
        )

    condition_report = extracted_contents.get("osmosis condition report")
    if condition_report:
        update_dictionary_with_check(
            row,
            {
                "No. of Bedrooms": condition_report["No. of Bedrooms"],
                # "Risk Path": condition_report["Risk Assessment Pathway"],
            },
        )

    summary = extracted_contents.get("elmhurst summary report")
    if summary:
        # The summary report has no heat-demand figure, so "Pre Heat Demand" is
        # deliberately not written here: assigning None would wipe the value
        # computed from the EPR when a property has both reports.
        update_dictionary_with_check(
            row,
            {
                "Postcode": summary["Postcode"],
                "City/County": summary["County"],
                "District/Town": summary["Town"],
                "SAP Rating Pre (from IMA)": summary["Current SAP Rating"],
                "Pre Heat Transfer": summary["Primary Energy Use Intensity (kWh/m2/yr)"],
                "Pre Total Floor Area": _total_floor_area(summary),
                "R. Assessor - Name": summary["Assessor Name"],
                "Retrofit Assessment Date": summary["Assessment Date"],
            },
        )

    air_permeability = extracted_contents.get("pulse air permeability")
    if air_permeability:
        # We extract the AP50 number
        results_table = air_permeability["Results Table"]
        ap50 = [x["Extrapolated @ 50PA"] for x in results_table if x["Metric"] == "Air Permeability"][0]
        update_dictionary_with_check(row, {"Pre Air Tightness": ap50})

    handover = extracted_contents.get("elmhurst project handover")
    if handover:
        update_dictionary_with_check(
            row,
            {
                "Number of Eligible Measures Installed": len(handover["Measures Fitted"]),
                "Retrofit Designer Name": handover["Designer Name"],
                "Company Name": handover["Installer Name"],
                "R. Coordinator - Name": handover["Retrofit Coordinator Name"],
            },
        )

    pas_assessment = extracted_contents.get("core logic pas assessment report")
    if pas_assessment:
        # Applied after the condition report, so this bedroom count wins when both exist.
        update_dictionary_with_check(
            row,
            {"No. of Bedrooms": pas_assessment["Number of bedrooms"]},
        )

    return row


def handler(
    source_data_path="/Users/khalimconn-kowlessar/Documents/hestia/Lodgment Pilot",
    funding_stream="HUG2",
    output_csv_path=None,
):
    """
    Extract data from each property's uploaded documents and write one lodgement row per property to a CSV.

    Every sub-directory of ``source_data_path`` is treated as one property. For
    each, reports are read from its "RA Coordinator Info" folder (PDF and XML),
    its "Air Tightness Tests" folder (PDF) and the "required documents" folder
    inside its "TrustMark Lodgement" folder (project handover PDF only).

    Args:
        source_data_path (str): Directory containing one folder per property.
            The source data will eventually come from Sharepoint.
        funding_stream (str): Value written to every row's "Funding Stream" column.
        output_csv_path (str | None): Where to write the CSV. Defaults to
            ``poc-extrcted-data.csv`` inside ``source_data_path``.

    Raises:
        FileNotFoundError: If a property folder lacks one of the expected sub-folders.
        ValueError: If a report's type cannot be detected, or a property folder
            is not named ``(<Osm. ID>) <address>``.
    """
    if output_csv_path is None:
        output_csv_path = os.path.join(source_data_path, "poc-extrcted-data.csv")

    # NOTE(review): the four values below are assigned but never read anywhere in
    # this function; kept because they look like inputs for columns that are not
    # populated yet - confirm or remove.
    output_template_file = "Trustmark Details - Template REV.25.11.24.xlsx"
    customer_name = "Shropshire Council"
    customer_phone = "0345 678 9000"
    customer_email = "affordablewarmth@shropshire.gov.uk"

    # TODO: In order for this to go live, we need to use Poppler, which needs to be installed
    # w/ brew install poppler
    # We also need to install Tesseract: brew install tesseract

    # Report type -> extractor class. None marks a type that is recognised but not processed.
    extractors = {
        "elmhurst epr": file_extraction_tools.ElmhurstEprExtractor,
        "elmhurst summary report": file_extraction_tools.ElmhurstSummaryReportExtractor,
        "osmosis condition report": OsmosisConditionReportParser,
        "elmhurst evidence report": None,
        "full sap xml": FullSapParser,
        "pulse air permeability": file_extraction_tools.PulseAirPermeabilityExtractor,
        "elmhurst project handover": file_extraction_tools.ElmhurstProjectHandoverExtractor,
        "core logic pas assessment report": file_extraction_tools.CoreLogicPasAssessmentReportExtractor,
    }

    # Each folder in the source data path is one property
    property_folders = [
        name for name in os.listdir(source_data_path) if os.path.isdir(os.path.join(source_data_path, name))
    ]

    extracted = []
    for property_folder in property_folders:
        property_folder_path = os.path.join(source_data_path, property_folder)
        extracted_contents = {}

        coord_folder = _find_subfolder(property_folder_path, "RA Coordinator Info")
        extracted_contents.update(_extract_reports(coord_folder, extractors, include_xml=True))

        att_folder = _find_subfolder(property_folder_path, "Air Tightness Tests")
        extracted_contents.update(_extract_reports(att_folder, extractors))

        # Within the lodgement folder, we want the required documents sub-folder
        lodgement_folder = _find_subfolder(property_folder_path, "TrustMark Lodgement")
        required_documents_folder = _find_subfolder(
            lodgement_folder, "required documents", case_insensitive=True
        )
        # There are only a few file types we actually want to process in here for the moment
        extracted_contents.update(
            _extract_reports(
                required_documents_folder,
                extractors,
                only_types=("elmhurst project handover",),
            )
        )

        extracted.append(_build_output_row(property_folder, funding_stream, extracted_contents))

    # Pin the columns to the template so the header is written even when no
    # property folders were found.
    extracted_df = pd.DataFrame(extracted, columns=list(output_template))
    extracted_df.to_csv(output_csv_path, index=False)
|