coding up logic to identify work types

This commit is contained in:
Khalim Conn-Kowlessar 2025-02-20 20:50:05 +00:00
parent 8bf6aa5af2
commit c0ebffb6cb
4 changed files with 270 additions and 172 deletions

View file

@ -10,6 +10,7 @@ import pandas as pd
from fuzzywuzzy import process
from utils.logger import setup_logger
from backend.SearchEpc import SearchEpc
from BaseUtility import Definitions
import asset_list.mappings.property_type as property_type_mappings
import asset_list.mappings.walls as walls_mappings
import asset_list.mappings.heating_systems as heating_mappings
@ -282,7 +283,9 @@ class AssetList:
]
# This SAP threshold is a key search criteria for properties that may be eligible for extraction
SAP_RATING_THRESHOLD = 75
FILLED_CAVITY_SAP_THRESHOLD = 75
# This SAP threshold is the upper limit used when flagging likely empty cavities from EPC data
EMPTY_CAVITY_SAP_THRESHOLD = 71
# Any EPC deemed to have been conducted prior to this year is deemed to be unreliable
EPC_YEAR_THRESHOLD = pd.Timestamp.now().year - 5
@ -292,9 +295,17 @@ class AssetList:
ATTRIBUTE_ESTIMATED_PERIMETER = "attribute_est_perimter"
ATTRIBUTE_HEAT_LOSS_AREA = "attribute_heat_loss_area"
ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS = "attribute_epc_roof_insulation_thickness"
ATTRIBUTE_SAP_THRESHOLD_AND_BELOW = f"sap_rating_{SAP_RATING_THRESHOLD}_and_below"
ATTRIBUTE_SAP_THRESHOLD_AND_BELOW = f"sap_rating_{FILLED_CAVITY_SAP_THRESHOLD}_and_below"
ATTRIBUTE_EPC_PRE_YEAR_THRESHOLD = f"EPC is pre {EPC_YEAR_THRESHOLD}"
# These are the descriptions that we look for in the EPC data that are indicative of no insulation
EPC_NO_WALL_INSULATION_DESCRIPTIONS = [
"cavity wall, as built, no insulation (assumed)",
"cavity wall, as built, partial insulation (assumed)",
"cavity wall, as built, partial insulation",
"cavity wall, as built, no insulation",
]
def __init__(
self,
local_filepath,
@ -728,12 +739,241 @@ class AssetList:
# 1) Is the SAP rating below C75
self.standardised_asset_list[self.ATTRIBUTE_SAP_THRESHOLD_AND_BELOW] = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <=
self.SAP_RATING_THRESHOLD
self.FILLED_CAVITY_SAP_THRESHOLD
)
# 2) Flag anything where the EPC is older than 5 years
self.standardised_asset_list[self.ATTRIBUTE_EPC_PRE_YEAR_THRESHOLD] = (
pd.to_datetime(
self.standardised_asset_list[self.EPC_API_DATA_NAMES["lodgement-date"]]
self.standardised_asset_list[self.EPC_API_DATA_NAMES["inspection-date"]]
).dt.year < self.EPC_YEAR_THRESHOLD
)
self.process_age_band()
def process_age_band(self):
    """
    Compare each property's EPC construction age band with the landlord's year built.

    Every row of ``self.standardised_asset_list`` yields one record holding the
    property id, the parsed ``epc_year_lower_bound`` / ``epc_year_upper_bound`` and a
    ``"Does Age Match EPC Age Band?"`` label. The records are left-merged back onto
    ``self.standardised_asset_list``, which is replaced in place; nothing is returned.

    Age band formats handled:
      * null, or listed in ``Definitions.DATA_ANOMALY_MATCHES`` -> "No EPC Age Band"
      * "England and Wales: 2007 onwards" / "... 2012 onwards" -> lower bound only
      * "England and Wales: before 1900" -> upper bound of 1899 only
      * an all-digit string -> a single year used as both bounds
      * anything else is parsed as "<prefix>: <lower>-<upper>"

    Label meaning: "older" means the EPC band lies before the landlord's year built,
    "newer" means it lies after it. A missing year built always yields
    "No Year Built From Landlord".

    Raises:
        IndexError / ValueError: if an age band matches none of the formats above and
            cannot be split into "<prefix>: <lower>-<upper>" with integer bounds.
    """
    age_band_column = self.EPC_API_DATA_NAMES["construction-age-band"]
    label_column = "Does Age Match EPC Age Band?"

    band_matches = "EPC Age Band Matches Year Built"
    band_older = "EPC Age Band is older than Year Built"
    band_newer = "EPC Age Band is newer than Year Built"
    band_different = "EPC Age Band is different from Year Built"
    no_year_built = "No Year Built From Landlord"

    processed_age_band = []
    for _, x in self.standardised_asset_list.iterrows():
        age_band = x[age_band_column]

        # No usable EPC age band: nothing to compare against
        if pd.isnull(age_band) or (age_band in Definitions.DATA_ANOMALY_MATCHES):
            processed_age_band.append(
                {
                    self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID],
                    "epc_year_lower_bound": None,
                    "epc_year_upper_bound": None,
                    label_column: "No EPC Age Band"
                }
            )
            continue

        year_built = x[self.STANDARD_YEAR_BUILT]
        year_built_missing = pd.isnull(year_built)

        if age_band in ("England and Wales: 2007 onwards", "England and Wales: 2012 onwards"):
            # Open-ended band: only a lower bound exists
            year_lower_bound = 2007 if age_band == "England and Wales: 2007 onwards" else 2012
            year_upper_bound = None
            if year_built_missing:
                age_band_matches = no_year_built
            elif year_built >= year_lower_bound:
                age_band_matches = band_matches
            else:
                # Year built precedes the band, so the band is the newer of the two.
                # (Previously labelled "older", contradicting the ranged-band branch.)
                age_band_matches = band_newer
        elif age_band == "England and Wales: before 1900":
            # Open-ended band: only an upper bound exists
            year_lower_bound = None
            year_upper_bound = 1899
            if year_built_missing:
                age_band_matches = no_year_built
            elif year_built < 1900:
                age_band_matches = band_matches
            else:
                # Year built follows the band, so the band is the older of the two.
                # (Previously labelled "newer", contradicting the ranged-band branch.)
                age_band_matches = band_older
        elif age_band.isdigit():
            # The EPC holds a single construction year rather than a band
            year_lower_bound = year_upper_bound = int(age_band)
            if year_built_missing:
                age_band_matches = no_year_built
            elif year_built == year_lower_bound:
                age_band_matches = band_matches
            else:
                age_band_matches = band_different
        else:
            # Otherwise, we extract the upper and lower bounds from "<prefix>: <lower>-<upper>"
            lower_date, upper_date = age_band.split(": ")[1].split("-")
            year_lower_bound = int(lower_date)
            year_upper_bound = int(upper_date)
            if year_built_missing:
                # Comparisons against NaN are all False, which used to fall through to
                # the "newer" label; report the missing year explicitly instead.
                age_band_matches = no_year_built
            elif year_built > year_upper_bound:
                age_band_matches = band_older
            elif year_built < year_lower_bound:
                age_band_matches = band_newer
            else:
                age_band_matches = band_matches

        processed_age_band.append(
            {
                self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID],
                "epc_year_lower_bound": year_lower_bound,
                "epc_year_upper_bound": year_upper_bound,
                label_column: age_band_matches
            }
        )

    processed_age_band = pd.DataFrame(processed_age_band)
    # No explicit key is given, so pandas joins on the columns the two frames share
    self.standardised_asset_list = self.standardised_asset_list.merge(
        processed_age_band, how="left"
    )
def identify_worktypes(self):
# Adds boolean flag columns to self.standardised_asset_list describing which work types
# (empty cavity fill, cavity extraction, solar PV) each property may suit. Flags written here:
#   non_intrusive_indicates_empty_cavity, epc_indicates_empty_cavity,
#   non_intrusive_indicates_cavity_extraction,
#   solar_landlord_data_indicates_correct_heating_system,
#   solar_epc_data_indicates_correct_heating_system, property_has_solar,
#   solar_landlord_walls_insulated
# Nothing is returned; the frame is modified in place.
# NOTE(review): several flags below read 'non-intrusives: ...' columns - confirm which of them are
# meant to be guarded by non_intrusives_present.
# If we have non-intrusives completed, we can use this to identify work types
if self.non_intrusives_present:
######################################################
# Empty cavity:
######################################################
# 1) Has been flagged on the non-intrusives as being a cavity wall, empty or partially filled
# 2) The age is before 1995
# NOTE(review): the non-intrusive filter below uses year built <= 2000 while the EPC filter uses
# an upper age-band bound <= 1995 - confirm which cut-off is intended for each
# TODO: 3) Remove anything that likely has access issues
self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] = (
(~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"])) &
(self.standardised_asset_list['non-intrusives: Construction'] == "CAVITY") &
self.standardised_asset_list['non-intrusives: Insulated'].isin(["EMPTY", "PARTIAL"]) &
(self.standardised_asset_list[self.STANDARD_YEAR_BUILT] <= 2000)
)
# EPC-based equivalent: the lower-cased walls description is one of
# EPC_NO_WALL_INSULATION_DESCRIPTIONS, the age band ends in 1995 or earlier, the EPC is not flagged as
# pre-threshold, and the SAP score is at or below EMPTY_CAVITY_SAP_THRESHOLD
self.standardised_asset_list["epc_indicates_empty_cavity"] = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["walls-description"]].str.lower().isin(
self.EPC_NO_WALL_INSULATION_DESCRIPTIONS
) & (
self.standardised_asset_list["epc_year_upper_bound"] <= 1995
) & (
~self.standardised_asset_list[self.ATTRIBUTE_EPC_PRE_YEAR_THRESHOLD]
) & (
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <= self.EMPTY_CAVITY_SAP_THRESHOLD
)
)
######################################################
# Extraction
######################################################
# TODO When filtering like this, 627 properties are flagged as not needing a CIGA check and 582 are flagged
# as needing a CIGA check. What is the logic we should be applying here?
# Flagged when the non-intrusive survey reports a cavity wall that was retro drilled or filled at build,
# the fill material is not grey loose bead or formaldehyde, and the SAP threshold flag is set
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] = (
(self.standardised_asset_list["non-intrusives: Construction"] == "CAVITY") &
(self.standardised_asset_list["non-intrusives: Insulated"].isin(["RETRO DRILLED", "FILLED AT BUILD"])) &
(~self.standardised_asset_list['non-intrusives: Material'].isin(["GREY LOOSE BEAD", "FORMALDEHYDE"])
) & (
self.standardised_asset_list[self.ATTRIBUTE_SAP_THRESHOLD_AND_BELOW]
)
)
######################################################
# Solar
######################################################
# Criteria:
# TODO: Standardise these columns with our cleaned_data object
# Check 1: Does the property have a valid heating system?
self.standardised_asset_list["solar_landlord_data_indicates_correct_heating_system"] = (
self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM].isin(
["air source heat pump", "ground source heat pump", "high heat retention storage heaters"]
)
)
# EPC version of check 1: a heat pump description, OR electric storage heaters combined with
# high heat retention controls
# NOTE(review): str.contains yields NaN for missing descriptions unless na=False is passed - confirm
# how rows without EPC data should be treated here
self.standardised_asset_list["solar_epc_data_indicates_correct_heating_system"] = (
(
self.standardised_asset_list[self.EPC_API_DATA_NAMES["mainheat-description"]]
.str.lower().str.contains("air source heat pump|ground source heat pump")
) | (
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["mainheat-description"]].str.lower().str.contains(
"electric storage heaters"
) & (
self.standardised_asset_list[self.EPC_API_DATA_NAMES[
"mainheatcont-description"]] == "Controls for high heat retention storage heaters"
)
)
)
# Check 2: Does the property have solar already
# True when any of the landlord data, the non-intrusive survey or the ATTRIBUTE_HAS_SOLAR column says so
self.standardised_asset_list["property_has_solar"] = (
(self.standardised_asset_list[self.STANDARD_EXISTING_PV] == "already has PV") |
(self.standardised_asset_list["non-intrusives: PV, ACCESS ISSUE, SEE NOTES"] == "SOLAR PV ON ROOF") |
(self.standardised_asset_list[self.ATTRIBUTE_HAS_SOLAR])
)
# Check 3: Does the property meet the fabric condition
# Solar PV installs are subject to the minimum insulation requirements which means:
# 1) one of the following insulation measures must be installed as part of the same
# ECO4 project:
# • roof insulation (flat roof, pitched roof, room-in-roof)
# • exterior facing wall insulation (cavity wall, solid wall)
# • party cavity wall insulation
# • floor insulation (solid and underfloor)
#
# OR
#
# all measures (except any exempted measure referred to in paragraph 4.28)
# listed in paragraph a) must already be installed
#
# With this in mind, we look for 2 classes
# 1) The property is fully insulated apart from the loft (<200mm insulation)
# 2) The property is fully insulated
self.standardised_asset_list["solar_landlord_walls_insulated"] = (
self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin(
["filled cavity", "insulated solid brick"]
)
)
# Substrings intended to identify insulated walls in EPC descriptions; assigned but not yet used in
# the visible part of this method
EPC_INSULATED_WALLS_SUBSTRINGS = [
", insulated", "with external insulation", "with internal insulation", "filled cavity"
]
# NOTE(review): the value_counts() result is discarded, so this line has no effect - looks like leftover
# exploration. It also indexes the literal "landlord_wall_construction" rather than
# self.STANDARD_WALL_CONSTRUCTION - confirm and remove if not needed
self.standardised_asset_list["landlord_wall_construction"].value_counts()
# NOTE(review): named INSULATED but includes "limited insulation" and ", no insulation" - confirm the
# intended meaning; also assigned but not yet used in the visible part of this method
EPC_INSULATED_ROOF_SUBSTRINGS = [
"(another dwelling above)", "limited insulation", "(other premises above)",
", no insulation",
]

View file

@ -13,6 +13,7 @@ STANDARD_HEATING_SYSTEMS = {
"electric boiler",
"unknown",
"communal gas boiler",
"high heat retention storage heaters",
}
HEATING_MAPPINGS = {

View file

@ -1,8 +1,10 @@
from asset_list.AssetList import DataRemapper
# Set of standardised wall-construction labels. The values of WALL_CONSTRUCTION_MAPPINGS visible in
# this file (e.g. 'filled cavity', 'insulated solid brick', 'cavity unknown insulation') are drawn
# from these labels.
# Some labels are listed more than once below; a set literal keeps a single copy, so the repeats do
# not change the resulting set.
STANDARD_WALL_CONSTRUCTIONS = {
"uninsulated cavity", "filled cavity", "partial insulated cavity", "timber frame", "solid brick",
"system built", "granite or whinstone", "other", "unknown", "sandstone or limestone", "cob",
"uninsulated cavity", "filled cavity", "partial insulated cavity", "cavity unknown insulation",
"timber frame", "uninsulated solid brick",
"insulated solid brick", "system built", "granite or whinstone", "other", "unknown", "sandstone or limestone",
"cob",
"new build - average thermal transmittance",
}
@ -26,7 +28,8 @@ WALL_CONSTRUCTION_MAPPINGS = {
'Average thermal transmittance 0.26 W/m-¦K': 'unknown', 'Average thermal transmittance 0.62 W/m?K': 'unknown',
'Average thermal transmittance 0.64 W/m?K': 'unknown', 'Average thermal transmittance 0.61 W/m?K': 'unknown',
'Sandstone or limestone, as built, no insulation (assumed)': 'sandstone or limestone',
'Average thermal transmittance 0.33 W/m?K': 'unknown', 'Cavity wall,': 'unknown',
'Average thermal transmittance 0.33 W/m?K': 'unknown',
'Cavity wall,': "cavity unknown insulation",
'Cavity wall, as built, partial insulation (assumed)': 'partial insulated cavity',
'Average thermal transmittance 0.29 W/m-¦K': 'unknown', 'Average thermal transmittance 0.32 W/m-¦K': 'unknown',
'Average thermal transmittance 0.19 W/m-¦K': 'unknown', 'Average thermal transmittance 0.27 W/m?K': 'unknown',
@ -55,7 +58,7 @@ WALL_CONSTRUCTION_MAPPINGS = {
'average thermal transmittance 0.26 w/m-¦k': 'unknown', 'average thermal transmittance 0.62 w/m?k': 'unknown',
'average thermal transmittance 0.64 w/m?k': 'unknown', 'average thermal transmittance 0.61 w/m?k': 'unknown',
'sandstone or limestone, as built, no insulation (assumed)': 'sandstone or limestone',
'average thermal transmittance 0.33 w/m?k': 'unknown', 'cavity wall,': 'unknown',
'average thermal transmittance 0.33 w/m?k': 'unknown', 'cavity wall,': "cavity unknown insulation",
'cavity wall, as built, partial insulation (assumed)': 'partial insulated cavity',
'average thermal transmittance 0.29 w/m-¦k': 'unknown', 'average thermal transmittance 0.32 w/m-¦k': 'unknown',
'average thermal transmittance 0.19 w/m-¦k': 'unknown', 'average thermal transmittance 0.27 w/m?k': 'unknown',
@ -67,4 +70,20 @@ WALL_CONSTRUCTION_MAPPINGS = {
'average thermal transmittance 0.32 w/m?k': 'unknown', 'average thermal transmittance 0.24 w/m-¦k': 'unknown',
'cavity wall, with internal insulation': 'filled cavity', 'average thermal transmittance 0.17 w/m-¦k': 'unknown',
'average thermal transmittance 0.28 w/m?k': 'unknown',
'Cavity wall, filled cavity': 'filled cavity',
'Cavity wall, filled cavity and external insulation': 'filled cavity',
'Granite or whinstone, as built, no insulation (assumed)': 'granite or '
'whinstone',
'Solid brick, as built, insulated (assumed)': 'insulated solid brick',
'Solid brick, as built, no insulation (assumed)': 'uninsulated solid brick',
'Solid brick, with external insulation': 'insulated solid brick',
'Solid brick, with internal insulation': 'insulated solid brick',
'System built, as built, insulated (assumed)': 'system built',
'System built, as built, no insulation (assumed)': 'system built',
'System built, with external insulation': 'system built',
'System built, with internal insulation': 'system built',
'Timber frame, as built, insulated (assumed)': 'timber frame',
'Timber frame, as built, no insulation (assumed)': 'timber frame',
'Timber frame, as built, partial insulation (assumed)': 'timber frame',
'Timber frame, with additional insulation': 'timber frame',
}

View file

@ -4,7 +4,6 @@ import json
import pandas as pd
import numpy as np
from tqdm import tqdm
from BaseUtility import Definitions
from asset_list.AssetList import AssetList
from asset_list.mappings.property_type import PROPERTY_MAPPING
from asset_list.mappings.walls import WALL_CONSTRUCTION_MAPPINGS
@ -14,13 +13,6 @@ from asset_list.mappings.exising_pv import EXISTING_PV_MAPPINGS
from dotenv import load_dotenv
from backend.SearchEpc import SearchEpc
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes
from recommendations.recommendation_utils import (
estimate_perimeter,
estimate_external_wall_area,
estimate_number_of_floors
)
from etl.epc_clean.epc_attributes.attribute_utils import (
extract_thermal_transmittance
@ -177,109 +169,6 @@ def extract_address1(asset_list, full_address_col, postcode_col, method="first_t
raise ValueError(f"Method {method} not recognized")
def process_age_band(asset_list, year_built_column):
    """
    Compare each row's EPC "Property Age Band" with the landlord's year built.

    Args:
        asset_list: DataFrame with "row_id" and "Property Age Band" columns plus the
            column named by ``year_built_column``.
        year_built_column: name of the column holding the landlord's year built.

    Returns:
        DataFrame with one row per input row and the columns "row_id",
        "epc_year_lower_bound", "epc_year_upper_bound" and
        "Does Age Match EPC Age Band?".

    Age band formats handled:
      * null, or listed in ``Definitions.DATA_ANOMALY_MATCHES`` -> "No EPC Age Band"
      * "England and Wales: 2007 onwards" / "... 2012 onwards" -> lower bound only
      * "England and Wales: before 1900" -> upper bound of 1899 only
      * an all-digit string -> a single year used as both bounds
      * anything else is parsed as "<prefix>: <lower>-<upper>"

    Label meaning: "older" means the EPC band lies before the landlord's year built,
    "newer" means it lies after it. A missing year built always yields
    "No Year Built From Landlord".

    Raises:
        IndexError / ValueError: if an age band matches none of the formats above and
            cannot be split into "<prefix>: <lower>-<upper>" with integer bounds.
    """
    label_column = "Does Age Match EPC Age Band?"

    band_matches = "EPC Age Band Matches Year Built"
    band_older = "EPC Age Band is older than Year Built"
    band_newer = "EPC Age Band is newer than Year Built"
    band_different = "EPC Age Band is different from Year Built"
    no_year_built = "No Year Built From Landlord"

    processed_age_band = []
    for _, x in asset_list.iterrows():
        age_band = x["Property Age Band"]

        # No usable EPC age band: nothing to compare against
        if pd.isnull(age_band) or (age_band in Definitions.DATA_ANOMALY_MATCHES):
            processed_age_band.append({
                "row_id": x["row_id"],
                "epc_year_lower_bound": None,
                "epc_year_upper_bound": None,
                label_column: "No EPC Age Band"
            })
            continue

        year_built = x[year_built_column]
        year_built_missing = pd.isnull(year_built)

        if age_band in ("England and Wales: 2007 onwards", "England and Wales: 2012 onwards"):
            # Open-ended band: only a lower bound exists
            year_lower_bound = 2007 if age_band == "England and Wales: 2007 onwards" else 2012
            year_upper_bound = None
            if year_built_missing:
                age_band_matches = no_year_built
            elif year_built >= year_lower_bound:
                age_band_matches = band_matches
            else:
                # Year built precedes the band, so the band is the newer of the two.
                # (Previously labelled "older", contradicting the ranged-band branch.)
                age_band_matches = band_newer
        elif age_band == "England and Wales: before 1900":
            # Open-ended band: only an upper bound exists
            year_lower_bound = None
            year_upper_bound = 1899
            if year_built_missing:
                age_band_matches = no_year_built
            elif year_built < 1900:
                age_band_matches = band_matches
            else:
                # Year built follows the band, so the band is the older of the two.
                # (Previously labelled "newer", contradicting the ranged-band branch.)
                age_band_matches = band_older
        elif age_band.isdigit():
            # The EPC holds a single construction year rather than a band
            year_lower_bound = year_upper_bound = int(age_band)
            if year_built_missing:
                age_band_matches = no_year_built
            elif year_built == year_lower_bound:
                age_band_matches = band_matches
            else:
                age_band_matches = band_different
        else:
            # Otherwise, we extract the upper and lower bounds from "<prefix>: <lower>-<upper>"
            lower_date, upper_date = age_band.split(": ")[1].split("-")
            year_lower_bound = int(lower_date)
            year_upper_bound = int(upper_date)
            if year_built_missing:
                # Comparisons against NaN are all False, which used to fall through to
                # the "newer" label; report the missing year explicitly instead.
                age_band_matches = no_year_built
            elif year_built > year_upper_bound:
                age_band_matches = band_older
            elif year_built < year_lower_bound:
                age_band_matches = band_newer
            else:
                age_band_matches = band_matches

        processed_age_band.append(
            {
                "row_id": x["row_id"],
                "epc_year_lower_bound": year_lower_bound,
                "epc_year_upper_bound": year_upper_bound,
                label_column: age_band_matches
            }
        )

    processed_age_band = pd.DataFrame(processed_age_band)
    return processed_age_band
def app():
"""
This app is EPC pulling data for some properties owned by Livewest
@ -531,62 +420,11 @@ def app():
# TODO: TEMP!!!
epc_df["epc_os_uprn"] = epc_df["epc_os_uprn"].astype("Int64").astype(str)
asset_list.standardised_asset_list = asset_list.standardised_asset_list.merge(
epc_df, how="left", left_on="ordnance_survey_uprn", right_on="epc_os_uprn"
epc_df.drop(columns=["domna_property_id"]), how="left", left_on="ordnance_survey_uprn", right_on="epc_os_uprn"
)
asset_list.extract_attributes()
asset_list["Estimated Number of Floors"] = asset_list.apply(
lambda x: estimate_number_of_floors(property_type=x["Property Type"]) if not pd.isnull(
x["Property Type"]) else None, axis=1
)
asset_list["Property Floor Area"] = asset_list["Property Floor Area"].astype(float)
# Replace "" value with None
asset_list["Number of Habitable Rooms"] = asset_list["Number of Habitable Rooms"].replace("", None)
asset_list["Number of Habitable Rooms"] = asset_list["Number of Habitable Rooms"].astype(float)
asset_list["Estimated Perimeter (m)"] = asset_list.apply(
lambda x: estimate_perimeter(
floor_area=x["Property Floor Area"] / x["Estimated Number of Floors"],
num_rooms=x["Number of Habitable Rooms"] / x["Estimated Number of Floors"],
), axis=1
)
asset_list["Estimated Heat Loss Perimeter (m2)"] = asset_list.apply(
lambda x: estimate_external_wall_area(
num_floors=x["Estimated Number of Floors"],
floor_height=float(x["Property Floor Height"]) if x["Property Floor Height"] else 2.5,
perimeter=x["Estimated Perimeter (m)"],
built_form=x["Archetype - EPC"]
),
axis=1
)
asset_list["Roof Insulation Thickness"] = asset_list.apply(
lambda x: RoofAttributes(description=x["Roof Construction"]).process()["insulation_thickness"] if not pd.isnull(
x["Roof Construction"]) else None,
axis=1
)
# We produce some additional fields
# 1) Is the SAP rating below C75
asset_list["SAP Rating is 75 and below"] = asset_list["SAP score on register"] <= 75
# 2) Flag anything where the EPC is older than 5 years
cutoff_year = pd.Timestamp.now().year - 5
asset_list[f"EPC is pre {cutoff_year}"] = (
pd.to_datetime(asset_list["Date of last EPC"]).dt.year < cutoff_year
)
# 3) If we have year in the asset list, we flag entries where the built year is different from the
# EPC Age band
if PROPERTY_YEAR_BUILT is not None:
# We process the age band and merge it on
processed_age_band = process_age_band(asset_list, PROPERTY_YEAR_BUILT)
asset_list = asset_list.merge(
processed_age_band, how="left", on="row_id"
)
if HAS_NON_INTRUSIVES:
# Empty cavity:
# 1) Has been flagged on the non-intrusives as being empty or partially filled