Model/utils/file_data_extraction.py
Khalim Conn-Kowlessar 63521dd1e3 extending extraction
2024-11-27 17:18:17 +00:00

469 lines
20 KiB
Python

import PyPDF2
import re
from collections import Counter
from utils.logger import setup_logger
from xml.dom.minidom import parseString
logger = setup_logger()
"""
This module contains functions used to extract data from retrofit survey files, including EPRs,
summary reports, evidence reports and condition reports.
"""
def is_elmhurst_energy_report(text):
    """
    Checks whether extracted PDF text belongs to an Elmhurst Energy Report.
    Returns True only when the text begins with the heading 'ENERGY REPORT'.
    """
    report_heading = "ENERGY REPORT"
    return text.startswith(report_heading)
def is_elmhurst_summary_report(text):
    """
    Checks whether extracted PDF text belongs to an Elmhurst Summary Report.
    Returns True only when the text begins with the heading 'Summary Information'.
    """
    report_heading = "Summary Information"
    return text.startswith(report_heading)
def is_osmosis_condition_report(text):
    """
    Checks whether extracted PDF text belongs to an Osmosis Condition Report.
    Returns True when the text begins with either of the known report headings.
    """
    known_headings = (
        "OsmosisACDNEWPAS2035ConditionReport",
        "OsmosisACDPAS2035ConditionReport",
    )
    # str.startswith accepts a tuple and succeeds if any entry is a prefix.
    return text.startswith(known_headings)
def is_elmhurst_evidence_report(text):
    """
    Checks whether extracted PDF text belongs to an Elmhurst Evidence Report.
    Returns True only when the text begins with the heading 'RdSAP Evidence Report'.
    """
    report_heading = "RdSAP Evidence Report"
    return text.startswith(report_heading)
def detect_pdf_report_type(pdf_path):
    """
    Detects the type of a PDF report from the text on its first page.
    :param pdf_path: String path to the PDF file
    :return: String type of the report ("elmhurst epr", "elmhurst summary report",
        "osmosis condition report" or "elmhurst evidence report"), or None when the
        first page matches none of the known report headings
    """
    # Only the first page is read; a PDF without pages is treated as empty text.
    with open(pdf_path, "rb") as pdf_handle:
        pdf_reader = PyPDF2.PdfReader(pdf_handle)
        pages = pdf_reader.pages
        leading_text = pages[0].extract_text() if pages else ""

    # Checked in order; the first detector that recognises the text wins.
    detectors = (
        (is_elmhurst_energy_report, "elmhurst epr"),
        (is_elmhurst_summary_report, "elmhurst summary report"),
        (is_osmosis_condition_report, "osmosis condition report"),
        (is_elmhurst_evidence_report, "elmhurst evidence report"),
    )
    for recognises, report_type in detectors:
        if recognises(leading_text):
            return report_type
    return None
def detect_xml_report_type(xml_path):
    """
    Detects the type of XML report from the value of its first <Product> element.
    :param xml_path: String path to the XML file
    :return: String type of the report ("full sap xml")
    :raises NotImplementedError: if the document has no <Product> element, the element
        is empty, or its value is not a recognised product
    """
    # Read raw bytes so the XML parser applies the document's own encoding declaration
    # instead of whatever text encoding the platform defaults to.
    with open(xml_path, "rb") as file:
        document = parseString(file.read())

    product_tag_search = document.getElementsByTagName("Product")
    if product_tag_search:
        # An empty <Product/> element has no child node to read a value from.
        product_value_node = product_tag_search[0].firstChild
        if product_value_node is not None and product_value_node.nodeValue == "Sap 2012 Desktop":
            return "full sap xml"

    # NotImplementedError is still an Exception, so existing handlers keep working.
    raise NotImplementedError("Not implemented")
def is_pdf(filename):
    """
    Determines if the provided filename is a PDF file.
    The extension is compared case-insensitively, so "REPORT.PDF" is accepted too.
    """
    return filename.lower().endswith(".pdf")
def is_xml(filename):
    """
    Determines if the provided filename is an XML file.
    The extension is compared case-insensitively, so "EXPORT.XML" is accepted too.
    """
    return filename.lower().endswith(".xml")
class ElmhurstEprExtractor:
    """
    A utility class for extracting specific data from Elmhurst Energy Performance Reports (EPR).

    The static ``extract_*`` methods each parse one section of the report text and can be
    called on their own; ``extract`` reads the PDF at ``file_path`` and combines their results.
    """

    # One match per "Construction details" section. Group 1 is the rest of the heading
    # line (the building part name); group 2 is the section body, which runs up to the
    # conservatory section, the next building part, or the end of the text.
    _BUILDING_PART_SECTION = re.compile(
        r"Construction details: Building part: (.*?)\n(.*?)(?=Conservatory|Construction details|$)",
        re.DOTALL
    )

    def __init__(self, file_path):
        """
        Args:
            file_path (str): Path to the EPR PDF file read by ``extract``.
        """
        self.file_path = file_path

    @staticmethod
    def _field_value(pattern, text, default=None):
        """
        Returns the stripped first capture group of ``pattern`` in ``text``,
        or ``default`` when the pattern does not match.
        """
        match = re.search(pattern, text)
        return match.group(1).strip() if match else default

    @staticmethod
    def _search_or_raise(pattern, text, log_message, error_message=None, flags=0):
        """
        Searches ``text`` for ``pattern`` and returns the match object.
        When nothing matches, logs ``log_message`` as an error and raises ValueError
        carrying ``error_message`` (or ``log_message`` if no separate message is given).
        """
        match = re.search(pattern, text, flags)
        if not match:
            logger.error(log_message)
            raise ValueError(log_message if error_message is None else error_message)
        return match

    @staticmethod
    def extract_window_age_description(windows_text):
        """
        Extracts the most common window age description and its proportion.
        Args:
            windows_text (str): The text of the report's windows section.
        Returns:
            dict: The most common and second most common glazing descriptions, their
            share of all counted windows (in percent) and the total number of windows.
        Raises:
            ValueError: If none of the known glazing descriptions occur in the text.
        """
        # Line breaks are dropped before counting occurrences of each description.
        windows_text = windows_text.replace("\n", "")
        window_descriptions = [
            "Double post or during 2002",
            "Double pre 2002",
            "Double with unknown install date",
            "Secondary glazing",
            "Triple glazing",
            "Single glazing",
        ]
        description_counts = Counter({
            description: windows_text.count(description)
            for description in window_descriptions
        })
        total_windows = sum(description_counts.values())
        if not total_windows:
            raise ValueError("Failed to extract window data.")

        # Ties are resolved in favour of the description listed first above.
        ranked = description_counts.most_common(2)
        most_common_description, window_count = ranked[0]
        window_proportion = window_count / total_windows * 100
        if window_count == total_windows:
            # Every window shares one description, so there is no runner-up.
            second_most_common_description = None
            second_most_common_proportion = 0
        else:
            second_most_common_description, second_window_count = ranked[1]
            second_most_common_proportion = second_window_count / total_windows * 100

        return {
            "Window Age Description": most_common_description,
            "Window Age Description Proportion (%)": window_proportion,
            "Secondary Window Age Description": second_most_common_description,
            "Secondary Window Age Description Proportion (%)": second_most_common_proportion,
            "Number of Windows": total_windows
        }

    @staticmethod
    def extract_building_parts(text):
        """
        Extracts building parts and associated dimensions from the provided text.
        Args:
            text (str): The full text of the EPR PDF.
        Returns:
            list[dict]: One entry per floor of each building part. A building part whose
            heading carries a "Room(s) in Roof area" figure yields a single
            "Room in Roof" entry with only the floor area filled in.
        """
        building_part_pattern = re.compile(
            r"Construction details: Building part: (.*?)\nFloor Area \[m2\] Room Height \[m\] Perimeter \[m\] Party "
            r"Wall Length \[m\]\n(.*?)(?=Construction details|Data inputs|$)",
            re.DOTALL
        )
        # Compiled once rather than once per building part.
        floor_pattern = re.compile(
            r"(Lowest floor|First floor|Second floor)\s+([\d.]+)\s+([\d.]+)\s+([\d.]+)\s+([\d.]+)"
        )

        data = []
        for match in building_part_pattern.finditer(text):
            part_name = match.group(1).strip()
            floor_data = match.group(2)

            room_in_roof_match = re.search(r"Room\(s\) in Roof area:\s*([\d.]+)", part_name)
            if room_in_roof_match:
                cleaned_part_name = re.sub(r" - built in.*|Room\(s\) in Roof area:.*", "", part_name).strip()
                data.append({
                    "Building Part": cleaned_part_name,
                    "Floor Level": "Room in Roof",
                    "Floor Area (m2)": float(room_in_roof_match.group(1)),
                    "Room Height (m)": None,
                    "Perimeter (m)": None,
                    "Party Wall Length (m)": None
                })
                continue

            cleaned_part_name = re.sub(r" - built in.*", "", part_name).strip()
            for floor_match in floor_pattern.finditer(floor_data):
                floor_level, floor_area, room_height, perimeter, party_wall_length = floor_match.groups()
                data.append({
                    "Building Part": cleaned_part_name,
                    "Floor Level": floor_level,
                    "Floor Area (m2)": float(floor_area),
                    "Room Height (m)": float(room_height),
                    "Perimeter (m)": float(perimeter),
                    "Party Wall Length (m)": float(party_wall_length)
                })
        return data

    @staticmethod
    def extract_roof_details(text):
        """
        Extracts roof details for each building part in the provided text.
        Args:
            text (str): The full text of the EPR PDF.
        Returns:
            list[dict]: One entry per building part; a field is None when its label
            does not appear in that part's section.
        """
        field = ElmhurstEprExtractor._field_value
        roof_data = []
        for match in ElmhurstEprExtractor._BUILDING_PART_SECTION.finditer(text):
            part_name = match.group(1).strip()
            cleaned_part_name = re.sub(r" - built in.*|Room\(s\) in Roof area:.*", "", part_name).strip()
            part_details = match.group(2)
            roof_data.append({
                "Building Part": cleaned_part_name,
                "Roof Type": field(r"Roof Type:\s*(.*?)(?=\n|$)", part_details),
                "Roof Insulation": field(r"Roof Insulation:\s*(.*?)(?=\n|$)", part_details),
                "Roof Insulation Thickness": field(r"Roof Insulation Thickness:\s*(.*?)(?=\n|$)", part_details),
            })
        return roof_data

    @staticmethod
    def extract_wall_details(text):
        """
        Extracts wall details for each building part in the provided text.
        Args:
            text (str): The full text of the EPR PDF.
        Returns:
            list[dict]: One entry per building part; a field is None when its label
            does not appear in that part's section. "Wall Thickness" is an int.
        """
        field = ElmhurstEprExtractor._field_value
        wall_data = []
        for match in ElmhurstEprExtractor._BUILDING_PART_SECTION.finditer(text):
            part_name = match.group(1).strip()
            cleaned_part_name = re.sub(r" - built in.*|Room\(s\) in Roof area:.*", "", part_name).strip()
            part_details = match.group(2)
            wall_thickness = field(r"Wall Thickness:\s*(\d+)(?=\n|$)", part_details)
            wall_data.append({
                "Building Part": cleaned_part_name,
                "Wall Type": field(r"Wall Type:\s*(.*?)(?=\n|$)", part_details),
                "Wall Insulation": field(r"Wall Insulation:\s*(.*?)(?=\n|$)", part_details),
                "Wall Dry-lining": field(r"Wall Dry-lining:\s*(.*?)(?=\n|$)", part_details),
                "Wall Thickness": int(wall_thickness) if wall_thickness is not None else None,
            })
        return wall_data

    @staticmethod
    def extract_conservatory(text):
        """
        Extracts conservatory data from the provided text.
        The section is located between "Conservatory" and "Doors".
        Args:
            text (str): The full text of the EPR PDF.
        Returns:
            dict: A dictionary with conservatory details:
                - "Conservatory Present"
                - "Conservatory Separated"
                - "Conservatory Floor Area"
                - "Conservatory Double Glazed"
                - "Conservatory Glazed Perimeter"
                - "Heated Conservatory Height"
        Raises:
            ValueError: If no text between "Conservatory" and "Doors" can be found.
        """
        conservatory_match = re.search(r"Conservatory\s*(.*?)\s*Doors", text, re.DOTALL)
        if not conservatory_match:
            logger.error("Failed to extract conservatory data.")
            raise ValueError("Could not extract conservatory data.")
        conservatory_text = conservatory_match.group(1)

        # A missing "Conservatory Present" line is treated the same as an explicit "No".
        present_match = re.search(r"Conservatory Present:\s*(Yes|No)", conservatory_text)
        if not present_match or present_match.group(1).strip() == "No":
            logger.info("Conservatory not present.")
            return {
                "Conservatory Present": "No",
                "Conservatory Separated": "",
                "Conservatory Floor Area": 0,
                "Conservatory Double Glazed": "",
                "Conservatory Glazed Perimeter": 0,
                "Heated Conservatory Height": "",
            }

        field = ElmhurstEprExtractor._field_value
        floor_area = field(r"Conservatory Floor Area:\s*([\d.]+)", conservatory_text)
        glazed_perimeter = field(r"Conservatory Glazed Perimeter:\s*([\d.]+)", conservatory_text)
        return {
            "Conservatory Present": "Yes",
            "Conservatory Separated": field(r"Conservatory Separated:\s*(Yes|No)", conservatory_text, ""),
            "Conservatory Floor Area": float(floor_area) if floor_area is not None else 0,
            "Conservatory Double Glazed": field(r"Conservatory Double Glazed:\s*(Yes|No)", conservatory_text, ""),
            "Conservatory Glazed Perimeter": float(glazed_perimeter) if glazed_perimeter is not None else 0,
            "Heated Conservatory Height": field(r"Heated Conservatory Height:\s*(.*?)(?=\n|$)", conservatory_text, ""),
        }

    @staticmethod
    def _extract_heating_details(section_text, default_value=""):
        """
        Extracts heating details from a given section of text.
        Args:
            section_text (str): The section of text containing heating details.
            default_value (str, optional): The default value to return for missing fields. Defaults to "".
        Returns:
            dict: A dictionary containing heating system details
            ("System", "PCDF Reference", "Controls" and "% of Heat").
        """
        field = ElmhurstEprExtractor._field_value
        heat_search = re.search(r"Percentage of Heat\s*(\d+)\s*%?", section_text)
        # Values end at a newline OR at the end of the section: the callers strip
        # trailing whitespace from the section, so its last line has no newline.
        return {
            "System": field(r"Main Heating Code\s*(.*?)(?=\n|$)", section_text, default_value),
            "PCDF Reference": field(r"PCDF boiler Reference\s*(\d+)", section_text, default_value),
            "Controls": field(r"Main Heating Controls\s*(.*?)(?=\n|$)", section_text, default_value),
            "% of Heat": int(heat_search.group(1)) if heat_search else 0,
        }

    def extract_primary_heating(self, text):
        """
        Extracts the details of the primary heating system (the "Main Heating 1" section).
        Args:
            text (str): The full text of the EPR PDF.
        Returns:
            dict: Heating details as produced by ``_extract_heating_details``.
        Raises:
            ValueError: If no "Main Heating 1" section can be located.
        """
        # The section normally ends where "Main Heating 2" starts.
        primary_heating_section = re.search(
            r"Main\s*Heating\s*1\s*(.*?)\s*Main\s*Heating\s*2", text, re.DOTALL
        )
        if not primary_heating_section:
            # There may be no second main heating system; the section then runs up to
            # the "Secondary Heating" heading instead.
            primary_heating_section = re.search(
                r"Main\s*Heating\s*1\s*(.*?)\s*Secondary\s*Heating", text, re.DOTALL
            )
        if not primary_heating_section:
            logger.error("Failed to extract primary heating.")
            raise ValueError("Failed to extract primary heating.")
        return self._extract_heating_details(primary_heating_section.group(1))

    def extract_secondary_heating_details(self, text):
        """
        Extracts the details of the second main heating system (the "Main Heating 2" section).
        Args:
            text (str): The full text of the EPR PDF.
        Returns:
            dict: Heating details as produced by ``_extract_heating_details`` plus a
            "Heating Code" entry. All values are empty (and "% of Heat" is 0) when the
            report has no "Main Heating 2" section.
        """
        secondary_heating_section = re.search(
            r"Main\s*Heating\s*2\s*(.*?)\s*Secondary Heating", text, re.DOTALL
        )
        if secondary_heating_section is None:
            output = {"System": "", "PCDF Reference": "", "Controls": "", "% of Heat": 0}
        else:
            output = dict(self._extract_heating_details(secondary_heating_section.group(1)))

        # The secondary heating code is only reported when a second system was found.
        heating_code = ""
        if output["System"]:
            heating_code = self._field_value(r"Secondary Heating Code\s*(.*?)(?=\n|$)", text, "")
        output["Heating Code"] = heating_code
        return output

    def extract(self):
        """
        Extracts all relevant data from the EPR PDF.
        Returns:
            dict: A dictionary containing extracted data, including:
                - Assessor Name, Address and Postcode
                - SAP Rating and Primary Energy Use
                - Lighting, Doors, Windows, Roof, and Wall Details
                - Heating systems (Primary and Secondary) and Water Heating Code
                - Building Parts and Conservatory
        Raises:
            ValueError: If any required field cannot be found in the report text.
        """
        with open(self.file_path, "rb") as file:
            reader = PyPDF2.PdfReader(file)
            # Guard against a page yielding no text so the join cannot fail.
            text = "".join(page.extract_text() or "" for page in reader.pages)

        find = self._search_or_raise
        data = {}

        assessor_match = find(r"Created by:\s*(.*?)\n", text, "Failed to extract assessor name.")
        data["Assessor Name"] = assessor_match.group(1).strip()

        address_match = find(
            r"ENERGY REPORT\nDwelling Address\s*(.*?)\s*\nReference", text,
            "Failed to extract address.", flags=re.DOTALL
        )
        data["Address"] = address_match.group(1).strip()
        # The postcode is taken to be the last comma-separated part of the address.
        data["Postcode"] = data["Address"].split(",")[-1].strip()

        sap_match = find(r"GG \(1-20\)\s*(\d{1,2})\s*(\d{1,2})", text, "Failed to extract SAP rating.")
        data["Current SAP Rating"] = int(sap_match.group(1))

        energy_match = find(
            r"Additional ratings for your home\s*([\d.]+)", text, "Failed to extract primary energy use."
        )
        data["Primary Energy Use Intensity (kWh/m2/yr)"] = float(energy_match.group(1))

        storeys_match = find(r"Number of Storeys:\s*(\d+)", text, "Failed to extract the number of storeys.")
        data["Number of Storeys"] = int(storeys_match.group(1))

        fuel_match = find(r"TOTAL\s*£(\d+)", text, "Failed to extract fuel bill.")
        data["Fuel Bill"] = f"£{fuel_match.group(1)}"

        total_doors_match = find(r"Total Doors:\s*(\d+)", text, "Failed to extract total doors.")
        data["Total Number of Doors"] = int(total_doors_match.group(1))

        insulated_doors_match = find(r"Insulated Doors:\s*(\d+)", text, "Failed to extract insulated doors.")
        data["Number of Insulated Doors"] = int(insulated_doors_match.group(1))

        # Get number of lighting outlets and number of fittings needing LEL
        lighting_fittings_match = find(
            r"Total number of light fittings\s*(\d+)", text,
            "Failed to extract lighting.", error_message="Failed to extract lighting"
        )
        data["Number of Light Fittings"] = int(lighting_fittings_match.group(1))

        # NOTE(review): the dots in "L.E.L." are unescaped and match any character;
        # left as-is so the set of accepted report texts does not change.
        lel_fittings_match = find(
            r"Total number of L.E.L. fittings\s*(\d+)", text, "Failed to extract LEL fittings."
        )
        data["Number of LEL Fittings"] = int(lel_fittings_match.group(1))
        data["Number of fittings needing LEL"] = data["Number of Light Fittings"] - data["Number of LEL Fittings"]

        windows_section = find(
            r"Windows\s*(.*?)\s*Draught Proofing", text, "Failed to extract window data.", flags=re.DOTALL
        )
        data["Windows"] = self.extract_window_age_description(windows_section.group(1))

        data["Primary Heating"] = self.extract_primary_heating(text)
        data["Secondary Heating"] = self.extract_secondary_heating_details(text)
        data["Building Parts"] = self.extract_building_parts(text)
        data["Roof Details"] = self.extract_roof_details(text)
        data["Wall Details"] = self.extract_wall_details(text)
        data["Conservatory"] = self.extract_conservatory(text)

        water_heating_code_match = find(
            r"Water Heating Code\s*(.*?)\n", text, "Failed to extract water heating code."
        )
        data["Water Heating Code"] = water_heating_code_match.group(1).strip()
        return data