Model/etl/customers/stonewater/Wave 3 Preparation.py
Khalim Conn-Kowlessar 294506853d adding in new features
2024-11-18 18:24:26 +00:00

2432 lines
105 KiB
Python

import os
import PyPDF2
import re
import pandas as pd
import numpy as np
from tqdm import tqdm
from collections import Counter
from scipy.optimize import linprog
from utils.s3 import read_pickle_from_s3
# Root folder holding the Stonewater customer data.
# NOTE(review): absolute, user-specific path - it will not resolve on another machine.
CUSTOMER_FOLDER_PATH = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater"
# Path template for the numbered survey folders; "{i}" is a placeholder, presumably filled
# in with str.format by the code that walks the folders - confirm against usage.
SURVEY_FOLDERS = os.path.join(CUSTOMER_FOLDER_PATH, "StonewaterSurveys_{i}")
# Number of survey folders; presumably the count of values substituted for "{i}" - confirm.
NUM_FOLDERS = 15
def sap_to_epc(sap_points: int | float):
"""
Simple utility function to convert SAP points to EPC rating.
:param sap_points: numerical value of SAP points, typically between 0 and 100
:return:
"""
if sap_points <= 0:
raise ValueError("SAP points should be above 0.")
if sap_points >= 92:
return "A"
elif sap_points >= 81:
return "B"
elif sap_points >= 69:
return "C"
elif sap_points >= 55:
return "D"
elif sap_points >= 39:
return "E"
elif sap_points >= 21:
return "F"
else:
return "G"
def extract_wall_details_summary(text):
    """
    Extracts wall type, insulation, dry-lining, and thickness for each building part,
    including any alternative wall details, from the 7.0 Walls section of the summary PDF text.

    :param text: full text extracted from the summary report PDF
    :return: list with one dictionary per building part, in document order. The alternative
        wall keys stay None ("N/A" for dry-lining) when the part has no alternative wall.
    :raises AttributeError: if the "7.0 Walls:" ... "8.0 Roofs:" section cannot be located
    """
    # Everything between the "7.0 Walls:" heading and the "8.0 Roofs:" heading.
    wall_section = re.search(r"7\.0 Walls:\n(.*?)\n8\.0 Roofs:", text, re.DOTALL).group(1)
    # One building part: label line followed by the main wall fields.
    building_part_pattern = re.compile(
        r"(Main Property|1st Extension|2nd Extension|[\w\s]+)\n"  # Matches each building part label
        r"Type\s+(.*?)\n"  # Matches main wall Type
        r"Insulation\s+(.*?)\n"  # Matches main wall Insulation
        r"(Dry-lining\s+(.*?)\n)?"  # Optional main wall Dry-lining
        r"Wall Thickness Unknown\s+(.*?)\n"  # Matches main wall Thickness Unknown
        r"Wall Thickness \[mm\]\s+(\d+)",  # Matches main wall Thickness
        re.DOTALL
    )
    # Optional alternative wall block that may follow a building part's main wall fields.
    alternative_wall_pattern = re.compile(
        r"Alternative Wall Area.*?\n"  # Matches start of alternative wall section
        r"Alternative Type\s+(.*?)\n"  # Matches alternative wall Type
        r"Alternative Insulation\s+(.*?)\n"  # Matches alternative wall Insulation
        r"(Alternative Dry-lining\s+(.*?)\n)?"  # Optional Alternative Dry-lining
        r"Alternative Wall Thickness Unknown\s+(.*?)\n"  # Matches alternative wall Thickness Unknown
        r"Alternative Wall Thickness\s+(\d+)",  # Matches alternative wall Thickness
        re.DOTALL
    )
    # Materialise the matches so each part can see where the next one begins.
    part_matches = list(building_part_pattern.finditer(wall_section))
    wall_data = []
    for index, match in enumerate(part_matches):
        # The generic "[\w\s]+" label alternative is greedy across newlines and can swallow
        # the lines that precede the real label; the label is always the last line captured.
        label_lines = match.group(1).strip().splitlines()
        wall_label = label_lines[-1].strip() if label_lines else ""
        wall_entry = {
            "Building Part": wall_label,
            "Wall Type": match.group(2).strip(),
            "Wall Insulation": match.group(3).strip(),
            "Wall Dry-lining": match.group(5).strip() if match.group(5) else "N/A",
            "Wall Thickness Unknown": match.group(6).strip(),
            "Wall Thickness (mm)": int(match.group(7)),
            "Alternative Wall Type": None,
            "Alternative Wall Insulation": None,
            "Alternative Wall Dry-lining": "N/A",
            "Alternative Wall Thickness Unknown": None,
            "Alternative Wall Thickness (mm)": None,
        }
        # Only look for an alternative wall between the end of this part's main wall and the
        # "Type" value of the next part. Searching to the end of the section would hand this
        # part the alternative wall of a later part whenever it has none of its own. The
        # next part's "Type" value (not its match start) is the bound because the greedy
        # label of the next match may itself cover this part's alternative wall lines.
        if index + 1 < len(part_matches):
            search_end = part_matches[index + 1].start(2)
        else:
            search_end = len(wall_section)
        alt_match = alternative_wall_pattern.search(wall_section, match.end(), search_end)
        if alt_match:
            wall_entry["Alternative Wall Type"] = alt_match.group(1).strip()
            wall_entry["Alternative Wall Insulation"] = alt_match.group(2).strip()
            wall_entry["Alternative Wall Dry-lining"] = (
                alt_match.group(4).strip() if alt_match.group(4) else "N/A"
            )
            wall_entry["Alternative Wall Thickness Unknown"] = alt_match.group(5).strip()
            wall_entry["Alternative Wall Thickness (mm)"] = int(alt_match.group(6))
        wall_data.append(wall_entry)
    return wall_data
def extract_summary_report(pdf_path):
    """
    Parse a summary-report PDF into a flat dictionary of property attributes.

    The result covers the address, current SAP rating, fuel bill, storeys, windows, doors,
    heating systems, dimensions, lighting, main roof and main wall details.
    "Current EPC Band" is initialised to None and is not filled in by this function.

    :param pdf_path: path of the summary report PDF
    :return: dictionary keyed by attribute name
    :raises AttributeError: if a mandatory pattern is not found in the PDF text
    :raises IndexError: if the walls section contains no "Main" building part
    """

    def capture(pattern, source, flags=0):
        # Mandatory field: a failed search surfaces as AttributeError on None.
        return re.search(pattern, source, flags).group(1)

    def optional(found):
        # Stripped first group of an optional match, or "" when there is no match.
        return found.group(1).strip() if found else ""

    data = dict.fromkeys((
        "Address",
        "Postcode",
        "Current SAP Rating",
        "Current EPC Band",
        "Fuel Bill",
        "Number of Storeys",
        "Window Age Description",
        "Window Age Description Proportion (%)",
        "Secondary Window Age Description",
        "Secondary Window Age Description Proportion (%)",
        "Number of Windows",
        "Total Number of Doors",
        "Number of Insulated Doors",
        "Existing Primary Heating System",
        "Existing Primary Heating PCDF Reference",
        "Existing Primary Heating Controls",
        "Existing Primary Heating % of Heat",
        "Existing Secondary Heating System",
        "Existing Secondary Heating PCDF Reference",
        "Existing Secondary Heating Controls",
        "Existing Secondary Heating % of Heat",
        "Secondary Heating Code",
        "Water Heating Code",
        "Total Floor Area (m2)",
        "Total Ground Floor Area (m2)",
        "RIR Floor Area",
        "Main Building Wall Area (m2)",
        "First Extension Wall Area (m2)",
        "Number of Light Fittings",
        "Number of LEL Fittings",
        "Number of fittings needing LEL",
        "Main Roof Type",
        "Main Roof Insulation",
        "Main Roof Insulation Thickness",
        "Main Wall Type",
        "Main Wall Insulation",
        "Main Wall Dry-lining",
        "Main Wall Thickness",
        "Main Building Alternative Wall Type",
        "Main Building Alternative Wall Insulation",
        "Main Building Alternative Wall Dry-lining",
        "Main Building Alternative Wall Thickness",
    ))

    # Concatenate the text of every page; all patterns below run on the whole document.
    with open(pdf_path, "rb") as file:
        reader = PyPDF2.PdfReader(file)
        text = "".join(page.extract_text() for page in reader.pages)

    # The rating is printed as "<band letter> <points>"; only the points are kept (as text).
    data["Current SAP Rating"] = capture(r"Current SAP rating:\s*([A-Z] \d+)", text).split(" ")[1]
    data["Number of Storeys"] = int(capture(r"Number of Storeys:\s*(\d+)", text))
    fuel_bill = capture(r"Fuel Bill:\s*£(\d+)", text)
    data["Fuel Bill"] = f"£{fuel_bill}"

    # Address components, each delimited by the label of the field that follows it.
    postcode = re.search(r"Postcode:\s*(.*?)\nRegion:", text)
    address_fields = [
        re.search(r"House No:\s*(.*?)\nStreet:", text),
        re.search(r"House Name:\s*(.*?)\nHouse No:", text),
        re.search(r"Street:\s*(.*?)\nLocality:", text),
        re.search(r"Locality:\s*(.*?)\nTown:", text),
        re.search(r"Town:\s*(.*?)\nCounty:", text),
        re.search(r"County:\s*(.*?)\nProperty Tenure:", text),
        postcode,
    ]
    address_values = (optional(found) for found in address_fields)
    data["Address"] = ", ".join(value for value in address_values if value)
    # Unlike the address line above, the postcode itself is mandatory.
    data["Postcode"] = postcode.group(1).strip()

    # Windows: everything between the "Windows" and "Draught Proofing" headings.
    windows_text = capture(r"Windows\s*(.*?)\s*Draught Proofing", text, re.DOTALL)
    data.update(extract_window_age_description(windows_text))

    # Doors.
    for key, pattern in (
        ("Total Number of Doors", r"Total Number of Doors\s*(\d+)"),
        ("Number of Insulated Doors", r"Number of Insulated Doors\s*(\d+)"),
    ):
        data[key] = int(capture(pattern, text))

    # Primary heating: the block ends at "Main Heating2" when a second system is listed,
    # otherwise at "Water Heating".
    primary_section = (
        re.search(r"Main\s*Heating1\s*(.*?)\s*Main\s*Heating2", text, re.DOTALL)
        or re.search(r"Main\s*Heating1\s*(.*?)\s*Water\s*Heating", text, re.DOTALL)
    )
    primary_text = primary_section.group(1)
    data["Existing Primary Heating System"] = capture(r"Main Heating Code\s*(.*?)\n", primary_text).strip()
    data["Existing Primary Heating PCDF Reference"] = capture(r"PCDF boiler Reference\s*(\d+)", primary_text)
    data["Existing Primary Heating Controls"] = capture(r"Main Heating Controls\s*(.*?)\n", primary_text).strip()
    data["Existing Primary Heating % of Heat"] = int(capture(r"Percentage of Heat\s*(\d+)\s*%", primary_text))

    # Second main heating system, when the report lists one.
    secondary_section = re.search(r"Main\s*Heating2\s*(.*?)\s*Water\s*Heating", text, re.DOTALL)
    if secondary_section is not None:
        secondary_text = secondary_section.group(1)
        data["Existing Secondary Heating System"] = capture(
            r"Main Heating Code\s*(.*?)(?=\n|Percentage of Heat)", secondary_text
        ).strip()
        data["Existing Secondary Heating PCDF Reference"] = capture(
            r"PCDF boiler Reference\s*(\d+)", secondary_text
        )
        # The second system may come without a controls line.
        data["Existing Secondary Heating Controls"] = optional(
            re.search(r"Main Heating Controls\s*(.*?)\n", secondary_text)
        )
        data["Existing Secondary Heating % of Heat"] = int(
            capture(r"Percentage of Heat\s*(\d+)\s*%", secondary_text)
        )
    else:
        data.update({
            "Existing Secondary Heating System": "",
            "Existing Secondary Heating PCDF Reference": "",
            "Existing Secondary Heating Controls": "",
            "Existing Secondary Heating % of Heat": 0,
        })

    # The secondary heating code is only reported when a second main system was found.
    if data["Existing Secondary Heating System"] == "":
        data["Secondary Heating Code"] = ""
    else:
        data["Secondary Heating Code"] = optional(re.search(r"Secondary Heating Code\s*(.*?)\n", text))
    data["Water Heating Code"] = capture(r"Water Heating Code\s*(.*?)\n", text).strip()

    # Floor and wall areas.
    data.update(extract_building_parts_summary(text))

    # Lighting: fittings that are not yet low-energy are the difference of the two counts.
    data["Number of Light Fittings"] = int(capture(r"Total number of light fittings\s*(\d+)", text))
    data["Number of LEL Fittings"] = int(capture(r"Total number of L.E.L. fittings\s*(\d+)", text))
    data["Number of fittings needing LEL"] = data["Number of Light Fittings"] - data["Number of LEL Fittings"]

    # Roof: first "Type" entry of the 8.0 Roofs section plus its insulation lines.
    roof_text = capture(r"8\.0 Roofs:\n(.*?)\n9\.0 Floors:", text, re.DOTALL).strip()
    roof_type = re.search(r"Type\s*([A-Za-z0-9\s]+)", roof_text)
    data["Main Roof Type"] = roof_type.group(1).strip() if roof_type else None
    # The "Insulation" line is optional; "Insulation Thickness" is required for a match.
    roof_insulation = re.search(
        r"Type\s+.*?\n(Insulation\s+(.*?)\n)?(Insulation Thickness\s+(.*?)\n)", roof_text, re.DOTALL
    )
    if roof_insulation:
        material, thickness = roof_insulation.group(2, 4)
        data["Main Roof Insulation"] = material.strip() if material else None
        data["Main Roof Insulation Thickness"] = thickness.strip() if thickness else None

    # Walls: copy the first building part whose label mentions "Main".
    main_wall = [wall for wall in extract_wall_details_summary(text) if "Main" in wall["Building Part"]][0]
    wall_key_map = {
        "Main Wall Type": "Wall Type",
        "Main Wall Insulation": "Wall Insulation",
        "Main Wall Dry-lining": "Wall Dry-lining",
        "Main Wall Thickness": "Wall Thickness (mm)",
        "Main Building Alternative Wall Type": "Alternative Wall Type",
        "Main Building Alternative Wall Insulation": "Alternative Wall Insulation",
        "Main Building Alternative Wall Dry-lining": "Alternative Wall Dry-lining",
        "Main Building Alternative Wall Thickness": "Alternative Wall Thickness (mm)",
    }
    for target_key, source_key in wall_key_map.items():
        data[target_key] = main_wall[source_key]
    return data
def extract_window_age_description(windows_text):
    """
    Summarise the glazing descriptions found in the windows section of a report.

    Parameters:
    windows_text (str): The text section containing window data.
    Returns:
    dict: The most frequent description and its share (%), the runner-up description and
    its share (None and 0 when every window has the same description), and the total
    number of descriptions counted.
    Raises:
    ValueError: if none of the known descriptions occurs in the text.
    """
    # Line breaks are dropped so a description split across lines can still be found.
    flattened = windows_text.replace("\n", "")
    known_descriptions = (
        "Double post or during 2002",
        "Double pre 2002",
        "Double with unknown install date",
        "Secondary glazing",
        "Triple glazing",
        "Single glazing",
    )
    # Every known description gets an entry, even when it never occurs.
    tally = Counter({label: flattened.count(label) for label in known_descriptions})
    total = sum(tally.values())
    if total == 0:
        raise ValueError("Failed to extract window data.")
    # Ties are resolved in favour of the description listed first above.
    ranked = tally.most_common(2)
    top_label, top_count = ranked[0]
    top_share = top_count / total * 100
    if top_share == 100:
        runner_up_label, runner_up_share = None, 0
    else:
        runner_up_label, runner_up_count = ranked[1]
        runner_up_share = runner_up_count / total * 100
    return {
        "Window Age Description": top_label,
        "Window Age Description Proportion (%)": top_share,
        "Secondary Window Age Description": runner_up_label,
        "Secondary Window Age Description Proportion (%)": runner_up_share,
        "Number of Windows": total
    }
def extract_building_parts_epr(text):
    """
    Aggregate floor and wall areas from the building part tables of an EPR PDF text.

    Each "Construction details: Building part" table contributes one row per floor level
    (floor area, room height, perimeter, party wall length). When the part heading carries a
    "Room(s) in Roof area" figure, only that floor area is recorded for the part.

    :param text: full text extracted from the EPR PDF
    :return: dictionary with the total, ground floor and room-in-roof floor areas and the
        wall areas (perimeter x room height) of the main building and the 1st extension
    """
    section_pattern = re.compile(
        r"Construction details: Building part: (.*?)\nFloor Area \[m2\] Room Height \[m\] Perimeter \[m\] Party "
        r"Wall Length \[m\]\n(.*?)(?=Construction details|Data inputs|$)",
        re.DOTALL
    )
    # One table row: floor level followed by four measurements.
    storey_pattern = re.compile(
        r"(Lowest floor|First floor|Second floor)\s+([\d.]+)\s+([\d.]+)\s+([\d.]+)\s+([\d.]+)"
    )
    rows = []
    for section in section_pattern.finditer(text):
        heading = section.group(1).strip()
        roof_room = re.search(r"Room\(s\) in Roof area:\s*([\d.]+)", heading)
        if roof_room is not None:
            # Room-in-roof parts only state a floor area; the other measurements are unknown.
            rows.append({
                "Building Part": re.sub(r" - built in.*|Room\(s\) in Roof area:.*", "", heading).strip(),
                "Floor Level": "Room in Roof",
                "Floor Area (m2)": float(roof_room.group(1)),
                "Room Height (m)": None,
                "Perimeter (m)": None,
                "Party Wall Length (m)": None
            })
            continue
        # Keep only the descriptor in front of " - built in ...".
        part_label = re.sub(r" - built in.*", "", heading).strip()
        for storey in storey_pattern.finditer(section.group(2)):
            level, *measurements = storey.groups()
            area, height, perimeter, party_wall = (float(value) for value in measurements)
            rows.append({
                "Building Part": part_label,
                "Floor Level": level,
                "Floor Area (m2)": area,
                "Room Height (m)": height,
                "Perimeter (m)": perimeter,
                "Party Wall Length (m)": party_wall
            })

    def wall_area(parts):
        # Rows without a usable perimeter or height (None or 0) contribute nothing.
        return sum(
            row["Perimeter (m)"] * row["Room Height (m)"]
            for row in parts
            if row["Perimeter (m)"] and row["Room Height (m)"]
        )

    main_rows = [row for row in rows if "Main" in row["Building Part"]]
    extension_rows = [row for row in rows if "1st Extension" in row["Building Part"]]
    return {
        "Total Floor Area (m2)": sum(row["Floor Area (m2)"] for row in rows),
        "Total Ground Floor Area (m2)": sum(
            row["Floor Area (m2)"] for row in rows if row["Floor Level"] == "Lowest floor"
        ),
        "RIR Floor Area": sum(
            row["Floor Area (m2)"] for row in rows if row["Floor Level"] == "Room in Roof"
        ),
        "Main Building Wall Area (m2)": wall_area(main_rows),
        "First Extension Wall Area (m2)": wall_area(extension_rows),
    }
def extract_building_parts_summary(text):
    """
    Aggregate floor and wall areas from the Dimensions section of a summary report text.

    The section is split into "Main Property" and "<n>th Extension" parts; each part
    contributes one row per floor level plus an optional "Room(s) in Roof" row that only
    carries a floor area.

    :param text: full text extracted from the summary report PDF
    :return: dictionary with the total, ground floor and room-in-roof floor areas and the
        wall areas (perimeter x room height) of the main property and the 1st extension
    :raises ValueError: if the Dimensions section cannot be located
    """
    section = re.search(
        r"Dimensions:\s*Dimension type: Internal\n(.*?)\n5\.0 Conservatory:", text, re.DOTALL
    )
    if section is None:
        raise ValueError("Failed to locate dimensions section in the text.")
    # A part runs from its label up to the next extension label or the end of the section.
    part_pattern = re.compile(
        r"(Main Property|\d+(?:st|nd|rd|th) Extension)\s*"
        r"(.*?)(?=\d+(?:st|nd|rd|th) Extension|5\.0 Conservatory|$)",
        re.DOTALL
    )
    # Floor Level, Floor Area, Room Height, Perimeter, Party Wall Length.
    storey_pattern = re.compile(
        r"(1st Floor|Lowest Floor|Second floor):\s*([\d.]+)\s+([\d.]+)\s+([\d.]+)\s+([\d.]+)"
    )
    roof_room_pattern = re.compile(r"Room\(s\) in Roof:\s*([\d.]+)")
    rows = []
    for part in part_pattern.finditer(section.group(1)):
        part_label, part_body = part.groups()
        for storey in storey_pattern.finditer(part_body):
            level, *measurements = storey.groups()
            area, height, perimeter, party_wall = (float(value) for value in measurements)
            rows.append({
                "Building Part": part_label,
                "Floor Level": level,
                "Floor Area (m2)": area,
                "Room Height (m)": height,
                "Perimeter (m)": perimeter,
                "Party Wall Length (m)": party_wall
            })
        # Room-in-roof entries only state a floor area; the other measurements are unknown.
        roof_room = roof_room_pattern.search(part_body)
        if roof_room:
            rows.append({
                "Building Part": part_label,
                "Floor Level": "Room in Roof",
                "Floor Area (m2)": float(roof_room.group(1)),
                "Room Height (m)": None,
                "Perimeter (m)": None,
                "Party Wall Length (m)": None
            })

    def wall_area(parts):
        # Rows without a usable perimeter or height (None or 0) contribute nothing.
        return sum(
            row["Perimeter (m)"] * row["Room Height (m)"]
            for row in parts
            if row["Perimeter (m)"] and row["Room Height (m)"]
        )

    main_rows = [row for row in rows if "Main Property" in row["Building Part"]]
    extension_rows = [row for row in rows if "1st Extension" in row["Building Part"]]
    return {
        "Total Floor Area (m2)": sum(row["Floor Area (m2)"] for row in rows),
        "Total Ground Floor Area (m2)": sum(
            row["Floor Area (m2)"] for row in rows if row["Floor Level"] == "Lowest Floor"
        ),
        "RIR Floor Area": sum(
            row["Floor Area (m2)"] for row in rows if row["Floor Level"] == "Room in Roof"
        ),
        "Main Building Wall Area (m2)": wall_area(main_rows),
        "First Extension Wall Area (m2)": wall_area(extension_rows),
    }
def extract_roof_details_epr(text):
    """
    Collect the roof type, insulation and insulation thickness of every building part
    found in the EPR PDF text.

    :param text: full text extracted from the EPR PDF
    :return: list with one dictionary per "Construction details: Building part" section, in
        document order; a field that is not present in a section is None
    """
    # A section runs from its heading line to the next section, a "Conservatory" mention or
    # the end of the text.
    section_pattern = re.compile(
        r"Construction details: Building part: (.*?)\n(.*?)(?=Conservatory|Construction details|$)",
        re.DOTALL
    )
    # Output key -> pattern capturing the rest of that field's line.
    field_patterns = {
        "Roof Type": r"Roof Type:\s*(.*?)(?=\n|$)",
        "Roof Insulation": r"Roof Insulation:\s*(.*?)(?=\n|$)",
        "Roof Insulation Thickness": r"Roof Insulation Thickness:\s*(.*?)(?=\n|$)",
    }
    roof_data = []
    for section in section_pattern.finditer(text):
        heading, details = section.group(1).strip(), section.group(2)
        # Drop the build period and any room-in-roof figure from the part name.
        entry = {
            "Building Part": re.sub(r" - built in.*|Room\(s\) in Roof area:.*", "", heading).strip()
        }
        for key, pattern in field_patterns.items():
            found = re.search(pattern, details)
            entry[key] = found.group(1).strip() if found else None
        roof_data.append(entry)
    return roof_data
def extract_wall_details_epr(text):
    """
    Extracts wall type, insulation, dry-lining, and thickness (main and alternative wall)
    for each building part in the provided EPR PDF text.

    :param text: full text extracted from the EPR PDF
    :return: list with one dictionary per "Construction details: Building part" section, in
        document order; a field that is not present in a section is None. Thicknesses are
        integers, every other field is a stripped string.
    """
    # A section runs from its heading line to the next section, a "Conservatory" mention or
    # the end of the text.
    building_part_pattern = re.compile(
        r"Construction details: Building part: (.*?)\n(.*?)(?=Conservatory|Construction details|$)",
        re.DOTALL
    )
    # (output key, pattern, converter). "Wall Type:" is a substring of
    # "Alternative Wall Type:", so without the "(?<!Alternative )" guard a main-wall field
    # that is missing (or has a non-numeric thickness) would silently pick up the
    # alternative wall's value instead of staying None.
    field_specs = (
        ("Wall Type", r"(?<!Alternative )Wall Type:\s*(.*?)(?=\n|$)", str.strip),
        ("Wall Insulation", r"(?<!Alternative )Wall Insulation:\s*(.*?)(?=\n|$)", str.strip),
        ("Wall Dry-lining", r"(?<!Alternative )Wall Dry-lining:\s*(.*?)(?=\n|$)", str.strip),
        ("Wall Thickness", r"(?<!Alternative )Wall Thickness:\s*(\d+)(?=\n|$)", int),
        ("Alternative Wall Type", r"Alternative Wall Type:\s*(.*?)(?=\n|$)", str.strip),
        ("Alternative Wall Insulation", r"Alternative Wall Insulation:\s*(.*?)(?=\n|$)", str.strip),
        ("Alternative Wall Dry-lining", r"Alternative Wall Dry-lining:\s*(.*?)(?=\n|$)", str.strip),
        ("Alternative Wall Thickness", r"Alternative Wall Thickness:\s*(\d+)(?=\n|$)", int),
    )
    wall_data = []
    for match in building_part_pattern.finditer(text):
        part_name = match.group(1).strip()
        # Drop the build period and any room-in-roof figure from the part name.
        cleaned_part_name = re.sub(r" - built in.*|Room\(s\) in Roof area:.*", "", part_name).strip()
        part_details = match.group(2)
        wall_entry = {"Building Part": cleaned_part_name}
        for key, pattern, convert in field_specs:
            found = re.search(pattern, part_details)
            wall_entry[key] = convert(found.group(1)) if found else None
        wall_data.append(wall_entry)
    return wall_data
def extract_epr(pdf_path):
    """
    Parse an Energy Report (EPR) PDF into a flat dictionary of property attributes.

    The result covers the address, current SAP rating, primary energy use intensity, fuel
    bill, storeys, windows, doors, heating systems, dimensions, lighting, main roof and main
    wall details. "Current EPC Band" and "Primary Energy Use (kWh/yr)" are initialised to
    None and are not filled in by this function.

    :param pdf_path: path of the EPR PDF
    :return: dictionary keyed by attribute name
    :raises AttributeError: if a mandatory pattern is not found in the PDF text
    :raises IndexError: if no "Main" building part is found for the roof or the walls
    """

    def capture(pattern, source, flags=0):
        # Mandatory field: a failed search surfaces as AttributeError on None.
        return re.search(pattern, source, flags).group(1)

    def optional(found):
        # Stripped first group of an optional match, or "" when there is no match.
        return found.group(1).strip() if found else ""

    data = dict.fromkeys((
        "Address",
        "Postcode",
        "Current SAP Rating",
        "Current EPC Band",
        "Primary Energy Use (kWh/yr)",
        "Primary Energy Use Intensity (kWh/m2/yr)",
        "Number of Storeys",
        "Fuel Bill",
        "Window Age Description",
        "Window Age Description Proportion (%)",
        "Secondary Window Age Description",
        "Secondary Window Age Description Proportion (%)",
        "Number of Windows",
        "Total Number of Doors",
        "Number of Insulated Doors",
        "Existing Primary Heating System",
        "Existing Primary Heating PCDF Reference",
        "Existing Primary Heating Controls",
        "Existing Primary Heating % of Heat",
        "Existing Secondary Heating System",
        "Existing Secondary Heating PCDF Reference",
        "Existing Secondary Heating Controls",
        "Existing Secondary Heating % of Heat",
        "Secondary Heating Code",
        "Water Heating Code",
        "Total Floor Area (m2)",
        "Total Ground Floor Area (m2)",
        "RIR Floor Area",
        "Main Building Wall Area (m2)",
        "First Extension Wall Area (m2)",
        "Number of Light Fittings",
        "Number of LEL Fittings",
        "Number of fittings needing LEL",
        "Main Roof Type",
        "Main Roof Insulation",
        "Main Roof Insulation Thickness",
        "Main Wall Type",
        "Main Wall Insulation",
        "Main Wall Dry-lining",
        "Main Wall Thickness",
        "Main Building Alternative Wall Type",
        "Main Building Alternative Wall Insulation",
        "Main Building Alternative Wall Dry-lining",
        "Main Building Alternative Wall Thickness",
    ))

    # Concatenate the text of every page; all patterns below run on the whole document.
    with open(pdf_path, "rb") as file:
        reader = PyPDF2.PdfReader(file)
        text = "".join(page.extract_text() for page in reader.pages)

    # Address block; the postcode is taken to be its last comma-separated component.
    data["Address"] = capture(
        r"ENERGY REPORT\nDwelling Address\s*(.*?)\s*\nReference", text, re.DOTALL
    ).strip()
    data["Postcode"] = data["Address"].split(",")[-1].strip()

    # Two numbers follow the "GG (1-20)" label: current rating first, then the potential
    # rating, which is not kept.
    sap_match = re.search(r"GG \(1-20\)\s*(\d{1,2})\s*(\d{1,2})", text)
    current_sap, _potential_sap = (int(value) for value in sap_match.groups())
    data["Current SAP Rating"] = current_sap

    data["Primary Energy Use Intensity (kWh/m2/yr)"] = float(
        capture(r"Additional ratings for your home\s*([\d.]+)", text)
    )
    data["Number of Storeys"] = int(capture(r"Number of Storeys:\s*(\d+)", text))
    fuel_bill = capture(r"TOTAL\s*£(\d+)", text)
    data["Fuel Bill"] = f"£{fuel_bill}"

    # Doors.
    for key, pattern in (
        ("Total Number of Doors", r"Total Doors:\s*(\d+)"),
        ("Number of Insulated Doors", r"Insulated Doors:\s*(\d+)"),
    ):
        data[key] = int(capture(pattern, text))

    # Primary heating: the block ends at "Main Heating 2" when a second system is listed,
    # otherwise at "Secondary Heating".
    primary_section = (
        re.search(r"Main\s*Heating\s*1\s*(.*?)\s*Main\s*Heating\s*2", text, re.DOTALL)
        or re.search(r"Main\s*Heating\s*1\s*(.*?)\s*Secondary\s*Heating", text, re.DOTALL)
    )
    primary_text = primary_section.group(1)
    data["Existing Primary Heating System"] = capture(r"Main Heating Code\s*(.*?)\n", primary_text).strip()
    data["Existing Primary Heating PCDF Reference"] = capture(r"PCDF boiler Reference\s*(\d+)", primary_text)
    data["Existing Primary Heating Controls"] = capture(r"Main Heating Controls\s*(.*?)\n", primary_text).strip()
    data["Existing Primary Heating % of Heat"] = int(capture(r"Percentage of Heat\s*(\d+)\s*%?", primary_text))

    # Second main heating system (Main Heating 2), when the report lists one.
    secondary_section = re.search(r"Main\s*Heating\s*2\s*(.*?)\s*Secondary Heating", text, re.DOTALL)
    if secondary_section is not None:
        secondary_text = secondary_section.group(1)
        secondary_system = capture(
            r"Main Heating Code\s*(.*?)(?=\n|Percentage of Heat)", secondary_text
        ).strip()
        data["Existing Secondary Heating System"] = secondary_system
        data["Existing Secondary Heating PCDF Reference"] = capture(
            r"PCDF boiler Reference\s*(\d+)", secondary_text
        )
        if secondary_system == "":
            data["Existing Secondary Heating Controls"] = ""
        else:
            # The second system may come without a controls line.
            data["Existing Secondary Heating Controls"] = optional(
                re.search(r"Main Heating Controls\s*(.*?)\n", secondary_text)
            )
        data["Existing Secondary Heating % of Heat"] = int(
            capture(r"Percentage of Heat\s*(\d+)\s*%?", secondary_text)
        )
    else:
        data.update({
            "Existing Secondary Heating System": "",
            "Existing Secondary Heating PCDF Reference": "",
            "Existing Secondary Heating Controls": "",
            "Existing Secondary Heating % of Heat": 0,
        })

    # The secondary heating code is only reported when a second main system was found.
    if data["Existing Secondary Heating System"] == "":
        data["Secondary Heating Code"] = ""
    else:
        data["Secondary Heating Code"] = optional(re.search(r"Secondary Heating Code\s*(.*?)\n", text))
    data["Water Heating Code"] = capture(r"Water Heating Code\s*(.*?)\n", text).strip()

    # Windows are optional here: the window keys stay None when the section is missing.
    windows_section = re.search(r"Windows\s*(.*?)\s*Draught Proofing", text, re.DOTALL)
    if windows_section:
        data.update(extract_window_age_description(windows_section.group(1)))

    # Floor and wall areas.
    data.update(extract_building_parts_epr(text))

    # Lighting: fittings that are not yet low-energy are the difference of the two counts.
    data["Number of Light Fittings"] = int(capture(r"Total number of light fittings\s*(\d+)", text))
    data["Number of LEL Fittings"] = int(capture(r"Total number of L.E.L. fittings\s*(\d+)", text))
    data["Number of fittings needing LEL"] = data["Number of Light Fittings"] - data["Number of LEL Fittings"]

    # Roof: copy the first building part whose name mentions "Main".
    main_roof = [roof for roof in extract_roof_details_epr(text) if "Main" in roof["Building Part"]][0]
    data["Main Roof Type"] = main_roof["Roof Type"]
    data["Main Roof Insulation"] = main_roof["Roof Insulation"]
    data["Main Roof Insulation Thickness"] = main_roof["Roof Insulation Thickness"]

    # Walls: same selection rule as the roof.
    main_wall = [wall for wall in extract_wall_details_epr(text) if "Main" in wall["Building Part"]][0]
    wall_key_map = {
        "Main Wall Type": "Wall Type",
        "Main Wall Insulation": "Wall Insulation",
        "Main Wall Dry-lining": "Wall Dry-lining",
        "Main Wall Thickness": "Wall Thickness",
        "Main Building Alternative Wall Type": "Alternative Wall Type",
        "Main Building Alternative Wall Insulation": "Alternative Wall Insulation",
        "Main Building Alternative Wall Dry-lining": "Alternative Wall Dry-lining",
        "Main Building Alternative Wall Thickness": "Alternative Wall Thickness",
    }
    for target_key, source_key in wall_key_map.items():
        data[target_key] = main_wall[source_key]
    return data
def detect_report_type(pdf_path, pdf_file):
    """
    Classifies a PDF report using the text of its first page and, for summaries, its filename.
    :param pdf_path: String path to the PDF file
    :param pdf_file: String name of the PDF file
    :return: "epr" for an Energy Report, "summary" for a Summary Report, "condition" for a Condition Report,
        or None when none of the checks match
    """
    # Only the first page is read; a PDF with no pages is treated as having empty text
    with open(pdf_path, "rb") as handle:
        pages = PyPDF2.PdfReader(handle).pages
        first_page_text = pages[0].extract_text() if pages else ""

    # The order matters: the Energy Report check takes priority over the filename-based summary check
    if is_energy_report(first_page_text):
        return "epr"
    if "summary" in pdf_file.lower() or is_summary_report(first_page_text):
        return "summary"
    if is_condition_report(first_page_text):
        return "condition"
    return None
def extract_retrofit_pdfs(data_folder_path):
    """
    Extracts survey data from the PDF reports that sit directly inside a retrofit data folder.
    An Energy Report (EPR) is preferred: the summary report is only parsed when no EPR was identified.
    :param data_folder_path: String path to the folder holding the PDF reports
    :return: the output of extract_epr or extract_summary_report for the chosen report, or None when the folder
        contains neither an EPR nor a summary report
    """
    # Match the extension case-insensitively so that reports saved as ".PDF" are not silently ignored
    retrofit_files = [f for f in os.listdir(data_folder_path) if f.lower().endswith(".pdf")]
    report_types = {"epr": None, "summary": None}

    # First, identify the types of reports available
    for pdf_file in retrofit_files:
        pdf_path = os.path.join(data_folder_path, pdf_file)
        report_type = detect_report_type(pdf_path, pdf_file)
        # Only "epr" and "summary" are tracked; any other report type is skipped
        if report_type in report_types:
            report_types[report_type] = pdf_path

        # Stop checking further if both EPR and summary are found
        if report_types["epr"] and report_types["summary"]:
            break

    # Extract data based on report availability and priority
    if report_types["epr"]:
        return extract_epr(report_types["epr"])
    if report_types["summary"]:
        return extract_summary_report(report_types["summary"])

    # If no relevant PDF is found, return None
    return None
def is_energy_report(text):
    """
    Checks whether the supplied PDF text identifies the document as an Energy Report.
    Returns True when the text begins with 'ENERGY REPORT' (the comparison is case-sensitive).
    """
    marker = "ENERGY REPORT"
    return text.startswith(marker)
def is_summary_report(text):
    """
    Checks whether the supplied PDF text identifies the document as a Summary Report.
    Returns True when the text begins with 'Summary Information' (the comparison is case-sensitive).
    """
    marker = "Summary Information"
    return text.startswith(marker)
def detect_and_parse_report(pdf_path, pdf_file):
    """
    Detects the type of report and extracts the relevant data.
    :param pdf_path: String path to the PDF file
    :param pdf_file: String name of the PDF file
    :return: the output of extract_epr for an Energy Report, the output of extract_summary_report for a Summary
        Report, or None for a Condition Report
    :raises NotImplementedError: if the PDF is not recognised as any of the above report types
    """
    # Reuse the shared classifier so the detection rules live in one place
    report_type = detect_report_type(pdf_path, pdf_file)

    if report_type == "epr":
        # Treat this as an Energy Report
        return extract_epr(pdf_path)
    if report_type == "summary":
        # Treat this as a Summary Report
        return extract_summary_report(pdf_path)
    if report_type == "condition":
        # Condition reports are recognised but not parsed
        return None

    # Name the offending file so an unknown report format can be tracked down
    raise NotImplementedError(f"Unrecognised report type for PDF: {pdf_path}")
def is_condition_report(text):
    """
    Checks whether the supplied PDF text identifies the document as a Condition Report.
    Returns True when the text begins with either of the two known Condition Report headings.
    """
    condition_report_headings = (
        "OsmosisACDNEWPAS2035ConditionReport",
        "OsmosisACDPAS2035ConditionReport",
    )
    return text.startswith(condition_report_headings)
def main():
    """
    This code prepares the data for the Warm Homes: Social Housing Fund Wave 3, for Stonewater.

    Steps, in order:
    1. Walk the StonewaterSurveys_1..NUM_FOLDERS folders under CUSTOMER_FOLDER_PATH and, for every property
       folder inside them, extract data from the survey PDFs via extract_retrofit_pdfs.
    2. Derive the primary energy use and the current EPC band, and drop known duplicate survey folders.
    3. Match each surveyed row of the retrofit packages board to exactly one survey folder.
    4. Merge the board's package/measure columns and the window fitted/renewed dates onto the extracted data,
       add empty cost columns and save the result to Excel.
    5. Save the cost sheet to Excel and call create_proposed_wave_3_bid.

    :return: None. Outputs are written as Excel files under CUSTOMER_FOLDER_PATH.
    :raises Exception: when the matching produces duplicates, an ambiguous match, or an unexpected number of
        unmatched properties
    """
    # Master list of property survey folders, stored as paths relative to CUSTOMER_FOLDER_PATH
    survey_folders = []

    # Loop over each survey folder and list its contents
    for i in range(1, NUM_FOLDERS + 1):
        folder_path = os.path.join(CUSTOMER_FOLDER_PATH, f"StonewaterSurveys_{i}")
        if os.path.isdir(folder_path):  # Check if folder exists
            folder_contents = [os.path.join(f"StonewaterSurveys_{i}", file) for file in os.listdir(folder_path)]
            survey_folders.extend(folder_contents)  # Append contents to the master list

    # Get rid of .DS_Store files
    survey_folders = [folder for folder in survey_folders if not folder.endswith(".DS_Store")]

    # One dict of extracted fields per property for which a usable report was found
    extracted_data = []
    for survey_folder in tqdm(survey_folders):
        survey_folder_path = os.path.join(CUSTOMER_FOLDER_PATH, survey_folder)

        # List the folders inside of the survey folder
        survey_subfolders = [name for name in os.listdir(survey_folder_path)
                             if os.path.isdir(os.path.join(survey_folder_path, name))]

        # Check if there's a "retrofit assessment" folder
        retrofit_folder = next((name for name in survey_subfolders if "retrofit assessment" in name.lower()), None)
        # Alternative folder names that are treated the same way as the retrofit assessment folder
        ra_folder = next(
            (name for name in survey_subfolders if "ra coordinator info" in name.lower() or "ra info" in name.lower()),
            None
        )

        # If retrofit assessment folder exists, check if it has content
        if retrofit_folder or ra_folder:
            # The "retrofit assessment" folder takes priority over the RA info folder
            if retrofit_folder:
                retrofit_folder_path = os.path.join(survey_folder_path, retrofit_folder)
            else:
                retrofit_folder_path = os.path.join(survey_folder_path, ra_folder)

            # Check if everything inside is a sub-folder and the number of folders is 2
            items = [item for item in os.listdir(retrofit_folder_path) if item != '.DS_Store']
            all_folders = [os.path.isdir(os.path.join(retrofit_folder_path, item)) for item in items]
            if all(all_folders) and len(all_folders) == 2 and "Property Pics" in items:
                # Get the folder that isn't Property Pics
                retrofit_folder_path = os.path.join(
                    retrofit_folder_path, [item for item in items if item != "Property Pics"][0]
                )

            if os.listdir(retrofit_folder_path):  # If not empty
                summary_data = extract_retrofit_pdfs(data_folder_path=retrofit_folder_path)
                if summary_data:
                    summary_data = {
                        "survey_folder": survey_folder,
                        **summary_data,
                    }
                    extracted_data.append(summary_data)
                    continue
            else:
                # Then we have an empty Retrofit Assessment folder
                continue

        # If there is no retrofit folder, or nothing could be extracted from it, check files in survey_folder
        summary_data = extract_retrofit_pdfs(data_folder_path=survey_folder_path)
        if not summary_data:
            # Fall back to the single sub-folder, if there is exactly one
            if len(survey_subfolders) == 1:
                survey_folder_path = os.path.join(survey_folder_path, survey_subfolders[0])
                summary_data = extract_retrofit_pdfs(data_folder_path=survey_folder_path)

        if summary_data:
            summary_data = {
                "survey_folder": survey_folder,
                **summary_data,
            }
            extracted_data.append(summary_data)

    extracted_data = pd.DataFrame(extracted_data)
    # Total primary energy use = intensity per m2 multiplied by the floor area
    extracted_data["Primary Energy Use (kWh/yr)"] = (
        extracted_data["Primary Energy Use Intensity (kWh/m2/yr)"] * extracted_data["Total Floor Area (m2)"]
    )
    extracted_data["Current SAP Rating"] = extracted_data["Current SAP Rating"].astype(int)
    extracted_data["Current EPC Band"] = extracted_data["Current SAP Rating"].apply(sap_to_epc)

    # Remove some definite duplicates
    dupes = extracted_data[extracted_data["Address"].duplicated()]["Address"]
    dupes = extracted_data[extracted_data["Address"].isin(dupes)]
    dupes = dupes.sort_values("Address")
    # Get all of the folders that end with ROSS (only among the duplicated addresses)
    to_drop = dupes[dupes["survey_folder"].str.endswith("ROSS")]["survey_folder"].unique().tolist()
    # Drop the duplicated ROSS folders plus three explicitly listed ROSS folders
    extracted_data = extracted_data[
        ~extracted_data["survey_folder"].isin(
            [
                "StonewaterSurveys_10/4 Beech Road, LUTON, LU1 1DP ROSS",
                "StonewaterSurveys_2/135 Runley Road, LUTON, LU1 1TX ROSS",
                "StonewaterSurveys_13/7 Saxon Road, LUTON, LU3 1JR ROSS"
            ] + to_drop
        )
    ]

    # We now merge on the coordinator data so that against each property, we can map the measures
    retrofit_packages_board = pd.read_excel(
        os.path.join(
            CUSTOMER_FOLDER_PATH,
            "Stonewater_SHDF_3_0_Board_work_in_progress_-_Operations_1731315080 11.11.24.xlsx"
        ),
        header=4
    )
    retrofit_packages_board = retrofit_packages_board[~pd.isnull(retrofit_packages_board["Name"])]
    # Take just the rows that have been surveyed
    retrofit_packages_board = retrofit_packages_board[
        retrofit_packages_board["RA"].isin(["Invoiced", "Completed"])
    ]

    # Replace \n with ""
    extracted_data["Postcode"] = extracted_data["Postcode"].str.replace("\n", "")

    # Board "Name" -> survey folder, for properties that are matched by hand instead of by postcode and name
    manual_filters = {
        "Flat 21 Walmer Street": "StonewaterSurveys_14/91-1-Flat 21 Walmer Street-HR4 9JD",
        "6 Cornewall Close": "StonewaterSurveys_14/aa 6, Cornewall Close, Moccas, HEREFORD, HR2 9LG",
        "2 Bromyard Road": "StonewaterSurveys_4/192-9-2 Bromyard Road-WR15 8BZ",
        'Flat 18, 1 Raglan Court': "StonewaterSurveys_13/60-3-18 Raglan Court, 1 Raglan Court-MK41 8QT",
        '14 Raglan Court, 1 Devizes Avenue': 'StonewaterSurveys_12/55-3-14 Raglan Court, Devizes Avenue-MK41 8QT',
        '19 South Road': 'StonewaterSurveys_4/19 The Oaks, South Road, SMETHWICK, B67 7BY',
        'Flat 12 Pelican Lane': 'StonewaterSurveys_1/121-3-Flat 12 Lynton Court, Pelican Lane-RG14 1NN',
        'Flat C, 44 St Leonards Avenue': 'StonewaterSurveys_11/427-2-44c St. Leonards Avenue-MK42 0RB',
        '16 The Crescent, Kington': 'StonewaterSurveys_9/360-3-16 The Crescent-HR5 3AS',
        '2 School Lane, Leominster': 'StonewaterSurveys_5/224-1-2 School Lane-HR6 8AA',
        '14 South Road': 'StonewaterSurveys_2/14 The Oaks, South Road, SMETHWICK, B67 7BY',
        '1 Groves Street': 'StonewaterSurveys_4/19-5-1 Groves Street-SN2 2BW',
        # '2 Sorrell Place': '',
        # '72 St Ives Road': '',
        # '1 The Close, Burton Gardens': '',
        # '102 Cheaton Close': '',
        # 'Flat 16 Spring Gardens': '',
        # '4 Apple Close': '',
        # '25 Folly Lane': '',
        '2 Calshot Walk': 'StonewaterSurveys_3/156-3-2 Calshot Walk-MK41 8QS',
        '21 Constitution Hill': 'StonewaterSurveys_1/112-11-21 Constitution Hill-BH14 0PX',
        '22 Constitution Hill': 'StonewaterSurveys_4/185-8-22 Constitution Hill-BH14 0PX',
        '2 Marches Cottages, School Lane, Leominster': 'StonewaterSurveys_5/224-1-2 School Lane-HR6 8AA',
        '26, Copthorn House, Brighton Road': 'StonewaterSurveys_15/133-1-26 Brighton Road-KT20 6BQ',
        '4, Old St Marys, Ripley Lane': "StonewaterSurveys_15/433-3-4 Ripley Lane-KT24 6JG",
        '1 Nelson House, Short Street': 'StonewaterSurveys_15/89-2-1 Short Street-GU11 1HX',
        "18 Nelson House, Short Street": 'StonewaterSurveys_15/25-3- 18 Short Street- GU11 1HX',
        '3 Nelson House, Short Street': 'StonewaterSurveys_2/138-1-3 Short Street-GU11 1HX',
        '16, Copthorn House, Brighton Road': 'StonewaterSurveys_13/78-3-16 Brighton Road-KT20 6BQ',
        '20 Nelson House, Short Street': 'StonewaterSurveys_15/89-1-20 Short Street-GU11 1HX',
        '7 Croft Street': 'StonewaterSurveys_8/333-2-7 Croft Street-HR6 8LA'
    }

    # We now match this retrofit packages board to the extracted data
    matching_lookup = []
    for _, home in tqdm(retrofit_packages_board.iterrows(), total=len(retrofit_packages_board)):

        # Handle the case that has the wrong postcode in the asset data
        if home["Name"] in manual_filters:
            filtered = extracted_data[extracted_data["survey_folder"] == manual_filters[home["Name"]]].copy()
        else:
            filtered = extracted_data[extracted_data["Postcode"].str.lower() == home["Postcode"].lower()].copy()
            # We check that home["Name"] is contained in the survey_folder, after removing punctuation and spaces
            # NOTE(review): home["Name"].replace(...) is Python's str.replace, which treats r"[^\w\s]" as literal
            # text, so it does not strip punctuation from the name. Whether the pandas .str.replace calls treat
            # the pattern as a regex depends on the installed pandas version's default for `regex` - confirm.
            to_filter = filtered["survey_folder"].str.replace(r"[^\w\s]", "").str.contains(
                home["Name"].replace(r"[^\w\s]", "").replace("Flat", "").lstrip(), case=False
            )
            # Second attempt when nothing matched: strip commas and full stops, and keep "Flat" in the name
            if to_filter.sum() == 0:
                to_filter = filtered["survey_folder"].str.replace(r"[^\w\s]", "").str.replace(",", "").str.replace(".",
                                                                                                               "").str.contains(
                    home["Name"].replace(r"[^\w\s]", "").replace(",", ""), case=False
                )
            filtered = filtered[to_filter]

        # No survey data for this property: it is counted as missing after the loop
        if filtered.empty:
            continue

        # Exactly one candidate: accept it as the match
        if filtered.shape[0] == 1:
            matching_lookup.append(
                {
                    "survey_folder": filtered["survey_folder"].values[0],
                    "Address ID": home["Address ID"],
                    "Name": home["Name"]
                }
            )
            continue

        # Several candidates remain, so narrow them down
        # home["Name"] should be contained in the survey_folder
        # NOTE(review): str.contains interprets home["Name"] as a regular expression by default
        filtered = filtered[filtered["survey_folder"].str.contains(home["Name"], case=False)]

        # We have an edge case where some properties have two outputs in Sharepoint
        # The first two cases are not handled yet and deliberately stop the run
        if home["Name"] == "197 Granby Court" and home["Postcode"] == "MK1 1NQ":
            raise Exception("Fix me1")
            # filtered = filtered[filtered["survey_folder"] == "113-1-197 Granby Court-MK1 1NQ"]

        if home["Name"] == '1 Cluny Way' and home["Postcode"] == 'SG15 6ZB':
            raise Exception("Fix me2")
            # filtered = filtered[filtered["survey_folder"] == "12-1-1 Cluny Way-SG15 6ZB"]

        if home["Name"] == '2 Bromyard Road' and home["Postcode"] == 'WR15 8BZ':
            filtered = filtered[filtered["survey_folder"] == "StonewaterSurveys_4/192-9-2 Bromyard Road-WR15 8BZ"]

        if filtered.empty:
            continue

        # Anything other than a single remaining candidate is an ambiguous match
        if filtered.shape[0] != 1:
            raise Exception("something went wrong")

        matching_lookup.append(
            {
                "survey_folder": filtered["survey_folder"].values[0],
                "Address ID": home["Address ID"],
                "Name": home["Name"]
            }
        )

    matching_lookup = pd.DataFrame(matching_lookup)

    # Find Address IDs that are in the packages board but not in the matching lookup
    missing_ids = set(retrofit_packages_board["Address ID"]) - set(matching_lookup["Address ID"])
    missing_ids = list(missing_ids)
    if missing_ids:
        # We check that the missing ids have no data yet
        # missed = retrofit_packages_board[retrofit_packages_board["Address ID"].isin(missing_ids)]
        # missed[["Name", "Postcode", "Archetype ID", "Arch. Group Rank"]].to_csv(
        #     CUSTOMER_FOLDER_PATH + "/missed_debugging.csv")
        # Exactly 6 unmatched properties are tolerated - presumably the known gaps when this was written;
        # TODO confirm whenever the board or survey folders change
        if len(missing_ids) != 6:
            raise Exception("Unacceptable number of missings")

    # The lookup must be one-to-one in both directions
    if matching_lookup["Address ID"].duplicated().sum():
        raise Exception("Duplicate Address IDs")

    if matching_lookup["survey_folder"].duplicated().sum():
        raise Exception("Duplicate survey folders")

    # Columns of the packages board that describe the measures in each package
    measure_columns = [
        'Main Wall Insulation',
        'Secondary Wall Insulation',
        'Loft insulation',
        'Flat Roof',
        'Room in Roof',
        'Window Upgrade',
        'Door Upgrade',
        'Ventilation',
        'Main Heating',
        'Water Heating',
        'Heating Controls',
        'Solar PV',
        'Other measures'
    ]

    # We should end up with a 1:1 mapping between the Osm. ID and the survey folder
    stonewater_data = extracted_data.merge(matching_lookup, on="survey_folder", how="inner").merge(
        retrofit_packages_board[
            [
                "Name",
                "RA",
                "Address ID",
                "Archetype ID",
                "Arch. Group Rank",
                "Actual SAP Band",
                "Actual SAP Rating",
                "Modelled SAP Band",
                "Modelled SAP Rating",
                "Package Ref",
            ] + measure_columns
        ],
        on=["Address ID", "Name"],
        how="left"
    )

    if stonewater_data["Address ID"].duplicated().sum():
        raise Exception("Duplicate Address IDs")

    # Create a section for costs (empty placeholder columns; no costs are calculated here)
    for measure in measure_columns:
        stonewater_data[f"Cost of {measure}"] = None
    stonewater_data["Total Cost of Measures"] = None
    stonewater_data["Contingency Cost"] = None
    stonewater_data["Total Cost of Measures inc Contingency"] = None

    # We've appended the recommended packages and modelled SAP ratings to the data
    # We also want to append the windows data
    windows_data = pd.read_excel(
        os.path.join(
            CUSTOMER_FOLDER_PATH,
            "Window data included AP Copy Stonewater SHDF_3_0_Board Triage Master Filtered 26.07.24.xlsx"
        ),
        header=12
    )
    # Drop rows that repeat the header text and rows without an Address ID
    windows_data = windows_data[windows_data["Address ID"] != "Address ID"]
    windows_data = windows_data[~pd.isnull(windows_data["Address ID"])]
    # We get a lookup id of Osm.ID and when the windows were fitted
    windows_data = windows_data[
        ["Address ID", "Window attributes - Fitted/renewed date",
         "Parent Asset Window attributes - Fitted/renewed date"]
    ]
    # Convert to string for the moment
    windows_data["Parent Asset Window attributes - Fitted/renewed date"] = windows_data[
        "Parent Asset Window attributes - Fitted/renewed date"
    ].astype(str)

    # Create a single date column: the property's own date when present, otherwise the parent asset's date
    windows_data["Fitted/renewed date"] = np.where(
        pd.notnull(windows_data["Window attributes - Fitted/renewed date"]),
        windows_data["Window attributes - Fitted/renewed date"],
        windows_data["Parent Asset Window attributes - Fitted/renewed date"]
    )
    # Convert to a date
    windows_data["Fitted/renewed date"] = pd.to_datetime(windows_data["Fitted/renewed date"])
    # Calculate the number of years since something was done on the windows
    windows_data["Years since fitted/renewed"] = (pd.Timestamp.now() - windows_data[
        "Fitted/renewed date"]).dt.days / 365

    # A package includes windows when the board has any value in its "Window Upgrade" column
    stonewater_data["Package Includes Windows"] = ~pd.isnull(stonewater_data["Window Upgrade"])

    # Cast the merge key to float - presumably to match the dtype of stonewater_data["Address ID"]; TODO confirm
    windows_data["Address ID"] = windows_data["Address ID"].astype(float)
    stonewater_data = stonewater_data.merge(windows_data, on="Address ID", how="left")
    stonewater_data = stonewater_data.sort_values("Archetype ID", ascending=True)

    # The windows merge must not have duplicated any property
    if stonewater_data["Address ID"].duplicated().sum():
        raise Exception("Duplicate Address IDs")

    # Write the date columns out as strings
    for c in [
        'Window attributes - Fitted/renewed date',
        'Parent Asset Window attributes - Fitted/renewed date',
        'Fitted/renewed date'
    ]:
        stonewater_data[c] = stonewater_data[c].astype(str)

    # Save this data to excel
    stonewater_data.to_excel(CUSTOMER_FOLDER_PATH + "/Stonewater - costed retrofit packages V3.xlsx", index=False)

    # Unit cost per measure; a cost of None means no cost has been entered for that measure
    cost_sheet = [
        {
            "measure": "EWI 0.30 w.m2.K", "cost": 298.35, "unit": "m2"
        },
        {
            "measure": "CWI RdSAP Default", "cost": 14.21, "unit": "m2"
        },
        {
            "measure": "Poss Extract CWI & Refill (issues identified)", "cost": 14.21 + 25, "unit": "m2"
        },
        {
            "measure": "IWI 0.30 w.m2.K", "cost": 244.80, "unit": "m2"
        },
        {
            "measure": "EWI/IWI 0.3", "cost": (298.35 + 244.8) / 2, "unit": "m2"
        },
        {
            "measure": "Loft Insulation 0.11 w.m2.K", "cost": 16.07, "unit": "m2"
        },
        {
            "measure": "Flat Roof 0.11 w.m2.K", "cost": 195, "unit": "m2"
        },
        {
            "measure": "DG Window 1.30 w.m2.K", "cost": 1140, "unit": "each"
        },
        {
            "measure": "Secondary 2.40", "cost": 974, "unit": "each"
        },
        {
            "measure": "Ins. Door 1.30 w.m2.K", "cost": None, "unit": "each"
        },
        {
            "measure": "Ins. Door 1.40 w.m2.K", "cost": None, "unit": "each"
        },
        {
            "measure": "DMEV", "cost": 900, "unit": "each"
        },
        {
            "measure": "ASHP Vaillant 102607 5kw", "cost": None, "unit": "each"
        },
        {
            "measure": "HHRSH Quantum 150", "cost": None, "unit": "each"
        },
        {
            "measure": "Dual Stat Tank 210lt 50mm Foam", "cost": None, "unit": "each"
        },
        {
            "measure": "Dual Stat Tank 160lt 50mm Foam", "cost": None, "unit": "each"
        },
        {
            "measure": "Dual Stat Tank 110lt 50mm Foam", "cost": None, "unit": "each"
        },
        {
            "measure": "Smart Thermostat", "cost": 1200, "unit": "each"
        },
        {
            "measure": "TRV's", "cost": 350, "unit": "each"
        },
        {
            "measure": "Solar PV - 3.0kwp", "cost": 4365.0, "unit": "each"
        },
        {
            "measure": "Solar PV - 1.5kwp", "cost": 3881, "unit": "each"
        },
        {
            "measure": "LEL", "cost": 35, "unit": "per bulb"
        },
        {
            "measure": "Roof 0.16 - Walls 0.30", "cost": 180, "unit": "floor area m2"
        },
        {
            "measure": "Roof 0.16 - Walls 0.16", "cost": 180, "unit": "floor area m2"
        },
    ]
    cost_sheet = pd.DataFrame(cost_sheet)
    # Save cost sheet - ideally this will be used as a secondary sheet for Stonewater
    cost_sheet.to_excel(CUSTOMER_FOLDER_PATH + "/Stonewater - cost sheet.xlsx", index=False)

    # stonewater_data[~pd.isnull(stonewater_data["Room in Roof"])]["survey_folder"].values

    # NOTE(review): the bid is built from a separate costed workbook, not from the file saved above
    create_proposed_wave_3_bid(
        costed_packages_filepath=os.path.join(
            CUSTOMER_FOLDER_PATH, "Stonewater - Costed Retrofit Packages 20241030 (WIP) Single Model V3.xlsx"
        ),
        archetypes_sheet_filepath=os.path.join(
            CUSTOMER_FOLDER_PATH, "Stonewater SHDF_3_0_Board Triage 22.05.24 - Archetyped V3.1.xlsx"
        )
    )
def _join_unique_text(values):
    """
    Collapses a column of strings into a single " or "-separated label.
    :param values: pandas Series of strings
    :return: String of the distinct, non-blank values, sorted and joined by " or ", with newlines removed
    """
    distinct = sorted(value for value in values.unique() if value.strip())
    return " or ".join(distinct).replace("\n", "")


def _summarise_surveyed(surveyed):
    """
    Describes the packages, roofs and heating systems seen across a set of surveyed properties.
    :param surveyed: DataFrame with string columns "Package Ref", "Surveyed Main Roof" and "Surveyed Main Heating"
    :return: dict with the keys "Proposed Package Ref", "Surveyed Archetype Roofs" and "Surveyed Archetype Heating"
    """
    return {
        "Proposed Package Ref": _join_unique_text(surveyed["Package Ref"]),
        "Surveyed Archetype Roofs": _join_unique_text(surveyed["Surveyed Main Roof"]),
        "Surveyed Archetype Heating": _join_unique_text(surveyed["Surveyed Main Heating"]),
    }


def create_proposed_wave_3_bid(costed_packages_filepath, archetypes_sheet_filepath):
    """
    Builds the proposed Wave 3 bid from the costed (surveyed) packages and the archetyped asset list.

    Every property in the archetypes sheet whose archetype has a surveyed property with a current EPC band of
    D to G is proposed. Each proposed property is classed as an "Exact" match when a surveyed property of the
    same archetype shares its Property Type, Wall Type, Roof Type and Heating, and "Approximate" otherwise.
    Two Excel files are written under CUSTOMER_FOLDER_PATH: the proposed bid and a per-postcode summary of how
    many properties are in and out of the proposal.

    :param costed_packages_filepath: String path to the costed packages workbook ("Modelled Packages" sheet)
    :param archetypes_sheet_filepath: String path to the archetyped asset list workbook
    :return: None
    :raises Exception: if an archetype without an exact match has the package "2B2A" (unresolved case)
    """
    # We read in the costed packages
    costed_packages = pd.read_excel(costed_packages_filepath, header=13, sheet_name="Modelled Packages")
    costed_packages = costed_packages[~pd.isnull(costed_packages["Address"])]

    roof_columns = ['Main Roof Type', 'Main Roof Insulation', 'Main Roof Insulation Thickness']
    heating_columns = ['Existing Primary Heating System', 'Existing Primary Heating PCDF Reference']
    archetypes_to_cost = costed_packages[
        [
            "Name", "Address ID", "Archetype ID", "Current SAP Rating", "Current EPC Band", "Modelled SAP Band",
            "Modelled SAP Rating", "Package Ref", 'Total Cost of Measures', 'Contingency Cost',
            'Total Cost of Measures inc Contingency',
        ] + roof_columns + heating_columns
    ].copy()

    # Combine 'Main Roof Type', 'Main Roof Insulation', 'Main Roof Insulation Thickness', separating by colons!
    archetypes_to_cost['Surveyed Main Roof'] = (
        archetypes_to_cost['Main Roof Type'] + ': ' + archetypes_to_cost['Main Roof Insulation'] + ': ' +
        archetypes_to_cost['Main Roof Insulation Thickness'].astype(str)
    )
    # Combine the heating systems, separating by colons!
    archetypes_to_cost['Surveyed Main Heating'] = (
        archetypes_to_cost['Existing Primary Heating System'] + ': code - ' +
        archetypes_to_cost['Existing Primary Heating PCDF Reference'].astype(str)
    )
    archetypes_to_cost = archetypes_to_cost.drop(columns=roof_columns + heating_columns)

    # We take properties that are EPC D and below (59% of units)
    archetypes_to_cost = archetypes_to_cost[archetypes_to_cost["Current EPC Band"].isin(["D", "E", "F", "G"])]
    archetypes_to_cost["Has been modelled"] = ~pd.isnull(archetypes_to_cost["Modelled SAP Band"])

    # These are the Archetypes that will likely be suitable for Wave 3
    archetypes_sheet = pd.read_excel(archetypes_sheet_filepath, header=4)
    # Drop rows without an Address ID and rows that repeat the header text
    archetypes_sheet = archetypes_sheet[~pd.isnull(archetypes_sheet["Address ID"])]
    archetypes_sheet = archetypes_sheet[archetypes_sheet["Address ID"] != "Address ID"]
    archetypes_sheet["Address ID"] = archetypes_sheet["Address ID"].astype(int)

    # We merge the property details onto the costed archetypes
    archetypes_to_cost = archetypes_to_cost.merge(
        archetypes_sheet[["Address ID", "Property Type", "Wall Type", "Roof Type", "Heating"]],
        on="Address ID",
        how="left"
    )

    # A property is proposed when its archetype has at least one surveyed property in archetypes_to_cost.
    # The IDs are compared as strings because the sheet's Archetype ID column is not purely numeric.
    costed_archetype_ids = archetypes_to_cost["Archetype ID"].astype(int).astype(str)
    in_costed_archetype = archetypes_sheet["Archetype ID"].astype(str).isin(costed_archetype_ids)
    proposed_sample = archetypes_sheet[in_costed_archetype]
    not_proposed = archetypes_sheet[~in_costed_archetype]

    # TODO: Can we propose anything about archetypes that were not surveyed?
    proposed_sample = proposed_sample[
        [
            "Name", "Postcode", "UPRN", "UDPRN", "Address ID", "Osm. ID", "Archetype ID",
            "Property Type", "Wall Type", "Roof Type", "Heating"
        ]
    ]

    # Blank out missing descriptions so they are skipped, rather than crashing the string handling below
    archetypes_to_cost["Surveyed Main Roof"] = archetypes_to_cost["Surveyed Main Roof"].fillna("")
    archetypes_to_cost["Surveyed Main Heating"] = archetypes_to_cost["Surveyed Main Heating"].fillna("")

    # We classify into high and low confidence
    match_classification = []
    for _, home in tqdm(proposed_sample.iterrows(), total=len(proposed_sample)):
        surveyed = archetypes_to_cost[archetypes_to_cost["Archetype ID"] == home["Archetype ID"]].copy()
        surveyed["Package Ref"] = surveyed["Package Ref"].astype(str)

        # We now check if we have a perfect match
        exact_matches = surveyed[
            (surveyed["Property Type"] == home["Property Type"]) &
            (surveyed["Wall Type"] == home["Wall Type"]) &
            (surveyed["Roof Type"] == home["Roof Type"]) &
            (surveyed["Heating"] == home["Heating"])
        ]

        if exact_matches.empty:
            # No identical surveyed property: describe everything surveyed in the archetype instead
            match_type = "Approximate"
            summary = _summarise_surveyed(surveyed)
            if summary["Proposed Package Ref"] == "2B2A":
                raise Exception("Fix me")
        else:
            # Describe only the surveyed properties that share all four features
            match_type = "Exact"
            summary = _summarise_surveyed(exact_matches)

        match_classification.append(
            {
                "Address ID": home["Address ID"],
                "Match to Surveyed": match_type,
                **summary,
            }
        )

    match_classification = pd.DataFrame(match_classification)

    proposed_sample = proposed_sample.merge(
        match_classification,
        on="Address ID",
        how="left",
    )

    # Merge on the cost per archetype (mean across the surveyed properties of that archetype)
    cost_per_archetype = (
        archetypes_to_cost.groupby("Archetype ID")[['Total Cost of Measures inc Contingency']].mean().reset_index()
    )
    proposed_sample = proposed_sample.merge(
        cost_per_archetype,
        on="Archetype ID",
        how="left"
    )

    # We add on a boolean to indicate if a property from that archetype has been modelled
    proposed_sample = proposed_sample.merge(
        archetypes_to_cost.groupby("Archetype ID")[["Has been modelled"]].any().reset_index(),
        on="Archetype ID",
        how="left"
    )

    # Archetypes with no modelled property get no cost
    proposed_sample["Total Cost of Measures inc Contingency"] = np.where(
        ~proposed_sample["Has been modelled"],
        None, proposed_sample["Total Cost of Measures inc Contingency"]
    )

    proposed_sample = proposed_sample.sort_values("Archetype ID", ascending=True)

    # Save excel
    proposed_sample.to_excel(CUSTOMER_FOLDER_PATH + "/Stonewater - Proposed Wave 3 Bid V2 (WIP).xlsx", index=False)

    # For each postcode that's in the bid, we also summarise the number of units in the bid and number left out
    postcode_summary = pd.DataFrame(
        [
            {
                "Postcode": postcode,
                "Number of properties in Proposal": len(proposed_sample[proposed_sample["Postcode"] == postcode]),
                "Number of properties not in Proposal": len(not_proposed[not_proposed["Postcode"] == postcode])
            }
            for postcode in proposed_sample["Postcode"].unique()
        ]
    )
    # Postcodes with the most properties left out come first
    postcode_summary = postcode_summary.sort_values(
        "Number of properties not in Proposal",
        ascending=False).reset_index(drop=True)
    postcode_summary.to_excel(
        CUSTOMER_FOLDER_PATH + "/Stonewater - Proposed Wave 3 Bid Postcode Summary.xlsx", index=False
    )
def find_remaining_surveys():
    """
    This compares a list of properties that have been surveyed against a list of properties that I have produced
    costed retrofit packages for, so I know what needs to be downloaded from Sharepoint.
    The surveyed properties without a costed package are written to needed_surveys.csv in CUSTOMER_FOLDER_PATH.
    :return: None
    :raises AssertionError: if the needed and costed row counts do not add up to the surveyed row count
    """
    # Build the paths from the shared customer folder constant rather than repeating the absolute path
    surveyed = pd.read_excel(
        os.path.join(CUSTOMER_FOLDER_PATH, "Stonewater_SHDF_3_0_Board_work_in_progress_- 07.11.24.xlsx"),
        header=4
    )

    costed = pd.read_excel(
        os.path.join(
            CUSTOMER_FOLDER_PATH, "Stonewater - Costed Retrofit Packages 20241030 (WIP) MR Review v1.xlsx"
        ),
        header=13,
        sheet_name="Modelled Packages"
    )
    costed = costed[~pd.isnull(costed["Address ID"])]

    # Take an explicit copy so the "id" column is added to an independent frame, not to a slice of `surveyed`
    needed = surveyed[~surveyed["Address ID"].isin(costed["Address ID"])].copy()
    # Sortable identifier made of the archetype and the property's rank within that archetype group
    needed["id"] = needed["Archetype ID"].astype(str) + "-" + needed["Arch. Group Rank"].astype(str)
    needed = needed.sort_values("id", ascending=True)

    needed[["id", "Name", "Postcode"]].to_csv(os.path.join(CUSTOMER_FOLDER_PATH, "needed_surveys.csv"))

    # Sanity check: every surveyed property should be either costed or still needed
    assert needed.shape[0] + costed.shape[0] == surveyed.shape[0], (
        "Needed and costed properties do not add up to the surveyed total"
    )
def append_stonewater_id():
    """
    This completes an adhoc request from Stonewater to add in their organisation Reference onto the model.
    The 'Org. ref.' column of the archetyped asset list is merged onto the modelled packages by Address ID and
    the result is written to "Stonewater IDs.xlsx" in CUSTOMER_FOLDER_PATH.
    :return: None
    :raises ValueError: if any modelled property has no 'Org. ref.' after the merge
    """
    # Build the paths from the shared customer folder constant rather than repeating the absolute path
    model_proposed_sample = pd.read_excel(
        os.path.join(CUSTOMER_FOLDER_PATH, "Stonewater - Bid Packages WIP 13.11.24.xlsx"),
        sheet_name="Modelled Packages",
        header=13
    )
    model_proposed_sample = model_proposed_sample[~pd.isnull(model_proposed_sample["Address ID"])]
    model_proposed_sample["Address ID"] = model_proposed_sample["Address ID"].astype(int)

    original_archetypes = pd.read_excel(
        os.path.join(CUSTOMER_FOLDER_PATH, "Stonewater SHDF_3_0_Board Triage 22.05.24 - Archetyped V3.1.xlsx"),
        header=4
    )
    # Drop rows without an Address ID and rows that repeat the header text, so the ID can be cast to int
    original_archetypes = original_archetypes[~pd.isnull(original_archetypes["Address ID"])]
    original_archetypes = original_archetypes[original_archetypes["Address ID"] != "Address ID"]
    original_archetypes["Address ID"] = original_archetypes["Address ID"].astype(int)

    matched = model_proposed_sample.merge(
        original_archetypes[["Address ID", 'Org. ref.']],
        on="Address ID",
        how="left"
    )

    # Every modelled property must have picked up an organisation reference
    if pd.isnull(matched["Org. ref."]).sum():
        raise ValueError("Something went wrong")

    # Save as Excel
    matched.to_excel(
        os.path.join(CUSTOMER_FOLDER_PATH, "Stonewater IDs.xlsx"),
        sheet_name="Proposed Wave 3 Sample",
        index=False
    )
def propsed_wave_3_sample():
"""
Stonewater want to ensure that the properties that when selecting properties for wave 3, they choose properties
such that most of the properties within a geographical area are treatable within the bid.
Name, if we take a geographical area (which could be postal region) they want the most, and ideally all, of the
properties within that geographical area to be included within the bid
:return:
"""
asset_list = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater SHDF_3_0_Board Triage 22.05.24 "
"- Archetyped V3.1.xlsx",
header=4
)
# TODO: We drop 302 properties that are not priority postcodes - confirm w/ Stonewater and 7 properties missing
# UPRN
asset_list = asset_list[~asset_list["Archetype ID"].isin(["MISSING UPRN"])]
# Clean address ids
asset_list = asset_list[~pd.isnull(asset_list["Address ID"])]
asset_list = asset_list[asset_list["Address ID"] != "Address ID"]
asset_list["Address ID"] = asset_list["Address ID"].astype(int)
# Create the postal region, taking the first part of the postcode
asset_list["Postal Region"] = asset_list["Postcode"].str.split(" ").str[0]
asset_list["Street and Region"] = asset_list["Street name"] + " " + asset_list["Postal Region"]
unique_postal_regions = asset_list["Postal Region"].unique()
# Keep just the columns we need
asset_list = asset_list[
["UPRN", "Address ID", "Archetype ID", "Postal Region", "Postcode", "Street and Region",
"Property Type", "Wall Type", "Roof Type", "Heating"]
]
# Updated packages: to_excel(CUSTOMER_FOLDER_PATH + "/Stonewater - costed retrofit packages V3.xlsx", index=False)
survey_results = pd.read_excel(
os.path.join(CUSTOMER_FOLDER_PATH, "Stonewater - Bid Packages WIP 14.11.24.xlsx"),
header=13,
sheet_name="Modelled Packages"
)
additional_survey_data = pd.read_excel(
os.path.join(CUSTOMER_FOLDER_PATH, "Stonewater - costed retrofit packages V3.xlsx"),
header=0
)
survey_results = survey_results.merge(
additional_survey_data[
[
"Address ID",
"Main Wall Type", "Main Wall Insulation_x", "Main Wall Thickness",
"Main Building Alternative Wall Type", "Main Building Alternative Wall Insulation",
"Main Building Alternative Wall Thickness"
]
].rename(columns={"Main Wall Insulation_x": "Main Wall Insulation Type"}),
how="left",
on="Address ID"
)
# TOOD: We probably want the actual surveyed wall, roof, heating type
survey_results = survey_results[
[
"Address ID", "Archetype ID", "Current SAP Rating", "Current EPC Band", "Postcode",
"Main Roof Type", "Main Roof Insulation", "Main Roof Insulation Thickness",
"Existing Primary Heating System",
"Main Wall Type", "Main Wall Insulation Type", "Main Wall Thickness",
"Main Building Alternative Wall Type", "Main Building Alternative Wall Insulation",
"Main Building Alternative Wall Thickness"
]
].rename(
columns={
"Existing Primary Heating System": "Surveyed Primary Heating System"
}
)
# Concatenate from the wall information
survey_results["Surveyed: Wall Type"] = survey_results["Main Wall Type"] + ": " + survey_results[
"Main Wall Insulation Type"]
# Alternative wall
survey_results["Survey: Main Alternative Wall"] = (
survey_results["Main Building Alternative Wall Type"] + ": " + survey_results[
"Main Building Alternative Wall Insulation"]
)
# Roof information
survey_results["Survey: Type"] = survey_results["Main Roof Type"] + ": " + survey_results[
"Main Roof Insulation"] + ": " + survey_results["Main Roof Insulation Thickness"].astype(str)
# Drop the individual columns:
survey_results = survey_results.drop(
columns=[
"Main Roof Type", "Main Roof Insulation", "Main Roof Insulation Thickness",
"Main Wall Type", "Main Wall Insulation Type",
"Main Building Alternative Wall Type", "Main Building Alternative Wall Insulation"
]
)
survey_results_with_original_features = survey_results.merge(
asset_list[["UPRN", "Address ID", "Property Type", "Wall Type", "Roof Type", "Heating"]],
on="Address ID",
how="left"
)
if survey_results_with_original_features.shape[0] != survey_results.shape[0]:
raise ValueError("Something went wrong")
# We get longitude & Latitude
archetyping_spatial_features = read_pickle_from_s3(
bucket_name="retrofit-data-dev", s3_file_name="scustomers/Stonewater/clustering/spatial_data_to_uprn.pkl",
)
archetyping_spatial_features = pd.concat(archetyping_spatial_features)
archetyping_spatial_features = archetyping_spatial_features[["UPRN", 'LATITUDE', 'LONGITUDE']].rename(
columns={"LATITUDE": "latitude", "LONGITUDE": "longitude"}
)
# Merge them onto both datasets
asset_list = asset_list.merge(
archetyping_spatial_features, how="left", on="UPRN"
)
if pd.isnull(asset_list["longitude"]).sum():
raise ValueError("Something went wrong")
survey_results_with_original_features = survey_results_with_original_features.merge(
archetyping_spatial_features, how="left", on="UPRN"
)
if pd.isnull(survey_results_with_original_features["longitude"]).sum():
raise ValueError("Something went wrong")
def haversine(lat1, lon1, lat2, lon2):
    """
    Great-circle distance between two points on a spherical Earth, using the haversine formula.

    Inputs are in decimal degrees and may be scalars or numpy arrays; numpy broadcasting applies,
    so one point can be compared against an array of points in a single call.

    :param lat1: latitude(s) of the first point(s), in degrees
    :param lon1: longitude(s) of the first point(s), in degrees
    :param lat2: latitude(s) of the second point(s), in degrees
    :param lon2: longitude(s) of the second point(s), in degrees
    :return: distance(s) in meters
    """
    # Radius of Earth in meters
    R = 6371000

    # Convert degrees to radians
    lat1, lon1, lat2, lon2 = map(np.radians, [lat1, lon1, lat2, lon2])

    # Differences
    dlat = lat2 - lat1
    dlon = lon2 - lon1

    # Haversine formula
    a = np.sin(dlat / 2.0) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2.0) ** 2
    # Floating point rounding can leave `a` marginally outside [0, 1] for (near-)antipodal points,
    # which would turn sqrt(1 - a) into NaN, so clamp it to the valid range first
    a = np.clip(a, 0.0, 1.0)
    c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))

    distance = R * c
    return distance
# Tier definitions
# Tier 1: A property in the same postal region and of the same archetype was surveyed and is EPC D or below
# Tier 2: A property of the same archetype (in any postal region) was surveyed and is EPC D or below
def match_property_to_surveyed(property, survey_results_with_original_features):
    """
    Find the surveyed properties whose original features best match the given property.

    Three passes are tried in order and the first non-empty result is returned:
      1. "Property Type", "Wall Type", "Roof Type" and "Heating" all identical
      2. as 1, but "Roof Type" only has to agree on the text before the first ":"
      3. as 2, but "Heating" also only has to agree on the text before the first ":"

    :param property: row-like object (e.g. a pandas Series) holding the four feature values
    :param survey_results_with_original_features: DataFrame of surveyed properties with the same
        four feature columns
    :return: a copy of the matching rows; empty if even the loosest pass finds nothing
    """
    pool = survey_results_with_original_features

    def identical(column):
        # Row-wise mask: the surveyed value equals the property's value exactly
        return pool[column] == property[column]

    def same_family(column):
        # Row-wise mask: the part before the first ":" agrees
        return pool[column].str.split(":").str[0] == property[column].split(":")[0]

    # Property type and wall type must be identical in every pass
    type_and_wall = identical("Property Type") & identical("Wall Type")

    # Pass 1: everything identical
    exact = pool[type_and_wall & identical("Roof Type") & identical("Heating")].copy()
    if not exact.empty:
        return exact

    # Pass 2: relax the roof to its family
    roof_family = same_family("Roof Type")
    roof_relaxed = pool[type_and_wall & roof_family & identical("Heating")].copy()
    if not roof_relaxed.empty:
        return roof_relaxed

    # Pass 3: relax the heating to its family as well; may legitimately come back empty
    return pool[type_and_wall & roof_family & same_family("Heating")].copy()
# ---------------------------------------------------------------------------
# Confidence-tier assignment
#
# For every postal region, each asset is given a "Current EPC Band" and a
# "Confidence Tier", using progressively weaker evidence:
#   1 - the property itself, or its archetype within the region, was surveyed (EPC D-G)
#   2 - the archetype was surveyed somewhere else (EPC D-G)
#   3 - estimated from the nearest surveyed properties with similar features
#   4 - nothing comparable was surveyed, a survey is needed to confirm
#   5 - the evidence points to EPC C or above
# ---------------------------------------------------------------------------
EPC_BELOW_C = ["D", "E", "F", "G"]
EPC_C_OR_ABOVE = ["C", "B", "A"]
ESTIMATE_COLUMNS = ["Archetype ID", "Address ID", "Current EPC Band"]


def _with_distance(target, candidates):
    """Return a copy of ``candidates`` with the haversine distance (meters) to ``target`` added."""
    candidates = candidates.copy()
    candidates["distance_meters"] = haversine(
        lat1=target["latitude"], lon1=target["longitude"],
        lat2=candidates["latitude"].values, lon2=candidates["longitude"].values
    )
    return candidates


def _distance_weighted_epc(candidates):
    """Turn the inverse-distance weighted mean SAP rating of ``candidates`` into an EPC band."""
    # The "+ 1" keeps the weight finite when a candidate sits at distance zero
    expected_sap = np.average(
        candidates["Current SAP Rating"], weights=1 / (candidates["distance_meters"] + 1)
    )
    return sap_to_epc(expected_sap)


def _estimate_epc_per_property(region_assets, archetype_ids, surveyed_pool, refine_match):
    """
    Estimate an EPC band for every asset whose archetype has at least one surveyed property.

    :param region_assets: assets of the region being processed
    :param archetype_ids: archetypes to estimate for
    :param surveyed_pool: surveyed properties with original features and coordinates
    :param refine_match: when True and several surveys exist for the archetype, prefer the surveys
        whose original features match the asset (falling back to all of them if none match)
    :return: DataFrame with one row per estimated asset and the columns in ESTIMATE_COLUMNS
    """
    rows = []
    for arch_id in archetype_ids:
        # Build the mask from the frame being filtered; a mask taken from another frame only
        # works if both indexes happen to line up
        archetype_data = surveyed_pool[surveyed_pool["Archetype ID"] == arch_id]
        if archetype_data.empty:
            continue
        for _, asset in region_assets[region_assets["Archetype ID"] == arch_id].iterrows():
            candidates = archetype_data
            if refine_match and candidates.shape[0] > 1:
                # Look for an exact match, or as close as possible
                filtered = match_property_to_surveyed(asset, candidates)
                if not filtered.empty:
                    candidates = filtered
            rows.append(
                {
                    "Archetype ID": arch_id,
                    "Address ID": asset["Address ID"],
                    "Current EPC Band": _distance_weighted_epc(_with_distance(asset, candidates))
                }
            )
    # Explicit columns keep the later merge working even when nothing could be estimated
    return pd.DataFrame(rows, columns=ESTIMATE_COLUMNS)


def _estimate_from_similar_properties(a_id, asset, surveyed_pool):
    """
    Estimate the EPC band of an asset whose archetype was never surveyed.

    Surveyed properties with matching features are searched with increasingly relaxed criteria;
    the (up to) three nearest ones, preferring the same postcode and postal region, are averaged.

    :param a_id: "Address ID" of the asset
    :param asset: the asset's row from the asset list (features, postcode, coordinates)
    :param surveyed_pool: surveyed properties with original features and coordinates
    :return: dict with "Address ID", "Confidence Tier" and "Current EPC Band"
    :raises Exception: if the asset's heating system is not covered by the relaxed matching rules
    """
    match_type = "3 - compared to similar properties"
    surveyed = match_property_to_surveyed(asset, surveyed_pool)

    if surveyed.empty:
        match_type = "3 - compared to similar properties, relaxed"
        # Match on the family (text before ":") of property type, wall and roof, ignoring heating
        # for now; heating is matched more roughly below
        pool_property_type = surveyed_pool["Property Type"].str.split(":").str[0]
        asset_property_type = asset["Property Type"].split(":")[0]
        same_wall_and_roof = (
            (surveyed_pool["Wall Type"].str.split(":").str[0] == asset["Wall Type"].split(":")[0]) &
            (surveyed_pool["Roof Type"].str.split(":").str[0] == asset["Roof Type"].split(":")[0])
        )
        surveyed = surveyed_pool[(pool_property_type == asset_property_type) & same_wall_and_roof].copy()

        if surveyed.empty:
            # Still nothing: treat houses, bungalows and maisonettes as comparable to surveyed
            # houses/bungalows, and everything else as comparable to flats
            if asset_property_type in ["House", "Bungalow", "Maisonette"]:
                filter_property_types = ["House", "Bungalow"]
            else:
                filter_property_types = ["Flat"]
            surveyed = surveyed_pool[
                pool_property_type.isin(filter_property_types) & same_wall_and_roof
            ].copy()

        # Keep only surveyed properties from the same broad heating family.
        # NOTE(review): the flattened source left it ambiguous whether this filter should also be
        # applied to exact matches; it is applied on the relaxed path only - confirm.
        heating = asset["Heating"]
        if "Electric" in heating:
            # Take other electric heating systems
            surveyed = surveyed[surveyed["Heating"].str.contains("Electric")]
        elif heating in [
            "Community Heating Systems: Community boilers only (RdSAP)",
            "Community Heating Systems: Community CHP and boilers (RdSAP)"
        ]:
            # Take other community heating systems
            surveyed = surveyed[surveyed["Heating"].str.contains("Community")]
        elif heating == 'Heat Pump: (from database)':
            # Take other heat pumps
            surveyed = surveyed[surveyed["Heating"].str.contains("Heat Pump")]
        elif heating == "Solid fuel room heaters: Open fire in grate":
            # Take other properties with room heaters
            surveyed = surveyed[surveyed["Heating"].str.contains("room heaters")]
        elif "Boiler" in heating:
            # Take other properties with boilers
            surveyed = surveyed[surveyed["Heating"].str.contains("Boiler")]
        else:
            raise Exception("Fix me")

    if surveyed.empty:
        return {
            "Address ID": a_id,
            "Confidence Tier": "4 - no similar property, needs survey to confirm",
            "Current EPC Band": "Needs Survey"
        }

    # Nearest first
    surveyed = _with_distance(asset, surveyed).sort_values("distance_meters", ascending=True)
    # Prefer surveyed properties in the same postcode, then in the same postal region
    if (surveyed["Postcode"] == asset["Postcode"]).any():
        surveyed = surveyed[surveyed["Postcode"] == asset["Postcode"]]
    if (surveyed["Postal Region"] == asset["Postal Region"]).any():
        surveyed = surveyed[surveyed["Postal Region"] == asset["Postal Region"]]
    # Take the 3 nearest
    surveyed = surveyed.head(3)

    # Weighted mean of SAP rating - the closer the better
    expected_epc = _distance_weighted_epc(surveyed)
    if expected_epc in EPC_C_OR_ABOVE:
        match_type = "5 - EPC C or above"
    return {
        "Address ID": a_id,
        "Confidence Tier": match_type,
        "Current EPC Band": expected_epc
    }


results = []
for region in tqdm(unique_postal_regions):
    # Take all of the properties in that region
    region_assets = asset_list[asset_list["Postal Region"] == region].copy()

    # --- The property itself was surveyed -------------------------------------------------
    exact_surveyed = survey_results[
        survey_results["Address ID"].isin(region_assets["Address ID"])
    ]
    region_assets = region_assets.merge(
        exact_surveyed[["Address ID", "Current EPC Band"]],
        on="Address ID",
        how="left"
    )
    region_assets["Confidence Tier"] = None
    region_assets["Confidence Tier"] = np.where(
        region_assets["Current EPC Band"].isin(EPC_BELOW_C),
        "1 - property was surveyed", region_assets["Confidence Tier"]
    )
    region_assets["Confidence Tier"] = np.where(
        region_assets["Current EPC Band"].isin(EPC_C_OR_ABOVE),
        "5 - property was surveyed", region_assets["Confidence Tier"]
    )

    # Archetypes that still contain unlabelled properties
    archetypes = region_assets[
        pd.isnull(region_assets["Confidence Tier"])
    ]["Archetype ID"].unique()

    # --- Method 1: the archetype was surveyed within this region ----------------------------
    region_surveyed = survey_results[
        survey_results["Archetype ID"].isin(archetypes) &
        (survey_results["Postal Region"] == region)
    ][["Archetype ID", "Current EPC Band"]].drop_duplicates()
    if region_surveyed["Archetype ID"].duplicated().sum():
        # At least one archetype has conflicting surveyed bands, so estimate per property instead.
        # NOTE(review): the per-property estimate draws on surveys of the archetype from every
        # region, not only this one - confirm that is intended for a tier 1 label.
        region_surveyed = _estimate_epc_per_property(
            region_assets, archetypes, survey_results_with_original_features, refine_match=True
        )
        method1_keys = ["Archetype ID", "Address ID"]
    else:
        method1_keys = ["Archetype ID"]
    region_assets = region_assets.merge(
        region_surveyed,
        on=method1_keys,
        how="left",
        suffixes=("", "_method1")
    )

    # Label the tier 1 properties
    region_assets["Confidence Tier"] = np.where(
        region_assets["Current EPC Band_method1"].isin(EPC_BELOW_C) &
        pd.isnull(region_assets["Confidence Tier"]),
        "1 - Archetype surveyed", region_assets["Confidence Tier"]
    )
    region_assets["Current EPC Band"] = np.where(
        pd.isnull(region_assets["Current EPC Band"]) & pd.notnull(region_assets["Current EPC Band_method1"]),
        region_assets["Current EPC Band_method1"], region_assets["Current EPC Band"]
    )
    # Handle EPC C
    region_assets["Confidence Tier"] = np.where(
        region_assets["Current EPC Band"].isin(EPC_C_OR_ABOVE) & pd.isnull(region_assets["Confidence Tier"]),
        "5 - EPC C or above", region_assets["Confidence Tier"]
    )
    region_assets = region_assets.drop(columns=["Current EPC Band_method1"])

    # --- Method 2: the archetype was surveyed, but not in this region -----------------------
    missed_archetypes = set(archetypes) - set(region_surveyed["Archetype ID"])
    archetype_surveyed = survey_results[
        survey_results["Archetype ID"].isin(missed_archetypes)
    ][["Archetype ID", "Current EPC Band"]].drop_duplicates()
    if archetype_surveyed["Archetype ID"].duplicated().sum():
        archetype_surveyed = _estimate_epc_per_property(
            region_assets, missed_archetypes, survey_results_with_original_features, refine_match=False
        )
        method2_keys = ["Archetype ID", "Address ID"]
    else:
        method2_keys = ["Archetype ID"]
    region_assets = region_assets.merge(
        archetype_surveyed,
        on=method2_keys,
        how="left",
        suffixes=("", "_method2")
    )

    region_assets["Confidence Tier"] = np.where(
        region_assets["Current EPC Band_method2"].isin(EPC_BELOW_C) & pd.isnull(
            region_assets["Confidence Tier"]),
        "2 - same archetype", region_assets["Confidence Tier"]
    )
    region_assets["Current EPC Band"] = np.where(
        pd.isnull(region_assets["Current EPC Band"]) & pd.notnull(region_assets["Current EPC Band_method2"]),
        region_assets["Current EPC Band_method2"], region_assets["Current EPC Band"]
    )
    region_assets = region_assets.drop(columns=["Current EPC Band_method2"])

    # We label EPC C properties
    region_assets["Confidence Tier"] = np.where(
        region_assets["Current EPC Band"].isin(EPC_C_OR_ABOVE) & pd.isnull(region_assets["Confidence Tier"]),
        "5 - EPC C or above", region_assets["Confidence Tier"]
    )
    # Assets in the dedicated "EPC C OR ABOVE" archetype are forced to band C / tier 5
    region_assets["Confidence Tier"] = np.where(
        region_assets["Archetype ID"] == "EPC C OR ABOVE",
        "5 - EPC C or above", region_assets["Confidence Tier"]
    )
    region_assets["Current EPC Band"] = np.where(
        region_assets["Archetype ID"] == "EPC C OR ABOVE",
        "C", region_assets["Current EPC Band"]
    )

    missed_addressids = region_assets[pd.isnull(region_assets["Confidence Tier"])]["Address ID"].unique().tolist()
    if not missed_addressids:
        results.append(region_assets)
        continue

    # --- Method 3: the archetype was never surveyed, compare to similar properties ----------
    final_missed_matches = pd.DataFrame(
        [
            _estimate_from_similar_properties(
                a_id,
                asset_list[asset_list["Address ID"] == a_id].squeeze(),
                survey_results_with_original_features
            )
            for a_id in missed_addressids
        ]
    )

    region_assets = region_assets.merge(
        final_missed_matches,
        on="Address ID",
        how="left",
        suffixes=("", "_method3")
    )
    region_assets["Confidence Tier"] = region_assets["Confidence Tier"].fillna(
        region_assets["Confidence Tier_method3"]
    )
    region_assets["Current EPC Band"] = np.where(
        pd.isnull(region_assets["Current EPC Band"]),
        region_assets["Current EPC Band_method3"], region_assets["Current EPC Band"]
    )
    region_assets = region_assets.drop(columns=["Confidence Tier_method3", "Current EPC Band_method3"])

    # Every asset must have a band (or "Needs Survey") by now
    if pd.isnull(region_assets["Current EPC Band"]).sum():
        raise Exception("Something went wrong")

    results.append(region_assets)

results = pd.concat(results)
# Count the properties in each postal region by confidence tier (one column per tier)
geographic_summary = pd.pivot_table(
    results,
    index='Postal Region',
    columns='Confidence Tier',
    aggfunc='size',
    fill_value=0
).reset_index()

# "Gain" = properties expected to be EPC D or below (tiers 1, 2 and 3);
# "Loss" = properties that need a survey or are expected to be EPC C or above (tiers 4 and 5)
gain_columns = sorted(
    tier for tier in results["Confidence Tier"].unique()
    if any(prefix in tier for prefix in ("1 - ", "2 - ", "3 - "))
)
loss_columns = sorted(
    tier for tier in results["Confidence Tier"].unique()
    if any(prefix in tier for prefix in ("4 - ", "5 - "))
)
geographic_summary["Gain"] = geographic_summary[gain_columns].sum(axis=1)
geographic_summary["Loss"] = geographic_summary[loss_columns].sum(axis=1)
print(geographic_summary.sum())

# Order the regions from the smallest loss upwards and track the running loss
geographic_summary = geographic_summary.sort_values(by="Loss")
geographic_summary["Loss Cumulative Sum"] = geographic_summary["Loss"].cumsum()
# Gain reachable by greedily taking regions until 250 losses are used; the value is discarded
# unless this is run interactively
geographic_summary.loc[geographic_summary["Loss Cumulative Sum"] <= 250, "Gain"].sum()

# Inputs for the optimiser, in the sorted row order
loss = geographic_summary["Loss"].to_numpy()
gain = geographic_summary["Gain"].to_numpy()
def optimise(gain, loss, max_loss=250):
    """
    Choose the rows that maximise the total gain while keeping the total loss within ``max_loss``.

    Each row is either taken or left out (a 0/1 knapsack), so the problem is solved with integer
    variables. Solving it with continuous 0..1 variables and rounding afterwards can round a
    fractional row up and exceed ``max_loss``.

    :param gain: 1-D array-like with the gain of each row
    :param loss: 1-D array-like with the loss of each row, same length as ``gain``
    :param max_loss: maximum total loss allowed across the selected rows
    :return: tuple ``(selected_rows, optimal_gain)``; ``selected_rows`` is an integer array of
        0/1 flags aligned with ``gain`` and ``optimal_gain`` is the total gain of the selection
    :raises Exception: if the solver does not report success
    """
    gain = np.asarray(gain, dtype=float)
    loss = np.asarray(loss, dtype=float)

    # Define the coefficients for the objective function (negative because we maximize Gain)
    c = -gain

    # Define constraints
    A = [loss]  # Only 1 constraint for now, total Loss
    b = [max_loss]  # Maximum total Loss allowed

    # Each variable is a select / do-not-select decision: bounded to [0, 1] and integer
    bounds = [(0, 1) for _ in gain]
    integrality = np.ones(len(gain), dtype=int)

    # Solve with the HiGHS solver (the `integrality` argument requires SciPy >= 1.9)
    result = linprog(c, A_ub=A, b_ub=b, bounds=bounds, integrality=integrality, method='highs')

    if not result.success:
        raise Exception("Optimization failed")

    # The solver returns floats such as 0.9999999; snap them to exact 0/1 flags
    selected_rows = result.x.round().astype(int)
    # Report the gain of the rows actually selected
    optimal_gain = float(gain @ selected_rows)

    return selected_rows, optimal_gain
# --- Region level: pick the postal regions that maximise gain within 250 losses -------------
selected_rows, _ = optimise(gain, loss, 250)

# Select the rows that are selected
geographic_summary["Selected"] = selected_rows == 1
region_totals = geographic_summary[
    geographic_summary["Selected"]
][["Gain", "Loss"]].sum()

# Regions the region-level optimisation left out
unselected_regions = geographic_summary[~geographic_summary["Selected"]]["Postal Region"].values

# --- Street level: repeat the optimisation per "Street and Region" ---------------------------
# NOTE: the variables below are still called "postcode_*" although they are grouped by street
postcode_summary = results.pivot_table(
    index='Street and Region',
    columns='Confidence Tier',
    aggfunc='size',
    fill_value=0
).reset_index()

# All streets are considered here, not only those in the unselected regions
postcode_summary_unselected_regions = postcode_summary.copy()
postcode_summary_unselected_regions["Gain"] = postcode_summary_unselected_regions[gain_columns].sum(axis=1)
postcode_summary_unselected_regions["Loss"] = postcode_summary_unselected_regions[loss_columns].sum(axis=1)

# Loss budget for the street-level selection (fixed, not derived from the region-level result)
remaining_loss_constraint = 220
postcode_selected_rows, _ = optimise(
    gain=postcode_summary_unselected_regions["Gain"].values,
    loss=postcode_summary_unselected_regions["Loss"].values,
    max_loss=int(remaining_loss_constraint)
)
postcode_summary_unselected_regions["Selected"] = postcode_selected_rows == 1

postcode_optimised_additional_properties = postcode_summary_unselected_regions[
    postcode_summary_unselected_regions["Selected"]
]
postcode_totals = postcode_optimised_additional_properties[["Gain", "Loss"]].sum()

# --- Report the size and make-up of the bid ----------------------------------------------------
bid_size = postcode_totals.sum()
print("Bid Size:", bid_size)
total_epc_d_or_below = postcode_totals["Gain"]
print("Total EPC D or below:", total_epc_d_or_below)
total_epc_c = postcode_totals["Loss"]
print("Total EPC C or above:", total_epc_c)

# Total needing a survey. The pivot only has a column for tiers that actually occur, so the
# tier 4 column is missing when no property needed a survey
needs_survey_column = "4 - no similar property, needs survey to confirm"
if needs_survey_column in postcode_optimised_additional_properties.columns:
    total_needing_survey = postcode_optimised_additional_properties[needs_survey_column].sum()
else:
    total_needing_survey = 0
print("Total needing survey:", total_needing_survey)

# --- Top up with postcodes, on the unselected streets, that carry no loss at all --------------
unselected_streets = postcode_summary_unselected_regions[
    ~postcode_summary_unselected_regions["Selected"]
]["Street and Region"].values

postcode_summary2 = results[
    results["Street and Region"].isin(unselected_streets)
].pivot_table(
    index='Postcode',
    columns='Confidence Tier',
    aggfunc='size',
    fill_value=0
).reset_index()
# This pivot covers only a subset of the results, so some tiers may have no column here;
# reindex treats a missing tier as a count of zero instead of raising a KeyError
postcode_summary2["Gain"] = postcode_summary2.reindex(columns=gain_columns, fill_value=0).sum(axis=1)
postcode_summary2["Loss"] = postcode_summary2.reindex(columns=loss_columns, fill_value=0).sum(axis=1)

no_loss_postcodes = postcode_summary2[postcode_summary2["Loss"] == 0].sort_values("Gain", ascending=False)

total_bid_size = bid_size + no_loss_postcodes["Gain"].sum()
print(total_bid_size)

# Properties estimated to be EPC C or above, kept for inspection
z = results[results["Confidence Tier"] == "5 - EPC C or above"]
# if __name__ == "__main__":
# main()