added ocr extraction for permeability report

This commit is contained in:
Khalim Conn-Kowlessar 2024-11-28 14:02:55 +00:00
parent 5a2ffe646c
commit 5e7827f706
3 changed files with 124 additions and 16 deletions

View file

@ -85,6 +85,10 @@ def handler():
customer_phone = "0345 678 9000"
customer_email = "affordablewarmth@shropshire.gov.uk"
# TODO: Before this can go live, the host needs two system packages installed:
# Poppler, which pdf2image uses to render PDF pages (brew install poppler)
# Tesseract, which pytesseract uses for OCR (brew install tesseract)
# List the folders in the source data path
folders = [x for x in os.listdir(source_data_path) if os.path.isdir(os.path.join(source_data_path, x))]
@ -94,25 +98,28 @@ def handler():
"osmosis condition report": OsmosisConditionReportParser,
"elmhurst evidence report": None,
"full sap xml": FullSapParser,
"pulse air permeability": file_extraction_tools.PulseAirPermeabilityExtractor
}
extracted = []
for property_folder in folders:
coordinator_folder = os.path.join(source_data_path, property_folder, "2. RA Coordinator Info")
# Check if this folder exists
if not os.path.exists(coordinator_folder):
coordinator_folder = os.path.join(source_data_path, property_folder, "1. RA Coordinator Info")
property_folder_path = os.path.join(source_data_path, property_folder)
# List the subfolders of this property's folder
subfolders = [
x for x in os.listdir(property_folder_path) if os.path.isdir(os.path.join(property_folder_path, x))
]
coord_folder = os.path.join(property_folder_path, [f for f in subfolders if "RA Coordinator Info" in f][0])
# Get the contents of the folder
coordinator_folder_contents = [
file for file in os.listdir(coordinator_folder) if os.path.isfile(os.path.join(coordinator_folder, file))
file for file in os.listdir(coord_folder) if os.path.isfile(os.path.join(coord_folder, file))
]
# We detect the various file types
extracted_contents = {}
for filename in coordinator_folder_contents:
filepath = os.path.join(coordinator_folder, filename)
filepath = os.path.join(coord_folder, filename)
if file_extraction_tools.is_pdf(filepath):
report_type = file_extraction_tools.detect_pdf_report_type(pdf_path=filepath)
if report_type is None:
@ -134,6 +141,24 @@ def handler():
extracted_contents[xml_type] = file_extractor(filepath).extract()
att_folder = os.path.join(property_folder_path, [f for f in subfolders if "Air Tightness Tests" in f][0])
att_folder_contents = [
file for file in os.listdir(att_folder) if os.path.isfile(os.path.join(att_folder, file))
]
for filename in att_folder_contents:
filepath = os.path.join(att_folder, filename)
if file_extraction_tools.is_pdf(filepath):
report_type = file_extraction_tools.detect_pdf_report_type(pdf_path=filepath)
if report_type is None:
raise ValueError(f"Unknown report type for {filename}")
file_extractor = extractors[report_type]
if file_extractor is None:
continue
extracted_contents[report_type] = file_extractor(filepath).extract()
output_row_data = output_template.copy()
# dict_keys([ 'City/County', 'District/Town',

View file

@ -8,3 +8,6 @@ fuzzywuzzy==0.18.0
python-dotenv
python-docx
pymupdf
pytesseract
pdf2image
pillow

View file

@ -3,6 +3,8 @@ import re
from collections import Counter
from utils.logger import setup_logger
from xml.dom.minidom import parseString
from pdf2image import convert_from_path
from pytesseract import image_to_string
logger = setup_logger()
@ -41,11 +43,17 @@ def is_elmhurst_evidence_report(text):
return text.startswith("RdSAP Evidence Report")
def is_pulse_air_permeability(text):
    """
    Determines if the provided text indicates that the PDF is a Pulse Air Permeability Report.

    The text is matched on its first line only: it must begin (ignoring leading whitespace)
    with "Air Permeability Test Report" and carry the word "PULSE" later on that same line.
    Whatever sits between the title and "PULSE" is ignored, because that part is not stable:
    the previous exact match expected "@O" there, which is presumably how OCR happened to
    render the logo - NOTE(review): confirm against a few real reports.

    :param text: String text of the first page of the PDF
    :return: True if the text looks like a Pulse Air Permeability Test Report, False otherwise
    """
    # re.match anchors at the start of the string; [^\n]* keeps the search on the first line
    header = re.match(r"\s*Air Permeability Test Report\b[^\n]*PULSE", text)
    return header is not None
def detect_pdf_report_type(pdf_path):
"""
Detects the type of report based on content or filename.
:param pdf_path: String path to the PDF file
:param pdf_file: String name of the PDF file
:return: String type of the report ("epr", "summary", or None)
"""
# Attempt to read the first page of the PDF to determine type
@ -53,14 +61,23 @@ def detect_pdf_report_type(pdf_path):
reader = PyPDF2.PdfReader(file)
first_page_text = reader.pages[0].extract_text() if reader.pages else ""
if is_elmhurst_energy_report(first_page_text):
return "elmhurst epr"
elif is_elmhurst_summary_report(first_page_text):
return "elmhurst summary report"
elif is_osmosis_condition_report(first_page_text):
return "osmosis condition report"
elif is_elmhurst_evidence_report(first_page_text):
return "elmhurst evidence report"
if first_page_text == "":
# Convert PDF pages to images
logger.info("Extracting text from PDF images..., this may take a moment.")
pages = convert_from_path(pdf_path, dpi=300)
if pages:
first_page_text = image_to_string(pages[0])
if is_elmhurst_energy_report(first_page_text):
return "elmhurst epr"
elif is_elmhurst_summary_report(first_page_text):
return "elmhurst summary report"
elif is_osmosis_condition_report(first_page_text):
return "osmosis condition report"
elif is_elmhurst_evidence_report(first_page_text):
return "elmhurst evidence report"
elif is_pulse_air_permeability(first_page_text):
return "pulse air permeability"
return None
@ -911,7 +928,7 @@ class ElmhurstSummaryReportExtractor:
# Join non-empty parts with a comma
data["Address"] = ", ".join([part for part in address_parts if part])
data["Postcode"] = postcode.group(1).strip()
data["Postcode"] = postcode
data["Region"] = region
data["House Name"] = house_name
data["House No"] = house_no
@ -977,3 +994,66 @@ class ElmhurstSummaryReportExtractor:
data["Water Heating Code"] = water_heating_code_match.group(1).strip()
return data
class PulseAirPermeabilityExtractor:
    """
    A utility class for extracting specific data from Pulse Air Permeability Test Reports.

    The PDF is rasterised page by page and read with OCR, and the results table is then
    recovered from the OCR text with regular expressions.
    """

    # Metric label -> pattern capturing the two value columns of that row.
    # Compiled once here rather than rebuilt on every extract_table call.
    # The value class includes '@' because it is later treated as a misread '0'.
    # NOTE(review): the unit spellings ("m/h", "m?/m?h") look like OCR renderings of
    # superscripted units - confirm against real OCR output before changing them.
    _METRIC_PATTERNS = {
        "Air Leakage Rate": re.compile(r"Air Leakage Rate\s*([\d,@.]+)\s*m/h\s*([\d,@.]+)\s*m3/h"),
        "Air Permeability": re.compile(r"Air Permeability\s*([\d,@.]+)\s*=.*?\s*([\d,@.]+)\s*m\?/m\?h"),
        "Air Changes per Hour": re.compile(r"Air Changes per Hour\s*([\d,@.]+)\s*([\d,@.]+)"),
        "Equivalent Leakage Area": re.compile(r"Equivalent Leakage Area\s*([\d,@.]+)\s*([\d,@.]+)"),
        "Calculation Uncertainty": re.compile(r"Calculation Uncertainty\s*([\d,@.]+)\s*([\d,@.]+)"),
    }

    def __init__(self, file_path):
        """
        :param file_path: String path to the PDF report to read
        """
        self.file_path = file_path

    @staticmethod
    def extract_table(text):
        """
        Pulls the results table out of the OCR text of a report.

        :param text: String text of the report
        :return: List of dicts, one per metric in a fixed order, each with the keys
            "Metric", "Measured @ 4PA" and "Extrapolated @ 50PA" (values are strings)
        :raises ValueError: If any of the expected metrics cannot be found in the text
        """
        table_data = []

        for metric, pattern in PulseAirPermeabilityExtractor._METRIC_PATTERNS.items():
            match = pattern.search(text)
            if match is None:
                # A partial table is never returned; the first missing metric aborts
                raise ValueError(f"Could not extract metric: {metric}")

            # Post-process both columns: replace '@' with '0' and remove commas
            first_value, second_value = (
                value.replace("@", "0").replace(",", "") for value in match.group(1, 2)
            )

            table_data.append(
                {
                    "Metric": metric,
                    "Measured @ 4PA": first_value,
                    "Extrapolated @ 50PA": second_value,
                }
            )

        return table_data

    def extract(self):
        """
        Runs OCR over every page of the PDF and extracts the results table.

        :return: Dict with the single key "Results Table" holding the output of extract_table
        :raises ValueError: If extract_table cannot find one of the expected metrics
        """
        logger.info("Extracting data from pdf image - this may take a while...")
        pages = convert_from_path(self.file_path, dpi=300)

        # Join the per-page OCR text in one go instead of growing a string with +=
        text = "".join(image_to_string(page) for page in pages)

        return {"Results Table": self.extract_table(text)}