diff --git a/etl/lodgement/app.py b/etl/lodgement/app.py index 4ff8bdf1..3688ca19 100644 --- a/etl/lodgement/app.py +++ b/etl/lodgement/app.py @@ -1,6 +1,10 @@ import os + +import pandas as pd + import utils.file_data_extraction as file_extraction_tools from utils.fullSapParser import FullSapParser +from utils.OsmosisCondtionReportParser import OsmosisConditionReportParser output_template = { "Property Address": None, @@ -9,6 +13,7 @@ output_template = { "City/County": None, "District/Town": None, "Funding Stream": None, + # "Risk Path": None, "Local Authority": None, "Trustmark Lodgement ID": None, "Certificate Number": None, @@ -18,11 +23,12 @@ output_template = { "Doors UMR": None, "Measure Lodgement Date": None, "Full Lodgement Date": None, - "Name": None, - "Phone": None, - "Email": None, - "Secondary Contact Name": None, - "Secondary Contact Phone": None, + "Owner - Name": None, + "Owner - Phone": None, + "Owner - Email": None, + "Tenant - Name": None, + "Tenant - Phone": None, + "R. Assessor - Name": None, "Trustmark Licence Number": None, "Retrofit Assessment Date": None, "Company Name": None, @@ -30,7 +36,7 @@ output_template = { "Property Type": None, "Property Detachment": None, "No. of Bedrooms": None, - "Property Age": None, + "Property age": None, "SAP Rating Pre (from IMA)": None, "Pre Heat Transfer": None, "Pre Total Floor Area": None, @@ -44,22 +50,6 @@ output_template = { "Number of Eligible Measures Installed": None, "Total Cost of Works": None, "Annual Fuel Saving (MTP)": None, - "Work Type ID": None, - "Measure Category": None, - "Installer": None, - "Operative Name": None, - "Operative Certif. 
Reference": None, - "Manufacturer": None, - "Model": None, - "Financial Protection Body (IBG)": None, - "Policy Start Date": None, - "IBG Policy Reference": None, - "Warranty Duration": None, - "Total Invoiced (Including VAT)": None, - "Installation Date": None, - "Handover Date": None, - "Percentage": None, - "Reference Number": None, } @@ -100,14 +90,19 @@ def handler(): extractors = { "elmhurst epr": file_extraction_tools.ElmhurstEprExtractor, - "elmhurst summary report": None, - "osmosis condition report": None, + "elmhurst summary report": file_extraction_tools.ElmhurstSummaryReportExtractor, + "osmosis condition report": OsmosisConditionReportParser, "elmhurst evidence report": None, "full sap xml": FullSapParser, } + extracted = [] for property_folder in folders: + coordinator_folder = os.path.join(source_data_path, property_folder, "2. RA Coordinator Info") + # Check if this folder exists + if not os.path.exists(coordinator_folder): + coordinator_folder = os.path.join(source_data_path, property_folder, "1. RA Coordinator Info") # Get the contents of the folder coordinator_folder_contents = [ @@ -123,10 +118,10 @@ def handler(): if report_type is None: raise ValueError(f"Unknown report type for {filename}") - file_extractor = extractors.get(report_type) + file_extractor = extractors[report_type] if file_extractor is None: continue - + extracted_contents[report_type] = file_extractor(filepath).extract() if file_extraction_tools.is_xml(filepath): @@ -141,24 +136,27 @@ def handler(): output_row_data = output_template.copy() - # dict_keys([, , , 'City/County', 'District/Town', - # 'Local Authority', - # 'Trustmark Lodgement ID', - # 'Certificate Number', 'EWI UMR', 'Loft UMR', 'Windows UMR', - # 'Doors UMR', 'Measure Lodgement Date', 'Full Lodgement Date', - # 'Name', 'Phone', 'Email', (owner) - # 'Secondary Contact - # Name', 'Secondary Contact Phone', 'Trustmark Licence Number', 'Retrofit Assessment Date', 'Company Name', - # 'Retrofit Designer Name', , 'No. 
of Bedrooms', - # , - # 'Pre Air Tightness', 'SAP Rating Post (from EPC)', 'Post Heat Transfer', 'Post Total Floor Area', - # 'Post Heat Demand', 'Post Air Tightness', 'Number of Eligible Measures Installed', 'Total Cost of Works', - # 'Annual Fuel Saving (MTP)', 'Work Type ID', 'Measure Category', 'Installer', 'Operative Name', 'Operative - # Certif. Reference', 'Manufacturer', 'Model', 'Financial Protection Body (IBG)', 'Policy Start Date', - # 'IBG Policy Reference', 'Warranty Duration', 'Total Invoiced (Including VAT)', 'Installation Date', - # 'Handover Date', 'Percentage', 'Reference Number']) + # dict_keys([ 'City/County', 'District/Town', + # 'Local Authority', 'Trustmark Lodgement ID', 'Certificate Number', 'EWI UMR', 'Loft UMR', 'Windows UMR', + # 'Doors UMR', 'Measure Lodgement Date', 'Full Lodgement Date', 'Owner - Name', 'Owner - Phone', + # 'Owner - Email', 'Tenant - Name', 'Tenant - Phone', + # 'Trustmark Licence Number', + # 'Company Name', 'Retrofit Designer Name', + # Pre Air Tightness', 'SAP Rating Post (from EPC)', 'Post Heat + # Transfer', 'Post Total Floor Area', 'Post Heat Demand', 'Post Air Tightness', 'Number of Eligible Measures + # Installed', 'Total Cost of Works', 'Annual Fuel Saving (MTP)']) # Populate the output row data - if extracted_contents["elmhurst epr"]: + + update_dictionary_with_check( + output_row_data, + { + "Funding Stream": funding_stream, + "Property Address": property_folder.split(")")[1].strip(), + "Osm. ID": property_folder.split(")")[0].strip().lstrip("(").strip(), + } + ) + + if extracted_contents.get("elmhurst epr"): total_floor_area = sum( [x["Floor Area (m2)"] for x in extracted_contents["elmhurst epr"]["Building Parts"]] + # Get the conservatory floor area @@ -170,33 +168,45 @@ def handler(): extracted_contents["elmhurst epr"]["Primary Energy Use Intensity (kWh/m2/yr)"] * total_floor_area ) - to_insert = { - "Property Address": property_folder.split(")")[1].strip(), - "Osm. 
ID": property_folder.split(")")[0].strip().lstrip("(").strip(), + epr_to_insert = { "Postcode": extracted_contents["elmhurst epr"]["Postcode"], "City/County": None, "District/Town": None, - "Funding Stream": funding_stream, "Local Authority": None, - 'Property Age': extracted_contents["elmhurst epr"]["Property Age"], 'SAP Rating Pre (from IMA)': extracted_contents["elmhurst epr"]["Current SAP Rating"], 'Pre Heat Transfer': pre_heat_transfer, 'Pre Total Floor Area': total_floor_area, 'Pre Heat Demand': pre_heat_demand, + "R. Assessor - Name": extracted_contents["elmhurst epr"]["Assessor Name"], + "Retrofit Assessment Date": extracted_contents["elmhurst epr"]["Assessment Date"], } + update_dictionary_with_check( + output_row_data, + epr_to_insert + ) - output_row_data["Property Address"] = property_folder.split(")")[1].strip() - output_row_data["Osm. ID"] = property_folder.split(")")[0].strip().lstrip("(").strip() - output_row_data["Postcode"] = extracted_contents["elmhurst epr"]["Postcode"] - output_row_data["City/County"] = () - output_row_data["Batch"] = () - output_row_data["Funding Stream"] = funding_stream - output_row_data["Risk Path"] = () - - if extracted_contents["full sap xml"]: - to_insert = { + if extracted_contents.get("full sap xml"): + xml_to_insert = { "Property Type": extracted_contents["full sap xml"]["Property Type"], "Property Detachment": extracted_contents["full sap xml"]["Built Form"], - "Property Age": extracted_contents["full sap xml"]["Age Band"], + "Property age": extracted_contents["full sap xml"]["Age Band"], } + update_dictionary_with_check( + output_row_data, + xml_to_insert + ) + + if extracted_contents.get("osmosis condition report"): + cr_to_insert = { + "No. of Bedrooms": extracted_contents["osmosis condition report"]["No. 
import os
import re


class OsmosisConditionReportParser:
    """
    Parser for Osmosis condition report PDFs.

    The text of the PDF is read once on construction and stored in
    ``pdf_text``; ``extract`` then pulls individual fields out of that text
    with regular expressions.
    """

    def __init__(self, filekey, bucket_name=None):
        """
        Store the file location and read the PDF text.

        Args:
            filekey: Path of the PDF on the local filesystem (or the S3 key
                when ``bucket_name`` is given).
            bucket_name: Optional S3 bucket name. Reading from S3 is not
                implemented yet.

        Raises:
            NotImplementedError: If ``bucket_name`` is provided.
            ValueError: If the local file is missing or cannot be read.
        """
        self.bucket_name = bucket_name
        self.filekey = filekey
        self.pdf_text = None

        # Only build an S3 client when a bucket is actually requested, so that
        # parsing a local file needs neither boto3 nor AWS configuration.
        self.s3_client = None
        if bucket_name:
            import boto3

            self.s3_client = boto3.client("s3")

        self._read_file()

    def _read_file(self):
        """
        Read the PDF and store the concatenated text of all pages in ``pdf_text``.

        Raises:
            NotImplementedError: If a bucket name was given (S3 is unsupported).
            ValueError: If the file cannot be found, opened, or read.
        """
        if self.bucket_name:
            # Raised outside the try block so it is not rewrapped as a
            # misleading "parsing" ValueError.
            raise NotImplementedError("Reading condition reports from S3 is not implemented")

        # Checked up front so the "not found" message is reliable.
        # NOTE(review): PyMuPDF appears to report a missing path with its own
        # exception type rather than the builtin FileNotFoundError - confirm.
        if not os.path.exists(self.filekey):
            raise ValueError(f"Local file not found: {self.filekey}")

        # Imported here so the class can be imported (and ``extract`` used on
        # already-loaded text) without PyMuPDF installed.
        import fitz

        try:
            with fitz.open(self.filekey) as pdf:
                text = "".join(page.get_text() for page in pdf)
        except FileNotFoundError as exc:
            raise ValueError(f"Local file not found: {self.filekey}") from exc
        except Exception as exc:
            raise ValueError(f"An error occurred while reading or parsing the PDF: {exc}") from exc

        self.pdf_text = text

    def _search(self, pattern, field_name):
        """Return group 1 of ``pattern`` in the PDF text, or raise ValueError naming the field."""
        match = re.search(pattern, self.pdf_text)
        if match is None:
            raise ValueError(f"Could not extract {field_name} from condition report: {self.filekey}")
        return match.group(1)

    def extract(self):
        """
        Extract the fields of interest from the condition report text.

        Returns:
            dict: ``"No. of Bedrooms"`` (int) and ``"Risk Assessment Pathway"``
            (a single upper-case letter).

        Raises:
            ValueError: If no text has been loaded or a field cannot be found.
        """
        if self.pdf_text is None:
            raise ValueError("No PDF text has been loaded")

        bedrooms = self._search(r"No\. of Bedrooms \(Total\)\s*(\d+)", "No. of Bedrooms")
        risk_pathway = self._search(r"Risk\s*Assessment\s*Pathway\s*([A-Z])", "Risk Assessment Pathway")

        return {
            "No. of Bedrooms": int(bedrooms),
            "Risk Assessment Pathway": risk_pathway,
        }
class ElmhurstSummaryReportExtractor:
    """
    A utility class for extracting specific data from Elmhurst Summary Report PDFs.

    The PDF text is read with PyPDF2 and individual fields are located with
    regular expressions. Required fields that cannot be found raise
    ``ValueError`` with a message naming the missing field.
    """

    def __init__(self, file_path):
        self.file_path = file_path

    @staticmethod
    def _require(pattern, text, description, flags=0):
        """Return the match of ``pattern`` in ``text`` or raise ValueError naming ``description``."""
        match = re.search(pattern, text, flags)
        if match is None:
            raise ValueError(f"Could not extract {description}")
        return match

    @staticmethod
    def _optional(pattern, text):
        """Return the stripped first group of ``pattern`` in ``text``, or "" when absent."""
        match = re.search(pattern, text)
        return match.group(1).strip() if match else ""

    @staticmethod
    def _extract_address(text):
        """Return ``(address, postcode)``; the address joins the non-empty address fields with commas."""
        optional = ElmhurstSummaryReportExtractor._optional

        postcode = optional(r"Postcode:\s*(.*?)\nRegion:", text)
        region = optional(r"Region:\s*(.*?)\nHouse Name:", text)
        house_name = optional(r"House Name:\s*(.*?)\nHouse No:", text)
        house_no = optional(r"House No:\s*(.*?)\nStreet:", text)
        street = optional(r"Street:\s*(.*?)\nLocality:", text)
        locality = optional(r"Locality:\s*(.*?)\nTown:", text)
        town = optional(r"Town:\s*(.*?)\nCounty:", text)
        county = optional(r"County:\s*(.*?)\nProperty Tenure:", text)

        # The postcode is stored as its own field, so the label must be present.
        if re.search(r"Postcode:\s*(.*?)\nRegion:", text) is None:
            raise ValueError("Could not extract postcode")

        address_parts = [house_no, house_name, street, locality, town, county, region, postcode]
        return ", ".join(part for part in address_parts if part), postcode

    @staticmethod
    def _main_part(parts, description):
        """Return the first entry whose "Building Part" contains "Main", or raise ValueError."""
        for part in parts:
            if "Main" in part["Building Part"]:
                return part
        raise ValueError(f"Could not extract main building {description} details")

    def _read_text(self):
        """Return the concatenated text of every page of the PDF at ``file_path``."""
        with open(self.file_path, "rb") as file:
            reader = PyPDF2.PdfReader(file)
            return "".join(page.extract_text() for page in reader.pages)

    def _extract_primary_heating(self, text):
        """Return the "Existing Primary Heating ..." fields from the Main Heating1 section."""
        # The section ends at "Main Heating2" when a second main system is
        # listed, otherwise at "Water Heating".
        section = (
            re.search(r"Main\s*Heating1\s*(.*?)\s*Main\s*Heating2", text, re.DOTALL)
            or re.search(r"Main\s*Heating1\s*(.*?)\s*Water\s*Heating", text, re.DOTALL)
        )
        if section is None:
            raise ValueError("Could not extract primary heating section")
        primary_text = section.group(1)

        return {
            "Existing Primary Heating System": self._require(
                r"Main Heating Code\s*(.*?)\n", primary_text, "primary heating code"
            ).group(1).strip(),
            "Existing Primary Heating PCDF Reference": self._require(
                r"PCDF boiler Reference\s*(\d+)", primary_text, "primary heating PCDF reference"
            ).group(1),
            "Existing Primary Heating Controls": self._require(
                r"Main Heating Controls\s*(.*?)\n", primary_text, "primary heating controls"
            ).group(1).strip(),
            "Existing Primary Heating % of Heat": int(
                self._require(
                    r"Percentage of Heat\s*(\d+)\s*%", primary_text, "primary heating percentage of heat"
                ).group(1)
            ),
        }

    def _extract_secondary_heating(self, text):
        """Return the "Existing Secondary Heating ..." fields; blanks/0 when there is no Main Heating2 section."""
        section = re.search(r"Main\s*Heating2\s*(.*?)\s*Water\s*Heating", text, re.DOTALL)
        if section is None:
            return {
                "Existing Secondary Heating System": "",
                "Existing Secondary Heating PCDF Reference": "",
                "Existing Secondary Heating Controls": "",
                "Existing Secondary Heating % of Heat": 0,
            }
        secondary_text = section.group(1)

        return {
            "Existing Secondary Heating System": self._require(
                r"Main Heating Code\s*(.*?)(?=\n|Percentage of Heat)", secondary_text, "secondary heating code"
            ).group(1).strip(),
            "Existing Secondary Heating PCDF Reference": self._require(
                r"PCDF boiler Reference\s*(\d+)", secondary_text, "secondary heating PCDF reference"
            ).group(1),
            # Controls are optional for the second system.
            "Existing Secondary Heating Controls": self._optional(
                r"Main Heating Controls\s*(.*?)\n", secondary_text
            ),
            "Existing Secondary Heating % of Heat": int(
                self._require(
                    r"Percentage of Heat\s*(\d+)\s*%", secondary_text, "secondary heating percentage of heat"
                ).group(1)
            ),
        }

    def extract(self):
        """
        Extracts specific data from the provided PDF file.

        Data includes the assessor name and inspection date, address and
        postcode, current SAP rating, number of storeys, fuel bill, door
        counts, window data, primary/secondary/water heating details,
        building part dimensions, light fitting counts, and the main
        building's roof and wall details.

        Returns:
            dict: The extracted values keyed by field name.

        Raises:
            ValueError: If a required field cannot be found in the PDF text.
        """
        text = self._read_text()
        data = {}

        # Assessor name and inspection date
        name_match = re.search(r"Name:\s*([A-Za-z\s]+)\s*Title:\s*([A-Za-z\.]+)", text)
        if not name_match:
            raise ValueError("Couldn't extract surveyor name")
        # The report lists the name before the title; output is "<title> <name>".
        data["Assessor Name"] = name_match.group(2).strip() + " " + name_match.group(1).strip()
        data["Assessment Date"] = self._require(
            r"Inspection Date:\s*(.*?)\n", text, "inspection date"
        ).group(1).strip()

        # Address and postcode
        data["Address"], data["Postcode"] = self._extract_address(text)

        # Current SAP rating is reported as "<band letter> <score>"; keep the score.
        sap_match = self._require(r"Current SAP rating:\s*([A-Z] \d+)", text, "SAP rating")
        data["Current SAP Rating"] = sap_match.group(1).split(" ")[1]

        # We don't have primary energy in the summary report
        data['Primary Energy Use Intensity (kWh/m2/yr)'] = None

        data["Number of Storeys"] = int(
            self._require(r"Number of Storeys:\s*(\d+)", text, "number of storeys").group(1)
        )

        fuel_bill_match = self._require(r"Fuel Bill:\s*£(\d+)", text, "fuel bill")
        data["Fuel Bill"] = f"£{fuel_bill_match.group(1)}"

        data["Total Number of Doors"] = int(
            self._require(r"Total Number of Doors\s*(\d+)", text, "total number of doors").group(1)
        )
        data["Number of Insulated Doors"] = int(
            self._require(r"Number of Insulated Doors\s*(\d+)", text, "number of insulated doors").group(1)
        )

        # Windows: everything between the "Windows" and "Draught Proofing" headings
        windows_section = self._require(
            r"Windows\s*(.*?)\s*Draught Proofing", text, "windows section", re.DOTALL
        )
        data.update(extract_window_age_description(windows_section.group(1)))

        # Heating systems
        data.update(self._extract_primary_heating(text))
        data.update(self._extract_secondary_heating(text))

        # The secondary heating code is only reported when a second main system exists.
        if data["Existing Secondary Heating System"] == "":
            data["Secondary Heating Code"] = ""
        else:
            data["Secondary Heating Code"] = self._optional(r"Secondary Heating Code\s*(.*?)\n", text)

        data["Water Heating Code"] = self._require(
            r"Water Heating Code\s*(.*?)\n", text, "water heating code"
        ).group(1).strip()

        data.update(extract_building_parts_summary(text))

        # Lighting
        data["Number of Light Fittings"] = int(
            self._require(r"Total number of light fittings\s*(\d+)", text, "number of light fittings").group(1)
        )
        data["Number of LEL Fittings"] = int(
            self._require(r"Total number of L.E.L. fittings\s*(\d+)", text, "number of LEL fittings").group(1)
        )
        data["Number of fittings needing LEL"] = data["Number of Light Fittings"] - data["Number of LEL Fittings"]

        # Roof details of the main building
        main_roof_data = self._main_part(extract_roof_details_summary(text), "roof")
        data["Main Roof Type"] = main_roof_data["Roof Type"]
        data["Main Roof Insulation"] = main_roof_data["Roof Insulation"]
        data["Main Roof Insulation Thickness"] = main_roof_data["Roof Insulation Thickness"]

        # Wall details of the main building
        main_building_walls = self._main_part(extract_wall_details_summary(text), "wall")
        data["Main Wall Type"] = main_building_walls["Wall Type"]
        data["Main Wall Insulation"] = main_building_walls["Wall Insulation"]
        data["Main Wall Dry-lining"] = main_building_walls["Wall Dry-lining"]
        data["Main Wall Thickness"] = main_building_walls["Wall Thickness (mm)"]
        data["Main Building Alternative Wall Type"] = main_building_walls["Alternative Wall Type"]
        data["Main Building Alternative Wall Insulation"] = main_building_walls["Alternative Wall Insulation"]
        data["Main Building Alternative Wall Dry-lining"] = main_building_walls["Alternative Wall Dry-lining"]
        data["Main Building Alternative Wall Thickness"] = main_building_walls["Alternative Wall Thickness (mm)"]

        return data