From b4296db52d7b3c3e26ce3869ac31753bd731c379 Mon Sep 17 00:00:00 2001
From: Khalim Conn-Kowlessar
Date: Thu, 30 Jan 2025 00:51:39 +0000
Subject: [PATCH] adding quidos extraction functions

---
Review notes (free-form text in this section is ignored by git am):
* app.py: handle() takes an optional data_folder (the default is the
  previously hard-coded path) and now returns the extracted data.
* app.py: removed the second detect_report_type() call; it was given a file
  path instead of page text and its result was never used.
* app.py: each page is extracted once and the texts are joined; a PDF with
  no pages is skipped instead of failing on pdf.pages[0].
* quidos.py: docstrings corrected (only current emissions are extracted) and
  totals use generator expressions. Regexes and dictionary keys unchanged.
* NOTE(review): line breaks and indentation were rebuilt from a
  whitespace-collapsed copy, and the index blob ids of the edited files were
  not recomputed. Check the Wave 3 Preparation.py context lines against the
  target file before applying.

 .../stonewater/Wave 3 Preparation.py |  7 ++
 survey_report/app.py                 | 49 +++++++++
 .../extraction/detect_report_type.py | 19 ++++
 survey_report/extraction/quidos.py   | 99 +++++++++++++++++++
 survey_report/requirements.txt       |  0
 5 files changed, 174 insertions(+)
 create mode 100644 survey_report/app.py
 create mode 100644 survey_report/extraction/detect_report_type.py
 create mode 100644 survey_report/extraction/quidos.py
 create mode 100644 survey_report/requirements.txt

diff --git a/etl/customers/stonewater/Wave 3 Preparation.py b/etl/customers/stonewater/Wave 3 Preparation.py
index 7cbf04f1..70c531c0 100644
--- a/etl/customers/stonewater/Wave 3 Preparation.py
+++ b/etl/customers/stonewater/Wave 3 Preparation.py
@@ -3366,8 +3366,15 @@ def revised_model():
         "24 Rabley Wood View": "Wave 2.1 Surveys/3. Wiltshire/24-25 Rabley Wood View"
     }
     wates_matching_lookup = []
+    # Examples to skip when we cannot get the data
+    wates_to_skip = [
+        "66 Abbatt Close",  # File type is unusual, couldn't extract the data
+    ]
     for _, home in tqdm(wates_coordination.iterrows(), total=len(wates_coordination)):
 
+        if home["Name"] in wates_to_skip:
+            continue
+
         # Handle the case that has the wrong postcode in the asset data
         if home["Name"] in wates_manual_filters:
             filtered = retrofit_assessment_data[
diff --git a/survey_report/app.py b/survey_report/app.py
new file mode 100644
index 00000000..825a3658
--- /dev/null
+++ b/survey_report/app.py
@@ -0,0 +1,49 @@
+import os
+
+import PyPDF2
+
+from survey_report.extraction.detect_report_type import detect_report_type
+from survey_report.extraction.quidos import SiteNotesExtractor
+
+# Folder read when handle() is called without an explicit data_folder.
+DEFAULT_DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Model/survey_report/example_data/Flat 2"
+
+
+def handle(data_folder=DEFAULT_DATA_FOLDER):
+    """
+    Performs the data extraction process for the survey report.
+
+    Every PDF in data_folder is read with PyPDF2, its first page is used to detect the
+    report type, and the full text of each recognised report is kept by report type.
+    :param data_folder: Folder containing the survey report PDFs
+    :return: The dictionary produced by SiteNotesExtractor.extract_all()
+    :raises ValueError: If no Quidos site notes PDF is found in data_folder
+    """
+    # We look for the following files:
+    # Site notes
+    file_mapping = {}
+    for file in os.listdir(data_folder):
+        # Check if it's a pdf file
+        if not file.endswith(".pdf"):
+            continue
+        filepath = os.path.join(data_folder, file)
+        with open(filepath, "rb") as f:
+            pdf = PyPDF2.PdfReader(f)
+            # Extract each page once; the first page is reused for type detection
+            page_texts = [page.extract_text() or "" for page in pdf.pages]
+
+        # A PDF without pages has nothing to detect or extract
+        if not page_texts:
+            continue
+
+        # Check the report type
+        report_type = detect_report_type(page_texts[0])
+        if report_type is not None:
+            file_mapping[report_type] = "".join(page_texts)
+
+    # This is only set up to work with Quidos site notes so we must have it
+    if "quidos_site_notes" not in file_mapping:
+        raise ValueError("No quidos site notes found")
+
+    site_notes_extractor = SiteNotesExtractor(file_mapping["quidos_site_notes"])
+    return site_notes_extractor.extract_all()
diff --git a/survey_report/extraction/detect_report_type.py b/survey_report/extraction/detect_report_type.py
new file mode 100644
index 00000000..fe1600e7
--- /dev/null
+++ b/survey_report/extraction/detect_report_type.py
@@ -0,0 +1,19 @@
+import re
+
+
+def detect_report_type(first_page):
+    """
+    Detects the type of report based on the first page of the report
+    :param first_page: Text extracted from the first page of the report
+    :return: "quidos_site_notes" when the Quidos header starts the page, otherwise None
+    """
+    # Only set up to handle Quidos files for now. We have the Elmhurst logic so we can introduce
+    # this when we need it
+
+    if re.match(
+        r"^Created \d{2}/\d{2}/\d{4} for Quidos Ltd using Argyle software BRE approved calculator",
+        first_page
+    ):
+        return "quidos_site_notes"
+
+    return None
diff --git a/survey_report/extraction/quidos.py b/survey_report/extraction/quidos.py
new file mode 100644
index 00000000..f11ffcb1
--- /dev/null
+++ b/survey_report/extraction/quidos.py
@@ -0,0 +1,99 @@
+import re
+
+
+class SiteNotesExtractor:
+    """
+    Extracts SAP rating, carbon emissions, and building dimensions from Quidos site notes text.
+    """
+
+    def __init__(self, pdf_text):
+        """
+        Initializes the SiteNotesExtractor with the extracted PDF text.
+        """
+        self.text = pdf_text
+        self.data = {}
+
+    def extract_sap_rating(self):
+        """
+        Extracts the current and potential SAP rating into self.data; raises ValueError if absent.
+        """
+        pattern = re.search(r"Current SAP rating\s*([A-G])\s*(\d+)\s*Potential SAP rating\s*([A-G])\s*(\d+)", self.text)
+
+        if not pattern:
+            raise ValueError("No SAP rating found in the report")
+
+        self.data.update({
+            "Current EPC Band": pattern.group(1),
+            "Current SAP Rating": int(pattern.group(2)),
+            "Potential EPC Band": pattern.group(3),
+            "Potential SAP Rating": int(pattern.group(4)),
+        })
+
+    def extract_carbon_emissions(self):
+        """
+        Extracts the current annual carbon emissions (TCO2) into self.data; raises ValueError if absent.
+        """
+        pattern = re.search(r"Current annual emissions\s*([\d.]+)\s*\(TCO2\)", self.text)
+
+        if not pattern:
+            raise ValueError("No carbon emissions found in the report")
+
+        self.data.update({
+            "Current Carbon Emissions (TCO2)": float(pattern.group(1)),
+        })
+
+    def extract_building_dimensions(self):
+        """
+        Extracts dimensions for each building part and stores them in a list, plus totals.
+        Handles Main Property and multiple extensions. Raises ValueError if nothing is found.
+        """
+
+        # Locate the Dimensions section
+        dimensions_section = re.search(
+            r"Dimension Type (?:internal|external)\nPart Floor Area \(m2\) Room Height \(m\) Loss Perimeter \(m\) "
+            r"Party Wall "
+            r"Length \(m\)\n"
+            r"(.*?)\n5\.0 Conservatory", self.text, re.DOTALL
+        )
+
+        if not dimensions_section:
+            raise ValueError("Failed to locate the dimensions section in the text.")
+
+        dimensions_text = dimensions_section.group(1)
+
+        # Pattern to match each building part (Main Property, Extension 1, Extension 2, etc.)
+        building_part_pattern = re.compile(
+            r"(Main Property|Extension \d+)\s*(?:Property)?\s*([\d.]+)\s+([\d.]+)\s+([\d.]+)\s+([\d.]+)"
+        )
+
+        building_parts = []
+        for match in building_part_pattern.finditer(dimensions_text):
+            to_append = {
+                "Building Part": match.group(1).strip(),
+                "Part Floor Area (m2)": float(match.group(2)),
+                "Room Height (m)": float(match.group(3)),
+                "Loss Perimeter (m)": float(match.group(4)),
+                "Party Wall Length (m)": float(match.group(5)),
+            }
+            # Heat loss area is the loss perimeter multiplied by the room height
+            to_append["Heat Loss Area (m2)"] = to_append["Loss Perimeter (m)"] * to_append["Room Height (m)"]
+            building_parts.append(to_append)
+
+        if not building_parts:
+            raise ValueError("No building dimensions found in the report")
+
+        self.data["Building Dimensions"] = building_parts
+        # Totals are summed across every building part
+        self.data["Total Building Dimensions"] = {
+            "floor_area": sum(part["Part Floor Area (m2)"] for part in building_parts),
+            "heat_loss_area": sum(part["Heat Loss Area (m2)"] for part in building_parts),
+        }
+
+    def extract_all(self):
+        """
+        Runs all extraction methods and returns the self.data dictionary with the extracted data.
+        """
+        self.extract_sap_rating()
+        self.extract_carbon_emissions()
+        self.extract_building_dimensions()
+        return self.data
diff --git a/survey_report/requirements.txt b/survey_report/requirements.txt
new file mode 100644
index 00000000..e69de29b