diff --git a/etl/customers/stonewater/Wave 3 Preparation.py b/etl/customers/stonewater/Wave 3 Preparation.py index 81b5915f..aa9e4488 100644 --- a/etl/customers/stonewater/Wave 3 Preparation.py +++ b/etl/customers/stonewater/Wave 3 Preparation.py @@ -291,26 +291,11 @@ def extract_summary_report(pdf_path): data["Number of LEL Fittings"] = int(re.search(r"Total number of L.E.L. fittings\s*(\d+)", text).group(1)) data["Number of fittings needing LEL"] = data["Number of Light Fittings"] - data["Number of LEL Fittings"] - roof_section = re.search(r"8\.0 Roofs:\n(.*?)\n9\.0 Floors:", text, re.DOTALL) - roof_text = roof_section.group(1).strip() - roof_type_match = re.search(r"Type\s*([A-Za-z0-9\s]+)", roof_text) - data["Main Roof Type"] = roof_type_match.group(1).strip() if roof_type_match else None - - # Check if "Insulation" exists between Type and Insulation Thickness - insulation_search = re.search( - r"Type\s+.*?\n(Insulation\s+(.*?)\n)?(Insulation Thickness\s+(.*?)\n)", roof_text, re.DOTALL - ) - - if insulation_search: - # Insulation match will be present if it exists, otherwise it will be None - insulation_match = insulation_search.group(2) # Optional group for Insulation - insulation_thickness_match = insulation_search.group(4) # Required group for Insulation Thickness - - # Populate insulation fields - data["Main Roof Insulation"] = insulation_match.strip() if insulation_match else None - data["Main Roof Insulation Thickness"] = ( - insulation_thickness_match.strip() if insulation_thickness_match else None - ) + extracted_roof_data = extract_roof_details_summary(text) + main_roof_data = [roof for roof in extracted_roof_data if "Main" in roof["Building Part"]][0] + data["Main Roof Type"] = main_roof_data["Roof Type"] + data["Main Roof Insulation"] = main_roof_data["Roof Insulation"] + data["Main Roof Insulation Thickness"] = main_roof_data["Roof Insulation Thickness"] walls_data = extract_wall_details_summary(text) # Get the main building wall data @@ -593,6 
def extract_roof_details_summary(text):
    """
    Extract roof type, insulation and insulation thickness for each building
    part listed in the "8.0 Roofs:" section of the summary report text.

    The section is parsed line by line rather than with one multi-line regex.
    The layout is assumed (from the patterns this parser has always matched)
    to be a building-part label line such as "Main Property" or
    "1st Extension", followed by a "Type ..." line and then optional
    "Insulation ..." and "Insulation Thickness ..." lines.

    Parsing per line keeps a label that starts with a digit ("1st Extension")
    from being swallowed into the previous part's value, and keeps an
    "Insulation Thickness" line from being read as an "Insulation" value when
    the part has no "Insulation" line.

    Args:
        text: Full text of the summary report.

    Returns:
        A list with one dict per building part, in document order, with keys
        "Building Part", "Roof Type", "Roof Insulation" and
        "Roof Insulation Thickness". The two insulation values are None when
        the field is absent or empty. An empty list is returned when the text
        has no "8.0 Roofs:" section.
    """
    # The section runs up to "9.0 Floors:", or to the end of the text when
    # that heading is missing.
    section_match = re.search(r"8\.0 Roofs:\n(.*?)(?=\n9\.0 Floors:|$)", text, re.DOTALL)
    if not section_match:
        return []

    # "Insulation Thickness" must be tried before "Insulation" so the longer
    # field name wins.
    field_pattern = re.compile(r"(Insulation Thickness|Insulation|Type)(?:\s+(.*))?$")
    type_line_pattern = re.compile(r"Type(?:\s|$)")
    output_keys = {
        "Type": "Roof Type",
        "Insulation": "Roof Insulation",
        "Insulation Thickness": "Roof Insulation Thickness",
    }

    stripped = (raw_line.strip() for raw_line in section_match.group(1).splitlines())
    lines = [line for line in stripped if line]

    roof_data = []
    entry = None          # Building part currently being filled
    seen_keys = set()     # Fields already captured for `entry` (first one wins)
    open_key = None       # Field that may still receive wrapped continuation lines
    pending_label = None  # Label waiting for its "Type" line

    for index, line in enumerate(lines):
        field = field_pattern.match(line)

        if field is None:
            next_line = lines[index + 1] if index + 1 < len(lines) else ""
            if type_line_pattern.match(next_line):
                # A line directly above a "Type" line is a building-part label.
                pending_label = line
                open_key = None
            elif open_key is not None and (not entry[open_key] or not re.match(r"[A-Z]", line)):
                # Wrapped value: either the field line had no value of its own,
                # or the line does not start with a capital letter. The pieces
                # stay newline-separated.
                entry[open_key] = (entry[open_key] + "\n" + line).strip()
            else:
                # Unrelated text ends the current value.
                open_key = None
            continue

        name = field.group(1)
        value = (field.group(2) or "").strip()

        if name == "Type":
            entry = None
            open_key = None
            if pending_label is None:
                # A "Type" line without a label cannot be attributed to a part.
                continue
            entry = {
                "Building Part": pending_label,
                "Roof Type": value,
                "Roof Insulation": "",
                "Roof Insulation Thickness": "",
            }
            roof_data.append(entry)
            seen_keys = {"Roof Type"}
            pending_label = None
            open_key = "Roof Type"
            continue

        key = output_keys[name]
        if entry is None or key in seen_keys:
            open_key = None
            continue
        entry[key] = value
        seen_keys.add(key)
        open_key = key

    for entry in roof_data:
        # Kept from the previous implementation: normalise any trailing noise
        # after this particular roof type.
        if entry["Roof Type"].startswith("A Another dwelling above"):
            entry["Roof Type"] = "A Another dwelling above"
        for key in ("Roof Insulation", "Roof Insulation Thickness"):
            entry[key] = entry[key] or None

    return roof_data
Insulation"].astype(str) ) # Roof information - survey_results["Survey: Type"] = survey_results["Main Roof Type"] + ": " + survey_results[ - "Main Roof Insulation"] + ": " + survey_results["Main Roof Insulation Thickness"].astype(str) + survey_results["Survey: Main Roof Type"] = survey_results["Main Roof Type"].astype(str) + ": " + survey_results[ + "Main Roof Insulation"].astype(str) + ": " + survey_results["Main Roof Insulation Thickness"].astype(str) # Drop the individual columns: survey_results = survey_results.drop( @@ -1834,6 +1867,11 @@ def propsed_wave_3_sample(): return surveyed + survey_attribute_columns = [ + "Survey: Main Wall Type", 'Survey: Main Alternative Wall', 'Survey: Main Roof Type', + 'Survey: Primary Heating System' + ] + results = [] for region in tqdm(unique_postal_regions): # Take all of the properties in that region @@ -1845,7 +1883,8 @@ def propsed_wave_3_sample(): ] region_assets = region_assets.merge( - exact_surveyed[["Address ID", "Current EPC Band"]], + exact_surveyed[ + ["Address ID", "Current EPC Band", "Current SAP Rating"] + survey_attribute_columns], on="Address ID", how="left" ) @@ -2286,6 +2325,11 @@ def propsed_wave_3_sample(): results = pd.concat(results) + # Check if there are missings in current epc band, current sap rating or any of the survey attributes + for c in ["Current EPC Band", "Current SAP Rating"] + survey_attribute_columns: + if pd.isnull(results[c]).sum(): + raise Exception("Something went wrong") + # home = results[results["Confidence Tier"] == "5 - EPC C or above"].sample(1) # region = home["Postal Region"].values[0]