diff --git a/.idea/Model.iml b/.idea/Model.iml
index 762580d9..9b63b142 100644
--- a/.idea/Model.iml
+++ b/.idea/Model.iml
@@ -7,7 +7,7 @@
-
+
diff --git a/.idea/misc.xml b/.idea/misc.xml
index c916a158..acd935c1 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -3,7 +3,7 @@
-
+
diff --git a/etl/lodgement/app.py b/etl/lodgement/app.py
new file mode 100644
index 00000000..ede644b8
--- /dev/null
+++ b/etl/lodgement/app.py
@@ -0,0 +1,47 @@
+import os
+import utils.file_data_extraction as file_extraction_tools
+
+
+def handler():
+ """
+ This is a simple application that will extract the data from documents that have been uploaded to Sharepoint
+ to populate the lodgement spreadsheet with
+ :return:
+ """
+
+ # Ths source data will eventually come from Sharepoint
+ source_data_path = "/Users/khalimconn-kowlessar/Documents/hestia/Lodgment Pilot"
+ output_template = "Trustmark Details - Template REV.25.11.24.xlsx"
+
+ # List the folders in the source data path
+ folders = [x for x in os.listdir(source_data_path) if os.path.isdir(os.path.join(source_data_path, x))]
+
+ extractors = {
+ "elmhurst epr": file_extraction_tools.ElmhurstEprExtractor,
+ "elmhurst summary report": None,
+ "osmosis condition report": None,
+ "elmhurst evidence report": None
+ }
+
+ for property_folder in folders:
+ coordinator_folder = os.path.join(source_data_path, property_folder, "2. RA Coordinator Info")
+
+ # Get the contents of the folder
+ coordinator_folder_contents = [
+ file for file in os.listdir(coordinator_folder) if os.path.isfile(os.path.join(coordinator_folder, file))
+ ]
+
+ # We detect the various file types
+ extracted_contents = {}
+ for filename in coordinator_folder_contents:
+ filepath = os.path.join(coordinator_folder, filename)
+ if file_extraction_tools.is_pdf(filepath):
+ report_type = file_extraction_tools.detect_pdf_report_type(pdf_path=filepath)
+ if report_type is None:
+ raise ValueError(f"Unknown report type for {filename}")
+
+ file_extractor = extractors.get(report_type)
+ if file_extractor is None:
+ continue
+
+ extracted_contents[report_type] = file_extractor(filepath).extract()
diff --git a/etl/lodgement/requirements.txt b/etl/lodgement/requirements.txt
new file mode 100644
index 00000000..601907ed
--- /dev/null
+++ b/etl/lodgement/requirements.txt
@@ -0,0 +1,8 @@
+PyPDF2
+pandas
+tqdm
+openpyxl
+boto3
+usaddress==0.5.11
+fuzzywuzzy==0.18.0
+python-dotenv
\ No newline at end of file
diff --git a/utils/file_data_extraction.py b/utils/file_data_extraction.py
new file mode 100644
index 00000000..cdd25f8a
--- /dev/null
+++ b/utils/file_data_extraction.py
@@ -0,0 +1,343 @@
+import PyPDF2
+import re
+from collections import Counter
+
+"""
+This script contains functions used to extract data from retrofit survey files, including EPRs,
+summary reports, etc
+"""
+
+
+def is_elmhurst_energy_report(text):
+ """
+ Determines if the provided text indicates that the PDF is an Energy Report.
+ Returns True if the text contains 'Energy Report'.
+ """
+ return text.startswith("ENERGY REPORT")
+
+
+def is_elmhurst_summary_report(text):
+ """
+ Determines if the provided text indicates that the PDF is a Summary Report.
+ """
+ return text.startswith("Summary Information")
+
+
+def is_osmosis_condition_report(text):
+ """
+ Determines if the provided text indicates that the PDF is a Condition Report.
+ """
+ return text.startswith("OsmosisACDNEWPAS2035ConditionReport") or text.startswith("OsmosisACDPAS2035ConditionReport")
+
+
+def is_elmhurst_evidence_report(text):
+ """
+ Determines if the provided text indicates that the PDF is an Elmhurst Evidence Report.
+ """
+ return text.startswith("RdSAP Evidence Report")
+
+
+def detect_pdf_report_type(pdf_path):
+ """
+ Detects the type of report based on content or filename.
+ :param pdf_path: String path to the PDF file
+ :param pdf_file: String name of the PDF file
+ :return: String type of the report ("epr", "summary", or None)
+ """
+ # Attempt to read the first page of the PDF to determine type
+ with open(pdf_path, "rb") as file:
+ reader = PyPDF2.PdfReader(file)
+ first_page_text = reader.pages[0].extract_text() if reader.pages else ""
+
+ if is_elmhurst_energy_report(first_page_text):
+ return "elmhurst epr"
+ elif is_elmhurst_summary_report(first_page_text):
+ return "elmhurst summary report"
+ elif is_osmosis_condition_report(first_page_text):
+ return "osmosis condition report"
+ elif is_elmhurst_evidence_report(first_page_text):
+ return "elmhurst evidence report"
+
+ return None
+
+
+def is_pdf(filename):
+ """
+ Determines if the provided filename is a PDF file.
+ """
+ return filename.endswith(".pdf")
+
+
+class ElmhurstEprExtractor:
+ def __init__(self, file_path):
+ self.file_path = file_path
+
+ @staticmethod
+ def extract_window_age_description(windows_text):
+ """
+ Extracts the most common window age description and its proportion.
+ """
+ windows_text = windows_text.replace("\n", "")
+ window_descriptions = [
+ "Double post or during 2002",
+ "Double pre 2002",
+ "Double with unknown install date",
+ "Secondary glazing",
+ "Triple glazing",
+ "Single glazing",
+ ]
+ description_counts = Counter()
+ for description in window_descriptions:
+ matches = re.findall(re.escape(description), windows_text)
+ description_counts[description] = len(matches)
+
+ if not description_counts or not sum(description_counts.values()):
+ raise ValueError("Failed to extract window data.")
+
+ most_common_description, window_count = description_counts.most_common(1)[0]
+ window_proportion = window_count / sum(description_counts.values()) * 100
+
+ if window_proportion == 100:
+ second_most_common_description = None
+ second_most_common_proportion = 0
+ else:
+ second_most_common_description, second_window_count = description_counts.most_common(2)[1]
+ second_most_common_proportion = second_window_count / sum(description_counts.values()) * 100
+
+ return {
+ "Window Age Description": most_common_description,
+ "Window Age Description Proportion (%)": window_proportion,
+ "Secondary Window Age Description": second_most_common_description,
+ "Secondary Window Age Description Proportion (%)": second_most_common_proportion,
+ "Number of Windows": sum(description_counts.values())
+ }
+
+ @staticmethod
+ def extract_building_parts(text):
+ """
+ Extracts building parts and associated dimensions from the provided text.
+ """
+ data = []
+ building_part_pattern = re.compile(
+ r"Construction details: Building part: (.*?)\nFloor Area \[m2\] Room Height \[m\] Perimeter \[m\] Party "
+ r"Wall Length \[m\]\n(.*?)(?=Construction details|Data inputs|$)",
+ re.DOTALL
+ )
+ for match in building_part_pattern.finditer(text):
+ part_name = match.group(1).strip()
+ floor_data = match.group(2)
+ room_in_roof_match = re.search(r"Room\(s\) in Roof area:\s*([\d.]+)", part_name)
+ if room_in_roof_match:
+ floor_area = float(room_in_roof_match.group(1))
+ cleaned_part_name = re.sub(r" - built in.*|Room\(s\) in Roof area:.*", "", part_name).strip()
+ data.append({
+ "Building Part": cleaned_part_name,
+ "Floor Level": "Room in Roof",
+ "Floor Area (m2)": floor_area,
+ "Room Height (m)": None,
+ "Perimeter (m)": None,
+ "Party Wall Length (m)": None
+ })
+ else:
+ cleaned_part_name = re.sub(r" - built in.*", "", part_name).strip()
+
+ floor_pattern = re.compile(
+ r"(Lowest floor|First floor|Second floor)\s+([\d.]+)\s+([\d.]+)\s+([\d.]+)\s+([\d.]+)"
+ )
+ for floor_match in floor_pattern.finditer(floor_data):
+ floor_level = floor_match.group(1)
+ floor_area = float(floor_match.group(2))
+ room_height = float(floor_match.group(3))
+ perimeter = float(floor_match.group(4))
+ party_wall_length = float(floor_match.group(5))
+ data.append({
+ "Building Part": cleaned_part_name,
+ "Floor Level": floor_level,
+ "Floor Area (m2)": floor_area,
+ "Room Height (m)": room_height,
+ "Perimeter (m)": perimeter,
+ "Party Wall Length (m)": party_wall_length
+ })
+
+ return data
+
+ @staticmethod
+ def extract_roof_details(text):
+ """
+ Extracts roof details for each building part in the provided text.
+ """
+ roof_data = []
+ building_part_pattern = re.compile(
+ r"Construction details: Building part: (.*?)\n(.*?)(?=Conservatory|Construction details|$)",
+ re.DOTALL
+ )
+ for match in building_part_pattern.finditer(text):
+ part_name = match.group(1).strip()
+ cleaned_part_name = re.sub(r" - built in.*|Room\(s\) in Roof area:.*", "", part_name).strip()
+ part_details = match.group(2)
+ roof_type_match = re.search(r"Roof Type:\s*(.*?)(?=\n|$)", part_details)
+ roof_insulation_match = re.search(r"Roof Insulation:\s*(.*?)(?=\n|$)", part_details)
+ roof_insulation_thickness_match = re.search(r"Roof Insulation Thickness:\s*(.*?)(?=\n|$)", part_details)
+
+ roof_data.append({
+ "Building Part": cleaned_part_name,
+ "Roof Type": roof_type_match.group(1).strip() if roof_type_match else None,
+ "Roof Insulation": roof_insulation_match.group(1).strip() if roof_insulation_match else None,
+ "Roof Insulation Thickness": roof_insulation_thickness_match.group(
+ 1).strip() if roof_insulation_thickness_match else None,
+ })
+
+ return roof_data
+
+ @staticmethod
+ def extract_wall_details(text):
+ """
+ Extracts wall details for each building part in the provided text.
+ """
+ wall_data = []
+ building_part_pattern = re.compile(
+ r"Construction details: Building part: (.*?)\n(.*?)(?=Conservatory|Construction details|$)",
+ re.DOTALL
+ )
+ for match in building_part_pattern.finditer(text):
+ part_name = match.group(1).strip()
+ cleaned_part_name = re.sub(r" - built in.*|Room\(s\) in Roof area:.*", "", part_name).strip()
+ part_details = match.group(2)
+ wall_type_match = re.search(r"Wall Type:\s*(.*?)(?=\n|$)", part_details)
+ wall_insulation_match = re.search(r"Wall Insulation:\s*(.*?)(?=\n|$)", part_details)
+ wall_drylining_match = re.search(r"Wall Dry-lining:\s*(.*?)(?=\n|$)", part_details)
+ wall_thickness_match = re.search(r"Wall Thickness:\s*(\d+)(?=\n|$)", part_details)
+
+ wall_data.append({
+ "Building Part": cleaned_part_name,
+ "Wall Type": wall_type_match.group(1).strip() if wall_type_match else None,
+ "Wall Insulation": wall_insulation_match.group(1).strip() if wall_insulation_match else None,
+ "Wall Dry-lining": wall_drylining_match.group(1).strip() if wall_drylining_match else None,
+ "Wall Thickness": int(wall_thickness_match.group(1)) if wall_thickness_match else None,
+ })
+
+ return wall_data
+
+ @staticmethod
+ def extract_primary_heating(text):
+
+ # Extract Primary Heating Section (Main Heating 1)
+ primary_heating_section1 = re.search(r"Main\s*Heating\s*1\s*(.*?)\s*Main\s*Heating\s*2", text, re.DOTALL)
+ # We may not have a secondary heating
+ primary_heating_section2 = re.search(r"Main\s*Heating\s*1\s*(.*?)\s*Secondary\s*Heating", text, re.DOTALL)
+ primary_heating_section = primary_heating_section1 if primary_heating_section1 else primary_heating_section2
+ primary_text = primary_heating_section.group(1)
+
+ primary_heating_output = {
+ "Existing Primary Heating System": re.search(
+ r"Main Heating Code\s*(.*?)\n", primary_text
+ ).group(1).strip(),
+ "Existing Primary Heating PCDF Reference": re.search(
+ r"PCDF boiler Reference\s*(\d+)", primary_text
+ ).group(1),
+ "Existing Primary Heating Controls": re.search(
+ r"Main Heating Controls\s*(.*?)\n", primary_text
+ ).group(1).strip(),
+ "Existing Primary Heating % of Heat": int(
+ re.search(r"Percentage of Heat\s*(\d+)\s*%?", primary_text).group(1)
+ )
+ }
+
+ return primary_heating_output
+
+ @staticmethod
+ def extract_secondary_heating(text):
+ # Extract Secondary Heating Section (Main Heating 2)
+ secondary_heating_section = re.search(r"Main\s*Heating\s*2\s*(.*?)\s*Secondary Heating", text, re.DOTALL)
+ output = {}
+ if secondary_heating_section is None:
+ output["Existing Heating System"] = ""
+ output["Existing Heating PCDF Reference"] = ""
+ output["Existing Heating Controls"] = ""
+ output["Existing Heating % of Heat"] = 0
+
+ else:
+ secondary_text = secondary_heating_section.group(1)
+
+ main_heating_code_match_secondary = re.search(
+ r"Main Heating Code\s*(.*?)(?=\n|Percentage of Heat)", secondary_text
+ )
+ output["Existing Heating System"] = main_heating_code_match_secondary.group(1).strip()
+
+ output["Existing Heating PCDF Reference"] = re.search(
+ r"PCDF boiler Reference\s*(\d+)", secondary_text
+ ).group(1)
+
+ if output["Existing Heating System"] == "":
+ output["Existing Heating Controls"] = ""
+ else:
+ # Might not have heating controls on 2nd system
+ secondary_controls_match = re.search(r"Main Heating Controls\s*(.*?)\n", secondary_text)
+ output["Existing Heating Controls"] = (
+ secondary_controls_match.group(1).strip() if secondary_controls_match else ""
+ )
+ output["Existing Heating % of Heat"] = int(
+ re.search(r"Percentage of Heat\s*(\d+)\s*%?", secondary_text).group(1)
+ )
+
+ return output
+
+ def extract(self):
+ data = {}
+
+ with open(self.file_path, "rb") as file:
+ reader = PyPDF2.PdfReader(file)
+ text = "".join(page.extract_text() for page in reader.pages)
+
+ # Extracting individual components
+ address_match = re.search(r"ENERGY REPORT\nDwelling Address\s*(.*?)\s*\nReference", text, re.DOTALL)
+ data["Address"] = address_match.group(1).strip()
+ data["Postcode"] = data["Address"].split(",")[-1].strip()
+
+ sap_match = re.search(r"GG \(1-20\)\s*(\d{1,2})\s*(\d{1,2})", text)
+ data["Current SAP Rating"] = int(sap_match.group(1))
+
+ energy_match = re.search(r"Additional ratings for your home\s*([\d.]+)", text)
+ data["Primary Energy Use Intensity (kWh/m2/yr)"] = float(energy_match.group(1))
+
+ storeys_match = re.search(r"Number of Storeys:\s*(\d+)", text)
+ data["Number of Storeys"] = int(storeys_match.group(1))
+
+ fuel_match = re.search(r"TOTAL\s*£(\d+)", text)
+ data["Fuel Bill"] = f"£{fuel_match.group(1)}"
+
+ total_doors_match = re.search(r"Total Doors:\s*(\d+)", text)
+ data["Total Number of Doors"] = int(total_doors_match.group(1))
+
+ # Extract Number of Insulated Doors
+ insulated_doors_match = re.search(r"Insulated Doors:\s*(\d+)", text)
+ data["Number of Insulated Doors"] = int(insulated_doors_match.group(1))
+
+ # Get number of lighting outlets and number of fittings needing LEL
+ lighting_fittings_match = re.search(r"Total number of light fittings\s*(\d+)", text)
+ data["Number of Light Fittings"] = int(lighting_fittings_match.group(1))
+ lel_fittings_match = re.search(r"Total number of L.E.L. fittings\s*(\d+)", text)
+ data["Number of LEL Fittings"] = int(lel_fittings_match.group(1))
+ data["Number of fittings needing LEL"] = data["Number of Light Fittings"] - data["Number of LEL Fittings"]
+
+ windows_section = re.search(r"Windows\s*(.*?)\s*Draught Proofing", text, re.DOTALL)
+ data["Windows"] = self.extract_window_age_description(windows_section.group(1))
+
+ data["Primary Heating"] = self.extract_primary_heating(text)
+ data["Secondary Heating"] = self.extract_secondary_heating(text)
+ data["Building Parts"] = self.extract_building_parts(text)
+ data["Roof Details"] = self.extract_roof_details(text)
+ data["Wall Details"] = self.extract_wall_details(text)
+
+ secondary_heating_code_match = re.search(r"Secondary Heating Code\s*(.*?)\n", text)
+ water_heating_code_match = re.search(r"Water Heating Code\s*(.*?)\n", text)
+
+ if data["Secondary Heating"]["Existing Heating System"] == "":
+ data["Secondary Heating Code"] = ""
+ else:
+ data["Secondary Heating Code"] = secondary_heating_code_match.group(
+ 1).strip() if secondary_heating_code_match else ""
+
+ data["Water Heating Code"] = water_heating_code_match.group(1).strip()
+
+ return data