Add OCR-based extraction of Pulse air permeability reports to the lodgement ETL.

Reviewer notes on this revision of the patch:
* The patch had been collapsed onto a few long lines. It is laid out as a unified diff again
  below; indentation and the position of blank context lines were not recoverable from the
  collapsed text and were reconstructed from the hunk line counts, so check them against the
  target tree before applying.
* utils/file_data_extraction.py: the new extractor and is_pulse_air_permeability() gained
  docstrings, a comment that called the results list a dictionary was corrected, and the
  page text is now assembled with str.join. Hunk headers were recounted to match, so the
  post-image blob id on that file's "index" line no longer describes this content.
* etl/lodgement/app.py: handler() is only partly visible in these hunks, so its lines are
  reproduced unchanged. Note that both "[...][0]" subfolder lookups raise IndexError when a
  property folder has no matching subfolder.

diff --git a/etl/lodgement/app.py b/etl/lodgement/app.py
index 629c10e0..6fe9fdc4 100644
--- a/etl/lodgement/app.py
+++ b/etl/lodgement/app.py
@@ -85,6 +85,10 @@ def handler():
     customer_phone = "0345 678 9000"
     customer_email = "affordablewarmth@shropshire.gov.uk"
 
+    # TODO: In order for this to go live, we need to use Poppler, which needs to be installed
+    # w/ brew install poppler
+    # We also need to install Tesseract: brew install tesseract
+
     # List the folders in the source data path
     folders = [x for x in os.listdir(source_data_path) if os.path.isdir(os.path.join(source_data_path, x))]
 
@@ -94,25 +98,28 @@ def handler():
         "osmosis condition report": OsmosisConditionReportParser,
         "elmhurst evidence report": None,
         "full sap xml": FullSapParser,
+        "pulse air permeability": file_extraction_tools.PulseAirPermeabilityExtractor
     }
 
     extracted = []
 
     for property_folder in folders:
-        coordinator_folder = os.path.join(source_data_path, property_folder, "2. RA Coordinator Info")
-        # Check if this folder exists
-        if not os.path.exists(coordinator_folder):
-            coordinator_folder = os.path.join(source_data_path, property_folder, "1. RA Coordinator Info")
+        property_folder_path = os.path.join(source_data_path, property_folder)
+        # List the folders in the source data path
+        subfolders = [
+            x for x in os.listdir(property_folder_path) if os.path.isdir(os.path.join(property_folder_path, x))
+        ]
+        coord_folder = os.path.join(property_folder_path, [f for f in subfolders if "RA Coordinator Info" in f][0])
 
         # Get the contents of the folder
         coordinator_folder_contents = [
-            file for file in os.listdir(coordinator_folder) if os.path.isfile(os.path.join(coordinator_folder, file))
+            file for file in os.listdir(coord_folder) if os.path.isfile(os.path.join(coord_folder, file))
         ]
 
         # We detect the various file types
         extracted_contents = {}
         for filename in coordinator_folder_contents:
-            filepath = os.path.join(coordinator_folder, filename)
+            filepath = os.path.join(coord_folder, filename)
             if file_extraction_tools.is_pdf(filepath):
                 report_type = file_extraction_tools.detect_pdf_report_type(pdf_path=filepath)
                 if report_type is None:
@@ -134,6 +141,24 @@ def handler():
 
                 extracted_contents[xml_type] = file_extractor(filepath).extract()
 
+        att_folder = os.path.join(property_folder_path, [f for f in subfolders if "Air Tightness Tests" in f][0])
+        att_folder_contents = [
+            file for file in os.listdir(att_folder) if os.path.isfile(os.path.join(att_folder, file))
+        ]
+
+        for filename in att_folder_contents:
+            filepath = os.path.join(att_folder, filename)
+            if file_extraction_tools.is_pdf(filepath):
+                report_type = file_extraction_tools.detect_pdf_report_type(pdf_path=filepath)
+                if report_type is None:
+                    raise ValueError(f"Unknown report type for {filename}")
+                file_extractor = extractors[report_type]
+
+                if file_extractor is None:
+                    continue
+
+                extracted_contents[report_type] = file_extractor(filepath).extract()
+
         output_row_data = output_template.copy()
         # dict_keys([
         'City/County', 'District/Town',
diff --git a/etl/lodgement/requirements.txt b/etl/lodgement/requirements.txt
index 75c63b26..09e475fe 100644
--- a/etl/lodgement/requirements.txt
+++ b/etl/lodgement/requirements.txt
@@ -8,3 +8,6 @@ fuzzywuzzy==0.18.0
 python-dotenv
 python-docx
 pymupdf
+pytesseract
+pdf2image
+pillow
diff --git a/utils/file_data_extraction.py b/utils/file_data_extraction.py
index c3cc8a10..c60f01b4 100644
--- a/utils/file_data_extraction.py
+++ b/utils/file_data_extraction.py
@@ -3,6 +3,8 @@ import re
 from collections import Counter
 from utils.logger import setup_logger
 from xml.dom.minidom import parseString
+from pdf2image import convert_from_path
+from pytesseract import image_to_string
 
 logger = setup_logger()
 
@@ -41,11 +43,20 @@ def is_elmhurst_evidence_report(text):
     return text.startswith("RdSAP Evidence Report")
 
 
+def is_pulse_air_permeability(text):
+    """
+    Determines if the provided text indicates that the PDF is a Pulse Air Permeability Report.
+
+    The prefix is compared literally, including the "@O" characters; presumably that is how the
+    Pulse logo comes out of OCR - confirm against a sample report before changing it.
+    """
+    return text.startswith("Air Permeability Test Report @O PULSE")
+
+
 def detect_pdf_report_type(pdf_path):
     """
     Detects the type of report based on content or filename.
     :param pdf_path: String path to the PDF file
-    :param pdf_file: String name of the PDF file
     :return: String type of the report ("epr", "summary", or None)
     """
     # Attempt to read the first page of the PDF to determine type
@@ -53,14 +64,23 @@ def detect_pdf_report_type(pdf_path):
         reader = PyPDF2.PdfReader(file)
         first_page_text = reader.pages[0].extract_text() if reader.pages else ""
 
-        if is_elmhurst_energy_report(first_page_text):
-            return "elmhurst epr"
-        elif is_elmhurst_summary_report(first_page_text):
-            return "elmhurst summary report"
-        elif is_osmosis_condition_report(first_page_text):
-            return "osmosis condition report"
-        elif is_elmhurst_evidence_report(first_page_text):
-            return "elmhurst evidence report"
+    if first_page_text == "":
+        # Convert PDF pages to images
+        logger.info("Extracting text from PDF images..., this may take a moment.")
+        pages = convert_from_path(pdf_path, dpi=300)
+        if pages:
+            first_page_text = image_to_string(pages[0])
+
+    if is_elmhurst_energy_report(first_page_text):
+        return "elmhurst epr"
+    elif is_elmhurst_summary_report(first_page_text):
+        return "elmhurst summary report"
+    elif is_osmosis_condition_report(first_page_text):
+        return "osmosis condition report"
+    elif is_elmhurst_evidence_report(first_page_text):
+        return "elmhurst evidence report"
+    elif is_pulse_air_permeability(first_page_text):
+        return "pulse air permeability"
 
     return None
 
@@ -911,7 +931,7 @@ class ElmhurstSummaryReportExtractor:
         # Join non-empty parts with a comma
         data["Address"] = ", ".join([part for part in address_parts if part])
 
-        data["Postcode"] = postcode.group(1).strip()
+        data["Postcode"] = postcode
         data["Region"] = region
         data["House Name"] = house_name
         data["House No"] = house_no
@@ -977,3 +997,76 @@ class ElmhurstSummaryReportExtractor:
             data["Water Heating Code"] = water_heating_code_match.group(1).strip()
 
         return data
+
+
+class PulseAirPermeabilityExtractor:
+    """
+    A utility class for extracting specific data from Pulse Air Permeability Test Reports.
+    """
+
+    def __init__(self, file_path):
+        self.file_path = file_path
+
+    @staticmethod
+    def extract_table(text):
+        """
+        Parses the results table out of the report text.
+        :param text: String of report text (extract() passes the OCR output of all pages)
+        :return: List of dicts, one per metric, with keys "Metric", "Measured @ 4PA" and
+            "Extrapolated @ 50PA"; both readings are returned as strings
+        :raises ValueError: If any of the expected metrics cannot be found in the text
+        """
+        # Both value columns accept '@' because the post-processing below replaces it with '0'
+        patterns = {
+            "Air Leakage Rate": r"Air Leakage Rate\s*([\d,@.]+)\s*m/h\s*([\d,@.]+)\s*m3/h",
+            "Air Permeability": r"Air Permeability\s*([\d,@.]+)\s*=.*?\s*([\d,@.]+)\s*m\?/m\?h",
+            "Air Changes per Hour": r"Air Changes per Hour\s*([\d,@.]+)\s*([\d,@.]+)",
+            "Equivalent Leakage Area": r"Equivalent Leakage Area\s*([\d,@.]+)\s*([\d,@.]+)",
+            "Calculation Uncertainty": r"Calculation Uncertainty\s*([\d,@.]+)\s*([\d,@.]+)",
+        }
+
+        # Initialize results list
+        table_data = []
+
+        # Parse each metric using the corresponding regex
+        for metric, pattern in patterns.items():
+            match = re.search(pattern, text)
+            if match:
+                # Extract the two column values
+                first_value = match.group(1)
+                second_value = match.group(2)
+
+                # Post-process values: replace '@' with '0' and remove commas
+                first_value = first_value.replace("@", "0").replace(",", "")
+                second_value = second_value.replace("@", "0").replace(",", "")
+
+                table_data.append(
+                    {
+                        "Metric": metric,
+                        "Measured @ 4PA": first_value,
+                        "Extrapolated @ 50PA": second_value,
+                    }
+                )
+            else:
+                raise ValueError(f"Could not extract metric: {metric}")
+
+        return table_data
+
+    def extract(self):
+        """
+        Runs OCR over every page of the PDF and extracts the results table.
+        :return: Dict with a single key, "Results Table", holding the output of extract_table
+        """
+        # Extract the pdf using tesseract
+        logger.info("Extracting data from pdf image - this may take a while...")
+        pages = convert_from_path(self.file_path, dpi=300)
+        # Extract all of the pages
+        text = "".join(image_to_string(page) for page in pages)
+
+        # We extract the air permeability reading
+        results_table = self.extract_table(text)
+        data = {
+            "Results Table": results_table
+        }
+
+        return data