import boto3
from xml.dom.minidom import parseString

# Maps the text of the PropertyAgeBand element to a construction-period label.
PROPERTY_AGE_BAND = {
    "A": "before 1900",
    "B": "1900-1929",
    "C": "1930-1949",
    "D": "1950-1966",
    "E": "1967-1975",
    "F": "1976-1982",
    "G": "1983-1990",
    "H": "1991-1995",
    "I": "1996-2002",
    "J": "2003-2006",
    "K": "2007-2011",
    "L": "2012 onwards"
}

# Maps the text of the PositionOfFlat element to the suffix appended to the archetype.
POSITION_OF_FLAT = {
    "TopFloorFlat": "(top floor)"
}

# Maps the text of the MHS element to a readable heating description.
MAINHEATING_LOOKUP = {
    "SEB": "Electric (SEB modern slimline storage heaters)"
}

# Not referenced by the parser below; kept for other users of this module.
WINDOWS_YEAR_LOOKUP = {
    "unknown install date": "unknown year",
    "unknown install": "unknown year",
    "post or during 2002": "2002 onwards",
}


class FullSapParser:
    """Extract summary fields from a SAP XML document parsed with minidom.

    The document is loaded on construction, either from a local path or from
    S3 when a bucket name is given. Call :meth:`extract` to populate the
    attributes below.
    """

    full_address = None
    archetype = None
    age_band = None
    unheated_corridor = None
    property_type = None
    built_form = None

    # ventilation
    mechanical_ventilation = None
    cross_ventilation = None
    night_ventilation = None

    # dimensions
    number_of_storeys = None
    property_dimensions = None

    # fabric
    low_energy_lighting = None

    # Heating
    heating1 = None
    cylinder = None
    cylinder_stat = None

    def __init__(self, filekey, bucket_name=None):
        """Load and parse the XML document.

        Args:
            filekey: Local file path, or the S3 object key when ``bucket_name``
                is given.
            bucket_name: S3 bucket to read from; when falsy the file is read
                from the local filesystem.

        Raises:
            ValueError: If the file cannot be found, read, or parsed.
        """
        self.s3_client = boto3.client('s3')
        self.bucket_name = bucket_name
        self.filekey = filekey
        self.full_sap = None
        self._read_file()

    def _read_file(self):
        """
        Reads the XML file either locally or from S3 and parses it using minidom.

        Raises:
            ValueError: If the file cannot be found, read, or parsed.
        """
        try:
            if self.bucket_name:
                # Read from S3
                response = self.s3_client.get_object(Bucket=self.bucket_name, Key=self.filekey)
                xml_content = response['Body'].read()
            else:
                # Read locally
                with open(self.filekey, "r") as f:
                    xml_content = f.read()

            # Parse the XML content using minidom
            self.full_sap = parseString(xml_content)
        except FileNotFoundError as e:
            # Chain the cause so the original traceback is not lost.
            raise ValueError(f"Local file not found: {self.filekey}") from e
        except Exception as e:
            raise ValueError(f"An error occurred while reading or parsing the XML: {e}") from e

    def _require_document(self):
        """Raise ValueError when no document has been parsed yet."""
        if not self.full_sap:
            raise ValueError("You need to read the file first")

    def _first_text(self, tag_name):
        """Return the first child value of the first ``tag_name`` element.

        Returns None when the document has no such element or the element is
        empty, instead of raising IndexError/AttributeError.
        """
        nodes = self.full_sap.getElementsByTagName(tag_name)
        if not nodes or nodes[0].firstChild is None:
            return None
        return nodes[0].firstChild.nodeValue

    def extract(self, _return=True):
        """Run every getter and populate the instance attributes.

        Args:
            _return: When truthy, return a summary dict; otherwise return None.

        Returns:
            A dict with the keys "Property Type", "Built Form" and "Age Band"
            when ``_return`` is truthy.
        """
        self.get_address()
        self.get_archetype()
        self.get_age_band()
        self.get_unheated_corridor()
        self.get_heating_1()
        self.get_ventilation()
        self.get_floor_area()
        self.get_low_energy_lighting()
        self.get_cylinder()

        if _return:
            return {
                "Property Type": self.property_type,
                "Built Form": self.built_form,
                "Age Band": self.age_band,
            }

    def get_address(self):
        """Build ``full_address`` from the single AddressAsDesigned element.

        Address lines and town are title-cased; the postcode is appended
        unchanged. Parts that are missing or empty are skipped.

        Raises:
            ValueError: If no document is loaded, or the document does not
                contain exactly one AddressAsDesigned element.
        """
        self._require_document()

        address = self.full_sap.getElementsByTagName("AddressAsDesigned")
        if len(address) != 1:
            raise ValueError("Non-unique address tag found - investigate me")

        data = {}
        for node in address[0].childNodes:
            if node.nodeType == node.ELEMENT_NODE:
                data[node.nodeName] = node.firstChild.nodeValue if node.firstChild else None

        # .get() so an absent element is treated like an empty one.
        parts = [
            value.title()
            for value in (data.get(key) for key in ("AddressLine1", "AddressLine2", "AddressLine3", "Town"))
            if value is not None
        ]
        postcode = data.get("Postcode")
        if postcode is not None:
            parts.append(postcode)

        self.full_address = " ".join(parts)

    def get_archetype(self):
        """Set ``property_type``, ``built_form`` and ``archetype``.

        The archetype is "<PropertyType1> - <PropertyType2>", followed by the
        flat position when a non-empty PositionOfFlat element exists.

        Raises:
            ValueError: If no document is loaded, or PropertyType1 /
                PropertyType2 do not each occur exactly once.
        """
        self._require_document()

        property_type1 = self.full_sap.getElementsByTagName('PropertyType1')
        property_type2 = self.full_sap.getElementsByTagName('PropertyType2')

        if len(property_type1) != 1 or len(property_type2) != 1:
            raise ValueError("Non-unique property tag found - investigate me")

        property_type1 = property_type1[0].firstChild.nodeValue
        property_type2 = property_type2[0].firstChild.nodeValue

        # PositionOfFlat may be absent or empty. Positions without a lookup
        # entry fall back to the raw value rather than raising KeyError.
        raw_position = self._first_text('PositionOfFlat')
        position_of_flat = POSITION_OF_FLAT.get(raw_position, raw_position) if raw_position else None

        self.property_type = property_type1
        self.built_form = property_type2
        self.archetype = property_type1 + " - " + property_type2
        if position_of_flat:
            self.archetype = self.archetype + " " + position_of_flat

    def get_age_band(self):
        """Set ``age_band`` from the single PropertyAgeBand element.

        Raises:
            ValueError: If no document is loaded, or PropertyAgeBand does not
                occur exactly once.
            KeyError: If the band code is not in PROPERTY_AGE_BAND.
        """
        self._require_document()

        property_age_band = self.full_sap.getElementsByTagName('PropertyAgeBand')
        if len(property_age_band) != 1:
            raise ValueError("Non-unique property age band tag found - investigate me")

        property_age_band = property_age_band[0].firstChild.nodeValue
        self.age_band = PROPERTY_AGE_BAND[property_age_band]

    def get_wall_area_for_description(self, description):
        """Describe the first WallRec whose Description equals ``description``.

        Args:
            description: Exact Description text to look for.

        Returns:
            "Unheated corridor - <area> area" for the first matching WallRec
            that has a non-empty Area, or None when there is none.

        Raises:
            ValueError: If no document is loaded.
        """
        self._require_document()

        for wall_rec in self.full_sap.getElementsByTagName("WallRec"):
            desc_elements = wall_rec.getElementsByTagName("Description")
            # An empty <Description/> has no firstChild; treat it as no match.
            if not desc_elements or desc_elements[0].firstChild is None:
                continue
            if desc_elements[0].firstChild.data != description:
                continue

            area_elements = wall_rec.getElementsByTagName("Area")
            if area_elements and area_elements[0].firstChild is not None:
                area = float(area_elements[0].firstChild.data)
                # Placeholder for wall_description which you'll populate later
                return f"Unheated corridor - {area} area"
        return None

    def get_unheated_corridor(self):
        """
        Unheated corridors don't always exist so we'll need to search for it.

        Sets ``unheated_corridor`` to the description of the "Flat corridor Main"
        wall, or None when no such wall is found.

        Raises:
            ValueError: If no document is loaded.
        """
        self._require_document()

        self.unheated_corridor = self.get_wall_area_for_description("Flat corridor Main")

    def get_heating_1(self):
        """Set ``heating1`` from the single MainHeatingSystem1 element.

        Raises:
            ValueError: If no document is loaded, or MainHeatingSystem1 does
                not occur exactly once.
        """
        self._require_document()

        main_heating_system = self.full_sap.getElementsByTagName('MainHeatingSystem1')
        if len(main_heating_system) != 1:
            raise ValueError("Non-unique main heating system tag found - investigate me")

        main_heating_system = main_heating_system[0]
        mhs = main_heating_system.getElementsByTagName('MHS')[0].firstChild.nodeValue
        # Unknown codes are shown as-is.
        mhs = MAINHEATING_LOOKUP.get(mhs, mhs)
        fraction = main_heating_system.getElementsByTagName('Fraction')[0].firstChild.nodeValue

        self.heating1 = f"{mhs} : {fraction}% of heating"

    def get_ventilation(self):
        """Set the three ventilation description attributes.

        Each of MechanicalVentilationDecentralised, CrossVentilation and
        NightVentilation is read from its first occurrence. "true"/"false" are
        converted to booleans; any other value is kept and judged by
        truthiness. A missing or empty element counts as not present.

        Raises:
            ValueError: If no document is loaded.
        """
        self._require_document()

        bool_lookup = {
            "true": True,
            "false": False
        }

        def read_flag(tag_name):
            nodes = self.full_sap.getElementsByTagName(tag_name)
            if nodes and nodes[0].childNodes:
                value = nodes[0].firstChild.nodeValue
                return bool_lookup.get(value, value)
            return None

        # Mechanical ventilation goes through the same lookup as the others;
        # otherwise the string "false" is truthy and reported as present.
        mech_vent_value = read_flag("MechanicalVentilationDecentralised")
        cross_vent_value = read_flag("CrossVentilation")
        night_vent_value = read_flag("NightVentilation")

        # Create the outputs
        self.mechanical_ventilation = (
            "Mechanical ventilation present" if mech_vent_value else "No mechanical ventilation"
        )
        self.cross_ventilation = "Cross ventilation present" if cross_vent_value else "No cross ventilation"
        self.night_ventilation = "Night ventilation present" if night_vent_value else "No night ventilation"

    def get_floor_area(self):
        """Set ``number_of_storeys`` and ``property_dimensions``.

        ``property_dimensions`` is a list of dicts with the keys
        "storey_index", "Floor Area", "Perimeter" and "Height", one per
        StoreyMeasurementRec that is not nil and has a non-zero floor area.
        ``storey_index`` is the zero-based position of the record among all
        StoreyMeasurementRec elements, nil ones included.

        Raises:
            ValueError: If no document is loaded.
        """
        self._require_document()

        self.number_of_storeys = int(self.full_sap.getElementsByTagName('NumberOfStoreys')[0].firstChild.nodeValue)

        storeys = self.full_sap.getElementsByTagName('StoreyMeasurementRec')

        # TODO: The first StoreyMeasurementRec tag in the examples we've seen is
        # explicitly marked as empty - presumably <StoreyMeasurementRec xsi:nil="true" />
        # (the example was lost from this comment; confirm against a sample file).
        storey_data = []
        for storey_index, storey in enumerate(storeys):
            if storey.getAttribute("xsi:nil") == "true":
                continue

            # NOTE(review): storey_index is never -1 here, so this branch is
            # unreachable as written. It looks intended to flag a populated
            # first record (possible basement) - confirm the intent before
            # changing the indexing.
            if storey_index == -1:
                raise NotImplementedError(
                    "Investigated me - potentially basement found but need to confirm with Basement tag"
                )

            floor_area = storey.getElementsByTagName('InternalFloorArea')
            # Skip records with no floor area element or an empty one.
            if not floor_area or floor_area[0].firstChild is None:
                continue

            floor_area = float(floor_area[0].firstChild.nodeValue)
            # If floor area is 0, skip this storey
            if not floor_area:
                continue

            perimeter = float(storey.getElementsByTagName('InternalPerimeter')[0].firstChild.nodeValue)
            height = float(storey.getElementsByTagName('StoreyHeight')[0].firstChild.nodeValue)

            storey_data.append({
                "storey_index": storey_index,
                "Floor Area": floor_area,
                "Perimeter": perimeter,
                "Height": height
            })

        # We will convert this into a table in the markdown
        self.property_dimensions = storey_data

    def get_low_energy_lighting(self):
        """Set ``low_energy_lighting`` from the LightFittings and LELFittings tags.

        Raises:
            ValueError: If no document is loaded.
        """
        self._require_document()

        # Extract the values of the LightFittings and LELFittings tags
        light_fittings = self.full_sap.getElementsByTagName('LightFittings')[0].firstChild.data
        lel_fittings = self.full_sap.getElementsByTagName('LELFittings')[0].firstChild.data

        # Construct the string message
        self.low_energy_lighting = f"{lel_fittings} out of {light_fittings} lighting fittings are low energy."

    def get_cylinder(self):
        """Set ``cylinder`` and ``cylinder_stat``.

        ``cylinder`` is "Not insulated." unless both InsulationType and
        InsulationThickness are present and non-empty. ``cylinder_stat`` is
        the CylinderStat text, or None when that element is missing or empty.

        Raises:
            ValueError: If no document is loaded.
        """
        self._require_document()

        # Missing or empty tags read as None so the "Not insulated." branch
        # is reachable instead of failing on a missing element.
        insulation_type = self._first_text('InsulationType')
        insulation_thickness = self._first_text('InsulationThickness')

        if insulation_type and insulation_thickness:
            self.cylinder = f"Insulated, {insulation_type}: {insulation_thickness}mm."
        else:
            self.cylinder = "Not insulated."

        self.cylinder_stat = self._first_text('CylinderStat')