mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
commit
72cb30bcee
4 changed files with 320 additions and 2 deletions
2
.idea/Model.iml
generated
2
.idea/Model.iml
generated
|
|
@ -7,7 +7,7 @@
|
|||
<sourceFolder url="file://$MODULE_DIR$/open_uprn" isTestSource="false" />
|
||||
<sourceFolder url="file://$MODULE_DIR$/recommendations" isTestSource="false" />
|
||||
</content>
|
||||
<orderEntry type="jdk" jdkName="Python 3.10 (backend)" jdkType="Python SDK" />
|
||||
<orderEntry type="jdk" jdkName="Python 3.10 (area_data)" jdkType="Python SDK" />
|
||||
<orderEntry type="sourceFolder" forTests="false" />
|
||||
</component>
|
||||
</module>
|
||||
2
.idea/misc.xml
generated
2
.idea/misc.xml
generated
|
|
@ -1,6 +1,6 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (backend)" project-jdk-type="Python SDK" />
|
||||
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (area_data)" project-jdk-type="Python SDK" />
|
||||
<component name="PythonCompatibilityInspectionAdvertiser">
|
||||
<option name="version" value="3" />
|
||||
</component>
|
||||
|
|
|
|||
314
model_data/simulation_system/area_data.py
Normal file
314
model_data/simulation_system/area_data.py
Normal file
|
|
@ -0,0 +1,314 @@
|
|||
"""
|
||||
This script produces the dataset used to model the wall area of properties, which is used to estimate the cost
|
||||
of insulation measures within homes
|
||||
"""
|
||||
import os
|
||||
import boto3
|
||||
import PyPDF2
|
||||
import re
|
||||
import json
|
||||
from epc_api.client import EpcClient
|
||||
from io import BytesIO
|
||||
from datetime import datetime
|
||||
|
||||
bucket = "retrofit-datalake-dev"
|
||||
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN", None)
|
||||
|
||||
|
||||
def list_files_in_s3_folder(bucket_name, folder_name):
|
||||
"""
|
||||
List files in a specific S3 bucket and folder.
|
||||
|
||||
Parameters:
|
||||
- bucket_name: Name of the S3 bucket.
|
||||
- folder_name: Name of the folder (prefix) within the bucket.
|
||||
|
||||
Returns:
|
||||
- A list of file names within the specified folder.
|
||||
"""
|
||||
|
||||
# Ensure folder name ends with a '/'
|
||||
if not folder_name.endswith('/'):
|
||||
folder_name += '/'
|
||||
|
||||
s3_client = boto3.client('s3')
|
||||
|
||||
# Initialize empty list to store file names
|
||||
files = []
|
||||
|
||||
# Initialize paginator
|
||||
paginator = s3_client.get_paginator('list_objects_v2')
|
||||
|
||||
# Create a PageIterator from the Paginator
|
||||
page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=folder_name)
|
||||
|
||||
for page in page_iterator:
|
||||
# Extract file names from the current page and append to the list
|
||||
files.extend([item['Key'] for item in page.get('Contents', [])])
|
||||
|
||||
return files
|
||||
|
||||
|
||||
def fetch_and_parse_pdf_from_s3(bucket_name, filename):
|
||||
"""
|
||||
Fetch a PDF from an S3 bucket and parse its content.
|
||||
|
||||
Parameters:
|
||||
- bucket_name: Name of the S3 bucket.
|
||||
- pdf_key: Path (key) of the PDF file within the bucket.
|
||||
|
||||
Returns:
|
||||
- text: Extracted text from the PDF.
|
||||
"""
|
||||
|
||||
s3_client = boto3.client('s3')
|
||||
response = s3_client.get_object(Bucket=bucket_name, Key=filename)
|
||||
|
||||
# Create a BytesIO object from the PDF bytes
|
||||
pdf_content = BytesIO(response['Body'].read())
|
||||
|
||||
# Use PyPDF2 to read the PDF content
|
||||
reader = PyPDF2.PdfReader(pdf_content)
|
||||
|
||||
# Extract text from each page
|
||||
pages = []
|
||||
for page_num in range(len(reader.pages)):
|
||||
page = reader.pages[page_num]
|
||||
|
||||
text = page.extract_text()
|
||||
text = remove_excess_newlines(text)
|
||||
pages.append(text.split("\n"))
|
||||
|
||||
return pages
|
||||
|
||||
|
||||
def fetch_json_from_s3(bucket_name, file_name):
|
||||
# Create an S3 client
|
||||
s3 = boto3.client('s3')
|
||||
|
||||
# Fetch the file from S3
|
||||
response = s3.get_object(Bucket=bucket_name, Key=file_name)
|
||||
|
||||
# Parse and return the JSON data
|
||||
return json.loads(response['Body'].read().decode('utf-8'))
|
||||
|
||||
|
||||
def write_json_to_s3(bucket_name, file_name, json_data):
|
||||
"""
|
||||
Write JSON data to a file in an S3 bucket.
|
||||
|
||||
Parameters:
|
||||
- bucket_name: Name of the S3 bucket.
|
||||
- file_name: Path (key) of the file within the bucket.
|
||||
- json_data: JSON data to be saved.
|
||||
"""
|
||||
|
||||
s3_client = boto3.client('s3')
|
||||
|
||||
# Convert the JSON data to a string
|
||||
json_string = json.dumps(json_data)
|
||||
|
||||
# Upload the JSON string to S3
|
||||
s3_client.put_object(Bucket=bucket_name, Key=file_name, Body=json_string)
|
||||
|
||||
|
||||
def check_s3_file_exists(bucket_name, file_name):
|
||||
"""
|
||||
Check if a file exists in an S3 bucket.
|
||||
|
||||
Parameters:
|
||||
- bucket_name: Name of the S3 bucket.
|
||||
- file_name: Path (key) of the file within the bucket.
|
||||
|
||||
Returns:
|
||||
- bool: True if the file exists, False otherwise.
|
||||
"""
|
||||
|
||||
s3_client = boto3.client('s3')
|
||||
|
||||
try:
|
||||
# Check if the object exists by attempting to retrieve its metadata
|
||||
s3_client.head_object(Bucket=bucket_name, Key=file_name)
|
||||
return True
|
||||
except s3_client.exceptions.ClientError as e:
|
||||
# If the error code is 404 (Not Found), then the file doesn't exist
|
||||
if e.response['Error']['Code'] == '404':
|
||||
return False
|
||||
# If there's any other exception, raise it
|
||||
raise
|
||||
|
||||
|
||||
def remove_excess_newlines(text):
|
||||
return re.sub('\n+', '\n', text).strip()
|
||||
|
||||
|
||||
def search_pages(pages, search_term) -> (
|
||||
str | None, int | None, int | None
|
||||
):
|
||||
"""
|
||||
This method looks for a search term in the EPR and returns the first instance of it
|
||||
:param pages: list of pages to search through
|
||||
:param search_term: The term to search for
|
||||
:return: The text, page number and page index of the first instance of the search term
|
||||
"""
|
||||
|
||||
to_page = len(pages)
|
||||
from_page = 0
|
||||
from_index = 0
|
||||
|
||||
for page_num in range(from_page, to_page + 1):
|
||||
|
||||
page_to_index = len(pages[page_num])
|
||||
|
||||
for page_index in range(from_index, page_to_index):
|
||||
if search_term in pages[page_num][page_index]:
|
||||
return pages[page_num][page_index], page_num, page_index
|
||||
|
||||
return None, None, None
|
||||
|
||||
|
||||
def check_page(pages, page_num, page_index):
|
||||
if page_num > len(pages):
|
||||
return False
|
||||
|
||||
if page_index > len(pages[page_num]):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def extract_areas():
|
||||
"""
|
||||
This function extracts the net and gross wall areas from the pdf sap calculation files
|
||||
"""
|
||||
files = list_files_in_s3_folder(bucket, "full_sap_calculations")
|
||||
|
||||
# get pdfs
|
||||
sap_calulation_pdfs = [file for file in files if file.endswith(".pdf")]
|
||||
|
||||
# For each pdf, we pull out the net & gross wall areas
|
||||
if check_s3_file_exists(bucket_name=bucket, file_name="wall-area-data/wall-area.json"):
|
||||
data = fetch_json_from_s3(bucket_name=bucket, file_name="wall-area-data/wall-area.json")
|
||||
data = json.loads(data)
|
||||
else:
|
||||
data = []
|
||||
|
||||
used_files = [x["filename"] for x in data]
|
||||
|
||||
sap_calulation_pdfs = [filename for filename in sap_calulation_pdfs if filename.split("/")[-1] not in used_files]
|
||||
|
||||
for sap_calculation_file in sap_calulation_pdfs:
|
||||
|
||||
# Download pdf
|
||||
pdf_pages = fetch_and_parse_pdf_from_s3(bucket, sap_calculation_file)
|
||||
|
||||
# We search for net and gross wall areas
|
||||
result = search_pages(pdf_pages, "External walls Main")[0]
|
||||
# This is a row in a table where the columns are:
|
||||
# Element, Gross, Openings, NetArea, U-value, A x U, K-value, A x K
|
||||
# The values we're interested in are Gross and NetArea
|
||||
values = result.split("External walls Main")[1].strip().split(" ")
|
||||
# Remove the empty white space - we should now have the fields we want
|
||||
values = [v for v in values if v]
|
||||
gross_area = float(values[0])
|
||||
net_area = float(values[2])
|
||||
|
||||
# Search for property identifiers
|
||||
_, pagenum, page_idx = search_pages(pdf_pages, 'Prop Type Ref')
|
||||
if pagenum != 0:
|
||||
raise ValueError("Property reference not found on the first page")
|
||||
# the reference will be on the next line
|
||||
property_reference = pdf_pages[pagenum][page_idx + 1]
|
||||
property_reference_number = pdf_pages[pagenum][page_idx + 2]
|
||||
address = pdf_pages[pagenum][page_idx + 4]
|
||||
|
||||
# Search for issued date - the date appears in the field before
|
||||
_, date_pagenum, date_page_idx = search_pages(pdf_pages, 'Issued on Date')
|
||||
issued_date = pdf_pages[date_pagenum][date_page_idx + -1]
|
||||
|
||||
data.append(
|
||||
{
|
||||
"property_reference": property_reference,
|
||||
"reference_number": property_reference_number,
|
||||
"address": address,
|
||||
"gross_area": gross_area,
|
||||
"net_area": net_area,
|
||||
"filename": sap_calculation_file,
|
||||
"issued_date": issued_date,
|
||||
}
|
||||
)
|
||||
|
||||
write_json_to_s3(bucket_name=bucket, file_name="wall-area-data/wall-area.json", json_data=json.dumps(data))
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def combine_area_data(area_data=None):
|
||||
"""
|
||||
This function will merge the area data onto additional features which are
|
||||
:param area_data: list of dictionaries, containing the areas and the
|
||||
"""
|
||||
|
||||
epc_client = EpcClient(auth_token=EPC_AUTH_TOKEN)
|
||||
|
||||
model_data = []
|
||||
for area_config in area_data:
|
||||
address = area_config["address"]
|
||||
|
||||
# The addresses seems to have this structure:
|
||||
# <house number>, <street name>, <area (optional)>, <postcode>"
|
||||
# Where area is not always in the address
|
||||
address_destructured = address.split(",")
|
||||
house_number = address_destructured[0].strip()
|
||||
street_name = address_destructured[1].strip()
|
||||
postcode = address_destructured[-1].strip()
|
||||
|
||||
# Fetch epc data
|
||||
epc_response = epc_client.domestic.search(
|
||||
params={
|
||||
"postcode": postcode,
|
||||
"address": ", ".join([house_number, street_name]),
|
||||
}
|
||||
)
|
||||
|
||||
epc_data = epc_response["rows"]
|
||||
if len(epc_data) == 0:
|
||||
raise ValueError("No EPC data - investigate me")
|
||||
|
||||
if len(epc_data) > 1:
|
||||
issued_date = datetime.strptime(area_config["issued_date"], '%d/%m/%Y')
|
||||
# We get the epc data closest to the issued date. On the edge case that we have two EPC records that are
|
||||
# equally far away from the issued_date, we take the most recent EPC record
|
||||
# We sort on lodgement date
|
||||
epc_data = sorted(
|
||||
epc_data, key=lambda x: datetime.strptime(x['lodgement-date'], '%Y-%m-%d'), reverse=True
|
||||
)
|
||||
|
||||
days_since = [
|
||||
abs((datetime.strptime(x["lodgement-date"], '%Y-%m-%d') - issued_date).days) for x in epc_data
|
||||
]
|
||||
# find the locaton of the closest
|
||||
closest_index = [i for i, days in enumerate(days_since) if days == min(days_since)][0]
|
||||
# Take just that epc record
|
||||
epc_data = [epc_data[closest_index]]
|
||||
|
||||
model_data.append(
|
||||
dict(
|
||||
gross_area=area_config["gross_area"],
|
||||
net_area=area_config["net_area"],
|
||||
**epc_data[0]
|
||||
)
|
||||
)
|
||||
|
||||
# Save data
|
||||
write_json_to_s3(
|
||||
bucket_name=bucket,
|
||||
file_name="wall-area-data/wall-area-model-data.json",
|
||||
json_data=json.dumps(model_data)
|
||||
)
|
||||
|
||||
|
||||
def handler():
|
||||
area_data = extract_areas()
|
||||
|
||||
combine_area_data(area_data)
|
||||
4
model_data/simulation_system/requirements/area_data.txt
Normal file
4
model_data/simulation_system/requirements/area_data.txt
Normal file
|
|
@ -0,0 +1,4 @@
|
|||
boto3==1.28.38
|
||||
PyPDF2==3.0.1
|
||||
pydantic==1.10.11
|
||||
epc-api-python==1.0.2
|
||||
Loading…
Add table
Reference in a new issue