From 965cf975e289b11bd1387a55c251e1c50e0327e0 Mon Sep 17 00:00:00 2001
From: Khalim Conn-Kowlessar
Date: Wed, 27 Nov 2024 13:08:03 +0000
Subject: [PATCH] setting up data extraction pilot

---
 .idea/Model.iml                |   2 +-
 .idea/misc.xml                 |   2 +-
 etl/lodgement/app.py           |  55 +++++
 etl/lodgement/requirements.txt |   8 +
 utils/file_data_extraction.py  | 389 +++++++++++++++++++++++++++++++++
 5 files changed, 454 insertions(+), 2 deletions(-)
 create mode 100644 etl/lodgement/app.py
 create mode 100644 etl/lodgement/requirements.txt
 create mode 100644 utils/file_data_extraction.py

diff --git a/.idea/Model.iml b/.idea/Model.iml
index 762580d9..9b63b142 100644
--- a/.idea/Model.iml
+++ b/.idea/Model.iml
@@ -7,7 +7,7 @@
-
+
diff --git a/.idea/misc.xml b/.idea/misc.xml
index c916a158..acd935c1 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -3,7 +3,7 @@
-
+
diff --git a/etl/lodgement/app.py b/etl/lodgement/app.py
new file mode 100644
index 00000000..ede644b8
--- /dev/null
+++ b/etl/lodgement/app.py
@@ -0,0 +1,55 @@
+import os
+import utils.file_data_extraction as file_extraction_tools
+
+# Local folder used for the pilot; the source data will eventually come from Sharepoint
+DEFAULT_SOURCE_DATA_PATH = "/Users/khalimconn-kowlessar/Documents/hestia/Lodgment Pilot"
+
+
+def handler(source_data_path=DEFAULT_SOURCE_DATA_PATH):
+    """
+    This is a simple application that will extract the data from documents that have been uploaded to Sharepoint
+    to populate the lodgement spreadsheet with. Each sub-folder of the source data path is treated as one
+    property, and the PDFs in its "2. RA Coordinator Info" folder are parsed with the extractor registered for
+    their report type.
+    :param source_data_path: String path to the folder that contains one sub-folder per property
+    :return: None
+    :raises ValueError: If the report type of a PDF cannot be detected
+    """
+
+    output_template = "Trustmark Details - Template REV.25.11.24.xlsx"
+
+    # List the folders in the source data path
+    folders = [x for x in os.listdir(source_data_path) if os.path.isdir(os.path.join(source_data_path, x))]
+
+    # Report types mapped to None are recognised but have no extractor yet, so those files are skipped
+    extractors = {
+        "elmhurst epr": file_extraction_tools.ElmhurstEprExtractor,
+        "elmhurst summary report": None,
+        "osmosis condition report": None,
+        "elmhurst evidence report": None
+    }
+
+    for property_folder in folders:
+        coordinator_folder = os.path.join(source_data_path, property_folder, "2. RA Coordinator Info")
+
+        # Get the contents of the folder
+        coordinator_folder_contents = [
+            file for file in os.listdir(coordinator_folder) if os.path.isfile(os.path.join(coordinator_folder, file))
+        ]
+
+        # We detect the various file types
+        # NOTE(review): extracted_contents and output_template are not used yet - presumably the
+        # spreadsheet writing step is still to come
+        extracted_contents = {}
+        for filename in coordinator_folder_contents:
+            filepath = os.path.join(coordinator_folder, filename)
+            if file_extraction_tools.is_pdf(filepath):
+                report_type = file_extraction_tools.detect_pdf_report_type(pdf_path=filepath)
+                if report_type is None:
+                    raise ValueError(f"Unknown report type for {filename}")
+
+                file_extractor = extractors.get(report_type)
+                if file_extractor is None:
+                    continue
+
+                extracted_contents[report_type] = file_extractor(filepath).extract()
diff --git a/etl/lodgement/requirements.txt b/etl/lodgement/requirements.txt
new file mode 100644
index 00000000..601907ed
--- /dev/null
+++ b/etl/lodgement/requirements.txt
@@ -0,0 +1,8 @@
+PyPDF2
+pandas
+tqdm
+openpyxl
+boto3
+usaddress==0.5.11
+fuzzywuzzy==0.18.0
+python-dotenv
\ No newline at end of file
diff --git a/utils/file_data_extraction.py b/utils/file_data_extraction.py
new file mode 100644
index 00000000..cdd25f8a
--- /dev/null
+++ b/utils/file_data_extraction.py
@@ -0,0 +1,389 @@
+"""
+This script contains functions used to extract data from retrofit survey files, including EPRs,
+summary reports, etc
+"""
+import PyPDF2
+import re
+from collections import Counter
+
+
+def is_elmhurst_energy_report(text):
+    """
+    Determines if the provided text indicates that the PDF is an Energy Report.
+    :param text: String text extracted from the first page of the PDF
+    :return: True if the text starts with "ENERGY REPORT"
+    """
+    return text.startswith("ENERGY REPORT")
+
+
+def is_elmhurst_summary_report(text):
+    """
+    Determines if the provided text indicates that the PDF is a Summary Report.
+    :param text: String text extracted from the first page of the PDF
+    :return: True if the text starts with "Summary Information"
+    """
+    return text.startswith("Summary Information")
+
+
+def is_osmosis_condition_report(text):
+    """
+    Determines if the provided text indicates that the PDF is a Condition Report.
+    :param text: String text extracted from the first page of the PDF
+    :return: True if the text starts with either of the two Osmosis condition report headings
+    """
+    return text.startswith(("OsmosisACDNEWPAS2035ConditionReport", "OsmosisACDPAS2035ConditionReport"))
+
+
+def is_elmhurst_evidence_report(text):
+    """
+    Determines if the provided text indicates that the PDF is an Elmhurst Evidence Report.
+    :param text: String text extracted from the first page of the PDF
+    :return: True if the text starts with "RdSAP Evidence Report"
+    """
+    return text.startswith("RdSAP Evidence Report")
+
+
+def detect_pdf_report_type(pdf_path):
+    """
+    Detects the type of report based on the text of the first page of the PDF.
+    :param pdf_path: String path to the PDF file
+    :return: String type of the report ("elmhurst epr", "elmhurst summary report", "osmosis condition report"
+        or "elmhurst evidence report"), or None if the first page matches none of them
+    """
+    # Attempt to read the first page of the PDF to determine type
+    with open(pdf_path, "rb") as file:
+        reader = PyPDF2.PdfReader(file)
+        first_page_text = reader.pages[0].extract_text() if reader.pages else ""
+
+    if is_elmhurst_energy_report(first_page_text):
+        return "elmhurst epr"
+    elif is_elmhurst_summary_report(first_page_text):
+        return "elmhurst summary report"
+    elif is_osmosis_condition_report(first_page_text):
+        return "osmosis condition report"
+    elif is_elmhurst_evidence_report(first_page_text):
+        return "elmhurst evidence report"
+
+    return None
+
+
+def is_pdf(filename):
+    """
+    Determines if the provided filename is a PDF file, ignoring the letter case of the extension.
+    :param filename: String file name or path
+    :return: True if the name ends with ".pdf" in any letter case
+    """
+    return filename.lower().endswith(".pdf")
+
+
+class ElmhurstEprExtractor:
+    """
+    Extracts data from the text of an Elmhurst Energy Report PDF.
+    """
+
+    def __init__(self, file_path):
+        """
+        :param file_path: String path to the Energy Report PDF
+        """
+        self.file_path = file_path
+
+    @staticmethod
+    def extract_window_age_description(windows_text):
+        """
+        Extracts the most common window age description and its proportion.
+        :param windows_text: String text of the windows section of the report
+        :return: Dictionary with the two most common descriptions, the percentage of the matched windows each
+            accounts for, and the total number of windows matched
+        :raises ValueError: If none of the known descriptions is found in the text
+        """
+        windows_text = windows_text.replace("\n", "")
+        window_descriptions = [
+            "Double post or during 2002",
+            "Double pre 2002",
+            "Double with unknown install date",
+            "Secondary glazing",
+            "Triple glazing",
+            "Single glazing",
+        ]
+        description_counts = Counter()
+        for description in window_descriptions:
+            matches = re.findall(re.escape(description), windows_text)
+            description_counts[description] = len(matches)
+
+        total_windows = sum(description_counts.values())
+        if not total_windows:
+            raise ValueError("Failed to extract window data.")
+
+        most_common_description, window_count = description_counts.most_common(1)[0]
+        window_proportion = window_count / total_windows * 100
+
+        if window_proportion == 100:
+            second_most_common_description = None
+            second_most_common_proportion = 0
+        else:
+            second_most_common_description, second_window_count = description_counts.most_common(2)[1]
+            second_most_common_proportion = second_window_count / total_windows * 100
+
+        return {
+            "Window Age Description": most_common_description,
+            "Window Age Description Proportion (%)": window_proportion,
+            "Secondary Window Age Description": second_most_common_description,
+            "Secondary Window Age Description Proportion (%)": second_most_common_proportion,
+            "Number of Windows": total_windows
+        }
+
+    @staticmethod
+    def extract_building_parts(text):
+        """
+        Extracts building parts and associated dimensions from the provided text.
+        :param text: String text of the whole report
+        :return: List of dictionaries, one per floor level (or room in roof) found for each building part
+        """
+        data = []
+        building_part_pattern = re.compile(
+            r"Construction details: Building part: (.*?)\nFloor Area \[m2\] Room Height \[m\] Perimeter \[m\] Party "
+            r"Wall Length \[m\]\n(.*?)(?=Construction details|Data inputs|$)",
+            re.DOTALL
+        )
+        # Compiled once here rather than once per building part
+        floor_pattern = re.compile(
+            r"(Lowest floor|First floor|Second floor)\s+([\d.]+)\s+([\d.]+)\s+([\d.]+)\s+([\d.]+)"
+        )
+        for match in building_part_pattern.finditer(text):
+            part_name = match.group(1).strip()
+            floor_data = match.group(2)
+            room_in_roof_match = re.search(r"Room\(s\) in Roof area:\s*([\d.]+)", part_name)
+            if room_in_roof_match:
+                floor_area = float(room_in_roof_match.group(1))
+                cleaned_part_name = re.sub(r" - built in.*|Room\(s\) in Roof area:.*", "", part_name).strip()
+                data.append({
+                    "Building Part": cleaned_part_name,
+                    "Floor Level": "Room in Roof",
+                    "Floor Area (m2)": floor_area,
+                    "Room Height (m)": None,
+                    "Perimeter (m)": None,
+                    "Party Wall Length (m)": None
+                })
+            else:
+                cleaned_part_name = re.sub(r" - built in.*", "", part_name).strip()
+
+            # NOTE(review): the floor rows are read for every building part, including one that also reports a
+            # room in roof - confirm
+            for floor_match in floor_pattern.finditer(floor_data):
+                floor_level = floor_match.group(1)
+                floor_area = float(floor_match.group(2))
+                room_height = float(floor_match.group(3))
+                perimeter = float(floor_match.group(4))
+                party_wall_length = float(floor_match.group(5))
+                data.append({
+                    "Building Part": cleaned_part_name,
+                    "Floor Level": floor_level,
+                    "Floor Area (m2)": floor_area,
+                    "Room Height (m)": room_height,
+                    "Perimeter (m)": perimeter,
+                    "Party Wall Length (m)": party_wall_length
+                })
+
+        return data
+
+    @staticmethod
+    def extract_roof_details(text):
+        """
+        Extracts roof details for each building part in the provided text.
+        :param text: String text of the whole report
+        :return: List of dictionaries, one per building part, with None for any detail that is not found
+        """
+        roof_data = []
+        building_part_pattern = re.compile(
+            r"Construction details: Building part: (.*?)\n(.*?)(?=Conservatory|Construction details|$)",
+            re.DOTALL
+        )
+        for match in building_part_pattern.finditer(text):
+            part_name = match.group(1).strip()
+            cleaned_part_name = re.sub(r" - built in.*|Room\(s\) in Roof area:.*", "", part_name).strip()
+            part_details = match.group(2)
+            roof_type_match = re.search(r"Roof Type:\s*(.*?)(?=\n|$)", part_details)
+            roof_insulation_match = re.search(r"Roof Insulation:\s*(.*?)(?=\n|$)", part_details)
+            roof_insulation_thickness_match = re.search(r"Roof Insulation Thickness:\s*(.*?)(?=\n|$)", part_details)
+
+            roof_data.append({
+                "Building Part": cleaned_part_name,
+                "Roof Type": roof_type_match.group(1).strip() if roof_type_match else None,
+                "Roof Insulation": roof_insulation_match.group(1).strip() if roof_insulation_match else None,
+                "Roof Insulation Thickness": roof_insulation_thickness_match.group(
+                    1).strip() if roof_insulation_thickness_match else None,
+            })
+
+        return roof_data
+
+    @staticmethod
+    def extract_wall_details(text):
+        """
+        Extracts wall details for each building part in the provided text.
+        :param text: String text of the whole report
+        :return: List of dictionaries, one per building part, with None for any detail that is not found
+        """
+        wall_data = []
+        building_part_pattern = re.compile(
+            r"Construction details: Building part: (.*?)\n(.*?)(?=Conservatory|Construction details|$)",
+            re.DOTALL
+        )
+        for match in building_part_pattern.finditer(text):
+            part_name = match.group(1).strip()
+            cleaned_part_name = re.sub(r" - built in.*|Room\(s\) in Roof area:.*", "", part_name).strip()
+            part_details = match.group(2)
+            wall_type_match = re.search(r"Wall Type:\s*(.*?)(?=\n|$)", part_details)
+            wall_insulation_match = re.search(r"Wall Insulation:\s*(.*?)(?=\n|$)", part_details)
+            wall_drylining_match = re.search(r"Wall Dry-lining:\s*(.*?)(?=\n|$)", part_details)
+            wall_thickness_match = re.search(r"Wall Thickness:\s*(\d+)(?=\n|$)", part_details)
+
+            wall_data.append({
+                "Building Part": cleaned_part_name,
+                "Wall Type": wall_type_match.group(1).strip() if wall_type_match else None,
+                "Wall Insulation": wall_insulation_match.group(1).strip() if wall_insulation_match else None,
+                "Wall Dry-lining": wall_drylining_match.group(1).strip() if wall_drylining_match else None,
+                "Wall Thickness": int(wall_thickness_match.group(1)) if wall_thickness_match else None,
+            })
+
+        return wall_data
+
+    @staticmethod
+    def extract_primary_heating(text):
+        """
+        Extracts the details of the primary heating system from the "Main Heating 1" section.
+        :param text: String text of the whole report
+        :return: Dictionary with the heating system, PCDF reference, controls and percentage of heat
+        :raises ValueError: If the "Main Heating 1" section cannot be found
+        """
+        # Extract Primary Heating Section (Main Heating 1)
+        primary_heating_section1 = re.search(r"Main\s*Heating\s*1\s*(.*?)\s*Main\s*Heating\s*2", text, re.DOTALL)
+        # We may not have a "Main Heating 2" section, in which case the section runs up to "Secondary Heating"
+        primary_heating_section2 = re.search(r"Main\s*Heating\s*1\s*(.*?)\s*Secondary\s*Heating", text, re.DOTALL)
+        primary_heating_section = primary_heating_section1 if primary_heating_section1 else primary_heating_section2
+        if primary_heating_section is None:
+            raise ValueError("Failed to extract primary heating data.")
+        primary_text = primary_heating_section.group(1)
+
+        primary_heating_output = {
+            "Existing Primary Heating System": re.search(
+                r"Main Heating Code\s*(.*?)\n", primary_text
+            ).group(1).strip(),
+            "Existing Primary Heating PCDF Reference": re.search(
+                r"PCDF boiler Reference\s*(\d+)", primary_text
+            ).group(1),
+            "Existing Primary Heating Controls": re.search(
+                r"Main Heating Controls\s*(.*?)\n", primary_text
+            ).group(1).strip(),
+            "Existing Primary Heating % of Heat": int(
+                re.search(r"Percentage of Heat\s*(\d+)\s*%?", primary_text).group(1)
+            )
+        }
+
+        return primary_heating_output
+
+    @staticmethod
+    def extract_secondary_heating(text):
+        """
+        Extracts the details of the second main heating system from the "Main Heating 2" section, if there is one.
+        :param text: String text of the whole report
+        :return: Dictionary with the heating system, PCDF reference, controls and percentage of heat. The values
+            are empty strings and 0 when the report has no "Main Heating 2" section
+        """
+        # Extract Secondary Heating Section (Main Heating 2)
+        secondary_heating_section = re.search(r"Main\s*Heating\s*2\s*(.*?)\s*Secondary Heating", text, re.DOTALL)
+        output = {}
+        if secondary_heating_section is None:
+            output["Existing Heating System"] = ""
+            output["Existing Heating PCDF Reference"] = ""
+            output["Existing Heating Controls"] = ""
+            output["Existing Heating % of Heat"] = 0
+
+        else:
+            secondary_text = secondary_heating_section.group(1)
+
+            main_heating_code_match_secondary = re.search(
+                r"Main Heating Code\s*(.*?)(?=\n|Percentage of Heat)", secondary_text
+            )
+            output["Existing Heating System"] = main_heating_code_match_secondary.group(1).strip()
+
+            output["Existing Heating PCDF Reference"] = re.search(
+                r"PCDF boiler Reference\s*(\d+)", secondary_text
+            ).group(1)
+
+            if output["Existing Heating System"] == "":
+                output["Existing Heating Controls"] = ""
+            else:
+                # Might not have heating controls on 2nd system
+                secondary_controls_match = re.search(r"Main Heating Controls\s*(.*?)\n", secondary_text)
+                output["Existing Heating Controls"] = (
+                    secondary_controls_match.group(1).strip() if secondary_controls_match else ""
+                )
+            output["Existing Heating % of Heat"] = int(
+                re.search(r"Percentage of Heat\s*(\d+)\s*%?", secondary_text).group(1)
+            )
+
+        return output
+
+    def extract(self):
+        """
+        Reads the text of every page of the PDF and extracts the report data from it.
+        :return: Dictionary of the extracted data
+        """
+        data = {}
+
+        with open(self.file_path, "rb") as file:
+            reader = PyPDF2.PdfReader(file)
+            text = "".join(page.extract_text() for page in reader.pages)
+
+        # Extracting individual components
+        address_match = re.search(r"ENERGY REPORT\nDwelling Address\s*(.*?)\s*\nReference", text, re.DOTALL)
+        data["Address"] = address_match.group(1).strip()
+        data["Postcode"] = data["Address"].split(",")[-1].strip()
+
+        sap_match = re.search(r"GG \(1-20\)\s*(\d{1,2})\s*(\d{1,2})", text)
+        data["Current SAP Rating"] = int(sap_match.group(1))
+
+        energy_match = re.search(r"Additional ratings for your home\s*([\d.]+)", text)
+        data["Primary Energy Use Intensity (kWh/m2/yr)"] = float(energy_match.group(1))
+
+        storeys_match = re.search(r"Number of Storeys:\s*(\d+)", text)
+        data["Number of Storeys"] = int(storeys_match.group(1))
+
+        fuel_match = re.search(r"TOTAL\s*£(\d+)", text)
+        data["Fuel Bill"] = f"£{fuel_match.group(1)}"
+
+        total_doors_match = re.search(r"Total Doors:\s*(\d+)", text)
+        data["Total Number of Doors"] = int(total_doors_match.group(1))
+
+        # Extract Number of Insulated Doors
+        insulated_doors_match = re.search(r"Insulated Doors:\s*(\d+)", text)
+        data["Number of Insulated Doors"] = int(insulated_doors_match.group(1))
+
+        # Get number of lighting outlets and number of fittings needing LEL
+        lighting_fittings_match = re.search(r"Total number of light fittings\s*(\d+)", text)
+        data["Number of Light Fittings"] = int(lighting_fittings_match.group(1))
+        lel_fittings_match = re.search(r"Total number of L.E.L. fittings\s*(\d+)", text)
+        data["Number of LEL Fittings"] = int(lel_fittings_match.group(1))
+        data["Number of fittings needing LEL"] = data["Number of Light Fittings"] - data["Number of LEL Fittings"]
+
+        windows_section = re.search(r"Windows\s*(.*?)\s*Draught Proofing", text, re.DOTALL)
+        data["Windows"] = self.extract_window_age_description(windows_section.group(1))
+
+        data["Primary Heating"] = self.extract_primary_heating(text)
+        data["Secondary Heating"] = self.extract_secondary_heating(text)
+        data["Building Parts"] = self.extract_building_parts(text)
+        data["Roof Details"] = self.extract_roof_details(text)
+        data["Wall Details"] = self.extract_wall_details(text)
+
+        secondary_heating_code_match = re.search(r"Secondary Heating Code\s*(.*?)\n", text)
+        water_heating_code_match = re.search(r"Water Heating Code\s*(.*?)\n", text)
+
+        if data["Secondary Heating"]["Existing Heating System"] == "":
+            data["Secondary Heating Code"] = ""
+        else:
+            data["Secondary Heating Code"] = secondary_heating_code_match.group(
+                1).strip() if secondary_heating_code_match else ""
+
+        data["Water Heating Code"] = water_heating_code_match.group(1).strip()
+
+        return data