Model/utils/fullSapParser.py
Khalim Conn-Kowlessar 749faaebca extending extraction
2024-11-27 17:07:56 +00:00

306 lines
11 KiB
Python

import boto3
from xml.dom.minidom import parseString
# Lookup tables that translate raw values found in the XML into the
# human-readable wording used in the extracted output.

# Single-letter age band code -> construction period.
PROPERTY_AGE_BAND = {
    "A": "before 1900",
    "B": "1900-1929",
    "C": "1930-1949",
    "D": "1950-1966",
    "E": "1967-1975",
    "F": "1976-1982",
    "G": "1983-1990",
    "H": "1991-1995",
    "I": "1996-2002",
    "J": "2003-2006",
    "K": "2007-2011",
    "L": "2012 onwards",
}

# PositionOfFlat value -> suffix appended to the archetype description.
POSITION_OF_FLAT = {
    "TopFloorFlat": "(top floor)",
}

# Main heating system code -> descriptive name.
MAINHEATING_LOOKUP = {
    "SEB": "Electric (SEB modern slimline storage heaters)",
}

# Window installation wording -> normalised year description.
WINDOWS_YEAR_LOOKUP = {
    "unknown install date": "unknown year",
    "unknown install": "unknown year",
    "post or during 2002": "2002 onwards",
}
class FullSapParser:
    """Read a full SAP XML document and pull selected property details from it.

    The document is loaded on construction, either from S3 (when
    ``bucket_name`` is given) or from the local filesystem. Each ``get_*``
    method reads one group of tags and stores its result on the instance;
    ``extract`` runs all of them in turn.
    """

    # Results populated by the ``get_*`` methods; ``None`` until extracted.
    full_address = None
    archetype = None
    age_band = None
    unheated_corridor = None
    property_type = None
    built_form = None
    # ventilation
    mechanical_ventilation = None
    cross_ventilation = None
    night_ventilation = None
    # dimensions
    number_of_storeys = None
    property_dimensions = None
    # fabric
    low_energy_lighting = None
    # Heating
    heating1 = None
    cylinder = None
    cylinder_stat = None

    def __init__(self, filekey, bucket_name=None):
        """
        Create the parser and immediately load the XML document.

        Args:
            filekey: S3 object key when ``bucket_name`` is given, otherwise a
                local file path.
            bucket_name: Name of the S3 bucket to read from, or ``None`` to
                read ``filekey`` from the local filesystem.
        Raises:
            ValueError: If the file cannot be found, read, or parsed.
        """
        self.s3_client = boto3.client('s3')
        self.bucket_name = bucket_name
        self.filekey = filekey
        self.full_sap = None
        self._read_file()

    def _read_file(self):
        """
        Reads the XML file either locally or from S3 and parses it using minidom.
        Raises:
            ValueError: If the file cannot be found, read, or parsed. The
                underlying exception is chained as ``__cause__``.
        """
        try:
            if self.bucket_name:
                # Read from S3
                response = self.s3_client.get_object(Bucket=self.bucket_name, Key=self.filekey)
                xml_content = response['Body'].read()
            else:
                # Read locally
                with open(self.filekey, "r") as f:
                    xml_content = f.read()
            # Parse the XML content using minidom
            self.full_sap = parseString(xml_content)
        except FileNotFoundError as e:
            raise ValueError(f"Local file not found: {self.filekey}") from e
        except Exception as e:
            raise ValueError(f"An error occurred while reading or parsing the XML: {e}") from e

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _require_document(self):
        """Return the parsed document, raising ValueError if none is loaded."""
        if not self.full_sap:
            raise ValueError("You need to read the file first")
        return self.full_sap

    @staticmethod
    def _node_text(node):
        """Return the value of ``node``'s first child, or None for an empty element."""
        if node is None or node.firstChild is None:
            return None
        return node.firstChild.nodeValue

    @classmethod
    def _first_text(cls, parent, tag):
        """Return the text of the first ``tag`` element under ``parent``.

        Returns None when no such element exists or when it is empty.
        """
        nodes = parent.getElementsByTagName(tag)
        if not nodes:
            return None
        return cls._node_text(nodes[0])

    @classmethod
    def _required_text(cls, parent, tag):
        """Like ``_first_text`` but raise ValueError when the text is unavailable."""
        text = cls._first_text(parent, tag)
        if text is None:
            raise ValueError(f"Missing or empty {tag} tag found - investigate me")
        return text

    # ------------------------------------------------------------------
    # Extraction
    # ------------------------------------------------------------------
    def extract(self, _return=True):
        """
        Run every ``get_*`` extraction and store the results on the instance.

        Args:
            _return: When truthy, return a summary dict; otherwise return None.
        Returns:
            A dict with the keys "Property Type", "Built Form" and "Age Band"
            when ``_return`` is truthy, else None.
        """
        self.get_address()
        self.get_archetype()
        self.get_age_band()
        self.get_unheated_corridor()
        self.get_heating_1()
        self.get_ventilation()
        self.get_floor_area()
        self.get_low_energy_lighting()
        self.get_cylinder()
        if _return:
            return {
                "Property Type": self.property_type,
                "Built Form": self.built_form,
                "Age Band": self.age_band,
            }

    def get_address(self):
        """
        Build ``full_address`` from the single AddressAsDesigned element.

        Address lines and town are title-cased and joined with spaces, then
        the postcode is appended unchanged. Address parts that are absent or
        empty are skipped.
        Raises:
            ValueError: If no document is loaded or the address tag is not unique.
        """
        document = self._require_document()
        address = document.getElementsByTagName("AddressAsDesigned")
        if len(address) != 1:
            raise ValueError("Non-unique address tag found - investigate me")
        data = {
            node.nodeName: self._node_text(node)
            for node in address[0].childNodes
            if node.nodeType == node.ELEMENT_NODE
        }
        # .get() so that an address without e.g. AddressLine3 does not raise.
        lines = [
            data.get(key)
            for key in ("AddressLine1", "AddressLine2", "AddressLine3", "Town")
        ]
        full_address = " ".join(x.title() for x in lines if x is not None)
        postcode = data.get("Postcode")
        if postcode is not None:
            full_address = full_address + " " + postcode
        self.full_address = full_address

    def get_archetype(self):
        """
        Set ``property_type``, ``built_form`` and the combined ``archetype``.

        The archetype is "<PropertyType1> - <PropertyType2>", followed by a
        flat-position suffix when a non-empty PositionOfFlat tag is present.
        Position values without a POSITION_OF_FLAT entry are appended as-is.
        Raises:
            ValueError: If no document is loaded, or a property type tag is
                not unique or is empty.
        """
        document = self._require_document()
        type1_nodes = document.getElementsByTagName('PropertyType1')
        type2_nodes = document.getElementsByTagName('PropertyType2')
        if len(type1_nodes) != 1 or len(type2_nodes) != 1:
            raise ValueError("Non-unique property tag found - investigate me")
        property_type1 = self._node_text(type1_nodes[0])
        property_type2 = self._node_text(type2_nodes[0])
        if property_type1 is None or property_type2 is None:
            raise ValueError("Empty property tag found - investigate me")

        # PositionOfFlat may be missing entirely or present but empty.
        raw_position = self._first_text(document, 'PositionOfFlat')
        if raw_position:
            position_of_flat = POSITION_OF_FLAT.get(raw_position, raw_position)
        else:
            position_of_flat = None

        self.property_type = property_type1
        self.built_form = property_type2
        self.archetype = property_type1 + " - " + property_type2
        if position_of_flat:
            self.archetype = self.archetype + " " + position_of_flat

    def get_age_band(self):
        """
        Set ``age_band`` by translating the PropertyAgeBand code.

        Raises:
            ValueError: If no document is loaded, or the tag is not unique or
                is empty.
            KeyError: If the code has no entry in PROPERTY_AGE_BAND.
        """
        document = self._require_document()
        property_age_band = document.getElementsByTagName('PropertyAgeBand')
        if len(property_age_band) != 1:
            raise ValueError("Non-unique property age band tag found - investigate me")
        code = self._node_text(property_age_band[0])
        if code is None:
            raise ValueError("Empty property age band tag found - investigate me")
        self.age_band = PROPERTY_AGE_BAND[code]

    def get_wall_area_for_description(self, description):
        """
        Find the first WallRec whose Description equals ``description`` and
        that has a usable Area.

        Args:
            description: Exact Description text to look for.
        Returns:
            "Unheated corridor - <area> area" for the first match, or None
            when no matching wall record with an area exists.
        Raises:
            ValueError: If no document is loaded.
        """
        document = self._require_document()
        for wall_rec in document.getElementsByTagName("WallRec"):
            desc = self._first_text(wall_rec, "Description")
            if desc is None or desc != description:
                continue
            area_text = self._first_text(wall_rec, "Area")
            if area_text is not None:
                area = float(area_text)
                # Placeholder for wall_description which you'll populate later
                return f"Unheated corridor - {area} area"
        return None

    def get_unheated_corridor(self):
        """
        Unheated corridors don't always exist so we'll need to search for it.

        Sets ``unheated_corridor`` to the description string, or None when
        no "Flat corridor Main" wall record is found.
        Raises:
            ValueError: If no document is loaded.
        """
        self._require_document()
        self.unheated_corridor = self.get_wall_area_for_description("Flat corridor Main")

    def get_heating_1(self):
        """
        Set ``heating1`` from the single MainHeatingSystem1 element.

        The MHS code is translated via MAINHEATING_LOOKUP when known and is
        otherwise used unchanged.
        Raises:
            ValueError: If no document is loaded, the tag is not unique, or
                its MHS / Fraction child is missing or empty.
        """
        document = self._require_document()
        main_heating_system = document.getElementsByTagName('MainHeatingSystem1')
        if len(main_heating_system) != 1:
            raise ValueError("Non-unique main heating system tag found - investigate me")
        main_heating_system = main_heating_system[0]
        mhs = self._required_text(main_heating_system, 'MHS')
        mhs = MAINHEATING_LOOKUP.get(mhs, mhs)
        fraction = self._required_text(main_heating_system, 'Fraction')
        self.heating1 = f"{mhs} : {fraction}% of heating"

    def get_ventilation(self):
        """
        Set the three ventilation description strings.

        Each of MechanicalVentilationDecentralised, CrossVentilation and
        NightVentilation is read as text; "true"/"false" are converted to
        booleans and any other text is kept as-is. A missing or empty tag
        counts as absent.
        Raises:
            ValueError: If no document is loaded.
        """
        document = self._require_document()
        bool_lookup = {
            "true": True,
            "false": False
        }

        def read_flag(tag):
            value = self._first_text(document, tag)
            # Without this lookup the non-empty string "false" would be truthy.
            return bool_lookup.get(value, value)

        mech_vent_value = read_flag("MechanicalVentilationDecentralised")
        cross_vent_value = read_flag("CrossVentilation")
        night_vent_value = read_flag("NightVentilation")

        # Create the outputs
        self.mechanical_ventilation = (
            "Mechanical ventilation present" if mech_vent_value else "No mechanical ventilation"
        )
        self.cross_ventilation = "Cross ventilation present" if cross_vent_value else "No cross ventilation"
        self.night_ventilation = "Night ventilation present" if night_vent_value else "No night ventilation"

    def get_floor_area(self):
        """
        Set ``number_of_storeys`` and ``property_dimensions``.

        ``property_dimensions`` is a list with one dict per usable
        StoreyMeasurementRec, holding "storey_index" (the record's 0-based
        position among all StoreyMeasurementRec elements), "Floor Area",
        "Perimeter" and "Height". Records marked xsi:nil="true", records
        without an InternalFloorArea, and records whose floor area is 0 are
        skipped.
        Raises:
            ValueError: If no document is loaded, or NumberOfStoreys,
                InternalPerimeter or StoreyHeight is missing or empty where
                required.
        """
        document = self._require_document()
        self.number_of_storeys = int(self._required_text(document, 'NumberOfStoreys'))
        storeys = document.getElementsByTagName('StoreyMeasurementRec')
        # TODO: The first StoreyMeasurementRec tag looks like this in the examples we've seen:
        # <StoreyMeasurementRec xsi:nil="true" />
        # Indicating that the tag is explicitly indicated as empty
        # NOTE(review): an earlier guard raised NotImplementedError when the
        # index was -1 ("potentially basement found"), but the index was
        # always incremented before that check so it could never fire.
        # Basement handling is still unimplemented - confirm against the
        # Basement tag before relying on storey_index.
        storey_data = []
        for storey_index, storey in enumerate(storeys):
            if storey.getAttribute("xsi:nil") == "true":
                continue
            floor_area_text = self._first_text(storey, 'InternalFloorArea')
            if floor_area_text is None:
                continue
            floor_area = float(floor_area_text)
            # If floor area is 0, skip this storey
            if not floor_area:
                continue
            perimeter = float(self._required_text(storey, 'InternalPerimeter'))
            height = float(self._required_text(storey, 'StoreyHeight'))
            storey_data.append({
                "storey_index": storey_index,
                "Floor Area": floor_area,
                "Perimeter": perimeter,
                "Height": height
            })
        # We will convert this into a table in the markdown
        self.property_dimensions = storey_data

    def get_low_energy_lighting(self):
        """
        Set ``low_energy_lighting`` from the LightFittings and LELFittings tags.

        Raises:
            ValueError: If no document is loaded or either tag is missing or empty.
        """
        document = self._require_document()
        # Extract the values of the LightFittings and LELFittings tags
        light_fittings = self._required_text(document, 'LightFittings')
        lel_fittings = self._required_text(document, 'LELFittings')
        # Construct the string message
        self.low_energy_lighting = f"{lel_fittings} out of {light_fittings} lighting fittings are low energy."

    def get_cylinder(self):
        """
        Set ``cylinder`` and ``cylinder_stat``.

        ``cylinder`` describes the insulation when both InsulationType and
        InsulationThickness have text, and is "Not insulated." otherwise.
        ``cylinder_stat`` is the CylinderStat text, or None when that tag is
        missing or empty.
        Raises:
            ValueError: If no document is loaded.
        """
        document = self._require_document()
        # NOTE(review): a document with no insulation tags at all is also
        # reported as "Not insulated." - confirm that is the wanted wording
        # for dwellings without a cylinder.
        insulation_type = self._first_text(document, 'InsulationType')
        insulation_thickness = self._first_text(document, 'InsulationThickness')
        if insulation_type and insulation_thickness:
            self.cylinder = f"Insulated, {insulation_type}: {insulation_thickness}mm."
        else:
            self.cylinder = "Not insulated."
        self.cylinder_stat = self._first_text(document, 'CylinderStat')