mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
extending extraction
This commit is contained in:
parent
0efd0163ee
commit
749faaebca
3 changed files with 562 additions and 11 deletions
|
|
@ -1,5 +1,83 @@
|
|||
import os
|
||||
import utils.file_data_extraction as file_extraction_tools
|
||||
from utils.fullSapParser import FullSapParser
|
||||
|
||||
# Blank output row: one entry per output column, every value unset (None).
# The handler copies this and fills in what it can extract for each property.
# NOTE(review): the keys presumably mirror the column headings of the
# Trustmark details spreadsheet template - confirm against the template.
output_template = dict.fromkeys(
    (
        # Property identification
        "Property Address",
        "Osm. ID",
        "Postcode",
        "City/County",
        "District/Town",
        "Funding Stream",
        "Local Authority",
        # Lodgement references
        "Trustmark Lodgement ID",
        "Certificate Number",
        "EWI UMR",
        "Loft UMR",
        "Windows UMR",
        "Doors UMR",
        "Measure Lodgement Date",
        "Full Lodgement Date",
        # Contacts
        "Name",
        "Phone",
        "Email",
        "Secondary Contact Name",
        "Secondary Contact Phone",
        # Retrofit assessment / design
        "Trustmark Licence Number",
        "Retrofit Assessment Date",
        "Company Name",
        "Retrofit Designer Name",
        # Property description
        "Property Type",
        "Property Detachment",
        "No. of Bedrooms",
        "Property Age",
        # Pre-works performance
        "SAP Rating Pre (from IMA)",
        "Pre Heat Transfer",
        "Pre Total Floor Area",
        "Pre Heat Demand",
        "Pre Air Tightness",
        # Post-works performance
        "SAP Rating Post (from EPC)",
        "Post Heat Transfer",
        "Post Total Floor Area",
        "Post Heat Demand",
        "Post Air Tightness",
        # Works summary
        "Number of Eligible Measures Installed",
        "Total Cost of Works",
        "Annual Fuel Saving (MTP)",
        # Per-measure details
        "Work Type ID",
        "Measure Category",
        "Installer",
        "Operative Name",
        "Operative Certif. Reference",
        "Manufacturer",
        "Model",
        # Guarantee / invoicing
        "Financial Protection Body (IBG)",
        "Policy Start Date",
        "IBG Policy Reference",
        "Warranty Duration",
        "Total Invoiced (Including VAT)",
        "Installation Date",
        "Handover Date",
        "Percentage",
        "Reference Number",
    )
)
|
||||
|
||||
|
||||
def update_dictionary_with_check(dictionary, updates):
    """
    Updates a dictionary in place, raising an error if any key does not already exist.

    Every key is validated before anything is written, so a rejected update
    leaves ``dictionary`` exactly as it was instead of half-applied.

    Args:
        dictionary (dict): The dictionary to update (mutated in place).
        updates (dict): The updates to apply.

    Raises:
        KeyError: If a key in updates does not exist in the dictionary.
    """
    # Validate first: raising midway through the writes would leave the
    # caller with a partially updated row.
    for key in updates:
        if key not in dictionary:
            raise KeyError(f"Key '{key}' does not exist in the dictionary.")

    dictionary.update(updates)
|
||||
|
||||
|
||||
def handler():
|
||||
|
|
@ -11,7 +89,11 @@ def handler():
|
|||
|
||||
# This source data will eventually come from SharePoint
|
||||
source_data_path = "/Users/khalimconn-kowlessar/Documents/hestia/Lodgment Pilot"
|
||||
output_template = "Trustmark Details - Template REV.25.11.24.xlsx"
|
||||
output_template_file = "Trustmark Details - Template REV.25.11.24.xlsx"
|
||||
funding_stream = "HUG2"
|
||||
customer_name = "Shropshire Council"
|
||||
customer_phone = "0345 678 9000"
|
||||
customer_email = "affordablewarmth@shropshire.gov.uk"
|
||||
|
||||
# List the folders in the source data path
|
||||
folders = [x for x in os.listdir(source_data_path) if os.path.isdir(os.path.join(source_data_path, x))]
|
||||
|
|
@ -20,7 +102,8 @@ def handler():
|
|||
"elmhurst epr": file_extraction_tools.ElmhurstEprExtractor,
|
||||
"elmhurst summary report": None,
|
||||
"osmosis condition report": None,
|
||||
"elmhurst evidence report": None
|
||||
"elmhurst evidence report": None,
|
||||
"full sap xml": FullSapParser,
|
||||
}
|
||||
|
||||
for property_folder in folders:
|
||||
|
|
@ -43,4 +126,68 @@ def handler():
|
|||
file_extractor = extractors.get(report_type)
|
||||
if file_extractor is None:
|
||||
continue
|
||||
|
||||
extracted_contents[report_type] = file_extractor(filepath).extract()
|
||||
|
||||
if file_extraction_tools.is_xml(filepath):
|
||||
xml_type = file_extraction_tools.detect_xml_report_type(xml_path=filepath)
|
||||
if xml_type is None:
|
||||
raise ValueError(f"Unknown report type for {filename}")
|
||||
file_extractor = extractors.get(xml_type)
|
||||
if file_extractor is None:
|
||||
continue
|
||||
extracted_contents[xml_type] = file_extractor(filepath).extract()
|
||||
|
||||
output_row_data = output_template.copy()
|
||||
|
||||
# dict_keys([, , , 'City/County', 'District/Town',
|
||||
# 'Local Authority',
|
||||
# 'Trustmark Lodgement ID',
|
||||
# 'Certificate Number', 'EWI UMR', 'Loft UMR', 'Windows UMR',
|
||||
# 'Doors UMR', 'Measure Lodgement Date', 'Full Lodgement Date', 'Name', 'Phone', 'Email', 'Secondary Contact
|
||||
# Name', 'Secondary Contact Phone', 'Trustmark Licence Number', 'Retrofit Assessment Date', 'Company Name',
|
||||
# 'Retrofit Designer Name', , 'No. of Bedrooms',
|
||||
# , 'Pre Heat Transfer', 'Pre Total Floor Area', 'Pre Heat Demand',
|
||||
# 'Pre Air Tightness', 'SAP Rating Post (from EPC)', 'Post Heat Transfer', 'Post Total Floor Area',
|
||||
# 'Post Heat Demand', 'Post Air Tightness', 'Number of Eligible Measures Installed', 'Total Cost of Works',
|
||||
# 'Annual Fuel Saving (MTP)', 'Work Type ID', 'Measure Category', 'Installer', 'Operative Name', 'Operative
|
||||
# Certif. Reference', 'Manufacturer', 'Model', 'Financial Protection Body (IBG)', 'Policy Start Date',
|
||||
# 'IBG Policy Reference', 'Warranty Duration', 'Total Invoiced (Including VAT)', 'Installation Date',
|
||||
# 'Handover Date', 'Percentage', 'Reference Number'])
|
||||
# Populate the output row data
|
||||
if extracted_contents["elmhurst epr"]:
|
||||
total_floor_area = sum(
|
||||
[x["Floor Area (m2)"] for x in extracted_contents["elmhurst epr"]["Building Parts"]] +
|
||||
# Get the conservatory floor area
|
||||
extracted_contents["elmhurst epr"]["Conservatory"]["Floor Area (m2)"]
|
||||
)
|
||||
|
||||
to_insert = {
|
||||
"Property Address": property_folder.split(")")[1].strip(),
|
||||
"Osm. ID": property_folder.split(")")[0].strip().lstrip("(").strip(),
|
||||
"Postcode": extracted_contents["elmhurst epr"]["Postcode"],
|
||||
"City/County": None,
|
||||
"District/Town": None,
|
||||
"Funding Stream": funding_stream,
|
||||
"Local Authority": None,
|
||||
'Property Age': extracted_contents["elmhurst epr"]["Property Age"],
|
||||
'SAP Rating Pre (from IMA)': extracted_contents["elmhurst epr"]["Current SAP Rating"],
|
||||
'Pre Heat Transfer': extracted_contents["elmhurst epr"][
|
||||
"Primary Energy Use Intensity (kWh/m2/yr)"] * total_floor_area,
|
||||
}
|
||||
|
||||
output_row_data["Property Address"] = property_folder.split(")")[1].strip()
|
||||
output_row_data["Osm. ID"] = property_folder.split(")")[0].strip().lstrip("(").strip()
|
||||
output_row_data["Postcode"] = extracted_contents["elmhurst epr"]["Postcode"]
|
||||
output_row_data["City/County"] = ()
|
||||
output_row_data["Batch"] = ()
|
||||
output_row_data["Funding Stream"] = funding_stream
|
||||
output_row_data["Risk Path"] = ()
|
||||
|
||||
if extracted_contents["full sap xml"]:
|
||||
to_insert = {
|
||||
"Property Type": extracted_contents["full sap xml"]["Property Type"],
|
||||
"Property Detachment": extracted_contents["full sap xml"]["Built Form"],
|
||||
"Property Age": extracted_contents["full sap xml"]["Age Band"],
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,10 @@
|
|||
import PyPDF2
|
||||
import re
|
||||
from collections import Counter
|
||||
from utils.logger import setup_logger
|
||||
from xml.dom.minidom import parseString
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
"""
|
||||
This script contains functions used to extract data from retrofit survey files, including EPRs,
|
||||
|
|
@ -61,6 +65,25 @@ def detect_pdf_report_type(pdf_path):
|
|||
return None
|
||||
|
||||
|
||||
def detect_xml_report_type(xml_path):
    """
    Detects the type of XML report based on its content.

    The report is recognised by the text of its first <Product> element.

    :param xml_path: String path to the XML file
    :return: String type of the report ("full sap xml"), or None if the
        report is not recognised
    :raises xml.parsers.expat.ExpatError: If the file is not well-formed XML
    """
    # Read raw bytes so the parser applies the encoding declared in the XML
    # itself rather than whatever the platform's default text encoding is.
    with open(xml_path, "rb") as file:
        document = parseString(file.read())

    product_tags = document.getElementsByTagName("Product")
    if product_tags:
        # An empty <Product/> element has no child node to read.
        first_child = product_tags[0].firstChild
        product_name = first_child.nodeValue if first_child is not None else None
        if product_name is not None and product_name.strip() == "Sap 2012 Desktop":
            return "full sap xml"

    # Unknown product: report "not recognised" the same way the docstring
    # promises, so the caller can decide how to handle it.
    return None
|
||||
|
||||
|
||||
def is_pdf(filename):
|
||||
"""
|
||||
Determines if the provided filename is a PDF file.
|
||||
|
|
@ -68,6 +91,13 @@ def is_pdf(filename):
|
|||
return filename.endswith(".pdf")
|
||||
|
||||
|
||||
def is_xml(filename):
    """
    Determines if the provided filename is an XML file.

    Only the extension is inspected, and the comparison ignores case so that
    names such as "REPORT.XML" are recognised too.

    :param filename: String file name or path
    :return: True if the name ends with ".xml" (any letter case), else False
    """
    return filename.lower().endswith(".xml")
|
||||
|
||||
|
||||
class ElmhurstEprExtractor:
|
||||
"""
|
||||
A utility class for extracting specific data from Elmhurst Energy Performance Reports (EPR).
|
||||
|
|
@ -223,26 +253,82 @@ class ElmhurstEprExtractor:
|
|||
return wall_data
|
||||
|
||||
@staticmethod
|
||||
def _extract_heating_details(section_text):
|
||||
def extract_conservatory(text):
    """
    Read the conservatory details out of the EPR text.

    Only the text between "Conservatory" and the next "Doors" is examined.

    Args:
        text (str): The full text of the EPR PDF.

    Returns:
        dict: The conservatory details, keyed by:
            - "Conservatory Present" ("Yes" or "No")
            - "Conservatory Separated"
            - "Conservatory Floor Area"
            - "Conservatory Double Glazed"
            - "Conservatory Glazed Perimeter"
            - "Heated Conservatory Height"
        When no conservatory is reported, text fields are "" and numeric
        fields are 0.

    Raises:
        ValueError: If the conservatory section cannot be located.
    """
    section = re.search(r"Conservatory\s*(.*?)\s*Doors", text, re.DOTALL)
    if section is None:
        logger.error("Failed to extract conservatory data.")
        raise ValueError("Could not extract conservatory data.")

    body = section.group(1)

    def text_field(pattern):
        # Captured text with surrounding whitespace removed, "" when absent.
        found = re.search(pattern, body)
        return found.group(1).strip() if found else ""

    def number_field(pattern):
        # Captured number as a float, 0 when absent.
        found = re.search(pattern, body)
        return float(found.group(1)) if found else 0

    # A missing "Conservatory Present" line is treated the same as "No".
    presence = re.search(r"Conservatory Present:\s*(Yes|No)", body)
    has_conservatory = presence is not None and presence.group(1).strip() != "No"

    if has_conservatory:
        return {
            "Conservatory Present": "Yes",
            "Conservatory Separated": text_field(r"Conservatory Separated:\s*(Yes|No)"),
            "Conservatory Floor Area": number_field(r"Conservatory Floor Area:\s*([\d.]+)"),
            "Conservatory Double Glazed": text_field(r"Conservatory Double Glazed:\s*(Yes|No)"),
            "Conservatory Glazed Perimeter": number_field(r"Conservatory Glazed Perimeter:\s*([\d.]+)"),
            "Heated Conservatory Height": text_field(r"Heated Conservatory Height:\s*(.*?)(?=\n|$)"),
        }

    logger.info("Conservatory not present.")
    return {
        "Conservatory Present": "No",
        "Conservatory Separated": "",
        "Conservatory Floor Area": 0,
        "Conservatory Double Glazed": "",
        "Conservatory Glazed Perimeter": 0,
        "Heated Conservatory Height": "",
    }
|
||||
|
||||
@staticmethod
|
||||
def _extract_heating_details(section_text, default_value=""):
    """
    Extracts heating details from a given section of text.

    Args:
        section_text (str): The section of text containing heating details.
        default_value (str, optional): The value to return for text fields
            that cannot be found. Defaults to "".

    Returns:
        dict: A dictionary containing heating system details:
            - "System": text following "Main Heating Code"
            - "PCDF Reference": digits following "PCDF boiler Reference"
            - "Controls": text following "Main Heating Controls"
            - "% of Heat": integer following "Percentage of Heat", 0 when
              absent (this field does not use default_value)
    """
    system_search = re.search(r"Main Heating Code\s*(.*?)\n", section_text)
    pcdf_search = re.search(r"PCDF boiler Reference\s*(\d+)", section_text)
    controls_search = re.search(r"Main Heating Controls\s*(.*?)\n", section_text)
    heat_search = re.search(r"Percentage of Heat\s*(\d+)\s*%?", section_text)

    # Each key appears exactly once; listing a key twice in a dict literal
    # silently keeps only the last value, which hid the hard-coded "" fallbacks.
    return {
        "System": system_search.group(1).strip() if system_search else default_value,
        "PCDF Reference": pcdf_search.group(1) if pcdf_search else default_value,
        "Controls": controls_search.group(1).strip() if controls_search else default_value,
        "% of Heat": int(heat_search.group(1)) if heat_search else 0,
    }
|
||||
|
||||
|
|
@ -257,7 +343,7 @@ class ElmhurstEprExtractor:
|
|||
|
||||
return self._extract_heating_details(primary_text)
|
||||
|
||||
def extract_secondary_heating(self, text):
|
||||
def extract_secondary_heating_details(self, text):
|
||||
# Extract Secondary Heating Section (Main Heating 2)
|
||||
secondary_heating_section = re.search(r"Main\s*Heating\s*2\s*(.*?)\s*Secondary Heating", text, re.DOTALL)
|
||||
|
||||
|
|
@ -265,7 +351,7 @@ class ElmhurstEprExtractor:
|
|||
if secondary_heating_section is None:
|
||||
|
||||
output["System"] = ""
|
||||
output[" PCDF Reference"] = ""
|
||||
output["PCDF Reference"] = ""
|
||||
output["Controls"] = ""
|
||||
output["% of Heat"] = 0
|
||||
|
||||
|
|
@ -304,65 +390,77 @@ class ElmhurstEprExtractor:
|
|||
# Extracting individual components
|
||||
address_match = re.search(r"ENERGY REPORT\nDwelling Address\s*(.*?)\s*\nReference", text, re.DOTALL)
|
||||
if not address_match:
|
||||
logger.error("Failed to extract address.")
|
||||
raise ValueError("Failed to extract address.")
|
||||
data["Address"] = address_match.group(1).strip()
|
||||
data["Postcode"] = data["Address"].split(",")[-1].strip()
|
||||
|
||||
sap_match = re.search(r"GG \(1-20\)\s*(\d{1,2})\s*(\d{1,2})", text)
|
||||
if not sap_match:
|
||||
logger.error("Failed to extract SAP rating.")
|
||||
raise ValueError("Failed to extract SAP rating.")
|
||||
data["Current SAP Rating"] = int(sap_match.group(1))
|
||||
|
||||
energy_match = re.search(r"Additional ratings for your home\s*([\d.]+)", text)
|
||||
if not energy_match:
|
||||
logger.error("Failed to extract primary energy use.")
|
||||
raise ValueError("Failed to extract primary energy use.")
|
||||
data["Primary Energy Use Intensity (kWh/m2/yr)"] = float(energy_match.group(1))
|
||||
|
||||
storeys_match = re.search(r"Number of Storeys:\s*(\d+)", text)
|
||||
if not storeys_match:
|
||||
raise ValueError("Failed to extract number of storeys.")
|
||||
logger.error("Failed to extract the number of storeys.")
|
||||
raise ValueError("Failed to extract the number of storeys.")
|
||||
data["Number of Storeys"] = int(storeys_match.group(1))
|
||||
|
||||
fuel_match = re.search(r"TOTAL\s*£(\d+)", text)
|
||||
if not fuel_match:
|
||||
logger.error("Failed to extract fuel bill.")
|
||||
raise ValueError("Failed to extract fuel bill.")
|
||||
data["Fuel Bill"] = f"£{fuel_match.group(1)}"
|
||||
|
||||
total_doors_match = re.search(r"Total Doors:\s*(\d+)", text)
|
||||
if not total_doors_match:
|
||||
logger.error("Failed to extract total doors.")
|
||||
raise ValueError("Failed to extract total doors.")
|
||||
data["Total Number of Doors"] = int(total_doors_match.group(1))
|
||||
|
||||
# Extract Number of Insulated Doors
|
||||
insulated_doors_match = re.search(r"Insulated Doors:\s*(\d+)", text)
|
||||
if not insulated_doors_match:
|
||||
logger.error("Failed to extract insulated doors.")
|
||||
raise ValueError("Failed to extract insulated doors.")
|
||||
data["Number of Insulated Doors"] = int(insulated_doors_match.group(1))
|
||||
|
||||
# Get number of lighting outlets and number of fittings needing LEL
|
||||
lighting_fittings_match = re.search(r"Total number of light fittings\s*(\d+)", text)
|
||||
if not lighting_fittings_match:
|
||||
logger.error("Failed to extract lighting.")
|
||||
raise ValueError("Failed to extract lighting")
|
||||
data["Number of Light Fittings"] = int(lighting_fittings_match.group(1))
|
||||
lel_fittings_match = re.search(r"Total number of L.E.L. fittings\s*(\d+)", text)
|
||||
if not lel_fittings_match:
|
||||
logger.error("Failed to extract LEL fittings.")
|
||||
raise ValueError("Failed to extract LEL fittings.")
|
||||
data["Number of LEL Fittings"] = int(lel_fittings_match.group(1))
|
||||
data["Number of fittings needing LEL"] = data["Number of Light Fittings"] - data["Number of LEL Fittings"]
|
||||
|
||||
windows_section = re.search(r"Windows\s*(.*?)\s*Draught Proofing", text, re.DOTALL)
|
||||
if not windows_section:
|
||||
logger.error("Failed to extract window data.")
|
||||
raise ValueError("Failed to extract window data.")
|
||||
data["Windows"] = self.extract_window_age_description(windows_section.group(1))
|
||||
|
||||
data["Primary Heating"] = self.extract_primary_heating(text)
|
||||
data["Secondary Heating"] = self.extract_secondary_heating(text)
|
||||
data["Secondary Heating"] = self.extract_secondary_heating_details(text)
|
||||
data["Building Parts"] = self.extract_building_parts(text)
|
||||
data["Roof Details"] = self.extract_roof_details(text)
|
||||
data["Wall Details"] = self.extract_wall_details(text)
|
||||
data["Conservatory"] = self.extract_conservatory(text)
|
||||
|
||||
water_heating_code_match = re.search(r"Water Heating Code\s*(.*?)\n", text)
|
||||
if not water_heating_code_match:
|
||||
logger.error("Failed to extract water heating code.")
|
||||
raise ValueError("Failed to extract water heating code.")
|
||||
data["Water Heating Code"] = water_heating_code_match.group(1).strip()
|
||||
|
||||
|
|
|
|||
306
utils/fullSapParser.py
Normal file
306
utils/fullSapParser.py
Normal file
|
|
@ -0,0 +1,306 @@
|
|||
import boto3
|
||||
from xml.dom.minidom import parseString
|
||||
|
||||
# Age band letter (text of the PropertyAgeBand tag) -> build period description.
PROPERTY_AGE_BAND = dict(
    A="before 1900",
    B="1900-1929",
    C="1930-1949",
    D="1950-1966",
    E="1967-1975",
    F="1976-1982",
    G="1983-1990",
    H="1991-1995",
    I="1996-2002",
    J="2003-2006",
    K="2007-2011",
    L="2012 onwards",
)

# Text of the PositionOfFlat tag -> suffix appended to the archetype description.
POSITION_OF_FLAT = dict(TopFloorFlat="(top floor)")

# MHS code of the main heating system -> readable description.
MAINHEATING_LOOKUP = dict(SEB="Electric (SEB modern slimline storage heaters)")

# Window install-date wording -> wording used in the output.
# NOTE(review): not referenced by the parser class in this file - confirm where it is used.
WINDOWS_YEAR_LOOKUP = dict(
    [
        ("unknown install date", "unknown year"),
        ("unknown install", "unknown year"),
        ("post or during 2002", "2002 onwards"),
    ]
)
|
||||
|
||||
|
||||
class FullSapParser:
    """
    Parses a full SAP XML report and extracts property details from it.

    The XML is read and parsed on construction, either from a local path or,
    when ``bucket_name`` is given, from S3. Each ``get_*`` method fills in one
    or more of the attributes below; :meth:`extract` runs them all.
    """

    # Results of the get_* methods; each stays None until its getter has run.
    full_address = None
    archetype = None
    age_band = None
    unheated_corridor = None
    property_type = None
    built_form = None

    # ventilation
    mechanical_ventilation = None
    cross_ventilation = None
    night_ventilation = None

    # dimensions
    number_of_storeys = None
    property_dimensions = None

    # fabric
    low_energy_lighting = None

    # Heating
    heating1 = None
    cylinder = None
    cylinder_stat = None

    # Lexical forms of an XML Schema boolean.
    _XML_BOOLEANS = {"true": True, "false": False, "1": True, "0": False}

    def __init__(self, filekey, bucket_name=None):
        """
        Read and parse the SAP XML.

        Args:
            filekey (str): Local file path, or the S3 object key when
                ``bucket_name`` is given.
            bucket_name (str, optional): S3 bucket to read from. When omitted
                the file is read from the local filesystem.

        Raises:
            ValueError: If the file cannot be found, read, or parsed.
        """
        # A local read needs no AWS access, so only build the S3 client when
        # a bucket is actually going to be used.
        self.s3_client = boto3.client('s3') if bucket_name else None
        self.bucket_name = bucket_name
        self.filekey = filekey
        self.full_sap = None

        self._read_file()

    def _read_file(self):
        """
        Reads the XML file either locally or from S3 and parses it using minidom.

        Raises:
            ValueError: If the file cannot be found, read, or parsed.
        """
        try:
            if self.bucket_name:
                # Read from S3
                if self.s3_client is None:
                    self.s3_client = boto3.client('s3')
                response = self.s3_client.get_object(Bucket=self.bucket_name, Key=self.filekey)
                xml_content = response['Body'].read()
            else:
                # Read locally, as bytes, so the parser applies the encoding
                # declared in the XML rather than the platform default.
                with open(self.filekey, "rb") as f:
                    xml_content = f.read()

            # Parse the XML content using minidom
            self.full_sap = parseString(xml_content)
        except FileNotFoundError as e:
            raise ValueError(f"Local file not found: {self.filekey}") from e
        except Exception as e:
            raise ValueError(f"An error occurred while reading or parsing the XML: {e}") from e

    def _require_document(self):
        """Return the parsed document, raising ValueError if nothing has been read yet."""
        if self.full_sap is None:
            raise ValueError("You need to read the file first")
        return self.full_sap

    def _tag_text(self, tag_name, parent=None, required=False):
        """
        Return the text of the first ``tag_name`` element.

        Args:
            tag_name (str): Name of the element to look for.
            parent: Element to search within. Defaults to the whole document.
            required (bool): When True, a missing or empty element is an error.

        Returns:
            str or None: The element's text, or None when the element is
            missing or has no content.

        Raises:
            ValueError: If ``required`` is True and there is no text to return.
        """
        scope = self._require_document() if parent is None else parent
        nodes = scope.getElementsByTagName(tag_name)
        first_child = nodes[0].firstChild if nodes else None
        value = first_child.nodeValue if first_child is not None else None
        if value is None and required:
            raise ValueError(f"Tag '{tag_name}' is missing or empty - investigate me")
        return value

    def extract(self, _return=True):
        """
        Run every getter and optionally return the headline fields.

        Args:
            _return (bool): When True (default) return a summary dictionary.

        Returns:
            dict or None: "Property Type", "Built Form" and "Age Band" when
            ``_return`` is True, otherwise None.
        """
        self.get_address()
        self.get_archetype()
        self.get_age_band()
        self.get_unheated_corridor()
        self.get_heating_1()
        self.get_ventilation()
        self.get_floor_area()
        self.get_low_energy_lighting()
        self.get_cylinder()

        if _return:
            return {
                "Property Type": self.property_type,
                "Built Form": self.built_form,
                "Age Band": self.age_band,
            }
        return None

    def get_address(self):
        """
        Build ``full_address`` from the single AddressAsDesigned element.

        Address lines and town are title-cased; the postcode is appended
        unchanged. Parts that are missing or empty are left out.

        Raises:
            ValueError: If there is not exactly one AddressAsDesigned element.
        """
        document = self._require_document()

        address = document.getElementsByTagName("AddressAsDesigned")
        if len(address) != 1:
            raise ValueError("Non-unique address tag found - investigate me")

        data = {}
        for node in address[0].childNodes:
            if node.nodeType == node.ELEMENT_NODE:
                data[node.nodeName] = node.firstChild.nodeValue if node.firstChild else None

        # .get() rather than [] because not every report carries every address line.
        parts = [
            data[key].title()
            for key in ("AddressLine1", "AddressLine2", "AddressLine3", "Town")
            if data.get(key) is not None
        ]
        postcode = data.get("Postcode")
        if postcode is not None:
            parts.append(postcode)

        self.full_address = " ".join(parts)

    def get_archetype(self):
        """
        Set ``property_type``, ``built_form`` and the combined ``archetype``.

        ``archetype`` is "<PropertyType1> - <PropertyType2>", followed by the
        position of the flat when the report gives one.

        Raises:
            ValueError: If PropertyType1 or PropertyType2 is not present
                exactly once, or is empty.
        """
        document = self._require_document()

        property_type1 = document.getElementsByTagName('PropertyType1')
        property_type2 = document.getElementsByTagName('PropertyType2')
        if len(property_type1) != 1 or len(property_type2) != 1:
            raise ValueError("Non-unique property tag found - investigate me")

        property_type1 = self._tag_text('PropertyType1')
        property_type2 = self._tag_text('PropertyType2')
        if property_type1 is None or property_type2 is None:
            raise ValueError("Empty property tag found - investigate me")

        # The tag may be absent or empty; the lookup only knows some
        # positions, so unknown ones fall back to the raw value rather than
        # aborting the whole extraction.
        position_code = self._tag_text('PositionOfFlat')
        position_of_flat = POSITION_OF_FLAT.get(position_code, position_code) if position_code else None

        self.property_type = property_type1
        self.built_form = property_type2
        self.archetype = property_type1 + " - " + property_type2

        if position_of_flat:
            self.archetype = self.archetype + " " + position_of_flat

    def get_age_band(self):
        """
        Set ``age_band`` to the description for the report's PropertyAgeBand.

        Raises:
            ValueError: If PropertyAgeBand is not present exactly once, or is empty.
            KeyError: If the band is not one of the known letters.
        """
        document = self._require_document()

        property_age_band = document.getElementsByTagName('PropertyAgeBand')
        if len(property_age_band) != 1:
            raise ValueError("Non-unique property age band tag found - investigate me")

        band = self._tag_text('PropertyAgeBand')
        if band is None:
            raise ValueError("Empty property age band tag found - investigate me")

        self.age_band = PROPERTY_AGE_BAND[band]

    def get_wall_area_for_description(self, description):
        """
        Describe the area of the first WallRec whose Description matches.

        Args:
            description (str): Exact Description text to look for.

        Returns:
            str or None: "Unheated corridor - <area> area" for the first
            matching wall record that has an area, otherwise None.
        """
        document = self._require_document()

        for wall_rec in document.getElementsByTagName("WallRec"):
            if self._tag_text("Description", parent=wall_rec) != description:
                continue

            area_text = self._tag_text("Area", parent=wall_rec)
            if area_text is not None:
                area = float(area_text)
                # Placeholder for wall_description which you'll populate later
                return f"Unheated corridor - {area} area"
        return None

    def get_unheated_corridor(self):
        """
        Unheated corridors don't always exist so we'll need to search for it.

        Sets ``unheated_corridor`` to the description of the "Flat corridor Main"
        wall, or None when there is no such wall.
        """
        self._require_document()

        self.unheated_corridor = self.get_wall_area_for_description("Flat corridor Main")

    def get_heating_1(self):
        """
        Set ``heating1`` from the single MainHeatingSystem1 element.

        Raises:
            ValueError: If MainHeatingSystem1 is not present exactly once, or
                it lacks an MHS or Fraction value.
        """
        document = self._require_document()

        main_heating_system = document.getElementsByTagName('MainHeatingSystem1')
        if len(main_heating_system) != 1:
            raise ValueError("Non-unique main heating system tag found - investigate me")

        main_heating_system = main_heating_system[0]

        mhs = self._tag_text('MHS', parent=main_heating_system, required=True)
        # Codes without a readable description are reported as-is.
        mhs = MAINHEATING_LOOKUP.get(mhs, mhs)

        fraction = self._tag_text('Fraction', parent=main_heating_system, required=True)

        self.heating1 = f"{mhs} : {fraction}% of heating"

    def _ventilation_flag(self, tag_name):
        """
        Read a ventilation element as a flag.

        Returns:
            True/False for a boolean value, the raw text for anything else,
            or None when the element is missing or empty.
        """
        raw = self._tag_text(tag_name)
        if raw is None:
            return None
        return self._XML_BOOLEANS.get(raw.strip().lower(), raw)

    def get_ventilation(self):
        """
        Set the mechanical, cross and night ventilation descriptions.

        Each is reported as present only when its element holds a true (or
        otherwise non-boolean, non-empty) value.
        """
        self._require_document()

        # All three go through the same boolean conversion; without it the
        # text "false" is a non-empty string and would count as present.
        mech_vent_value = self._ventilation_flag("MechanicalVentilationDecentralised")
        cross_vent_value = self._ventilation_flag("CrossVentilation")
        night_vent_value = self._ventilation_flag("NightVentilation")

        # Create the outputs
        self.mechanical_ventilation = (
            "Mechanical ventilation present" if mech_vent_value else "No mechanical ventilation"
        )
        self.cross_ventilation = "Cross ventilation present" if cross_vent_value else "No cross ventilation"
        self.night_ventilation = "Night ventilation present" if night_vent_value else "No night ventilation"

    def get_floor_area(self):
        """
        Set ``number_of_storeys`` and ``property_dimensions``.

        ``property_dimensions`` is a list with one dictionary per measured
        storey: "storey_index" (position among all StoreyMeasurementRec
        elements, counting from 0), "Floor Area", "Perimeter" and "Height".
        Records that are nil, have no floor area, or a floor area of 0 are
        skipped.

        Raises:
            ValueError: If NumberOfStoreys is missing, or a measured storey
                lacks its perimeter or height.
        """
        document = self._require_document()

        self.number_of_storeys = int(self._tag_text('NumberOfStoreys', required=True))

        # TODO: The first StoreyMeasurementRec tag looks like this in the examples we've seen:
        # <StoreyMeasurementRec xsi:nil="true" />
        # Indicating that the tag is explicitly indicated as empty
        # NOTE(review): a "potential basement" guard used to sit in this loop
        # but compared the index against -1 after it had already been
        # incremented, so it could never fire. Basement handling is still
        # unimplemented - decide on the intended condition before adding it back.
        storey_data = []
        for storey_index, storey in enumerate(document.getElementsByTagName('StoreyMeasurementRec')):
            if storey.getAttribute("xsi:nil") == "true":
                continue

            floor_area_text = self._tag_text('InternalFloorArea', parent=storey)
            if floor_area_text is None:
                continue

            floor_area = float(floor_area_text)
            # If floor area is 0, skip this storey
            if not floor_area:
                continue

            perimeter = float(self._tag_text('InternalPerimeter', parent=storey, required=True))
            height = float(self._tag_text('StoreyHeight', parent=storey, required=True))

            storey_data.append({
                "storey_index": storey_index,
                "Floor Area": floor_area,
                "Perimeter": perimeter,
                "Height": height
            })

        # We will convert this into a table in the markdown
        self.property_dimensions = storey_data

    def get_low_energy_lighting(self):
        """
        Set ``low_energy_lighting`` from the LightFittings and LELFittings tags.

        Raises:
            ValueError: If either tag is missing or empty.
        """
        # Extract the values of the LightFittings and LELFittings tags
        light_fittings = self._tag_text('LightFittings', required=True)
        lel_fittings = self._tag_text('LELFittings', required=True)

        # Construct the string message
        self.low_energy_lighting = f"{lel_fittings} out of {light_fittings} lighting fittings are low energy."

    def get_cylinder(self):
        """
        Set ``cylinder`` and ``cylinder_stat``.

        ``cylinder`` describes the insulation when both its type and
        thickness are given, and is "Not insulated." otherwise.
        ``cylinder_stat`` is the CylinderStat text, or None when absent.
        """
        # Missing or empty tags yield None here, which is what lets the
        # "Not insulated." branch below actually be reached.
        insulation_type = self._tag_text('InsulationType')
        insulation_thickness = self._tag_text('InsulationThickness')

        if insulation_type and insulation_thickness:
            self.cylinder = f"Insulated, {insulation_type}: {insulation_thickness}mm."
        else:
            self.cylinder = "Not insulated."

        self.cylinder_stat = self._tag_text('CylinderStat')
|
||||
Loading…
Add table
Reference in a new issue