Merge pull request #375 from Hestia-Homes/stonewater-eco-programme

Stonewater eco programme
This commit is contained in:
KhalimCK 2024-12-13 09:35:33 +00:00 committed by GitHub
commit 236d736a75
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
26 changed files with 3100 additions and 32 deletions

View file

@ -426,6 +426,18 @@ class Property:
if phase_epc_transformation[k] == v:
continue
if k == "hotwater-description":
if (
v == "From main system"
) and (
phase_epc_transformation["mainheat-description"] == "Electric storage heaters"
) and (
"Electric immersion" in phase_epc_transformation["hotwater-description"]
):
# It means we've recommended HHR with electric immersion, and shouldn't overwrite
# the hot water description
continue
raise NotImplementedError(
"Already have this key in the phase_epc_transformation - implement me"
)

View file

@ -50,4 +50,5 @@ DESCRIPTIONS_TO_FUEL_TYPES = {
},
"Gas instantaneous at point of use": {"fuel": "Natural Gas", "cop": 0.85},
"Room heaters, wood logs": {"fuel": "Wood Logs", "cop": 1},
"Boiler and radiators, coal": {"fuel": "Coal", "cop": 0.85},
}

View file

@ -366,7 +366,7 @@ def extract_property_request_data(
property_non_invasive_recommendations["recommendations"] = str(transformed)
property_valution = next((
float(x["value"]) for x in valuation_data if
float(x["valuation"]) for x in valuation_data if
(str(x["uprn"]) == str(uprn))
), None)
@ -611,6 +611,7 @@ async def trigger_plan(body: PlanTriggerRequest):
property_instance=property_instance,
all_predictions=all_predictions,
recommendations=recommendations,
representative_recommendations=representative_recommendations
)
)

394
etl/access_reporting/app.py Normal file
View file

@ -0,0 +1,394 @@
import os
from msal import ConfidentialClientApplication
from datetime import datetime, timedelta
import requests
from functools import wraps
import time
import logging
from io import BytesIO
import pandas as pd
# Configure logging
logger = logging.getLogger(__name__)
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
def _retry_after_seconds(response, default=5):
    """
    Work out how many seconds to wait before retrying a request.

    The Retry-After header may hold either a number of seconds or an HTTP-date. Both forms are
    handled here; anything missing or unparseable falls back to ``default``.

    :param response: The HTTP response object (must expose a ``headers`` mapping).
    :param default: Seconds to wait when the header is absent or cannot be parsed.
    :return: A non-negative number of seconds.
    """
    from datetime import datetime, timezone
    from email.utils import parsedate_to_datetime

    raw_value = response.headers.get('Retry-After')
    if raw_value is None:
        return default

    # Most common form: delay expressed in whole seconds
    try:
        return max(0, int(raw_value))
    except (TypeError, ValueError):
        pass

    # Alternative form: an HTTP-date telling us when we may retry
    try:
        retry_at = parsedate_to_datetime(str(raw_value))
    except (TypeError, ValueError):
        return default
    if retry_at.tzinfo is None:
        retry_at = retry_at.replace(tzinfo=timezone.utc)
    return max(0, int((retry_at - datetime.now(timezone.utc)).total_seconds()))


def handle_error(response):
    """
    Handle errors based on HTTP status codes and log detailed information.

    For throttling (429) and server errors (500, 503) this function sleeps for the period requested
    by the server (5 seconds when not specified) and returns ``'retry'`` so the caller can re-issue
    the request. Every other status code results in a ``ValueError``.

    :param response: The failed HTTP response (exposes ``status_code``, ``headers`` and ``json()``).
    :return: The string ``'retry'`` when the request should be attempted again.
    :raises ValueError: When the status code is not one that is retried.
    """
    try:
        payload = response.json()
    except ValueError:
        payload = {}
    if not isinstance(payload, dict):
        payload = {}

    error_json = payload.get('error', {})
    if not isinstance(error_json, dict):
        # Some error bodies carry the error as a bare string rather than a nested object;
        # normalise it so the lookups below cannot fail with an AttributeError.
        error_json = {
            'code': str(error_json),
            'message': payload.get('error_description', 'No detailed error message provided.'),
        }

    error_code = error_json.get('code', 'unknownError')
    error_message = error_json.get('message', 'No detailed error message provided.')
    inner_error = error_json.get('innererror', {})
    details = error_json.get('details', [])

    logger.error(f"Error Code: {error_code}")
    logger.error(f"Error Message: {error_message}")
    if inner_error:
        logger.error(f"Inner Error: {inner_error}")
    if details:
        logger.error(f"Error Details: {details}")

    status_code = response.status_code

    # Transient failures: wait for as long as the server asks, then tell the caller to retry
    if status_code in (429, 500, 503):
        retry_after = _retry_after_seconds(response)
        if status_code == 429:
            logger.warning(f"Too Many Requests. Retrying after {retry_after} seconds...")
        else:
            logger.error(f"Server error. Retrying after {retry_after} seconds...")
        time.sleep(retry_after)
        return 'retry'

    if status_code == 401:
        logger.error("Unauthorized. Token might be invalid.")
    elif status_code == 403:
        logger.error("Forbidden. Access denied to the requested resource.")
    elif status_code == 404:
        logger.error("Not Found. The requested resource doesnt exist.")

    # Everything that is not retried is a hard failure
    raise ValueError(f"API request failed with status code {status_code} - {error_message}")
def api_call_decorator(func):
    """
    Handles various aspects of the API call, including refreshing the access token if needed and handling pagination.

    The decorated method must return a tuple of ``(http_method, url, data)``. When the method has a truthy
    ``page_size`` argument (passed positionally, by keyword, or taken from its default value), every page is
    followed via ``@odata.nextLink`` and the combined items are returned as ``{'value': [...]}``. Otherwise the
    JSON body of the single response is returned as-is.

    :param func: The function to be decorated.
    :return: The wrapped function.
    """
    import inspect

    signature = inspect.signature(func)
    # Upper bound on consecutive failed attempts for one URL, so a persistently failing endpoint
    # cannot trap the caller in an endless retry loop.
    max_consecutive_failures = 5

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            # Check and refresh the access token if needed
            if self.is_access_token_expired():
                self.retrieve_access_token()
                logger.info("Access token refreshed.")

            # Get the HTTP method, URL, and optionally data from the function
            http_method, url, data = func(self, *args, **kwargs)

            # Resolve page_size the same way Python would when calling func, so that default and
            # positional values are honoured and not only explicit keyword arguments.
            bound_arguments = signature.bind(self, *args, **kwargs)
            bound_arguments.apply_defaults()
            page_size = bound_arguments.arguments.get('page_size')

            results = []
            response_data = {}
            consecutive_failures = 0

            while url:
                response = requests.request(http_method, url, headers=self.headers, json=data)

                if response.status_code != 200:
                    # handle_error sleeps and returns 'retry' for transient failures, raises otherwise
                    handle_error(response)
                    consecutive_failures += 1
                    if consecutive_failures >= max_consecutive_failures:
                        raise ValueError(
                            f"API request failed with status code {response.status_code} "
                            f"after {consecutive_failures} attempts"
                        )
                    continue

                consecutive_failures = 0
                response_json = response.json()
                if not page_size:
                    # Capture the full response for consistency
                    response_data = response_json
                    break

                results.extend(response_json.get('value', []))
                url = response_json.get('@odata.nextLink', None)

            if page_size:
                response_data = {'value': results}
            return response_data
        except Exception:
            logger.exception("An error occurred during the API call.")
            raise

    return wrapper
class SharePointClient:
    """
    Small Microsoft Graph client used to list and download files from a SharePoint site's drive.

    Authentication uses the MSAL client-credentials flow. The token, together with its expiry
    details, can be exported and handed to a new instance so that it does not need to re-authenticate.
    """
    access_token = None
    access_token_request_timestamp = None
    access_token_expiry = None
    headers = None
    # NOTE(review): the trailing "Z" suggests UTC, but the timestamps are produced by the naive
    # datetime.now() (local time). Consistent within one machine; confirm before sharing across hosts.
    TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"

    def __init__(self, tenant_id, client_id, client_secret, site_id, access_token=None,
                 access_token_expiration_details=None):
        """
        Initializes the SharePointClient with necessary credentials and site information.

        :param tenant_id: The tenant ID.
        :param client_id: The client ID.
        :param client_secret: The client secret.
        :param site_id: The site ID.
        :param access_token: The access token (optional). Expected to be the token dictionary, i.e. it
            must contain an 'access_token' key.
        :param access_token_expiration_details: The access token expiration details (optional), in the
            format produced by get_token_expiration_details. Required when access_token is given.
        :raises ValueError: If an access token is supplied without its expiration details.
        """
        self.tenant_id = tenant_id
        self.client_id = client_id
        self.client_secret = client_secret

        if access_token:
            if not access_token_expiration_details:
                raise ValueError("Access token expiration details must be provided.")
            self.access_token = access_token
            self.set_access_token_expiration_details(access_token_expiration_details)
            self.headers = {
                'Authorization': f"Bearer {self.access_token['access_token']}"
            }
        else:
            self.retrieve_access_token()

        # Retrieve static identifiers
        self.site_id = site_id
        self.document_drive = self.get_documents_drive()

    def get_token_expiration_details(self):
        """
        Returns the access token expiration details. Converts the datetime objects to strings for serialization.

        :return: Dictionary with 'access_token_request_timestamp' and 'access_token_expiry' as strings.
        """
        return {
            'access_token_request_timestamp': datetime.strftime(
                self.access_token_request_timestamp, self.TIMESTAMP_FORMAT
            ),
            'access_token_expiry': datetime.strftime(self.access_token_expiry, self.TIMESTAMP_FORMAT)
        }

    def set_access_token_expiration_details(self, access_token_expiration_details):
        """
        Sets the access token expiration details from a serialized dictionary.

        :param access_token_expiration_details: The serialized access token expiration details.
        :return: None
        """
        self.access_token_request_timestamp = datetime.strptime(
            access_token_expiration_details['access_token_request_timestamp'], self.TIMESTAMP_FORMAT
        )
        self.access_token_expiry = datetime.strptime(
            access_token_expiration_details['access_token_expiry'], self.TIMESTAMP_FORMAT
        )

    def is_access_token_expired(self):
        """
        Checks if the access token has expired. The caller is responsible for retrieving a new one.

        :return: True if expired (or if no expiry has been recorded yet), False otherwise.
        """
        if self.access_token_expiry is None:
            # No token has been retrieved yet, so there is nothing valid to use
            return True
        return datetime.now() >= self.access_token_expiry

    def retrieve_access_token(self, refresh=False):
        """
        Implements authentication using MSAL.

        :param refresh: If True, force a refresh of the access token.
        :return: None
        :raises ValueError: If no access token could be acquired.
        """
        app = ConfidentialClientApplication(
            self.client_id,
            authority=f"https://login.microsoftonline.com/{self.tenant_id}",
            client_credential=self.client_secret
        )
        scope = ["https://graph.microsoft.com/.default"]
        access_token_request_timestamp = datetime.now()

        if refresh:
            logger.info("Forcing refresh of access token.")
            token = app.acquire_token_for_client(scopes=scope)
        else:
            # Check if a token is already cached
            token = app.acquire_token_silent(scope, account=None)
            if not token:
                token = app.acquire_token_for_client(scopes=scope)

        if not token or "access_token" not in token:
            # Surface whatever detail the identity provider returned to make failures diagnosable
            failure = token or {}
            logger.error(
                f"Authentication failed: {failure.get('error', 'unknown_error')} - "
                f"{failure.get('error_description', 'No error description provided.')}"
            )
            raise ValueError("Authentication failed")

        # Treat the token as expired 20 seconds early so that it is never used right at its limit
        access_token_expiry = access_token_request_timestamp + timedelta(
            seconds=token['expires_in'] - 20
        )
        self.access_token = token
        self.access_token_request_timestamp = access_token_request_timestamp
        self.access_token_expiry = access_token_expiry
        self.headers = {
            'Authorization': f"Bearer {self.access_token['access_token']}"
        }
        logger.info("Access token retrieved successfully.")

    @api_call_decorator
    def get_documents_drive(self):
        """
        Get the document drive of the SharePoint site.

        :return: Tuple containing HTTP method, URL, and None for data.
        """
        url = f"https://graph.microsoft.com/v1.0/sites/{self.site_id}/drive"
        logger.info(f"Getting document drive from URL: {url}")
        return 'GET', url, None

    @api_call_decorator
    def list_folder_contents(self, drive_id, folder_path: str, page_size: int = 100):
        """
        This function will list the contents of a folder in SharePoint.

        :param drive_id: The ID of the drive.
        :param folder_path: The path of the folder.
        :param page_size: The number of items per page (default is 100).
        :return: Tuple containing HTTP method, URL, and None for data.
        """
        url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root:/{folder_path}:/children?$top={page_size}"
        logger.info(f"Listing folder contents from URL: {url}")
        return 'GET', url, None

    @staticmethod
    def download_sharepoint_file(download_url):
        """
        Downloads a file from the given URL and returns its content.

        :param download_url: The URL to download the file from.
        :return: An in-memory BytesIO buffer holding the file content, positioned at the start.
        :raises requests.HTTPError: If the download request was not successful.
        """
        file_content = BytesIO()
        # The context manager guarantees the streamed connection is released even if reading fails
        with requests.get(download_url, stream=True) as response:
            response.raise_for_status()  # Check if the request was successful
            # Read the file content into memory
            for chunk in response.iter_content(chunk_size=8192):
                file_content.write(chunk)
        file_content.seek(0)  # Reset the file pointer to the beginning
        return file_content
def _summarise_survey_outcomes(df):
    """
    Summarise survey outcomes per surveyor per week, and per week per funding combination.

    :param df: DataFrame with the columns "Week Commencing", "DEA/REA", "Funding Type" and "Outcome".
    :return: Tuple of (surveyor_outcomes, weekly_access) DataFrames, each including an "Access Rate"
        percentage; weekly_access additionally carries a "Total" column.
    """
    outcomes = df.copy()
    outcomes["Outcome"] = outcomes["Outcome"].str.strip().str.lower()

    # We cannot group by funding type accurately because any job that is not funded will have a NaN value
    # and therefore we have a 100% access rate for funded jobs and 0% otherwise
    rows = []
    for (week, surveyor), group in outcomes.groupby(["Week Commencing", "DEA/REA"]):
        funding_types = [str(x) for x in group["Funding Type"].unique() if not pd.isnull(x)]
        funding_type = " + ".join(funding_types) if funding_types else "No Funding"

        is_surveyed = group["Outcome"] == "surveyed"
        is_no_answer = group["Outcome"] == "no answer"
        rows.append(
            {
                "Surveyor": surveyor,
                "Week": week,
                "Funding": funding_type,
                "Surveyed": int(is_surveyed.sum()),
                "No Answer": int(is_no_answer.sum()),
                "Other Issue": int((~(is_surveyed | is_no_answer)).sum()),
            }
        )

    # Explicit columns keep the frame usable even when the sheet has no rows
    surveyor_outcomes = pd.DataFrame(
        rows, columns=["Surveyor", "Week", "Funding", "Surveyed", "No Answer", "Other Issue"]
    )
    surveyor_outcomes["Week"] = pd.to_datetime(surveyor_outcomes["Week"])

    weekly_access = (
        surveyor_outcomes.drop(columns=["Surveyor"]).groupby(["Week", "Funding"]).sum().reset_index()
    )

    # Sort by week and surveyor ascending
    surveyor_outcomes = surveyor_outcomes.sort_values(["Week", "Surveyor"], ascending=[True, True])
    surveyor_outcomes["Access Rate"] = 100 * surveyor_outcomes["Surveyed"] / (
        surveyor_outcomes["Surveyed"] + surveyor_outcomes["No Answer"] + surveyor_outcomes["Other Issue"]
    )

    weekly_access["Total"] = (
        weekly_access["Surveyed"] + weekly_access["No Answer"] + weekly_access["Other Issue"]
    )
    weekly_access["Access Rate"] = 100 * weekly_access["Surveyed"] / weekly_access["Total"]

    return surveyor_outcomes, weekly_access


def app():
    """
    Pull each customer's survey outcome workbook from SharePoint and summarise access rates.

    Only locations that point at a single .xlsx workbook are processed; customers without a location,
    or whose location is a folder, are skipped.

    :return: List with one dictionary per processed customer, holding the keys "customer",
        "surveyor_outcomes" and "weekly_access".
    :raises ValueError: If a required environment variable is missing or a workbook cannot be found.
    """
    # Customers for WC 18/11/2024
    #
    # ----- Eastlight location -----
    # No data this week, low on data
    # Housing Associations/Eastlight/Survey Outcomes/
    #
    # ----- Settle location -----
    # No data this week, in separate files
    # Housing Associations/Settle/Survey Outcomes/
    #
    # ----- Community Housing -----
    # In separate files - will we get to a singular form?
    # Housing Associations/Community Housing/Survey Outcomes/
    #
    # ----- ACIS location -----
    # Doesn't have this week's data
    # Housing Asociation/ACIS/Survey Outcomes/ACIS Group - 25.11.2024 - USE THIS.xlsx
    #
    # ----- Southern location -----
    #
    #
    # ------ Unitas location ------
    # Does have this week's data
    # Unitas location: Housing Associations/Unitas/Survey Outcomes/Unitas.xlsx
    locations = {
        "Unitas": "Housing Associations/Unitas/Survey Outcomes/Unitas.xlsx",
        "Eastlight": "Housing Associations/Eastlight/Survey Outcomes/",
        "Settle": "Housing Associations/Settle/Survey Outcomes/",
        "Community Housing": "Housing Associations/Community Housing/Survey Outcomes/",
        # NOTE(review): "Housing Asociation" differs from the "Housing Associations" root used by the
        # other customers - confirm this matches the real SharePoint folder name.
        "ACIS": "Housing Asociation/ACIS/Survey Outcomes/ACIS Group - 25.11.2024 - USE THIS.xlsx",
        "Southern": None,
    }

    settings = {
        name: os.getenv(name, None)
        for name in (
            "SHAREPOINT_CLIENT_ID",
            "SHAREPOINT_CLIENT_SECRET",
            "SHAREPOINT_TENANT_ID",
            "WARMFRONT_SHAREPOINT_SITE_ID",
        )
    }
    missing = [name for name, value in settings.items() if not value]
    if missing:
        raise ValueError(f"Missing required environment variables: {', '.join(missing)}")

    sharepoint_client = SharePointClient(
        tenant_id=settings["SHAREPOINT_TENANT_ID"],
        client_id=settings["SHAREPOINT_CLIENT_ID"],
        client_secret=settings["SHAREPOINT_CLIENT_SECRET"],
        site_id=settings["WARMFRONT_SHAREPOINT_SITE_ID"]
    )

    results = []
    for customer, location in locations.items():
        if location is None:
            continue
        if not location.endswith(".xlsx"):
            # Folder locations (data split across several files) are not processed
            continue

        # List the contents of the folder that holds the workbook
        location_folder = os.path.dirname(location)
        workbook_name = os.path.basename(location)
        contents = sharepoint_client.list_folder_contents(
            drive_id=sharepoint_client.document_drive["id"],
            folder_path=location_folder
        )
        download_url = next(
            (file['@microsoft.graph.downloadUrl'] for file in contents["value"]
             if '@microsoft.graph.downloadUrl' in file and file['name'] == workbook_name),
            None
        )
        if download_url is None:
            raise ValueError(f"File not found in the SharePoint folder: {location}")

        file_content = sharepoint_client.download_sharepoint_file(download_url)
        # Convert to pandas dataframe since file is an excel file
        df = pd.read_excel(file_content)

        surveyor_outcomes, weekly_access = _summarise_survey_outcomes(df)
        results.append(
            {
                "customer": customer,
                "surveyor_outcomes": surveyor_outcomes,
                "weekly_access": weekly_access,
            }
        )

    return results

View file

@ -0,0 +1,11 @@
python-docx==0.8.11
PyPDF2==3.0.1
boto3
requests
pandas
pyarrow==12.0.1
openpyxl==3.1.2
usaddress==0.5.10
pdfplumber==0.10.3
msgpack==1.0.5
msal

View file

@ -0,0 +1,64 @@
import re
import pandas as pd
from PyPDF2 import PdfReader
# Paths to the uploaded files
file_paths = [
"/Users/khalimconn-kowlessar/Downloads/Managed Properties List (dragged).pdf",
"/Users/khalimconn-kowlessar/Downloads/Managed Properties List (dragged) 2.pdf",
"/Users/khalimconn-kowlessar/Downloads/Managed Properties List (dragged) 3.pdf",
"/Users/khalimconn-kowlessar/Downloads/Managed Properties List (dragged) 4.pdf",
"/Users/khalimconn-kowlessar/Downloads/Managed Properties List (dragged) 5.pdf",
"/Users/khalimconn-kowlessar/Downloads/Managed Properties List (dragged) 6.pdf"
]
# Function to extract text from PDFs
def extract_text_from_pdf_with_pypdf2(file_path):
    """
    Read a PDF with PyPDF2 and return the text of all of its pages as one string.

    :param file_path: Path of the PDF file to read.
    :return: The extracted text of every page, concatenated in page order with no separator.
    """
    pdf_pages = PdfReader(file_path).pages
    return "".join(pdf_page.extract_text() for pdf_page in pdf_pages)
# Collects the (code, address, management type) rows found across every PDF
all_parsed_data = []

# Each PDF is handled on its own so that a per-file row count can be reported
for file_number, pdf_path in enumerate(file_paths, start=1):
    raw_text = extract_text_from_pdf_with_pypdf2(pdf_path)

    # Step 1: strip the report title first, then the repeated column header
    without_title = re.sub(r"Managed Property Report as at \d+ \w+ \d+", "", raw_text)
    cleaned_text = re.sub(r"Code Property Address Management Type", "", without_title)

    # Step 2: every candidate row is a run of text that ends with "Managed"
    row_matches = (
        re.match(r"(\S+)\s+(.+?)\s+Managed", candidate.strip())
        for candidate in re.findall(r".*?Managed", cleaned_text)
    )

    # Step 3: keep only the candidates that split cleanly into a code and an address
    parsed_data = [
        (found.group(1).strip(), found.group(2).strip(), "Managed")
        for found in row_matches
        if found
    ]

    all_parsed_data.extend(parsed_data)

    # Per-file feedback, useful when a PDF yields fewer rows than expected
    print(f"File {file_number} processed: {len(parsed_data)} rows")

# Step 4: one DataFrame covering every file
final_df = pd.DataFrame(all_parsed_data, columns=["Code", "Property Address", "Management Type"])

# Step 5: write the combined list out as an Excel workbook
final_output_file_path = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Unified_Managed_Properties_List.xlsx"
final_df.to_excel(final_output_file_path, index=False)

# Final summary
print(f"All files processed and combined. Total rows: {len(final_df)}")
print(f"Unified file saved to: {final_output_file_path}")

View file

@ -0,0 +1,15 @@
import pandas as pd
# The workbook is read, enriched with two columns, and written back to the same location
asset_list_path = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Cottons/Cottons Asset List.xlsx"
df = pd.read_excel(asset_list_path)

# Split each address on commas: the first section is address1, the last section is the postcode
split_addresses = [address.split(",") for address in df["Property Address"]]
df["address1"] = [sections[0].strip() for sections in split_addresses]
df["postcode"] = [sections[-1].strip() for sections in split_addresses]

# Overwrite the original workbook with the enriched data
df.to_excel(asset_list_path, index=False)

View file

@ -0,0 +1,124 @@
import os
import time
from tqdm import tqdm
import pandas as pd
from dotenv import load_dotenv
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
from backend.SearchEpc import SearchEpc
from utils.s3 import save_csv_to_s3
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
USER_ID = 8
PORTFOLIO_ID = 121
def app():
    """
    Prepares the inputs to produce the remote assessments for Cottons.

    Reads the asset list, keeps the EPC D/E properties, looks up the EPC and Find My EPC data for each
    of them, uploads the asset list, non-invasive recommendations and valuations to S3, and finally
    prints the request body that references the uploaded files.

    :return: None
    """
    bucket = "retrofit-plan-inputs-dev"
    s3_prefix = f"{USER_ID}/{PORTFOLIO_ID}"

    # Read in the asset list
    cottons_asset_list = pd.read_excel(
        "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Cottons/Cottons Asset List EPC Data Pull with "
        "valuations.xlsx"
    )

    # A number are missing EPCs due to the space in the postcode
    # Breakdowns:
    # C 119
    # D 106
    # E 26
    # B 5
    #
    # Take the EPC D/E properties
    is_d_or_e = cottons_asset_list["EPC rating on register"].isin(["D", "E"])
    asset_list = cottons_asset_list[is_d_or_e].reset_index(drop=True)
    asset_list["row_id"] = asset_list.index
    asset_list["uprn"] = asset_list["uprn"].astype(int)

    extracted_data = []
    model_asset_list = []
    for _, home in tqdm(asset_list.iterrows(), total=len(asset_list)):
        uprn = home["uprn"]

        # Retrieve the EPC data
        epc_searcher = SearchEpc(
            address1=home["address1"],
            postcode=home["postcode"],
            uprn=uprn,
            auth_token=EPC_AUTH_TOKEN,
            os_api_key="",
        )
        epc_searcher.find_property(skip_os=True)
        newest_epc = epc_searcher.newest_epc

        # Look the property up on Find My EPC using the address as recorded on the newest EPC
        find_epc_data = RetrieveFindMyEpc(
            address=newest_epc["address1"],
            postcode=newest_epc["postcode"],
        ).retrieve_newest_find_my_epc_data()
        time.sleep(0.5)

        # We need uprn; keys coming back from Find My EPC take precedence over it
        extracted_record = {"uprn": uprn}
        extracted_record.update(find_epc_data)
        extracted_data.append(extracted_record)

        model_asset_list.append(
            {
                "uprn": uprn,
                "address": newest_epc["address1"],
                "postcode": newest_epc["postcode"],
            }
        )

    non_invasive_recommendations = [
        {"uprn": record["uprn"], "recommendations": record["recommendations"]}
        for record in extracted_data
    ]

    # Only properties that actually have a valuation are passed on
    valuations_data = (
        asset_list[["uprn", "Zoopla Valuation"]].copy().rename(columns={"Zoopla Valuation": "valuation"})
    )
    valuations_data = valuations_data[valuations_data["valuation"].notna()]

    filename = f"{s3_prefix}/asset_list.csv"
    non_invasive_recommendations_filename = f"{s3_prefix}/non_invasive_recommendations.csv"
    valuations_filename = f"{s3_prefix}/valuations.csv"

    # Store the asset list, the non-invasive recommendations and the valuations data in s3
    uploads = [
        (filename, pd.DataFrame(model_asset_list)),
        (non_invasive_recommendations_filename, pd.DataFrame(non_invasive_recommendations)),
        (valuations_filename, valuations_data),
    ]
    for s3_file_name, frame in uploads:
        save_csv_to_s3(
            dataframe=frame,
            bucket_name=bucket,
            file_name=s3_file_name
        )

    body = {
        "portfolio_id": str(PORTFOLIO_ID),
        "housing_type": "Social",
        "goal": "Increasing EPC",
        "goal_value": "C",
        "trigger_file_path": filename,
        "already_installed_file_path": "",
        "patches_file_path": "",
        "non_invasive_recommendations_file_path": non_invasive_recommendations_filename,
        "valuation_file_path": valuations_filename,
        "scenario_name": "Wave 3 Packages",
        "multi_plan": True,
        "budget": None,
        "exclusions": ['air_source_heat_pump', 'boiler_upgrade', 'floor_insulation']
    }
    print(body)

View file

@ -0,0 +1,77 @@
import inspect
import pandas as pd
from pathlib import Path
from tqdm import tqdm
from etl.epc.settings import EARLIEST_EPC_DATE
from etl.spatial.OpenUprnClient import OpenUprnClient
src_file_path = inspect.getfile(lambda: None)

# Every sub-directory of the bulk EPC download holds one certificates.csv file
EPC_DIRECTORY = Path("/Users/khalimconn-kowlessar/Downloads/all-domestic-certificates")
epc_directories = list(filter(Path.is_dir, EPC_DIRECTORY.iterdir()))

aggregation = []
for directory in tqdm(epc_directories):
    data = pd.read_csv(directory / "certificates.csv", low_memory=False)

    # Rename the columns to the same format as the api returns
    data = data.rename(columns=lambda column: column.replace("_", "-").lower())

    # Only London properties are of interest
    in_london = data["posttown"].str.contains("London", case=False, na=False)
    data = data[in_london]
    if data.empty:
        continue

    # Keep certificates lodged on or after the date threshold that also carry a uprn
    lodged_recently = data["lodgement-date"] >= EARLIEST_EPC_DATE
    has_uprn = data["uprn"].notna()
    data = data[lodged_recently & has_uprn]
    data = data.assign(uprn=data["uprn"].astype(int))

    # Take just the newest EPC per uprn, based on lodgement-date
    data = data.sort_values("lodgement-date", ascending=False).drop_duplicates("uprn")

    # Take EPC D and below
    data = data[data["current-energy-rating"].isin(["D", "E", "F", "G"])]
    data = data.assign(postal_region=data["postcode"].str.split(" ").str[0])

    # Take homes whose main fuel is not mains gas
    uses_mains_gas = data["main-fuel"].str.contains("mains gas", case=False, na=False)
    off_gas = data[~uses_mains_gas]
    if off_gas.empty:
        continue

    # Remove properties with conservation area issues: first fetch the spatial flags per uprn
    uprns = off_gas["uprn"].unique()
    ca_data = OpenUprnClient.get_spatial_data(uprns, "retrofit-data-dev")
    restriction_columns = ["conservation_status", "is_listed_building", "is_heritage_building"]
    spatial_flags = ca_data[["UPRN"] + restriction_columns].rename(columns={"UPRN": "uprn"})
    off_gas = off_gas.merge(spatial_flags, how="left", on="uprn")

    # A unit is kept only when none of the restriction flags is True (missing flags count as unrestricted)
    unrestricted = off_gas[restriction_columns].ne(True).all(axis=1)
    in_target_tenure = off_gas["tenure"].isin(
        ["rental (private)", "Rented (private)", "owner-occupied", "Owner-occupied"]
    )
    off_gas = off_gas[unrestricted & in_target_tenure]

    aggregation.append(off_gas.groupby("postal_region").size().reset_index(name="count"))

# Re-aggregate across directories, largest regions first, with presentation-friendly headers
postal_region_aggregation = (
    pd.concat(aggregation)
    .groupby("postal_region")["count"]
    .sum()
    .reset_index()
    .sort_values("count", ascending=False)
    .rename(columns={"postal_region": "Postcode Region", "count": "Number of Homes"})
)

postal_region_aggregation.to_excel(
    "/Users/khalimconn-kowlessar/Documents/hestia/Customers/GLA/Off Gas EPC D-G Postal Regions - without conservation "
    "area.xlsx",
    index=False
)

View file

@ -305,7 +305,7 @@ def caha():
# Get conservation area data
uprns = [x["uprn"] for x in extracted_data if x["uprn"] not in ["", None]]
conservation_area_data = OpenUprnClient.get_spatial_data([100022526362], "retrofit-data-dev")
conservation_area_data = OpenUprnClient.get_spatial_data([36284], "retrofit-data-dev")
addresses = pd.DataFrame(asset_list)
addresses["uprn"] = addresses["uprn"].astype(str)

View file

@ -6,6 +6,8 @@ import numpy as np
from tqdm import tqdm
from collections import Counter
from scipy.optimize import linprog
from SearchEpc import SearchEpc
from utils.s3 import read_pickle_from_s3
CUSTOMER_FOLDER_PATH = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater"
@ -2591,5 +2593,316 @@ def propsed_wave_3_sample():
os.path.join(CUSTOMER_FOLDER_PATH, "Individual units - programme V2.csv"), index=False
)
survey_results = pd.read_excel(
os.path.join(CUSTOMER_FOLDER_PATH, "Stonewater - Bid Packages WIP 14.11.19 V2.xlsx"),
header=13,
sheet_name="Modelled Packages"
)
indivual_units = pd.read_csv(
os.path.join(CUSTOMER_FOLDER_PATH, "Individual units - programme V2.csv")
)
u_aids = survey_results["Archetype ID"].astype(str).unique()
units_in_bid = indivual_units[indivual_units['Unit in Programme']]["Archetype ID"].astype(str).values
len({v for v in units_in_bid if str(v) in u_aids})
len(list(set(units_in_bid)))
def identify_incorrect_packages():
"""
Due to limitations in the data collected during survey, we have some properties that do not have suitable packages
assigned. This function will identify those properties, which can be flagged for Stonewater's review
"""
units_with_assigned_packages = pd.read_excel(
os.path.join(CUSTOMER_FOLDER_PATH, "Stonewater - Bid Packages WIP 14.11.20 V2.xlsx"),
header=2,
sheet_name="Individual Units Programme"
)
# This sheet contains information on the heating systems for properties, so we can flag any units that have
# been labelled as being electric but are actually gas
heating_survey_data = pd.read_excel(
os.path.join(CUSTOMER_FOLDER_PATH, "STOCKBOOK December 2024 data (5).xlsx"),
header=0,
sheet_name="Export"
)
units_with_assigned_packages = units_with_assigned_packages.merge(
heating_survey_data[["Asset Reference", "Heating Type"]], how="left",
left_on="Org. ref.", right_on="Asset Reference"
)
# Check the different heating types
units_with_assigned_packages["Gas properties: different to Parity"] = (
(
units_with_assigned_packages["Heating Type"].isin(["Gas", "Communal Gas"])
) & (
units_with_assigned_packages["Heating"].isin(
[
"Heat Pump: Electric Heat "
"pumps: Air source heat pump "
"with flow temperature <= 35°C",
"Electric Storage Systems: Fan "
"storage heaters",
"Electric (direct acting) room "
"heaters: Panel, convector or "
"radiant heaters"
]
)
)
)
units_with_assigned_packages["Electric properties: different to Parity"] = (
(units_with_assigned_packages["Heating Type"] == "Electric") & (
units_with_assigned_packages["Heating"].isin(
[
"Boiler: A rated Regular Boiler",
"Boiler: F rated Combi",
"No Heating",
"Boiler: A rated CPSU",
"Boiler: G rated Regular Boiler"
]
)
)
)
units_with_assigned_packages["Ground Source properties: different to Parity"] = (
(units_with_assigned_packages["Heating Type"] == "Ground Source") & (
units_with_assigned_packages["Heating"].isin(
[
"Heat Pump: Electric Heat pumps: Air source heat pump with flow temperature <= 35°C",
"Electric Storage Systems: Fan storage heaters",
"Electric Storage Systems: High heat retention storage heaters"
]
)
)
)
units_with_assigned_packages["LPG properties: different to Parity"] = (
(units_with_assigned_packages["Heating Type"] == "Lpg") & (
units_with_assigned_packages["Main Fuel"].isin(
[
"Gas: Mains Gas", "Solid Fuel: Wood Logs, Gas: Mains Gas"
]
)
)
)
units_with_assigned_packages["Solid Fuel properties: different to Parity"] = (
(units_with_assigned_packages["Heating Type"] == "Solid Fuel") & (
units_with_assigned_packages["Main Fuel"].isin(
[
"Gas: Mains Gas"
]
)
)
)
# The next check is to identify properties with specific features that are not conducive to specific packages. E.g.
# Solar PV packages for properties that have another dwelling above
# Label properties that have been matched to a package, during coordination, that includes Solar PV and has
# a property with a dwelling above
units_with_assigned_packages["Invalid Roof Type for Solar - coordination to be reviewed"] = (
(units_with_assigned_packages["Package Ref"].isin(["3A", "3B", "4", 4])) & (
units_with_assigned_packages["Survey: Main Roof Type"].str.contains("A Another dwelling above")
)
)
# Label properties that have a dwelling above in the Parity data, and weren't surveyed, but have been assigned
# a package that includes solar PV
units_with_assigned_packages["Invalid Roof Type for Solar - coordination to be reviewed"] = (
(units_with_assigned_packages["Package Ref"].isin(["3A", "3B", "4", 4])) & (
units_with_assigned_packages["Survey: Main Roof Type"].str.contains("A Another dwelling above")
)
)
# We now iterate through postcodes and find anomalous properties based on the Parity data and survey data
fields_to_check = [
'Wall Type Category',
# 'Roof Type Category', - not very interesting
'Heating',
'Main Fuel',
'Survey: Main Wall Type',
# 'Survey: Main Roof Type',
'Survey: Primary Heating System'
]
units_with_assigned_packages['Wall Type Category'] = units_with_assigned_packages['Wall Type'].str.replace(
r'\s*\(.*?\)', '', regex=True
)
# Create roof type category by splitting in colon and taking the first part
units_with_assigned_packages['Roof Type Category'] = units_with_assigned_packages['Roof Type'].str.split(':').str[0]
units_with_assigned_packages["Street, Region and Postcode"] = (
units_with_assigned_packages["Street and Region"] + ", " + units_with_assigned_packages["Postcode"]
)
def check_mixed_types(row):
# Count distinct primary types with non-zero values
primary_types_present = set()
for col in field_counts.columns:
if ':' in col:
primary_type = col.split(':')[0]
if row[col] > 0: # Non-zero count means this type is present
primary_types_present.add(primary_type)
return len(primary_types_present) > 1 # True if more than one primary type
aggregated_results = {}
for field in fields_to_check:
# Group by postcode and count occurrences of each unique value
field_counts = (
units_with_assigned_packages.groupby(['Street, Region and Postcode', field])
.size()
.unstack(fill_value=0)
.reset_index()
)
# Calculate dominant value and percentage before modifying the DataFrame
dominant_value = field_counts.iloc[:, 1:].idxmax(axis=1)
dominant_percentage = (
(field_counts.iloc[:, 1:].max(axis=1) / field_counts.iloc[:, 1:].sum(axis=1)) * 100
)
number_of_properties = field_counts.iloc[:, 1:].sum(axis=1)
# Add these as new columns after computation
field_counts['Dominant Value'] = dominant_value
field_counts['% Dominant'] = dominant_percentage
field_counts['Number of Properties'] = number_of_properties
field_counts['Mixed Type'] = field_counts.apply(check_mixed_types, axis=1)
# Store the result in the dictionary
aggregated_results[field] = field_counts
# Let's fetch the EPC data
# Read in the existing EPC data we stored
import json
from utils.s3 import read_from_s3, read_pickle_from_s3
def read_epc_data():
    """
    Load the stored Stonewater EPC records from S3 and return them as a single DataFrame.

    Two stored batches are combined: a JSON file and a pickle file, both read from the
    "retrofit-data-dev" bucket. In the JSON batch the record with internal_id 1091 has its
    uprn replaced with 83143766 before the batches are concatenated.

    :return: DataFrame holding the rows of the first batch followed by the rows of the second
    """
    bucket = "retrofit-data-dev"

    raw_first_batch = read_from_s3(
        bucket_name=bucket,
        s3_file_name="customers/Stonewater/clustering/epc_data.json"
    )
    first_batch = pd.DataFrame(json.loads(raw_first_batch))
    # Manual correction of a single record's UPRN
    is_record_1091 = first_batch["internal_id"] == 1091
    first_batch["uprn"] = np.where(is_record_1091, 83143766, first_batch["uprn"])

    second_batch = pd.DataFrame(
        read_pickle_from_s3(
            s3_file_name="customers/Stonewater/clustering/epc_data_batch_2.pkl",
            bucket_name=bucket
        )
    )
    return pd.concat([first_batch, second_batch])
epc_data = read_epc_data()
# Get just the fields we want from the EPC: Uprn, Wall, Roof, Heating, Fuel, SAP Score, EPC Band, Date of EPC
epc_data_to_append = epc_data[
    [
        "uprn", "walls-description", "roof-description", "mainheat-description", "main-fuel",
        "current-energy-efficiency", "current-energy-rating", "lodgement-date",
        "estimated"
    ]
].rename(
    columns={
        "uprn": "UPRN",
        "walls-description": "EPC: Wall Type",
        "roof-description": "EPC: Roof Type",
        "mainheat-description": "EPC: Heating",
        # The selected column is "main-fuel"; the key must match it or the rename is silently skipped
        "main-fuel": "EPC: Main Fuel",
        "current-energy-efficiency": "EPC: SAP Score",
        "current-energy-rating": "EPC: EPC Band",
        "lodgement-date": "EPC: Date of EPC",
        "estimated": "EPC Estimated based on Nearby Properties"
    }
)
# Entries where the SAP score is not an integer, kept for inspection. Nothing is indexed out of
# this table here, so the script keeps running when every score is numeric.
non_integer_sap = epc_data_to_append[~epc_data_to_append["EPC: SAP Score"].astype(str).str.isnumeric()]
epc_data_to_append["EPC: Date of EPC"] = pd.to_datetime(epc_data_to_append["EPC: Date of EPC"])
# Years since the EPC was lodged
epc_data_to_append["Years since EPC"] = (pd.Timestamp.now() - epc_data_to_append["EPC: Date of EPC"]).dt.days / 365
# Drop records with a blank UPRN; copy so the cast below writes to its own table rather than a slice
epc_data_to_append = epc_data_to_append[epc_data_to_append["UPRN"] != ""].copy()
epc_data_to_append["UPRN"] = epc_data_to_append["UPRN"].astype(int)
# Left merge: every unit is kept, EPC fields are filled in where the UPRN matches
units_with_assigned_packages = units_with_assigned_packages.merge(
    epc_data_to_append, how="left", on="UPRN",
)
# Read in the wave 2.1 data (column headers are taken from the fourth row of the sheet, header=3)
wave_2_data = pd.read_excel(
    os.path.join(
        CUSTOMER_FOLDER_PATH, "Stonewater 2.1 SAP Pre & Post.xlsx"
    ),
    header=3
)
# Remove any where the work is outstanding: keep rows whose retrofit assessment is "Completed"
# and which have a client-approved package
wave_2_data = wave_2_data[wave_2_data["Retrofit Assessment"] == "Completed"]
wave_2_data = wave_2_data[~pd.isnull(wave_2_data["Package Approved (Client)"])]
# House number parsed from the property name (no postcode is passed to the parser here)
wave_2_data["house_number"] = wave_2_data["Name"].apply(lambda x: SearchEpc.get_house_number(x, ""))
# Filter postcodes in the units_with_assigned_packages, to find overlapping postcodes.
# Units already in either of the two listed tier-1 confidence tiers are excluded.
related_to_wave_2 = units_with_assigned_packages[
    units_with_assigned_packages["Postcode"].isin(
        wave_2_data["Post Code"].values
    ) & (
        ~units_with_assigned_packages["Confidence Tier"].isin(
            [
                "1 - same archetype, same postal region", "1 - property was surveyed"
            ]
        )
    )
]
# One record per related unit, describing the wave 2.1 property it was matched to.
# NOTE(review): wave2_matches is not referenced again in the visible remainder of this script
# (it is neither merged into units_with_assigned_packages nor written out) - confirm whether
# that is intended.
wave2_matches = []
for _, home in related_to_wave_2.iterrows():
    # Get the related homes: wave 2.1 properties sharing this unit's postcode
    assigned_wave_2_packages = wave_2_data[
        wave_2_data["Post Code"] == home["Postcode"]
    ]
    if assigned_wave_2_packages.shape[0] != 1:
        # In this case, we get the closest match based on door number: keep the row(s) whose
        # house number has the smallest absolute difference from this unit's house number
        # NOTE(review): int(hn) and astype(int) assume get_house_number always returns something
        # convertible to an integer - verify against SearchEpc.get_house_number
        hn = SearchEpc.get_house_number(home["Name"], home["Postcode"])
        assigned_wave_2_packages = assigned_wave_2_packages[
            abs(assigned_wave_2_packages["house_number"].astype(int) - int(hn)) == min(
                abs(assigned_wave_2_packages["house_number"].astype(int) - int(hn)))
        ]
    # Only the first remaining row is used (.values[0]), so ties on distance resolve to the first row
    wave2_matches.append(
        {
            "UPRN": home["UPRN"],
            "2.1 matched address": assigned_wave_2_packages["Name"].values[0],
            "2.1 matched address: Package Ref": assigned_wave_2_packages["Package Approved (Client)"].values[0],
            "2.1 matched address: Wall Insulation": assigned_wave_2_packages["Wall Insulation"].values[0],
            "2.1 matched address: Loft Insulation": assigned_wave_2_packages["Loft Insulation"].values[0],
            "2.1 matched address: Ventilation": assigned_wave_2_packages["Ventilation"].values[0],
            # NOTE(review): "Windwos Upgrade" presumably mirrors the spelling of the spreadsheet
            # header - confirm before correcting it
            "2.1 matched address: Windows": assigned_wave_2_packages["Windwos Upgrade"].values[0]
        }
    )
# Write one CSV per checked field, named after the field
for field, df in aggregated_results.items():
    aggregated_csv_path = os.path.join(CUSTOMER_FOLDER_PATH, f"{field} - aggregated results.csv")
    df.to_csv(aggregated_csv_path, index=False)
# Write the unit-level table
units_csv_path = os.path.join(CUSTOMER_FOLDER_PATH, "Units with assigned packages - with flags.csv")
units_with_assigned_packages.to_csv(units_csv_path, index=False)
# if __name__ == "__main__":
# main()

View file

@ -375,3 +375,41 @@ def app():
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater Properties Needing CWI - WIP.csv",
index=False
)
def cross_reference_epc_programme():
    """
    Work-in-progress cross-reference of the ECO3 "not completed" address list against the
    reviewed Parity master sheet.

    Both spreadsheets are read from hard-coded local paths and given a parsed "house_number"
    column. Each ECO3 address is then compared with every master-sheet address.

    NOTE(review): as written the function returns nothing and eco3_fallout_matched_to_above_c
    is never appended to, so no result leaves the function.
    """
    eco3_fallout = pd.read_excel(
        "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/STONEWATER LIST OF ADDRESSES TO BE "
        "SURVEYED - ECO3 NOT COMPLETED.xlsx"
    )
    # House number parsed from the address alone (no postcode is passed to the parser)
    eco3_fallout["house_number"] = eco3_fallout.apply(
        lambda x: SearchEpc.get_house_number(x["ADDRESS"], ""), axis=1
    )
    stonewater_modelled_above_c = pd.read_csv(
        "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Osmosis Reviewed - Parity Download 18.7 - "
        "master sheet.csv",
        encoding='latin1'
    )
    # House number parsed from the address together with its postcode
    stonewater_modelled_above_c["house_number"] = stonewater_modelled_above_c.apply(
        lambda x: SearchEpc.get_house_number(x["Address"], x["Postcode"]), axis=1
    )
    eco3_fallout_matched_to_above_c = []
    for _, property in eco3_fallout.iterrows():
        # Match on house number
        # NOTE(review): this result is replaced by the fuzzy match below before it is used
        match = stonewater_modelled_above_c[
            stonewater_modelled_above_c["house_number"] == property["house_number"]
        ]
        # Fuzzy match on the full address: keep master-sheet rows whose fuzz.ratio score against
        # this ECO3 address is above 90
        # NOTE(review): the import statement is executed on every iteration of the loop
        from fuzzywuzzy import fuzz
        match = stonewater_modelled_above_c[
            stonewater_modelled_above_c["Address"].apply(lambda x: fuzz.ratio(x, property["ADDRESS"]) > 90)
        ]
        # Result of head() is discarded
        match.head()

View file

@ -0,0 +1,77 @@
"""
This is the list of properties, based on the EPC data, that look eligible for WHLG
"""
import pandas as pd
from etl.epc.settings import EARLIEST_EPC_DATE
from etl.spatial.OpenUprnClient import OpenUprnClient
# Domestic EPC certificates for Waltham Forest
epc_data = pd.read_csv(
    "/Users/khalimconn-kowlessar/Downloads/all-domestic-certificates/domestic-E09000031-Waltham-Forest/certificates.csv"
)
# Normalise the headers: underscores become hyphens and everything is lower-cased
epc_data = epc_data.rename(columns=lambda c: c.replace("_", "-").lower())

# Keep certificates lodged on/after the earliest accepted date, that carry a UPRN, are rated
# D to G, and are privately rented or owner-occupied
rows_in_scope = (
    (epc_data["lodgement-date"] >= EARLIEST_EPC_DATE)
    & ~pd.isnull(epc_data["uprn"])
    & epc_data["current-energy-rating"].isin(["D", "E", "F", "G"])
    & epc_data["tenure"].isin(
        ["rental (private)", "Rented (private)", "owner-occupied", "Owner-occupied"]
    )
)
epc_data = epc_data[rows_in_scope]
epc_data["uprn"] = epc_data["uprn"].astype(int)

# Eligible postcodes: only the postcode and local authority columns are kept
whlg_eligible_postcodes = pd.read_excel(
    "/Users/khalimconn-kowlessar/Downloads/WHLG-eligible-postcodes.xlsx",
    sheet_name="Eligible postcodes",
    header=1
)
whlg_eligible_postcodes = whlg_eligible_postcodes[['Postcode', 'Local Authority']]

# Spatial data for every UPRN that remains
uprns = epc_data["uprn"].unique()
ca_data = OpenUprnClient.get_spatial_data(uprns, "retrofit-data-dev")
spatial_flags = ca_data[
    ["UPRN", "conservation_status", "is_listed_building", "is_heritage_building"]
].rename(columns={"UPRN": "uprn"})
epc_data = epc_data.merge(spatial_flags, how="left", on="uprn")
# A property is restricted when any of the three flags equals True
epc_data["has_conservation_restrictions"] = (
    epc_data[["conservation_status", "is_listed_building", "is_heritage_building"]]
    .eq(True)
    .any(axis=1)
)

# Pathway 1:
# Match based on eligible postcodes
in_eligible_postcode = epc_data["postcode"].isin(whlg_eligible_postcodes["Postcode"].values)
pathway1 = epc_data.loc[
    in_eligible_postcode,
    [
        "uprn", "address", "address1", "postcode", "current-energy-rating", "current-energy-efficiency",
        "lodgement-date",
        "has_conservation_restrictions", "walls-description", "roof-description", "mainheat-description"
    ]
].rename(
    columns={
        "current-energy-rating": "EPC Rating", "current-energy-efficiency": "SAP Score",
        "lodgement-date": "EPC Date", "has_conservation_restrictions": "Conservation Area Restrictions",
        "walls-description": "Wall Type", "roof-description": "Roof Type", "mainheat-description": "Main Heating"
    }
)
# Parse the lodgement date once; it feeds both the formatted date and the lodgement year
epc_lodged_on = pd.to_datetime(pathway1["EPC Date"])
pathway1["EPC Date"] = epc_lodged_on.dt.strftime("%Y-%m-%d")
pathway1["EPC Year"] = epc_lodged_on.dt.year
pathway1.to_csv(
    "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Waltham Forest WHLG - Pathway 1 Eligibility.csv",
    index=False
)

# Pathway 2 or 3
# The household will need to be means tested
pathway2 = epc_data[~epc_data["uprn"].isin(pathway1["uprn"].values)]

View file

@ -282,7 +282,8 @@ class RetrieveFindMyEpc:
"Low energy lighting for all fixed outlets": ["low_energy_lighting"],
"Cylinder thermostat recommendation": [],
"Heating controls recommendation": [],
"Replace boiler with Band A condensing boiler": [],
"Replace boiler with Band A condensing boiler": ["boiler_upgrade"],
"Band A condensing gas boiler": ["boiler_upgrade"],
"Solar panel recommendation": [],
"Double glazing recommendation": [],
"Solid wall insulation recommendation": [],
@ -295,6 +296,19 @@ class RetrieveFindMyEpc:
"Change room heaters to condensing boiler": ["boiler_upgrade"],
"Cylinder thermostat": ["cylinder_thermostat"],
"Heat recovery system for mixer showers": ["heat_recovery_shower"],
"Room-in-roof insulation": ["room_in_roof_insulation"],
"Fan assisted storage heaters": [],
"Fan-assisted storage heaters": [],
"Step 1:": [],
"Biomass stove with boiler": [],
"Replace boiler with biomass boiler": [],
"Heating controls (room thermostat and thermostatic radiator valves)": [
"roomstat_programmer_trvs", "time_temperature_zone_control"
],
"Heating controls (programmer, and thermostatic radiator valves)": [
"roomstat_programmer_trvs", "time_temperature_zone_control"
],
"Replacement warm air unit": []
}
survey = True

326
etl/lodgement/app.py Normal file
View file

@ -0,0 +1,326 @@
import os
import pandas as pd
import utils.file_data_extraction as file_extraction_tools
from utils.fullSapParser import FullSapParser
from utils.OsmosisCondtionReportParser import OsmosisConditionReportParser
# Columns of one lodgement spreadsheet row, in output order. Every value starts as None and is
# filled in from whichever documents are found for the property.
output_template = dict.fromkeys(
    (
        "Property Address",
        "Osm. ID",
        "Postcode",
        "City/County",
        "District/Town",
        "Funding Stream",
        # "Risk Path",
        "Local Authority",
        "Trustmark Lodgement ID",
        "Certificate Number",
        "EWI UMR",
        "Loft UMR",
        "Windows UMR",
        "Doors UMR",
        "Measure Lodgement Date",
        "Full Lodgement Date",
        "Owner - Name",
        "Owner - Phone",
        "Owner - Email",
        "Tenant - Name",
        "Tenant - Phone",
        "R. Assessor - Name",
        "R. Coordinator - Name",
        "Trustmark Licence Number",
        "Retrofit Assessment Date",
        "Company Name",
        "Retrofit Designer Name",
        "Property Type",
        "Property Detachment",
        "No. of Bedrooms",
        "Property age",
        "SAP Rating Pre (from IMA)",
        "Pre Heat Transfer",
        "Pre Total Floor Area",
        "Pre Heat Demand",
        "Pre Air Tightness",
        "SAP Rating Post (from EPC)",
        "Post Heat Transfer",
        "Post Total Floor Area",
        "Post Heat Demand",
        "Post Air Tightness",
        "Number of Eligible Measures Installed",
        "Total Cost of Works",
        "Annual Fuel Saving (MTP)",
    )
)
def update_dictionary_with_check(dictionary, updates):
    """
    Update a dictionary in place, refusing any key it does not already contain.

    All keys are validated before anything is written, so a failed call leaves the
    dictionary exactly as it was.

    Args:
        dictionary (dict): The dictionary to update.
        updates (dict): The key-value pairs to apply.

    Raises:
        KeyError: If a key in updates does not exist in the dictionary.
    """
    unknown_keys = [key for key in updates if key not in dictionary]
    if unknown_keys:
        # Report the first offending key, in the order the updates were given
        raise KeyError(f"Key '{unknown_keys[0]}' does not exist in the dictionary.")
    dictionary.update(updates)
def _list_entries(path, predicate):
    """Return the names inside path whose full path satisfies predicate (os.path.isdir / os.path.isfile)."""
    return [name for name in os.listdir(path) if predicate(os.path.join(path, name))]


def _find_subfolder(parent, names, marker, ignore_case=False):
    """Return the path of the first name containing marker, raising FileNotFoundError if none does."""
    for name in names:
        candidate = name.lower() if ignore_case else name
        if marker in candidate:
            return os.path.join(parent, name)
    raise FileNotFoundError(f"No folder containing '{marker}' found in {parent}")


def _extract_reports(folder, extractors, include_xml=False, only_types=None):
    """
    Run the matching extractor over each report file that sits directly inside folder.

    :param folder: folder whose files are inspected (sub-folders are not descended into)
    :param extractors: mapping of report type to extractor class; a None entry means "skip this type"
    :param include_xml: also inspect XML files (PDFs are always inspected)
    :param only_types: when given, PDFs of any other type - including undetected ones - are skipped
    :return: dictionary of report type to the extractor's output
    :raises ValueError: if a report type cannot be detected (and only_types is not restricting PDFs)
    """
    extracted = {}
    for filename in _list_entries(folder, os.path.isfile):
        filepath = os.path.join(folder, filename)
        if file_extraction_tools.is_pdf(filepath):
            report_type = file_extraction_tools.detect_pdf_report_type(pdf_path=filepath)
            if only_types is not None and report_type not in only_types:
                continue
            if report_type is None:
                raise ValueError(f"Unknown report type for {filename}")
            # A detected PDF type that has no entry at all in extractors raises KeyError
            file_extractor = extractors[report_type]
        elif include_xml and file_extraction_tools.is_xml(filepath):
            report_type = file_extraction_tools.detect_xml_report_type(xml_path=filepath)
            if report_type is None:
                raise ValueError(f"Unknown report type for {filename}")
            file_extractor = extractors.get(report_type)
        else:
            continue
        if file_extractor is None:
            continue
        extracted[report_type] = file_extractor(filepath).extract()
    return extracted


def _total_floor_area(report):
    """Sum the floor area of every building part of an Elmhurst report plus its conservatory."""
    return sum(
        [part["Floor Area (m2)"] for part in report["Building Parts"]] +
        # Get the conservatory floor area
        [report["Conservatory"]["Conservatory Floor Area"]]
    )


def _build_output_row(property_folder, funding_stream, extracted_contents):
    """
    Turn the reports extracted for one property into a row of the lodgement spreadsheet.

    Reports are applied in a fixed order; where two reports supply the same column the later one wins.

    :param property_folder: folder name in the form "(<Osm. ID>) <property address>"
    :param funding_stream: value for the "Funding Stream" column
    :param extracted_contents: dictionary of report type to extracted data
    :return: a copy of output_template with every available column filled in
    :raises ValueError: if the folder name contains no ")" separating the ID from the address
    """
    osm_id, separator, address = property_folder.partition(")")
    if not separator:
        raise ValueError(f"Folder name '{property_folder}' is not in the form '(<Osm. ID>) <address>'")

    output_row_data = output_template.copy()
    update_dictionary_with_check(
        output_row_data,
        {
            "Funding Stream": funding_stream,
            "Property Address": address.strip(),
            "Osm. ID": osm_id.strip().lstrip("(").strip(),
        }
    )

    epr = extracted_contents.get("elmhurst epr")
    if epr:
        total_floor_area = _total_floor_area(epr)
        # NOTE(review): "Pre Heat Transfer" is filled from the primary energy use intensity -
        # confirm this is the intended source field
        energy_intensity = epr["Primary Energy Use Intensity (kWh/m2/yr)"]
        update_dictionary_with_check(
            output_row_data,
            {
                "Postcode": epr["Postcode"],
                "City/County": epr["County"],
                "District/Town": epr["Town"],
                'SAP Rating Pre (from IMA)': epr["Current SAP Rating"],
                'Pre Heat Transfer': energy_intensity,
                'Pre Total Floor Area': total_floor_area,
                'Pre Heat Demand': energy_intensity * total_floor_area,
                "R. Assessor - Name": epr["Assessor Name"],
                "Retrofit Assessment Date": epr["Assessment Date"],
            }
        )

    full_sap = extracted_contents.get("full sap xml")
    if full_sap:
        update_dictionary_with_check(
            output_row_data,
            {
                "Property Type": full_sap["Property Type"],
                "Property Detachment": full_sap["Built Form"],
                "Property age": full_sap["Age Band"],
            }
        )

    condition_report = extracted_contents.get("osmosis condition report")
    if condition_report:
        update_dictionary_with_check(
            output_row_data,
            {
                "No. of Bedrooms": condition_report["No. of Bedrooms"],
                # "Risk Path": condition_report["Risk Assessment Pathway"],
            }
        )

    summary = extracted_contents.get("elmhurst summary report")
    if summary:
        # The summary report carries no heat demand, so "Pre Heat Demand" is left untouched here;
        # writing None would erase a value already derived from the EPR
        update_dictionary_with_check(
            output_row_data,
            {
                "Postcode": summary["Postcode"],
                "City/County": summary["County"],
                "District/Town": summary["Town"],
                'SAP Rating Pre (from IMA)': summary["Current SAP Rating"],
                'Pre Heat Transfer': summary["Primary Energy Use Intensity (kWh/m2/yr)"],
                'Pre Total Floor Area': _total_floor_area(summary),
                "R. Assessor - Name": summary["Assessor Name"],
                "Retrofit Assessment Date": summary["Assessment Date"],
            }
        )

    air_test = extracted_contents.get("pulse air permeability")
    if air_test:
        # We extract the AP50 number
        ap50 = [
            x["Extrapolated @ 50PA"] for x in air_test["Results Table"] if x["Metric"] == "Air Permeability"
        ][0]
        update_dictionary_with_check(output_row_data, {"Pre Air Tightness": ap50})

    handover = extracted_contents.get("elmhurst project handover")
    if handover:
        update_dictionary_with_check(
            output_row_data,
            {
                "Number of Eligible Measures Installed": len(handover["Measures Fitted"]),
                "Retrofit Designer Name": handover["Designer Name"],
                "Company Name": handover["Installer Name"],
                "R. Coordinator - Name": handover["Retrofit Coordinator Name"],
            }
        )

    pas_report = extracted_contents.get("core logic pas assessment report")
    if pas_report:
        update_dictionary_with_check(
            output_row_data,
            {"No. of Bedrooms": pas_report["Number of bedrooms"]}
        )

    return output_row_data


def handler():
    """
    This is a simple application that will extract the data from documents that have been uploaded to Sharepoint
    to populate the lodgement spreadsheet with.

    Every property folder under the source path is scanned in three places - the coordinator
    folder, the air tightness tests folder and the lodgement "required documents" folder - and
    the extracted values are written, one row per property, to a CSV in the source path.

    :return: None
    :raises FileNotFoundError: if a property folder lacks one of the expected sub-folders
    """
    # The source data will eventually come from Sharepoint
    source_data_path = "/Users/khalimconn-kowlessar/Documents/hestia/Lodgment Pilot"
    funding_stream = "HUG2"
    # Not yet written to the output; kept as the intended source of the template file and the
    # owner contact columns
    output_template_file = "Trustmark Details - Template REV.25.11.24.xlsx"
    customer_name = "Shropshire Council"
    customer_phone = "0345 678 9000"
    customer_email = "affordablewarmth@shropshire.gov.uk"
    # TODO: In order for this to go live, we need to use Poppler, which needs to be installed
    #  w/ brew install poppler
    #  We also need to install Tesseract: brew install tesseract
    extractors = {
        "elmhurst epr": file_extraction_tools.ElmhurstEprExtractor,
        "elmhurst summary report": file_extraction_tools.ElmhurstSummaryReportExtractor,
        "osmosis condition report": OsmosisConditionReportParser,
        "elmhurst evidence report": None,
        "full sap xml": FullSapParser,
        "pulse air permeability": file_extraction_tools.PulseAirPermeabilityExtractor,
        "elmhurst project handover": file_extraction_tools.ElmhurstProjectHandoverExtractor,
        "core logic pas assessment report": file_extraction_tools.CoreLogicPasAssessmentReportExtractor,
    }
    # Columns not populated yet: Local Authority, Trustmark Lodgement ID, Certificate Number,
    # the UMR and lodgement date columns, owner/tenant contact details, Trustmark Licence Number,
    # all "Post ..." columns, Total Cost of Works and Annual Fuel Saving (MTP)
    extracted = []
    for property_folder in _list_entries(source_data_path, os.path.isdir):
        property_folder_path = os.path.join(source_data_path, property_folder)
        subfolders = _list_entries(property_folder_path, os.path.isdir)

        # Later folders overwrite earlier ones when they yield the same report type
        extracted_contents = {}
        coord_folder = _find_subfolder(property_folder_path, subfolders, "RA Coordinator Info")
        extracted_contents.update(_extract_reports(coord_folder, extractors, include_xml=True))

        att_folder = _find_subfolder(property_folder_path, subfolders, "Air Tightness Tests")
        extracted_contents.update(_extract_reports(att_folder, extractors))

        # Within the lodgement folder, we want the required documents sub-folder
        lodgement_folder = _find_subfolder(property_folder_path, subfolders, "TrustMark Lodgement")
        required_documents_folder = _find_subfolder(
            lodgement_folder,
            _list_entries(lodgement_folder, os.path.isdir),
            "required documents",
            ignore_case=True
        )
        # There are only a few file types we actually want to process in here for the moment
        extracted_contents.update(
            _extract_reports(required_documents_folder, extractors, only_types={"elmhurst project handover"})
        )

        extracted.append(_build_output_row(property_folder, funding_stream, extracted_contents))

    extracted_df = pd.DataFrame(extracted)
    extracted_df.to_csv(os.path.join(source_data_path, "poc-extrcted-data.csv"), index=False)

View file

@ -0,0 +1,14 @@
PyPDF2
pandas
tqdm
openpyxl
boto3
usaddress==0.5.11
fuzzywuzzy==0.18.0
python-dotenv
python-docx
pymupdf
pytesseract
pdf2image
pillow
pdfplumber

View file

@ -21,31 +21,65 @@ load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
def get_data(asset_list, fulladdress_column, address1_column, postcode_column):
def get_data(asset_list, fulladdress_column, address1_column, postcode_column, manual_uprn_map):
epc_data = []
errors = []
no_epc = []
# home = asset_list[asset_list["row_id"] == errors[5]].squeeze()
for _, home in tqdm(asset_list.iterrows(), total=len(asset_list)):
try:
postcode = home[postcode_column]
house_number = home[address1_column]
full_address = home[fulladdress_column]
house_no = SearchEpc.get_house_number(address=str(house_number), postcode=postcode)
if house_no is None:
house_no = house_number
uprn = manual_uprn_map.get(full_address, None)
searcher = SearchEpc(
address1=str(house_number),
address1=str(house_no),
postcode=postcode,
auth_token=EPC_AUTH_TOKEN,
os_api_key="",
property_type=None,
fast=True,
full_address=full_address,
max_retries=5
max_retries=5,
uprn=uprn
)
# Force the skipping of estimating the EPC
searcher.ordnance_survey_client.property_type = None
searcher.ordnance_survey_client.built_form = None
searcher.find_property(skip_os=True)
# Check if we have a flat or apartment
if searcher.newest_epc is None and uprn is None:
# Try again:
if SearchEpc.get_house_number(address=str(house_number), postcode=postcode) is None:
# Backup
add1 = full_address.split(",")[1].strip()
else:
add1 = str(house_number)
searcher = SearchEpc(
address1=add1,
postcode=postcode,
auth_token=EPC_AUTH_TOKEN,
os_api_key="",
property_type=None,
fast=True,
full_address=full_address,
max_retries=5
)
if (
"flat" in house_number.lower() or "apartment" in house_number.lower() or "apt" in
house_number.lower()
):
searcher.ordnance_survey_client.property_type = "Flat"
searcher.find_property(skip_os=True)
if searcher.newest_epc is None:
no_epc.append(home["row_id"])
continue
@ -63,7 +97,7 @@ def get_data(asset_list, fulladdress_column, address1_column, postcode_column):
)
find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
except ValueError as e:
if "No EPC found" in str(e):
if "No EPC found" in str(e) and "address1" in searcher.newest_epc:
find_epc_searcher = RetrieveFindMyEpc(
address=searcher.newest_epc["address1"], postcode=searcher.newest_epc["postcode"]
)
@ -120,17 +154,20 @@ def app():
Property UPRN
"""
DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Bromford/"
DATA_FILENAME = "Bromford programme review.xlsx"
SHEET_NAME = "Bromford"
DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Watford"
DATA_FILENAME = "JS Mailing List 10122024.xlsx"
SHEET_NAME = "Export"
POSTCODE_COLUMN = "Postcode"
FULLADDRESS_COLUMN = None
ADDRESS1_COLUMN = "No."
ADDRESS1_METHOD = "first_two_words"
ADDRESS_COLS_TO_CONCAT = ["No.", "Address"]
FULLADDRESS_COLUMN = "Property Address"
ADDRESS1_COLUMN = "Address Line 1"
ADDRESS1_METHOD = None
ADDRESS_COLS_TO_CONCAT = []
# Maps addresses to uprn in problematic cases
MANUAL_UPRN_MAP = {}
asset_list = pd.read_excel(os.path.join(DATA_FOLDER, DATA_FILENAME), header=0, sheet_name=SHEET_NAME)
asset_list = asset_list[~pd.isnull(asset_list["Postcode"])]
asset_list = asset_list[~pd.isnull(asset_list[POSTCODE_COLUMN])].reset_index()
asset_list["row_id"] = asset_list.index
# We clean up potential non-breaking spaces, and double spaces
@ -156,12 +193,14 @@ def app():
# Drop the dupes
print(f"There are {asset_list['deduper'].duplicated().sum()} duplicated addresses - dropping")
asset_list = asset_list[~asset_list["deduper"].duplicated()]
asset_list = asset_list.drop(columns=["deduper"])
epc_data, errors, no_epc = get_data(
asset_list=asset_list,
fulladdress_column=FULLADDRESS_COLUMN,
address1_column=ADDRESS1_COLUMN,
postcode_column=POSTCODE_COLUMN
postcode_column=POSTCODE_COLUMN,
manual_uprn_map=MANUAL_UPRN_MAP
)
# We now retrieve any failed properties
@ -170,7 +209,8 @@ def app():
asset_list=asset_list_failed,
fulladdress_column=FULLADDRESS_COLUMN,
address1_column=ADDRESS1_COLUMN,
postcode_column=POSTCODE_COLUMN
postcode_column=POSTCODE_COLUMN,
manual_uprn_map=MANUAL_UPRN_MAP
)
# Append the failed data to the main data
@ -202,7 +242,8 @@ def app():
transformed_df = pd.DataFrame(transformed_data)
# Drop the column that is ""
transformed_df = transformed_df.drop(columns=[""])
if "" in transformed_df.columns:
transformed_df = transformed_df.drop(columns=[""])
# Get the find my epc data
find_my_epc_data = epc_df[["row_id", "find_my_epc_data"]].drop(columns=["find_my_epc_data"]).join(
@ -217,6 +258,9 @@ def app():
[
"row_id",
"uprn",
"address1",
"address",
"postcode",
"property-type",
"built-form",
"inspection-date",
@ -224,6 +268,7 @@ def app():
"current-energy-efficiency",
"roof-description",
"walls-description",
"floor-description",
"transaction-type",
# New fields needed
"secondheat-description",
@ -236,7 +281,7 @@ def app():
"energy-consumption-current", # kwh/m2
"photo-supply",
]
]
].rename(columns={"address1": "Address1 on EPC", "address": "Address on EPC", "postcode": "Postcode on EPC"})
asset_list = asset_list.merge(
epc_df,
@ -276,6 +321,7 @@ def app():
"number-habitable-rooms": "Number of Habitable Rooms",
"walls-description": "Wall Construction",
"roof-description": "Roof Construction",
"floor-description": "Floor Construction",
"mainheat-description": "Heating Type",
"secondheat-description": "Secondary Heating",
"transaction-type": "Reason for last EPC",
@ -329,5 +375,9 @@ def app():
asset_list = asset_list.drop(columns=["row_id"])
# Store as an excel
filename = os.path.join(DATA_FOLDER, ".".join(DATA_FILENAME.split(".")[:-1])) + " EPC Data Pull.xlsx"
filename = os.path.join(DATA_FOLDER, ".".join(DATA_FILENAME.split(".")[:-1])) + " EPC Data Pull - Main.xlsx"
asset_list.to_excel(filename, index=False)
matches_review = asset_list[
[FULLADDRESS_COLUMN, ADDRESS1_COLUMN, POSTCODE_COLUMN, "Address on EPC", "Postcode on EPC"]
]

View file

@ -26,6 +26,9 @@ class DraughtProofingRecommendations:
if not draught_proofing_recommendation_config:
return
# Cost is based on a £50 cost per window, based on Checkatrade
cost = draught_proofing_recommendation_config.get("cost", self.property.number_of_windows * 50)
description = (
"Draught proof doors and windows to improve energy efficiency" if
not draught_proofing_recommendation_config.get("description")
@ -48,7 +51,7 @@ class DraughtProofingRecommendations:
"kwh_savings": 0,
"co2_equivalent_savings": 0,
"energy_cost_savings": 0,
"total": draught_proofing_recommendation_config["cost"],
"total": cost,
# We use a very simple and rough estimate of 4 hours per unit
"labour_hours": draught_proofing_recommendation_config.get("labour_hours", 8),
"labour_days": draught_proofing_recommendation_config.get("labour_days", 1), # Assume 8 hour day

View file

@ -1,5 +1,6 @@
import re
import backend.app.assumptions as assumptions
from etl.customers.immo.pilot.asset_list import non_invasive_recommendations
from recommendations.Costs import Costs, BOILER_UPGRADE_SCHEME_ASHP_VALUE
from recommendations.recommendation_utils import (
check_simulation_difference, override_costs, combine_recommendation_configs
@ -981,6 +982,10 @@ class HeatingRecommender:
self.property.data["hot-water-energy-eff"] in ["Very Poor", "Poor", "Average"]
)
non_invasive_recommendation = next((
r for r in self.property.non_invasive_recommendations if r["type"] == "boiler_upgrade"
), {})
if has_inefficient_space_heating or has_inefficient_water:
boiler_size = self.estimate_boiler_size(
property_type=self.property.data["property-type"],
@ -1079,12 +1084,13 @@ class HeatingRecommender:
"description": description,
"starting_u_value": None,
"new_u_value": None,
"sap_points": None,
"sap_points": non_invasive_recommendation.get("sap_points", None),
"already_installed": already_installed,
"simulation_config": simulation_config,
"description_simulation": description_simulation,
**boiler_costs,
"system_type": "boiler_upgrade",
"survey": non_invasive_recommendation.get("survey", None)
}
# We recommend the heating controls
@ -1111,6 +1117,8 @@ class HeatingRecommender:
if system_change:
# We combine the heating and controls recommendations, in the case of a system change
# If this is true, we set SAP points to None and survey to False for the boiler recommendation
combined_recommendations = []
for controls_recommendation in controls_recommender.recommendation:
combined_recommendation = self.combine_heating_and_controls(

View file

@ -20,6 +20,8 @@ class HotwaterRecommendations:
:return:
"""
# Reset the recommendations
recommendations_phase = phase
self.recommendations = []
non_invasive_recommendations = self.property.non_invasive_recommendations
if non_invasive_recommendations:
@ -28,7 +30,6 @@ class HotwaterRecommendations:
r["type"] in ["hot_water_tank_insulation", "cylinder_thermostat"]
]
recommendations_phase = phase
for m in measures:
non_invasive_rec = [
r for r in non_invasive_recommendations if r["type"] == m
@ -55,7 +56,7 @@ class HotwaterRecommendations:
if self.property.hotwater["clean_description"] == "Gas boiler/circulator, no cylinder thermostat":
# Handle this case specifically:
self.recommend_cylinder_thermostat_gas_boiler_circulator(phase=phase)
self.recommend_cylinder_thermostat_gas_boiler_circulator(phase=recommendations_phase)
return
# If there is no system present, but access to the mains, we
@ -68,14 +69,14 @@ class HotwaterRecommendations:
(self.property.hotwater["no_system_present"] is None) &
(len(has_tank_recommendation) == 0)
):
self.recommend_tank_insulation(phase=phase)
self.recommend_tank_insulation(phase=recommendations_phase)
return
has_cylinder_recommendation = [r for r in self.recommendations if r["type"] == "cylinder_thermostat"]
if ((self.property.hotwater["clean_description"] == "From main system, no cylinder thermostat") &
(len(has_cylinder_recommendation) == 0)):
self.recommend_cylinder_thermostat(phase=phase)
self.recommend_cylinder_thermostat(phase=recommendations_phase)
return
def recommend_tank_insulation(self, phase, sap_points=None, survey=False, _return=False):

View file

@ -311,7 +311,7 @@ class Recommendations:
continue
has_u_value = recommendations_by_type[0].get("new_u_value") is not None
has_sap_points = recommendations_by_type[0].get("sap_points") is not None
has_sap_points = all([r.get("sap_points") is not None for r in recommendations_by_type])
has_rank = recommendations_by_type[0].get("rank") is not None
# When check if these recommendations have two different types, such as solid wall insulation
@ -449,6 +449,7 @@ class Recommendations:
property_instance,
all_predictions,
recommendations,
representative_recommendations,
):
"""
@ -473,6 +474,9 @@ class Recommendations:
property_recommendations = recommendations[property_instance.id].copy()
representative_recs = representative_recommendations[property_instance.id].copy()
representative_ids = [r["recommendation_id"] for r in representative_recs]
increasing_variables = ["sap"]
decreasing_variables = ["carbon", "heat_demand"]
@ -530,7 +534,9 @@ class Recommendations:
else:
previous_phase_values_multiple = [x for x in impact_summary if x["phase"] == (rec["phase"] - 1)]
previous_phase_values_multiple = [
x for x in impact_summary if x["phase"] == (rec["phase"] - 1) and x["representative"]
]
if len(previous_phase_values_multiple) != 1:
# Take an average of each of the previous phases
keys_to_median = ["sap", "carbon", "heat_demand"]
@ -628,7 +634,9 @@ class Recommendations:
impact_summary.append(
{
"phase": rec["phase"],
"representative": rec["recommendation_id"] in representative_ids,
"recommendation_id": rec["recommendation_id"],
"measure_type": rec["measure_type"],
**current_phase_values
}
)

View file

@ -290,6 +290,11 @@ class RoofRecommendations:
insulation_materials = pd.DataFrame(insulation_materials)
non_invasive_recommendations = next(
(r for r in self.property.non_invasive_recommendations if
r["type"] == insulation_materials["type"].values[0]), {}
)
lowest_selected_u_value = None
recommendations = []
for _, insulation_material_group in insulation_materials.groupby("description"):
@ -429,14 +434,15 @@ class RoofRecommendations:
"description": self.make_roof_insulation_description(material),
"starting_u_value": u_value,
"new_u_value": new_u_value,
"sap_points": None,
"sap_points": non_invasive_recommendations.get("sap_points", 0),
"already_installed": already_installed,
"simulation_config": simulation_config,
"description_simulation": {
"roof-description": new_description,
"roof-energy-eff": new_efficiency
},
**cost_result
**cost_result,
"survey": non_invasive_recommendations.get("survey", False)
}
)

View file

@ -385,6 +385,11 @@ class WallRecommendations(Definitions):
if insulation_thickness == "below average":
cavity_width = cavity_width * (1 - PARTIALLY_FILLED_PERCENTAGE_ASSUMPTION)
non_invasive_recommendations = next(
(r for r in self.property.non_invasive_recommendations if
r["type"] == insulation_materials["type"].values[0]), {}
)
# Test the different fill options
lowest_selected_u_value = None
recommendations = []
@ -475,14 +480,15 @@ class WallRecommendations(Definitions):
"description": description,
"starting_u_value": u_value,
"new_u_value": new_u_value,
"sap_points": None,
"sap_points": non_invasive_recommendations.get("sap_points", None),
"already_installed": already_installed,
"simulation_config": simulation_config,
"description_simulation": {
"walls-description": "Cavity wall, filled cavity",
"walls-energy-eff": "Good"
},
**cost_result
**cost_result,
"survey": non_invasive_recommendations.get("survey", False)
}
)

View file

@ -0,0 +1,49 @@
import re
import boto3
import PyPDF2
import fitz
class OsmosisConditionReportParser:
    """
    Extracts selected fields from an Osmosis condition report PDF.

    The text of every page is read on construction and stored in ``pdf_text``;
    :meth:`extract` then pulls individual values out of that text with regular expressions.
    """

    def __init__(self, filekey, bucket_name=None):
        """
        :param filekey: Path of the PDF to read (local path when ``bucket_name`` is not given).
        :param bucket_name: Optional S3 bucket name. Reading from S3 is not implemented yet.
        :raises ValueError: If the file cannot be found or read.
        """
        self.s3_client = boto3.client('s3')
        self.bucket_name = bucket_name
        self.filekey = filekey
        self.pdf_text = None
        self._read_file()

    def _read_file(self):
        """
        Reads the PDF and stores the concatenated text of all its pages in ``self.pdf_text``.

        Raises:
            ValueError: If the file cannot be found or read. The not-yet-implemented S3 branch
                also surfaces as a ValueError because it is raised inside the guarded section.
        """
        try:
            if self.bucket_name:
                # Read from S3
                raise NotImplementedError("Implement me")

            with fitz.open(self.filekey) as pdf:
                # Join once rather than growing a string page by page
                self.pdf_text = "".join(page.get_text() for page in pdf)

        except FileNotFoundError as e:
            raise ValueError(f"Local file not found: {self.filekey}") from e
        except Exception as e:
            raise ValueError(f"An error occurred while reading or parsing the PDF: {e}") from e

    def _search(self, pattern, label):
        """
        Returns the first capture group of ``pattern`` in the PDF text.

        :param pattern: Regular expression with exactly the value of interest in group 1.
        :param label: Human readable field name, used in the error message only.
        :raises ValueError: If the pattern is not present in the PDF text.
        """
        match = re.search(pattern, self.pdf_text or "")
        if match is None:
            raise ValueError(f"Could not find '{label}' in the condition report text")
        return match.group(1)

    def extract(self):
        """
        Extracts the number of bedrooms and the risk assessment pathway from the report text.

        :return: dict with keys "No. of Bedrooms" (int) and "Risk Assessment Pathway" (single capital letter).
        :raises ValueError: If either field cannot be found in the report text.
        """
        return {
            "No. of Bedrooms": int(
                self._search(r"No\. of Bedrooms \(Total\)\s*(\d+)", "No. of Bedrooms")
            ),
            "Risk Assessment Pathway": self._search(
                r"Risk\s*Assessment\s*Pathway\s*([A-Z])", "Risk Assessment Pathway"
            ),
        }

File diff suppressed because it is too large Load diff

306
utils/fullSapParser.py Normal file
View file

@ -0,0 +1,306 @@
import boto3
from xml.dom.minidom import parseString
# Maps the single-letter code found in the PropertyAgeBand tag to a human readable
# construction period. Used by FullSapParser.get_age_band, which indexes it directly,
# so an unlisted code raises KeyError.
PROPERTY_AGE_BAND = {
"A": "before 1900",
"B": "1900-1929",
"C": "1930-1949",
"D": "1950-1966",
"E": "1967-1975",
"F": "1976-1982",
"G": "1983-1990",
"H": "1991-1995",
"I": "1996-2002",
"J": "2003-2006",
"K": "2007-2011",
"L": "2012 onwards"
}
# Maps the PositionOfFlat tag value to the suffix appended to the archetype string.
# Only one position is listed so far; FullSapParser.get_archetype indexes it directly.
POSITION_OF_FLAT = {
"TopFloorFlat": "(top floor)"
}
# Maps the MHS value of MainHeatingSystem1 to a readable description.
# FullSapParser.get_heating_1 falls back to the raw value when it is not listed here.
MAINHEATING_LOOKUP = {
"SEB": "Electric (SEB modern slimline storage heaters)"
}
# Normalises window install-date wording to a year description.
# NOTE(review): not referenced by the code visible in this file section - confirm where it is used.
WINDOWS_YEAR_LOOKUP = {
"unknown install date": "unknown year",
"unknown install": "unknown year",
"post or during 2002": "2002 onwards",
}
class FullSapParser:
    """
    Parses a full SAP XML document and exposes selected fields as attributes.

    The document is read (locally or from S3) and parsed with minidom on construction.
    Each ``get_*`` method reads one area of the document and stores the result on the
    instance; :meth:`extract` runs all of them.
    """

    full_address = None
    archetype = None
    age_band = None
    unheated_corridor = None
    property_type = None
    built_form = None

    # ventilation
    mechanical_ventilation = None
    cross_ventilation = None
    night_ventilation = None

    # dimensions
    number_of_storeys = None
    property_dimensions = None

    # fabric
    low_energy_lighting = None

    # Heating
    heating1 = None
    cylinder = None
    cylinder_stat = None

    def __init__(self, filekey, bucket_name=None):
        """
        :param filekey: Local path of the XML file, or the S3 key when ``bucket_name`` is given.
        :param bucket_name: Optional S3 bucket to read the file from.
        :raises ValueError: If the file cannot be found, read, or parsed.
        """
        self.s3_client = boto3.client('s3')
        self.bucket_name = bucket_name
        self.filekey = filekey
        self.full_sap = None
        self._read_file()

    def _read_file(self):
        """
        Reads the XML file either locally or from S3 and parses it using minidom.

        Raises:
            ValueError: If the file cannot be found, read, or parsed.
        """
        try:
            if self.bucket_name:
                # Read from S3
                response = self.s3_client.get_object(Bucket=self.bucket_name, Key=self.filekey)
                xml_content = response['Body'].read()
            else:
                # Read locally as bytes (like the S3 branch) so the XML parser applies the
                # encoding declared in the document instead of the platform default
                with open(self.filekey, "rb") as f:
                    xml_content = f.read()

            # Parse the XML content using minidom
            self.full_sap = parseString(xml_content)

        except FileNotFoundError as e:
            raise ValueError(f"Local file not found: {self.filekey}") from e
        except Exception as e:
            raise ValueError(f"An error occurred while reading or parsing the XML: {e}") from e

    def _require_document(self):
        """Raises ValueError when no parsed document is available."""
        if not self.full_sap:
            raise ValueError("You need to read the file first")

    @staticmethod
    def _node_text(node):
        """Returns the value of the node's first child, or None when the node has no children."""
        child = node.firstChild
        return child.nodeValue if child is not None else None

    def _optional_text(self, tag_name):
        """Returns the text of the first ``tag_name`` element, or None if it is missing or empty."""
        nodes = self.full_sap.getElementsByTagName(tag_name)
        if not nodes:
            return None
        return self._node_text(nodes[0])

    def extract(self, _return=True):
        """
        Runs every ``get_*`` step, populating the instance attributes.

        :param _return: When True, return a summary dict; otherwise return None.
        :return: dict with "Property Type", "Built Form" and "Age Band" when ``_return`` is True.
        """
        self.get_address()
        self.get_archetype()
        self.get_age_band()
        self.get_unheated_corridor()
        self.get_heating_1()
        self.get_ventilation()
        self.get_floor_area()
        self.get_low_energy_lighting()
        self.get_cylinder()

        if _return:
            return {
                "Property Type": self.property_type,
                "Built Form": self.built_form,
                "Age Band": self.age_band,
            }

    def get_address(self):
        """
        Builds ``full_address`` from the children of the single AddressAsDesigned tag.

        Address lines and town are title-cased; missing or empty parts are skipped.

        :raises ValueError: If the document is not loaded or the address tag is not unique.
        """
        self._require_document()

        address = self.full_sap.getElementsByTagName("AddressAsDesigned")
        if len(address) != 1:
            raise ValueError("Non-unique address tag found - investigate me")

        data = {}
        for node in address[0].childNodes:
            if node.nodeType == node.ELEMENT_NODE:
                data[node.nodeName] = self._node_text(node)

        # .get() so that an absent optional line is treated the same as an empty one
        parts = [
            value.title()
            for value in (data.get(key) for key in ("AddressLine1", "AddressLine2", "AddressLine3", "Town"))
            if value is not None
        ]
        # The postcode keeps its original casing
        postcode = data.get("Postcode")
        if postcode is not None:
            parts.append(postcode)

        self.full_address = " ".join(parts)

    def get_archetype(self):
        """
        Sets ``property_type``, ``built_form`` and ``archetype`` from PropertyType1/PropertyType2,
        appending the flat position to the archetype when a PositionOfFlat value is present.

        :raises ValueError: If the document is not loaded or a property type tag is not unique.
        :raises KeyError: If the PositionOfFlat value is not listed in POSITION_OF_FLAT.
        """
        self._require_document()

        property_type1 = self.full_sap.getElementsByTagName('PropertyType1')
        property_type2 = self.full_sap.getElementsByTagName('PropertyType2')
        position_nodes = self.full_sap.getElementsByTagName('PositionOfFlat')

        if len(property_type1) != 1 or len(property_type2) != 1:
            raise ValueError("Non-unique property tag found - investigate me")

        property_type1 = property_type1[0].firstChild.nodeValue
        property_type2 = property_type2[0].firstChild.nodeValue

        # The tag may be absent altogether, or present but empty
        position_of_flat = None
        if position_nodes and position_nodes[0].firstChild:
            position_of_flat = POSITION_OF_FLAT[position_nodes[0].firstChild.nodeValue]

        self.property_type = property_type1
        self.built_form = property_type2

        self.archetype = property_type1 + " - " + property_type2
        if position_of_flat:
            self.archetype = self.archetype + " " + position_of_flat

    def get_age_band(self):
        """
        Sets ``age_band`` by translating the single PropertyAgeBand code via PROPERTY_AGE_BAND.

        :raises ValueError: If the document is not loaded or the age band tag is not unique.
        """
        self._require_document()

        property_age_band = self.full_sap.getElementsByTagName('PropertyAgeBand')
        if len(property_age_band) != 1:
            raise ValueError("Non-unique property age band tag found - investigate me")

        property_age_band = property_age_band[0].firstChild.nodeValue
        self.age_band = PROPERTY_AGE_BAND[property_age_band]

    def get_wall_area_for_description(self, description):
        """
        Finds the first WallRec whose Description equals ``description`` and has an Area.

        :param description: Exact Description text to look for.
        :return: "Unheated corridor - <area> area" for the first match, otherwise None.
        """
        self._require_document()

        for wall_rec in self.full_sap.getElementsByTagName("WallRec"):
            desc_elements = wall_rec.getElementsByTagName("Description")
            if not desc_elements or self._node_text(desc_elements[0]) != description:
                continue

            area_elements = wall_rec.getElementsByTagName("Area")
            if not area_elements:
                continue

            area_text = self._node_text(area_elements[0])
            if area_text is not None:
                return f"Unheated corridor - {float(area_text)} area"

        return None

    def get_unheated_corridor(self):
        """
        Unheated corridors don't always exist so we'll need to search for it.
        Sets ``unheated_corridor`` to the description string, or None when there is no match.

        :raises ValueError: If the document is not loaded.
        """
        self._require_document()

        self.unheated_corridor = self.get_wall_area_for_description("Flat corridor Main")

    def get_heating_1(self):
        """
        Sets ``heating1`` from the MHS and Fraction children of the single MainHeatingSystem1 tag.

        :raises ValueError: If the document is not loaded or the heating tag is not unique.
        """
        self._require_document()

        main_heating_system = self.full_sap.getElementsByTagName('MainHeatingSystem1')
        if len(main_heating_system) != 1:
            raise ValueError("Non-unique main heating system tag found - investigate me")

        main_heating_system = main_heating_system[0]
        mhs = main_heating_system.getElementsByTagName('MHS')[0].firstChild.nodeValue
        # Fall back to the raw value when we have no readable description for it
        mhs = MAINHEATING_LOOKUP.get(mhs, mhs)

        fraction = main_heating_system.getElementsByTagName('Fraction')[0].firstChild.nodeValue

        self.heating1 = f"{mhs} : {fraction}% of heating"

    def get_ventilation(self):
        """
        Sets the mechanical, cross and night ventilation descriptions.

        A missing or empty tag counts as "not present". The literal strings "true"/"false"
        are converted to booleans for all three tags, so a value of "false" is not
        mistaken for a truthy string; any other text is treated as present.

        :raises ValueError: If the document is not loaded.
        """
        self._require_document()

        bool_lookup = {
            "true": True,
            "false": False
        }

        def read_flag(tag_name):
            raw = self._optional_text(tag_name)
            return bool_lookup.get(raw, raw)

        mech_vent_value = read_flag("MechanicalVentilationDecentralised")
        cross_vent_value = read_flag("CrossVentilation")
        night_vent_value = read_flag("NightVentilation")

        # Create the outputs
        self.mechanical_ventilation = (
            "Mechanical ventilation present" if mech_vent_value else "No mechanical ventilation"
        )
        self.cross_ventilation = "Cross ventilation present" if cross_vent_value else "No cross ventilation"
        self.night_ventilation = "Night ventilation present" if night_vent_value else "No night ventilation"

    def get_floor_area(self):
        """
        Sets ``number_of_storeys`` and ``property_dimensions``.

        ``property_dimensions`` is a list with one dict per StoreyMeasurementRec that has a
        non-zero InternalFloorArea, holding "storey_index" (position of the record in the
        document, starting at 0), "Floor Area", "Perimeter" and "Height".

        :raises ValueError: If the document is not loaded.
        """
        self._require_document()

        self.number_of_storeys = int(self.full_sap.getElementsByTagName('NumberOfStoreys')[0].firstChild.nodeValue)

        storeys = self.full_sap.getElementsByTagName('StoreyMeasurementRec')
        # TODO: The first StoreyMeasurementRec tag looks like this in the examples we've seen:
        #  <StoreyMeasurementRec xsi:nil="true" />
        #  Indicating that the tag is explicitly indicated as empty

        storey_data = []
        storey_index = -1
        for storey in storeys:
            storey_index += 1
            if storey.getAttribute("xsi:nil") == "true":
                continue

            # NOTE(review): the index is incremented before this check, so it can never be -1
            # here and this branch is unreachable. Presumably it was meant to flag a populated
            # first (basement?) record - confirm the intended numbering before changing it.
            if storey_index == -1:
                raise NotImplementedError(
                    "Investigated me - potentially basement found but need to confirm with Basement tag"
                )

            floor_area = storey.getElementsByTagName('InternalFloorArea')
            if not floor_area:
                continue

            floor_area = float(floor_area[0].firstChild.nodeValue)
            # If floor area is 0, skip this storey
            if not floor_area:
                continue

            perimeter = float(storey.getElementsByTagName('InternalPerimeter')[0].firstChild.nodeValue)
            height = float(storey.getElementsByTagName('StoreyHeight')[0].firstChild.nodeValue)

            storey_data.append({
                "storey_index": storey_index,
                "Floor Area": floor_area,
                "Perimeter": perimeter,
                "Height": height
            })

        # We will convert this into a table in the markdown
        self.property_dimensions = storey_data

    def get_low_energy_lighting(self):
        """
        Sets ``low_energy_lighting`` to a sentence built from the LELFittings and LightFittings tags.

        :raises ValueError: If the document is not loaded.
        """
        self._require_document()

        # Extract the values of the LightFittings and LELFittings tags
        light_fittings = self.full_sap.getElementsByTagName('LightFittings')[0].firstChild.data
        lel_fittings = self.full_sap.getElementsByTagName('LELFittings')[0].firstChild.data

        # Construct the string message
        self.low_energy_lighting = f"{lel_fittings} out of {light_fittings} lighting fittings are low energy."

    def get_cylinder(self):
        """
        Sets ``cylinder`` (insulation description) and ``cylinder_stat``.

        Missing or empty InsulationType/InsulationThickness tags result in "Not insulated.";
        a missing or empty CylinderStat tag results in ``cylinder_stat`` being None.

        :raises ValueError: If the document is not loaded.
        """
        self._require_document()

        insulation_type = self._optional_text('InsulationType')
        insulation_thickness = self._optional_text('InsulationThickness')

        if insulation_type and insulation_thickness:
            self.cylinder = f"Insulated, {insulation_type}: {insulation_thickness}mm."
        else:
            self.cylinder = "Not insulated."

        self.cylinder_stat = self._optional_text('CylinderStat')