diff --git a/backend/Property.py b/backend/Property.py index 31f207ab..cc5bf12b 100644 --- a/backend/Property.py +++ b/backend/Property.py @@ -426,6 +426,18 @@ class Property: if phase_epc_transformation[k] == v: continue + if k == "hotwater-description": + if ( + v == "From main system" + ) and ( + phase_epc_transformation["mainheat-description"] == "Electric storage heaters" + ) and ( + "Electric immersion" in phase_epc_transformation["hotwater-description"] + ): + # It means we've recommended HHR with electric immersion, and shouldn't overwrite + # the hot water description + continue + raise NotImplementedError( "Already have this key in the phase_epc_transformation - implement me" ) diff --git a/backend/app/assumptions.py b/backend/app/assumptions.py index 79f2a087..44838a47 100644 --- a/backend/app/assumptions.py +++ b/backend/app/assumptions.py @@ -50,4 +50,5 @@ DESCRIPTIONS_TO_FUEL_TYPES = { }, "Gas instantaneous at point of use": {"fuel": "Natural Gas", "cop": 0.85}, "Room heaters, wood logs": {"fuel": "Wood Logs", "cop": 1}, + "Boiler and radiators, coal": {"fuel": "Coal", "cop": 0.85}, } diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index 4a5b3bd4..dbef6435 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -366,7 +366,7 @@ def extract_property_request_data( property_non_invasive_recommendations["recommendations"] = str(transformed) property_valution = next(( - float(x["value"]) for x in valuation_data if + float(x["valuation"]) for x in valuation_data if (str(x["uprn"]) == str(uprn)) ), None) @@ -611,6 +611,7 @@ async def trigger_plan(body: PlanTriggerRequest): property_instance=property_instance, all_predictions=all_predictions, recommendations=recommendations, + representative_recommendations=representative_recommendations ) ) diff --git a/etl/access_reporting/app.py b/etl/access_reporting/app.py new file mode 100644 index 00000000..830f4370 --- /dev/null +++ b/etl/access_reporting/app.py @@ -0,0 +1,394 
# Configure logging
logger = logging.getLogger(__name__)
if not logger.handlers:
    handler = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Wait (in seconds) used when a retryable response carries no usable Retry-After header.
DEFAULT_RETRY_AFTER_SECONDS = 5
# Maximum number of consecutive retries of a single request before the call is abandoned.
MAX_API_RETRIES = 5


def _retry_after_seconds(response, default=DEFAULT_RETRY_AFTER_SECONDS):
    """
    Work out how long to wait before retrying, based on the Retry-After header.

    The header may hold either a number of seconds or an HTTP date; both forms are accepted.
    :param response: The HTTP response (only ``response.headers`` is read).
    :param default: Seconds to wait when the header is absent or cannot be parsed.
    :return: A non-negative integer number of seconds.
    """
    # Local imports so this helper does not depend on the file-level import block.
    from datetime import datetime as _datetime, timezone as _timezone
    from email.utils import parsedate_to_datetime

    raw_value = response.headers.get('Retry-After')
    if raw_value is None:
        return default

    try:
        return max(0, int(raw_value))
    except (TypeError, ValueError):
        pass  # Not a plain integer; try the HTTP-date form below.

    try:
        retry_at = parsedate_to_datetime(str(raw_value))
    except (TypeError, ValueError):
        return default
    if retry_at is None:
        return default
    if retry_at.tzinfo is None:
        # A date without an offset is treated as UTC so the subtraction below is well defined.
        retry_at = retry_at.replace(tzinfo=_timezone.utc)

    remaining = (retry_at - _datetime.now(_timezone.utc)).total_seconds()
    return max(0, int(remaining))


def handle_error(response):
    """
    Handle errors based on HTTP status codes and log detailed information.

    For 429, 500 and 503 the function sleeps for the Retry-After period and returns ``'retry'``.
    Every other status is logged and raised as a ``ValueError``.
    :param response: The failed HTTP response (``status_code``, ``headers`` and ``json()`` are used).
    :return: The string ``'retry'`` when the caller should repeat the request.
    :raises ValueError: For any status code that is not retryable.
    """
    try:
        payload = response.json()
    except ValueError:
        payload = {}
    if not isinstance(payload, dict):
        # A JSON list/string/number carries no error object we can read.
        payload = {}

    error_json = payload.get('error', {})
    if isinstance(error_json, dict):
        error_code = error_json.get('code', 'unknownError')
        error_message = error_json.get('message', 'No detailed error message provided.')
        inner_error = error_json.get('innererror', {})
        details = error_json.get('details', [])
    else:
        # The "error" member is a bare value (e.g. a string) rather than an object.
        error_code = str(error_json) if error_json else 'unknownError'
        error_message = payload.get('error_description', 'No detailed error message provided.')
        inner_error = {}
        details = []

    logger.error(f"Error Code: {error_code}")
    logger.error(f"Error Message: {error_message}")
    if inner_error:
        logger.error(f"Inner Error: {inner_error}")
    if details:
        logger.error(f"Error Details: {details}")

    status_code = response.status_code
    if status_code in (429, 500, 503):
        retry_after = _retry_after_seconds(response)
        if status_code == 429:
            logger.warning(f"Too Many Requests. Retrying after {retry_after} seconds...")
        else:
            logger.error(f"Server error. Retrying after {retry_after} seconds...")
        time.sleep(retry_after)
        return 'retry'

    if status_code == 401:
        logger.error("Unauthorized. Token might be invalid.")
    elif status_code == 403:
        logger.error("Forbidden. Access denied to the requested resource.")
    elif status_code == 404:
        logger.error("Not Found. The requested resource doesn’t exist.")

    raise ValueError(f"API request failed with status code {response.status_code} - {error_message}")


def api_call_decorator(func):
    """
    Handles various aspects of the API call, including refreshing the access token if needed and handling pagination.

    The decorated function must return a ``(http_method, url, data)`` tuple. The wrapper performs the request and
    returns the decoded JSON. When the decorated function has a truthy ``page_size`` (passed explicitly or taken
    from its declared default), every page is followed via ``@odata.nextLink`` and ``{'value': [...]}`` is returned.
    :param func: The function to be decorated.
    :return: The wrapped function.
    :raises ValueError: When the API reports a non-retryable error or the retry budget is exhausted.
    """
    # Local import so this block does not depend on the file-level import block.
    import inspect

    try:
        signature = inspect.signature(func)
    except (TypeError, ValueError):
        signature = None

    def resolve_page_size(self, args, kwargs):
        # Honour page_size however it was supplied: keyword, positional, or the function's default.
        if signature is not None:
            bound = signature.bind(self, *args, **kwargs)
            bound.apply_defaults()
            if 'page_size' in bound.arguments:
                return bound.arguments['page_size']
        return kwargs.get('page_size', None)

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            # Check and refresh the access token if needed
            if self.is_access_token_expired():
                self.retrieve_access_token()
                logger.info("Access token refreshed.")

            # Get the HTTP method, URL, and optionally data from the function
            http_method, url, data = func(self, *args, **kwargs)
            page_size = resolve_page_size(self, args, kwargs)

            results = []
            response_data = {}
            consecutive_retries = 0

            while url:
                response = requests.request(http_method, url, headers=self.headers, json=data)

                if response.status_code == 200:
                    consecutive_retries = 0
                    response_json = response.json()
                    if page_size:
                        results.extend(response_json.get('value', []))
                        url = response_json.get('@odata.nextLink', None)
                    else:
                        response_data = response_json  # Capture the full response for consistency
                        break
                else:
                    # handle_error raises for non-retryable statuses and sleeps before returning 'retry'.
                    handle_error(response)
                    consecutive_retries += 1
                    if consecutive_retries > MAX_API_RETRIES:
                        raise ValueError(
                            f"API request failed after {MAX_API_RETRIES} retries - {url}"
                        )

            if page_size:
                response_data = {'value': results}

            return response_data

        except Exception:
            logger.exception("An error occurred during the API call.")
            raise

    return wrapper


class SharePointClient:
    """
    Minimal Microsoft Graph client for reading files from a SharePoint site.

    Authenticates with MSAL client credentials, tracks the token expiry, and exposes helpers to list a folder
    and download a file.
    """
    # Token dictionary returned by MSAL (contains at least 'access_token' and 'expires_in').
    access_token = None
    # Naive datetime recorded just before the token was requested.
    access_token_request_timestamp = None
    # Naive datetime after which the token is treated as expired.
    access_token_expiry = None
    # Authorization headers sent with every Graph request.
    headers = None

    # NOTE(review): the format ends in a literal "Z" but the timestamps come from datetime.now() (local, naive).
    TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"

    def __init__(self, tenant_id, client_id, client_secret, site_id, access_token=None,
                 access_token_expiration_details=None):
        """
        Initializes the SharePointClient with necessary credentials and site information.

        Note that construction performs a Graph request to look up the site's document drive.
        :param tenant_id: The tenant ID.
        :param client_id: The client ID.
        :param client_secret: The client secret.
        :param site_id: The site ID.
        :param access_token: The access token (optional)
        :param access_token_expiration_details: The access token expiration details (optional)
        :raises ValueError: If an access token is supplied without its expiration details.
        """
        self.tenant_id = tenant_id
        self.client_id = client_id
        self.client_secret = client_secret

        if access_token:
            if not access_token_expiration_details:
                raise ValueError("Access token expiration details must be provided.")
            self.access_token = access_token
            self.set_access_token_expiration_details(access_token_expiration_details)
            self.headers = {
                'Authorization': f"Bearer {self.access_token['access_token']}"
            }
        else:
            self.retrieve_access_token()

        # Retrieve static identifiers
        self.site_id = site_id
        self.document_drive = self.get_documents_drive()

    def get_token_expiration_details(self):
        """
        Returns the access token expiration details. Converts the datetime objects to strings for serialization.
        :return: Dictionary with 'access_token_request_timestamp' and 'access_token_expiry' as strings.
        """
        return {
            'access_token_request_timestamp': datetime.strftime(
                self.access_token_request_timestamp, self.TIMESTAMP_FORMAT
            ),
            'access_token_expiry': datetime.strftime(self.access_token_expiry, self.TIMESTAMP_FORMAT)
        }

    def set_access_token_expiration_details(self, access_token_expiration_details):
        """
        Sets the access token expiration details from a serialized dictionary.
        :param access_token_expiration_details: The serialized access token expiration details.
        :return: None
        """
        self.access_token_request_timestamp = datetime.strptime(
            access_token_expiration_details['access_token_request_timestamp'], self.TIMESTAMP_FORMAT
        )
        self.access_token_expiry = datetime.strptime(
            access_token_expiration_details['access_token_expiry'], self.TIMESTAMP_FORMAT
        )

    def is_access_token_expired(self):
        """
        Checks if the access token has expired. This only reports the state; it does not fetch a new token.
        :return: True if expired (or if no expiry has been recorded yet), False otherwise.
        """
        if self.access_token_expiry is None:
            # No token has been obtained yet, so the caller must fetch one.
            return True
        return datetime.now() >= self.access_token_expiry

    def retrieve_access_token(self, refresh=False):
        """
        Implements authentication using MSAL.
        :param refresh: If True, force a refresh of the access token.
        :return: None
        :raises ValueError: If MSAL does not return an access token.
        """
        app = ConfidentialClientApplication(
            self.client_id,
            authority=f"https://login.microsoftonline.com/{self.tenant_id}",
            client_credential=self.client_secret
        )

        scope = ["https://graph.microsoft.com/.default"]

        # Taken before the request so the computed expiry errs on the early side.
        access_token_request_timestamp = datetime.now()

        if refresh:
            logger.info("Forcing refresh of access token.")
            token = app.acquire_token_for_client(scopes=scope)
        else:
            # Check if a token is already cached
            token = app.acquire_token_silent(scope, account=None)

            if not token:
                token = app.acquire_token_for_client(scopes=scope)

        if not token or "access_token" not in token:
            logger.error("Authentication failed.")
            raise ValueError("Authentication failed")

        # Expire 20 seconds early so a token is never used right at its deadline.
        access_token_expiry = access_token_request_timestamp + timedelta(
            seconds=token['expires_in'] - 20
        )

        self.access_token = token
        self.access_token_request_timestamp = access_token_request_timestamp
        self.access_token_expiry = access_token_expiry
        self.headers = {
            'Authorization': f"Bearer {self.access_token['access_token']}"
        }

        logger.info("Access token retrieved successfully.")

    @api_call_decorator
    def get_documents_drive(self):
        """
        Get the document drive of the SharePoint site.
        :return: Tuple containing HTTP method, URL, and None for data.
        """
        url = f"https://graph.microsoft.com/v1.0/sites/{self.site_id}/drive"
        logger.info(f"Getting document drive from URL: {url}")
        return 'GET', url, None

    @api_call_decorator
    def list_folder_contents(self, drive_id, folder_path: str, page_size: int = 100):
        """
        This function will list the contents of a folder in SharePoint.
        :param drive_id: The ID of the drive.
        :param folder_path: The path of the folder.
        :param page_size: The number of items per page (default is 100).
        :return: Tuple containing HTTP method, URL, and None for data.
        """
        url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root:/{folder_path}:/children?$top={page_size}"
        logger.info(f"Listing folder contents from URL: {url}")
        return 'GET', url, None

    @staticmethod
    def download_sharepoint_file(download_url):
        """
        Downloads a file from the given URL and returns its content.

        :param download_url: The URL to download the file from.
        :return: A BytesIO positioned at the start of the downloaded content.
        :raises requests.HTTPError: If the download request is not successful.
        """
        file_content = BytesIO()

        # The context manager releases the streamed connection even if reading fails part-way.
        with requests.get(download_url, stream=True) as response:
            response.raise_for_status()  # Check if the request was successful

            # Read the file content into memory
            for chunk in response.iter_content(chunk_size=8192):
                file_content.write(chunk)

        file_content.seek(0)  # Reset the file pointer to the beginning

        return file_content


def _summarise_outcomes(df):
    """
    Aggregate a survey-outcome sheet into per-surveyor and per-week access figures.
    :param df: DataFrame with "Week Commencing", "DEA/REA", "Funding Type" and "Outcome" columns.
    :return: Tuple of (surveyor_outcomes, weekly_access) DataFrames.
    """
    df = df.copy()
    df["Outcome"] = df["Outcome"].str.strip().str.lower()

    # We cannot group by funding type accurately because any job that is not funded will have a NaN value
    # and therefore we have a 100% access rate for funded jobs and 0% otherwise
    rows = []
    for (week, surveyor), group in df.groupby(["Week Commencing", "DEA/REA"]):
        funding_types = [x for x in group["Funding Type"].unique() if not pd.isnull(x)]
        funding_label = " + ".join(map(str, funding_types)) if funding_types else "No Funding"

        surveyed = int((group["Outcome"] == "surveyed").sum())
        no_answer = int((group["Outcome"] == "no answer").sum())
        # Everything that is neither surveyed nor no-answer (including blank outcomes) is an "other issue".
        other_issue = len(group) - surveyed - no_answer

        rows.append(
            {
                "Surveyor": surveyor,
                "Week": week,
                "Funding": funding_label,
                "Surveyed": surveyed,
                "No Answer": no_answer,
                "Other Issue": other_issue,
            }
        )

    # Explicit columns keep the frame usable even when the sheet produced no groups.
    surveyor_outcomes = pd.DataFrame(
        rows, columns=["Surveyor", "Week", "Funding", "Surveyed", "No Answer", "Other Issue"]
    )
    surveyor_outcomes["Week"] = pd.to_datetime(surveyor_outcomes["Week"])

    weekly_access = (
        surveyor_outcomes.drop(columns=["Surveyor"]).groupby(["Week", "Funding"]).sum().reset_index()
    )

    # Sort by week and surveyor ascending
    surveyor_outcomes = surveyor_outcomes.sort_values(["Week", "Surveyor"], ascending=[True, True])
    surveyor_outcomes["Access Rate"] = 100 * surveyor_outcomes["Surveyed"] / (
        surveyor_outcomes["Surveyed"] + surveyor_outcomes["No Answer"] + surveyor_outcomes["Other Issue"]
    )

    weekly_access["Total"] = (
        weekly_access["Surveyed"] + weekly_access["No Answer"] + weekly_access["Other Issue"]
    )
    weekly_access["Access Rate"] = 100 * weekly_access["Surveyed"] / weekly_access["Total"]

    return surveyor_outcomes, weekly_access


def app():
    """
    Pull each customer's survey-outcome workbook from SharePoint and compute access rates.

    Only locations that point at a single .xlsx file are processed; folder locations are skipped.
    :return: List with one dictionary per processed customer, holding the customer name and the
        per-surveyor and per-week summary DataFrames.
    :raises ValueError: If a configured workbook cannot be found in its SharePoint folder.
    """
    # Customers for WC 18/11/2024
    #
    # ----- Eastlight location -----
    # No data this week, low on data
    # Housing Associations/Eastlight/Survey Outcomes/
    #
    # ----- Settle location -----
    # No data this week, in separate files
    # Housing Associations/Settle/Survey Outcomes/
    #
    # ----- Community Housing -----
    # In separate files - will we get to a singular form?
    # Housing Associations/Community Housing/Survey Outcomes/
    #
    # ----- ACIS location -----
    # Doesn't have this week's data
    # Housing Asociation/ACIS/Survey Outcomes/ACIS Group - 25.11.2024 - USE THIS.xlsx
    #
    # ----- Southern location -----
    #
    #
    # ------ Unitas location ------
    # Does have this week's data
    # Unitas location: Housing Associations/Unitas/Survey Outcomes/Unitas.xlsx

    # NOTE(review): the ACIS path starts "Housing Asociation" unlike the others - confirm this matches SharePoint.
    locations = {
        "Unitas": "Housing Associations/Unitas/Survey Outcomes/Unitas.xlsx",
        "Eastlight": "Housing Associations/Eastlight/Survey Outcomes/",
        "Settle": "Housing Associations/Settle/Survey Outcomes/",
        "Community Housing": "Housing Associations/Community Housing/Survey Outcomes/",
        "ACIS": "Housing Asociation/ACIS/Survey Outcomes/ACIS Group - 25.11.2024 - USE THIS.xlsx",
        "Southern": None,
    }

    SHAREPOINT_CLIENT_ID = os.getenv("SHAREPOINT_CLIENT_ID", None)
    SHAREPOINT_CLIENT_SECRET = os.getenv("SHAREPOINT_CLIENT_SECRET", None)
    SHAREPOINT_TENANT_ID = os.getenv("SHAREPOINT_TENANT_ID", None)
    WARMFRONT_SHAREPOINT_SITE_ID = os.getenv("WARMFRONT_SHAREPOINT_SITE_ID", None)

    sharepoint_client = SharePointClient(
        tenant_id=SHAREPOINT_TENANT_ID,
        client_id=SHAREPOINT_CLIENT_ID,
        client_secret=SHAREPOINT_CLIENT_SECRET,
        site_id=WARMFRONT_SHAREPOINT_SITE_ID
    )

    results = []
    for customer, location in locations.items():
        if location is None:
            continue

        if not location.endswith(".xlsx"):
            # Folder locations hold several files and are not handled yet.
            logger.info(f"Skipping {customer}: location is a folder, not a single workbook.")
            continue

        # List the contents of the folder that holds the workbook
        location_folder = os.path.dirname(location)
        file_name = os.path.basename(location)
        contents = sharepoint_client.list_folder_contents(
            drive_id=sharepoint_client.document_drive["id"],
            folder_path=location_folder
        )
        filepaths = contents["value"]

        download_url = next(
            (file['@microsoft.graph.downloadUrl'] for file in filepaths
             if '@microsoft.graph.downloadUrl' in file and file['name'] == file_name),
            None
        )

        if download_url is None:
            raise ValueError("File not found in the SharePoint folder.")

        file_content = sharepoint_client.download_sharepoint_file(download_url)

        # Convert to pandas dataframe since file is an excel file
        df = pd.read_excel(file_content)
        surveyor_outcomes, weekly_access = _summarise_outcomes(df)

        results.append(
            {
                "Customer": customer,
                "Surveyor Outcomes": surveyor_outcomes,
                "Weekly Access": weekly_access,
            }
        )

    return results
# Source PDFs (one per exported section of the managed-properties report)
file_paths = [
    "/Users/khalimconn-kowlessar/Downloads/Managed Properties List (dragged).pdf",
    "/Users/khalimconn-kowlessar/Downloads/Managed Properties List (dragged) 2.pdf",
    "/Users/khalimconn-kowlessar/Downloads/Managed Properties List (dragged) 3.pdf",
    "/Users/khalimconn-kowlessar/Downloads/Managed Properties List (dragged) 4.pdf",
    "/Users/khalimconn-kowlessar/Downloads/Managed Properties List (dragged) 5.pdf",
    "/Users/khalimconn-kowlessar/Downloads/Managed Properties List (dragged) 6.pdf"
]


def extract_text_from_pdf_with_pypdf2(file_path):
    """Return the text of every page of the PDF at ``file_path``, concatenated in page order."""
    return "".join(page.extract_text() for page in PdfReader(file_path).pages)


# Rows gathered from every PDF, as (code, address, management type) tuples
all_parsed_data = []

for file_number, pdf_path in enumerate(file_paths, start=1):
    raw_text = extract_text_from_pdf_with_pypdf2(pdf_path)

    # Strip the report title and the repeated column header before looking for rows
    without_title = re.sub(r"Managed Property Report as at \d+ \w+ \d+", "", raw_text)
    body_text = re.sub(r"Code Property Address Management Type", "", without_title)

    # Every data row ends with the word "Managed"
    candidate_rows = re.findall(r".*?Managed", body_text)

    # A row is "<code> <address> Managed"; anything that does not fit that shape is dropped
    row_matches = (
        re.match(r"(\S+)\s+(.+?)\s+Managed", candidate.strip()) for candidate in candidate_rows
    )
    parsed_data = [
        (found.group(1).strip(), found.group(2).strip(), "Managed")
        for found in row_matches
        if found
    ]

    all_parsed_data.extend(parsed_data)

    # Per-file count, useful when checking the extraction by eye
    print(f"File {file_number} processed: {len(parsed_data)} rows")

# Combine everything into one table and write it out as a single workbook
final_df = pd.DataFrame(all_parsed_data, columns=["Code", "Property Address", "Management Type"])

final_output_file_path = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Unified_Managed_Properties_List.xlsx"
final_df.to_excel(final_output_file_path, index=False)

print(f"All files processed and combined. Total rows: {len(final_df)}")
print(f"Unified file saved to: {final_output_file_path}")
def app():
    """
    Build and upload the inputs for the Cottons remote assessments.

    Reads the Cottons asset list, keeps the EPC D/E homes, looks up EPC and Find-My-EPC data for each one,
    writes the asset list, non-invasive recommendations and valuations to S3, then prints the request body
    that references those files.
    :return: None
    """
    bucket = "retrofit-plan-inputs-dev"
    s3_prefix = f"{USER_ID}/{PORTFOLIO_ID}"

    source_assets = pd.read_excel(
        "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Cottons/Cottons Asset List EPC Data Pull with "
        "valuations.xlsx"
    )
    # A number are missing EPCs due to the space in the postcode
    # Breakdowns:
    # C 119
    # D 106
    # E 26
    # B 5
    #
    # Only the EPC D/E properties are taken forward
    is_d_or_e = source_assets["EPC rating on register"].isin(["D", "E"])
    asset_list = source_assets[is_d_or_e].reset_index(drop=True)
    asset_list["row_id"] = asset_list.index
    asset_list["uprn"] = asset_list["uprn"].astype(int)

    extracted_data = []
    model_asset_list = []
    for _, home in tqdm(asset_list.iterrows(), total=len(asset_list)):
        # Look the property up on the EPC register
        epc_searcher = SearchEpc(
            address1=home["address1"],
            postcode=home["postcode"],
            uprn=home["uprn"],
            auth_token=EPC_AUTH_TOKEN,
            os_api_key="",
        )
        epc_searcher.find_property(skip_os=True)

        # Use the register's own address/postcode for the Find-My-EPC lookup
        find_epc_searcher = RetrieveFindMyEpc(
            address=epc_searcher.newest_epc["address1"],
            postcode=epc_searcher.newest_epc["postcode"],
        )
        find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
        time.sleep(0.5)

        # The uprn is stored first; keys in the retrieved data take precedence on a clash
        record = {"uprn": home["uprn"]}
        record.update(find_epc_data)
        extracted_data.append(record)

        model_asset_list.append(
            {
                "uprn": home["uprn"],
                "address": epc_searcher.newest_epc["address1"],
                "postcode": epc_searcher.newest_epc["postcode"],
            }
        )

    non_invasive_recommendations = []
    for entry in extracted_data:
        non_invasive_recommendations.append(
            {"uprn": entry["uprn"], "recommendations": entry["recommendations"]}
        )

    valuations_data = asset_list[["uprn", "Zoopla Valuation"]].copy()
    valuations_data = valuations_data.rename(columns={"Zoopla Valuation": "valuation"})
    has_valuation = ~pd.isnull(valuations_data["valuation"])
    valuations_data = valuations_data[has_valuation]

    # Asset list for the model
    filename = f"{s3_prefix}/asset_list.csv"
    save_csv_to_s3(
        dataframe=pd.DataFrame(model_asset_list),
        bucket_name=bucket,
        file_name=filename,
    )

    # Non-invasive recommendations
    non_invasive_recommendations_filename = f"{s3_prefix}/non_invasive_recommendations.csv"
    save_csv_to_s3(
        dataframe=pd.DataFrame(non_invasive_recommendations),
        bucket_name=bucket,
        file_name=non_invasive_recommendations_filename,
    )

    # Valuations
    valuations_filename = f"{s3_prefix}/valuations.csv"
    save_csv_to_s3(
        dataframe=valuations_data,
        bucket_name=bucket,
        file_name=valuations_filename,
    )

    body = {
        "portfolio_id": str(PORTFOLIO_ID),
        "housing_type": "Social",
        "goal": "Increasing EPC",
        "goal_value": "C",
        "trigger_file_path": filename,
        "already_installed_file_path": "",
        "patches_file_path": "",
        "non_invasive_recommendations_file_path": non_invasive_recommendations_filename,
        "valuation_file_path": valuations_filename,
        "scenario_name": "Wave 3 Packages",
        "multi_plan": True,
        "budget": None,
        "exclusions": ['air_source_heat_pump', 'boiler_upgrade', 'floor_insulation']
    }
    print(body)
"conservation_status", "is_listed_building", "is_heritage_building"]].rename( + columns={"UPRN": "uprn"} + ), + how="left", + on="uprn", + ) + # Remove any restricted units + off_gas = off_gas[ + (off_gas["conservation_status"] != True) + & (off_gas["is_listed_building"] != True) + & (off_gas["is_heritage_building"] != True) + ] + + off_gas = off_gas[ + off_gas["tenure"].isin(["rental (private)", "Rented (private)", "owner-occupied", "Owner-occupied"]) + ] + + region_summary = off_gas.groupby("postal_region").size().reset_index(name="count") + + aggregation.append(region_summary) + +postal_region_aggregation = pd.concat(aggregation) +# Re-aggregate +postal_region_aggregation = postal_region_aggregation.groupby("postal_region")["count"].sum().reset_index() + +postal_region_aggregation = postal_region_aggregation.sort_values("count", ascending=False) +postal_region_aggregation = postal_region_aggregation.rename( + columns={"postal_region": "Postcode Region", "count": "Number of Homes"} +) +postal_region_aggregation.to_excel( + "/Users/khalimconn-kowlessar/Documents/hestia/Customers/GLA/Off Gas EPC D-G Postal Regions - without conservation " + "area.xlsx", + index=False +) diff --git a/etl/customers/ksquared/Wave3 Modelling.py b/etl/customers/ksquared/Wave3 Modelling.py index 7bfa33b3..0bf6eb18 100644 --- a/etl/customers/ksquared/Wave3 Modelling.py +++ b/etl/customers/ksquared/Wave3 Modelling.py @@ -305,7 +305,7 @@ def caha(): # Get conservation area data uprns = [x["uprn"] for x in extracted_data if x["uprn"] not in ["", None]] - conservation_area_data = OpenUprnClient.get_spatial_data([100022526362], "retrofit-data-dev") + conservation_area_data = OpenUprnClient.get_spatial_data([36284], "retrofit-data-dev") addresses = pd.DataFrame(asset_list) addresses["uprn"] = addresses["uprn"].astype(str) diff --git a/etl/customers/stonewater/Wave 3 Preparation.py b/etl/customers/stonewater/Wave 3 Preparation.py index b6c29863..d2232f40 100644 --- 
a/etl/customers/stonewater/Wave 3 Preparation.py +++ b/etl/customers/stonewater/Wave 3 Preparation.py @@ -6,6 +6,8 @@ import numpy as np from tqdm import tqdm from collections import Counter from scipy.optimize import linprog + +from SearchEpc import SearchEpc from utils.s3 import read_pickle_from_s3 CUSTOMER_FOLDER_PATH = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater" @@ -2591,5 +2593,316 @@ def propsed_wave_3_sample(): os.path.join(CUSTOMER_FOLDER_PATH, "Individual units - programme V2.csv"), index=False ) + survey_results = pd.read_excel( + os.path.join(CUSTOMER_FOLDER_PATH, "Stonewater - Bid Packages WIP 14.11.19 V2.xlsx"), + header=13, + sheet_name="Modelled Packages" + ) + + indivual_units = pd.read_csv( + os.path.join(CUSTOMER_FOLDER_PATH, "Individual units - programme V2.csv") + ) + + u_aids = survey_results["Archetype ID"].astype(str).unique() + units_in_bid = indivual_units[indivual_units['Unit in Programme']]["Archetype ID"].astype(str).values + + len({v for v in units_in_bid if str(v) in u_aids}) + len(list(set(units_in_bid))) + + +def identify_incorrect_packages(): + """ + Due to limitations in the data collected during survey, we have some properties that do not have suitable packages + assigned. 
This function will identify those properties, which can be flagged for Stonewater's review + """ + + units_with_assigned_packages = pd.read_excel( + os.path.join(CUSTOMER_FOLDER_PATH, "Stonewater - Bid Packages WIP 14.11.20 V2.xlsx"), + header=2, + sheet_name="Individual Units Programme" + ) + + # This sheet contains information on the heating systems for properties, so we can flag any units that have + # been labelled as being electric but are actually gas + heating_survey_data = pd.read_excel( + os.path.join(CUSTOMER_FOLDER_PATH, "STOCKBOOK December 2024 data (5).xlsx"), + header=0, + sheet_name="Export" + ) + + units_with_assigned_packages = units_with_assigned_packages.merge( + heating_survey_data[["Asset Reference", "Heating Type"]], how="left", + left_on="Org. ref.", right_on="Asset Reference" + ) + + # Check the different heating types + units_with_assigned_packages["Gas properties: different to Parity"] = ( + ( + units_with_assigned_packages["Heating Type"].isin(["Gas", "Communal Gas"]) + ) & ( + units_with_assigned_packages["Heating"].isin( + [ + "Heat Pump: Electric Heat " + "pumps: Air source heat pump " + "with flow temperature <= 35°C", + "Electric Storage Systems: Fan " + "storage heaters", + "Electric (direct acting) room " + "heaters: Panel, convector or " + "radiant heaters" + ] + ) + ) + ) + + units_with_assigned_packages["Electric properties: different to Parity"] = ( + (units_with_assigned_packages["Heating Type"] == "Electric") & ( + units_with_assigned_packages["Heating"].isin( + [ + "Boiler: A rated Regular Boiler", + "Boiler: F rated Combi", + "No Heating", + "Boiler: A rated CPSU", + "Boiler: G rated Regular Boiler" + ] + ) + ) + ) + + units_with_assigned_packages["Ground Source properties: different to Parity"] = ( + (units_with_assigned_packages["Heating Type"] == "Ground Source") & ( + units_with_assigned_packages["Heating"].isin( + [ + "Heat Pump: Electric Heat pumps: Air source heat pump with flow temperature <= 35°C", + "Electric 
Storage Systems: Fan storage heaters", + "Electric Storage Systems: High heat retention storage heaters" + ] + ) + ) + ) + + units_with_assigned_packages["LPG properties: different to Parity"] = ( + (units_with_assigned_packages["Heating Type"] == "Lpg") & ( + units_with_assigned_packages["Main Fuel"].isin( + [ + "Gas: Mains Gas", "Solid Fuel: Wood Logs, Gas: Mains Gas" + ] + ) + ) + ) + + units_with_assigned_packages["Solid Fuel properties: different to Parity"] = ( + (units_with_assigned_packages["Heating Type"] == "Solid Fuel") & ( + units_with_assigned_packages["Main Fuel"].isin( + [ + "Gas: Mains Gas" + ] + ) + ) + ) + + # The next check is to identify properties with specific features that are not condusive to specific packages. E.g. + # Solar PV packages for properties that have another dwelling above + # Label properties that have been matched to a package, during coordination, that includes Solar PV and has + # a property with a dwelling above + units_with_assigned_packages["Invalid Roof Type for Solar - coordination to be reviewed"] = ( + (units_with_assigned_packages["Package Ref"].isin(["3A", "3B", "4", 4])) & ( + units_with_assigned_packages["Survey: Main Roof Type"].str.contains("A Another dwelling above") + ) + ) + + # Label properties that have a dwelling above in the Parity data, and weren't surveyed, but have been assigned + # a package that includes solar PV + units_with_assigned_packages["Invalid Roof Type for Solar - coordination to be reviewed"] = ( + (units_with_assigned_packages["Package Ref"].isin(["3A", "3B", "4", 4])) & ( + units_with_assigned_packages["Survey: Main Roof Type"].str.contains("A Another dwelling above") + ) + ) + + # We now iterate through postcodes and find anomalous properties based on the partiy data and survey data + fields_to_check = [ + 'Wall Type Category', + # 'Roof Type Category', - not very interesting + 'Heating', + 'Main Fuel', + 'Survey: Main Wall Type', + # 'Survey: Main Roof Type', + 'Survey: Primary Heating 
System' + ] + + units_with_assigned_packages['Wall Type Category'] = units_with_assigned_packages['Wall Type'].str.replace( + r'\s*\(.*?\)', '', regex=True + ) + + # Create roof type category by splitting in colon and taking the first part + units_with_assigned_packages['Roof Type Category'] = units_with_assigned_packages['Roof Type'].str.split(':').str[0] + + units_with_assigned_packages["Street, Region and Postcode"] = ( + units_with_assigned_packages["Street and Region"] + ", " + units_with_assigned_packages["Postcode"] + ) + + def check_mixed_types(row): + # Count distinct primary types with non-zero values + primary_types_present = set() + for col in field_counts.columns: + if ':' in col: + primary_type = col.split(':')[0] + if row[col] > 0: # Non-zero count means this type is present + primary_types_present.add(primary_type) + return len(primary_types_present) > 1 # True if more than one primary type + + aggregated_results = {} + for field in fields_to_check: + # Group by postcode and count occurrences of each unique value + field_counts = ( + units_with_assigned_packages.groupby(['Street, Region and Postcode', field]) + .size() + .unstack(fill_value=0) + .reset_index() + ) + + # Calculate dominant value and percentage before modifying the DataFrame + dominant_value = field_counts.iloc[:, 1:].idxmax(axis=1) + dominant_percentage = ( + (field_counts.iloc[:, 1:].max(axis=1) / field_counts.iloc[:, 1:].sum(axis=1)) * 100 + ) + number_of_properties = field_counts.iloc[:, 1:].sum(axis=1) + + # Add these as new columns after computation + field_counts['Dominant Value'] = dominant_value + field_counts['% Dominant'] = dominant_percentage + field_counts['Number of Properties'] = number_of_properties + field_counts['Mixed Type'] = field_counts.apply(check_mixed_types, axis=1) + + # Store the result in the dictionary + aggregated_results[field] = field_counts + + # Let's fetch the EPC data + # Read in the existing EPC data we stored + import json + from utils.s3 import 
read_from_s3, read_pickle_from_s3 + def read_epc_data(): + epc_data = json.loads( + read_from_s3( + bucket_name="retrofit-data-dev", + s3_file_name="customers/Stonewater/clustering/epc_data.json" + ) + ) + epc_data = pd.DataFrame(epc_data) + + epc_data["uprn"] = np.where( + epc_data["internal_id"] == 1091, + 83143766, + epc_data["uprn"] + ) + epc_data_batch_2 = read_pickle_from_s3( + s3_file_name="customers/Stonewater/clustering/epc_data_batch_2.pkl", + bucket_name="retrofit-data-dev" + ) + epc_data_batch_2 = pd.DataFrame(epc_data_batch_2) + + complete_epcs = pd.concat([epc_data, epc_data_batch_2]) + + return complete_epcs + + epc_data = read_epc_data() + # Get just the fields we want from the EPC: Uprn, Wall, Roof, Heating, Fuel, SAP Score, EPC Band, Date of EPC + epc_data_to_append = epc_data[ + [ + "uprn", "walls-description", "roof-description", "mainheat-description", "main-fuel", + "current-energy-efficiency", "current-energy-rating", "lodgement-date", + "estimated" + ] + ].rename( + columns={ + "uprn": "UPRN", + "walls-description": "EPC: Wall Type", + "roof-description": "EPC: Roof Type", + "mainheat-description": "EPC: Heating", + "mainfuel": "EPC: Main Fuel", + "current-energy-efficiency": "EPC: SAP Score", + "current-energy-rating": "EPC: EPC Band", + "lodgement-date": "EPC: Date of EPC", + "estimated": "EPC Estimated based on Nearby Properties" + } + ) + # Find entries where the SAP score is not an integer + non_integer_sap = epc_data_to_append[~epc_data_to_append["EPC: SAP Score"].astype(str).str.isnumeric()] + non_integer_sap["UPRN"].values[0] + + epc_data_to_append["EPC: Date of EPC"] = pd.to_datetime(epc_data_to_append["EPC: Date of EPC"]) + # Years since the EPC was lodged + epc_data_to_append["Years since EPC"] = (pd.Timestamp.now() - epc_data_to_append["EPC: Date of EPC"]).dt.days / 365 + epc_data_to_append = epc_data_to_append[epc_data_to_append["UPRN"] != ""] + epc_data_to_append["UPRN"] = epc_data_to_append["UPRN"].astype(int) + + 
units_with_assigned_packages = units_with_assigned_packages.merge( + epc_data_to_append, how="left", on="UPRN", + ) + + # Read in the wave 2.1 data + wave_2_data = pd.read_excel( + os.path.join( + CUSTOMER_FOLDER_PATH, "Stonewater 2.1 SAP Pre & Post.xlsx" + ), + header=3 + ) + # Remove any where the work is outstanding + wave_2_data = wave_2_data[wave_2_data["Retrofit Assessment"] == "Completed"] + wave_2_data = wave_2_data[~pd.isnull(wave_2_data["Package Approved (Client)"])] + wave_2_data["house_number"] = wave_2_data["Name"].apply(lambda x: SearchEpc.get_house_number(x, "")) + + # Filter postcodes in the units_with_assigned_packages, to find overlapping postcodes + related_to_wave_2 = units_with_assigned_packages[ + units_with_assigned_packages["Postcode"].isin( + wave_2_data["Post Code"].values + ) & ( + ~units_with_assigned_packages["Confidence Tier"].isin( + [ + "1 - same archetype, same postal region", "1 - property was surveyed" + ] + ) + ) + ] + + wave2_matches = [] + for _, home in related_to_wave_2.iterrows(): + # Get the related homes + assigned_wave_2_packages = wave_2_data[ + wave_2_data["Post Code"] == home["Postcode"] + ] + + if assigned_wave_2_packages.shape[0] != 1: + # In this case, we get the closest match based on door number + hn = SearchEpc.get_house_number(home["Name"], home["Postcode"]) + + assigned_wave_2_packages = assigned_wave_2_packages[ + abs(assigned_wave_2_packages["house_number"].astype(int) - int(hn)) == min( + abs(assigned_wave_2_packages["house_number"].astype(int) - int(hn))) + ] + + wave2_matches.append( + { + "UPRN": home["UPRN"], + "2.1 matched address": assigned_wave_2_packages["Name"].values[0], + "2.1 matched address: Package Ref": assigned_wave_2_packages["Package Approved (Client)"].values[0], + "2.1 matched address: Wall Insulation": assigned_wave_2_packages["Wall Insulation"].values[0], + "2.1 matched address: Loft Insulation": assigned_wave_2_packages["Loft Insulation"].values[0], + "2.1 matched address: 
Ventilation": assigned_wave_2_packages["Ventilation"].values[0], + "2.1 matched address: Windows": assigned_wave_2_packages["Windwos Upgrade"].values[0] + } + ) + + # Store each results to CSV + for field, df in aggregated_results.items(): + df.to_csv( + os.path.join(CUSTOMER_FOLDER_PATH, f"{field} - aggregated results.csv"), index=False + ) + + # Store units_with_assigned_packages + units_with_assigned_packages.to_csv( + os.path.join(CUSTOMER_FOLDER_PATH, "Units with assigned packages - with flags.csv"), index=False + ) + # if __name__ == "__main__": # main() diff --git a/etl/customers/stonewater/potential_eco_properties.py b/etl/customers/stonewater/potential_eco_properties.py index 4fb89113..c0301e9a 100644 --- a/etl/customers/stonewater/potential_eco_properties.py +++ b/etl/customers/stonewater/potential_eco_properties.py @@ -375,3 +375,41 @@ def app(): "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater Properties Needing CWI - WIP.csv", index=False ) + + +def cross_reference_epc_programme(): + eco3_fallout = pd.read_excel( + "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/STONEWATER LIST OF ADDRESSES TO BE " + "SURVEYED - ECO3 NOT COMPLETED.xlsx" + ) + + eco3_fallout["house_number"] = eco3_fallout.apply( + lambda x: SearchEpc.get_house_number(x["ADDRESS"], ""), axis=1 + ) + + # for _, x in eco3_fallout.ite + + stonewater_modelled_above_c = pd.read_csv( + "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Osmosis Reviewed - Parity Download 18.7 - " + "master sheet.csv", + encoding='latin1' + ) + + stonewater_modelled_above_c["house_number"] = stonewater_modelled_above_c.apply( + lambda x: SearchEpc.get_house_number(x["Address"], x["Postcode"]), axis=1 + ) + + eco3_fallout_matched_to_above_c = [] + for _, property in eco3_fallout.iterrows(): + # Match on house number + match = stonewater_modelled_above_c[ + stonewater_modelled_above_c["house_number"] == property["house_number"] + ] + + # We do a fuzzy 
match on the address, with levenstein distance + + from fuzzywuzzy import fuzz + match = stonewater_modelled_above_c[ + stonewater_modelled_above_c["Address"].apply(lambda x: fuzz.ratio(x, property["ADDRESS"]) > 90) + ] + match.head() diff --git a/etl/customers/waltham_forest/whlg eligibile properties.py b/etl/customers/waltham_forest/whlg eligibile properties.py new file mode 100644 index 00000000..fee988c1 --- /dev/null +++ b/etl/customers/waltham_forest/whlg eligibile properties.py @@ -0,0 +1,77 @@ +""" +This is the list of properties, based on the EPC data, that look eligible for WHLG +""" +import pandas as pd +from etl.epc.settings import EARLIEST_EPC_DATE +from etl.spatial.OpenUprnClient import OpenUprnClient + +epc_data = pd.read_csv( + "/Users/khalimconn-kowlessar/Downloads/all-domestic-certificates/domestic-E09000031-Waltham-Forest/certificates.csv" +) +epc_data.columns = [c.replace("_", "-").lower() for c in epc_data.columns] +epc_data = epc_data[epc_data["lodgement-date"] >= EARLIEST_EPC_DATE] + +epc_data = epc_data[~pd.isnull(epc_data["uprn"])] +epc_data["uprn"] = epc_data["uprn"].astype(int) + +epc_data = epc_data[epc_data["current-energy-rating"].isin(["D", "E", "F", "G"])] +epc_data = epc_data[epc_data["tenure"].isin( + ["rental (private)", "Rented (private)", "owner-occupied", "Owner-occupied"]) +] + +whlg_eligible_postcodes = pd.read_excel( + "/Users/khalimconn-kowlessar/Downloads/WHLG-eligible-postcodes.xlsx", + sheet_name="Eligible postcodes", + header=1 +) +# Format: +whlg_eligible_postcodes = whlg_eligible_postcodes[['Postcode', 'Local Authority']] + +uprns = epc_data["uprn"].unique() +# Get data +ca_data = OpenUprnClient.get_spatial_data(uprns, "retrofit-data-dev") +epc_data = epc_data.merge( + ca_data[["UPRN", "conservation_status", "is_listed_building", "is_heritage_building"]].rename( + columns={"UPRN": "uprn"} + ), + how="left", + on="uprn", +) + +epc_data["has_conservation_restrictions"] = ( + (epc_data["conservation_status"] == True) + | 
(epc_data["is_listed_building"] == True) + | (epc_data["is_heritage_building"] == True) +) + +# Pathway 1: +# Match based on eligible postcodes +pathway1 = epc_data[epc_data["postcode"].isin(whlg_eligible_postcodes["Postcode"].values)] +pathway1 = pathway1[ + [ + "uprn", "address", "address1", "postcode", "current-energy-rating", "current-energy-efficiency", + "lodgement-date", + "has_conservation_restrictions", "walls-description", "roof-description", "mainheat-description" + ] +] + +pathway1 = pathway1.rename( + columns={ + "current-energy-rating": "EPC Rating", "current-energy-efficiency": "SAP Score", + "lodgement-date": "EPC Date", "has_conservation_restrictions": "Conservation Area Restrictions", + "walls-description": "Wall Type", "roof-description": "Roof Type", "mainheat-description": "Main Heating" + } +) + +pathway1["EPC Date"] = pd.to_datetime(pathway1["EPC Date"]).dt.strftime("%Y-%m-%d") +# Create a year EPC was lodged +pathway1["EPC Year"] = pd.to_datetime(pathway1["EPC Date"]).dt.year + +pathway1.to_csv( + "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Waltham Forest WHLG - Pathway 1 Eligibility.csv", + index=False +) + +# Pathway 2 or 3 +# The household will need to be means tested +pathway2 = epc_data[~epc_data["uprn"].isin(pathway1["uprn"].values)] diff --git a/etl/find_my_epc/RetrieveFindMyEpc.py b/etl/find_my_epc/RetrieveFindMyEpc.py index b6394275..5ea35a64 100644 --- a/etl/find_my_epc/RetrieveFindMyEpc.py +++ b/etl/find_my_epc/RetrieveFindMyEpc.py @@ -282,7 +282,8 @@ class RetrieveFindMyEpc: "Low energy lighting for all fixed outlets": ["low_energy_lighting"], "Cylinder thermostat recommendation": [], "Heating controls recommendation": [], - "Replace boiler with Band A condensing boiler": [], + "Replace boiler with Band A condensing boiler": ["boiler_upgrade"], + "Band A condensing gas boiler": ["boiler_upgrade"], "Solar panel recommendation": [], "Double glazing recommendation": [], "Solid wall insulation recommendation": [], @@ 
-295,6 +296,19 @@ class RetrieveFindMyEpc: "Change room heaters to condensing boiler": ["boiler_upgrade"], "Cylinder thermostat": ["cylinder_thermostat"], "Heat recovery system for mixer showers": ["heat_recovery_shower"], + "Room-in-roof insulation": ["room_in_roof_insulation"], + "Fan assisted storage heaters": [], + "Fan-assisted storage heaters": [], + "Step 1:": [], + "Biomass stove with boiler": [], + "Replace boiler with biomass boiler": [], + "Heating controls (room thermostat and thermostatic radiator valves)": [ + "roomstat_programmer_trvs", "time_temperature_zone_control" + ], + "Heating controls (programmer, and thermostatic radiator valves)": [ + "roomstat_programmer_trvs", "time_temperature_zone_control" + ], + "Replacement warm air unit": [] } survey = True diff --git a/etl/lodgement/app.py b/etl/lodgement/app.py new file mode 100644 index 00000000..c1da35dd --- /dev/null +++ b/etl/lodgement/app.py @@ -0,0 +1,326 @@ +import os + +import pandas as pd + +import utils.file_data_extraction as file_extraction_tools +from utils.fullSapParser import FullSapParser +from utils.OsmosisCondtionReportParser import OsmosisConditionReportParser + +output_template = { + "Property Address": None, + "Osm. ID": None, + "Postcode": None, + "City/County": None, + "District/Town": None, + "Funding Stream": None, + # "Risk Path": None, + "Local Authority": None, + "Trustmark Lodgement ID": None, + "Certificate Number": None, + "EWI UMR": None, + "Loft UMR": None, + "Windows UMR": None, + "Doors UMR": None, + "Measure Lodgement Date": None, + "Full Lodgement Date": None, + "Owner - Name": None, + "Owner - Phone": None, + "Owner - Email": None, + "Tenant - Name": None, + "Tenant - Phone": None, + "R. Assessor - Name": None, + "R. Coordinator - Name": None, + "Trustmark Licence Number": None, + "Retrofit Assessment Date": None, + "Company Name": None, + "Retrofit Designer Name": None, + "Property Type": None, + "Property Detachment": None, + "No. 
of Bedrooms": None, + "Property age": None, + "SAP Rating Pre (from IMA)": None, + "Pre Heat Transfer": None, + "Pre Total Floor Area": None, + "Pre Heat Demand": None, + "Pre Air Tightness": None, + "SAP Rating Post (from EPC)": None, + "Post Heat Transfer": None, + "Post Total Floor Area": None, + "Post Heat Demand": None, + "Post Air Tightness": None, + "Number of Eligible Measures Installed": None, + "Total Cost of Works": None, + "Annual Fuel Saving (MTP)": None, +} + + +def update_dictionary_with_check(dictionary, updates): + """ + Updates a dictionary with key-value pairs, raising an error if the key does not exist. + + Args: + dictionary (dict): The dictionary to update. + updates (dict): The updates to apply. + + Raises: + KeyError: If a key in updates does not exist in the dictionary. + """ + for key, value in updates.items(): + if key not in dictionary: + raise KeyError(f"Key '{key}' does not exist in the dictionary.") + dictionary[key] = value + + +def handler(): + """ + This is a simple application that will extract the data from documents that have been uploaded to Sharepoint + to populate the lodgement spreadsheet with + :return: + """ + + # Ths source data will eventually come from Sharepoint + source_data_path = "/Users/khalimconn-kowlessar/Documents/hestia/Lodgment Pilot" + output_template_file = "Trustmark Details - Template REV.25.11.24.xlsx" + funding_stream = "HUG2" + customer_name = "Shropshire Council" + customer_phone = "0345 678 9000" + customer_email = "affordablewarmth@shropshire.gov.uk" + + # TODO: In order for this to go live, we need to use Poppler, which needs to be installed + # w/ brew install poppler + # We also need to install Tesseract: brew install tesseract + + # List the folders in the source data path + folders = [x for x in os.listdir(source_data_path) if os.path.isdir(os.path.join(source_data_path, x))] + + extractors = { + "elmhurst epr": file_extraction_tools.ElmhurstEprExtractor, + "elmhurst summary report": 
file_extraction_tools.ElmhurstSummaryReportExtractor, + "osmosis condition report": OsmosisConditionReportParser, + "elmhurst evidence report": None, + "full sap xml": FullSapParser, + "pulse air permeability": file_extraction_tools.PulseAirPermeabilityExtractor, + "elmhurst project handover": file_extraction_tools.ElmhurstProjectHandoverExtractor, + "core logic pas assessment report": file_extraction_tools.CoreLogicPasAssessmentReportExtractor, + } + + extracted = [] + for property_folder in folders: + + property_folder_path = os.path.join(source_data_path, property_folder) + # List the folders in the source data path + subfolders = [ + x for x in os.listdir(property_folder_path) if os.path.isdir(os.path.join(property_folder_path, x)) + ] + coord_folder = os.path.join(property_folder_path, [f for f in subfolders if "RA Coordinator Info" in f][0]) + + # Get the contents of the folder + coordinator_folder_contents = [ + file for file in os.listdir(coord_folder) if os.path.isfile(os.path.join(coord_folder, file)) + ] + + # We detect the various file types + extracted_contents = {} + for filename in coordinator_folder_contents: + filepath = os.path.join(coord_folder, filename) + if file_extraction_tools.is_pdf(filepath): + report_type = file_extraction_tools.detect_pdf_report_type(pdf_path=filepath) + if report_type is None: + raise ValueError(f"Unknown report type for {filename}") + + file_extractor = extractors[report_type] + if file_extractor is None: + continue + + extracted_contents[report_type] = file_extractor(filepath).extract() + + if file_extraction_tools.is_xml(filepath): + xml_type = file_extraction_tools.detect_xml_report_type(xml_path=filepath) + if xml_type is None: + raise ValueError(f"Unknown report type for {filename}") + file_extractor = extractors.get(xml_type) + if file_extractor is None: + continue + + extracted_contents[xml_type] = file_extractor(filepath).extract() + + att_folder = os.path.join(property_folder_path, [f for f in subfolders if 
"Air Tightness Tests" in f][0]) + att_folder_contents = [ + file for file in os.listdir(att_folder) if os.path.isfile(os.path.join(att_folder, file)) + ] + + for filename in att_folder_contents: + filepath = os.path.join(att_folder, filename) + if file_extraction_tools.is_pdf(filepath): + report_type = file_extraction_tools.detect_pdf_report_type(pdf_path=filepath) + if report_type is None: + raise ValueError(f"Unknown report type for {filename}") + file_extractor = extractors[report_type] + + if file_extractor is None: + continue + + extracted_contents[report_type] = file_extractor(filepath).extract() + + lodgement_folder = os.path.join( + property_folder_path, [f for f in subfolders if "TrustMark Lodgement" in f][0] + ) + # Within the lodgement folder, we want the required documents sub-folder + lodgement_subfolders = [ + file for file in os.listdir(lodgement_folder) if os.path.isdir(os.path.join(lodgement_folder, file)) + ] + required_documents_folder = os.path.join( + lodgement_folder, [f for f in lodgement_subfolders if "required documents" in f.lower()][0] + ) + # List the contents + required_documents_contents = [ + file for file in os.listdir(required_documents_folder) if + os.path.isfile(os.path.join(required_documents_folder, file)) + ] + + # There are only a few file types we actually want to process in here for the moment + for filename in required_documents_contents: + filepath = os.path.join(required_documents_folder, filename) + if file_extraction_tools.is_pdf(filepath): + report_type = file_extraction_tools.detect_pdf_report_type(pdf_path=filepath) + if report_type != "elmhurst project handover": + continue + file_extractor = extractors[report_type] + + extracted_contents[report_type] = file_extractor(filepath).extract() + + output_row_data = output_template.copy() + + # dict_keys([ 'City/County', 'District/Town', + # 'Local Authority', 'Trustmark Lodgement ID', 'Certificate Number', 'EWI UMR', 'Loft UMR', 'Windows UMR', + # 'Doors UMR', 'Measure 
Lodgement Date', 'Full Lodgement Date', 'Owner - Name', 'Owner - Phone', + # 'Owner - Email', 'Tenant - Name', 'Tenant - Phone', + # 'Trustmark Licence Number', + # Pre Air Tightness', 'SAP Rating Post (from EPC)', 'Post Heat + # Transfer', 'Post Total Floor Area', 'Post Heat Demand', 'Post Air Tightness', + # 'Total Cost of Works', 'Annual Fuel Saving (MTP)']) + + update_dictionary_with_check( + output_row_data, + { + "Funding Stream": funding_stream, + "Property Address": property_folder.split(")")[1].strip(), + "Osm. ID": property_folder.split(")")[0].strip().lstrip("(").strip(), + } + ) + + if extracted_contents.get("elmhurst epr"): + total_floor_area = sum( + [x["Floor Area (m2)"] for x in extracted_contents["elmhurst epr"]["Building Parts"]] + + # Get the conservatory floor area + [extracted_contents["elmhurst epr"]["Conservatory"]["Conservatory Floor Area"]] + ) + + pre_heat_transfer = extracted_contents["elmhurst epr"]["Primary Energy Use Intensity (kWh/m2/yr)"] + pre_heat_demand = ( + extracted_contents["elmhurst epr"]["Primary Energy Use Intensity (kWh/m2/yr)"] * total_floor_area + ) + + epr_to_insert = { + "Postcode": extracted_contents["elmhurst epr"]["Postcode"], + "City/County": extracted_contents["elmhurst epr"]["County"], + "District/Town": extracted_contents["elmhurst epr"]["Town"], + "Local Authority": None, + 'SAP Rating Pre (from IMA)': extracted_contents["elmhurst epr"]["Current SAP Rating"], + 'Pre Heat Transfer': pre_heat_transfer, + 'Pre Total Floor Area': total_floor_area, + 'Pre Heat Demand': pre_heat_demand, + "R. 
Assessor - Name": extracted_contents["elmhurst epr"]["Assessor Name"], + "Retrofit Assessment Date": extracted_contents["elmhurst epr"]["Assessment Date"], + } + update_dictionary_with_check( + output_row_data, + epr_to_insert + ) + + if extracted_contents.get("full sap xml"): + xml_to_insert = { + "Property Type": extracted_contents["full sap xml"]["Property Type"], + "Property Detachment": extracted_contents["full sap xml"]["Built Form"], + "Property age": extracted_contents["full sap xml"]["Age Band"], + + } + update_dictionary_with_check( + output_row_data, + xml_to_insert + ) + + if extracted_contents.get("osmosis condition report"): + cr_to_insert = { + "No. of Bedrooms": extracted_contents["osmosis condition report"]["No. of Bedrooms"], + # "Risk Path": extracted_contents["osmosis condition report"]["Risk Assessment Pathway"], + } + update_dictionary_with_check( + output_row_data, + cr_to_insert + ) + + if extracted_contents.get("elmhurst summary report"): + total_floor_area = sum( + [x["Floor Area (m2)"] for x in extracted_contents["elmhurst summary report"]["Building Parts"]] + + # Get the conservatory floor area + [extracted_contents["elmhurst summary report"]["Conservatory"]["Conservatory Floor Area"]] + ) + + pre_heat_transfer = ( + extracted_contents["elmhurst summary report"]["Primary Energy Use Intensity (kWh/m2/yr)"] + ) + pre_heat_demand = None # Don't have this + + summary_to_insert = { + "Postcode": extracted_contents["elmhurst summary report"]["Postcode"], + "City/County": extracted_contents["elmhurst summary report"]["County"], + "District/Town": extracted_contents["elmhurst summary report"]["Town"], + 'SAP Rating Pre (from IMA)': extracted_contents["elmhurst summary report"]["Current SAP Rating"], + 'Pre Heat Transfer': pre_heat_transfer, + 'Pre Total Floor Area': total_floor_area, + 'Pre Heat Demand': pre_heat_demand, + "R. 
Assessor - Name": extracted_contents["elmhurst summary report"]["Assessor Name"], + "Retrofit Assessment Date": extracted_contents["elmhurst summary report"]["Assessment Date"], + } + + update_dictionary_with_check( + output_row_data, + summary_to_insert + ) + + if extracted_contents.get("pulse air permeability"): + # We extract the AP50 number + results_table = extracted_contents["pulse air permeability"]["Results Table"] + ap50 = [x["Extrapolated @ 50PA"] for x in results_table if x["Metric"] == "Air Permeability"][0] + update_dictionary_with_check( + output_row_data, + {"Pre Air Tightness": ap50} + ) + + if extracted_contents.get("elmhurst project handover"): + handover_to_insert = { + "Number of Eligible Measures Installed": len( + extracted_contents["elmhurst project handover"]["Measures Fitted"] + ), + "Retrofit Designer Name": extracted_contents["elmhurst project handover"]["Designer Name"], + "Company Name": extracted_contents["elmhurst project handover"]["Installer Name"], + "R. Coordinator - Name": extracted_contents["elmhurst project handover"]["Retrofit Coordinator Name"], + } + update_dictionary_with_check(output_row_data, handover_to_insert) + + if extracted_contents.get("core logic pas assessment report"): + cr_to_insert = { + "No. 
of Bedrooms": extracted_contents["core logic pas assessment report"]["Number of bedrooms"], + } + update_dictionary_with_check( + output_row_data, + cr_to_insert + ) + + extracted.append(output_row_data) + + extracted_df = pd.DataFrame(extracted) + + extracted_df.to_csv("/Users/khalimconn-kowlessar/Documents/hestia/Lodgment Pilot/poc-extrcted-data.csv", + index=False) diff --git a/etl/lodgement/requirements.txt b/etl/lodgement/requirements.txt new file mode 100644 index 00000000..412aed3b --- /dev/null +++ b/etl/lodgement/requirements.txt @@ -0,0 +1,14 @@ +PyPDF2 +pandas +tqdm +openpyxl +boto3 +usaddress==0.5.11 +fuzzywuzzy==0.18.0 +python-dotenv +python-docx +pymupdf +pytesseract +pdf2image +pillow +pdfplumber diff --git a/etl/route_march_data_pull/app.py b/etl/route_march_data_pull/app.py index 6f9dd135..9ed55185 100644 --- a/etl/route_march_data_pull/app.py +++ b/etl/route_march_data_pull/app.py @@ -21,31 +21,65 @@ load_dotenv(dotenv_path="backend/.env") EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN") -def get_data(asset_list, fulladdress_column, address1_column, postcode_column): +def get_data(asset_list, fulladdress_column, address1_column, postcode_column, manual_uprn_map): epc_data = [] errors = [] no_epc = [] + # home = asset_list[asset_list["row_id"] == errors[5]].squeeze() for _, home in tqdm(asset_list.iterrows(), total=len(asset_list)): try: postcode = home[postcode_column] house_number = home[address1_column] full_address = home[fulladdress_column] + house_no = SearchEpc.get_house_number(address=str(house_number), postcode=postcode) + if house_no is None: + house_no = house_number + uprn = manual_uprn_map.get(full_address, None) searcher = SearchEpc( - address1=str(house_number), + address1=str(house_no), postcode=postcode, auth_token=EPC_AUTH_TOKEN, os_api_key="", property_type=None, fast=True, full_address=full_address, - max_retries=5 + max_retries=5, + uprn=uprn ) # Force the skipping of estimating the EPC searcher.ordnance_survey_client.property_type 
= None searcher.ordnance_survey_client.built_form = None searcher.find_property(skip_os=True) + + # Check if we have a flat or appartment + if searcher.newest_epc is None and uprn is None: + # Try again: + if SearchEpc.get_house_number(address=str(house_number), postcode=postcode) is None: + # Backup + add1 = full_address.split(",")[1].strip() + else: + add1 = str(house_number) + searcher = SearchEpc( + address1=add1, + postcode=postcode, + auth_token=EPC_AUTH_TOKEN, + os_api_key="", + property_type=None, + fast=True, + full_address=full_address, + max_retries=5 + ) + + if ( + "flat" in house_number.lower() or "apartment" in house_number.lower() or "apt" in + house_number.lower() + ): + searcher.ordnance_survey_client.property_type = "Flat" + + searcher.find_property(skip_os=True) + if searcher.newest_epc is None: no_epc.append(home["row_id"]) continue @@ -63,7 +97,7 @@ def get_data(asset_list, fulladdress_column, address1_column, postcode_column): ) find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data() except ValueError as e: - if "No EPC found" in str(e): + if "No EPC found" in str(e) and "address1" in searcher.newest_epc: find_epc_searcher = RetrieveFindMyEpc( address=searcher.newest_epc["address1"], postcode=searcher.newest_epc["postcode"] ) @@ -120,17 +154,20 @@ def app(): Property UPRN """ - DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Bromford/" - DATA_FILENAME = "Bromford programme review.xlsx" - SHEET_NAME = "Bromford" + DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Watford" + DATA_FILENAME = "JS Mailing List 10122024.xlsx" + SHEET_NAME = "Export" POSTCODE_COLUMN = "Postcode" - FULLADDRESS_COLUMN = None - ADDRESS1_COLUMN = "No." 
- ADDRESS1_METHOD = "first_two_words" - ADDRESS_COLS_TO_CONCAT = ["No.", "Address"] + FULLADDRESS_COLUMN = "Property Address" + ADDRESS1_COLUMN = "Address Line 1" + ADDRESS1_METHOD = None + ADDRESS_COLS_TO_CONCAT = [] + + # Maps addresses to uprn in problematic cases + MANUAL_UPRN_MAP = {} asset_list = pd.read_excel(os.path.join(DATA_FOLDER, DATA_FILENAME), header=0, sheet_name=SHEET_NAME) - asset_list = asset_list[~pd.isnull(asset_list["Postcode"])] + asset_list = asset_list[~pd.isnull(asset_list[POSTCODE_COLUMN])].reset_index() asset_list["row_id"] = asset_list.index # We clean up portential non-breaking spaces, and double spaces @@ -156,12 +193,14 @@ def app(): # Drop the dupes print(f"There are {asset_list['deduper'].duplicated().sum()} duplicated addresses - dropping") asset_list = asset_list[~asset_list["deduper"].duplicated()] + asset_list = asset_list.drop(columns=["deduper"]) epc_data, errors, no_epc = get_data( asset_list=asset_list, fulladdress_column=FULLADDRESS_COLUMN, address1_column=ADDRESS1_COLUMN, - postcode_column=POSTCODE_COLUMN + postcode_column=POSTCODE_COLUMN, + manual_uprn_map=MANUAL_UPRN_MAP ) # We now retrieve any failed properties @@ -170,7 +209,8 @@ def app(): asset_list=asset_list_failed, fulladdress_column=FULLADDRESS_COLUMN, address1_column=ADDRESS1_COLUMN, - postcode_column=POSTCODE_COLUMN + postcode_column=POSTCODE_COLUMN, + manual_uprn_map=MANUAL_UPRN_MAP ) # Append the failed data to the main data @@ -202,7 +242,8 @@ def app(): transformed_df = pd.DataFrame(transformed_data) # Drop the column that is "" - transformed_df = transformed_df.drop(columns=[""]) + if "" in transformed_df.columns: + transformed_df = transformed_df.drop(columns=[""]) # Get the find my epc data find_my_epc_data = epc_df[["row_id", "find_my_epc_data"]].drop(columns=["find_my_epc_data"]).join( @@ -217,6 +258,9 @@ def app(): [ "row_id", "uprn", + "address1", + "address", + "postcode", "property-type", "built-form", "inspection-date", @@ -224,6 +268,7 @@ def 
app(): "current-energy-efficiency", "roof-description", "walls-description", + "floor-description", "transaction-type", # New fields needed "secondheat-description", @@ -236,7 +281,7 @@ def app(): "energy-consumption-current", # kwh/m2 "photo-supply", ] - ] + ].rename(columns={"address1": "Address1 on EPC", "address": "Address on EPC", "postcode": "Postcode on EPC"}) asset_list = asset_list.merge( epc_df, @@ -276,6 +321,7 @@ def app(): "number-habitable-rooms": "Number of Habitable Rooms", "walls-description": "Wall Construction", "roof-description": "Roof Construction", + "floor-description": "Floor Construction", "mainheat-description": "Heating Type", "secondheat-description": "Secondary Heating", "transaction-type": "Reason for last EPC", @@ -329,5 +375,9 @@ def app(): asset_list = asset_list.drop(columns=["row_id"]) # Store as an excel - filename = os.path.join(DATA_FOLDER, ".".join(DATA_FILENAME.split(".")[:-1])) + " EPC Data Pull.xlsx" + filename = os.path.join(DATA_FOLDER, ".".join(DATA_FILENAME.split(".")[:-1])) + " EPC Data Pull - Main.xlsx" asset_list.to_excel(filename, index=False) + + matches_review = asset_list[ + [FULLADDRESS_COLUMN, ADDRESS1_COLUMN, POSTCODE_COLUMN, "Address on EPC", "Postcode on EPC"] + ] diff --git a/recommendations/DraughtProofingRecommendations.py b/recommendations/DraughtProofingRecommendations.py index 4bd85a03..a16a94f6 100644 --- a/recommendations/DraughtProofingRecommendations.py +++ b/recommendations/DraughtProofingRecommendations.py @@ -26,6 +26,9 @@ class DraughtProofingRecommendations: if not draught_proofing_recommendation_config: return + # Cost is based on a £50 cost per window, based on Checkatrade + cost = draught_proofing_recommendation_config.get("cost", self.property.number_of_windows * 50) + description = ( "Draught proof doors and windows to improve energy efficiency" if not draught_proofing_recommendation_config.get("description") @@ -48,7 +51,7 @@ class DraughtProofingRecommendations: "kwh_savings": 0, 
"co2_equivalent_savings": 0, "energy_cost_savings": 0, - "total": draught_proofing_recommendation_config["cost"], + "total": cost, # We use a very simple and rough estimate of 4 hours per unit "labour_hours": draught_proofing_recommendation_config.get("labour_hours", 8), "labour_days": draught_proofing_recommendation_config.get("labour_days", 1), # Assume 8 hour day diff --git a/recommendations/HeatingRecommender.py b/recommendations/HeatingRecommender.py index 7dc4f8b2..1eab7d42 100644 --- a/recommendations/HeatingRecommender.py +++ b/recommendations/HeatingRecommender.py @@ -1,5 +1,6 @@ import re import backend.app.assumptions as assumptions +from etl.customers.immo.pilot.asset_list import non_invasive_recommendations from recommendations.Costs import Costs, BOILER_UPGRADE_SCHEME_ASHP_VALUE from recommendations.recommendation_utils import ( check_simulation_difference, override_costs, combine_recommendation_configs @@ -981,6 +982,10 @@ class HeatingRecommender: self.property.data["hot-water-energy-eff"] in ["Very Poor", "Poor", "Average"] ) + non_invasive_recommendation = next(( + r for r in self.property.non_invasive_recommendations if r["type"] == "boiler_upgrade" + ), {}) + if has_inefficient_space_heating or has_inefficient_water: boiler_size = self.estimate_boiler_size( property_type=self.property.data["property-type"], @@ -1079,12 +1084,13 @@ class HeatingRecommender: "description": description, "starting_u_value": None, "new_u_value": None, - "sap_points": None, + "sap_points": non_invasive_recommendation.get("sap_points", None), "already_installed": already_installed, "simulation_config": simulation_config, "description_simulation": description_simulation, **boiler_costs, "system_type": "boiler_upgrade", + "survey": non_invasive_recommendation.get("survey", None) } # We recommend the heating controls @@ -1111,6 +1117,8 @@ class HeatingRecommender: if system_change: # We combine the heating and controls recommendations, in the case of a system change + # 
If this is true, we set SAP points to None and survey to False for the boiler recommendation + combined_recommendations = [] for controls_recommendation in controls_recommender.recommendation: combined_recommendation = self.combine_heating_and_controls( diff --git a/recommendations/HotwaterRecommendations.py b/recommendations/HotwaterRecommendations.py index b86329e4..d8404cc1 100644 --- a/recommendations/HotwaterRecommendations.py +++ b/recommendations/HotwaterRecommendations.py @@ -20,6 +20,8 @@ class HotwaterRecommendations: :return: """ # Reset the recommendations + recommendations_phase = phase + self.recommendations = [] non_invasive_recommendations = self.property.non_invasive_recommendations if non_invasive_recommendations: @@ -28,7 +30,6 @@ class HotwaterRecommendations: r["type"] in ["hot_water_tank_insulation", "cylinder_thermostat"] ] - recommendations_phase = phase for m in measures: non_invasive_rec = [ r for r in non_invasive_recommendations if r["type"] == m @@ -55,7 +56,7 @@ class HotwaterRecommendations: if self.property.hotwater["clean_description"] == "Gas boiler/circulator, no cylinder thermostat": # Handle this case specifically: - self.recommend_cylinder_thermostat_gas_boiler_circulator(phase=phase) + self.recommend_cylinder_thermostat_gas_boiler_circulator(phase=recommendations_phase) return # If there is no system present, but access to the mains, we @@ -68,14 +69,14 @@ class HotwaterRecommendations: (self.property.hotwater["no_system_present"] is None) & (len(has_tank_recommendation) == 0) ): - self.recommend_tank_insulation(phase=phase) + self.recommend_tank_insulation(phase=recommendations_phase) return has_cylinder_recommendation = [r for r in self.recommendations if r["type"] == "cylinder_thermostat"] if ((self.property.hotwater["clean_description"] == "From main system, no cylinder thermostat") & (len(has_cylinder_recommendation) == 0)): - self.recommend_cylinder_thermostat(phase=phase) + 
self.recommend_cylinder_thermostat(phase=recommendations_phase) return def recommend_tank_insulation(self, phase, sap_points=None, survey=False, _return=False): diff --git a/recommendations/Recommendations.py b/recommendations/Recommendations.py index ed6a8526..189581d8 100644 --- a/recommendations/Recommendations.py +++ b/recommendations/Recommendations.py @@ -311,7 +311,7 @@ class Recommendations: continue has_u_value = recommendations_by_type[0].get("new_u_value") is not None - has_sap_points = recommendations_by_type[0].get("sap_points") is not None + has_sap_points = all([r.get("sap_points") is not None for r in recommendations_by_type]) has_rank = recommendations_by_type[0].get("rank") is not None # When check if these recommendations have two different types, such as solid wall insulation @@ -449,6 +449,7 @@ class Recommendations: property_instance, all_predictions, recommendations, + representative_recommendations, ): """ @@ -473,6 +474,9 @@ class Recommendations: property_recommendations = recommendations[property_instance.id].copy() + representative_recs = representative_recommendations[property_instance.id].copy() + representative_ids = [r["recommendation_id"] for r in representative_recs] + increasing_variables = ["sap"] decreasing_variables = ["carbon", "heat_demand"] @@ -530,7 +534,9 @@ class Recommendations: else: - previous_phase_values_multiple = [x for x in impact_summary if x["phase"] == (rec["phase"] - 1)] + previous_phase_values_multiple = [ + x for x in impact_summary if x["phase"] == (rec["phase"] - 1) and x["representative"] + ] if len(previous_phase_values_multiple) != 1: # Take an average of each of the previous phases keys_to_median = ["sap", "carbon", "heat_demand"] @@ -628,7 +634,9 @@ class Recommendations: impact_summary.append( { "phase": rec["phase"], + "representative": rec["recommendation_id"] in representative_ids, "recommendation_id": rec["recommendation_id"], + "measure_type": rec["measure_type"], **current_phase_values } ) diff 
--git a/recommendations/RoofRecommendations.py b/recommendations/RoofRecommendations.py index 51264b75..4e29083f 100644 --- a/recommendations/RoofRecommendations.py +++ b/recommendations/RoofRecommendations.py @@ -290,6 +290,11 @@ class RoofRecommendations: insulation_materials = pd.DataFrame(insulation_materials) + non_invasive_recommendations = next( + (r for r in self.property.non_invasive_recommendations if + r["type"] == insulation_materials["type"].values[0]), {} + ) + lowest_selected_u_value = None recommendations = [] for _, insulation_material_group in insulation_materials.groupby("description"): @@ -429,14 +434,15 @@ class RoofRecommendations: "description": self.make_roof_insulation_description(material), "starting_u_value": u_value, "new_u_value": new_u_value, - "sap_points": None, + "sap_points": non_invasive_recommendations.get("sap_points", 0), "already_installed": already_installed, "simulation_config": simulation_config, "description_simulation": { "roof-description": new_description, "roof-energy-eff": new_efficiency }, - **cost_result + **cost_result, + "survey": non_invasive_recommendations.get("survey", False) } ) diff --git a/recommendations/WallRecommendations.py b/recommendations/WallRecommendations.py index f77ae5a0..92147fb8 100644 --- a/recommendations/WallRecommendations.py +++ b/recommendations/WallRecommendations.py @@ -385,6 +385,11 @@ class WallRecommendations(Definitions): if insulation_thickness == "below average": cavity_width = cavity_width * (1 - PARTIALLY_FILLED_PERCENTAGE_ASSUMPTION) + non_invasive_recommendations = next( + (r for r in self.property.non_invasive_recommendations if + r["type"] == insulation_materials["type"].values[0]), {} + ) + # Test the different fill options lowest_selected_u_value = None recommendations = [] @@ -475,14 +480,15 @@ class WallRecommendations(Definitions): "description": description, "starting_u_value": u_value, "new_u_value": new_u_value, - "sap_points": None, + "sap_points": 
class OsmosisConditionReportParser:
    """
    Reads an Osmosis PAS 2035 condition report (PDF) and extracts selected fields.

    The PDF text is read eagerly in the constructor, so constructing the parser
    raises ``ValueError`` when the file cannot be read.
    """

    def __init__(self, filekey, bucket_name=None):
        """
        :param filekey: Path to the PDF file (used as a local path when ``bucket_name`` is not given)
        :param bucket_name: Optional S3 bucket name. Reading from S3 is not implemented yet.
        """
        self.s3_client = boto3.client('s3')
        self.bucket_name = bucket_name
        self.filekey = filekey
        self.pdf_text = None

        self._read_file()

    def _read_file(self):
        """
        Reads the PDF file and stores the concatenated text of all pages in ``self.pdf_text``.

        Raises:
            ValueError: If the file cannot be found or read. The not-yet-implemented S3 path
                also surfaces as a ValueError, because the NotImplementedError is raised
                inside the guarded block (kept so existing callers see the same exception type).
        """
        try:
            if self.bucket_name:
                # Read from S3
                raise NotImplementedError("Imeplement me")

            with fitz.open(self.filekey) as pdf:
                # Join once rather than growing a string page by page
                text = "".join(page.get_text() for page in pdf)
        except FileNotFoundError as err:
            raise ValueError(f"Local file not found: {self.filekey}") from err
        except Exception as err:
            raise ValueError(f"An error occurred while reading or parsing the PDF: {err}") from err

        self.pdf_text = text

    def extract(self):
        """
        Extracts the number of bedrooms and the risk assessment pathway from the report text.

        :return: Dict with keys "No. of Bedrooms" (int) and "Risk Assessment Pathway" (single letter)
        :raises ValueError: If either field cannot be found in the report text
        """
        bedrooms_match = re.search(r"No\. of Bedrooms \(Total\)\s*(\d+)", self.pdf_text)
        if bedrooms_match is None:
            raise ValueError("Failed to extract the number of bedrooms from the condition report.")

        pathway_match = re.search(r"Risk\s*Assessment\s*Pathway\s*([A-Z])", self.pdf_text)
        if pathway_match is None:
            raise ValueError("Failed to extract the risk assessment pathway from the condition report.")

        return {
            "No. of Bedrooms": int(bedrooms_match.group(1)),
            "Risk Assessment Pathway": pathway_match.group(1)
        }
def detect_pdf_report_type(pdf_path):
    """
    Detects the type of a PDF report from the text of its first page.

    If the first page yields no text, the PDF is rendered to images and the
    first page is passed through OCR instead.

    :param pdf_path: String path to the PDF file
    :return: One of "elmhurst epr", "elmhurst summary report", "osmosis condition report",
        "elmhurst evidence report", "pulse air permeability", "elmhurst project handover",
        "core logic pas assessment report", or None when no detector matches
    """
    with open(pdf_path, "rb") as pdf_file:
        pdf_reader = PyPDF2.PdfReader(pdf_file)
        if pdf_reader.pages:
            page_text = pdf_reader.pages[0].extract_text()
        else:
            page_text = ""

    if page_text == "":
        # No embedded text, so fall back to OCR on the rendered first page
        logger.info("Extracting text from PDF images..., this may take a moment.")
        rendered_pages = convert_from_path(pdf_path, dpi=300)
        if rendered_pages:
            page_text = image_to_string(rendered_pages[0])

    # Checked in order; the first detector that matches decides the report type
    detectors = (
        (is_elmhurst_energy_report, "elmhurst epr"),
        (is_elmhurst_summary_report, "elmhurst summary report"),
        (is_osmosis_condition_report, "osmosis condition report"),
        (is_elmhurst_evidence_report, "elmhurst evidence report"),
        (is_pulse_air_permeability, "pulse air permeability"),
        (is_elmhurst_project_handover, "elmhurst project handover"),
        (is_core_logic_pas_assessment_report, "core logic pas assessment report"),
    )
    for matches_report, report_type in detectors:
        if matches_report(page_text):
            return report_type

    return None
class ElmhurstEprExtractor:
    """
    A utility class for extracting specific data from Elmhurst Energy Performance Reports (EPR).

    All ``extract_*`` helpers work on the plain text of the PDF; ``extract`` reads the
    PDF at ``file_path`` and combines the helpers into a single dictionary.
    """

    def __init__(self, file_path):
        """
        :param file_path: Path to the EPR PDF file
        """
        self.file_path = file_path

    @staticmethod
    def _search_or_raise(pattern, text, error_message, flags=0, log_message=None):
        """
        Runs ``re.search`` and returns the match, logging and raising ValueError when nothing matches.

        :param pattern: Regular expression to search for
        :param text: Text to search in
        :param error_message: Message used for the raised ValueError
        :param flags: Optional ``re`` flags
        :param log_message: Message to log; defaults to ``error_message``
        :raises ValueError: If the pattern does not match
        """
        match = re.search(pattern, text, flags)
        if not match:
            logger.error(log_message or error_message)
            raise ValueError(error_message)
        return match

    @staticmethod
    def extract_window_age_description(windows_text):
        """
        Extracts the most common window age description and its proportion.

        :param windows_text: The text section containing window data
        :return: Dict with the most common and second most common glazing descriptions,
            their proportions (in %), and the total number of windows counted
        :raises ValueError: If none of the known glazing descriptions appear in the text
        """
        # Descriptions may be wrapped over lines in the PDF text, so drop the line breaks first
        windows_text = windows_text.replace("\n", "")
        window_descriptions = [
            "Double post or during 2002",
            "Double pre 2002",
            "Double with unknown install date",
            "Secondary glazing",
            "Triple glazing",
            "Single glazing",
        ]
        description_counts = Counter(
            {description: windows_text.count(description) for description in window_descriptions}
        )
        total_windows = sum(description_counts.values())
        if not total_windows:
            raise ValueError("Failed to extract window data.")

        ranked = description_counts.most_common(2)
        most_common_description, window_count = ranked[0]
        window_proportion = window_count / total_windows * 100

        if window_proportion == 100:
            second_most_common_description = None
            second_most_common_proportion = 0
        else:
            second_most_common_description, second_window_count = ranked[1]
            second_most_common_proportion = second_window_count / total_windows * 100

        return {
            "Window Age Description": most_common_description,
            "Window Age Description Proportion (%)": window_proportion,
            "Secondary Window Age Description": second_most_common_description,
            "Secondary Window Age Description Proportion (%)": second_most_common_proportion,
            "Number of Windows": total_windows
        }

    @staticmethod
    def extract_building_parts(text):
        """
        Extracts building parts and associated dimensions from the provided text.

        :param text: The full text of the EPR PDF
        :return: List of dicts, one per floor (and one per room-in-roof entry) of each building part
        """
        data = []
        building_part_pattern = re.compile(
            r"Construction details: Building part: (.*?)\nFloor Area \[m2\] Room Height \[m\] Perimeter \[m\] Party "
            r"Wall Length \[m\]\n(.*?)(?=Construction details|Data inputs|$)",
            re.DOTALL
        )
        # Compiled once, it is the same for every building part
        floor_pattern = re.compile(
            r"(Lowest floor|First floor|Second floor)\s+([\d.]+)\s+([\d.]+)\s+([\d.]+)\s+([\d.]+)"
        )
        for match in building_part_pattern.finditer(text):
            part_name = match.group(1).strip()
            floor_data = match.group(2)
            room_in_roof_match = re.search(r"Room\(s\) in Roof area:\s*([\d.]+)", part_name)
            if room_in_roof_match:
                # The room-in-roof area is reported on the building part heading itself
                cleaned_part_name = re.sub(r" - built in.*|Room\(s\) in Roof area:.*", "", part_name).strip()
                data.append({
                    "Building Part": cleaned_part_name,
                    "Floor Level": "Room in Roof",
                    "Floor Area (m2)": float(room_in_roof_match.group(1)),
                    "Room Height (m)": None,
                    "Perimeter (m)": None,
                    "Party Wall Length (m)": None
                })
            else:
                cleaned_part_name = re.sub(r" - built in.*", "", part_name).strip()

            for floor_match in floor_pattern.finditer(floor_data):
                data.append({
                    "Building Part": cleaned_part_name,
                    "Floor Level": floor_match.group(1),
                    "Floor Area (m2)": float(floor_match.group(2)),
                    "Room Height (m)": float(floor_match.group(3)),
                    "Perimeter (m)": float(floor_match.group(4)),
                    "Party Wall Length (m)": float(floor_match.group(5))
                })

        return data

    @staticmethod
    def extract_roof_details(text):
        """
        Extracts roof details for each building part in the provided text.

        :param text: The full text of the EPR PDF
        :return: List of dicts with roof type, insulation and insulation thickness (None when absent)
        """
        roof_data = []
        building_part_pattern = re.compile(
            r"Construction details: Building part: (.*?)\n(.*?)(?=Conservatory|Construction details|$)",
            re.DOTALL
        )
        for match in building_part_pattern.finditer(text):
            part_name = match.group(1).strip()
            cleaned_part_name = re.sub(r" - built in.*|Room\(s\) in Roof area:.*", "", part_name).strip()
            part_details = match.group(2)
            roof_type_match = re.search(r"Roof Type:\s*(.*?)(?=\n|$)", part_details)
            roof_insulation_match = re.search(r"Roof Insulation:\s*(.*?)(?=\n|$)", part_details)
            roof_insulation_thickness_match = re.search(r"Roof Insulation Thickness:\s*(.*?)(?=\n|$)", part_details)

            roof_data.append({
                "Building Part": cleaned_part_name,
                "Roof Type": roof_type_match.group(1).strip() if roof_type_match else None,
                "Roof Insulation": roof_insulation_match.group(1).strip() if roof_insulation_match else None,
                "Roof Insulation Thickness": (
                    roof_insulation_thickness_match.group(1).strip() if roof_insulation_thickness_match else None
                ),
            })

        return roof_data

    @staticmethod
    def extract_wall_details(text):
        """
        Extracts wall details for each building part in the provided text.

        :param text: The full text of the EPR PDF
        :return: List of dicts with wall type, insulation, dry-lining and thickness (None when absent)
        """
        wall_data = []
        building_part_pattern = re.compile(
            r"Construction details: Building part: (.*?)\n(.*?)(?=Conservatory|Construction details|$)",
            re.DOTALL
        )
        for match in building_part_pattern.finditer(text):
            part_name = match.group(1).strip()
            cleaned_part_name = re.sub(r" - built in.*|Room\(s\) in Roof area:.*", "", part_name).strip()
            part_details = match.group(2)
            wall_type_match = re.search(r"Wall Type:\s*(.*?)(?=\n|$)", part_details)
            wall_insulation_match = re.search(r"Wall Insulation:\s*(.*?)(?=\n|$)", part_details)
            wall_drylining_match = re.search(r"Wall Dry-lining:\s*(.*?)(?=\n|$)", part_details)
            wall_thickness_match = re.search(r"Wall Thickness:\s*(\d+)(?=\n|$)", part_details)

            wall_data.append({
                "Building Part": cleaned_part_name,
                "Wall Type": wall_type_match.group(1).strip() if wall_type_match else None,
                "Wall Insulation": wall_insulation_match.group(1).strip() if wall_insulation_match else None,
                "Wall Dry-lining": wall_drylining_match.group(1).strip() if wall_drylining_match else None,
                "Wall Thickness": int(wall_thickness_match.group(1)) if wall_thickness_match else None,
            })

        return wall_data

    @staticmethod
    def extract_conservatory(text):
        """
        Extracts conservatory data from the provided text.
        The section is located between "Conservatory" and "Doors".

        Args:
            text (str): The full text of the EPR PDF.

        Returns:
            dict: A dictionary with conservatory details:
                - "Conservatory Present"
                - "Conservatory Separated"
                - "Conservatory Floor Area"
                - "Conservatory Double Glazed"
                - "Conservatory Glazed Perimeter"
                - "Heated Conservatory Height"

        Raises:
            ValueError: If the conservatory section cannot be located.
        """
        conservatory_match = re.search(r"Conservatory\s*(.*?)\s*Doors", text, re.DOTALL)
        if not conservatory_match:
            logger.error("Failed to extract conservatory data.")
            raise ValueError("Could not extract conservatory data.")

        conservatory_text = conservatory_match.group(1)

        # A missing "Conservatory Present" line is treated the same as an explicit "No"
        present_match = re.search(r"Conservatory Present:\s*(Yes|No)", conservatory_text)
        if not present_match or present_match.group(1).strip() == "No":
            logger.info("Conservatory not present.")
            return {
                "Conservatory Present": "No",
                "Conservatory Separated": "",
                "Conservatory Floor Area": 0,
                "Conservatory Double Glazed": "",
                "Conservatory Glazed Perimeter": 0,
                "Heated Conservatory Height": "",
            }

        separated_match = re.search(r"Conservatory Separated:\s*(Yes|No)", conservatory_text)
        floor_area_match = re.search(r"Conservatory Floor Area:\s*([\d.]+)", conservatory_text)
        double_glazed_match = re.search(r"Conservatory Double Glazed:\s*(Yes|No)", conservatory_text)
        glazed_perimeter_match = re.search(r"Conservatory Glazed Perimeter:\s*([\d.]+)", conservatory_text)
        height_match = re.search(r"Heated Conservatory Height:\s*(.*?)(?=\n|$)", conservatory_text)

        return {
            "Conservatory Present": "Yes",
            "Conservatory Separated": separated_match.group(1).strip() if separated_match else "",
            "Conservatory Floor Area": float(floor_area_match.group(1)) if floor_area_match else 0,
            "Conservatory Double Glazed": double_glazed_match.group(1).strip() if double_glazed_match else "",
            "Conservatory Glazed Perimeter": float(glazed_perimeter_match.group(1)) if glazed_perimeter_match else 0,
            "Heated Conservatory Height": height_match.group(1).strip() if height_match else "",
        }

    @staticmethod
    def _extract_heating_details(section_text, default_value=""):
        """
        Extracts heating details from a given section of text.

        Args:
            section_text (str): The section of text containing heating details.
            default_value (str, optional): The default value to return for missing fields. Defaults to "".

        Returns:
            dict: A dictionary containing heating system details. "% of Heat" falls back to 0
            (not ``default_value``) when it is missing.
        """
        system_search = re.search(r"Main Heating Code\s*(.*?)\n", section_text)
        pcdf_search = re.search(r"PCDF boiler Reference\s*(\d+)", section_text)
        controls_search = re.search(r"Main Heating Controls\s*(.*?)\n", section_text)
        heat_search = re.search(r"Percentage of Heat\s*(\d+)\s*%?", section_text)

        return {
            "System": system_search.group(1).strip() if system_search else default_value,
            "PCDF Reference": pcdf_search.group(1) if pcdf_search else default_value,
            "Controls": controls_search.group(1).strip() if controls_search else default_value,
            "% of Heat": int(heat_search.group(1)) if heat_search else 0,
        }

    def extract_primary_heating(self, text):
        """
        Extracts the primary heating system details (the "Main Heating 1" section).

        :param text: The full text of the EPR PDF
        :return: Dict with "System", "PCDF Reference", "Controls" and "% of Heat"
        :raises ValueError: If the "Main Heating 1" section cannot be located
        """
        # The section ends at "Main Heating 2" when a second main system exists,
        # otherwise at the "Secondary Heating" heading
        primary_heating_section = (
            re.search(r"Main\s*Heating\s*1\s*(.*?)\s*Main\s*Heating\s*2", text, re.DOTALL)
            or re.search(r"Main\s*Heating\s*1\s*(.*?)\s*Secondary\s*Heating", text, re.DOTALL)
        )
        if primary_heating_section is None:
            # Previously this fell through to an AttributeError on None
            raise ValueError("Failed to extract primary heating data.")

        return self._extract_heating_details(primary_heating_section.group(1))

    def extract_secondary_heating_details(self, text):
        """
        Extracts the second main heating system (the "Main Heating 2" section) and the
        secondary heating code.

        :param text: The full text of the EPR PDF
        :return: Dict with "System", "PCDF Reference", "Controls", "% of Heat" and "Heating Code";
            empty defaults are returned when there is no "Main Heating 2" section
        """
        secondary_heating_section = re.search(r"Main\s*Heating\s*2\s*(.*?)\s*Secondary Heating", text, re.DOTALL)

        if secondary_heating_section is None:
            output = {
                "System": "",
                "PCDF Reference": "",
                "Controls": "",
                "% of Heat": 0,
            }
        else:
            output = dict(self._extract_heating_details(secondary_heating_section.group(1)))

        # The heating code is only reported when a second system was actually found
        heating_code_match = re.search(r"Secondary Heating Code\s*(.*?)\n", text)
        output["Heating Code"] = (
            heating_code_match.group(1).strip() if output["System"] and heating_code_match else ""
        )

        return output

    def extract(self):
        """
        Extracts all relevant data from the EPR PDF.

        Returns:
            dict: A dictionary containing extracted data, including:
                - Address and Postcode
                - SAP Rating and Primary Energy Use
                - Lighting, Doors, Windows, Roof, and Wall Details
                - Heating systems (Primary and Secondary)
                - Building Parts

        Raises:
            ValueError: If any of the required fields cannot be found in the report text.
        """
        find = self._search_or_raise
        data = {}

        with open(self.file_path, "rb") as file:
            reader = PyPDF2.PdfReader(file)
            text = "".join(page.extract_text() for page in reader.pages)

        data["Assessor Name"] = find(
            r"Created by:\s*(.*?)\n", text, "Failed to extract assessor name."
        ).group(1).strip()
        data["Assessment Date"] = find(
            r"\nAssessment Date\s*(.*?)\n", text, "Failed to extract assessment date."
        ).group(1).strip()

        # Extracting individual components
        address_match = find(
            r"ENERGY REPORT\nDwelling Address\s*(.*?)\s*\nReference", text,
            "Failed to extract address.", flags=re.DOTALL
        )
        data["Address"] = address_match.group(1).strip()
        # The postcode is taken to be the last comma-separated element of the address
        data["Postcode"] = data["Address"].split(",")[-1].strip()

        # TODO:
        data["Region"] = None
        data["House Name"] = None
        data["House No"] = None
        data["Street"] = None
        data["Locality"] = None
        data["Town"] = None
        data["County"] = None

        sap_match = find(r"GG \(1-20\)\s*(\d{1,2})\s*(\d{1,2})", text, "Failed to extract SAP rating.")
        data["Current SAP Rating"] = int(sap_match.group(1))

        energy_match = find(
            r"Additional ratings for your home\s*([\d.]+)", text, "Failed to extract primary energy use."
        )
        data["Primary Energy Use Intensity (kWh/m2/yr)"] = float(energy_match.group(1))

        storeys_match = find(r"Number of Storeys:\s*(\d+)", text, "Failed to extract the number of storeys.")
        data["Number of Storeys"] = int(storeys_match.group(1))

        fuel_match = find(r"TOTAL\s*£(\d+)", text, "Failed to extract fuel bill.")
        data["Fuel Bill"] = f"£{fuel_match.group(1)}"

        total_doors_match = find(r"Total Doors:\s*(\d+)", text, "Failed to extract total doors.")
        data["Total Number of Doors"] = int(total_doors_match.group(1))

        # Extract Number of Insulated Doors
        insulated_doors_match = find(r"Insulated Doors:\s*(\d+)", text, "Failed to extract insulated doors.")
        data["Number of Insulated Doors"] = int(insulated_doors_match.group(1))

        # Get number of lighting outlets and number of fittings needing LEL
        lighting_fittings_match = find(
            r"Total number of light fittings\s*(\d+)", text,
            "Failed to extract lighting", log_message="Failed to extract lighting."
        )
        data["Number of Light Fittings"] = int(lighting_fittings_match.group(1))
        lel_fittings_match = find(
            r"Total number of L.E.L. fittings\s*(\d+)", text, "Failed to extract LEL fittings."
        )
        data["Number of LEL Fittings"] = int(lel_fittings_match.group(1))
        data["Number of fittings needing LEL"] = data["Number of Light Fittings"] - data["Number of LEL Fittings"]

        windows_section = find(
            r"Windows\s*(.*?)\s*Draught Proofing", text, "Failed to extract window data.", flags=re.DOTALL
        )
        data["Windows"] = self.extract_window_age_description(windows_section.group(1))

        data["Primary Heating"] = self.extract_primary_heating(text)
        data["Secondary Heating"] = self.extract_secondary_heating_details(text)
        data["Building Parts"] = self.extract_building_parts(text)
        data["Roof Details"] = self.extract_roof_details(text)
        data["Wall Details"] = self.extract_wall_details(text)
        data["Conservatory"] = self.extract_conservatory(text)

        water_heating_code_match = find(
            r"Water Heating Code\s*(.*?)\n", text, "Failed to extract water heating code."
        )
        data["Water Heating Code"] = water_heating_code_match.group(1).strip()

        return data
+ """ + # Clean up windows_text by removing line breaks for better pattern matching + windows_text = windows_text.replace("\n", "") + + # Define possible window age descriptions + window_descriptions = [ + "Double post or during 2002", + "Double pre 2002", + "Double with unknown install date", + "Secondary glazing", + "Triple glazing", + "Single glazing", + ] + + # Count occurrences of each description + description_counts = Counter() + for description in window_descriptions: + matches = re.findall(re.escape(description), windows_text) + description_counts[description] = len(matches) + + if not description_counts or not sum(description_counts.values()): + raise ValueError("Failed to extract window data.") + + # Determine the most common description and calculate its proportion + most_common_description, window_count = description_counts.most_common(1)[0] + window_proportion = window_count / sum(description_counts.values()) * 100 + + # Get the second most common and the proportion + if window_proportion == 100: + second_most_common_description = None + second_most_common_proportion = 0 + else: + second_most_common_description, second_window_count = description_counts.most_common(2)[1] + second_most_common_proportion = second_window_count / sum(description_counts.values()) * 100 + + return { + "Window Age Description": most_common_description, + "Window Age Description Proportion (%)": window_proportion, + "Secondary Window Age Description": second_most_common_description, + "Secondary Window Age Description Proportion (%)": second_most_common_proportion, + "Number of Windows": sum(description_counts.values()) + } + + @staticmethod + def extract_primary_heating(text): + primary_heating_section1 = re.search(r"Main\s*Heating1\s*(.*?)\s*Main\s*Heating2", text, re.DOTALL) + primary_heating_section2 = re.search(r"Main\s*Heating1\s*(.*?)\s*Water\s*Heating", text, re.DOTALL) + primary_heating_section = primary_heating_section1 if primary_heating_section1 else 
primary_heating_section2 + if primary_heating_section is None: + raise ValueError("Failed to extract primary heating data.") + + primary_text = primary_heating_section.group(1) + + output = { + 'System': re.search(r"Main Heating Code\s*(.*?)\n", primary_text).group(1).strip(), + 'PCDF Reference': re.search(r"PCDF boiler Reference\s*(\d+)", primary_text).group(1), + 'Controls': re.search(r"Main Heating Controls\s*(.*?)\n", primary_text).group(1).strip(), + '% of Heat': int(re.search(r"Percentage of Heat\s*(\d+)\s*%", primary_text).group(1)) + } + return output + + @staticmethod + def extract_secondary_heating_details(text): + secondary_heating_section = re.search(r"Main\s*Heating2\s*(.*?)\s*Water\s*Heating", text, re.DOTALL) + + # Defaults + output = { + "System": "", + "PCDF Reference": "", + "Controls": "", + "% of Heat": 0, + "Heating Code": "" + } + if secondary_heating_section is not None: + # Overwrite defaults + secondary_text = secondary_heating_section.group(1) + + main_heating_code_match_secondary = re.search( + r"Main Heating Code\s*(.*?)(?=\n|Percentage of Heat)", secondary_text + ) + output["System"] = main_heating_code_match_secondary.group(1).strip() + output["PCDF Reference"] = re.search(r"PCDF boiler Reference\s*(\d+)", secondary_text).group(1) + + second_heating_controls_match = re.search(r"Main Heating Controls\s*(.*?)\n", secondary_text) + output["Heating Controls"] = ( + second_heating_controls_match.group(1).strip() if second_heating_controls_match else "" + ) + output["% of Heat"] = int( + re.search(r"Percentage of Heat\s*(\d+)\s*%", secondary_text).group(1) + ) + + secondary_heating_code_match = re.search(r"Secondary Heating Code\s*(.*?)\n", text) + if output["System"] != "": + output["Heating Code"] = ( + secondary_heating_code_match.group(1).strip() if secondary_heating_code_match else "" + ) + + return output + + @staticmethod + def extract_building_parts(text): + """ + Extracts building parts and associated dimensions from the summary 
report PDF. + This includes Main Property, multiple extensions if they exist, and Room in Roof areas. + """ + data = [] + + # Locate the Dimensions section + dimensions_section = re.search( + r"Dimensions:\s*Dimension type: Internal\n(.*?)\n5\.0 Conservatory:", text, re.DOTALL + ) + if not dimensions_section: + raise ValueError("Failed to locate dimensions section in the text.") + + dimensions_text = dimensions_section.group(1) + + # Pattern to extract each building part, starting from Main Property and including extensions + building_part_pattern = re.compile( + r"(Main Property|\d+(?:st|nd|rd|th) Extension)\s*" + r"(.*?)(?=\d+(?:st|nd|rd|th) Extension|5\.0 Conservatory|$)", + re.DOTALL + ) + + # Loop through each building part match, including Main Property and extensions + for match in building_part_pattern.finditer(dimensions_text): + part_name = match.group(1) + floor_data = match.group(2) + + # Pattern to extract floor details: Floor Level, Floor Area, Room Height, Perimeter, Party Wall Length + floor_pattern = re.compile( + r"(1st Floor|Lowest Floor|Second floor):\s*([\d.]+)\s+([\d.]+)\s+([\d.]+)\s+([\d.]+)" + ) + + # Extract data for each floor within the building part + for floor_match in floor_pattern.finditer(floor_data): + floor_level = floor_match.group(1) + floor_area = float(floor_match.group(2)) + room_height = float(floor_match.group(3)) + perimeter = float(floor_match.group(4)) + party_wall_length = float(floor_match.group(5)) + + # Append to data list + data.append( + { + "Building Part": part_name, + "Floor Level": floor_level, + "Floor Area (m2)": floor_area, + "Room Height (m)": room_height, + "Perimeter (m)": perimeter, + "Party Wall Length (m)": party_wall_length + } + ) + + # Check specifically for "Room(s) in Roof" entries, which only have Floor Area + room_in_roof_pattern = re.compile(r"Room\(s\) in Roof:\s*([\d.]+)") + room_in_roof_match = room_in_roof_pattern.search(floor_data) + if room_in_roof_match: + floor_area = 
@staticmethod
def extract_roof_details(text):
    """
    Collects the roof type, insulation and insulation thickness of every building part
    listed in the "8.0 Roofs" section of the summary report text.

    Args:
        text (str): The full text of the Summary Report PDF.

    Returns:
        list[dict]: One dictionary per building part with the keys "Building Part",
        "Roof Type", "Roof Insulation" and "Roof Insulation Thickness". The two insulation
        values are None when the report does not list them. An empty list is returned when
        there is no "8.0 Roofs" section.
    """
    # Everything between the "8.0 Roofs:" heading and the "9.0 Floors:" heading (or end of text)
    section_match = re.search(r"8\.0 Roofs:\n(.*?)(?=\n9\.0 Floors:|$)", text, re.DOTALL)
    if not section_match:
        return []

    # Re-attach the "9.0 Floors:" heading so the lookaheads below always have a terminator
    section_text = section_match.group(1).strip() + "\n9.0 Floors:"

    part_regex = re.compile(
        r"(Main Property|1st Extension|2nd Extension|[\w\s]+)\n"  # Building part label
        r"Type\s+(.*?)(?=\n(?:Insulation|9\.0 Floors:|[A-Z]))"  # Roof type, up to the next field/label/end
        r"(?:\nInsulation\s+(.*?)(?=\n(?:Insulation Thickness|9\.0 Floors:|[A-Z])))?"  # Optional insulation
        r"(?:\nInsulation Thickness\s+(.*?)(?=\n(?:9\.0 Floors:|[A-Z])))?",  # Optional thickness
        re.DOTALL
    )

    def _optional(value):
        # Optional groups come back as None (or empty) when the field is not in the report
        return value.strip() if value else None

    entries = []
    for found in part_regex.finditer(section_text):
        label, kind, insulation, thickness = found.groups()

        kind = kind.strip()
        # The type capture can run on into the next label, e.g.
        # 'A Another dwelling above\n1st Extension' - cut it back to the type itself
        if kind.startswith("A Another dwelling above"):
            kind = "A Another dwelling above"

        entries.append(
            {
                "Building Part": label.strip(),
                "Roof Type": kind,
                "Roof Insulation": _optional(insulation),
                "Roof Insulation Thickness": _optional(thickness),
            }
        )

    return entries
# Optional main wall Dry-lining + r"Wall Thickness Unknown\s+(.*?)\n" # Matches main wall Thickness Unknown + r"Wall Thickness \[mm\]\s+(\d+)", # Matches main wall Thickness + re.DOTALL + ) + + # Define pattern to capture alternative wall details, if present + alternative_wall_pattern = re.compile( + r"Alternative Wall Area.*?\n" # Matches start of alternative wall section + r"Alternative Type\s+(.*?)\n" # Matches alternative wall Type + r"Alternative Insulation\s+(.*?)\n" # Matches alternative wall Insulation + r"(Alternative Dry-lining\s+(.*?)\n)?" # Optional Alternative Dry-lining + r"Alternative Wall Thickness Unknown\s+(.*?)\n" # Matches alternative wall Thickness Unknown + r"Alternative Wall Thickness\s+(\d+)", # Matches alternative wall Thickness + re.DOTALL + ) + + # Find all building part entries within the 7.0 Walls section + for match in building_part_pattern.finditer(wall_section): + wall_label = match.group(1).strip() + main_wall_type = match.group(2).strip() + main_wall_insulation = match.group(3).strip() + main_wall_dry_lining = match.group(5).strip() if match.group(5) else "N/A" + main_wall_thickness_unknown = match.group(6).strip() + main_wall_thickness = int(match.group(7)) + + # Initialize dictionary for this wall entry + wall_entry = { + "Building Part": wall_label, + "Wall Type": main_wall_type, + "Wall Insulation": main_wall_insulation, + "Wall Dry-lining": main_wall_dry_lining, + "Wall Thickness Unknown": main_wall_thickness_unknown, + "Wall Thickness (mm)": main_wall_thickness, + "Alternative Wall Type": None, + "Alternative Wall Insulation": None, + "Alternative Wall Dry-lining": "N/A", + "Alternative Wall Thickness Unknown": None, + "Alternative Wall Thickness (mm)": None, + } + + # Check if there's an alternative wall section following this wall entry + alt_match = alternative_wall_pattern.search(wall_section, match.end()) + if alt_match: + wall_entry["Alternative Wall Type"] = alt_match.group(1).strip() + wall_entry["Alternative Wall 
@staticmethod
def extract_conservatory(text):
    """
    Reads the conservatory answers found between the "5.0 Conservatory:" and
    "7.0 Walls:" headings of the summary report text.

    Args:
        text (str): The full text of the Summary Report PDF.

    Returns:
        dict: A dictionary with conservatory details:
            - "Conservatory Present" ("Yes" or "No")
            - "Conservatory Separated"
            - "Conservatory Floor Area"
            - "Conservatory Double Glazed"
            - "Conservatory Glazed Perimeter"
            - "Heated Conservatory Height"
        Text fields fall back to "" and numeric fields to 0 when not found.

    Raises:
        ValueError: If the conservatory section cannot be located at all.
    """
    section = re.search(r"5\.0 Conservatory:(.*?)7\.0 Walls:", text, re.DOTALL)
    if section is None:
        logger.error("Failed to extract conservatory data.")
        raise ValueError("Could not extract conservatory data.")

    body = section.group(1)

    def _find(pattern):
        # All conservatory questions are matched case-insensitively within the section
        return re.search(pattern, body, re.IGNORECASE)

    def _answer(pattern):
        hit = _find(pattern)
        return hit.group(1).strip() if hit else ""

    def _measure(pattern):
        hit = _find(pattern)
        return float(hit.group(1)) if hit else 0

    presence = _find(r"Is there a conservatory\?\s*(Yes|No)")
    answered_yes = presence is not None and presence.group(1).strip().lower() != "no"

    # A missing answer is treated the same way as an explicit "No"
    if not answered_yes:
        return {
            "Conservatory Present": "No",
            "Conservatory Separated": "",
            "Conservatory Floor Area": 0,
            "Conservatory Double Glazed": "",
            "Conservatory Glazed Perimeter": 0,
            "Heated Conservatory Height": "",
        }

    # NOTE(review): the original author's comment says no real report with a conservatory
    # had been seen yet, so the patterns below should be confirmed against one.
    separated = _answer(r"Is it thermally separated\?\s*(Yes|No)")
    floor_area = _measure(r"Floor Area \[m2\]\s*([\d.]+)")
    double_glazed = _answer(r"Double Glazed\s*(Yes|No)")
    glazed_perimeter = _measure(r"Glazed Perimeter \[m\]\s*([\d.]+)")
    height = _answer(r"Room Height\s*(.*?)(?=\n|$)")

    return {
        "Conservatory Present": "Yes",
        "Conservatory Separated": separated,
        "Conservatory Floor Area": floor_area,
        "Conservatory Double Glazed": double_glazed,
        "Conservatory Glazed Perimeter": glazed_perimeter,
        "Heated Conservatory Height": height,
    }
No:\s*(.*?)\nStreet:", text) + house_no = house_no.group(1).strip() if house_no else "" + + street = re.search(r"Street:\s*(.*?)\nLocality:", text) + street = street.group(1).strip() if street else "" + + locality = re.search(r"Locality:\s*(.*?)\nTown:", text) + locality = locality.group(1).strip() if locality else "" + + town = re.search(r"Town:\s*(.*?)\nCounty:", text) + town = town.group(1).strip() if town else "" + + county = re.search(r"County:\s*(.*?)\nProperty Tenure:", text) + county = county.group(1).strip() if county else "" + + # Clean extracted values and remove any prefixes + address_parts = [ + house_no, + house_name, + street, + locality, + town, + county, + region, + postcode + ] + + # Join non-empty parts with a comma + data["Address"] = ", ".join([part for part in address_parts if part]) + data["Postcode"] = postcode + data["Region"] = region + data["House Name"] = house_name + data["House No"] = house_no + data["Street"] = street + data["Locality"] = locality + data["Town"] = town + data["County"] = county + + # Extract Current SAP rating + sap_match = re.search(r"Current SAP rating:\s*([A-Z] \d+)", text) + if not sap_match: + raise ValueError("Could not extract SAP rating") + data["Current SAP Rating"] = sap_match.group(1).split(" ")[1] + + # We don't have primary energy in the summary report + data['Primary Energy Use Intensity (kWh/m2/yr)'] = None + + # Number of storeys + storeys_match = re.search(r"Number of Storeys:\s*(\d+)", text) + if not storeys_match: + raise ValueError("Could not extract number of storeys") + data["Number of Storeys"] = int(storeys_match.group(1)) + + # Extract Fuel Bill + fuel_bill_match = re.search(r"Fuel Bill:\s*£(\d+)", text) + if not fuel_bill_match: + raise ValueError("Could not extract fuel bill") + data["Fuel Bill"] = f"£{fuel_bill_match.group(1)}" + + # Extract Total Number of Doors + total_doors_match = re.search(r"Total Number of Doors\s*(\d+)", text) + if not total_doors_match: + raise ValueError("Could not 
class PulseAirPermeabilityExtractor:
    """
    Pulls the results table out of a Pulse Air Permeability Test Report PDF.

    The PDF pages are rendered to images and run through OCR; the metrics are then read
    from the recognised text with regular expressions.
    """

    def __init__(self, file_path):
        self.file_path = file_path

    @staticmethod
    def extract_table(text):
        """
        Parses the two-column results table from the OCR text.

        Args:
            text (str): Text recognised from the report pages.

        Returns:
            list[dict]: One row per metric with the keys "Metric", "Measured @ 4PA" and
            "Extrapolated @ 50PA". Values are strings with '@' replaced by '0' and
            thousands separators removed.

        Raises:
            ValueError: If any of the expected metrics cannot be found.
        """
        metric_patterns = {
            "Air Leakage Rate": r"Air Leakage Rate\s*([\d,@.]+)\s*m/h\s*([\d,@.]+)\s*m3/h",
            "Air Permeability": r"Air Permeability\s*([\d,@.]+)\s*=.*?\s*([\d,@.]+)\s*m\?/m\?h",
            "Air Changes per Hour": r"Air Changes per Hour\s*([\d,@.]+)\s*([\d,@.]+)",
            "Equivalent Leakage Area": r"Equivalent Leakage Area\s*([\d,@.]+)\s*([\d,@.]+)",
            "Calculation Uncertainty": r"Calculation Uncertainty\s*([\d,@.]+)\s*([\d,@.]+)",
        }

        def _tidy(raw):
            # '@' is swapped for '0' (presumably an OCR misread of zero) and commas dropped
            return raw.replace("@", "0").replace(",", "")

        rows = []
        for metric, pattern in metric_patterns.items():
            found = re.search(pattern, text)
            if not found:
                raise ValueError(f"Could not extract metric: {metric}")

            measured, extrapolated = found.group(1, 2)
            rows.append(
                {
                    "Metric": metric,
                    "Measured @ 4PA": _tidy(measured),
                    "Extrapolated @ 50PA": _tidy(extrapolated),
                }
            )

        return rows

    def extract(self):
        """
        Runs OCR over every page of the report and returns the parsed results table.

        Returns:
            dict: {"Results Table": <rows from extract_table>}
        """
        logger.info("Extracting data from pdf image - this may take a while...")

        # Render each PDF page to an image, then recognise and concatenate the text
        rendered_pages = convert_from_path(self.file_path, dpi=300)
        recognised_text = "".join(image_to_string(page) for page in rendered_pages)

        return {
            "Results Table": self.extract_table(recognised_text)
        }
+ text = "" + for page in reader.pages: + text += page.extract_text() + + data = {} + + # Regex patterns + patterns = { + "Retrofit Coordinator Name": r"Retrofit Coordinator Name:\s*(.+)", + "Retrofit Coordinator ID": r"Retrofit Coordinator ID:\s*(\d+)", + "Measures Fitted": r"Measure\(s\) Fitted:\s*([\s\S]*?)\nRetrofit Assessor Name:", + "Designer Name": r"Designer Name\(s\):\s*(.+)", + "Installer Name": r"Installer Name\(s\):\s*(.+)", + } + + # Extract data + for key, pattern in patterns.items(): + match = re.search(pattern, text) + if not match: + raise ValueError(f"Could not match {key}") + if match: + if key == "Measures Fitted": + # Special handling for multiline measures + measures = re.findall(r"[\u2022\u00b7\u25cf\uf0b7]\s*(.+)", match.group(1)) + measures = [m.strip() for m in measures] + data[key] = measures + else: + data[key] = match.group(1).strip() if match else "" + + return data + + +class CoreLogicPasAssessmentReportExtractor: + """ + A utility class for extracting specific data from CoreLogic PAS Assessment Reports. 
class FullSapParser:
    """
    Parses a full SAP XML document and exposes selected values as attributes.

    The XML is read from S3 when ``bucket_name`` is given, otherwise ``filekey`` is treated
    as a local path. The file is read and parsed on construction; call :meth:`extract`
    to populate the attributes declared below.
    """

    full_address = None
    archetype = None
    age_band = None
    unheated_corridor = None
    property_type = None
    built_form = None

    # ventilation
    mechanical_ventilation = None
    cross_ventilation = None
    night_ventilation = None

    # dimensions
    number_of_storeys = None
    property_dimensions = None

    # fabric
    low_energy_lighting = None

    # Heating
    heating1 = None
    cylinder = None
    cylinder_stat = None

    def __init__(self, filekey, bucket_name=None):
        """
        Args:
            filekey: S3 object key when ``bucket_name`` is given, otherwise a local file path.
            bucket_name: Name of the S3 bucket holding the file, or None for a local file.

        Raises:
            ValueError: If the file cannot be found, read, or parsed.
        """
        self.s3_client = boto3.client('s3')
        self.bucket_name = bucket_name
        self.filekey = filekey
        self.full_sap = None

        self._read_file()

    def _read_file(self):
        """
        Reads the XML file either locally or from S3 and parses it using minidom.

        Raises:
            ValueError: If the file cannot be found, read, or parsed. The original
                exception is attached as the cause.
        """
        try:
            if self.bucket_name:
                # Read from S3
                response = self.s3_client.get_object(Bucket=self.bucket_name, Key=self.filekey)
                xml_content = response['Body'].read()
            else:
                # Read locally as bytes (like the S3 branch) so the XML parser, not the
                # platform's default text encoding, decides how to decode the document
                with open(self.filekey, "rb") as f:
                    xml_content = f.read()

            # Parse the XML content using minidom
            self.full_sap = parseString(xml_content)
        except FileNotFoundError as e:
            raise ValueError(f"Local file not found: {self.filekey}") from e
        except Exception as e:
            raise ValueError(f"An error occurred while reading or parsing the XML: {e}") from e

    def _require_document(self):
        """Returns the parsed document, raising ValueError if nothing has been read yet."""
        if not self.full_sap:
            raise ValueError("You need to read the file first")
        return self.full_sap

    def _first_tag_text(self, tag_name):
        """
        Returns the value of the first child node of the first ``tag_name`` element,
        or None when the element is absent or empty.
        """
        elements = self._require_document().getElementsByTagName(tag_name)
        if not elements or not elements[0].firstChild:
            return None
        return elements[0].firstChild.nodeValue

    def extract(self, _return=True):
        """
        Runs every getter so that all attributes are populated.

        Args:
            _return: When truthy, a summary dictionary is returned.

        Returns:
            dict | None: "Property Type", "Built Form" and "Age Band" when ``_return`` is
            truthy, otherwise None.
        """
        self.get_address()
        self.get_archetype()
        self.get_age_band()
        self.get_unheated_corridor()
        self.get_heating_1()
        self.get_ventilation()
        self.get_floor_area()
        self.get_low_energy_lighting()
        self.get_cylinder()

        if _return:
            return {
                "Property Type": self.property_type,
                "Built Form": self.built_form,
                "Age Band": self.age_band,
            }

    def get_address(self):
        """
        Builds ``full_address`` from the single AddressAsDesigned element.

        Address lines and town are title-cased; the postcode is appended unchanged.
        Parts that are missing or empty are skipped.

        Raises:
            ValueError: If the file has not been read, or there is not exactly one
                AddressAsDesigned element.
        """
        address = self._require_document().getElementsByTagName("AddressAsDesigned")
        if len(address) != 1:
            raise ValueError("Non-unique address tag found - investigate me")

        data = {}
        for node in address[0].childNodes:
            if node.nodeType == node.ELEMENT_NODE:
                data[node.nodeName] = node.firstChild.nodeValue if node.firstChild else None

        # .get() so that an absent element is treated like an empty one
        parts = [
            value.title()
            for value in (data.get(key) for key in ("AddressLine1", "AddressLine2", "AddressLine3", "Town"))
            if value is not None
        ]
        postcode = data.get("Postcode")
        if postcode is not None:
            parts.append(postcode)

        self.full_address = " ".join(parts)

    def get_archetype(self):
        """
        Sets ``property_type``, ``built_form`` and ``archetype`` from PropertyType1,
        PropertyType2 and (when present and non-empty) PositionOfFlat.

        Raises:
            ValueError: If the file has not been read, or either property type tag is
                not unique.
            KeyError: If PositionOfFlat holds a value missing from POSITION_OF_FLAT.
        """
        document = self._require_document()

        property_type1 = document.getElementsByTagName('PropertyType1')
        property_type2 = document.getElementsByTagName('PropertyType2')
        position_of_flat = document.getElementsByTagName('PositionOfFlat')

        if len(property_type1) != 1 or len(property_type2) != 1:
            raise ValueError("Non-unique property tag found - investigate me")

        property_type1 = property_type1[0].firstChild.nodeValue
        property_type2 = property_type2[0].firstChild.nodeValue

        # The tag may be absent altogether or present but empty
        if position_of_flat and position_of_flat[0].firstChild:
            position_of_flat = POSITION_OF_FLAT[position_of_flat[0].firstChild.nodeValue]
        else:
            position_of_flat = None

        self.property_type = property_type1
        self.built_form = property_type2
        self.archetype = property_type1 + " - " + property_type2

        if position_of_flat:
            self.archetype = self.archetype + " " + position_of_flat

    def get_age_band(self):
        """
        Sets ``age_band`` by translating the PropertyAgeBand code via PROPERTY_AGE_BAND.

        Raises:
            ValueError: If the file has not been read, or the tag is not unique.
        """
        property_age_band = self._require_document().getElementsByTagName('PropertyAgeBand')

        if len(property_age_band) != 1:
            raise ValueError("Non-unique property age band tag found - investigate me")

        property_age_band = property_age_band[0].firstChild.nodeValue
        self.age_band = PROPERTY_AGE_BAND[property_age_band]

    def get_wall_area_for_description(self, description):
        """
        Looks for the first WallRec whose Description equals ``description`` and which
        has an Area element.

        Returns:
            str | None: "Unheated corridor - <area> area" for the first such record,
            otherwise None.
        """
        wall_recs = self._require_document().getElementsByTagName("WallRec")
        for wall_rec in wall_recs:
            desc_elements = wall_rec.getElementsByTagName("Description")
            # Skip records with no (or an empty) Description rather than failing on them
            if not desc_elements or not desc_elements[0].firstChild:
                continue
            if desc_elements[0].firstChild.data != description:
                continue

            area_elements = wall_rec.getElementsByTagName("Area")
            if area_elements:
                area = float(area_elements[0].firstChild.data)
                # Placeholder for wall_description which you'll populate later
                return f"Unheated corridor - {area} area"
        return None

    def get_unheated_corridor(self):
        """
        Sets ``unheated_corridor``. Unheated corridors don't always exist, so the value
        is None when no wall record is described as "Flat corridor Main".

        Raises:
            ValueError: If the file has not been read.
        """
        self._require_document()
        self.unheated_corridor = self.get_wall_area_for_description("Flat corridor Main")

    def get_heating_1(self):
        """
        Sets ``heating1`` from the MHS code (translated via MAINHEATING_LOOKUP when known)
        and the Fraction of the single MainHeatingSystem1 element.

        Raises:
            ValueError: If the file has not been read, or the tag is not unique.
        """
        main_heating_system = self._require_document().getElementsByTagName('MainHeatingSystem1')

        if len(main_heating_system) != 1:
            raise ValueError("Non-unique main heating system tag found - investigate me")

        main_heating_system = main_heating_system[0]

        mhs = main_heating_system.getElementsByTagName('MHS')[0].firstChild.nodeValue
        # Unknown codes are passed through unchanged
        mhs = MAINHEATING_LOOKUP.get(mhs, mhs)

        fraction = main_heating_system.getElementsByTagName('Fraction')[0].firstChild.nodeValue

        self.heating1 = f"{mhs} : {fraction}% of heating"

    def get_ventilation(self):
        """
        Sets the three ventilation description attributes.

        "true"/"false" text is converted to a boolean; any other non-empty text counts
        as present. A missing or empty tag counts as not present.
        """
        bool_lookup = {
            "true": True,
            "false": False
        }

        def _flag(tag_name):
            value = self._first_tag_text(tag_name)
            return bool_lookup.get(value, value)

        # The mechanical ventilation value is normalised like the other two; previously
        # the literal text "false" was truthy and was reported as ventilation present.
        mech_vent_value = _flag("MechanicalVentilationDecentralised")
        cross_vent_value = _flag("CrossVentilation")
        night_vent_value = _flag("NightVentilation")

        # Create the outputs
        self.mechanical_ventilation = (
            "Mechanical ventilation present" if mech_vent_value else "No mechanical ventilation"
        )
        self.cross_ventilation = "Cross ventilation present" if cross_vent_value else "No cross ventilation"
        self.night_ventilation = "Night ventilation present" if night_vent_value else "No night ventilation"

    def get_floor_area(self):
        """
        Sets ``number_of_storeys`` and ``property_dimensions``.

        ``property_dimensions`` is a list with one dictionary per usable
        StoreyMeasurementRec ("storey_index", "Floor Area", "Perimeter", "Height").
        Records flagged xsi:nil="true", without an InternalFloorArea, or with a floor area
        of 0 are skipped; ``storey_index`` is the record's position among all records.
        """
        document = self._require_document()

        self.number_of_storeys = int(document.getElementsByTagName('NumberOfStoreys')[0].firstChild.nodeValue)
        storeys = document.getElementsByTagName('StoreyMeasurementRec')

        # TODO: In the examples we've seen, the first StoreyMeasurementRec tag is explicitly
        # marked as empty (xsi:nil="true").
        # NOTE(review): the previous implementation carried a NotImplementedError guard for a
        # possible basement (`storey_index == -1`) that could never fire because the index was
        # incremented before the check - confirm how a populated first record should be handled.

        storey_data = []
        for storey_index, storey in enumerate(storeys):
            if storey.getAttribute("xsi:nil") == "true":
                continue

            floor_area = storey.getElementsByTagName('InternalFloorArea')
            if not floor_area:
                continue

            floor_area = float(floor_area[0].firstChild.nodeValue)
            # If floor area is 0, skip this storey
            if not floor_area:
                continue

            perimeter = float(storey.getElementsByTagName('InternalPerimeter')[0].firstChild.nodeValue)
            height = float(storey.getElementsByTagName('StoreyHeight')[0].firstChild.nodeValue)

            storey_data.append({
                "storey_index": storey_index,
                "Floor Area": floor_area,
                "Perimeter": perimeter,
                "Height": height
            })

        # We will convert this into a table in the markdown
        self.property_dimensions = storey_data

    def get_low_energy_lighting(self):
        """Sets ``low_energy_lighting`` from the LightFittings and LELFittings tags."""
        document = self._require_document()

        # Extract the values of the LightFittings and LELFittings tags
        light_fittings = document.getElementsByTagName('LightFittings')[0].firstChild.data
        lel_fittings = document.getElementsByTagName('LELFittings')[0].firstChild.data

        # Construct the string message
        self.low_energy_lighting = f"{lel_fittings} out of {light_fittings} lighting fittings are low energy."

    def get_cylinder(self):
        """
        Sets ``cylinder`` and ``cylinder_stat``.

        ``cylinder`` is "Not insulated." unless both InsulationType and
        InsulationThickness are present and non-empty. ``cylinder_stat`` is the
        CylinderStat text, or None when that tag is absent or empty.
        """
        # Missing/empty tags yield None so that the "Not insulated." branch is reachable
        insulation_type = self._first_tag_text('InsulationType')
        insulation_thickness = self._first_tag_text('InsulationThickness')

        if insulation_type and insulation_thickness:
            self.cylinder = f"Insulated, {insulation_type}: {insulation_thickness}mm."
        else:
            self.cylinder = "Not insulated."

        self.cylinder_stat = self._first_tag_text('CylinderStat')