import PyPDF2 import re import pdfplumber from collections import Counter from utils.logger import setup_logger from xml.dom.minidom import parseString from pdf2image import convert_from_path from pytesseract import image_to_string logger = setup_logger() """ This script contains functions used to extract data from retrofit survey files, including EPRs, summary reports, etc """ def is_elmhurst_energy_report(text): """ Determines if the provided text indicates that the PDF is an Energy Report. Returns True if the text contains 'Energy Report'. """ return text.startswith("ENERGY REPORT") def is_elmhurst_summary_report(text): """ Determines if the provided text indicates that the PDF is a Summary Report. """ return text.startswith("Summary Information") def is_osmosis_condition_report(text): """ Determines if the provided text indicates that the PDF is a Condition Report. """ return text.startswith("OsmosisACDNEWPAS2035ConditionReport") or text.startswith("OsmosisACDPAS2035ConditionReport") def is_elmhurst_evidence_report(text): """ Determines if the provided text indicates that the PDF is an Elmhurst Evidence Report. """ return text.startswith("RdSAP Evidence Report") def is_pulse_air_permeability(text): """ Determines if the provided text indicates that the PDF is a Pulse Air Permeability Report. """ return text.startswith("Air Permeability Test Report @O PULSE") def is_elmhurst_project_handover(text): """ Determines if the provided text indicates that the PDF is an Elmhurst Project Handover Report. """ return "Retrofit_Project_Handover" in text or "Retrofit Project Handover" in text def is_core_logic_pas_assessment_report(text): """ Determines if the provided text indicates that the PDF is a PAS Assessment Report. """ return text.startswith("Generated Using CoreLogic UK PAS Assessment") def detect_pdf_report_type(pdf_path): """ Detects the type of report based on content or filename. 
:param pdf_path: String path to the PDF file :return: String type of the report ("epr", "summary", or None) """ # Attempt to read the first page of the PDF to determine type with open(pdf_path, "rb") as file: reader = PyPDF2.PdfReader(file) first_page_text = reader.pages[0].extract_text() if reader.pages else "" if first_page_text == "": # Convert PDF pages to images logger.info("Extracting text from PDF images..., this may take a moment.") pages = convert_from_path(pdf_path, dpi=300) if pages: first_page_text = image_to_string(pages[0]) if is_elmhurst_energy_report(first_page_text): return "elmhurst epr" elif is_elmhurst_summary_report(first_page_text): return "elmhurst summary report" elif is_osmosis_condition_report(first_page_text): return "osmosis condition report" elif is_elmhurst_evidence_report(first_page_text): return "elmhurst evidence report" elif is_pulse_air_permeability(first_page_text): return "pulse air permeability" elif is_elmhurst_project_handover(first_page_text): return "elmhurst project handover" elif is_core_logic_pas_assessment_report(first_page_text): return "core logic pas assessment report" return None def detect_xml_report_type(xml_path): """ Detects the type of XML report based on content or filename. :param xml_path: String path to the XML file :return: String type of the report ("full sap xml", or None) """ # Attempt to read the first page of the PDF to determine type with open(xml_path, "r") as file: contents = file.read() contents = parseString(contents) product_tag_search = contents.getElementsByTagName("Product") if product_tag_search: if product_tag_search[0].firstChild.nodeValue == "Sap 2012 Desktop": return "full sap xml" raise Exception("Not implemented") def is_pdf(filename): """ Determines if the provided filename is a PDF file. """ return filename.endswith(".pdf") def is_xml(filename): """ Determines if the provided filename is an XML file. 
""" return filename.endswith(".xml") class ElmhurstEprExtractor: """ A utility class for extracting specific data from Elmhurst Energy Performance Reports (EPR). """ def __init__(self, file_path): self.file_path = file_path @staticmethod def extract_window_age_description(windows_text): """ Extracts the most common window age description and its proportion. """ windows_text = windows_text.replace("\n", "") window_descriptions = [ "Double post or during 2002", "Double pre 2002", "Double with unknown install date", "Secondary glazing", "Triple glazing", "Single glazing", ] description_counts = Counter() for description in window_descriptions: matches = re.findall(re.escape(description), windows_text) description_counts[description] = len(matches) if not description_counts or not sum(description_counts.values()): raise ValueError("Failed to extract window data.") most_common_description, window_count = description_counts.most_common(1)[0] window_proportion = window_count / sum(description_counts.values()) * 100 if window_proportion == 100: second_most_common_description = None second_most_common_proportion = 0 else: second_most_common_description, second_window_count = description_counts.most_common(2)[1] second_most_common_proportion = second_window_count / sum(description_counts.values()) * 100 return { "Window Age Description": most_common_description, "Window Age Description Proportion (%)": window_proportion, "Secondary Window Age Description": second_most_common_description, "Secondary Window Age Description Proportion (%)": second_most_common_proportion, "Number of Windows": sum(description_counts.values()) } @staticmethod def extract_building_parts(text): """ Extracts building parts and associated dimensions from the provided text. 
""" data = [] building_part_pattern = re.compile( r"Construction details: Building part: (.*?)\nFloor Area \[m2\] Room Height \[m\] Perimeter \[m\] Party " r"Wall Length \[m\]\n(.*?)(?=Construction details|Data inputs|$)", re.DOTALL ) for match in building_part_pattern.finditer(text): part_name = match.group(1).strip() floor_data = match.group(2) room_in_roof_match = re.search(r"Room\(s\) in Roof area:\s*([\d.]+)", part_name) if room_in_roof_match: floor_area = float(room_in_roof_match.group(1)) cleaned_part_name = re.sub(r" - built in.*|Room\(s\) in Roof area:.*", "", part_name).strip() data.append({ "Building Part": cleaned_part_name, "Floor Level": "Room in Roof", "Floor Area (m2)": floor_area, "Room Height (m)": None, "Perimeter (m)": None, "Party Wall Length (m)": None }) else: cleaned_part_name = re.sub(r" - built in.*", "", part_name).strip() floor_pattern = re.compile( r"(Lowest floor|First floor|Second floor)\s+([\d.]+)\s+([\d.]+)\s+([\d.]+)\s+([\d.]+)" ) for floor_match in floor_pattern.finditer(floor_data): floor_level = floor_match.group(1) floor_area = float(floor_match.group(2)) room_height = float(floor_match.group(3)) perimeter = float(floor_match.group(4)) party_wall_length = float(floor_match.group(5)) data.append({ "Building Part": cleaned_part_name, "Floor Level": floor_level, "Floor Area (m2)": floor_area, "Room Height (m)": room_height, "Perimeter (m)": perimeter, "Party Wall Length (m)": party_wall_length }) return data @staticmethod def extract_roof_details(text): """ Extracts roof details for each building part in the provided text. 
""" roof_data = [] building_part_pattern = re.compile( r"Construction details: Building part: (.*?)\n(.*?)(?=Conservatory|Construction details|$)", re.DOTALL ) for match in building_part_pattern.finditer(text): part_name = match.group(1).strip() cleaned_part_name = re.sub(r" - built in.*|Room\(s\) in Roof area:.*", "", part_name).strip() part_details = match.group(2) roof_type_match = re.search(r"Roof Type:\s*(.*?)(?=\n|$)", part_details) roof_insulation_match = re.search(r"Roof Insulation:\s*(.*?)(?=\n|$)", part_details) roof_insulation_thickness_match = re.search(r"Roof Insulation Thickness:\s*(.*?)(?=\n|$)", part_details) roof_data.append({ "Building Part": cleaned_part_name, "Roof Type": roof_type_match.group(1).strip() if roof_type_match else None, "Roof Insulation": roof_insulation_match.group(1).strip() if roof_insulation_match else None, "Roof Insulation Thickness": roof_insulation_thickness_match.group( 1).strip() if roof_insulation_thickness_match else None, }) return roof_data @staticmethod def extract_wall_details(text): """ Extracts wall details for each building part in the provided text. 
""" wall_data = [] building_part_pattern = re.compile( r"Construction details: Building part: (.*?)\n(.*?)(?=Conservatory|Construction details|$)", re.DOTALL ) for match in building_part_pattern.finditer(text): part_name = match.group(1).strip() cleaned_part_name = re.sub(r" - built in.*|Room\(s\) in Roof area:.*", "", part_name).strip() part_details = match.group(2) wall_type_match = re.search(r"Wall Type:\s*(.*?)(?=\n|$)", part_details) wall_insulation_match = re.search(r"Wall Insulation:\s*(.*?)(?=\n|$)", part_details) wall_drylining_match = re.search(r"Wall Dry-lining:\s*(.*?)(?=\n|$)", part_details) wall_thickness_match = re.search(r"Wall Thickness:\s*(\d+)(?=\n|$)", part_details) wall_data.append({ "Building Part": cleaned_part_name, "Wall Type": wall_type_match.group(1).strip() if wall_type_match else None, "Wall Insulation": wall_insulation_match.group(1).strip() if wall_insulation_match else None, "Wall Dry-lining": wall_drylining_match.group(1).strip() if wall_drylining_match else None, "Wall Thickness": int(wall_thickness_match.group(1)) if wall_thickness_match else None, }) return wall_data @staticmethod def extract_conservatory(text): """ Extracts conservatory data from the provided text. The section is located between "Conservatory" and "Doors". Args: text (str): The full text of the EPR PDF. 
Returns: dict: A dictionary with conservatory details: - "Conservatory Present" - "Conservatory Separated" - "Conservatory Floor Area" - "Conservatory Double Glazed" - "Conservatory Glazed Perimeter" - "Heated Conservatory Height" """ conservatory_match = re.search(r"Conservatory\s*(.*?)\s*Doors", text, re.DOTALL) if not conservatory_match: logger.error("Failed to extract conservatory data.") raise ValueError("Could not extract conservatory data.") conservatory_text = conservatory_match.group(1) # Check if conservatory is present present_match = re.search(r"Conservatory Present:\s*(Yes|No)", conservatory_text) if not present_match or present_match.group(1).strip() == "No": logger.info("Conservatory not present.") return { "Conservatory Present": "No", "Conservatory Separated": "", "Conservatory Floor Area": 0, "Conservatory Double Glazed": "", "Conservatory Glazed Perimeter": 0, "Heated Conservatory Height": "", } # Extract conservatory details separated_match = re.search(r"Conservatory Separated:\s*(Yes|No)", conservatory_text) floor_area_match = re.search(r"Conservatory Floor Area:\s*([\d.]+)", conservatory_text) double_glazed_match = re.search(r"Conservatory Double Glazed:\s*(Yes|No)", conservatory_text) glazed_perimeter_match = re.search(r"Conservatory Glazed Perimeter:\s*([\d.]+)", conservatory_text) height_match = re.search(r"Heated Conservatory Height:\s*(.*?)(?=\n|$)", conservatory_text) return { "Conservatory Present": "Yes", "Conservatory Separated": separated_match.group(1).strip() if separated_match else "", "Conservatory Floor Area": float(floor_area_match.group(1)) if floor_area_match else 0, "Conservatory Double Glazed": double_glazed_match.group(1).strip() if double_glazed_match else "", "Conservatory Glazed Perimeter": float(glazed_perimeter_match.group(1)) if glazed_perimeter_match else 0, "Heated Conservatory Height": height_match.group(1).strip() if height_match else "", } @staticmethod def _extract_heating_details(section_text, 
default_value=""): """ Extracts heating details from a given section of text. Args: section_text (str): The section of text containing heating details. default_value (str, optional): The default value to return for missing fields. Defaults to "". Returns: dict: A dictionary containing heating system details. """ system_search = re.search(r"Main Heating Code\s*(.*?)\n", section_text) pcdf_search = re.search(r"PCDF boiler Reference\s*(\d+)", section_text) controls_search = re.search(r"Main Heating Controls\s*(.*?)\n", section_text) heat_search = re.search(r"Percentage of Heat\s*(\d+)\s*%?", section_text) return { "System": system_search.group(1).strip() if system_search else default_value, "PCDF Reference": pcdf_search.group(1) if pcdf_search else default_value, "Controls": controls_search.group(1).strip() if controls_search else default_value, "% of Heat": int(heat_search.group(1)) if heat_search else 0, } def extract_primary_heating(self, text): # Extract Primary Heating Section (Main Heating 1) primary_heating_section1 = re.search(r"Main\s*Heating\s*1\s*(.*?)\s*Main\s*Heating\s*2", text, re.DOTALL) # We may not have a secondary heating primary_heating_section2 = re.search(r"Main\s*Heating\s*1\s*(.*?)\s*Secondary\s*Heating", text, re.DOTALL) primary_heating_section = primary_heating_section1 if primary_heating_section1 else primary_heating_section2 primary_text = primary_heating_section.group(1) return self._extract_heating_details(primary_text) def extract_secondary_heating_details(self, text): # Extract Secondary Heating Section (Main Heating 2) secondary_heating_section = re.search(r"Main\s*Heating\s*2\s*(.*?)\s*Secondary Heating", text, re.DOTALL) output = {} if secondary_heating_section is None: output["System"] = "" output["PCDF Reference"] = "" output["Controls"] = "" output["% of Heat"] = 0 else: secondary_text = secondary_heating_section.group(1) output.update( **self._extract_heating_details(secondary_text) ) output["Heating Code"] = ( 
re.search(r"Secondary Heating Code\s*(.*?)\n", text).group(1).strip() if output["System"] and re.search(r"Secondary Heating Code\s*(.*?)\n", text) else "" ) return output def extract(self): """ Extracts all relevant data from the EPR PDF. Returns: dict: A dictionary containing extracted data, including: - Address and Postcode - SAP Rating and Primary Energy Use - Lighting, Doors, Windows, Roof, and Wall Details - Heating systems (Primary and Secondary) - Building Parts """ data = {} with open(self.file_path, "rb") as file: reader = PyPDF2.PdfReader(file) text = "".join(page.extract_text() for page in reader.pages) data["Assessor Name"] = re.search(r"Created by:\s*(.*?)\n", text).group(1).strip() data["Assessment Date"] = re.search(r"\nAssessment Date\s*(.*?)\n", text).group(1).strip() # Extracting individual components address_match = re.search(r"ENERGY REPORT\nDwelling Address\s*(.*?)\s*\nReference", text, re.DOTALL) if not address_match: logger.error("Failed to extract address.") raise ValueError("Failed to extract address.") data["Address"] = address_match.group(1).strip() data["Postcode"] = data["Address"].split(",")[-1].strip() # TODO: data["Region"] = None data["House Name"] = None data["House No"] = None data["Street"] = None data["Locality"] = None data["Town"] = None data["County"] = None sap_match = re.search(r"GG \(1-20\)\s*(\d{1,2})\s*(\d{1,2})", text) if not sap_match: logger.error("Failed to extract SAP rating.") raise ValueError("Failed to extract SAP rating.") data["Current SAP Rating"] = int(sap_match.group(1)) energy_match = re.search(r"Additional ratings for your home\s*([\d.]+)", text) if not energy_match: logger.error("Failed to extract primary energy use.") raise ValueError("Failed to extract primary energy use.") data["Primary Energy Use Intensity (kWh/m2/yr)"] = float(energy_match.group(1)) storeys_match = re.search(r"Number of Storeys:\s*(\d+)", text) if not storeys_match: logger.error("Failed to extract the number of storeys.") raise 
ValueError("Failed to extract the number of storeys.") data["Number of Storeys"] = int(storeys_match.group(1)) fuel_match = re.search(r"TOTAL\s*£(\d+)", text) if not fuel_match: logger.error("Failed to extract fuel bill.") raise ValueError("Failed to extract fuel bill.") data["Fuel Bill"] = f"£{fuel_match.group(1)}" total_doors_match = re.search(r"Total Doors:\s*(\d+)", text) if not total_doors_match: logger.error("Failed to extract total doors.") raise ValueError("Failed to extract total doors.") data["Total Number of Doors"] = int(total_doors_match.group(1)) # Extract Number of Insulated Doors insulated_doors_match = re.search(r"Insulated Doors:\s*(\d+)", text) if not insulated_doors_match: logger.error("Failed to extract insulated doors.") raise ValueError("Failed to extract insulated doors.") data["Number of Insulated Doors"] = int(insulated_doors_match.group(1)) # Get number of lighting outlets and number of fittings needing LEL lighting_fittings_match = re.search(r"Total number of light fittings\s*(\d+)", text) if not lighting_fittings_match: logger.error("Failed to extract lighting.") raise ValueError("Failed to extract lighting") data["Number of Light Fittings"] = int(lighting_fittings_match.group(1)) lel_fittings_match = re.search(r"Total number of L.E.L. 
fittings\s*(\d+)", text) if not lel_fittings_match: logger.error("Failed to extract LEL fittings.") raise ValueError("Failed to extract LEL fittings.") data["Number of LEL Fittings"] = int(lel_fittings_match.group(1)) data["Number of fittings needing LEL"] = data["Number of Light Fittings"] - data["Number of LEL Fittings"] windows_section = re.search(r"Windows\s*(.*?)\s*Draught Proofing", text, re.DOTALL) if not windows_section: logger.error("Failed to extract window data.") raise ValueError("Failed to extract window data.") data["Windows"] = self.extract_window_age_description(windows_section.group(1)) data["Primary Heating"] = self.extract_primary_heating(text) data["Secondary Heating"] = self.extract_secondary_heating_details(text) data["Building Parts"] = self.extract_building_parts(text) data["Roof Details"] = self.extract_roof_details(text) data["Wall Details"] = self.extract_wall_details(text) data["Conservatory"] = self.extract_conservatory(text) water_heating_code_match = re.search(r"Water Heating Code\s*(.*?)\n", text) if not water_heating_code_match: logger.error("Failed to extract water heating code.") raise ValueError("Failed to extract water heating code.") data["Water Heating Code"] = water_heating_code_match.group(1).strip() return data class ElmhurstSummaryReportExtractor: """ A utility class for extracting specific data from Elmhurst Energy Performance Reports (EPR). """ def __init__(self, file_path): self.file_path = file_path @staticmethod def extract_window_age_description(windows_text): """ Extracts the most common window age description and its proportion. Parameters: windows_text (str): The text section containing window data. Returns: dict: A dictionary with the most common window age description and its proportion. 
""" # Clean up windows_text by removing line breaks for better pattern matching windows_text = windows_text.replace("\n", "") # Define possible window age descriptions window_descriptions = [ "Double post or during 2002", "Double pre 2002", "Double with unknown install date", "Secondary glazing", "Triple glazing", "Single glazing", ] # Count occurrences of each description description_counts = Counter() for description in window_descriptions: matches = re.findall(re.escape(description), windows_text) description_counts[description] = len(matches) if not description_counts or not sum(description_counts.values()): raise ValueError("Failed to extract window data.") # Determine the most common description and calculate its proportion most_common_description, window_count = description_counts.most_common(1)[0] window_proportion = window_count / sum(description_counts.values()) * 100 # Get the second most common and the proportion if window_proportion == 100: second_most_common_description = None second_most_common_proportion = 0 else: second_most_common_description, second_window_count = description_counts.most_common(2)[1] second_most_common_proportion = second_window_count / sum(description_counts.values()) * 100 return { "Window Age Description": most_common_description, "Window Age Description Proportion (%)": window_proportion, "Secondary Window Age Description": second_most_common_description, "Secondary Window Age Description Proportion (%)": second_most_common_proportion, "Number of Windows": sum(description_counts.values()) } @staticmethod def extract_primary_heating(text): primary_heating_section1 = re.search(r"Main\s*Heating1\s*(.*?)\s*Main\s*Heating2", text, re.DOTALL) primary_heating_section2 = re.search(r"Main\s*Heating1\s*(.*?)\s*Water\s*Heating", text, re.DOTALL) primary_heating_section = primary_heating_section1 if primary_heating_section1 else primary_heating_section2 if primary_heating_section is None: raise ValueError("Failed to extract primary 
heating data.") primary_text = primary_heating_section.group(1) output = { 'System': re.search(r"Main Heating Code\s*(.*?)\n", primary_text).group(1).strip(), 'PCDF Reference': re.search(r"PCDF boiler Reference\s*(\d+)", primary_text).group(1), 'Controls': re.search(r"Main Heating Controls\s*(.*?)\n", primary_text).group(1).strip(), '% of Heat': int(re.search(r"Percentage of Heat\s*(\d+)\s*%", primary_text).group(1)) } return output @staticmethod def extract_secondary_heating_details(text): secondary_heating_section = re.search(r"Main\s*Heating2\s*(.*?)\s*Water\s*Heating", text, re.DOTALL) # Defaults output = { "System": "", "PCDF Reference": "", "Controls": "", "% of Heat": 0, "Heating Code": "" } if secondary_heating_section is not None: # Overwrite defaults secondary_text = secondary_heating_section.group(1) main_heating_code_match_secondary = re.search( r"Main Heating Code\s*(.*?)(?=\n|Percentage of Heat)", secondary_text ) output["System"] = main_heating_code_match_secondary.group(1).strip() output["PCDF Reference"] = re.search(r"PCDF boiler Reference\s*(\d+)", secondary_text).group(1) second_heating_controls_match = re.search(r"Main Heating Controls\s*(.*?)\n", secondary_text) output["Heating Controls"] = ( second_heating_controls_match.group(1).strip() if second_heating_controls_match else "" ) output["% of Heat"] = int( re.search(r"Percentage of Heat\s*(\d+)\s*%", secondary_text).group(1) ) secondary_heating_code_match = re.search(r"Secondary Heating Code\s*(.*?)\n", text) if output["System"] != "": output["Heating Code"] = ( secondary_heating_code_match.group(1).strip() if secondary_heating_code_match else "" ) return output @staticmethod def extract_building_parts(text): """ Extracts building parts and associated dimensions from the summary report PDF. This includes Main Property, multiple extensions if they exist, and Room in Roof areas. 
""" data = [] # Locate the Dimensions section dimensions_section = re.search( r"Dimensions:\s*Dimension type: Internal\n(.*?)\n5\.0 Conservatory:", text, re.DOTALL ) if not dimensions_section: raise ValueError("Failed to locate dimensions section in the text.") dimensions_text = dimensions_section.group(1) # Pattern to extract each building part, starting from Main Property and including extensions building_part_pattern = re.compile( r"(Main Property|\d+(?:st|nd|rd|th) Extension)\s*" r"(.*?)(?=\d+(?:st|nd|rd|th) Extension|5\.0 Conservatory|$)", re.DOTALL ) # Loop through each building part match, including Main Property and extensions for match in building_part_pattern.finditer(dimensions_text): part_name = match.group(1) floor_data = match.group(2) # Pattern to extract floor details: Floor Level, Floor Area, Room Height, Perimeter, Party Wall Length floor_pattern = re.compile( r"(1st Floor|Lowest Floor|Second floor):\s*([\d.]+)\s+([\d.]+)\s+([\d.]+)\s+([\d.]+)" ) # Extract data for each floor within the building part for floor_match in floor_pattern.finditer(floor_data): floor_level = floor_match.group(1) floor_area = float(floor_match.group(2)) room_height = float(floor_match.group(3)) perimeter = float(floor_match.group(4)) party_wall_length = float(floor_match.group(5)) # Append to data list data.append( { "Building Part": part_name, "Floor Level": floor_level, "Floor Area (m2)": floor_area, "Room Height (m)": room_height, "Perimeter (m)": perimeter, "Party Wall Length (m)": party_wall_length } ) # Check specifically for "Room(s) in Roof" entries, which only have Floor Area room_in_roof_pattern = re.compile(r"Room\(s\) in Roof:\s*([\d.]+)") room_in_roof_match = room_in_roof_pattern.search(floor_data) if room_in_roof_match: floor_area = float(room_in_roof_match.group(1)) data.append( { "Building Part": part_name, "Floor Level": "Room in Roof", "Floor Area (m2)": floor_area, "Room Height (m)": None, # Placeholder for missing data "Perimeter (m)": None, # 
Placeholder for missing data "Party Wall Length (m)": None # Placeholder for missing data } ) return data @staticmethod def extract_roof_details(text): """ Extracts roof type, insulation, and insulation thickness for each building part in the 8.0 Roofs section of the summary report. """ # Define data structure to hold results roof_data = [] # Locate the entire 8.0 Roofs section roof_section_match = re.search(r"8\.0 Roofs:\n(.*?)(?=\n9\.0 Floors:|$)", text, re.DOTALL) if not roof_section_match: return roof_data # Return empty if no roof section is found # Extract the roof section and append "9.0 Floors:" as the boundary roof_section = roof_section_match.group(1).strip() + "\n9.0 Floors:" # Define pattern to match each building part's roof entry building_part_pattern = re.compile( r"(Main Property|1st Extension|2nd Extension|[\w\s]+)\n" # Matches each building part label r"Type\s+(.*?)(?=\n(?:Insulation|9\.0 Floors:|[A-Z]))" # Matches Roof Type until the next field, label, # or end r"(?:\nInsulation\s+(.*?)(?=\n(?:Insulation Thickness|9\.0 Floors:|[A-Z])))?" 
# Optional Insulation r"(?:\nInsulation Thickness\s+(.*?)(?=\n(?:9\.0 Floors:|[A-Z])))?", # Optional Insulation Thickness re.DOTALL ) # Extract each building part's data for match in building_part_pattern.finditer(roof_section): part_name = match.group(1).strip() # Building part label roof_type = match.group(2).strip() # Roof Type roof_insulation = match.group(3).strip() if match.group(3) else None # Optional Insulation roof_insulation_thickness = match.group(4).strip() if match.group(4) else None # Optional Thickness # Cleaning to handle annoying cases when it comes out like this: # 'A Another dwelling above\n1st Extension' if roof_type.startswith("A Another dwelling above"): roof_type = "A Another dwelling above" # Store results for this building part roof_data.append( { "Building Part": part_name, "Roof Type": roof_type, "Roof Insulation": roof_insulation, "Roof Insulation Thickness": roof_insulation_thickness, } ) return roof_data @staticmethod def extract_wall_details(text): """ Extracts wall type, insulation, dry-lining, and thickness for each building part, including any alternative wall details within the 7.0 Walls section of the summary PDF text. """ # Define data structure to hold all building part wall entries wall_data = [] # Locate the entire 7.0 Walls section wall_section = re.search(r"7\.0 Walls:\n(.*?)\n8\.0 Roofs:", text, re.DOTALL).group(1) # Define pattern to match each building part's wall entry within the section building_part_pattern = re.compile( r"(Main Property|1st Extension|2nd Extension|[\w\s]+)\n" # Matches each building part label r"Type\s+(.*?)\n" # Matches main wall Type r"Insulation\s+(.*?)\n" # Matches main wall Insulation r"(Dry-lining\s+(.*?)\n)?" 
# Optional main wall Dry-lining r"Wall Thickness Unknown\s+(.*?)\n" # Matches main wall Thickness Unknown r"Wall Thickness \[mm\]\s+(\d+)", # Matches main wall Thickness re.DOTALL ) # Define pattern to capture alternative wall details, if present alternative_wall_pattern = re.compile( r"Alternative Wall Area.*?\n" # Matches start of alternative wall section r"Alternative Type\s+(.*?)\n" # Matches alternative wall Type r"Alternative Insulation\s+(.*?)\n" # Matches alternative wall Insulation r"(Alternative Dry-lining\s+(.*?)\n)?" # Optional Alternative Dry-lining r"Alternative Wall Thickness Unknown\s+(.*?)\n" # Matches alternative wall Thickness Unknown r"Alternative Wall Thickness\s+(\d+)", # Matches alternative wall Thickness re.DOTALL ) # Find all building part entries within the 7.0 Walls section for match in building_part_pattern.finditer(wall_section): wall_label = match.group(1).strip() main_wall_type = match.group(2).strip() main_wall_insulation = match.group(3).strip() main_wall_dry_lining = match.group(5).strip() if match.group(5) else "N/A" main_wall_thickness_unknown = match.group(6).strip() main_wall_thickness = int(match.group(7)) # Initialize dictionary for this wall entry wall_entry = { "Building Part": wall_label, "Wall Type": main_wall_type, "Wall Insulation": main_wall_insulation, "Wall Dry-lining": main_wall_dry_lining, "Wall Thickness Unknown": main_wall_thickness_unknown, "Wall Thickness (mm)": main_wall_thickness, "Alternative Wall Type": None, "Alternative Wall Insulation": None, "Alternative Wall Dry-lining": "N/A", "Alternative Wall Thickness Unknown": None, "Alternative Wall Thickness (mm)": None, } # Check if there's an alternative wall section following this wall entry alt_match = alternative_wall_pattern.search(wall_section, match.end()) if alt_match: wall_entry["Alternative Wall Type"] = alt_match.group(1).strip() wall_entry["Alternative Wall Insulation"] = alt_match.group(2).strip() wall_entry["Alternative Wall Dry-lining"] = 
alt_match.group(4).strip() if alt_match.group(4) else "N/A" wall_entry["Alternative Wall Thickness Unknown"] = alt_match.group(5).strip() wall_entry["Alternative Wall Thickness (mm)"] = int(alt_match.group(6)) # Append each building part as a dictionary in the wall_data list wall_data.append(wall_entry) return wall_data @staticmethod def extract_conservatory(text): """ Extracts conservatory data from the provided text. The section is located between "5.0 Conservatory" and "7.0 Walls". Args: text (str): The full text of the Summary Report PDF. Returns: dict: A dictionary with conservatory details: - "Conservatory Present" - "Conservatory Separated" - "Conservatory Floor Area" - "Conservatory Double Glazed" - "Conservatory Glazed Perimeter" - "Heated Conservatory Height" """ # Extract the section between "5.0 Conservatory" and "7.0 Walls" conservatory_match = re.search(r"5\.0 Conservatory:(.*?)7\.0 Walls:", text, re.DOTALL) if not conservatory_match: logger.error("Failed to extract conservatory data.") raise ValueError("Could not extract conservatory data.") conservatory_text = conservatory_match.group(1) # Check if conservatory is present present_match = re.search(r"Is there a conservatory\?\s*(Yes|No)", conservatory_text, re.IGNORECASE) if not present_match or present_match.group(1).strip().lower() == "no": return { "Conservatory Present": "No", "Conservatory Separated": "", "Conservatory Floor Area": 0, "Conservatory Double Glazed": "", "Conservatory Glazed Perimeter": 0, "Heated Conservatory Height": "", } # If we get here, raise a temporary exception since we've not seen a case of this, so should make sure # this is correct separated_match = re.search(r"Is it thermally separated\?\s*(Yes|No)", conservatory_text, re.IGNORECASE) floor_area_match = re.search(r"Floor Area \[m2\]\s*([\d.]+)", conservatory_text, re.IGNORECASE) double_glazed_match = re.search(r"Double Glazed\s*(Yes|No)", conservatory_text, re.IGNORECASE) glazed_perimeter_match = re.search(r"Glazed 
def extract(self):
    """
    Extracts the survey data held in the PDF at ``self.file_path``.

    The text of every page is concatenated and then searched with regular expressions.
    The returned dictionary contains:
        - Assessor name and assessment (inspection) date
        - Address (combined, plus each individual component)
        - Current SAP rating, number of storeys and fuel bill
        - Door counts and light fitting counts
        - Windows, heating, building part, roof, wall and conservatory details,
          as produced by the other ``extract_*`` methods of this class
        - Water heating code

    :return: Dictionary of the extracted values
    :raises ValueError: If any mandatory field cannot be found in the text
    """

    def find_required(pattern, error_message, flags=0):
        # Every mandatory field fails the same way, so callers only ever see ValueError
        # (never an AttributeError from calling .group() on a failed search).
        found = re.search(pattern, text, flags)
        if not found:
            raise ValueError(error_message)
        return found

    def find_optional(pattern):
        # Address components are allowed to be absent; a miss yields an empty string.
        found = re.search(pattern, text)
        return found.group(1).strip() if found else ""

    with open(self.file_path, "rb") as file:
        reader = PyPDF2.PdfReader(file)
        # `or ""` keeps the join safe should a page yield no text at all.
        text = "".join(page.extract_text() or "" for page in reader.pages)

    data = {}

    # Assessor details
    name_match = find_required(
        r"Name:\s*([A-Za-z\s]+)\s*Title:\s*([A-Za-z\.]+)",
        "Couldn't extract surveyor name",
    )
    # The name is captured before the title; it is stored as "<title> <name>".
    data["Assessor Name"] = name_match.group(2).strip() + " " + name_match.group(1).strip()

    date_match = find_required(r"Inspection Date:\s*(.*?)\n", "Could not extract inspection date")
    data["Assessment Date"] = date_match.group(1).strip()

    # Address and postcode - each field is delimited by the label of the field that follows it
    postcode = find_optional(r"Postcode:\s*(.*?)\nRegion:")
    region = find_optional(r"Region:\s*(.*?)\nHouse Name:")
    house_name = find_optional(r"House Name:\s*(.*?)\nHouse No:")
    house_no = find_optional(r"House No:\s*(.*?)\nStreet:")
    street = find_optional(r"Street:\s*(.*?)\nLocality:")
    locality = find_optional(r"Locality:\s*(.*?)\nTown:")
    town = find_optional(r"Town:\s*(.*?)\nCounty:")
    county = find_optional(r"County:\s*(.*?)\nProperty Tenure:")

    # Join non-empty parts with a comma
    address_parts = [house_no, house_name, street, locality, town, county, region, postcode]
    data["Address"] = ", ".join(part for part in address_parts if part)
    data["Postcode"] = postcode
    data["Region"] = region
    data["House Name"] = house_name
    data["House No"] = house_no
    data["Street"] = street
    data["Locality"] = locality
    data["Town"] = town
    data["County"] = county

    # Current SAP rating: the match is a band letter, a space, then the score; only the score is kept
    sap_match = find_required(r"Current SAP rating:\s*([A-Z] \d+)", "Could not extract SAP rating")
    data["Current SAP Rating"] = sap_match.group(1).split(" ")[1]

    # We don't have primary energy in the summary report
    data['Primary Energy Use Intensity (kWh/m2/yr)'] = None

    # Number of storeys
    storeys_match = find_required(r"Number of Storeys:\s*(\d+)", "Could not extract number of storeys")
    data["Number of Storeys"] = int(storeys_match.group(1))

    # Fuel bill (kept as a "£"-prefixed string)
    fuel_bill_match = find_required(r"Fuel Bill:\s*£(\d+)", "Could not extract fuel bill")
    data["Fuel Bill"] = f"£{fuel_bill_match.group(1)}"

    # Doors
    total_doors_match = find_required(
        r"Total Number of Doors\s*(\d+)", "Could not extract total number of doors"
    )
    data["Total Number of Doors"] = int(total_doors_match.group(1))

    insulated_doors_match = find_required(
        r"Number of Insulated Doors\s*(\d+)", "Could not extract number of insulated doors"
    )
    data["Number of Insulated Doors"] = int(insulated_doors_match.group(1))

    # Lighting
    light_fittings_match = find_required(
        r"Total number of light fittings\s*(\d+)",
        "Could not extract total number of light fittings",
    )
    data["Number of Light Fittings"] = int(light_fittings_match.group(1))

    # Dots are escaped so that only the literal "L.E.L." label is matched
    lel_fittings_match = find_required(
        r"Total number of L\.E\.L\. fittings\s*(\d+)",
        "Could not extract total number of L.E.L. fittings",
    )
    data["Number of LEL Fittings"] = int(lel_fittings_match.group(1))
    data["Number of fittings needing LEL"] = data["Number of Light Fittings"] - data["Number of LEL Fittings"]

    # Windows: everything between the "Windows" and "Draught Proofing" headings (may span lines)
    windows_section = find_required(
        r"Windows\s*(.*?)\s*Draught Proofing", "Failed to extract window data.", re.DOTALL
    )
    data["Windows"] = self.extract_window_age_description(windows_section.group(1))

    # Sections parsed by the dedicated helper methods
    data["Primary Heating"] = self.extract_primary_heating(text)
    data["Secondary Heating"] = self.extract_secondary_heating_details(text)
    data["Building Parts"] = self.extract_building_parts(text)
    data["Roof Details"] = self.extract_roof_details(text)
    data["Wall Details"] = self.extract_wall_details(text)
    data["Conservatory"] = self.extract_conservatory(text)

    water_heating_code_match = find_required(
        r"Water Heating Code\s*(.*?)\n", "Failed to extract water heating code."
    )
    data["Water Heating Code"] = water_heating_code_match.group(1).strip()

    return data
class PulseAirPermeabilityExtractor:
    """
    Extractor for Pulse Air Permeability Test Reports.

    The report text is obtained by running OCR over the PDF pages, after which the
    results table is parsed out of that text.
    """

    def __init__(self, file_path):
        self.file_path = file_path

    @staticmethod
    def extract_table(text):
        """
        Parses the results table out of the report text.

        :param text: String text of the report
        :return: List of dictionaries, one per metric, holding the metric name and its two column values
            (as strings)
        :raises ValueError: If any of the expected metrics cannot be found
        """
        # (metric name, regex capturing that metric's two column values)
        metric_patterns = (
            ("Air Leakage Rate", r"Air Leakage Rate\s*([\d,@.]+)\s*m/h\s*([\d,@.]+)\s*m3/h"),
            ("Air Permeability", r"Air Permeability\s*([\d,@.]+)\s*=.*?\s*([\d,@.]+)\s*m\?/m\?h"),
            ("Air Changes per Hour", r"Air Changes per Hour\s*([\d,@.]+)\s*([\d,@.]+)"),
            ("Equivalent Leakage Area", r"Equivalent Leakage Area\s*([\d,@.]+)\s*([\d,@.]+)"),
            ("Calculation Uncertainty", r"Calculation Uncertainty\s*([\d,@.]+)\s*([\d,@.]+)"),
        )

        def tidy(raw_value):
            # Each '@' in a captured value becomes '0' and commas are removed
            return raw_value.replace("@", "0").replace(",", "")

        rows = []
        for metric, pattern in metric_patterns:
            found = re.search(pattern, text)
            if found is None:
                raise ValueError(f"Could not extract metric: {metric}")

            measured, extrapolated = found.group(1, 2)
            rows.append(
                {
                    "Metric": metric,
                    "Measured @ 4PA": tidy(measured),
                    "Extrapolated @ 50PA": tidy(extrapolated),
                }
            )

        return rows

    def extract(self):
        """
        Converts the PDF pages to images, OCRs them and parses the results table.

        :return: Dictionary with a single "Results Table" entry holding the parsed table
        """
        logger.info("Extracting data from pdf image - this may take a while...")
        page_images = convert_from_path(self.file_path, dpi=300)

        # OCR every page and stitch the text together in page order
        ocr_text = "".join(image_to_string(image) for image in page_images)

        return {"Results Table": self.extract_table(ocr_text)}
class ElmhurstProjectHandoverExtractor:
    """
    A utility class for extracting specific data from the Elmhurst Project Handover document.
    """

    def __init__(self, file_path):
        self.file_path = file_path

    def extract(self):
        """
        Reads the text of every page of the PDF and extracts the handover fields.

        :return: Dictionary with the retrofit coordinator name and ID, designer name and installer name
            (strings), plus "Measures Fitted" as a list of strings (one per bullet point)
        :raises ValueError: If any of the expected fields cannot be matched
        """
        with open(self.file_path, "rb") as file:
            reader = PyPDF2.PdfReader(file)
            # `or ""` keeps the join safe should a page yield no text at all.
            text = "".join(page.extract_text() or "" for page in reader.pages)

        # Regex patterns
        patterns = {
            "Retrofit Coordinator Name": r"Retrofit Coordinator Name:\s*(.+)",
            "Retrofit Coordinator ID": r"Retrofit Coordinator ID:\s*(\d+)",
            "Measures Fitted": r"Measure\(s\) Fitted:\s*([\s\S]*?)\nRetrofit Assessor Name:",
            "Designer Name": r"Designer Name\(s\):\s*(.+)",
            "Installer Name": r"Installer Name\(s\):\s*(.+)",
        }

        data = {}
        for key, pattern in patterns.items():
            match = re.search(pattern, text)
            if not match:
                raise ValueError(f"Could not match {key}")

            if key == "Measures Fitted":
                # Multiline field: each measure is a line introduced by a bullet character
                measures = re.findall(r"[\u2022\u00b7\u25cf\uf0b7]\s*(.+)", match.group(1))
                data[key] = [measure.strip() for measure in measures]
            else:
                data[key] = match.group(1).strip()

        return data


class CoreLogicPasAssessmentReportExtractor:
    """
    A utility class for extracting specific data from CoreLogic PAS Assessment Reports.
    """

    def __init__(self, file_path):
        self.file_path = file_path

    def extract(self):
        """
        Scans every table on every page for a row mentioning "Number of bedrooms".

        :return: Dictionary containing "Number of bedrooms" (int) when a matching row with a numeric
            cell is found, otherwise an empty dictionary
        """
        data = {}

        with pdfplumber.open(self.file_path) as pdf:
            for page in pdf.pages:
                for table in page.extract_tables() or []:
                    for row in table:
                        # Only rows that mention the label are of interest
                        if not any("Number of bedrooms" in str(cell) for cell in row):
                            continue

                        for cell in row:
                            # isdecimal() (rather than isdigit()) accepts exactly what int() can parse,
                            # so characters such as superscript digits cannot cause a ValueError.
                            if cell and cell.strip().isdecimal():
                                data["Number of bedrooms"] = int(cell.strip())
                                # First numeric cell in the row wins; scanning of later rows continues,
                                # so a later matching row overwrites this value.
                                break

        return data