adding the summary report extraction class

This commit is contained in:
Khalim Conn-Kowlessar 2024-11-28 08:38:38 +00:00
parent 63521dd1e3
commit bcbb43ed8f
4 changed files with 315 additions and 62 deletions

View file

@ -1,6 +1,10 @@
import os
import pandas as pd
import utils.file_data_extraction as file_extraction_tools
from utils.fullSapParser import FullSapParser
from utils.OsmosisCondtionReportParser import OsmosisConditionReportParser
output_template = {
"Property Address": None,
@ -9,6 +13,7 @@ output_template = {
"City/County": None,
"District/Town": None,
"Funding Stream": None,
# "Risk Path": None,
"Local Authority": None,
"Trustmark Lodgement ID": None,
"Certificate Number": None,
@ -18,11 +23,12 @@ output_template = {
"Doors UMR": None,
"Measure Lodgement Date": None,
"Full Lodgement Date": None,
"Name": None,
"Phone": None,
"Email": None,
"Secondary Contact Name": None,
"Secondary Contact Phone": None,
"Owner - Name": None,
"Owner - Phone": None,
"Owner - Email": None,
"Tenant - Name": None,
"Tenant - Phone": None,
"R. Assessor - Name": None,
"Trustmark Licence Number": None,
"Retrofit Assessment Date": None,
"Company Name": None,
@ -30,7 +36,7 @@ output_template = {
"Property Type": None,
"Property Detachment": None,
"No. of Bedrooms": None,
"Property Age": None,
"Property age": None,
"SAP Rating Pre (from IMA)": None,
"Pre Heat Transfer": None,
"Pre Total Floor Area": None,
@ -44,22 +50,6 @@ output_template = {
"Number of Eligible Measures Installed": None,
"Total Cost of Works": None,
"Annual Fuel Saving (MTP)": None,
"Work Type ID": None,
"Measure Category": None,
"Installer": None,
"Operative Name": None,
"Operative Certif. Reference": None,
"Manufacturer": None,
"Model": None,
"Financial Protection Body (IBG)": None,
"Policy Start Date": None,
"IBG Policy Reference": None,
"Warranty Duration": None,
"Total Invoiced (Including VAT)": None,
"Installation Date": None,
"Handover Date": None,
"Percentage": None,
"Reference Number": None,
}
@ -100,14 +90,19 @@ def handler():
extractors = {
"elmhurst epr": file_extraction_tools.ElmhurstEprExtractor,
"elmhurst summary report": None,
"osmosis condition report": None,
"elmhurst summary report": file_extraction_tools.ElmhurstSummaryReportExtractor,
"osmosis condition report": OsmosisConditionReportParser,
"elmhurst evidence report": None,
"full sap xml": FullSapParser,
}
extracted = []
for property_folder in folders:
coordinator_folder = os.path.join(source_data_path, property_folder, "2. RA Coordinator Info")
# Check if this folder exists
if not os.path.exists(coordinator_folder):
coordinator_folder = os.path.join(source_data_path, property_folder, "1. RA Coordinator Info")
# Get the contents of the folder
coordinator_folder_contents = [
@ -123,10 +118,10 @@ def handler():
if report_type is None:
raise ValueError(f"Unknown report type for {filename}")
file_extractor = extractors.get(report_type)
file_extractor = extractors[report_type]
if file_extractor is None:
continue
extracted_contents[report_type] = file_extractor(filepath).extract()
if file_extraction_tools.is_xml(filepath):
@ -141,24 +136,27 @@ def handler():
output_row_data = output_template.copy()
# dict_keys([, , , 'City/County', 'District/Town',
# 'Local Authority',
# 'Trustmark Lodgement ID',
# 'Certificate Number', 'EWI UMR', 'Loft UMR', 'Windows UMR',
# 'Doors UMR', 'Measure Lodgement Date', 'Full Lodgement Date',
# 'Name', 'Phone', 'Email', (owner)
# 'Secondary Contact
# Name', 'Secondary Contact Phone', 'Trustmark Licence Number', 'Retrofit Assessment Date', 'Company Name',
# 'Retrofit Designer Name', , 'No. of Bedrooms',
# ,
# 'Pre Air Tightness', 'SAP Rating Post (from EPC)', 'Post Heat Transfer', 'Post Total Floor Area',
# 'Post Heat Demand', 'Post Air Tightness', 'Number of Eligible Measures Installed', 'Total Cost of Works',
# 'Annual Fuel Saving (MTP)', 'Work Type ID', 'Measure Category', 'Installer', 'Operative Name', 'Operative
# Certif. Reference', 'Manufacturer', 'Model', 'Financial Protection Body (IBG)', 'Policy Start Date',
# 'IBG Policy Reference', 'Warranty Duration', 'Total Invoiced (Including VAT)', 'Installation Date',
# 'Handover Date', 'Percentage', 'Reference Number'])
# dict_keys([ 'City/County', 'District/Town',
# 'Local Authority', 'Trustmark Lodgement ID', 'Certificate Number', 'EWI UMR', 'Loft UMR', 'Windows UMR',
# 'Doors UMR', 'Measure Lodgement Date', 'Full Lodgement Date', 'Owner - Name', 'Owner - Phone',
# 'Owner - Email', 'Tenant - Name', 'Tenant - Phone',
# 'Trustmark Licence Number',
# 'Company Name', 'Retrofit Designer Name',
# Pre Air Tightness', 'SAP Rating Post (from EPC)', 'Post Heat
# Transfer', 'Post Total Floor Area', 'Post Heat Demand', 'Post Air Tightness', 'Number of Eligible Measures
# Installed', 'Total Cost of Works', 'Annual Fuel Saving (MTP)'])
# Populate the output row data
if extracted_contents["elmhurst epr"]:
update_dictionary_with_check(
output_row_data,
{
"Funding Stream": funding_stream,
"Property Address": property_folder.split(")")[1].strip(),
"Osm. ID": property_folder.split(")")[0].strip().lstrip("(").strip(),
}
)
if extracted_contents.get("elmhurst epr"):
total_floor_area = sum(
[x["Floor Area (m2)"] for x in extracted_contents["elmhurst epr"]["Building Parts"]] +
# Get the conservatory floor area
@ -170,33 +168,45 @@ def handler():
extracted_contents["elmhurst epr"]["Primary Energy Use Intensity (kWh/m2/yr)"] * total_floor_area
)
to_insert = {
"Property Address": property_folder.split(")")[1].strip(),
"Osm. ID": property_folder.split(")")[0].strip().lstrip("(").strip(),
epr_to_insert = {
"Postcode": extracted_contents["elmhurst epr"]["Postcode"],
"City/County": None,
"District/Town": None,
"Funding Stream": funding_stream,
"Local Authority": None,
'Property Age': extracted_contents["elmhurst epr"]["Property Age"],
'SAP Rating Pre (from IMA)': extracted_contents["elmhurst epr"]["Current SAP Rating"],
'Pre Heat Transfer': pre_heat_transfer,
'Pre Total Floor Area': total_floor_area,
'Pre Heat Demand': pre_heat_demand,
"R. Assessor - Name": extracted_contents["elmhurst epr"]["Assessor Name"],
"Retrofit Assessment Date": extracted_contents["elmhurst epr"]["Assessment Date"],
}
update_dictionary_with_check(
output_row_data,
epr_to_insert
)
output_row_data["Property Address"] = property_folder.split(")")[1].strip()
output_row_data["Osm. ID"] = property_folder.split(")")[0].strip().lstrip("(").strip()
output_row_data["Postcode"] = extracted_contents["elmhurst epr"]["Postcode"]
output_row_data["City/County"] = ()
output_row_data["Batch"] = ()
output_row_data["Funding Stream"] = funding_stream
output_row_data["Risk Path"] = ()
if extracted_contents["full sap xml"]:
to_insert = {
if extracted_contents.get("full sap xml"):
xml_to_insert = {
"Property Type": extracted_contents["full sap xml"]["Property Type"],
"Property Detachment": extracted_contents["full sap xml"]["Built Form"],
"Property Age": extracted_contents["full sap xml"]["Age Band"],
"Property age": extracted_contents["full sap xml"]["Age Band"],
}
update_dictionary_with_check(
output_row_data,
xml_to_insert
)
if extracted_contents.get("osmosis condition report"):
cr_to_insert = {
"No. of Bedrooms": extracted_contents["osmosis condition report"]["No. of Bedrooms"],
# "Risk Path": extracted_contents["osmosis condition report"]["Risk Assessment Pathway"],
}
update_dictionary_with_check(
output_row_data,
cr_to_insert
)
extracted.append(output_row_data)
extracted_df = pd.DataFrame(extracted)

View file

@ -5,4 +5,6 @@ openpyxl
boto3
usaddress==0.5.11
fuzzywuzzy==0.18.0
python-dotenv
python-dotenv
python-docx
pymupdf

View file

@ -0,0 +1,49 @@
import re
import boto3
import PyPDF2
import fitz
class OsmosisConditionReportParser:
    """
    Parser for Osmosis condition report PDFs.

    The PDF text is read once, at construction time, with PyMuPDF (``fitz``) and stored on
    ``pdf_text``. ``extract`` then pulls individual fields out of that text with regular
    expressions.
    """

    def __init__(self, filekey, bucket_name=None):
        """
        Read the report so that ``extract`` can be called afterwards.

        Args:
            filekey: Path of the PDF on the local filesystem (intended to double as the S3 object
                key once S3 reading is implemented).
            bucket_name: Optional S3 bucket name. Reading from S3 is not implemented yet, so
                passing a bucket currently results in a ``ValueError``.

        Raises:
            ValueError: If the file cannot be found or read.
        """
        self.bucket_name = bucket_name
        # Only build an S3 client when a bucket is actually requested; local parsing should not
        # depend on AWS configuration being available.
        self.s3_client = boto3.client("s3") if bucket_name else None
        self.filekey = filekey
        self.pdf_text = None
        self._read_file()

    def _read_file(self):
        """
        Read the PDF and store the concatenated text of all its pages on ``self.pdf_text``.

        Raises:
            ValueError: If the file cannot be found, read, or parsed (this includes the
                not-yet-implemented S3 path).
        """
        try:
            if self.bucket_name:
                # Read from S3
                raise NotImplementedError("Reading condition reports from S3 is not implemented yet")
            with fitz.open(self.filekey) as pdf:
                self.pdf_text = "".join(page.get_text() for page in pdf)
        except FileNotFoundError as e:
            raise ValueError(f"Local file not found: {self.filekey}") from e
        except Exception as e:
            raise ValueError(f"An error occurred while reading or parsing the PDF: {e}") from e

    def _search_required(self, pattern, description):
        """Return the first capture group of ``pattern`` in the PDF text, or raise ``ValueError``."""
        match = re.search(pattern, self.pdf_text)
        if match is None:
            raise ValueError(f"Could not extract {description} from {self.filekey}")
        return match.group(1)

    def extract(self):
        """
        Extract the fields of interest from the report text.

        Returns:
            dict: ``"No. of Bedrooms"`` (int) and ``"Risk Assessment Pathway"`` (a single
            upper-case letter).

        Raises:
            ValueError: If either field cannot be found in the report text.
        """
        return {
            "No. of Bedrooms": int(
                self._search_required(r"No\. of Bedrooms \(Total\)\s*(\d+)", "number of bedrooms")
            ),
            "Risk Assessment Pathway": self._search_required(
                r"Risk\s*Assessment\s*Pathway\s*([A-Z])", "risk assessment pathway"
            ),
        }

View file

@ -100,8 +100,8 @@ def is_xml(filename):
class ElmhurstEprExtractor:
"""
A utility class for extracting specific data from Elmhurst Energy Performance Reports (EPR).
"""
A utility class for extracting specific data from Elmhurst Energy Performance Reports (EPR).
"""
def __init__(self, file_path):
self.file_path = file_path
@ -388,6 +388,7 @@ class ElmhurstEprExtractor:
text = "".join(page.extract_text() for page in reader.pages)
data["Assessor Name"] = re.search(r"Created by:\s*(.*?)\n", text).group(1).strip()
data["Assessment Date"] = re.search(r"\nAssessment Date\s*(.*?)\n", text).group(1).strip()
# Extracting individual components
address_match = re.search(r"ENERGY REPORT\nDwelling Address\s*(.*?)\s*\nReference", text, re.DOTALL)
@ -467,3 +468,194 @@ class ElmhurstEprExtractor:
data["Water Heating Code"] = water_heating_code_match.group(1).strip()
return data
class ElmhurstSummaryReportExtractor:
    """
    A utility class for extracting specific data from Elmhurst Summary Report PDFs.

    Every field that is required for the output raises a ``ValueError`` with a descriptive
    message when it cannot be located in the report text, instead of failing with an opaque
    ``AttributeError``/``IndexError``.
    """

    def __init__(self, file_path):
        self.file_path = file_path

    @staticmethod
    def _require_match(pattern, text, error_message, flags=0):
        """Return the match of ``pattern`` in ``text`` or raise ``ValueError(error_message)``."""
        match = re.search(pattern, text, flags)
        if match is None:
            raise ValueError(error_message)
        return match

    @staticmethod
    def _optional_field(pattern, text):
        """Return the stripped first group of ``pattern`` in ``text``, or "" when absent."""
        match = re.search(pattern, text)
        return match.group(1).strip() if match else ""

    @staticmethod
    def _main_building_part(parts, error_message):
        """Return the first entry whose "Building Part" mentions "Main", or raise ``ValueError``."""
        main_part = next((part for part in parts if "Main" in part["Building Part"]), None)
        if main_part is None:
            raise ValueError(error_message)
        return main_part

    def extract(self):
        """
        Extracts specific data from the provided PDF file.

        Data includes:
        - Assessor name and assessment (inspection) date
        - Address and postcode
        - Current SAP rating, number of storeys and fuel bill
        - Door counts and light fitting counts
        - Existing primary/secondary heating details and heating codes
        - Main roof and main wall details
        - Whatever keys the window and building-part helper functions return
          (their exact schema is defined elsewhere in this module)

        Returns:
            dict: The extracted values keyed by field name.

        Raises:
            ValueError: If a required field or section cannot be found in the report text.
        """
        with open(self.file_path, "rb") as file:
            reader = PyPDF2.PdfReader(file)
            text = "".join(page.extract_text() for page in reader.pages)

        data = {}

        # Surveyor name is reported as "Name: <name> Title: <title>"; output is "<title> <name>"
        name_match = self._require_match(
            r"Name:\s*([A-Za-z\s]+)\s*Title:\s*([A-Za-z\.]+)", text, "Couldn't extract surveyor name"
        )
        data["Assessor Name"] = name_match.group(2).strip() + " " + name_match.group(1).strip()
        data["Assessment Date"] = self._require_match(
            r"Inspection Date:\s*(.*?)\n", text, "Could not extract inspection date"
        ).group(1).strip()

        # Address and postcode. The individual address lines are optional; the postcode is
        # required because it is emitted as its own field.
        postcode = self._require_match(
            r"Postcode:\s*(.*?)\nRegion:", text, "Could not extract postcode"
        ).group(1).strip()
        address_parts = [
            self._optional_field(r"House No:\s*(.*?)\nStreet:", text),
            self._optional_field(r"House Name:\s*(.*?)\nHouse No:", text),
            self._optional_field(r"Street:\s*(.*?)\nLocality:", text),
            self._optional_field(r"Locality:\s*(.*?)\nTown:", text),
            self._optional_field(r"Town:\s*(.*?)\nCounty:", text),
            self._optional_field(r"County:\s*(.*?)\nProperty Tenure:", text),
            self._optional_field(r"Region:\s*(.*?)\nHouse Name:", text),
            postcode,
        ]
        # Join non-empty parts with a comma
        data["Address"] = ", ".join([part for part in address_parts if part])
        data["Postcode"] = postcode

        # Current SAP rating is reported as "<band letter> <score>"; only the score is kept
        sap_match = self._require_match(
            r"Current SAP rating:\s*([A-Z] \d+)", text, "Could not extract SAP rating"
        )
        data["Current SAP Rating"] = sap_match.group(1).split(" ")[1]

        # We don't have primary energy in the summary report
        data['Primary Energy Use Intensity (kWh/m2/yr)'] = None

        data["Number of Storeys"] = int(
            self._require_match(
                r"Number of Storeys:\s*(\d+)", text, "Could not extract number of storeys"
            ).group(1)
        )

        fuel_bill_match = self._require_match(
            r"Fuel Bill:\s*£(\d+)", text, "Could not extract fuel bill"
        )
        data["Fuel Bill"] = f"£{fuel_bill_match.group(1)}"

        data["Total Number of Doors"] = int(
            self._require_match(
                r"Total Number of Doors\s*(\d+)", text, "Could not extract total number of doors"
            ).group(1)
        )
        data["Number of Insulated Doors"] = int(
            self._require_match(
                r"Number of Insulated Doors\s*(\d+)", text, "Could not extract number of insulated doors"
            ).group(1)
        )

        # Windows: everything between the "Windows" heading and "Draught Proofing"
        windows_section = self._require_match(
            r"Windows\s*(.*?)\s*Draught Proofing", text, "Could not extract windows section", re.DOTALL
        )
        data.update(extract_window_age_description(windows_section.group(1)))

        # Primary heating: the section ends at "Main Heating2" when a second main heating system
        # is listed, otherwise at "Water Heating".
        primary_heating_section = re.search(
            r"Main\s*Heating1\s*(.*?)\s*Main\s*Heating2", text, re.DOTALL
        )
        if primary_heating_section is None:
            primary_heating_section = self._require_match(
                r"Main\s*Heating1\s*(.*?)\s*Water\s*Heating",
                text,
                "Could not extract primary heating section",
                re.DOTALL,
            )
        primary_text = primary_heating_section.group(1)

        data["Existing Primary Heating System"] = self._require_match(
            r"Main Heating Code\s*(.*?)\n", primary_text, "Could not extract primary heating code"
        ).group(1).strip()
        data["Existing Primary Heating PCDF Reference"] = self._require_match(
            r"PCDF boiler Reference\s*(\d+)", primary_text,
            "Could not extract primary heating PCDF reference"
        ).group(1)
        data["Existing Primary Heating Controls"] = self._require_match(
            r"Main Heating Controls\s*(.*?)\n", primary_text,
            "Could not extract primary heating controls"
        ).group(1).strip()
        data["Existing Primary Heating % of Heat"] = int(
            self._require_match(
                r"Percentage of Heat\s*(\d+)\s*%", primary_text,
                "Could not extract primary heating percentage of heat"
            ).group(1)
        )

        # Secondary heating section ("Main Heating2") is optional
        secondary_heating_section = re.search(
            r"Main\s*Heating2\s*(.*?)\s*Water\s*Heating", text, re.DOTALL
        )
        if secondary_heating_section is None:
            data["Existing Secondary Heating System"] = ""
            data["Existing Secondary Heating PCDF Reference"] = ""
            data["Existing Secondary Heating Controls"] = ""
            data["Existing Secondary Heating % of Heat"] = 0
        else:
            secondary_text = secondary_heating_section.group(1)
            data["Existing Secondary Heating System"] = self._require_match(
                r"Main Heating Code\s*(.*?)(?=\n|Percentage of Heat)", secondary_text,
                "Could not extract secondary heating code"
            ).group(1).strip()
            data["Existing Secondary Heating PCDF Reference"] = self._require_match(
                r"PCDF boiler Reference\s*(\d+)", secondary_text,
                "Could not extract secondary heating PCDF reference"
            ).group(1)
            # Controls may legitimately be missing for the second system
            data["Existing Secondary Heating Controls"] = self._optional_field(
                r"Main Heating Controls\s*(.*?)\n", secondary_text
            )
            data["Existing Secondary Heating % of Heat"] = int(
                self._require_match(
                    r"Percentage of Heat\s*(\d+)\s*%", secondary_text,
                    "Could not extract secondary heating percentage of heat"
                ).group(1)
            )

        # Secondary heating code is only reported when a secondary heating system was found
        if data["Existing Secondary Heating System"] == "":
            data["Secondary Heating Code"] = ""
        else:
            data["Secondary Heating Code"] = self._optional_field(
                r"Secondary Heating Code\s*(.*?)\n", text
            )
        data["Water Heating Code"] = self._require_match(
            r"Water Heating Code\s*(.*?)\n", text, "Could not extract water heating code"
        ).group(1).strip()

        data.update(extract_building_parts_summary(text))

        data["Number of Light Fittings"] = int(
            self._require_match(
                r"Total number of light fittings\s*(\d+)", text,
                "Could not extract total number of light fittings"
            ).group(1)
        )
        # NOTE(review): the dots in "L.E.L." are unescaped and therefore match any character;
        # pattern kept as-is to avoid changing what currently matches.
        data["Number of LEL Fittings"] = int(
            self._require_match(
                r"Total number of L.E.L. fittings\s*(\d+)", text,
                "Could not extract total number of L.E.L. fittings"
            ).group(1)
        )
        data["Number of fittings needing LEL"] = data["Number of Light Fittings"] - data["Number of LEL Fittings"]

        main_roof_data = self._main_building_part(
            extract_roof_details_summary(text), "Could not extract main roof details"
        )
        data["Main Roof Type"] = main_roof_data["Roof Type"]
        data["Main Roof Insulation"] = main_roof_data["Roof Insulation"]
        data["Main Roof Insulation Thickness"] = main_roof_data["Roof Insulation Thickness"]

        # Get the main building wall data
        main_building_walls = self._main_building_part(
            extract_wall_details_summary(text), "Could not extract main wall details"
        )
        data["Main Wall Type"] = main_building_walls["Wall Type"]
        data["Main Wall Insulation"] = main_building_walls["Wall Insulation"]
        data["Main Wall Dry-lining"] = main_building_walls["Wall Dry-lining"]
        data["Main Wall Thickness"] = main_building_walls["Wall Thickness (mm)"]
        data["Main Building Alternative Wall Type"] = main_building_walls["Alternative Wall Type"]
        data["Main Building Alternative Wall Insulation"] = main_building_walls["Alternative Wall Insulation"]
        data["Main Building Alternative Wall Dry-lining"] = main_building_walls["Alternative Wall Dry-lining"]
        data["Main Building Alternative Wall Thickness"] = main_building_walls["Alternative Wall Thickness (mm)"]

        return data