xml extraction wip

This commit is contained in:
Khalim Conn-Kowlessar 2024-07-24 18:46:39 +01:00
parent 1d642e71e3
commit 3257485248
4 changed files with 1794 additions and 1 deletions

View file

@ -0,0 +1,540 @@
import re
import usaddress
from xml.dom.minidom import parseString
from backend.app.utils import sap_to_epc
from etl.xml_survey_extraction.pcdb import heating_data
# Translates the raw text found in the <Property-Type> / <PropertyType1> tag into the property type label
# stored on the parser. Values missing from this table raise a KeyError in XmlParser.get_property_type.
PROPERTY_TYPE_LOOKUP = {
"0": "House",
"House": "House",
}
def get_house_number(address: str) -> str | None:
    """
    Extract the house number from a free-text address.

    The usaddress library is tried first; the first token it labels as "AddressNumber" is returned with
    every comma removed. If usaddress finds no such token, a regular expression is used as a fallback.

    :param address: the address to parse
    :return: the house number as a string, or None if neither approach finds one
    """
    for token in usaddress.parse(address):
        if token[1] == "AddressNumber":
            # usaddress keeps punctuation attached to the token, so strip the commas out
            return token[0].replace(",", "")

    # usaddress isn't optimal for addresses with prefixes such as 'Flat', so fall back to a custom pattern:
    # 'Flat' or 'Apartment' followed by a number, or just a number at the beginning of the address
    fallback = re.search(r'(?i)(?:flat|apartment)\s*(\d+)|^\s*(\d+)', address)
    if fallback is None:
        return None
    # Exactly one of the two capture groups is populated; return whichever one matched
    return next(group for group in fallback.groups() if group is not None)
# Parser for EPC / EPR survey XML files. The attributes below are class-level defaults (all None) that the
# get_* methods overwrite on the instance as each piece of data is extracted from the document.
class XmlParser:
# Identification and current rating (populated by get_uprn, get_property_type and get_sap)
uprn = None
property_type = None
current_energy_efficiency = None
current_energy_rating = None
# heating/emissions information
space_heating_kwh = None
water_heating_kwh = None
co2_emissions_current = None
heating_cost_current = None
hot_water_cost_current = None
lighting_cost_current = None
energy_consumption_current = None
heating_system = None
heating_controls = None
# Assessor details
surveyor_name = None
# Addresses
address1 = None
address2 = None
address3 = None
posttown = None
postcode = None
# Comma-separated concatenation of the non-empty address parts above
address = None
# Dates
survey_date = None
# Building Fabric
# Walls
walls_description = None
# One of "NON CAVITY", "EMPTY", "PARTIAL" or "FULL" (see get_walls)
walls_classification = None
walls_energy_rating = None
# Roof
roof_description = None
roof_energy_rating = None
# "Yes" or "No" (see get_roof)
is_loft = None
# Floor
floor_description = None
floor_energy_rating = None
# Windows
windows_description = None
windows_energy_rating = None
# main heating
main_heating_description = None
main_heating_energy_rating = None
# Heating controls
main_heating_controls_description = None
main_heating_controls_energy_rating = None
# Hot water
hot_water_description = None
hot_water_energy_rating = None
# Lighting
lighting_description = None
lighting_energy_rating = None
# Second Heating
second_heating_description = None
second_heating_energy_rating = None
# Doors and photovoltaic supply (populated by get_doors and get_photo_supply)
number_of_doors = None
number_of_insulated_doors = None
photo_supply = None
# Property dimensions
number_of_floors = None
perimeter = None
heat_loss_perimeter = None
party_wall_length = None
total_floor_area = None
ground_floor_area = None
# "Yes" or "No" (see get_property_dimensions)
is_there_party_wall = None
floor_height = None
insulation_wall_area = None
# NOTE(review): rrn and database_data are not assigned by any method visible in this class
rrn = None
database_data = None
# We assume that the insulation wall area is 85% of the total wall area, as a standard estimate
INSULATION_WALL_AREA_FACTOR = 0.85
# The value of the UPRN tells us about the file type that we're parsing.
# get_uprn stores 0 when the UPRN is all zeros and -1 when the UPRN tag is empty; any other value is
# treated as an "EPC" by __init__
UPRN_FILETYPE_MAP = {
0: "EPR",
-1: "RDSAP_EPR"
}
# Numeric rating codes mapped to their display labels.
# NOTE(review): not referenced by any method visible in this class
RATINGS_MAP = {
"0": "N/A",
"1": "Very Poor",
"2": "Poor",
"3": "Average",
"4": "Good",
"5": "Very Good"
}
def __init__(self, file, filekey, uprn=None):
file.seek(0) # Ensure the file pointer is at the beginning
xml_string = file.read().decode('utf-8')
self.xml = parseString(xml_string)
self.filekey = filekey
# The xml parser is use to parse the EPC and EPR xmls and different file types will contain different
# information
# In order to identify the file type, we can look for the presence of the 'UPRN' tag
# If the UPRN tag is present, we can assume that the file is an EPC
# If the UPRN tag is not present, we can assume that the file is an EPR
self.get_uprn()
self.file_type = self.UPRN_FILETYPE_MAP.get(self.uprn, "EPC")
@staticmethod
def get_node(node):
"""
Utility function to get the node value from the xml, where data might be optional
:return:
"""
node_first_child = node.firstChild
if node_first_child is None:
return None
return node_first_child.nodeValue
def run(self):
if self.file_type == "RDSAP_EPR":
# This file type contains just limited information compared to a regular EPR/EPC, and so we just exit
# unless we learn something else that determines that we need information from this file
return
self.get_property_type()
self.get_sap()
self.get_property_address()
self.get_dates()
self.get_assessor_details()
self.get_heating_and_emissions_data()
self.get_detailed_heating_specs()
# Building fabric
self.get_walls()
self.get_roof()
self.get_floor()
self.get_windows()
self.get_heating()
self.get_hot_water()
self.get_lighting()
self.get_doors()
self.get_photo_supply()
# Property dimensions
self.get_property_dimensions()
def get_uprn(self, uprn):
if uprn is not None:
self.uprn = uprn
return
uprn_tag = self.xml.getElementsByTagName('UPRN')[0].firstChild
if uprn_tag is None:
self.uprn = -1
return
self.uprn = uprn_tag.nodeValue
# If all of the characters in the UPRN are 0, then there is not set UPRN
if self.uprn.count("0") == len(self.uprn):
self.uprn = 0
else:
self.uprn = self.uprn.lower().split("uprn-")[1]
def get_property_type(self):
if not self.xml:
raise ValueError("You need to read the file first")
property_type = self.xml.getElementsByTagName('Property-Type')
if not property_type:
property_type = self.xml.getElementsByTagName('PropertyType1')
self.property_type = PROPERTY_TYPE_LOOKUP[property_type[0].firstChild.nodeValue]
def get_sap(self):
    """
    Read the current SAP score from the <Energy-Rating-Current> tag.

    The score is stored as a string in ``self.current_energy_efficiency`` and the rating derived from it
    via ``sap_to_epc`` is stored in ``self.current_energy_rating``.
    :return:
    """
    score_nodes = self.xml.getElementsByTagName('Energy-Rating-Current')
    score = int(score_nodes[0].firstChild.nodeValue)
    rating = sap_to_epc(score)
    self.current_energy_efficiency, self.current_energy_rating = str(score), rating
def get_heating_and_emissions_data(self):
"""
This method will extract the following pieces of information:
1) Space heating requirement
2) Water heating requirement
3) CO2 emissions
4) Heat demand per square meter per year
5) Bills
:return:
"""
self.space_heating_kwh = self.xml.getElementsByTagName(
'Space-Heating-Existing-Dwelling'
)[0].firstChild.nodeValue
self.water_heating_kwh = self.xml.getElementsByTagName('Water-Heating')[0].firstChild.nodeValue
self.co2_emissions_current = self.xml.getElementsByTagName('CO2-Emissions-Current')[0].firstChild.nodeValue
self.heating_cost_current = self.xml.getElementsByTagName('Heating-Cost-Current')[0].firstChild.nodeValue
self.hot_water_cost_current = self.xml.getElementsByTagName('Hot-Water-Cost-Current')[0].firstChild.nodeValue
self.lighting_cost_current = self.xml.getElementsByTagName('Lighting-Cost-Current')[0].firstChild.nodeValue
self.energy_consumption_current = (
self.xml.getElementsByTagName("Energy-Consumption-Current")[0].firstChild.nodeValue
)
def get_detailed_heating_specs(self):
    """
    Given the heating data that is found in the <SAP-Heating> tag, extract the details of the heating
    system and its controls.

    The codes found in the XML are looked up in ``heating_data`` (matched on its "code" column) and the
    matching "description" is stored in ``self.heating_system`` / ``self.heating_controls``. When a code
    has no match, a placeholder string containing the code itself is stored instead.
    :return:
    """
    sap_main_heating_details = (
        self.xml.getElementsByTagName('SAP-Heating')[0]
        .getElementsByTagName("Main-Heating-Details")[0]
        .getElementsByTagName("Main-Heating")[0]
    )

    # Get the heating system
    heating_code = sap_main_heating_details.getElementsByTagName("SAP-Main-Heating-Code")[0].firstChild.nodeValue
    heating_system = heating_data[heating_data["code"] == int(heating_code)]["description"]
    heating_system = heating_system.values[0] if not heating_system.empty else f"Heating code: {heating_code}"

    # Get the heating controls
    heating_controls_code = (
        sap_main_heating_details.getElementsByTagName("Main-Heating-Control")[0].firstChild.nodeValue
    )
    heating_controls = heating_data[heating_data["code"] == int(heating_controls_code)]["description"]
    # The placeholder must report the controls code (it previously reported the heating system code)
    heating_controls = (
        heating_controls.values[0]
        if not heating_controls.empty
        else f"Heating Controls code: {heating_controls_code}"
    )

    self.heating_system = heating_system
    self.heating_controls = heating_controls
def get_walls(self):
wall_xml_data = self.xml.getElementsByTagName('Property-Summary')[0].getElementsByTagName('Wall')[0]
self.walls_description = (
wall_xml_data
.getElementsByTagName("Description")[0]
.firstChild.nodeValue
)
self.walls_energy_rating = (
wall_xml_data
.getElementsByTagName("Energy-Efficiency-Rating")[0]
.firstChild.nodeValue
)
is_cavity = "cavity wall" in self.walls_description.lower()
is_empty = "no insulation" in self.walls_description.lower()
is_partial = "partial insulation" in self.walls_description.lower()
if not is_cavity:
self.walls_classification = "NON CAVITY"
return
if is_empty:
self.walls_classification = "EMPTY"
return
if is_partial:
self.walls_classification = "PARTIAL"
return
if is_cavity and not is_empty and not is_partial:
self.walls_classification = "FULL"
return
raise NotImplementedError("Implement me")
def get_roof(self):
room_xml_data = self.xml.getElementsByTagName('Property-Summary')[0].getElementsByTagName('Roof')[0]
self.roof_description = (
room_xml_data
.getElementsByTagName("Description")[0]
.firstChild.nodeValue
)
self.roof_energy_rating = (
room_xml_data
.getElementsByTagName("Energy-Efficiency-Rating")[0]
.firstChild.nodeValue
)
loft_recommendation_tag = self.xml.getElementsByTagName("Impact-Of-Loft-Insulation")
description_contains_loft = "loft" in self.roof_description.lower()
if not loft_recommendation_tag and not description_contains_loft:
self.is_loft = "No"
return
self.is_loft = "Yes"
return
def get_floor(self):
floor_xml_data = self.xml.getElementsByTagName('Property-Summary')[0].getElementsByTagName('Floor')[0]
self.floor_description = (
floor_xml_data
.getElementsByTagName("Description")[0]
.firstChild.nodeValue
)
self.floor_energy_rating = (
floor_xml_data
.getElementsByTagName("Energy-Efficiency-Rating")[0]
.firstChild.nodeValue
)
def get_windows(self):
windows_xml_data = self.xml.getElementsByTagName('Property-Summary')[0].getElementsByTagName('Window')[0]
self.windows_description = (
windows_xml_data
.getElementsByTagName("Description")[0]
.firstChild.nodeValue
)
self.windows_energy_rating = (
windows_xml_data
.getElementsByTagName("Energy-Efficiency-Rating")[0]
.firstChild.nodeValue
)
def get_heating(self):
"""
This function will retrieve the main heating and the main heating controls
:return:
"""
mainheating_xml_data = self.xml.getElementsByTagName('Main-Heating')[0]
self.main_heating_description = (
mainheating_xml_data.getElementsByTagName('Description')[0].firstChild.nodeValue
)
self.main_heating_energy_rating = (
mainheating_xml_data.getElementsByTagName('Energy-Efficiency-Rating')[0].firstChild.nodeValue
)
mainheating_controls_xml_data = self.xml.getElementsByTagName('Main-Heating-Controls')[0]
self.main_heating_controls_description = (
mainheating_controls_xml_data.getElementsByTagName('Description')[0].firstChild.nodeValue
)
self.main_heating_controls_energy_rating = (
mainheating_controls_xml_data.getElementsByTagName('Energy-Efficiency-Rating')[0].firstChild.nodeValue
)
second_heating_xml_data = self.xml.getElementsByTagName('Secondary-Heating')[0]
self.second_heating_description = (
second_heating_xml_data.getElementsByTagName('Description')[0].firstChild.nodeValue
)
self.second_heating_energy_rating = (
second_heating_xml_data.getElementsByTagName('Energy-Efficiency-Rating')[0].firstChild.nodeValue
)
def get_hot_water(self):
hot_water_xml_data = self.xml.getElementsByTagName('Hot-Water')[0]
self.hot_water_description = (
hot_water_xml_data.getElementsByTagName('Description')[0].firstChild.nodeValue
)
self.hot_water_energy_rating = (
hot_water_xml_data.getElementsByTagName('Energy-Efficiency-Rating')[0].firstChild.nodeValue
)
def get_lighting(self):
lighting_xml_data = self.xml.getElementsByTagName('Lighting')[0]
self.lighting_description = (
lighting_xml_data.getElementsByTagName('Description')[0].firstChild.nodeValue
)
self.lighting_energy_rating = (
lighting_xml_data.getElementsByTagName('Energy-Efficiency-Rating')[0].firstChild.nodeValue
)
def get_doors(self):
# Doors can be found in the SAP-Property-Details tag
self.number_of_doors = int(
self.xml.getElementsByTagName('SAP-Property-Details')[0]
.getElementsByTagName('Door-Count')[0]
.firstChild.nodeValue
)
self.number_of_insulated_doors = int(
self.xml.getElementsByTagName('SAP-Property-Details')[0]
.getElementsByTagName('Insulated-Door-Count')[0]
.firstChild.nodeValue
)
def get_photo_supply(self):
self.photo_supply = float(
self.xml.getElementsByTagName('Photovoltaic-Supply')[0]
.getElementsByTagName('Percent-Roof-Area')[0]
.firstChild.nodeValue
)
def get_assessor_details(self):
energy_assessor_tag = self.xml.getElementsByTagName('Energy-Assessor')[0]
self.surveyor_name = (
energy_assessor_tag.getElementsByTagName("Name")[0].firstChild.nodeValue
)
def get_property_address(self):
property_tag = self.xml.getElementsByTagName("Property")[0]
self.address1 = self.get_node(property_tag.getElementsByTagName("Address-Line-1")[0])
self.address2 = self.get_node(property_tag.getElementsByTagName("Address-Line-2")[0])
self.address3 = self.get_node(property_tag.getElementsByTagName("Address-Line-3")[0])
self.posttown = self.get_node(property_tag.getElementsByTagName("Post-Town")[0])
self.postcode = self.get_node(property_tag.getElementsByTagName("Postcode")[0])
self.address = ", ".join(
[x for x in [self.address1, self.address2, self.address3, self.posttown, self.postcode] if x is not None]
)
def get_dates(self):
self.survey_date = (
self.xml.getElementsByTagName("Inspection-Date")[0].firstChild.nodeValue
)
def get_property_dimensions(self):
"""
This function will extract the relevant property dimensions including the floor area,
number of floors, perimeter, party wall length and the insulation_wall_area.
insulation_wall_area is typically simplified down to perimeter * height * 0.85
:return:
"""
# Each floor has its own SAP-Floor-Dimension tag
floor_dimensions = (
self.xml.getElementsByTagName("SAP-Floor-Dimensions")[0]
.getElementsByTagName("SAP-Floor-Dimension")
)
self.number_of_floors = len(floor_dimensions)
self.heat_loss_perimeter = float(
floor_dimensions[0].getElementsByTagName("Heat-Loss-Perimeter")[0].firstChild.nodeValue
)
self.party_wall_length = float(
floor_dimensions[0].getElementsByTagName("Party-Wall-Length")[0].firstChild.nodeValue
)
party_wall_construction_tag = (
self.xml.getElementsByTagName("Party-Wall-Construction")[0].firstChild.nodeValue.replace("\n", "").strip()
)
self.is_there_party_wall = (
"Yes" if (self.party_wall_length > 0) or (party_wall_construction_tag != "") else "No"
)
# We pull out all of the floor areas
floor_areas = [
float(x.getElementsByTagName("Total-Floor-Area")[0].firstChild.nodeValue) for x in floor_dimensions
]
self.total_floor_area = sum(floor_areas)
self.ground_floor_area = floor_areas[0]
self.floor_height = float(
floor_dimensions[0]
.getElementsByTagName("Room-Height")[0]
.firstChild.nodeValue
)
self.insulation_wall_area = self.heat_loss_perimeter * self.floor_height * self.INSULATION_WALL_AREA_FACTOR
self.perimeter = self.heat_loss_perimeter + self.party_wall_length

View file

@ -1,3 +1,16 @@
from utils.s3 import read_from_s3, list_files_and_subfolders_in_s3_folder, list_xmls_in_s3_folder
from utils.logger import setup_logger
from etl.xml_survey_extraction.XmlParser import XmlParser
import os
from io import BytesIO
logger = setup_logger()
# SURVEYORS and PROJECT_CODE together form the S3 folder ("{SURVEYORS}/{PROJECT_CODE}/") whose energy
# assessments are processed by main()
SURVEYORS = "JAFFERSONS ENERGY CONSULTANTS"
PROJECT_CODE = "VDE001"
# The S3 bucket that main() lists and reads the assessments from
BUCKET = "retrofit-energy-assessments-dev"
def main():
"""
This function executes the main process, which will retrieve data from the specified locations, extract the data
@ -6,4 +19,32 @@ def main():
"""
# TODO: Build solution to get this data from Onedrive and store what we need in S3
# In s3, we have a bucket called retrofit-energy-assessments-{stage} which
# In s3, we have a bucket called retrofit-energy-assessments-{stage} which contains the data we need
# The data is stored in a folder called {surveyors}/{project_code}/{uprn}
# We'll need to get the uprn from the folder name, which we can do with EpcSearcher class
#
energy_assessments = list_files_and_subfolders_in_s3_folder(
bucket_name=BUCKET, folder_name=f"{SURVEYORS}/{PROJECT_CODE}/"
)
logger.info(f"Found {len(energy_assessments)} energy assessments for {SURVEYORS} and {PROJECT_CODE}")
assessments_map = {}
for assessment in energy_assessments:
uploaded_xmls = list_xmls_in_s3_folder(
bucket_name=BUCKET, folder_name=os.path.join(assessment, "docs & plans")
)
uprn = int(assessment.rstrip("/").split("/")[-1])
assessments_map[uprn] = uploaded_xmls
logger.info(f"Exatracted XMLS for the energy assessments")
# For each property, we download the xmls and extract the data
for uprn, xmls in assessments_map.items():
extracted_data = {}
for xml in xmls:
xml_data = read_from_s3(bucket_name=BUCKET, s3_file_name=xml)
xml_data_io = BytesIO(xml_data)
xml_parser = XmlParser(file=xml_data_io, filekey=xml, uprn=uprn)
xml_parser.run()
logger.info(f"Extracted data from {xml}")

File diff suppressed because it is too large Load diff

View file

@ -276,3 +276,86 @@ def list_files_in_s3_folder(bucket_name, folder_name):
except Exception as e:
logger.error(f'Failed to list files in folder {folder_name} in bucket {bucket_name}: {str(e)}')
return []
def list_files_and_subfolders_in_s3_folder(bucket_name, folder_name):
    """
    List all files and immediate subfolders in a given folder in an S3 bucket.

    E.g. if we have a folder structure in S3 like this:
    - folder1/
        - file1.csv
        - file2.csv
        - subfolder1/
            - file3.csv

    Then calling list_files_and_subfolders_in_s3_folder(bucket_name='my-bucket', folder_name='folder1/')
    would return ['folder1/file1.csv', 'folder1/file2.csv', 'folder1/subfolder1/'].
    Namely, the nested files are not included in the list, only the immediate files and subfolders.

    All result pages are read, so folders holding more entries than a single S3 listing response can
    carry are returned in full.

    :param bucket_name: The name of the S3 bucket.
    :param folder_name: The folder name within the S3 bucket.
    :return: A list of file keys followed by subfolder prefixes in the specified S3 folder. An empty list
        is returned (and the error logged) if the listing fails.
    """
    # For this function, folder_name should end with a forward slash
    if not folder_name.endswith('/'):
        folder_name += '/'

    try:
        s3 = boto3.client('s3')
        paginator = s3.get_paginator('list_objects_v2')

        files = []
        subfolders = []
        for page in paginator.paginate(Bucket=bucket_name, Prefix=folder_name, Delimiter='/'):
            # Add files to the list, skipping the placeholder object for the folder itself
            files.extend(
                content['Key'] for content in page.get('Contents', []) if content['Key'] != folder_name
            )
            # Add immediate subfolders to the list
            subfolders.extend(prefix['Prefix'] for prefix in page.get('CommonPrefixes', []))

        # Files are listed before subfolders
        return files + subfolders
    except NoCredentialsError:
        logger.error("Credentials not available.")
        return []
    except PartialCredentialsError:
        logger.error("Incomplete credentials provided.")
        return []
    except Exception as e:
        logger.error(f'Failed to list files and subfolders in folder {folder_name} in bucket {bucket_name}: {str(e)}')
        return []
def list_xmls_in_s3_folder(bucket_name, folder_name):
    """
    List all XML files in a given folder in an S3 bucket.

    The folder name is used as a key prefix without a delimiter, so files in nested folders are included.
    All result pages are read, and the ".xml" extension is matched case-insensitively.

    :param bucket_name: The name of the S3 bucket.
    :param folder_name: The folder name within the S3 bucket.
    :return: A list of XML file keys in the specified S3 folder. An empty list is returned (and the error
        logged) if the listing fails.
    """
    try:
        s3 = boto3.client('s3')
        paginator = s3.get_paginator('list_objects_v2')

        found_any_object = False
        xml_files = []
        for page in paginator.paginate(Bucket=bucket_name, Prefix=folder_name):
            contents = page.get('Contents', [])
            found_any_object = found_any_object or bool(contents)
            # Filter XML files
            xml_files.extend(content['Key'] for content in contents if content['Key'].lower().endswith('.xml'))

        if not found_any_object:
            logger.info(f"No files found in folder {folder_name} in bucket {bucket_name}.")
            return []

        return xml_files
    except NoCredentialsError:
        logger.error("Credentials not available.")
        return []
    except PartialCredentialsError:
        logger.error("Incomplete credentials provided.")
        return []
    except Exception as e:
        logger.error(f'Failed to list XML files in folder {folder_name} in bucket {bucket_name}: {str(e)}')
        return []