"""
This script produces the dataset used to model the wall area of properties, which is used to estimate the cost of
insulation measures within homes
"""
import json
import os
import re
from datetime import datetime
from io import BytesIO

import boto3
import PyPDF2
from epc_api.client import EpcClient

bucket = "retrofit-datalake-dev"
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN", None)

# S3 keys / prefixes read and written by this script
_SAP_CALCULATIONS_FOLDER = "full_sap_calculations"
_WALL_AREA_KEY = "wall-area-data/wall-area.json"
_WALL_AREA_MODEL_DATA_KEY = "wall-area-data/wall-area-model-data.json"

# Label of the table row that holds the external wall areas in a SAP calculation pdf
_WALL_ROW_LABEL = "External walls Main"


def list_files_in_s3_folder(bucket_name, folder_name):
    """
    List the objects stored under a folder (prefix) of an S3 bucket.

    Parameters:
    - bucket_name: Name of the S3 bucket.
    - folder_name: Name of the folder (prefix) within the bucket. A trailing '/' is added when missing.

    Returns:
    - A list of the full object keys (prefix included) found under the folder.
    """
    # Ensure folder name ends with a '/' so that only the folder itself is matched
    if not folder_name.endswith('/'):
        folder_name += '/'

    s3_client = boto3.client('s3')
    # list_objects_v2 returns results in pages, so we paginate to collect every key
    paginator = s3_client.get_paginator('list_objects_v2')

    files = []
    for page in paginator.paginate(Bucket=bucket_name, Prefix=folder_name):
        # 'Contents' is absent from a page when nothing matches the prefix
        files.extend(item['Key'] for item in page.get('Contents', []))
    return files


def fetch_and_parse_pdf_from_s3(bucket_name, filename):
    """
    Fetch a PDF from an S3 bucket and extract its text.

    Parameters:
    - bucket_name: Name of the S3 bucket.
    - filename: Path (key) of the PDF file within the bucket.

    Returns:
    - pages: A list with one entry per PDF page, each entry being the list of text lines extracted from that page.
    """
    s3_client = boto3.client('s3')
    response = s3_client.get_object(Bucket=bucket_name, Key=filename)

    # PyPDF2 needs a seekable file-like object, so wrap the downloaded bytes
    reader = PyPDF2.PdfReader(BytesIO(response['Body'].read()))

    pages = []
    for page in reader.pages:
        # Guard against a page yielding no text at all
        text = remove_excess_newlines(page.extract_text() or "")
        pages.append(text.split("\n"))
    return pages


def fetch_json_from_s3(bucket_name, file_name):
    """
    Fetch a file from an S3 bucket and parse it as JSON.

    Parameters:
    - bucket_name: Name of the S3 bucket.
    - file_name: Path (key) of the file within the bucket.

    Returns:
    - The decoded JSON content of the file.
    """
    s3_client = boto3.client('s3')
    response = s3_client.get_object(Bucket=bucket_name, Key=file_name)
    return json.loads(response['Body'].read().decode('utf-8'))


def write_json_to_s3(bucket_name, file_name, json_data):
    """
    Write JSON data to a file in an S3 bucket.

    Parameters:
    - bucket_name: Name of the S3 bucket.
    - file_name: Path (key) of the file within the bucket.
    - json_data: JSON-serialisable data to be saved. It is passed through json.dumps before upload.
    """
    s3_client = boto3.client('s3')
    s3_client.put_object(Bucket=bucket_name, Key=file_name, Body=json.dumps(json_data))


def check_s3_file_exists(bucket_name, file_name):
    """
    Check if a file exists in an S3 bucket.

    Parameters:
    - bucket_name: Name of the S3 bucket.
    - file_name: Path (key) of the file within the bucket.

    Returns:
    - bool: True if the file exists, False otherwise.

    Raises:
    - ClientError: for any failure other than the object being missing.
    """
    s3_client = boto3.client('s3')
    try:
        # Retrieving just the metadata is enough to know whether the object exists
        s3_client.head_object(Bucket=bucket_name, Key=file_name)
    except s3_client.exceptions.ClientError as e:
        # A "not found" error code means the file doesn't exist
        if e.response['Error']['Code'] in ('404', 'NoSuchKey', 'NotFound'):
            return False
        # Anything else (permissions, throttling, ...) is a genuine failure
        raise
    return True


def remove_excess_newlines(text):
    """
    Collapse runs of consecutive newlines into a single newline and strip surrounding whitespace.
    """
    return re.sub(r'\n+', '\n', text).strip()


def search_pages(pages, search_term) -> tuple[str | None, int | None, int | None]:
    """
    This method looks for a search term in the EPR and returns the first instance of it

    :param pages: list of pages to search through, each page being a list of text lines
    :param search_term: The term to search for
    :return: The text, page number and page index of the first line containing the search term, or
        (None, None, None) when the term does not appear on any page
    """
    for page_num, page_lines in enumerate(pages):
        for page_index, line in enumerate(page_lines):
            if search_term in line:
                return line, page_num, page_index
    return None, None, None


def check_page(pages, page_num, page_index):
    """
    Check that a page number / line index pair does not run past the end of the parsed pages.

    :param pages: list of pages, each page being a list of text lines
    :param page_num: index of the page
    :param page_index: index of the line within the page
    :return: False if page_num or page_index is at or beyond the end of its list, True otherwise
    """
    # Valid indices run up to len - 1, so an index equal to the length is already out of range
    if page_num >= len(pages):
        return False
    if page_index >= len(pages[page_num]):
        return False
    return True


def _parse_wall_areas(row):
    """Return (gross_area, net_area) parsed from the "External walls Main" table row."""
    # This is a row in a table where the columns are:
    # Element, Gross, Openings, NetArea, U-value, A x U, K-value, A x K
    # The values we're interested in are Gross and NetArea
    values = row.split(_WALL_ROW_LABEL)[1].strip().split(" ")
    # Remove the empty white space - we should now have the fields we want
    values = [v for v in values if v]
    return float(values[0]), float(values[2])


def extract_areas():
    """
    This function extracts the net and gross wall areas from the pdf sap calculation files

    Records already stored in the wall area file on S3 are kept, and pdfs listed there are not processed again.
    The combined list of records is written back to S3 and returned.

    :return: list of dictionaries, one per pdf, holding the property identifiers, address, gross and net wall
        areas, source filename and issued date
    :raises ValueError: if a pdf is missing the wall area row, the property reference or the issued date
    """
    files = list_files_in_s3_folder(bucket, _SAP_CALCULATIONS_FOLDER)
    sap_calculation_pdfs = [file for file in files if file.endswith(".pdf")]

    if check_s3_file_exists(bucket_name=bucket, file_name=_WALL_AREA_KEY):
        data = fetch_json_from_s3(bucket_name=bucket, file_name=_WALL_AREA_KEY)
        # The file is written double-encoded (see the write below), so the first decode yields a JSON string
        if isinstance(data, str):
            data = json.loads(data)
    else:
        data = []

    # Stored filenames are full S3 keys; the base name is also checked in case older records stored only that
    used_files = {record["filename"] for record in data}
    sap_calculation_pdfs = [
        filename for filename in sap_calculation_pdfs
        if filename not in used_files and filename.split("/")[-1] not in used_files
    ]

    # For each pdf, we pull out the net & gross wall areas
    for sap_calculation_file in sap_calculation_pdfs:
        pdf_pages = fetch_and_parse_pdf_from_s3(bucket, sap_calculation_file)

        wall_row = search_pages(pdf_pages, _WALL_ROW_LABEL)[0]
        if wall_row is None:
            raise ValueError(f"External wall areas not found in {sap_calculation_file}")
        gross_area, net_area = _parse_wall_areas(wall_row)

        # Search for property identifiers
        _, pagenum, page_idx = search_pages(pdf_pages, 'Prop Type Ref')
        if pagenum != 0:
            raise ValueError("Property reference not found on the first page")

        # the reference will be on the next line
        property_reference = pdf_pages[pagenum][page_idx + 1]
        property_reference_number = pdf_pages[pagenum][page_idx + 2]
        address = pdf_pages[pagenum][page_idx + 4]

        # Search for issued date - the date appears in the field before
        _, date_pagenum, date_page_idx = search_pages(pdf_pages, 'Issued on Date')
        if date_pagenum is None:
            raise ValueError(f"Issued date not found in {sap_calculation_file}")
        if date_page_idx == 0:
            # Index -1 would silently wrap around to the last line of the page
            raise ValueError(f"No line precedes the issued date label in {sap_calculation_file}")
        issued_date = pdf_pages[date_pagenum][date_page_idx - 1]

        data.append(
            {
                "property_reference": property_reference,
                "reference_number": property_reference_number,
                "address": address,
                "gross_area": gross_area,
                "net_area": net_area,
                "filename": sap_calculation_file,
                "issued_date": issued_date,
            }
        )

    # NOTE: write_json_to_s3 serialises again, so the stored file is a JSON string containing JSON. This layout is
    # kept so that files already in the bucket and any existing readers stay compatible.
    write_json_to_s3(bucket_name=bucket, file_name=_WALL_AREA_KEY, json_data=json.dumps(data))
    return data


def _closest_epc_record(epc_rows, issued_date):
    """Return the EPC row lodged closest to issued_date, preferring the most recent row on a tie."""
    def lodgement_date(row):
        return datetime.strptime(row['lodgement-date'], '%Y-%m-%d')

    # Sorting most recent first means min() - which keeps the first of equal candidates - resolves ties
    # in favour of the most recent EPC record
    most_recent_first = sorted(epc_rows, key=lodgement_date, reverse=True)
    return min(most_recent_first, key=lambda row: abs((lodgement_date(row) - issued_date).days))


def combine_area_data(area_data=None):
    """
    This function merges the wall area data with the EPC record of each property and saves the result to S3

    For every area record the EPC API is searched by postcode and address. When several EPC records are returned,
    the one lodged closest to the issued date of the SAP calculation is used.

    :param area_data: list of dictionaries as returned by extract_areas, containing at least the keys "address",
        "issued_date", "gross_area" and "net_area"
    :raises TypeError: if area_data is not provided
    :raises ValueError: if no EPC data is found for a property
    """
    if area_data is None:
        raise TypeError("area_data must be a list of area records, e.g. the return value of extract_areas()")

    epc_client = EpcClient(auth_token=EPC_AUTH_TOKEN)

    model_data = []
    for area_config in area_data:
        # The addresses seem to have this structure:
        # <house number>, <street name>, <area>, <postcode>
        # Where area is not always in the address
        address_destructured = area_config["address"].split(",")
        house_number = address_destructured[0].strip()
        street_name = address_destructured[1].strip()
        postcode = address_destructured[-1].strip()

        # Fetch epc data
        epc_response = epc_client.domestic.search(
            params={
                "postcode": postcode,
                "address": ", ".join([house_number, street_name]),
            }
        )
        epc_data = epc_response["rows"]

        if not epc_data:
            raise ValueError("No EPC data - investigate me")

        if len(epc_data) > 1:
            # We get the epc data closest to the issued date. On the edge case that we have two EPC records that
            # are equally far away from the issued_date, we take the most recent EPC record
            issued_date = datetime.strptime(area_config["issued_date"], '%d/%m/%Y')
            epc_record = _closest_epc_record(epc_data, issued_date)
        else:
            epc_record = epc_data[0]

        model_data.append(
            dict(
                gross_area=area_config["gross_area"],
                net_area=area_config["net_area"],
                **epc_record
            )
        )

    # Save data. As with the wall area file, the payload is serialised twice to keep the stored layout unchanged.
    write_json_to_s3(
        bucket_name=bucket,
        file_name=_WALL_AREA_MODEL_DATA_KEY,
        json_data=json.dumps(model_data)
    )


def handler():
    """
    Entry point: extract the wall areas from the SAP calculation pdfs, then merge them with the EPC data.
    """
    area_data = extract_areas()
    combine_area_data(area_data)