Model/utils/file_data_extraction.py
2024-11-27 13:08:03 +00:00

343 lines
15 KiB
Python

import PyPDF2
import re
from collections import Counter
"""
This module contains functions used to extract data from retrofit survey files, including EPRs,
summary reports, etc.
"""
def is_elmhurst_energy_report(text):
    """
    Checks whether text taken from a PDF belongs to an Elmhurst Energy Report.
    :param text: String of text extracted from the PDF
    :return: True if the text begins with "ENERGY REPORT" (case-sensitive), otherwise False
    """
    expected_heading = "ENERGY REPORT"
    return text.startswith(expected_heading)
def is_elmhurst_summary_report(text):
    """
    Checks whether text taken from a PDF belongs to an Elmhurst Summary Report.
    :param text: String of text extracted from the PDF
    :return: True if the text begins with "Summary Information", otherwise False
    """
    expected_heading = "Summary Information"
    return text.startswith(expected_heading)
def is_osmosis_condition_report(text):
    """
    Checks whether text taken from a PDF belongs to an Osmosis Condition Report.
    :param text: String of text extracted from the PDF
    :return: True if the text begins with either of the two known Osmosis
        condition report headings, otherwise False
    """
    # str.startswith accepts a tuple and returns True if any prefix matches.
    known_headings = (
        "OsmosisACDNEWPAS2035ConditionReport",
        "OsmosisACDPAS2035ConditionReport",
    )
    return text.startswith(known_headings)
def is_elmhurst_evidence_report(text):
    """
    Checks whether text taken from a PDF belongs to an Elmhurst Evidence Report.
    :param text: String of text extracted from the PDF
    :return: True if the text begins with "RdSAP Evidence Report", otherwise False
    """
    expected_heading = "RdSAP Evidence Report"
    return text.startswith(expected_heading)
def detect_pdf_report_type(pdf_path):
    """
    Detects the type of report from the text of the PDF's first page.
    :param pdf_path: String path to the PDF file
    :return: One of "elmhurst epr", "elmhurst summary report",
        "osmosis condition report" or "elmhurst evidence report", or None when
        the first page matches none of the known report headings
    """
    # Only the first page is read; a PDF with no pages is treated as empty text.
    with open(pdf_path, "rb") as pdf_handle:
        pdf_reader = PyPDF2.PdfReader(pdf_handle)
        if pdf_reader.pages:
            heading_text = pdf_reader.pages[0].extract_text()
        else:
            heading_text = ""

    # Checked in order; the first matching predicate decides the report type.
    detectors = (
        (is_elmhurst_energy_report, "elmhurst epr"),
        (is_elmhurst_summary_report, "elmhurst summary report"),
        (is_osmosis_condition_report, "osmosis condition report"),
        (is_elmhurst_evidence_report, "elmhurst evidence report"),
    )
    for matches_report, report_type in detectors:
        if matches_report(heading_text):
            return report_type
    return None
def is_pdf(filename):
    """
    Determines if the provided filename is a PDF file.
    :param filename: String name (or path) of the file
    :return: True if the name ends with the ".pdf" extension, otherwise False
    """
    # Compare case-insensitively so names such as "REPORT.PDF" are recognised.
    return filename.lower().endswith(".pdf")
class _MissingReportFieldError(AttributeError, ValueError):
    """
    Raised when an expected field cannot be found in the report text.

    Derives from AttributeError because the unguarded ``re.search(...).group(...)``
    calls it replaces failed with that type, and from ValueError to match the
    error raised by ``extract_window_age_description``.
    """


# Glazing descriptions counted in the windows section of the report.
_WINDOW_DESCRIPTIONS = (
    "Double post or during 2002",
    "Double pre 2002",
    "Double with unknown install date",
    "Secondary glazing",
    "Triple glazing",
    "Single glazing",
)

# Building part name followed by its dimensions table.
_PART_DIMENSIONS_PATTERN = re.compile(
    r"Construction details: Building part: (.*?)\nFloor Area \[m2\] Room Height \[m\] Perimeter \[m\] Party "
    r"Wall Length \[m\]\n(.*?)(?=Construction details|Data inputs|$)",
    re.DOTALL
)

# Building part name (first line) followed by the rest of that part's details.
_PART_DETAILS_PATTERN = re.compile(
    r"Construction details: Building part: (.*?)\n(.*?)(?=Conservatory|Construction details|$)",
    re.DOTALL
)

# One row of the dimensions table: level, floor area, room height, perimeter, party wall length.
_FLOOR_ROW_PATTERN = re.compile(
    r"(Lowest floor|First floor|Second floor)\s+([\d.]+)\s+([\d.]+)\s+([\d.]+)\s+([\d.]+)"
)


def _require_match(pattern, text, field, flags=0):
    """
    Searches ``text`` for ``pattern`` and returns the match object.
    :param pattern: Regular expression string to search for
    :param text: String to search in
    :param field: Human-readable field name used in the error message
    :param flags: Optional ``re`` flags
    :return: The match object for the first occurrence
    :raises _MissingReportFieldError: if the pattern does not occur in ``text``
    """
    match = re.search(pattern, text, flags)
    if match is None:
        raise _MissingReportFieldError(f"Could not find '{field}' in the report text.")
    return match


def _optional_field(pattern, text):
    """
    Returns the stripped first group of ``pattern`` in ``text``, or None if absent.
    """
    match = re.search(pattern, text)
    return match.group(1).strip() if match else None


def _iter_building_part_details(text):
    """
    Yields ``(cleaned part name, details text)`` for each building part in ``text``.
    """
    for match in _PART_DETAILS_PATTERN.finditer(text):
        part_name = match.group(1).strip()
        # Drop the construction age band and any room-in-roof suffix from the name.
        cleaned_part_name = re.sub(r" - built in.*|Room\(s\) in Roof area:.*", "", part_name).strip()
        yield cleaned_part_name, match.group(2)


class ElmhurstEprExtractor:
    """
    Extracts data from the text of an Elmhurst energy report PDF.

    The ``extract_*`` static methods each parse one section from already
    extracted text; ``extract`` reads the PDF at ``file_path`` and combines them.
    """

    def __init__(self, file_path):
        """
        :param file_path: String path to the energy report PDF
        """
        self.file_path = file_path

    @staticmethod
    def extract_window_age_description(windows_text):
        """
        Extracts the most common window age description and its proportion.
        :param windows_text: String containing the windows section of the report
        :return: Dict with the most common and second most common glazing
            descriptions, their proportions (%) and the total number of windows
        :raises ValueError: if no known glazing description is found
        """
        # Descriptions can be wrapped across lines in the extracted text.
        flattened_text = windows_text.replace("\n", "")
        description_counts = Counter(
            {description: flattened_text.count(description) for description in _WINDOW_DESCRIPTIONS}
        )
        total_windows = sum(description_counts.values())
        if not total_windows:
            raise ValueError("Failed to extract window data.")

        ranked = description_counts.most_common(2)
        most_common_description, window_count = ranked[0]
        window_proportion = window_count / total_windows * 100
        if window_proportion == 100:
            second_most_common_description = None
            second_most_common_proportion = 0
        else:
            second_most_common_description, second_window_count = ranked[1]
            second_most_common_proportion = second_window_count / total_windows * 100
        return {
            "Window Age Description": most_common_description,
            "Window Age Description Proportion (%)": window_proportion,
            "Secondary Window Age Description": second_most_common_description,
            "Secondary Window Age Description Proportion (%)": second_most_common_proportion,
            "Number of Windows": total_windows
        }

    @staticmethod
    def extract_building_parts(text):
        """
        Extracts building parts and associated dimensions from the provided text.
        :param text: String containing the full report text
        :return: List of dicts, one per floor row (or one "Room in Roof" entry)
            of each building part
        """
        data = []
        for match in _PART_DIMENSIONS_PATTERN.finditer(text):
            part_name = match.group(1).strip()
            floor_data = match.group(2)
            room_in_roof_match = re.search(r"Room\(s\) in Roof area:\s*([\d.]+)", part_name)
            if room_in_roof_match:
                # A room in roof only reports its area; no floor rows are read for it.
                cleaned_part_name = re.sub(r" - built in.*|Room\(s\) in Roof area:.*", "", part_name).strip()
                data.append({
                    "Building Part": cleaned_part_name,
                    "Floor Level": "Room in Roof",
                    "Floor Area (m2)": float(room_in_roof_match.group(1)),
                    "Room Height (m)": None,
                    "Perimeter (m)": None,
                    "Party Wall Length (m)": None
                })
                continue

            cleaned_part_name = re.sub(r" - built in.*", "", part_name).strip()
            for floor_match in _FLOOR_ROW_PATTERN.finditer(floor_data):
                data.append({
                    "Building Part": cleaned_part_name,
                    "Floor Level": floor_match.group(1),
                    "Floor Area (m2)": float(floor_match.group(2)),
                    "Room Height (m)": float(floor_match.group(3)),
                    "Perimeter (m)": float(floor_match.group(4)),
                    "Party Wall Length (m)": float(floor_match.group(5))
                })
        return data

    @staticmethod
    def extract_roof_details(text):
        """
        Extracts roof details for each building part in the provided text.
        :param text: String containing the full report text
        :return: List of dicts, one per building part; values are None when the
            corresponding line is not present
        """
        return [
            {
                "Building Part": part_name,
                "Roof Type": _optional_field(r"Roof Type:\s*(.*?)(?=\n|$)", part_details),
                "Roof Insulation": _optional_field(r"Roof Insulation:\s*(.*?)(?=\n|$)", part_details),
                "Roof Insulation Thickness": _optional_field(
                    r"Roof Insulation Thickness:\s*(.*?)(?=\n|$)", part_details
                ),
            }
            for part_name, part_details in _iter_building_part_details(text)
        ]

    @staticmethod
    def extract_wall_details(text):
        """
        Extracts wall details for each building part in the provided text.
        :param text: String containing the full report text
        :return: List of dicts, one per building part; values are None when the
            corresponding line is not present
        """
        wall_data = []
        for part_name, part_details in _iter_building_part_details(text):
            wall_thickness = _optional_field(r"Wall Thickness:\s*(\d+)(?=\n|$)", part_details)
            wall_data.append({
                "Building Part": part_name,
                "Wall Type": _optional_field(r"Wall Type:\s*(.*?)(?=\n|$)", part_details),
                "Wall Insulation": _optional_field(r"Wall Insulation:\s*(.*?)(?=\n|$)", part_details),
                "Wall Dry-lining": _optional_field(r"Wall Dry-lining:\s*(.*?)(?=\n|$)", part_details),
                "Wall Thickness": int(wall_thickness) if wall_thickness is not None else None,
            })
        return wall_data

    @staticmethod
    def extract_primary_heating(text):
        """
        Extracts the primary heating system ("Main Heating 1") details.
        :param text: String containing the full report text
        :return: Dict with the system description, PCDF reference (string of
            digits), controls and percentage of heat (int)
        :raises AttributeError: (also a ValueError) if the section or one of its
            fields cannot be found
        """
        # The section ends at "Main Heating 2" when present, otherwise at "Secondary Heating".
        primary_heating_section = re.search(
            r"Main\s*Heating\s*1\s*(.*?)\s*Main\s*Heating\s*2", text, re.DOTALL
        )
        if primary_heating_section is None:
            primary_heating_section = _require_match(
                r"Main\s*Heating\s*1\s*(.*?)\s*Secondary\s*Heating",
                text,
                "Main Heating 1 section",
                re.DOTALL,
            )
        primary_text = primary_heating_section.group(1)
        return {
            "Existing Primary Heating System": _require_match(
                r"Main Heating Code\s*(.*?)\n", primary_text, "Main Heating Code (Main Heating 1)"
            ).group(1).strip(),
            "Existing Primary Heating PCDF Reference": _require_match(
                r"PCDF boiler Reference\s*(\d+)", primary_text, "PCDF boiler Reference (Main Heating 1)"
            ).group(1),
            "Existing Primary Heating Controls": _require_match(
                r"Main Heating Controls\s*(.*?)\n", primary_text, "Main Heating Controls (Main Heating 1)"
            ).group(1).strip(),
            "Existing Primary Heating % of Heat": int(
                _require_match(
                    r"Percentage of Heat\s*(\d+)\s*%?", primary_text, "Percentage of Heat (Main Heating 1)"
                ).group(1)
            )
        }

    @staticmethod
    def extract_secondary_heating(text):
        """
        Extracts the second main heating system ("Main Heating 2") details.
        :param text: String containing the full report text
        :return: Dict with the system description, PCDF reference, controls and
            percentage of heat; empty strings and 0 when there is no
            "Main Heating 2" section
        :raises AttributeError: (also a ValueError) if the section exists but a
            required field cannot be found in it
        """
        secondary_heating_section = re.search(
            r"Main\s*Heating\s*2\s*(.*?)\s*Secondary Heating", text, re.DOTALL
        )
        if secondary_heating_section is None:
            return {
                "Existing Heating System": "",
                "Existing Heating PCDF Reference": "",
                "Existing Heating Controls": "",
                "Existing Heating % of Heat": 0,
            }

        secondary_text = secondary_heating_section.group(1)
        output = {}
        output["Existing Heating System"] = _require_match(
            r"Main Heating Code\s*(.*?)(?=\n|Percentage of Heat)",
            secondary_text,
            "Main Heating Code (Main Heating 2)",
        ).group(1).strip()
        output["Existing Heating PCDF Reference"] = _require_match(
            r"PCDF boiler Reference\s*(\d+)", secondary_text, "PCDF boiler Reference (Main Heating 2)"
        ).group(1)
        if output["Existing Heating System"] == "":
            output["Existing Heating Controls"] = ""
        else:
            # Might not have heating controls on 2nd system
            output["Existing Heating Controls"] = (
                _optional_field(r"Main Heating Controls\s*(.*?)\n", secondary_text) or ""
            )
        output["Existing Heating % of Heat"] = int(
            _require_match(
                r"Percentage of Heat\s*(\d+)\s*%?", secondary_text, "Percentage of Heat (Main Heating 2)"
            ).group(1)
        )
        return output

    def extract(self):
        """
        Reads the PDF at ``self.file_path`` and extracts the report data.
        :return: Dict of extracted values (see ``_parse_report_text``)
        :raises AttributeError: (also a ValueError) if a required field is missing
        """
        with open(self.file_path, "rb") as file:
            reader = PyPDF2.PdfReader(file)
            text = "".join(page.extract_text() for page in reader.pages)
        return self._parse_report_text(text)

    def _parse_report_text(self, text):
        """
        Extracts all report fields from the already extracted PDF text.
        :param text: String containing the text of every page, concatenated
        :return: Dict of top-level values plus nested results of the
            ``extract_*`` section parsers
        :raises AttributeError: (also a ValueError) if a required field is missing
        """
        data = {}
        address_match = _require_match(
            r"ENERGY REPORT\nDwelling Address\s*(.*?)\s*\nReference", text, "Dwelling Address", re.DOTALL
        )
        data["Address"] = address_match.group(1).strip()
        # The postcode is taken to be the last comma-separated part of the address.
        data["Postcode"] = data["Address"].split(",")[-1].strip()
        sap_match = _require_match(r"GG \(1-20\)\s*(\d{1,2})\s*(\d{1,2})", text, "Current SAP Rating")
        data["Current SAP Rating"] = int(sap_match.group(1))
        energy_match = _require_match(
            r"Additional ratings for your home\s*([\d.]+)", text, "Primary Energy Use Intensity"
        )
        data["Primary Energy Use Intensity (kWh/m2/yr)"] = float(energy_match.group(1))
        storeys_match = _require_match(r"Number of Storeys:\s*(\d+)", text, "Number of Storeys")
        data["Number of Storeys"] = int(storeys_match.group(1))
        # Accept thousands separators ("£1,234") and drop them, so the amount is not cut at the comma.
        fuel_match = _require_match(r"TOTAL\s*£(\d[\d,]*)", text, "Fuel Bill (TOTAL)")
        fuel_amount = fuel_match.group(1).replace(",", "")
        data["Fuel Bill"] = f"£{fuel_amount}"
        total_doors_match = _require_match(r"Total Doors:\s*(\d+)", text, "Total Doors")
        data["Total Number of Doors"] = int(total_doors_match.group(1))
        insulated_doors_match = _require_match(r"Insulated Doors:\s*(\d+)", text, "Insulated Doors")
        data["Number of Insulated Doors"] = int(insulated_doors_match.group(1))
        # Get number of lighting outlets and number of fittings needing LEL
        lighting_fittings_match = _require_match(
            r"Total number of light fittings\s*(\d+)", text, "Total number of light fittings"
        )
        data["Number of Light Fittings"] = int(lighting_fittings_match.group(1))
        lel_fittings_match = _require_match(
            r"Total number of L.E.L. fittings\s*(\d+)", text, "Total number of L.E.L. fittings"
        )
        data["Number of LEL Fittings"] = int(lel_fittings_match.group(1))
        data["Number of fittings needing LEL"] = data["Number of Light Fittings"] - data["Number of LEL Fittings"]
        windows_section = _require_match(
            r"Windows\s*(.*?)\s*Draught Proofing", text, "Windows section", re.DOTALL
        )
        data["Windows"] = self.extract_window_age_description(windows_section.group(1))
        data["Primary Heating"] = self.extract_primary_heating(text)
        data["Secondary Heating"] = self.extract_secondary_heating(text)
        data["Building Parts"] = self.extract_building_parts(text)
        data["Roof Details"] = self.extract_roof_details(text)
        data["Wall Details"] = self.extract_wall_details(text)
        # The secondary heating code is only reported when a second main system exists.
        if data["Secondary Heating"]["Existing Heating System"] == "":
            data["Secondary Heating Code"] = ""
        else:
            data["Secondary Heating Code"] = _optional_field(r"Secondary Heating Code\s*(.*?)\n", text) or ""
        data["Water Heating Code"] = _require_match(
            r"Water Heating Code\s*(.*?)\n", text, "Water Heating Code"
        ).group(1).strip()
        return data