"""Populate the lodgement spreadsheet rows from per-property document folders.

Each property folder is scanned for known report files, the reports are parsed
with the matching extractor, and one output row per property is written to CSV.
"""

import os

import pandas as pd

import utils.file_data_extraction as file_extraction_tools
from utils.fullSapParser import FullSapParser
from utils.OsmosisCondtionReportParser import OsmosisConditionReportParser

# Column template for a single output row. Every row starts as a copy of this
# dict, and update_dictionary_with_check() refuses keys that are not listed here.
output_template = {
    "Property Address": None,
    "Osm. ID": None,
    "Postcode": None,
    "City/County": None,
    "District/Town": None,
    "Funding Stream": None,
    # "Risk Path": None,
    "Local Authority": None,
    "Trustmark Lodgement ID": None,
    "Certificate Number": None,
    "EWI UMR": None,
    "Loft UMR": None,
    "Windows UMR": None,
    "Doors UMR": None,
    "Measure Lodgement Date": None,
    "Full Lodgement Date": None,
    "Owner - Name": None,
    "Owner - Phone": None,
    "Owner - Email": None,
    "Tenant - Name": None,
    "Tenant - Phone": None,
    "R. Assessor - Name": None,
    "R. Coordinator - Name": None,
    "Trustmark Licence Number": None,
    "Retrofit Assessment Date": None,
    "Company Name": None,
    "Retrofit Designer Name": None,
    "Property Type": None,
    "Property Detachment": None,
    "No. of Bedrooms": None,
    "Property age": None,
    "SAP Rating Pre (from IMA)": None,
    "Pre Heat Transfer": None,
    "Pre Total Floor Area": None,
    "Pre Heat Demand": None,
    "Pre Air Tightness": None,
    "SAP Rating Post (from EPC)": None,
    "Post Heat Transfer": None,
    "Post Total Floor Area": None,
    "Post Heat Demand": None,
    "Post Air Tightness": None,
    "Number of Eligible Measures Installed": None,
    "Total Cost of Works": None,
    "Annual Fuel Saving (MTP)": None,
}

# The source data will eventually come from Sharepoint; until then these local
# defaults are what handler() uses when called without arguments.
_DEFAULT_SOURCE_DATA_PATH = "/Users/khalimconn-kowlessar/Documents/hestia/Lodgment Pilot"
_DEFAULT_OUTPUT_FILENAME = "poc-extrcted-data.csv"
_DEFAULT_FUNDING_STREAM = "HUG2"

# Values that were declared in the handler but are not read anywhere in this
# module yet. NOTE(review): presumably destined for the template workbook and
# the "Owner - *" columns - confirm before wiring them in.
_OUTPUT_TEMPLATE_FILE = "Trustmark Details - Template REV.25.11.24.xlsx"
_CUSTOMER_NAME = "Shropshire Council"
_CUSTOMER_PHONE = "0345 678 9000"
_CUSTOMER_EMAIL = "affordablewarmth@shropshire.gov.uk"


def update_dictionary_with_check(dictionary, updates):
    """
    Updates a dictionary with key-value pairs, raising an error if the key does not exist.

    All keys are validated before anything is written, so a failed call leaves
    ``dictionary`` untouched.

    Args:
        dictionary (dict): The dictionary to update (modified in place).
        updates (dict): The updates to apply.

    Raises:
        KeyError: If a key in updates does not exist in the dictionary.
    """
    for key in updates:
        if key not in dictionary:
            raise KeyError(f"Key '{key}' does not exist in the dictionary.")
    dictionary.update(updates)


def _report_extractors():
    """Return the mapping of detected report type to extractor class.

    A value of None marks a report type that is recognised but not processed.
    """
    return {
        "elmhurst epr": file_extraction_tools.ElmhurstEprExtractor,
        "elmhurst summary report": file_extraction_tools.ElmhurstSummaryReportExtractor,
        "osmosis condition report": OsmosisConditionReportParser,
        "elmhurst evidence report": None,
        "full sap xml": FullSapParser,
        "pulse air permeability": file_extraction_tools.PulseAirPermeabilityExtractor,
        "elmhurst project handover": file_extraction_tools.ElmhurstProjectHandoverExtractor,
        "core logic pas assessment report": file_extraction_tools.CoreLogicPasAssessmentReportExtractor,
    }


def _find_subfolder(parent_path, marker, case_insensitive=False):
    """Return the path of the first sub-folder of parent_path whose name contains marker.

    Raises:
        FileNotFoundError: If no sub-folder name contains the marker.
    """
    for name in os.listdir(parent_path):
        candidate = os.path.join(parent_path, name)
        if not os.path.isdir(candidate):
            continue
        searchable = name.lower() if case_insensitive else name
        if marker in searchable:
            return candidate
    raise FileNotFoundError(f"No sub-folder containing '{marker}' found in '{parent_path}'.")


def _store_extraction(extracted_contents, extractors, report_type, filepath, filename, only_types=None):
    """Run the extractor registered for report_type and store its result under that type."""
    if only_types is not None and report_type not in only_types:
        # Caller is only interested in specific report types; ignore the rest,
        # including files whose type could not be detected.
        return
    if report_type is None:
        raise ValueError(f"Unknown report type for {filename}")
    # .get() so a detected type without a registry entry is skipped in the same
    # way as a type that is explicitly registered as None.
    extractor_class = extractors.get(report_type)
    if extractor_class is None:
        return
    extracted_contents[report_type] = extractor_class(filepath).extract()


def _extract_folder(folder_path, extractors, extracted_contents, include_xml=False, only_types=None):
    """Detect and extract every supported report file directly inside folder_path.

    Results are written into extracted_contents keyed by report type; a later
    file of the same type replaces an earlier one.
    """
    for filename in os.listdir(folder_path):
        filepath = os.path.join(folder_path, filename)
        if not os.path.isfile(filepath):
            continue
        if file_extraction_tools.is_pdf(filepath):
            report_type = file_extraction_tools.detect_pdf_report_type(pdf_path=filepath)
            _store_extraction(extracted_contents, extractors, report_type, filepath, filename, only_types)
        if include_xml and file_extraction_tools.is_xml(filepath):
            xml_type = file_extraction_tools.detect_xml_report_type(xml_path=filepath)
            _store_extraction(extracted_contents, extractors, xml_type, filepath, filename, only_types)


def _extract_property(property_folder_path, extractors):
    """Extract all known reports for one property folder, keyed by report type."""
    extracted_contents = {}

    # Coordinator folder: PDFs and XMLs are both processed.
    coord_folder = _find_subfolder(property_folder_path, "RA Coordinator Info")
    _extract_folder(coord_folder, extractors, extracted_contents, include_xml=True)

    # Air tightness tests: PDFs only.
    att_folder = _find_subfolder(property_folder_path, "Air Tightness Tests")
    _extract_folder(att_folder, extractors, extracted_contents)

    # Within the lodgement folder, we want the required documents sub-folder.
    lodgement_folder = _find_subfolder(property_folder_path, "TrustMark Lodgement")
    required_documents_folder = _find_subfolder(lodgement_folder, "required documents", case_insensitive=True)
    # There are only a few file types we actually want to process in here for the moment.
    _extract_folder(
        required_documents_folder,
        extractors,
        extracted_contents,
        only_types=("elmhurst project handover",),
    )
    return extracted_contents


def _parse_property_folder_name(property_folder):
    """Split a folder name of the form "(<id>) <address>" into (osm_id, address).

    Raises:
        ValueError: If the folder name contains no ")" separator.
    """
    id_part, separator, address_part = property_folder.partition(")")
    if not separator:
        raise ValueError(
            f"Property folder '{property_folder}' does not match the expected '(<id>) <address>' format."
        )
    # partition() keeps everything after the first ")", so an address that itself
    # contains ")" is no longer truncated.
    return id_part.strip().lstrip("(").strip(), address_part.strip()


def _assessment_fields(report):
    """Map the fields shared by the Elmhurst EPR and summary report onto output columns."""
    total_floor_area = sum(
        [part["Floor Area (m2)"] for part in report["Building Parts"]]
        # Get the conservatory floor area
        + [report["Conservatory"]["Conservatory Floor Area"]]
    )
    return {
        "Postcode": report["Postcode"],
        "City/County": report["County"],
        "District/Town": report["Town"],
        "SAP Rating Pre (from IMA)": report["Current SAP Rating"],
        # NOTE(review): the primary energy use intensity is what is written to
        # "Pre Heat Transfer" - confirm this mapping is intended.
        "Pre Heat Transfer": report["Primary Energy Use Intensity (kWh/m2/yr)"],
        "Pre Total Floor Area": total_floor_area,
        "R. Assessor - Name": report["Assessor Name"],
        "Retrofit Assessment Date": report["Assessment Date"],
    }


def _build_output_row(property_folder, extracted_contents, funding_stream):
    """Build one output row (a filled copy of output_template) for a property.

    Reports are applied in a fixed order, so a later report overrides any column
    an earlier one already set. Missing or falsy extraction results are skipped.

    Columns not populated by any report here: Local Authority, Trustmark
    Lodgement ID, Certificate Number, EWI/Loft/Windows/Doors UMR, Measure and
    Full Lodgement Date, Owner - Name/Phone/Email, Tenant - Name/Phone,
    Trustmark Licence Number, SAP Rating Post (from EPC), Post Heat Transfer,
    Post Total Floor Area, Post Heat Demand, Post Air Tightness, Total Cost of
    Works, Annual Fuel Saving (MTP).
    """
    output_row_data = output_template.copy()

    osm_id, address = _parse_property_folder_name(property_folder)
    update_dictionary_with_check(
        output_row_data,
        {
            "Funding Stream": funding_stream,
            "Property Address": address,
            "Osm. ID": osm_id,
        },
    )

    epr = extracted_contents.get("elmhurst epr")
    if epr:
        epr_to_insert = _assessment_fields(epr)
        epr_to_insert["Local Authority"] = None  # no source for this column yet
        epr_to_insert["Pre Heat Demand"] = (
            epr_to_insert["Pre Heat Transfer"] * epr_to_insert["Pre Total Floor Area"]
        )
        update_dictionary_with_check(output_row_data, epr_to_insert)

    full_sap = extracted_contents.get("full sap xml")
    if full_sap:
        update_dictionary_with_check(
            output_row_data,
            {
                "Property Type": full_sap["Property Type"],
                "Property Detachment": full_sap["Built Form"],
                "Property age": full_sap["Age Band"],
            },
        )

    condition_report = extracted_contents.get("osmosis condition report")
    if condition_report:
        update_dictionary_with_check(
            output_row_data,
            {
                "No. of Bedrooms": condition_report["No. of Bedrooms"],
                # "Risk Path": condition_report["Risk Assessment Pathway"],
            },
        )

    summary = extracted_contents.get("elmhurst summary report")
    if summary:
        summary_to_insert = _assessment_fields(summary)
        # Don't have this. NOTE(review): this also overwrites a "Pre Heat Demand"
        # computed from the EPR when both reports are present - confirm intended.
        summary_to_insert["Pre Heat Demand"] = None
        update_dictionary_with_check(output_row_data, summary_to_insert)

    pulse = extracted_contents.get("pulse air permeability")
    if pulse:
        # We extract the AP50 number
        ap50_values = [
            row["Extrapolated @ 50PA"] for row in pulse["Results Table"] if row["Metric"] == "Air Permeability"
        ]
        if not ap50_values:
            raise ValueError(
                f"Pulse air permeability report for '{property_folder}' has no 'Air Permeability' row."
            )
        update_dictionary_with_check(output_row_data, {"Pre Air Tightness": ap50_values[0]})

    handover = extracted_contents.get("elmhurst project handover")
    if handover:
        update_dictionary_with_check(
            output_row_data,
            {
                "Number of Eligible Measures Installed": len(handover["Measures Fitted"]),
                "Retrofit Designer Name": handover["Designer Name"],
                "Company Name": handover["Installer Name"],
                "R. Coordinator - Name": handover["Retrofit Coordinator Name"],
            },
        )

    pas_report = extracted_contents.get("core logic pas assessment report")
    if pas_report:
        update_dictionary_with_check(
            output_row_data,
            {"No. of Bedrooms": pas_report["Number of bedrooms"]},
        )

    return output_row_data


def handler(
    *,
    source_data_path=_DEFAULT_SOURCE_DATA_PATH,
    output_csv_path=None,
    funding_stream=_DEFAULT_FUNDING_STREAM,
):
    """
    Extract the data from documents that have been uploaded to Sharepoint to
    populate the lodgement spreadsheet with.

    Every directory directly under ``source_data_path`` is treated as one
    property and must be named ``(<id>) <address>``. One row per property is
    written to a CSV file.

    Args:
        source_data_path (str): Folder containing one sub-folder per property.
        output_csv_path (str | None): Where to write the CSV. Defaults to
            ``poc-extrcted-data.csv`` inside ``source_data_path``.
        funding_stream (str): Value written to the "Funding Stream" column.

    Raises:
        FileNotFoundError: If an expected sub-folder is missing for a property.
        ValueError: If a report type cannot be detected, a property folder name
            has no ")" separator, or the pulse report lacks an
            "Air Permeability" row.
    :return: None
    """
    # TODO: In order for this to go live, we need to use Poppler, which needs to be installed
    #  w/ brew install poppler
    #  We also need to install Tesseract: brew install tesseract

    if output_csv_path is None:
        output_csv_path = os.path.join(source_data_path, _DEFAULT_OUTPUT_FILENAME)

    extractors = _report_extractors()

    # List the folders in the source data path
    folders = [x for x in os.listdir(source_data_path) if os.path.isdir(os.path.join(source_data_path, x))]

    extracted = []
    for property_folder in folders:
        property_folder_path = os.path.join(source_data_path, property_folder)
        extracted_contents = _extract_property(property_folder_path, extractors)
        extracted.append(_build_output_row(property_folder, extracted_contents, funding_stream))

    # Explicit columns keep the header row even when no property folder was found.
    extracted_df = pd.DataFrame(extracted, columns=list(output_template))
    extracted_df.to_csv(output_csv_path, index=False)