Model/model_data/simulation_system/area_data.py
2023-09-04 12:08:55 +01:00

314 lines
9.7 KiB
Python

"""
This script produces the dataset used to model the wall area of properties, which is used to estimate the cost
of insulation measures within homes
"""
import os
import boto3
import PyPDF2
import re
import json
from epc_api.client import EpcClient
from io import BytesIO
from datetime import datetime
# S3 bucket holding the SAP calculation PDFs and the wall-area JSON files produced by this script
bucket = "retrofit-datalake-dev"
# Auth token handed to EpcClient; read from the EPC_AUTH_TOKEN environment variable (None when unset)
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN", None)
def list_files_in_s3_folder(bucket_name, folder_name):
    """
    Collect the keys of every object stored under a folder (prefix) of an S3 bucket.

    Parameters:
    - bucket_name: Name of the S3 bucket.
    - folder_name: Folder (key prefix) to list; a trailing '/' is appended when missing.

    Returns:
    - A list with the key of each object found under the prefix.
    """
    prefix = folder_name if folder_name.endswith('/') else folder_name + '/'
    paginator = boto3.client('s3').get_paginator('list_objects_v2')
    # Walk every result page; a page without a 'Contents' entry contributes nothing
    return [
        entry['Key']
        for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix)
        for entry in page.get('Contents', [])
    ]
def fetch_and_parse_pdf_from_s3(bucket_name, filename):
    """
    Download a PDF from an S3 bucket and extract its text page by page.

    Parameters:
    - bucket_name: Name of the S3 bucket.
    - filename: Path (key) of the PDF file within the bucket.

    Returns:
    - A list with one entry per PDF page; each entry is the list of text lines of that
      page, with runs of newlines collapsed before the text is split.
    """
    s3_object = boto3.client('s3').get_object(Bucket=bucket_name, Key=filename)
    # Wrap the downloaded bytes in an in-memory buffer so PyPDF2 can read them
    reader = PyPDF2.PdfReader(BytesIO(s3_object['Body'].read()))
    return [
        remove_excess_newlines(page.extract_text()).split("\n")
        for page in reader.pages
    ]
def fetch_json_from_s3(bucket_name, file_name):
    """
    Download an object from an S3 bucket and decode it as JSON.

    Parameters:
    - bucket_name: Name of the S3 bucket.
    - file_name: Path (key) of the file within the bucket.

    Returns:
    - The value obtained by parsing the object's UTF-8 text as JSON.
    """
    s3_object = boto3.client('s3').get_object(Bucket=bucket_name, Key=file_name)
    raw_text = s3_object['Body'].read().decode('utf-8')
    return json.loads(raw_text)
def write_json_to_s3(bucket_name, file_name, json_data):
    """
    Serialise a value to JSON and store it as an object in an S3 bucket.

    Parameters:
    - bucket_name: Name of the S3 bucket.
    - file_name: Path (key) of the file within the bucket.
    - json_data: Value to serialise with json.dumps and upload.
    """
    boto3.client('s3').put_object(
        Bucket=bucket_name,
        Key=file_name,
        Body=json.dumps(json_data),
    )
def check_s3_file_exists(bucket_name, file_name):
    """
    Tell whether an object is present in an S3 bucket.

    Parameters:
    - bucket_name: Name of the S3 bucket.
    - file_name: Path (key) of the file within the bucket.

    Returns:
    - bool: True if the object's metadata could be retrieved, False when S3 answers 404.

    Any client error other than a 404 is re-raised unchanged.
    """
    s3_client = boto3.client('s3')
    try:
        # A metadata-only request is enough to prove the object is there
        s3_client.head_object(Bucket=bucket_name, Key=file_name)
    except s3_client.exceptions.ClientError as error:
        if error.response['Error']['Code'] != '404':
            raise
        return False
    return True
def remove_excess_newlines(text):
    """
    Collapse every run of consecutive newlines in ``text`` into a single newline and
    strip leading and trailing whitespace from the result.
    """
    collapsed = re.sub('\n+', '\n', text)
    return collapsed.strip()
def search_pages(pages, search_term) -> (
str | None, int | None, int | None
):
"""
This method looks for a search term in the EPR and returns the first instance of it
:param pages: list of pages to search through
:param search_term: The term to search for
:return: The text, page number and page index of the first instance of the search term
"""
to_page = len(pages)
from_page = 0
from_index = 0
for page_num in range(from_page, to_page + 1):
page_to_index = len(pages[page_num])
for page_index in range(from_index, page_to_index):
if search_term in pages[page_num][page_index]:
return pages[page_num][page_index], page_num, page_index
return None, None, None
def check_page(pages, page_num, page_index):
    """
    Report whether ``pages[page_num][page_index]`` refers to an existing line.

    :param pages: list of pages, each page being a list of text lines
    :param page_num: index of the page to check
    :param page_index: index of the line within that page
    :return: True when both indexes are below the respective lengths, False otherwise

    Negative indexes are not rejected; they are left to Python's negative-indexing rules.
    """
    # Valid indexes stop at len - 1, so the comparison must be >= (the previous > let
    # page_num == len(pages) through to an IndexError and accepted page_index == len(page)).
    if page_num >= len(pages):
        return False
    if page_index >= len(pages[page_num]):
        return False
    return True
def extract_areas():
    """
    Extract the net and gross wall areas from the PDF SAP calculation files.

    Lists the PDFs under ``full_sap_calculations`` in the bucket, skips the ones already
    recorded in ``wall-area-data/wall-area.json``, parses each remaining PDF and writes the
    combined records back to that file.

    :return: list of dictionaries (previously stored records plus the new ones); each new
        record has the keys property_reference, reference_number, address, gross_area,
        net_area, filename and issued_date
    :raises ValueError: if an expected field cannot be located in a PDF
    """
    output_key = "wall-area-data/wall-area.json"
    files = list_files_in_s3_folder(bucket, "full_sap_calculations")
    sap_calculation_pdfs = [file for file in files if file.endswith(".pdf")]

    # Load the records produced by previous runs, if there are any
    if check_s3_file_exists(bucket_name=bucket, file_name=output_key):
        data = fetch_json_from_s3(bucket_name=bucket, file_name=output_key)
        # The file is stored as a JSON-encoded string (see the write at the end), so the
        # first decode yields a string that has to be decoded again
        if isinstance(data, str):
            data = json.loads(data)
    else:
        data = []

    # Records store the full S3 key in "filename". Comparing only the basename against
    # them never matched, so every PDF was parsed and appended again on each run. The
    # basename is still accepted in case older records were stored without the folder.
    used_files = {record["filename"] for record in data}
    pending_pdfs = [
        key for key in sap_calculation_pdfs
        if key not in used_files and key.split("/")[-1] not in used_files
    ]

    for sap_calculation_file in pending_pdfs:
        pdf_pages = fetch_and_parse_pdf_from_s3(bucket, sap_calculation_file)

        # This is a row in a table where the columns are:
        # Element, Gross, Openings, NetArea, U-value, A x U, K-value, A x K
        # The values we're interested in are Gross and NetArea
        wall_row = search_pages(pdf_pages, "External walls Main")[0]
        if wall_row is None:
            raise ValueError(f"'External walls Main' row not found in {sap_calculation_file}")
        values = wall_row.split("External walls Main")[1].split()
        if len(values) < 3:
            raise ValueError(f"Unexpected 'External walls Main' row in {sap_calculation_file}: {wall_row!r}")
        gross_area = float(values[0])
        net_area = float(values[2])

        # Search for property identifiers
        _, pagenum, page_idx = search_pages(pdf_pages, 'Prop Type Ref')
        if pagenum != 0:
            raise ValueError("Property reference not found on the first page")
        # The reference, its number and the address follow on the lines after the label
        property_reference = pdf_pages[pagenum][page_idx + 1]
        property_reference_number = pdf_pages[pagenum][page_idx + 2]
        address = pdf_pages[pagenum][page_idx + 4]

        # Search for issued date - the date appears in the field before the label
        _, date_pagenum, date_page_idx = search_pages(pdf_pages, 'Issued on Date')
        if date_pagenum is None or date_page_idx == 0:
            # Index 0 has no preceding line; "- 1" would silently wrap to the page's last line
            raise ValueError(f"Issued date not found in {sap_calculation_file}")
        issued_date = pdf_pages[date_pagenum][date_page_idx - 1]

        data.append(
            {
                "property_reference": property_reference,
                "reference_number": property_reference_number,
                "address": address,
                "gross_area": gross_area,
                "net_area": net_area,
                "filename": sap_calculation_file,
                "issued_date": issued_date,
            }
        )

    # Persist once, after all new PDFs have been parsed. The payload is pre-encoded with
    # json.dumps to keep the stored format identical to what earlier runs wrote.
    write_json_to_s3(bucket_name=bucket, file_name=output_key, json_data=json.dumps(data))
    return data
def _lodgement_date(epc_row):
    """Parse the 'lodgement-date' field (YYYY-MM-DD) of an EPC row into a datetime."""
    return datetime.strptime(epc_row['lodgement-date'], '%Y-%m-%d')


def combine_area_data(area_data=None):
    """
    Merge the extracted wall areas with the EPC record of each property and save the result.

    For every area record the address is used to search the EPC API. When several EPC rows
    come back, the row lodged closest to the record's issued date is kept; on a tie the
    most recent row wins. The merged rows are written to
    ``wall-area-data/wall-area-model-data.json`` in the bucket.

    :param area_data: list of dictionaries containing the areas and the property details;
        each needs the keys address, gross_area and net_area, plus issued_date (DD/MM/YYYY)
        when more than one EPC row matches
    :raises TypeError: if area_data is None
    :raises ValueError: if an address has no comma-separated parts to split, or the EPC
        search returns no rows
    """
    if area_data is None:
        # Fail before any client is created instead of with "'NoneType' is not iterable"
        raise TypeError("area_data is required: pass the list returned by extract_areas()")

    epc_client = EpcClient(auth_token=EPC_AUTH_TOKEN)
    model_data = []
    for area_config in area_data:
        address = area_config["address"]
        # The addresses seem to have this structure:
        # <house number>, <street name>, <area (optional)>, <postcode>
        # Where area is not always in the address
        address_destructured = [part.strip() for part in address.split(",")]
        if len(address_destructured) < 2:
            raise ValueError(f"Cannot split address into house number, street and postcode: {address!r}")
        house_number = address_destructured[0]
        street_name = address_destructured[1]
        postcode = address_destructured[-1]

        # Fetch epc data
        epc_response = epc_client.domestic.search(
            params={
                "postcode": postcode,
                "address": ", ".join([house_number, street_name]),
            }
        )
        epc_data = epc_response["rows"]
        if len(epc_data) == 0:
            raise ValueError("No EPC data - investigate me")

        if len(epc_data) > 1:
            issued_date = datetime.strptime(area_config["issued_date"], '%d/%m/%Y')
            # Sort newest first so that, among rows equally far from the issued date,
            # min() (which returns the first minimal item) picks the most recent one
            newest_first = sorted(epc_data, key=_lodgement_date, reverse=True)
            closest = min(
                newest_first,
                key=lambda row: abs((_lodgement_date(row) - issued_date).days),
            )
            # Take just that epc record
            epc_data = [closest]

        model_data.append(
            dict(
                gross_area=area_config["gross_area"],
                net_area=area_config["net_area"],
                **epc_data[0]
            )
        )

    # Save data (pre-encoded with json.dumps, matching how the area data file is stored)
    write_json_to_s3(
        bucket_name=bucket,
        file_name="wall-area-data/wall-area-model-data.json",
        json_data=json.dumps(model_data)
    )
def handler():
    """
    Run the whole pipeline: extract the wall areas from the SAP calculation PDFs, then
    merge them with EPC data and store the result.
    """
    combine_area_data(extract_areas())