mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
314 lines
9.7 KiB
Python
314 lines
9.7 KiB
Python
"""
|
|
This script produces the dataset used to model the wall area of properties, which is used to estimate the cost
|
|
of insulation measures within homes
|
|
"""
|
|
import os
|
|
import boto3
|
|
import PyPDF2
|
|
import re
|
|
import json
|
|
from epc_api.client import EpcClient
|
|
from io import BytesIO
|
|
from datetime import datetime
|
|
|
|
# S3 bucket that every read and write in this script goes through
bucket = "retrofit-datalake-dev"
# Auth token handed to the EPC client; None when the environment variable is not set
EPC_AUTH_TOKEN = os.environ.get("EPC_AUTH_TOKEN")
|
|
|
|
|
|
def list_files_in_s3_folder(bucket_name, folder_name):
    """
    Collect the keys of every object stored under a folder of an S3 bucket.

    Parameters:
    - bucket_name: Name of the S3 bucket.
    - folder_name: Folder (key prefix) to list; a trailing '/' is appended when missing.

    Returns:
    - A list of the object keys found under the prefix (empty when nothing matches).
    """
    prefix = folder_name if folder_name.endswith('/') else folder_name + '/'

    s3_client = boto3.client('s3')

    # The listing is paginated, so walk every page of results
    result_pages = s3_client.get_paginator('list_objects_v2').paginate(Bucket=bucket_name, Prefix=prefix)

    keys = []
    for result_page in result_pages:
        # A page without matches has no 'Contents' entry
        for entry in result_page.get('Contents', []):
            keys.append(entry['Key'])

    return keys
|
|
|
|
|
|
def fetch_and_parse_pdf_from_s3(bucket_name, filename):
    """
    Fetch a PDF from an S3 bucket and extract its text page by page.

    Parameters:
    - bucket_name: Name of the S3 bucket.
    - filename: Path (key) of the PDF file within the bucket.

    Returns:
    - pages: A list with one entry per PDF page; each entry is the list of text lines
      extracted from that page, with runs of blank lines collapsed beforehand.
    """
    s3_client = boto3.client('s3')
    response = s3_client.get_object(Bucket=bucket_name, Key=filename)

    # Wrap the downloaded bytes in an in-memory stream for PyPDF2 to read
    pdf_content = BytesIO(response['Body'].read())
    reader = PyPDF2.PdfReader(pdf_content)

    pages = []
    for page in reader.pages:
        # Fall back to an empty string so a page without extractable text cannot break the regex
        text = remove_excess_newlines(page.extract_text() or "")
        pages.append(text.split("\n"))

    return pages
|
|
|
|
|
|
def fetch_json_from_s3(bucket_name, file_name):
    """
    Download an object from S3 and decode it as JSON.

    Parameters:
    - bucket_name: Name of the S3 bucket.
    - file_name: Path (key) of the file within the bucket.

    Returns:
    - The parsed JSON content of the object.
    """
    s3_object = boto3.client('s3').get_object(Bucket=bucket_name, Key=file_name)

    # The body is read as bytes and decoded as UTF-8 before parsing
    raw_bytes = s3_object['Body'].read()
    return json.loads(raw_bytes.decode('utf-8'))
|
|
|
|
|
|
def write_json_to_s3(bucket_name, file_name, json_data):
    """
    Serialise data as JSON and store it as an object in an S3 bucket.

    Parameters:
    - bucket_name: Name of the S3 bucket.
    - file_name: Path (key) of the file within the bucket.
    - json_data: JSON-serialisable data to be saved.
    """
    client = boto3.client('s3')

    # Serialise first, then upload the resulting string as the object body
    payload = json.dumps(json_data)
    client.put_object(Bucket=bucket_name, Key=file_name, Body=payload)
|
|
|
|
|
|
def check_s3_file_exists(bucket_name, file_name):
    """
    Report whether an object is present in an S3 bucket.

    Parameters:
    - bucket_name: Name of the S3 bucket.
    - file_name: Path (key) of the file within the bucket.

    Returns:
    - bool: True if the file exists, False otherwise.

    Any client error other than a 404 is re-raised to the caller.
    """
    s3_client = boto3.client('s3')

    try:
        # Requesting the metadata is enough to find out whether the object exists
        s3_client.head_object(Bucket=bucket_name, Key=file_name)
    except s3_client.exceptions.ClientError as error:
        # Only a 404 (Not Found) means "missing"; everything else is a real failure
        if error.response['Error']['Code'] != '404':
            raise
        return False
    else:
        return True
|
|
|
|
|
|
def remove_excess_newlines(text):
    """Collapse every run of consecutive newlines into one and trim surrounding whitespace."""
    collapsed = re.sub('\n+', '\n', text)
    return collapsed.strip()
|
|
|
|
|
|
def search_pages(pages, search_term) -> (
|
|
str | None, int | None, int | None
|
|
):
|
|
"""
|
|
This method looks for a search term in the EPR and returns the first instance of it
|
|
:param pages: list of pages to search through
|
|
:param search_term: The term to search for
|
|
:return: The text, page number and page index of the first instance of the search term
|
|
"""
|
|
|
|
to_page = len(pages)
|
|
from_page = 0
|
|
from_index = 0
|
|
|
|
for page_num in range(from_page, to_page + 1):
|
|
|
|
page_to_index = len(pages[page_num])
|
|
|
|
for page_index in range(from_index, page_to_index):
|
|
if search_term in pages[page_num][page_index]:
|
|
return pages[page_num][page_index], page_num, page_index
|
|
|
|
return None, None, None
|
|
|
|
|
|
def check_page(pages, page_num, page_index):
    """
    Check that a (page number, line index) position exists in the parsed pages.

    :param pages: list of pages, where each page is a list of text lines
    :param page_num: index of the page to check
    :param page_index: index of the line within that page
    :return: True if pages[page_num][page_index] can be read, False otherwise. Negative
        positions are reported as invalid rather than wrapping around to the end.
    """
    # Valid indices stop at len - 1, so the upper bound must be exclusive
    if not 0 <= page_num < len(pages):
        return False

    return 0 <= page_index < len(pages[page_num])
|
|
|
|
|
|
def extract_areas():
    """
    This function extracts the net and gross wall areas from the pdf sap calculation files

    Every pdf under "full_sap_calculations" in the bucket that does not already have a record is
    parsed, and one record per pdf is appended to the records stored in
    "wall-area-data/wall-area.json". The combined list is written back to that file.

    :return: list of dictionaries with the keys property_reference, reference_number, address,
        gross_area, net_area, filename and issued_date
    :raises ValueError: if the wall area row, the property reference or the issued date cannot
        be located in a pdf, or if an area value is not a number
    """
    output_key = "wall-area-data/wall-area.json"
    wall_row_label = "External walls Main"

    files = list_files_in_s3_folder(bucket, "full_sap_calculations")

    # get pdfs
    sap_calculation_pdfs = [file for file in files if file.endswith(".pdf")]

    if check_s3_file_exists(bucket_name=bucket, file_name=output_key):
        # The file holds a JSON-encoded string (see the write at the end), so decode a second time
        data = json.loads(fetch_json_from_s3(bucket_name=bucket, file_name=output_key))
    else:
        data = []

    # Records store the full S3 key in "filename", so the full key has to be compared; comparing
    # only the base name never matched and every pdf was processed again on each run. The base
    # name is still checked in case existing records hold bare file names.
    used_files = {record["filename"] for record in data}
    sap_calculation_pdfs = [
        key for key in sap_calculation_pdfs
        if key not in used_files and key.split("/")[-1] not in used_files
    ]

    # For each pdf, we pull out the net & gross wall areas
    for sap_calculation_file in sap_calculation_pdfs:
        # Download pdf
        pdf_pages = fetch_and_parse_pdf_from_s3(bucket, sap_calculation_file)

        # We search for net and gross wall areas
        result = search_pages(pdf_pages, wall_row_label)[0]
        if result is None:
            raise ValueError(f"'{wall_row_label}' row not found in {sap_calculation_file}")

        # This is a row in a table where the columns are:
        # Element, Gross, Openings, NetArea, U-value, A x U, K-value, A x K
        # The values we're interested in are Gross and NetArea
        values = result.split(wall_row_label)[1].split()
        if len(values) < 3:
            raise ValueError(f"'{wall_row_label}' row is incomplete in {sap_calculation_file}")
        gross_area = float(values[0])
        net_area = float(values[2])

        # Search for property identifiers
        _, pagenum, page_idx = search_pages(pdf_pages, 'Prop Type Ref')
        if pagenum != 0:
            raise ValueError("Property reference not found on the first page")
        # the reference will be on the next line
        first_page = pdf_pages[pagenum]
        property_reference = first_page[page_idx + 1]
        property_reference_number = first_page[page_idx + 2]
        address = first_page[page_idx + 4]

        # Search for issued date - the date appears in the field before
        _, date_pagenum, date_page_idx = search_pages(pdf_pages, 'Issued on Date')
        if date_pagenum is None or date_page_idx == 0:
            # Index 0 has no preceding line on the page; -1 would silently read the page's last line
            raise ValueError(f"Issued date not found in {sap_calculation_file}")
        issued_date = pdf_pages[date_pagenum][date_page_idx - 1]

        data.append(
            {
                "property_reference": property_reference,
                "reference_number": property_reference_number,
                "address": address,
                "gross_area": gross_area,
                "net_area": net_area,
                "filename": sap_calculation_file,
                "issued_date": issued_date,
            }
        )

    # The data is serialised here and again inside write_json_to_s3; this double encoding is kept
    # so the stored file stays readable by the loading code above.
    write_json_to_s3(bucket_name=bucket, file_name=output_key, json_data=json.dumps(data))

    return data
|
|
|
|
|
|
def combine_area_data(area_data=None):
    """
    This function merges the wall area data onto the EPC record of each property and saves the result

    For every area record the EPC API is searched by postcode and "<house number>, <street name>".
    When several EPC records are returned, the one lodged closest to the issued date of the area
    record is used, taking the most recent one on a tie. The merged records are written to
    "wall-area-data/wall-area-model-data.json".

    :param area_data: list of dictionaries, each containing the keys "address", "issued_date",
        "gross_area" and "net_area" (as produced by extract_areas)
    :return: list of dictionaries holding gross_area, net_area and the fields of the chosen EPC record
    :raises TypeError: if area_data is None
    :raises ValueError: if an address cannot be split into its parts or no EPC data is found
    """
    if area_data is None:
        # Fail before any client is created, with a message that names the actual problem
        raise TypeError("area_data must be a list of dictionaries, got None")

    def lodgement_date(epc_row):
        return datetime.strptime(epc_row['lodgement-date'], '%Y-%m-%d')

    epc_client = EpcClient(auth_token=EPC_AUTH_TOKEN)

    model_data = []
    for area_config in area_data:
        address = area_config["address"]

        # The addresses seems to have this structure:
        # <house number>, <street name>, <area (optional)>, <postcode>"
        # Where area is not always in the address
        address_destructured = [part.strip() for part in address.split(",")]
        if len(address_destructured) < 2:
            raise ValueError(f"Unexpected address format: {address!r}")
        house_number = address_destructured[0]
        street_name = address_destructured[1]
        postcode = address_destructured[-1]

        # Fetch epc data
        epc_response = epc_client.domestic.search(
            params={
                "postcode": postcode,
                "address": ", ".join([house_number, street_name]),
            }
        )

        epc_data = epc_response["rows"]
        if not epc_data:
            raise ValueError("No EPC data - investigate me")

        if len(epc_data) > 1:
            issued_date = datetime.strptime(area_config["issued_date"], '%d/%m/%Y')
            # We get the epc data closest to the issued date. Sorting newest first means that on
            # the edge case of two EPC records equally far from the issued_date, min() - which
            # returns the first minimal item - takes the most recent EPC record
            newest_first = sorted(epc_data, key=lodgement_date, reverse=True)
            epc_record = min(
                newest_first,
                key=lambda epc_row: abs((lodgement_date(epc_row) - issued_date).days),
            )
        else:
            epc_record = epc_data[0]

        model_data.append(
            dict(
                gross_area=area_config["gross_area"],
                net_area=area_config["net_area"],
                **epc_record
            )
        )

    # Save data
    write_json_to_s3(
        bucket_name=bucket,
        file_name="wall-area-data/wall-area-model-data.json",
        json_data=json.dumps(model_data)
    )

    return model_data
|
|
|
|
|
|
def handler():
    """Run the whole job: extract the wall areas from the SAP pdfs, then combine them with EPC data."""
    combine_area_data(extract_areas())
|