diff --git a/etl/customers/stonewater/Wave 3 Preparation.py b/etl/customers/stonewater/Wave 3 Preparation.py index bb100ae1..7f4f81e9 100644 --- a/etl/customers/stonewater/Wave 3 Preparation.py +++ b/etl/customers/stonewater/Wave 3 Preparation.py @@ -123,8 +123,10 @@ def extract_summary_report(pdf_path): secondary_heating_section = re.search(r"Main\s*Heating2\s*(.*?)\s*Water\s*Heating", text, re.DOTALL) secondary_text = secondary_heating_section.group(1) - main_heating_code_match = re.search(r"Main Heating Code\s*(.*?)(?=\n|Percentage of Heat)", secondary_text) - data["Existing Secondary Heating System"] = main_heating_code_match.group(1).strip() + main_heating_code_match_secondary = re.search( + r"Main Heating Code\s*(.*?)(?=\n|Percentage of Heat)", secondary_text + ) + data["Existing Secondary Heating System"] = main_heating_code_match_secondary.group(1).strip() data["Existing Secondary Heating PCDF Reference"] = re.search(r"PCDF boiler Reference\s*(\d+)", secondary_text).group(1) second_heating_controls_match = re.search(r"Main Heating Controls\s*(.*?)\n", secondary_text) @@ -299,11 +301,14 @@ def extract_epr(pdf_path): else: secondary_text = secondary_heating_section.group(1) - data["Existing Secondary Heating System"] = re.search( - r"Main Heating Code\s*(.*?)\n", secondary_text - ).group(1).strip() - data["Existing Secondary Heating PCDF Reference"] = re.search(r"PCDF boiler Reference\s*(\d+)", - secondary_text).group(1) + main_heating_code_match_secondary = re.search( + r"Main Heating Code\s*(.*?)(?=\n|Percentage of Heat)", secondary_text + ) + data["Existing Secondary Heating System"] = main_heating_code_match_secondary.group(1).strip() + + data["Existing Secondary Heating PCDF Reference"] = re.search( + r"PCDF boiler Reference\s*(\d+)", secondary_text + ).group(1) if data["Existing Secondary Heating System"] == "": data["Existing Secondary Heating Controls"] = "" @@ -334,20 +339,57 @@ def extract_epr(pdf_path): return data +def detect_report_type(pdf_path, 
pdf_file): + """ + Detects the type of report based on content or filename. + :param pdf_path: String path to the PDF file + :param pdf_file: String name of the PDF file + :return: String type of the report ("epr", "summary", "condition", or None) + """ + # Attempt to read the first page of the PDF to determine type + with open(pdf_path, "rb") as file: + reader = PyPDF2.PdfReader(file) + first_page_text = reader.pages[0].extract_text() if reader.pages else "" + + if is_energy_report(first_page_text): + return "epr" + elif "summary" in pdf_file.lower() or is_summary_report(first_page_text): + return "summary" + elif is_condition_report(first_page_text): + return "condition" + + return None + + def extract_retrofit_pdfs(data_folder_path): """ Handles extraction from a retrofit data folder if it exists and has content. + Prioritizes extracting data from an EPR if both EPR and summary report are present. """ retrofit_files = [f for f in os.listdir(data_folder_path) if f.endswith(".pdf")] + report_types = {"epr": None, "summary": None} + # First, identify the types of reports available for pdf_file in retrofit_files: pdf_path = os.path.join(data_folder_path, pdf_file) - extracted = detect_and_parse_report(pdf_path, pdf_file) - if extracted is not None: - return extracted - continue + report_type = detect_report_type(pdf_path, pdf_file) - # If no relevant PDF is found, exit + if report_type == "epr": + report_types["epr"] = pdf_path + elif report_type == "summary": + report_types["summary"] = pdf_path + + # Stop checking further if both EPR and summary are found + if report_types["epr"] and report_types["summary"]: + break + + # Extract data based on report availability and priority + if report_types["epr"]: + return extract_epr(report_types["epr"]) + elif report_types["summary"]: + return extract_summary_report(report_types["summary"]) + + # If no relevant PDF is found, return None return None