Model/asset_list/AssetList.py
2025-02-21 12:39:06 +00:00

1046 lines
46 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import hashlib
import os
import re
import tiktoken
from pprint import pprint
from datetime import datetime
from openai import OpenAI
import numpy as np
import pandas as pd
from fuzzywuzzy import process
from utils.logger import setup_logger
from backend.SearchEpc import SearchEpc
from BaseUtility import Definitions
import asset_list.mappings.property_type as property_type_mappings
import asset_list.mappings.walls as walls_mappings
import asset_list.mappings.heating_systems as heating_mappings
import asset_list.mappings.exising_pv as existing_pv_mappings
from recommendations.recommendation_utils import (
estimate_perimeter,
estimate_external_wall_area,
estimate_number_of_floors
)
from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes
logger = setup_logger()
# OpenAI API Key (set this in your environment variables for security)
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
class DataRemapper:
    """
    Map free-text category values onto a fixed set of standard values.

    Values are resolved cheapest-first: the predefined mapping, an exact match
    against the standard values, fuzzy matching, and finally one batched OpenAI
    request for anything still unmapped.
    """

    def __init__(self, standard_values, standard_map=None, max_tokens=1000):
        """
        Initialize the remapper with standard values and a predefined mapping.
        :param standard_values: Set of allowed standardized values.
        :param standard_map: Dictionary of common remappings {raw_value: standard_value}.
        :param max_tokens: Limit applied to the prompt size and passed to the API as the completion limit.
        """
        self.standard_values = standard_values
        # Default to an empty mapping so the membership checks work when none is supplied
        self.standard_map = standard_map if standard_map is not None else {}
        self.fuzzy_threshold = 90  # Adjust fuzzy matching sensitivity
        self.ai_model = "gpt-4-turbo"  # Use gpt-3.5-turbo for cheaper processing

        # Tokenizer for counting tokens
        self.tokenizer = tiktoken.encoding_for_model(self.ai_model)

        # Track token usage and remap dictionary
        self.total_tokens_used = 0
        self.total_cost = 0
        self.remap_dict = {}  # {original_value: standardized_value}
        self.max_tokens = max_tokens  # Limit for OpenAI API

        # Memoization for AI calls
        self.ai_cache = {}  # {tuple(unmapped_values): {original_value: standardized_value}}

        # Capture the response for debugging
        self.ai_response = None

        # OpenAI pricing (as of Feb 2024)
        self.pricing = {
            "gpt-4-turbo": {"input": 0.01 / 1000, "output": 0.03 / 1000},
            "gpt-3.5-turbo": {"input": 0.0015 / 1000, "output": 0.002 / 1000},
        }
        self.openai_client = OpenAI(api_key=OPENAI_API_KEY)

    @staticmethod
    def clean_string(text):
        """Basic text cleaning: remove extra spaces, punctuation, and normalize case."""
        if not isinstance(text, str):
            return None
        text = text.strip().lower()
        text = re.sub(r'[^\w\s]', '', text)  # Remove punctuation
        # Collapse runs of whitespace into a single space
        text = re.sub(r'\s+', ' ', text)
        return text

    def fuzzy_match(self, text):
        """Use fuzzy matching to find the closest standard value, or None if nothing is close enough."""
        if not text:
            return None
        result = process.extractOne(text, self.standard_values)
        if result is None:
            # No candidate was returned, so there is nothing to unpack
            return None
        match, score = result[0], result[1]
        return match if score >= self.fuzzy_threshold else None

    def count_tokens(self, text):
        """Estimate the number of tokens in a given text."""
        return len(self.tokenizer.encode(text)) if text else 0

    @staticmethod
    def _parse_ai_mapping(output_text):
        """
        Parse the model output into a dictionary without executing it.
        :param output_text: Raw text returned by the model.
        :return: The parsed dictionary, or None when the text is not a dictionary literal.
        """
        import ast
        import json

        text = output_text.strip()
        # Tolerate a markdown code fence even though the prompt asks for none
        if text.startswith("```"):
            text = re.sub(r"^```[A-Za-z]*\s*|\s*```$", "", text)

        # JSON is what the prompt requests; literal_eval additionally accepts a
        # Python-style dict (e.g. single quotes) but, unlike eval, only literals.
        for parser in (json.loads, ast.literal_eval):
            try:
                parsed = parser(text)
            except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
                continue
            if isinstance(parsed, dict):
                return parsed
        return None

    def ai_standardize(self, unmapped_values):
        """
        Call OpenAI API **once** for all unmapped values to minimize cost, with memoization.
        :param unmapped_values: List of raw values that could not be mapped by the cheaper methods.
        :return: Dictionary {original_value: standardized_value}; every value maps to "unknown" when the
            response cannot be parsed.
        :raises ValueError: If the prompt is larger than max_tokens.
        """
        if not unmapped_values:
            return {}

        # Sorted so the same set of values always hits the same cache entry; key=str keeps the
        # ordering well-defined if the values are of mixed types
        unmapped_tuple = tuple(sorted(unmapped_values, key=str))
        if unmapped_tuple in self.ai_cache:
            return self.ai_cache[unmapped_tuple]  # Return memoized result

        prompt = f"""
You are an expert in data classification. Standardize each of these values into one of the categories:
{list(self.standard_values)}.
Return only a JSON dictionary where:
- The keys are the original values.
- The values are the standardized ones.
Strictly return JSON **without markdown formatting** or extra text.
Example Output:
{{
"BLKHOUS": "block house",
"BEDSIT": "bedsit"
}}
Values to standardize:
{unmapped_values}
"""
        # Count input tokens
        input_tokens = self.count_tokens(prompt)
        if input_tokens > self.max_tokens:
            raise ValueError("Input tokens exceed the maximum limit.")

        logger.info("Calling OpenAI API for standardization...")
        response = self.openai_client.chat.completions.create(
            model=self.ai_model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=self.max_tokens,
            temperature=0.1,
        )
        output_text = response.choices[0].message.content.strip()
        output_tokens = self.count_tokens(output_text)  # Count output tokens

        # Track total token usage
        self.total_tokens_used += input_tokens + output_tokens

        # Estimate cost
        input_cost = input_tokens * self.pricing[self.ai_model]["input"]
        output_cost = output_tokens * self.pricing[self.ai_model]["output"]
        self.total_cost += input_cost + output_cost

        # The response is external input, so it is parsed as data and never evaluated
        mapping = self._parse_ai_mapping(output_text)
        if mapping is None:
            logger.warning("Could not parse the AI response as a dictionary - marking values as unknown")
            mapping = {val: "unknown" for val in unmapped_values}  # Fallback

        # Memoize the AI response
        self.ai_cache[unmapped_tuple] = mapping

        # We store the raw AI response for debugging
        logger.debug(f"AI Response: {mapping}")
        self.ai_response = output_text
        return mapping

    def standardize_list(self, values_to_remap):
        """
        Standardizes a list of values and returns a dictionary {original_value: standardized_value}.
        :param values_to_remap: List of raw values to standardize.
        :return: Dictionary {original_value: standardized_value}.
        """
        unique_values = set(values_to_remap)  # Process only unique values
        unmapped_values = []

        for value in unique_values:
            if pd.isna(value):  # Handle NaN values
                self.remap_dict[value] = "unknown"
                continue

            cleaned_value = self.clean_string(value)

            # Rule-Based Check (Predefined Mapping): cleaned form first, then the raw value,
            # then the lower-cased raw value
            if cleaned_value in self.standard_map:
                self.remap_dict[value] = self.standard_map[cleaned_value]
                continue
            if value in self.standard_map:
                self.remap_dict[value] = self.standard_map[value]
                continue
            # Only strings can be lower-cased; other types fall through to the later steps
            if isinstance(value, str) and value.lower() in self.standard_map:
                self.remap_dict[value] = self.standard_map[value.lower()]
                continue

            # Exact Match in Standard Values
            if cleaned_value in self.standard_values:
                self.remap_dict[value] = cleaned_value
                continue

            # Fuzzy Matching
            fuzzy_match = self.fuzzy_match(cleaned_value)
            if fuzzy_match:
                self.remap_dict[value] = fuzzy_match
                continue

            # Capture anything that wasn't mapped
            unmapped_values.append(value)

        # AI Model - remap anything unmapped (batch request)
        ai_mapping = self.ai_standardize(unmapped_values)
        self.remap_dict.update(ai_mapping)
        return self.remap_dict

    def report_usage(self):
        """Prints a summary of token usage and cost."""
        print(f"\n🔹 Total Tokens Used: {self.total_tokens_used}")
        print(f"💰 Estimated Cost: ${self.total_cost:.4f}")
class AssetList:
"""
This class is used to standardise asset lists so that we can process the core information in a consistent manner.
"""
EPC_API_DATA_NAMES = {
"uprn": "epc_os_uprn",
"address1": "epc_address1",
"address": "epc_address",
"postcode": "epc_postcode",
"inspection-date": "epc_inspection_date",
"current-energy-efficiency": "epc_sap_score_on_register",
"current-energy-rating": "epc_rating_on_register",
"property-type": "epc_property_type",
"built-form": "epc_archetype",
"total-floor-area": "epc_total_floor_area",
"construction-age-band": "epc_age_band",
"floor-height": "epc_floor_height",
"number-habitable-rooms": "epc_number_habitable_rooms",
"walls-description": "epc_wall_construction",
"roof-description": "epc_roof_construction",
"floor-description": "epc_floor_construction",
"mainheat-description": "epc_heating_type",
'mainheatcont-description': "epc_heating_controls",
"secondheat-description": "epc_secondary_heating",
"transaction-type": "epc_reason",
"energy-consumption-current": "epc_heat_demand",
"photo-supply": "epc_photo_supply"
}
FIND_EPC_DATA_NAMES = {
"heating_text": "epc_estiamted_heating_kwh",
"hot_water_text": "epc_estimated_hotwater_kwh",
'Assessors name': "epc_assessor_name",
"Assessor's Telephone": "epc_assessor_telephone",
"Assessor's Email": "epc_assessor_email",
"Accreditation scheme": "epc_assessor_accreditation",
"Assessors ID": "epc_assessor_id",
"Solar photovoltaics": "epc_solar_pv"
}
DATETIME_REMAP = {
"Pre 1900": datetime(year=1899, month=12, day=31),
}
# These are the accepted methods we have for cleaning the address1 column
ADDRESS_1_CLEANING_METHODS = [
"first_two_words", # This method will split on the fist two words, where the separator is a space
"first_word", # This method will split on the first word, where the separator is a space
"house_number_extraction", # This method will use the NLP model in SearchEPC to extract the housenumber
# "address1_extraction" # This method will use the NLP model to extract address1
]
# Standard column Names
STANDARD_ADDRESS_1 = "domna_address_1"
STANDARD_POSTCODE = "domna_postcode"
STANDARD_FULL_ADDRESS = "domna_full_address"
STANDARD_YEAR_BUILT = "landlord_year_built"
STANDARD_UPRN = "ordnance_survey_uprn"
STANDARD_LANDLORD_PROPERTY_ID = "landlord_property_id"
STANDARD_PROPERTY_TYPE = "landlord_property_type"
STANDARD_WALL_CONSTRUCTION = "landlord_wall_construction"
STANDARD_HEATING_SYSTEM = "landlord_heating_system"
STANDARD_EXISTING_PV = "landlord_existing_pv"
DOMNA_PROPERTY_ID = "domna_property_id"
# Regular expression for identifying if the address might point to multiple units
MULTI_UNIT_REGEX = re.compile(r'\b([A-Za-z0-9]+)-([A-Za-z0-9]+)\b')
# List of columns relating to the non-intrusive data
NON_INTRUSIVES_COLNAMES = [
"Archetype", "Construction", "Insulated", "Material", "CIGA Check Required",
"PV, ACCESS ISSUE, SEE NOTES", "OFF GAS - ROOF ORIENTATION",
"Any further surveyor notes", 'Surveyors Name'
]
# This SAP threshold is a key search criteria for properties that may be eligible for extraction
FILLED_CAVITY_SAP_THRESHOLD = 75
# SAP threshold used when flagging properties whose EPC may indicate an empty cavity
EMPTY_CAVITY_SAP_THRESHOLD = 71
# Any EPC deemed to have been conducted prior to this year is deemed to be unreliable
EPC_YEAR_THRESHOLD = pd.Timestamp.now().year - 5
# Attributes - these are columns that we produce, calculated based on other pieces of data
ATTRIBUTE_HAS_SOLAR = "attribute_has_solar"
ATTRIBUTE_NUMBER_OF_FLOORS = "attribute_est_number_floors"
ATTRIBUTE_ESTIMATED_PERIMETER = "attribute_est_perimter"
ATTRIBUTE_HEAT_LOSS_AREA = "attribute_heat_loss_area"
ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS = "attribute_epc_roof_insulation_thickness"
ATTRIBUTE_SAP_THRESHOLD_AND_BELOW = f"sap_rating_{FILLED_CAVITY_SAP_THRESHOLD}_and_below"
ATTRIBUTE_EPC_PRE_YEAR_THRESHOLD = f"EPC is pre {EPC_YEAR_THRESHOLD}"
# These are the descriptions that we look for in the EPC data that are indicative of no insulation
EPC_NO_WALL_INSULATION_DESCRIPTIONS = [
"cavity wall, as built, no insulation (assumed)",
"cavity wall, as built, partial insulation (assumed)",
"cavity wall, as built, partial insulation",
"cavity wall, as built, no insulation",
]
# List of strings that we look for in the EPC data, where substrings indicate that the wall is insulated
EPC_INSULATED_WALLS_SUBSTRINGS = [
", insulated", "with external insulation", "with internal insulation", "filled cavity"
]
# List of strings that we look for in the EPC data, where substrings indicate that the roof is insulated
EPC_INSULATED_ROOF_SUBSTRINGS = [
"(another dwelling above)", ", insulated", ", insulated (assumed) ",
", ceiling insulated",
]
def __init__(
    self,
    local_filepath,
    sheet_name,
    address1_colname,
    postcode_colname,
    full_address_colname,
    landlord_property_id=None,
    full_address_cols_to_concat=None,
    missing_postcodes_method=None,
    address1_extraction_method=None,
    landlord_year_built=None,
    landlord_uprn=None,
    landlord_property_type=None,
    landlord_wall_construction=None,
    landlord_heating_system=None,
    landlord_existing_pv=None,
    header=0
):
    """
    Load the landlord's asset list from Excel and record which of its columns hold which piece of data.
    :param local_filepath: Path of the Excel workbook to read.
    :param sheet_name: Sheet within the workbook to read.
    :param address1_colname: Column holding address line 1, or None if it has to be extracted.
    :param postcode_colname: Column holding the postcode.
    :param full_address_colname: Column holding the full address, or None if it has to be concatenated.
    :param landlord_property_id: Column holding the landlord's own property identifier.
    :param full_address_cols_to_concat: Columns to join into a full address when none is provided.
    :param missing_postcodes_method: Stored on the instance; not used within this method.
    :param address1_extraction_method: One of ADDRESS_1_CLEANING_METHODS, used when address 1 is missing.
    :param landlord_year_built: Column holding the year built.
    :param landlord_uprn: Column holding the UPRN.
    :param landlord_property_type: Column holding the property type.
    :param landlord_wall_construction: Column holding the wall construction.
    :param landlord_heating_system: Column holding the heating system.
    :param landlord_existing_pv: Column holding the existing PV information.
    :param header: Row to use as the header, passed through to pandas.read_excel.
    """
    self.local_filepath = local_filepath
    self.sheet_name = sheet_name

    # Read in the data; the raw frame is kept untouched and all cleaning happens on the copy
    self.raw_asset_list = pd.read_excel(local_filepath, header=header, sheet_name=sheet_name)
    self.standardised_asset_list = self.raw_asset_list.copy()

    # We detect the presence of the non-intrusive columns
    self.non_intrusives_present = "CIGA Check Required" in self.raw_asset_list.columns

    # Names of columns
    self.landlord_property_id = landlord_property_id
    self.address1_colname = address1_colname
    self.postcode_colname = postcode_colname
    self.full_address_colname = full_address_colname
    self.landlord_year_built = landlord_year_built
    self.landlord_uprn = landlord_uprn
    self.landlord_property_type = landlord_property_type
    self.landlord_wall_construction = landlord_wall_construction
    self.landlord_heating_system = landlord_heating_system
    self.landlord_existing_pv = landlord_existing_pv

    # parameters for cleaning
    self.full_address_cols_to_concat = full_address_cols_to_concat
    self.missing_postcodes_method = missing_postcodes_method
    self.address1_extraction_method = address1_extraction_method

    self.debug_information = {
        "property_type": None,
        "wall_construction": None,
        "heating_system": None,
        "existing_pv": None
    }
    self.variable_mappings = {}
    self.rename_map = {}
    self.keep_variables = []

    # Finally, we handle the case where the landlord's property ID is actually the OS UPRN.
    # Both default to None, so the None check stops "neither provided" from being
    # mistaken for "the same column".
    if self.landlord_uprn is not None and self.landlord_uprn == self.landlord_property_id:
        self.standardised_asset_list[self.STANDARD_UPRN] = self.standardised_asset_list[self.landlord_uprn].copy()
        # Update the reference to landlord UPRN so the copy is used from here on
        self.landlord_uprn = self.STANDARD_UPRN
def _extract_address1(self, asset_list, full_address_col, postcode_col, method="first_two_words"):
if method not in self.ADDRESS_1_CLEANING_METHODS:
raise ValueError(f"Method {method} for producing address1 not recognized")
if method == "first_two_words":
asset_list[self.address1_colname] = asset_list[full_address_col].str.split(" ").str[:2].str.join(" ")
return asset_list
if method == "first_word":
asset_list[self.address1_colname] = asset_list[full_address_col].str.split(" ").str[0]
return asset_list
if method == "house_number_extraction":
asset_list[self.address1_colname] = asset_list.apply(
lambda x: SearchEpc.get_house_number(address=x[full_address_col], postcode=x[postcode_col]),
axis=1
)
return asset_list
raise ValueError(f"Method {method} not recognized")
@staticmethod
def _address1_extraction(x):
pass
def create_property_id(self):
"""
This function creates the domna property ID, which is simply a hash of the full address and postcode
We want all figures to be positive
:return:
"""
# We'll remove punctuation and whitespace from the address, before hashing to produce an ID
def _make_hash(value):
"""Generates a stable SHA256 hash suffix and appends it to a cleaned version of the value."""
# Normalize and remove special characters for cleaner ID
cleaned_value = re.sub(r"[^\w\s-]", "", value).replace(" ", "_").lower()
# Generate SHA-256 hash and truncate it
short_hash = hashlib.sha256(value.encode()).hexdigest()[:12]
return f"{cleaned_value}-{short_hash}"
# Apply transformation
self.standardised_asset_list[self.DOMNA_PROPERTY_ID] = (
self.standardised_asset_list[self.full_address_colname] +
self.standardised_asset_list[self.postcode_colname]
).str.strip().str.replace(r"[^\w\s]", "", regex=True).str.replace(" ", "").str.lower().apply(_make_hash)
@staticmethod
def _strip_postcode_from_full_address(full_address, postcode):
cleaned = full_address.replace(postcode, "")
# Remove any trailing commas and spaces
cleaned = cleaned.rstrip(", ").strip(",").strip()
return cleaned
@classmethod
def _identify_multi_address(cls, address):
# We check if the address is comma separated
if "," in address:
address1_section = address.split(",")[0]
# We look for string in the form (x-y)
return bool(cls.MULTI_UNIT_REGEX.search(address1_section))
@staticmethod
def _convert_uprn(x):
"""
Used to convert UPRNS to integer strings
:param x: uprn to convert
:return: converted uprn
"""
if pd.isnull(x):
return x
# check if numeric
if np.isreal(x):
return str(int(x))
if str(x).isdigit():
return str(int(x))
return x
def init_standardise(self):
"""
This function is used to standardise the asset list
:return: standardised asset list
"""
# Remove rows without a postcode
if self.postcode_colname is not None:
self.standardised_asset_list = self.standardised_asset_list.dropna(subset=[self.postcode_colname])
# We clean up portential non-breaking spaces, and double spaces
for col in [
c for c in [self.postcode_colname, self.full_address_colname, self.address1_colname] if
c is not None
]:
self.standardised_asset_list[col] = self.standardised_asset_list[col].astype(str)
self.standardised_asset_list[col] = self.standardised_asset_list[col].str.replace('\xa0', ' ', regex=False)
self.standardised_asset_list[col] = self.standardised_asset_list[col].str.replace(' ', ' ', regex=False)
if self.address1_colname is None:
if self.address1_extraction_method is None:
raise ValueError("Missing address 1 - please specify an extraction method")
self.address1_colname = self.STANDARD_ADDRESS_1
# If we do not have this, we produce it
self.standardised_asset_list = self._extract_address1(
asset_list=self.standardised_asset_list,
full_address_col=self.full_address_colname,
postcode_col=self.postcode_colname,
method=self.address1_extraction_method
)
if self.full_address_colname is None:
if not self.full_address_cols_to_concat:
raise ValueError("Missing full address - please specify columns to concatenate")
self.full_address_colname = self.STANDARD_FULL_ADDRESS
self.standardised_asset_list[self.full_address_colname] = (
self.standardised_asset_list[self.full_address_cols_to_concat].apply(lambda x: ", ".join(x), axis=1)
)
else:
# Make sure to strip the postcode out of the full address
self.standardised_asset_list[self.full_address_colname] = self.standardised_asset_list.apply(
lambda x: self._strip_postcode_from_full_address(
full_address=x[self.full_address_colname],
postcode=x[self.postcode_colname]
),
axis=1
)
# We create the domna property id
self.create_property_id()
# Clean up the UPRN column, if the landlord has provided them
if self.landlord_uprn is not None:
self.standardised_asset_list[self.landlord_uprn] = (
self.standardised_asset_list[self.landlord_uprn].apply(self._convert_uprn)
)
# We keep just the columns we care about and will work through the various columns and standardise
variables = [
self.landlord_property_id,
self.DOMNA_PROPERTY_ID,
self.address1_colname,
self.postcode_colname,
self.full_address_colname,
self.landlord_uprn,
self.landlord_property_type,
self.landlord_year_built,
self.landlord_wall_construction,
self.landlord_heating_system,
self.landlord_existing_pv
]
# Keep just non-null variables (e.g landlord may not provide uprn
self.keep_variables = [v for v in variables if v is not None]
self.rename_map = {
self.landlord_property_id: self.STANDARD_LANDLORD_PROPERTY_ID,
self.address1_colname: self.STANDARD_ADDRESS_1,
self.postcode_colname: self.STANDARD_POSTCODE,
self.full_address_colname: self.STANDARD_FULL_ADDRESS,
self.landlord_uprn: self.STANDARD_UPRN,
self.landlord_property_type: self.STANDARD_PROPERTY_TYPE,
self.landlord_year_built: self.STANDARD_YEAR_BUILT,
self.landlord_wall_construction: self.STANDARD_WALL_CONSTRUCTION,
self.landlord_heating_system: self.STANDARD_HEATING_SYSTEM,
self.landlord_existing_pv: self.STANDARD_EXISTING_PV
}
self.rename_map = {k: v for k, v in self.rename_map.items() if k is not None}
if self.non_intrusives_present:
self.keep_variables += self.NON_INTRUSIVES_COLNAMES
self.rename_map = {
**self.rename_map,
**dict(
zip(self.NON_INTRUSIVES_COLNAMES, ["non-intrusives: " + c for c in self.NON_INTRUSIVES_COLNAMES])
)
}
# We idenfiy addresses which are likely to be multi-addresses (i.g are rooms x-y)
self.standardised_asset_list["is_multi_address"] = self.standardised_asset_list[
self.full_address_colname
].apply(lambda x: self._identify_multi_address(x))
# We handle cleaning for walls, in the instance that the landlord provides us with EPC data and
# we see instances of "average thermal transmittance" in the description
self.standardised_asset_list[self.landlord_wall_construction] = np.where(
self.standardised_asset_list[self.landlord_wall_construction].str.lower().str.contains(
"average thermal transmittance"
),
"new build - average thermal transmittance",
self.standardised_asset_list[self.landlord_wall_construction]
)
# Clear our build year column
# We attempt to process the year built column
if self.landlord_year_built is not None:
# We check if we have a datetime - year built has not been renamed
if isinstance(self.standardised_asset_list[self.landlord_year_built].iloc[0], datetime):
# We treat any string columns - with common values we see
self.standardised_asset_list[self.landlord_year_built] = (
self.standardised_asset_list[self.landlord_year_built].replace(self.DATETIME_REMAP)
)
self.standardised_asset_list[self.landlord_year_built] = pd.to_datetime(
self.standardised_asset_list[self.landlord_year_built]
)
# Convert this to year
self.standardised_asset_list[self.landlord_year_built] = (
self.standardised_asset_list[self.landlord_year_built].dt.year
)
else:
raise NotImplementedError("Year built column must be a datetime - implement me")
# We now create standard lookups
to_remap = {
self.landlord_property_type: {
"standard_values": property_type_mappings.STANDARD_PROPERTY_TYPES,
"standard_map": property_type_mappings.PROPERTY_MAPPING
},
self.landlord_wall_construction: {
"standard_values": walls_mappings.STANDARD_WALL_CONSTRUCTIONS,
"standard_map": walls_mappings.WALL_CONSTRUCTION_MAPPINGS
},
self.landlord_heating_system: {
"standard_values": heating_mappings.STANDARD_HEATING_SYSTEMS,
"standard_map": heating_mappings.HEATING_MAPPINGS
},
self.landlord_existing_pv: {
"standard_values": existing_pv_mappings.STANDARD_EXISTING_PV,
"standard_map": existing_pv_mappings.EXISTING_PV_MAPPINGS
}
}
for variable, config in to_remap.items():
logger.info("Standardising variable: %s", variable)
values_to_remap = self.standardised_asset_list[variable].unique()
# We want to map this to our standardised list of property types we're interested in
remapper = DataRemapper(standard_values=config["standard_values"], standard_map=config["standard_map"])
remap_dictionary = remapper.standardize_list(values_to_remap=values_to_remap.tolist())
self.variable_mappings[variable] = remap_dictionary
# We now print out the variable mappings, which can be reviewed by the user, before the final standardised
# asset list is returned
for variable, mapping in self.variable_mappings.items():
pprint(f"Variable: {variable}")
pprint(mapping)
# Print a space
print("\n")
pprint("=======================================")
def apply_standardiation(self, override_empty_mappings=False):
    """
    Apply the mappings built by init_standardise, drop duplicated properties, and reduce the
    asset list to the kept columns under their standard names.
    :param override_empty_mappings: If true, skips the check that mappings exist. This is only relevant
        if there are no categories which need remapping, which is highly unlikely
    :return: None; self.standardised_asset_list is replaced with the final frame.
    :raises ValueError: If there are no mappings and the check has not been overridden.
    """
    mappings_missing = not self.variable_mappings
    if mappings_missing and not override_empty_mappings:
        raise ValueError("Please run init_standardise first")

    logger.info("Applying standardisation to asset list")
    for variable, mapping in self.variable_mappings.items():
        remapped = self.standardised_asset_list[variable].map(mapping)
        self.standardised_asset_list[variable] = remapped

    # Rows repeating an earlier domna property ID are dropped; the first occurrence is kept
    duplicate_rows = self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated()
    duplicate_count = duplicate_rows.sum()
    if duplicate_count:
        pprint(f"There are {duplicate_count} duplicated addresses - dropping")
        self.standardised_asset_list = self.standardised_asset_list[~duplicate_rows]

    # Perform final variable selection and apply renames to our standard names
    selected = self.standardised_asset_list[self.keep_variables]
    self.standardised_asset_list = selected.rename(columns=self.rename_map)
def merge_data(self, df: pd.DataFrame):
"""
Used to insert data into the standardised asset list, based on the domna property id
:return:
"""
if self.DOMNA_PROPERTY_ID not in df.columns:
raise ValueError(f"Dataframe must contain the column {self.DOMNA_PROPERTY_ID}")
if df[self.DOMNA_PROPERTY_ID].duplicated().sum():
raise ValueError(f"{self.DOMNA_PROPERTY_ID} contains duplicated IDs")
self.standardised_asset_list = self.standardised_asset_list.merge(
df, how="left", on=self.DOMNA_PROPERTY_ID
)
def extract_attributes(self):
    """
    Derive the attribute columns we use to identify viable work from the landlord and EPC data.

    Adds: has-solar flag, estimated number of floors, estimated perimeter, heat loss area,
    EPC roof insulation thickness, the SAP threshold flag and the "EPC is too old" flag, then
    runs process_age_band.
    :return: None; the columns are written onto self.standardised_asset_list.
    """
    epc = self.EPC_API_DATA_NAMES
    floor_area_col = epc["total-floor-area"]
    rooms_col = epc["number-habitable-rooms"]
    floor_height_col = epc["floor-height"]
    roof_col = epc["roof-description"]

    self.standardised_asset_list[self.ATTRIBUTE_HAS_SOLAR] = (
        self.standardised_asset_list[self.FIND_EPC_DATA_NAMES["Solar photovoltaics"]] |
        ~self.standardised_asset_list[epc["photo-supply"]].isin(["0.0", 0, None, ""])
    )

    accepted_epc_property_types = ["House", "Flat", "Bungalow", "Maisonette"]

    def _property_type_for(row):
        # The logic here is:
        # 1) Take the property type provided by the HA themselves
        # 2) In absence of that, take the EPC property type
        # 3) Otherwise use None
        landlord_type = row.get(self.STANDARD_PROPERTY_TYPE)
        # Only text can be title-cased; a missing or empty landlord value falls through to the EPC
        if isinstance(landlord_type, str) and landlord_type.title() in accepted_epc_property_types:
            return landlord_type.title()
        epc_type = row[epc["property-type"]]
        return None if pd.isnull(epc_type) else epc_type

    self.standardised_asset_list[self.ATTRIBUTE_NUMBER_OF_FLOORS] = self.standardised_asset_list.apply(
        lambda x: estimate_number_of_floors(property_type=_property_type_for(x)),
        axis=1
    )

    # Replace "" with NaN before casting. np.nan is used rather than None because pandas
    # has treated replace(..., None) as a fill instruction rather than a value.
    self.standardised_asset_list[floor_area_col] = (
        self.standardised_asset_list[floor_area_col].replace("", np.nan).astype(float)
    )
    self.standardised_asset_list[rooms_col] = (
        self.standardised_asset_list[rooms_col].replace("", np.nan).astype(float)
    )

    # Estimate the perimeter of a single floor
    self.standardised_asset_list[self.ATTRIBUTE_ESTIMATED_PERIMETER] = self.standardised_asset_list.apply(
        lambda x: estimate_perimeter(
            floor_area=x[floor_area_col] / x[self.ATTRIBUTE_NUMBER_OF_FLOORS],
            num_rooms=x[rooms_col] / x[self.ATTRIBUTE_NUMBER_OF_FLOORS],
        ), axis=1
    )

    def _floor_height_for(row):
        raw_height = row[floor_height_col]
        # NaN is truthy, so it needs its own check before the "empty or zero" test
        if pd.isnull(raw_height) or not raw_height:
            return 2.5
        return float(raw_height)

    self.standardised_asset_list[self.ATTRIBUTE_HEAT_LOSS_AREA] = self.standardised_asset_list.apply(
        lambda x: estimate_external_wall_area(
            num_floors=x[self.ATTRIBUTE_NUMBER_OF_FLOORS],
            floor_height=_floor_height_for(x),
            perimeter=x[self.ATTRIBUTE_ESTIMATED_PERIMETER],
            built_form=x[epc["built-form"]]
        ),
        axis=1
    )

    self.standardised_asset_list[self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS] = self.standardised_asset_list.apply(
        lambda x: (
            RoofAttributes(description=x[roof_col]).process()["insulation_thickness"]
            if not pd.isnull(x[roof_col]) else None
        ),
        axis=1
    )

    # We produce some additional fields
    # 1) Is the SAP rating at or below the filled cavity threshold
    self.standardised_asset_list[self.ATTRIBUTE_SAP_THRESHOLD_AND_BELOW] = (
        self.standardised_asset_list[epc["current-energy-efficiency"]] <=
        self.FILLED_CAVITY_SAP_THRESHOLD
    )
    # 2) Flag anything where the EPC inspection year is before EPC_YEAR_THRESHOLD
    self.standardised_asset_list[self.ATTRIBUTE_EPC_PRE_YEAR_THRESHOLD] = (
        pd.to_datetime(
            self.standardised_asset_list[epc["inspection-date"]]
        ).dt.year < self.EPC_YEAR_THRESHOLD
    )

    self.process_age_band()
def process_age_band(self):
processed_age_band = []
for _, x in self.standardised_asset_list.iterrows():
if pd.isnull(x[self.EPC_API_DATA_NAMES["construction-age-band"]]) or (
x[self.EPC_API_DATA_NAMES["construction-age-band"]] in Definitions.DATA_ANOMALY_MATCHES
):
processed_age_band.append(
{
self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID],
"epc_year_lower_bound": None,
"epc_year_upper_bound": None,
"Does Age Match EPC Age Band?": "No EPC Age Band"
}
)
continue
# We exatract the upper and lower bounds
if x[self.EPC_API_DATA_NAMES["construction-age-band"]] in [
"England and Wales: 2007 onwards", "England and Wales: 2012 onwards"
]:
year_lower_bound = 2007 if x[self.EPC_API_DATA_NAMES[
"construction-age-band"]] == "England and Wales: 2007 onwards" else 2012
if pd.isnull(x[self.STANDARD_YEAR_BUILT]):
age_band_matches = "No Year Built From Landlord"
else:
age_band_matches = (
"EPC Age Band Matches Year Built" if x[self.STANDARD_YEAR_BUILT] >= year_lower_bound
else "EPC Age Band is older than Year Built"
)
processed_age_band.append(
{
self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID],
"epc_year_lower_bound": year_lower_bound,
"epc_year_upper_bound": None,
"Does Age Match EPC Age Band?": age_band_matches
}
)
continue
if x[self.EPC_API_DATA_NAMES["construction-age-band"]] == "England and Wales: before 1900":
if pd.isnull(x[self.STANDARD_YEAR_BUILT]):
age_band_matches = "No Year Built From Landlord"
else:
age_band_matches = (
"EPC Age Band Matches Year Built" if x[self.STANDARD_YEAR_BUILT] < 1900
else "EPC Age Band is newer than Year Built"
)
processed_age_band.append(
{
self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID],
"epc_year_lower_bound": None,
"epc_year_upper_bound": 1899,
"Does Age Match EPC Age Band?": age_band_matches
}
)
continue
if x[self.EPC_API_DATA_NAMES["construction-age-band"]].isdigit():
if pd.isnull(x[self.STANDARD_YEAR_BUILT]):
age_band_matches = "No Year Built From Landlord"
else:
age_band_matches = (
"EPC Age Band Matches Year Built" if x[self.STANDARD_YEAR_BUILT] == int(
x[self.EPC_API_DATA_NAMES["construction-age-band"]]
)
else "EPC Age Band is different from Year Built"
)
processed_age_band.append(
{
self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID],
"epc_year_lower_bound": int(x[self.EPC_API_DATA_NAMES["construction-age-band"]]),
"epc_year_upper_bound": int(x[self.EPC_API_DATA_NAMES["construction-age-band"]]),
"Does Age Match EPC Age Band?": age_band_matches
}
)
continue
# Oherwise, we extract the upper and lower bounds
age_band = x[self.EPC_API_DATA_NAMES["construction-age-band"]].split(": ")[1]
lower_date, upper_date = age_band.split("-")
age_band_matches = (
"EPC Age Band Matches Year Built" if (x[self.STANDARD_YEAR_BUILT] >= float(lower_date)) and (
x[self.STANDARD_YEAR_BUILT] <= float(upper_date)
)
else "EPC Age Band is older than Year Built" if x[self.STANDARD_YEAR_BUILT] > float(upper_date)
else "EPC Age Band is newer than Year Built"
)
processed_age_band.append(
{
self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID],
"epc_year_lower_bound": int(lower_date),
"epc_year_upper_bound": int(upper_date),
"Does Age Match EPC Age Band?": age_band_matches
}
)
processed_age_band = pd.DataFrame(processed_age_band)
self.standardised_asset_list = self.standardised_asset_list.merge(
processed_age_band, how="left"
)
def identify_worktypes(self, cleaned):
if not self.non_intrusives_present:
raise NotImplementedError("Need to implement the case for non-intrusives")
# If we have non-intrusives completed, we can use this to identify work types
if self.non_intrusives_present:
######################################################
# Empty cavity:
######################################################
# 1) Has been flagged on the non-intrusives as being a cavity wall, empty or partially filled
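# 2) The age is before 1995
#    NOTE(review): the non-intrusive check below uses year built <= 2000, while the EPC check
#    uses an upper bound <= 1995 - confirm which threshold is intended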
# TODO: 3) Remove anything that likely has access issues
self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] = (
(~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"])) &
(self.standardised_asset_list['non-intrusives: Construction'] == "CAVITY") &
self.standardised_asset_list['non-intrusives: Insulated'].isin(["EMPTY", "PARTIAL"]) &
(self.standardised_asset_list[self.STANDARD_YEAR_BUILT] <= 2000)
)
self.standardised_asset_list["epc_indicates_empty_cavity"] = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["walls-description"]].str.lower().isin(
self.EPC_NO_WALL_INSULATION_DESCRIPTIONS
) & (
self.standardised_asset_list["epc_year_upper_bound"] <= 1995
) & (
~self.standardised_asset_list[self.ATTRIBUTE_EPC_PRE_YEAR_THRESHOLD]
) & (
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <= self.EMPTY_CAVITY_SAP_THRESHOLD
)
)
self.standardised_asset_list["empty_cavity"] = (
self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] |
self.standardised_asset_list["epc_indicates_empty_cavity"]
)
# We add a reason
self.standardised_asset_list["empty_cavity_reason"] = np.where(
self.standardised_asset_list["non_intrusive_indicates_empty_cavity"],
"Non-Intrusive Data",
"EPC Data"
)
######################################################
# Extraction
######################################################
# TODO: When filtering like this, 627 properties are flagged as not needing a CIGA check and 582 are flagged
# as needing a CIGA check. What is the logic we should be applying here?
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] = (
(self.standardised_asset_list["non-intrusives: Construction"] == "CAVITY") &
(self.standardised_asset_list["non-intrusives: Insulated"].isin(["RETRO DRILLED", "FILLED AT BUILD"])) &
(~self.standardised_asset_list['non-intrusives: Material'].isin(["GREY LOOSE BEAD", "FORMALDEHYDE"])
) & (
self.standardised_asset_list[self.ATTRIBUTE_SAP_THRESHOLD_AND_BELOW]
)
)
######################################################
# Solar
######################################################
# Criteria:
# TODO: Standardise these columns with our cleaned_data object
# Check 1: Does the property have a valid heating system?
self.standardised_asset_list["solar_landlord_data_indicates_correct_heating_system"] = (
self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM].isin(
["air source heat pump", "ground source heat pump", "high heat retention storage heaters"]
)
)
self.standardised_asset_list["solar_epc_data_indicates_correct_heating_system"] = (
(
self.standardised_asset_list[self.EPC_API_DATA_NAMES["mainheat-description"]]
.str.lower().str.contains("air source heat pump|ground source heat pump")
) | (
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["mainheat-description"]].str.lower().str.contains(
"electric storage heaters"
) & (
self.standardised_asset_list[self.EPC_API_DATA_NAMES[
"mainheatcont-description"]] == "Controls for high heat retention storage heaters"
)
)
)
# Check 2: Does the property have solar already
self.standardised_asset_list["property_has_solar"] = (
(self.standardised_asset_list[self.STANDARD_EXISTING_PV] == "already has PV") |
(self.standardised_asset_list["non-intrusives: PV, ACCESS ISSUE, SEE NOTES"] == "SOLAR PV ON ROOF") |
(self.standardised_asset_list[self.ATTRIBUTE_HAS_SOLAR])
)
# Check 3: Does the property meet the fabric condition
# Solar PV installs are subject to the minimum insulation requirements which means:
# 1) one of the following insulation measures must be installed as part of the same
# ECO4 project:
# • roof insulation (flat roof, pitched roof, room-in-roof)
# • exterior facing wall insulation (cavity wall, solid wall)
# • party cavity wall insulation
# • floor insulation (solid and underfloor)
#
# OR
#
# all measures (except any exempted measure referred to in paragraph 4.28)
# listed in paragraph a) must already be installed
#
# With this in mind, we look for 2 cases
# 1) The property is fully insulated apart from the loft (<200mm insulation)
#    NOTE(review): the loft top-up check below uses < 270mm rather than 200mm - confirm
# 2) The property is fully insulated
self.standardised_asset_list["solar_landlord_walls_insulated"] = (
self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin(
["filled cavity", "insulated solid brick"]
)
)
# TODO: We don't have information about the roof from this landlord
self.standardised_asset_list["solar_epc_walls_insulated"] = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["walls-description"]].str.lower().str.contains(
"|".join(self.EPC_INSULATED_WALLS_SUBSTRINGS)
)
)
# We merge on the u-value for average thermal transmittance
roof_uvalue_data = pd.DataFrame(cleaned["roof-description"])
roof_uvalue_data = roof_uvalue_data[
~pd.isnull(roof_uvalue_data["thermal_transmittance"])
][["original_description", "thermal_transmittance"]].rename(
columns={
"original_description": self.EPC_API_DATA_NAMES["roof-description"],
"thermal_transmittance": "roof_u_value"
}
)
self.standardised_asset_list = self.standardised_asset_list.merge(
roof_uvalue_data, how="left", on=self.EPC_API_DATA_NAMES["roof-description"]
)
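# If the u-value of a roof is 0.7 or lower we consider it insulated
# NOTE(review): `regex=False` below makes the "|"-joined substrings a single literal pattern,
# so the description match will not behave as an "any of" match - confirm this is intended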
self.standardised_asset_list["solar_epc_roof_insulated"] = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["roof-description"]].str.lower().str.contains(
"|".join(self.EPC_INSULATED_ROOF_SUBSTRINGS), regex=False
) | (
self.standardised_asset_list[self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS].apply(
lambda x: int(x) >= 270 if str(x).isdigit() else False
)
) | (
self.standardised_asset_list["roof_u_value"].apply(
lambda x: x <= 0.7 if not pd.isnull(x) else False
)
)
)
self.standardised_asset_list["solar_epc_loft_needs_topup"] = self.standardised_asset_list[
self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS].apply(
lambda x: int(x) < 270 if str(x).isdigit() else False
)
self.standardised_asset_list["solar_epc_floor_is_solid"] = self.standardised_asset_list[
self.EPC_API_DATA_NAMES["floor-description"]
].str.lower().str.contains("solid")
self.standardised_asset_list["solar_epc_floor_is_solid"] = (
self.standardised_asset_list["solar_epc_floor_is_solid"].fillna(False)
)
z = self.standardised_asset_list[
self.standardised_asset_list["solar_epc_floor_is_solid"] == True
]