Model/utils/file_data_extraction.py
2024-11-27 13:30:36 +00:00

369 lines
16 KiB
Python

import PyPDF2
import re
from collections import Counter
"""
This script contains functions used to extract data from retrofit survey files, including EPRs,
summary reports, etc
"""
def is_elmhurst_energy_report(text):
    """
    Checks whether extracted PDF text belongs to an Elmhurst Energy Report.

    :param text: String of text extracted from the PDF
    :return: True when the text begins with the heading "ENERGY REPORT", otherwise False
    """
    expected_heading = "ENERGY REPORT"
    return text.startswith(expected_heading)
def is_elmhurst_summary_report(text):
    """
    Checks whether extracted PDF text belongs to an Elmhurst Summary Report.

    :param text: String of text extracted from the PDF
    :return: True when the text begins with "Summary Information", otherwise False
    """
    expected_heading = "Summary Information"
    return text.startswith(expected_heading)
def is_osmosis_condition_report(text):
    """
    Checks whether extracted PDF text belongs to an Osmosis Condition Report.

    :param text: String of text extracted from the PDF
    :return: True when the text begins with either of the two known Osmosis condition
        report headings, otherwise False
    """
    # Two heading variants are recognised; the headings contain no spaces
    known_headings = (
        "OsmosisACDNEWPAS2035ConditionReport",
        "OsmosisACDPAS2035ConditionReport",
    )
    return text.startswith(known_headings)
def is_elmhurst_evidence_report(text):
    """
    Checks whether extracted PDF text belongs to an Elmhurst Evidence Report.

    :param text: String of text extracted from the PDF
    :return: True when the text begins with "RdSAP Evidence Report", otherwise False
    """
    expected_heading = "RdSAP Evidence Report"
    return text.startswith(expected_heading)
def detect_pdf_report_type(pdf_path):
    """
    Identifies which kind of survey report a PDF is by inspecting the text of its first page.

    :param pdf_path: String path to the PDF file
    :return: String type of the report ("elmhurst epr", "elmhurst summary report",
        "osmosis condition report" or "elmhurst evidence report"), or None when the
        first page matches none of the known report headings
    """
    with open(pdf_path, "rb") as pdf_handle:
        pdf_reader = PyPDF2.PdfReader(pdf_handle)
        # A PDF without pages is treated as having empty text, so no report type matches
        if pdf_reader.pages:
            opening_text = pdf_reader.pages[0].extract_text()
        else:
            opening_text = ""

    # Checked in order; the first predicate that matches decides the report type
    report_checks = (
        (is_elmhurst_energy_report, "elmhurst epr"),
        (is_elmhurst_summary_report, "elmhurst summary report"),
        (is_osmosis_condition_report, "osmosis condition report"),
        (is_elmhurst_evidence_report, "elmhurst evidence report"),
    )
    for matches_report, report_type in report_checks:
        if matches_report(opening_text):
            return report_type
    return None
def is_pdf(filename):
    """
    Determines if the provided filename is a PDF file.

    The check looks only at the filename's extension and ignores letter case, so
    "report.pdf", "REPORT.PDF" and "Report.Pdf" are all accepted. The file contents
    are not inspected.

    :param filename: String name (or path) of the file
    :return: True when the name ends with ".pdf" in any letter case, otherwise False
    """
    # Lower-case first so that upper- or mixed-case extensions are not rejected
    return filename.lower().endswith(".pdf")
class ElmhurstEprExtractor:
    """
    A utility class for extracting specific data from Elmhurst Energy Performance Reports (EPR).

    The report PDF is read with PyPDF2, the text of all pages is joined, and the individual
    values are located with regular expressions. The ``extract_*`` helpers operate on plain
    text, so they can be used without a PDF file.
    """

    # Window age descriptions counted in the "Windows" section of the report. When two
    # descriptions occur equally often, the one listed first is reported as the most common.
    _WINDOW_DESCRIPTIONS = (
        "Double post or during 2002",
        "Double pre 2002",
        "Double with unknown install date",
        "Secondary glazing",
        "Triple glazing",
        "Single glazing",
    )

    # Building part header followed by its floor dimension table
    _BUILDING_PART_DIMENSIONS_PATTERN = re.compile(
        r"Construction details: Building part: (.*?)\nFloor Area \[m2\] Room Height \[m\] Perimeter \[m\] Party "
        r"Wall Length \[m\]\n(.*?)(?=Construction details|Data inputs|$)",
        re.DOTALL,
    )

    # One row of the floor dimension table: level, area, height, perimeter, party wall length
    _FLOOR_ROW_PATTERN = re.compile(
        r"(Lowest floor|First floor|Second floor)\s+([\d.]+)\s+([\d.]+)\s+([\d.]+)\s+([\d.]+)"
    )

    # Building part header followed by everything up to the next part / the conservatory section
    _BUILDING_PART_SECTION_PATTERN = re.compile(
        r"Construction details: Building part: (.*?)\n(.*?)(?=Conservatory|Construction details|$)",
        re.DOTALL,
    )

    def __init__(self, file_path):
        """
        Args:
            file_path (str): Path to the EPR PDF file read by ``extract``.
        """
        self.file_path = file_path

    @staticmethod
    def _require_match(pattern, text, error_message, flags=0):
        """
        Searches ``text`` for ``pattern`` and returns the match object.

        Raises:
            ValueError: With ``error_message`` when the pattern is not found.
        """
        match = re.search(pattern, text, flags)
        if match is None:
            raise ValueError(error_message)
        return match

    @staticmethod
    def _find_value(pattern, text):
        """
        Returns the stripped first capture group of ``pattern`` in ``text``, or None if absent.
        """
        match = re.search(pattern, text)
        return match.group(1).strip() if match else None

    @staticmethod
    def _iter_building_part_sections(text):
        """
        Yields a (cleaned building part name, section text) pair for every
        "Construction details: Building part:" section found in ``text``.
        """
        for match in ElmhurstEprExtractor._BUILDING_PART_SECTION_PATTERN.finditer(text):
            part_name = match.group(1).strip()
            # Drop the " - built in ..." and "Room(s) in Roof area: ..." suffixes from the name
            cleaned_part_name = re.sub(r" - built in.*|Room\(s\) in Roof area:.*", "", part_name).strip()
            yield cleaned_part_name, match.group(2)

    @staticmethod
    def extract_window_age_description(windows_text):
        """
        Extracts the most common window age description and its proportion.

        Args:
            windows_text (str): Text of the "Windows" section of the report.

        Returns:
            dict: The most common and second most common window age descriptions, their
            proportions (in percent) and the total number of windows counted. The secondary
            description is None (with proportion 0) when all windows share one description.

        Raises:
            ValueError: If none of the known window descriptions occurs in the text.
        """
        # Descriptions may be wrapped over several lines in the extracted text
        flattened_text = windows_text.replace("\n", "")
        description_counts = Counter({
            description: flattened_text.count(description)
            for description in ElmhurstEprExtractor._WINDOW_DESCRIPTIONS
        })
        total_windows = sum(description_counts.values())
        if not total_windows:
            raise ValueError("Failed to extract window data.")

        ranked = description_counts.most_common(2)
        most_common_description, window_count = ranked[0]
        window_proportion = window_count / total_windows * 100
        if window_count == total_windows:
            # Every window has the same description, so there is no runner-up
            second_most_common_description = None
            second_most_common_proportion = 0
        else:
            second_most_common_description, second_window_count = ranked[1]
            second_most_common_proportion = second_window_count / total_windows * 100

        return {
            "Window Age Description": most_common_description,
            "Window Age Description Proportion (%)": window_proportion,
            "Secondary Window Age Description": second_most_common_description,
            "Secondary Window Age Description Proportion (%)": second_most_common_proportion,
            "Number of Windows": total_windows,
        }

    @staticmethod
    def extract_building_parts(text):
        """
        Extracts building parts and associated dimensions from the provided text.

        Args:
            text (str): Full text of the report.

        Returns:
            list[dict]: One entry per floor row of every building part. A building part whose
            header contains "Room(s) in Roof area:" produces a single "Room in Roof" entry
            that carries only the floor area; its other dimensions are None.
        """
        data = []
        for match in ElmhurstEprExtractor._BUILDING_PART_DIMENSIONS_PATTERN.finditer(text):
            part_name = match.group(1).strip()
            floor_data = match.group(2)

            room_in_roof_match = re.search(r"Room\(s\) in Roof area:\s*([\d.]+)", part_name)
            if room_in_roof_match:
                cleaned_part_name = re.sub(r" - built in.*|Room\(s\) in Roof area:.*", "", part_name).strip()
                data.append({
                    "Building Part": cleaned_part_name,
                    "Floor Level": "Room in Roof",
                    "Floor Area (m2)": float(room_in_roof_match.group(1)),
                    "Room Height (m)": None,
                    "Perimeter (m)": None,
                    "Party Wall Length (m)": None,
                })
                continue

            cleaned_part_name = re.sub(r" - built in.*", "", part_name).strip()
            for floor_match in ElmhurstEprExtractor._FLOOR_ROW_PATTERN.finditer(floor_data):
                data.append({
                    "Building Part": cleaned_part_name,
                    "Floor Level": floor_match.group(1),
                    "Floor Area (m2)": float(floor_match.group(2)),
                    "Room Height (m)": float(floor_match.group(3)),
                    "Perimeter (m)": float(floor_match.group(4)),
                    "Party Wall Length (m)": float(floor_match.group(5)),
                })
        return data

    @staticmethod
    def extract_roof_details(text):
        """
        Extracts roof details for each building part in the provided text.

        Args:
            text (str): Full text of the report.

        Returns:
            list[dict]: One entry per building part with its roof type, roof insulation and
            roof insulation thickness; a value is None when its label is not found.
        """
        find_value = ElmhurstEprExtractor._find_value
        return [
            {
                "Building Part": part_name,
                "Roof Type": find_value(r"Roof Type:\s*(.*?)(?=\n|$)", part_details),
                "Roof Insulation": find_value(r"Roof Insulation:\s*(.*?)(?=\n|$)", part_details),
                "Roof Insulation Thickness": find_value(
                    r"Roof Insulation Thickness:\s*(.*?)(?=\n|$)", part_details
                ),
            }
            for part_name, part_details in ElmhurstEprExtractor._iter_building_part_sections(text)
        ]

    @staticmethod
    def extract_wall_details(text):
        """
        Extracts wall details for each building part in the provided text.

        Args:
            text (str): Full text of the report.

        Returns:
            list[dict]: One entry per building part with its wall type, wall insulation,
            dry-lining and wall thickness (int); a value is None when its label is not found.
        """
        find_value = ElmhurstEprExtractor._find_value
        wall_data = []
        for part_name, part_details in ElmhurstEprExtractor._iter_building_part_sections(text):
            # The thickness is only accepted when the digits fill the rest of the line
            wall_thickness = find_value(r"Wall Thickness:\s*(\d+)(?=\n|$)", part_details)
            wall_data.append({
                "Building Part": part_name,
                "Wall Type": find_value(r"Wall Type:\s*(.*?)(?=\n|$)", part_details),
                "Wall Insulation": find_value(r"Wall Insulation:\s*(.*?)(?=\n|$)", part_details),
                "Wall Dry-lining": find_value(r"Wall Dry-lining:\s*(.*?)(?=\n|$)", part_details),
                "Wall Thickness": int(wall_thickness) if wall_thickness is not None else None,
            })
        return wall_data

    @staticmethod
    def _extract_heating_details(section_text):
        """
        Extracts heating details from a given section of text.

        Args:
            section_text (str): The section of text containing heating details.

        Returns:
            dict: A dictionary containing heating system details. Values that are not found
            default to "" ("System", "PCDF Reference", "Controls") or 0 ("% of Heat").
        """
        system_search = re.search(r"Main Heating Code\s*(.*?)\n", section_text)
        pcdf_search = re.search(r"PCDF boiler Reference\s*(\d+)", section_text)
        controls_search = re.search(r"Main Heating Controls\s*(.*?)\n", section_text)
        heat_search = re.search(r"Percentage of Heat\s*(\d+)\s*%?", section_text)
        return {
            "System": system_search.group(1).strip() if system_search else "",
            "PCDF Reference": pcdf_search.group(1) if pcdf_search else "",
            "Controls": controls_search.group(1).strip() if controls_search else "",
            "% of Heat": int(heat_search.group(1)) if heat_search else 0,
        }

    def extract_primary_heating(self, text):
        """
        Extracts the details of the primary heating system ("Main Heating 1").

        Args:
            text (str): Full text of the report.

        Returns:
            dict: Heating details as produced by ``_extract_heating_details``.

        Raises:
            ValueError: If no "Main Heating 1" section can be located.
        """
        # The section normally ends at "Main Heating 2"; when the report has no second main
        # heating system it ends at "Secondary Heating" instead
        primary_heating_section = (
            re.search(r"Main\s*Heating\s*1\s*(.*?)\s*Main\s*Heating\s*2", text, re.DOTALL)
            or re.search(r"Main\s*Heating\s*1\s*(.*?)\s*Secondary\s*Heating", text, re.DOTALL)
        )
        if primary_heating_section is None:
            # Fail like every other missing field instead of with an AttributeError on None
            raise ValueError("Failed to extract primary heating.")
        return self._extract_heating_details(primary_heating_section.group(1))

    def extract_secondary_heating(self, text):
        """
        Extracts the details of the secondary heating system ("Main Heating 2").

        Args:
            text (str): Full text of the report.

        Returns:
            dict: Heating details as produced by ``_extract_heating_details`` plus a
            "Heating Code" entry taken from the "Secondary Heating Code" line. When the report
            has no "Main Heating 2" section, all entries hold their empty defaults.
        """
        secondary_heating_section = re.search(
            r"Main\s*Heating\s*2\s*(.*?)\s*Secondary Heating", text, re.DOTALL
        )
        # An empty section text yields the default values under the same keys as a real one
        secondary_text = secondary_heating_section.group(1) if secondary_heating_section else ""
        output = self._extract_heating_details(secondary_text)

        # The secondary heating code is only reported when a second system was found
        heating_code_match = (
            re.search(r"Secondary Heating Code\s*(.*?)\n", text) if output["System"] else None
        )
        output["Heating Code"] = heating_code_match.group(1).strip() if heating_code_match else ""
        return output

    def _extract_from_text(self, text):
        """
        Extracts all report data from the already extracted text of an EPR.

        Args:
            text (str): Full text of the report.

        Returns:
            dict: The extracted data (see ``extract``).

        Raises:
            ValueError: If a required value cannot be found in the text.
        """
        require = self._require_match
        data = {}

        address_match = require(
            r"ENERGY REPORT\nDwelling Address\s*(.*?)\s*\nReference", text,
            "Failed to extract address.", re.DOTALL,
        )
        data["Address"] = address_match.group(1).strip()
        # The postcode is taken to be the last comma-separated element of the address
        data["Postcode"] = data["Address"].split(",")[-1].strip()

        # Two ratings follow the band label; only the first is stored, as the current rating
        sap_match = require(
            r"GG \(1-20\)\s*(\d{1,2})\s*(\d{1,2})", text, "Failed to extract SAP rating."
        )
        data["Current SAP Rating"] = int(sap_match.group(1))

        energy_match = require(
            r"Additional ratings for your home\s*([\d.]+)", text,
            "Failed to extract primary energy use.",
        )
        data["Primary Energy Use Intensity (kWh/m2/yr)"] = float(energy_match.group(1))

        storeys_match = require(
            r"Number of Storeys:\s*(\d+)", text, "Failed to extract number of storeys."
        )
        data["Number of Storeys"] = int(storeys_match.group(1))

        fuel_match = require(r"TOTAL\s*£(\d+)", text, "Failed to extract fuel bill.")
        data["Fuel Bill"] = f"£{fuel_match.group(1)}"

        total_doors_match = require(
            r"Total Doors:\s*(\d+)", text, "Failed to extract total doors."
        )
        data["Total Number of Doors"] = int(total_doors_match.group(1))

        insulated_doors_match = require(
            r"Insulated Doors:\s*(\d+)", text, "Failed to extract insulated doors."
        )
        data["Number of Insulated Doors"] = int(insulated_doors_match.group(1))

        # Get number of lighting outlets and number of fittings needing LEL
        lighting_fittings_match = require(
            r"Total number of light fittings\s*(\d+)", text, "Failed to extract lighting"
        )
        data["Number of Light Fittings"] = int(lighting_fittings_match.group(1))
        lel_fittings_match = require(
            r"Total number of L.E.L. fittings\s*(\d+)", text, "Failed to extract LEL fittings."
        )
        data["Number of LEL Fittings"] = int(lel_fittings_match.group(1))
        data["Number of fittings needing LEL"] = (
            data["Number of Light Fittings"] - data["Number of LEL Fittings"]
        )

        windows_section = require(
            r"Windows\s*(.*?)\s*Draught Proofing", text,
            "Failed to extract window data.", re.DOTALL,
        )
        data["Windows"] = self.extract_window_age_description(windows_section.group(1))

        data["Primary Heating"] = self.extract_primary_heating(text)
        data["Secondary Heating"] = self.extract_secondary_heating(text)
        data["Building Parts"] = self.extract_building_parts(text)
        data["Roof Details"] = self.extract_roof_details(text)
        data["Wall Details"] = self.extract_wall_details(text)

        water_heating_code_match = require(
            r"Water Heating Code\s*(.*?)\n", text, "Failed to extract water heating code."
        )
        data["Water Heating Code"] = water_heating_code_match.group(1).strip()
        return data

    def extract(self):
        """
        Extracts all relevant data from the EPR PDF.

        Returns:
            dict: A dictionary containing extracted data, including:
                - Address and Postcode
                - SAP Rating and Primary Energy Use
                - Number of Storeys, Fuel Bill and Water Heating Code
                - Lighting, Doors, Windows, Roof, and Wall Details
                - Heating systems (Primary and Secondary)
                - Building Parts

        Raises:
            ValueError: If a required value cannot be found in the report text.
        """
        with open(self.file_path, "rb") as file:
            reader = PyPDF2.PdfReader(file)
            # "or ''" keeps the join working should a page yield no text at all
            text = "".join(page.extract_text() or "" for page in reader.pages)
        return self._extract_from_text(text)