Model/asset_list/AssetList.py
2025-03-19 18:50:21 +00:00

2439 lines
111 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import hashlib
import os
import re
import tiktoken
from pprint import pprint
from datetime import datetime
from openai import OpenAI
import numpy as np
import pandas as pd
from tqdm import tqdm
from fuzzywuzzy import process
from utils.logger import setup_logger
from backend.SearchEpc import SearchEpc
from BaseUtility import Definitions
import asset_list.mappings.property_type as property_type_mappings
import asset_list.mappings.walls as walls_mappings
import asset_list.mappings.heating_systems as heating_mappings
import asset_list.mappings.exising_pv as existing_pv_mappings
import asset_list.mappings.built_form as built_form_mappings
from recommendations.recommendation_utils import (
estimate_perimeter,
estimate_external_wall_area,
estimate_number_of_floors
)
from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes
logger = setup_logger()
# OpenAI API Key (set this in your environment variables for security)
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
class DataRemapper:
def __init__(self, standard_values, standard_map=None, max_tokens=1000):
"""
Initialize the remapper with standard values and a predefined mapping.
:param standard_values: Set of allowed standardized values.
:param standard_map: Dictionary of common remappings {raw_value: standard_value}.
"""
self.standard_values = standard_values
self.standard_map = standard_map
self.fuzzy_threshold = 90 # Adjust fuzzy matching sensitivity
self.ai_model = "gpt-4-turbo" # Use gpt-3.5-turbo for cheaper processing
# Tokenizer for counting tokens
self.tokenizer = tiktoken.encoding_for_model(self.ai_model)
# Track token usage and remap dictionary
self.total_tokens_used = 0
self.total_cost = 0
self.remap_dict = {} # {original_value: standardized_value}
self.max_tokens = max_tokens # Limit for OpenAI API
# Memoization for AI calls
self.ai_cache = {} # {tuple(unmapped_values): {original_value: standardized_value}}
# Capture the reponse for debugging
self.ai_response = None
# OpenAI pricing (as of Feb 2024)
self.pricing = {
"gpt-4-turbo": {"input": 0.01 / 1000, "output": 0.03 / 1000},
"gpt-3.5-turbo": {"input": 0.0015 / 1000, "output": 0.002 / 1000},
}
self.openai_client = OpenAI(api_key=OPENAI_API_KEY)
@staticmethod
def clean_string(text):
"""Basic text cleaning: remove extra spaces, punctuation, and normalize case."""
if not isinstance(text, str):
return None
text = text.strip().lower()
text = re.sub(r'[^\w\s]', '', text) # Remove punctuation
# Replace double strings
text = re.sub(r'\s+', ' ', text)
return text
def fuzzy_match(self, text):
"""Use fuzzy matching to find the closest standard value."""
match, score = process.extractOne(text, self.standard_values) if text else (None, 0)
return match if score >= self.fuzzy_threshold else None
def count_tokens(self, text):
"""Estimate the number of tokens in a given text."""
return len(self.tokenizer.encode(text)) if text else 0
def ai_standardize(self, unmapped_values):
"""Call OpenAI API **once** for all unmapped values to minimize cost, with memoization."""
if not unmapped_values:
return {}
unmapped_tuple = tuple(sorted(unmapped_values)) # Ensure consistency for memoization
if unmapped_tuple in self.ai_cache:
return self.ai_cache[unmapped_tuple] # Return memoized result
prompt = f"""
You are an expert in data classification. Standardize each of these values into one of the categories:
{list(self.standard_values)}.
Return only a JSON dictionary where:
- The keys are the original values.
- The values are the standardized ones.
Strictly return JSON **without markdown formatting** or extra text.
Example Output:
{{
"BLKHOUS": "block house",
"BEDSIT": "bedsit"
}}
Values to standardize:
{unmapped_values}
"""
# Count input tokens
input_tokens = self.count_tokens(prompt)
if input_tokens > self.max_tokens:
raise ValueError("Input tokens exceed the maximum limit.")
logger.info("Calling OpenAI API for standardization...")
response = self.openai_client.chat.completions.create(
model=self.ai_model,
messages=[{"role": "user", "content": prompt}],
max_tokens=self.max_tokens,
temperature=0.1,
)
output_text = response.choices[0].message.content.strip()
output_tokens = self.count_tokens(output_text) # Count output tokens
# Track total token usage
self.total_tokens_used += input_tokens + output_tokens
# Estimate cost
input_cost = input_tokens * self.pricing[self.ai_model]["input"]
output_cost = output_tokens * self.pricing[self.ai_model]["output"]
self.total_cost += input_cost + output_cost
try:
# Parse response as dictionary
mapping = eval(output_text) # OpenAI should return a valid dictionary
except:
mapping = {val: "unknown" for val in unmapped_values} # Fallback
# Memoize the AI response
self.ai_cache[unmapped_tuple] = mapping
# We store the raw AI response for debugging
logger.debug(f"AI Response: {mapping}")
self.ai_response = output_text
return mapping
def standardize_list(self, values_to_remap):
"""
Standardizes a list of values and returns a dictionary {original_value: standardized_value}.
:param values_to_remap: List of raw values to standardize.
:return: Dictionary {original_value: standardized_value}.
"""
unique_values = set(values_to_remap) # Process only unique values
unmapped_values = []
for value in unique_values:
if pd.isna(value): # Handle NaN values
self.remap_dict[value] = "unknown"
continue
cleaned_value = self.clean_string(value)
# Rule-Based Check (Predefined Mapping)
if cleaned_value in self.standard_map or value in self.standard_map:
self.remap_dict[value] = (
self.standard_map[cleaned_value] if cleaned_value in self.standard_map else self.standard_map[value]
)
continue
if value.lower() in self.standard_map:
self.remap_dict[value] = self.standard_map[value.lower()]
continue
# Exact Match in Standard Values
if cleaned_value in self.standard_values:
self.remap_dict[value] = cleaned_value
continue
# Fuzzy Matching
fuzzy_match = self.fuzzy_match(cleaned_value)
if fuzzy_match:
self.remap_dict[value] = fuzzy_match
continue
# Capture anything that wasn't mapped
unmapped_values.append(value)
# AI Model - remap anything unmapped (batch request)
ai_mapping = self.ai_standardize(unmapped_values)
self.remap_dict.update(ai_mapping)
return self.remap_dict
def report_usage(self):
"""Prints a summary of token usage and cost."""
print(f"\n🔹 Total Tokens Used: {self.total_tokens_used}")
print(f"💰 Estimated Cost: ${self.total_cost:.4f}")
class AssetList:
"""
This class is used to standardise asset lists so that we can process the core information in a consistent manner.
"""
EPC_API_DATA_NAMES = {
"uprn": "epc_os_uprn",
"address1": "epc_address1",
"address": "epc_address",
"postcode": "epc_postcode",
"inspection-date": "epc_inspection_date",
"current-energy-efficiency": "epc_sap_score_on_register",
"current-energy-rating": "epc_rating_on_register",
"property-type": "epc_property_type",
"built-form": "epc_archetype",
"total-floor-area": "epc_total_floor_area",
"construction-age-band": "epc_age_band",
"floor-height": "epc_floor_height",
"number-habitable-rooms": "epc_number_habitable_rooms",
"walls-description": "epc_wall_construction",
"roof-description": "epc_roof_construction",
"floor-description": "epc_floor_construction",
"mainheat-description": "epc_heating_type",
'mainheatcont-description': "epc_heating_controls",
"secondheat-description": "epc_secondary_heating",
"transaction-type": "epc_reason",
"energy-consumption-current": "epc_heat_demand",
"photo-supply": "epc_photo_supply",
"estimated": "estimated"
}
FIND_EPC_DATA_NAMES = {
"heating_text": "epc_estiamted_heating_kwh",
"hot_water_text": "epc_estimated_hotwater_kwh",
'Assessors name': "epc_assessor_name",
"Assessor's Telephone": "epc_assessor_telephone",
"Assessor's Email": "epc_assessor_email",
"Accreditation scheme": "epc_assessor_accreditation",
"Assessors ID": "epc_assessor_id",
"Solar photovoltaics": "epc_solar_pv"
}
DATETIME_REMAP = {
"Pre 1900": datetime(year=1899, month=12, day=31),
}
# These are the accepted methods we have for cleaning the address1 column
ADDRESS_1_CLEANING_METHODS = [
"first_two_words", # This method will split on the fist two words, where the separator is a space
"first_word", # This method will split on the first word, where the separator is a space
"house_number_extraction", # This method will use the NLP model in SearchEPC to extract the housenumber
# "address1_extraction" # This method will use the NLP model to extract address1
]
# Standard column Names
STANDARD_ADDRESS_1 = "domna_address_1"
STANDARD_POSTCODE = "domna_postcode"
STANDARD_FULL_ADDRESS = "domna_full_address"
STANDARD_YEAR_BUILT = "landlord_year_built"
STANDARD_UPRN = "ordnance_survey_uprn"
STANDARD_LANDLORD_PROPERTY_ID = "landlord_property_id"
STANDARD_PROPERTY_TYPE = "landlord_property_type"
STANDARD_BUILT_FORM = "landlord_built_form"
STANDARD_WALL_CONSTRUCTION = "landlord_wall_construction"
STANDARD_HEATING_SYSTEM = "landlord_heating_system"
STANDARD_EXISTING_PV = "landlord_existing_pv"
DOMNA_PROPERTY_ID = "domna_property_id"
# Regular expression for identifying if the address might point to multiple units
MULTI_UNIT_REGEX = re.compile(r'\b([A-Za-z0-9]+)-([A-Za-z0-9]+)\b')
# List of columns relating to the non-intrusive data
NON_INTRUSIVES_COLNAMES = [
"Archetype", "Construction", "Insulated", "Material", "CIGA Check Required",
"PV, ACCESS ISSUE, SEE NOTES", "OFF GAS - ROOF ORIENTATION",
"Any further surveyor notes", 'Surveyors Name'
]
OLD_FORMAT_NON_INTRUSIVE_COLNAMES = ['WFT Findings', 'ECO Eligibility']
# This SAP threshold is a key search criteria for properties that may be eligible for extraction
FILLED_CAVITY_SAP_THRESHOLD = 75
# This SAP the
EMPTY_CAVITY_SAP_THRESHOLD = 75
# Any EPC deemed to have been conducted prior to this year is deemed to be unreliable
EPC_YEAR_THRESHOLD = pd.Timestamp.now().year - 5
# Attributes - these are columns that we produce, calcualted based on other pieces of data
ATTRIBUTE_HAS_SOLAR = "attribute_has_solar"
ATTRIBUTE_NUMBER_OF_FLOORS = "attribute_est_number_floors"
ATTRIBUTE_ESTIMATED_PERIMETER = "attribute_est_perimter"
ATTRIBUTE_HEAT_LOSS_AREA = "attribute_heat_loss_area"
ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS = "attribute_epc_roof_insulation_thickness"
ATTRIBUTE_SAP_THRESHOLD_AND_BELOW = f"sap_rating_{FILLED_CAVITY_SAP_THRESHOLD}_and_below"
ATTRIBUTE_EPC_PRE_YEAR_THRESHOLD = f"epc_is_pre_{EPC_YEAR_THRESHOLD}"
# These are the descriptions that we look for in the EPC data that are indicative of no insulation
EPC_NO_WALL_INSULATION_DESCRIPTIONS = [
"cavity wall, as built, no insulation (assumed)",
"cavity wall, as built, partial insulation (assumed)",
"cavity wall, as built, partial insulation",
"cavity wall, as built, no insulation",
]
# List of strings that we look for in the EPC data, where substrings indicate that the wall is insulated
EPC_INSULATED_WALLS_SUBSTRINGS = [
", insulated", "with external insulation", "with internal insulation", "filled cavity"
]
# List of strings that we look for in the EPC data, where substrings indicate that the roof is insulated
EPC_INSULATED_ROOF_SUBSTRINGS = [
"(another dwelling above)", ", insulated", ", insulated (assumed) ",
", ceiling insulated",
]
# List of strings we look for in the EPC data, where substrings indicate that the cavity is empty
UNINSULATED_CAVITY_SUBSTRINGS = [
"cavity wall, as built, no insulation (assumed)",
"cavity wall, as built, no insulation",
"cavity wall, as built, partial insulation (assumed)",
"cavity wall, as built, partial insulation",
]
def __init__(
self,
local_filepath,
sheet_name,
address1_colname,
postcode_colname,
full_address_colname,
landlord_property_id=None,
full_address_cols_to_concat=None,
missing_postcodes_method=None,
address1_extraction_method=None,
landlord_year_built=None,
landlord_uprn=None,
landlord_property_type=None,
landlord_built_form=None,
landlord_wall_construction=None,
landlord_heating_system=None,
landlord_existing_pv=None,
header=0
):
self.local_filepath = local_filepath
self.sheet_name = sheet_name
# Read in the data
self.raw_asset_list = pd.read_excel(local_filepath, header=header, sheet_name=sheet_name)
self.standardised_asset_list = self.raw_asset_list.copy()
# Will be used to store aggregated figures against the various work types
self.work_type_figures = {}
self.work_type_breakdowns = {}
self.flat_data = None
self.duplicated_addresses = None
self.contact_details = None
self.contact_detail_fields = None
self.outcomes = None
self.outcomes_no_match = None
self.outcomes_for_output = None
self.master_surveyed = None
# We detect the presence of the non-intrusive columns
self.non_intrusives_present = "CIGA Check Required" in self.raw_asset_list.columns
# We detect if we have the old format of non-intruvies
self.old_format_non_intrusives_present = "WFT Findings" in self.raw_asset_list.columns
# Names of columns
self.landlord_property_id = landlord_property_id
self.address1_colname = address1_colname
self.postcode_colname = postcode_colname
self.full_address_colname = full_address_colname
self.landlord_year_built = landlord_year_built
self.landlord_uprn = landlord_uprn
self.landlord_property_type = landlord_property_type
self.landlord_built_form = landlord_built_form
self.landlord_wall_construction = landlord_wall_construction
self.landlord_heating_system = landlord_heating_system
self.landlord_existing_pv = landlord_existing_pv
# parameters for cleaning
self.full_address_cols_to_concat = full_address_cols_to_concat
self.missing_postcodes_method = missing_postcodes_method
self.address1_extraction_method = address1_extraction_method
self.debug_information = {
"property_type": None,
"wall_construction": None,
"heating_system": None,
"existing_pv": None
}
self.variable_mappings = {}
self.hubspot_data = None
self.rename_map = {}
self.keep_variables = []
# Finally, we handle the case where the landlord's property ID is actually the OS UPRN
if (self.landlord_uprn == self.landlord_property_id) and (self.landlord_property_id is not None):
self.standardised_asset_list[self.STANDARD_UPRN] = self.standardised_asset_list[self.landlord_uprn].copy()
# Update the reference to landlord UPRn
self.landlord_uprn = self.STANDARD_UPRN
# Handle the case when full address and address 1 are the same
if self.full_address_colname == self.address1_colname:
self.full_address_colname = self.STANDARD_FULL_ADDRESS
self.standardised_asset_list[self.full_address_colname] = (
self.standardised_asset_list[self.address1_colname].copy()
)
# Handle the case where the property type column is the same as the built type
if self.landlord_property_type == self.landlord_built_form:
self.landlord_built_form = self.STANDARD_BUILT_FORM
self.standardised_asset_list[self.landlord_built_form] = (
self.standardised_asset_list[self.landlord_property_type].copy()
)
def _extract_address1(self, asset_list, full_address_col, postcode_col, method="first_two_words"):
if method not in self.ADDRESS_1_CLEANING_METHODS:
raise ValueError(f"Method {method} for producing address1 not recognized")
if method == "first_two_words":
asset_list[self.address1_colname] = asset_list[full_address_col].str.split(" ").str[:2].str.join(" ")
return asset_list
if method == "first_word":
asset_list[self.address1_colname] = asset_list[full_address_col].str.split(" ").str[0]
return asset_list
if method == "house_number_extraction":
asset_list[self.address1_colname] = asset_list.apply(
lambda x: SearchEpc.get_house_number(address=x[full_address_col], postcode=x[postcode_col]),
axis=1
)
return asset_list
raise ValueError(f"Method {method} not recognized")
@staticmethod
def _address1_extraction(x):
pass
def create_property_id(self):
"""
This function creates the domna property ID, which is simply a hash of the full address and postcode
We want all figures to be positive
:return:
"""
# We'll remove punctuation and whitespace from the address, before hashing to produce an ID
def _make_hash(value):
"""Generates a stable SHA256 hash suffix and appends it to a cleaned version of the value."""
# Normalize and remove special characters for cleaner ID
cleaned_value = re.sub(r"[^\w\s-]", "", value).replace(" ", "_").lower()
# Generate SHA-256 hash and truncate it
short_hash = hashlib.sha256(value.encode()).hexdigest()[:12]
return f"{cleaned_value}-{short_hash}"
# Apply transformation
self.standardised_asset_list[self.DOMNA_PROPERTY_ID] = (
self.standardised_asset_list[self.full_address_colname] +
self.standardised_asset_list[self.postcode_colname]
).str.strip().str.replace(r"[^\w\s]", "", regex=True).str.replace(" ", "").str.lower().apply(_make_hash)
@staticmethod
def _strip_postcode_from_full_address(full_address, postcode):
cleaned = full_address.replace(postcode, "")
# Remove any trailing commas and spaces
cleaned = cleaned.rstrip(", ").strip(",").strip()
return cleaned
@classmethod
def _identify_multi_address(cls, address):
# We check if the address is comma separated
if "," in address:
address1_section = address.split(",")[0]
# We look for string in the form (x-y)
return bool(cls.MULTI_UNIT_REGEX.search(address1_section))
@staticmethod
def _convert_uprn(x):
"""
Used to convert UPRNS to integer strings
:param x: uprn to convert
:return: converted uprn
"""
if pd.isnull(x):
return x
# check if numeric
if np.isreal(x):
return str(int(x))
if str(x).isdigit():
return str(int(x))
return x
def init_standardise(self):
"""
This function is used to standardise the asset list
:return: standardised asset list
"""
# Remove rows without a postcode
if self.postcode_colname is not None:
self.standardised_asset_list = self.standardised_asset_list.dropna(subset=[self.postcode_colname])
# We clean up portential non-breaking spaces, and double spaces
for col in [
c for c in [self.postcode_colname, self.full_address_colname, self.address1_colname] if
c is not None
]:
self.standardised_asset_list[col] = self.standardised_asset_list[col].astype(str)
self.standardised_asset_list[col] = self.standardised_asset_list[col].str.replace('\xa0', ' ', regex=False)
self.standardised_asset_list[col] = self.standardised_asset_list[col].str.replace(' ', ' ', regex=False)
if self.address1_colname is None:
if self.address1_extraction_method is None:
raise ValueError("Missing address 1 - please specify an extraction method")
self.address1_colname = self.STANDARD_ADDRESS_1
# If we do not have this, we produce it
self.standardised_asset_list = self._extract_address1(
asset_list=self.standardised_asset_list,
full_address_col=self.full_address_colname,
postcode_col=self.postcode_colname,
method=self.address1_extraction_method
)
if self.full_address_colname is None:
if not self.full_address_cols_to_concat:
raise ValueError("Missing full address - please specify columns to concatenate")
self.full_address_colname = self.STANDARD_FULL_ADDRESS
self.standardised_asset_list[self.full_address_colname] = (
self.standardised_asset_list[self.full_address_cols_to_concat].apply(
lambda x: ", ".join([y for y in x if not pd.isnull(y)]),
axis=1
)
)
else:
# Make sure to strip the postcode out of the full address
self.standardised_asset_list[self.full_address_colname] = self.standardised_asset_list.apply(
lambda x: self._strip_postcode_from_full_address(
full_address=x[self.full_address_colname],
postcode=x[self.postcode_colname]
),
axis=1
)
# We create the domna property id
self.create_property_id()
# Clean up the UPRN column, if the landlord has provided them
if self.landlord_uprn is not None:
self.standardised_asset_list[self.landlord_uprn] = (
self.standardised_asset_list[self.landlord_uprn].apply(self._convert_uprn)
)
# We keep just the columns we care about and will work through the various columns and standardise
variables = [
self.landlord_property_id,
self.DOMNA_PROPERTY_ID,
self.address1_colname,
self.postcode_colname,
self.full_address_colname,
self.landlord_uprn,
self.landlord_property_type,
self.landlord_built_form,
self.landlord_year_built,
self.landlord_wall_construction,
self.landlord_heating_system,
self.landlord_existing_pv
]
# Keep just non-null variables (e.g landlord may not provide uprn
self.keep_variables = [v for v in variables if v is not None]
self.rename_map = {
self.landlord_property_id: self.STANDARD_LANDLORD_PROPERTY_ID,
self.address1_colname: self.STANDARD_ADDRESS_1,
self.postcode_colname: self.STANDARD_POSTCODE,
self.full_address_colname: self.STANDARD_FULL_ADDRESS,
self.landlord_uprn: self.STANDARD_UPRN,
self.landlord_property_type: self.STANDARD_PROPERTY_TYPE,
self.landlord_built_form: self.STANDARD_BUILT_FORM,
self.landlord_year_built: self.STANDARD_YEAR_BUILT,
self.landlord_wall_construction: self.STANDARD_WALL_CONSTRUCTION,
self.landlord_heating_system: self.STANDARD_HEATING_SYSTEM,
self.landlord_existing_pv: self.STANDARD_EXISTING_PV
}
self.rename_map = {k: v for k, v in self.rename_map.items() if k is not None}
non_intrusive_columns = []
if self.non_intrusives_present:
non_intrusive_columns = self.NON_INTRUSIVES_COLNAMES
if self.old_format_non_intrusives_present:
non_intrusive_columns = self.OLD_FORMAT_NON_INTRUSIVE_COLNAMES
self.keep_variables += non_intrusive_columns
self.rename_map = {
**self.rename_map,
**dict(
zip(non_intrusive_columns, ["non-intrusives: " + c for c in non_intrusive_columns])
)
}
# We idenfiy addresses which are likely to be multi-addresses (i.g are rooms x-y)
self.standardised_asset_list["is_multi_address"] = self.standardised_asset_list[
self.full_address_colname
].apply(lambda x: self._identify_multi_address(x))
# We handle cleaning for walls, in the instance that the landlord provides us with EPC data and
# we see instances of "average thermal transmittance" in the description
if self.landlord_wall_construction is not None:
self.standardised_asset_list[self.landlord_wall_construction] = np.where(
self.standardised_asset_list[self.landlord_wall_construction].str.lower().str.contains(
"average thermal transmittance"
) == True,
"new build - average thermal transmittance",
self.standardised_asset_list[self.landlord_wall_construction]
)
else:
# We want to make sure that we have a column for wall construction
self.landlord_wall_construction = "landlord_wall_construction"
self.standardised_asset_list[self.landlord_wall_construction] = None
# Clear our build year column
# We attempt to process the year built column
if self.landlord_year_built is not None:
# We check if we have a datetime - year built has not been renamed
if isinstance(self.standardised_asset_list[self.landlord_year_built].iloc[0], datetime):
# We treat any string columns - with common values we see
self.standardised_asset_list[self.landlord_year_built] = (
self.standardised_asset_list[self.landlord_year_built].replace(self.DATETIME_REMAP)
)
self.standardised_asset_list[self.landlord_year_built] = pd.to_datetime(
self.standardised_asset_list[self.landlord_year_built]
)
# Convert this to year
self.standardised_asset_list[self.landlord_year_built] = (
self.standardised_asset_list[self.landlord_year_built].dt.year
)
else:
# We attempt to convert the year built to a datetime, by detecting the format and converting
def extract_year(date_str):
"""
Extracts the year from a date string in the format '01-Jul-YYYY'.
Returns the extracted year as an integer or None if the format is incorrect.
"""
known_errors = [
"#MULTIVALUE",
"This cell has an external reference that can't be shown or edited. Editing this cell will "
"remove the external reference.",
"ND"
]
if pd.isnull(date_str) or date_str in known_errors:
return None
if isinstance(date_str, str):
match = re.match(r"\d{1,2}-[A-Za-z]{3}-(\d{4})", date_str)
if match:
return int(match.group(1)) # Extract the year and convert to integer
if "-" in date_str:
# We probably have a range
return int(date_str.split("-")[1].strip())
if isinstance(date_str, datetime):
return date_str.year
# Check if date_str is a year itself
if str(date_str).isdigit() & (len(str(date_str)) == 4):
return int(date_str)
# Remove any non-numeric characters
date_str = re.sub(r"\D", "", str(date_str))
if str(date_str).isdigit() & (len(str(date_str)) == 4):
return int(date_str)
raise NotImplementedError("Unhandled format for year built - implement me")
self.standardised_asset_list[self.landlord_year_built] = self.standardised_asset_list[
self.landlord_year_built
].apply(extract_year)
# We now create standard lookups
to_remap = {
self.landlord_property_type: {
"standard_values": property_type_mappings.STANDARD_PROPERTY_TYPES,
"standard_map": property_type_mappings.PROPERTY_MAPPING
},
self.landlord_built_form: {
"standard_values": built_form_mappings.STANDARD_BUILT_FORMS,
"standard_map": built_form_mappings.BUILT_FORM_MAPPINGS
},
self.landlord_wall_construction: {
"standard_values": walls_mappings.STANDARD_WALL_CONSTRUCTIONS,
"standard_map": walls_mappings.WALL_CONSTRUCTION_MAPPINGS
},
self.landlord_heating_system: {
"standard_values": heating_mappings.STANDARD_HEATING_SYSTEMS,
"standard_map": heating_mappings.HEATING_MAPPINGS
},
self.landlord_existing_pv: {
"standard_values": existing_pv_mappings.STANDARD_EXISTING_PV,
"standard_map": existing_pv_mappings.EXISTING_PV_MAPPINGS
}
}
# Keep just entries where the key is not None
to_remap = {k: v for k, v in to_remap.items() if k is not None}
for variable, config in to_remap.items():
logger.info("Standardising variable: %s", variable)
values_to_remap = self.standardised_asset_list[variable].unique()
# We want to map this to our standardised list of property types we're interested in
remapper = DataRemapper(standard_values=config["standard_values"], standard_map=config["standard_map"])
remap_dictionary = remapper.standardize_list(values_to_remap=values_to_remap.tolist())
self.variable_mappings[variable] = remap_dictionary
# We now print out the variable mappings, which can be reviewed by the user, before the final standardised
# asset list is returned
for variable, mapping in self.variable_mappings.items():
pprint(f"Variable: {variable}")
pprint(mapping)
# Print a space
print("\n")
pprint("=======================================")
def apply_standardiation(self, override_empty_mappings=False):
"""
This function applies the standardisation to the asset list
:param override_empty_mappings: If true, will override the check for empty mappings. This is only relevant
if there are no categories which need remapping which is highly unlikely
:return:
"""
if not self.variable_mappings and not override_empty_mappings:
raise ValueError("Please run init_standardise first")
logger.info("Applying standardisation to asset list")
for variable, mapping in self.variable_mappings.items():
self.standardised_asset_list[variable + "_original_from_landlord"] = (
self.standardised_asset_list[variable].copy()
)
self.standardised_asset_list[variable] = self.standardised_asset_list[variable].map(mapping)
if self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated().sum():
# Drop the dupes
pprint(
f"There are {self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated().sum()} duplicated "
f"addresses - dropping"
)
# Keep a record of duplicates
self.duplicated_addresses = self.standardised_asset_list[
self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated()
][[self.DOMNA_PROPERTY_ID, self.address1_colname, self.postcode_colname]].copy()
self.standardised_asset_list = self.standardised_asset_list[
~self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated()
]
# Apply renames to our standard names
# Perform final variable selection and renaming:
# We add the original columns to the keep variables
self.keep_variables += [
k + "_original_from_landlord" for k in self.variable_mappings.keys()
]
self.standardised_asset_list = self.standardised_asset_list[self.keep_variables].rename(
columns=self.rename_map
)
# We fill any standard columns that are not in the data because they were not provided by the landlord
missing_variables = [
v for v in [
self.STANDARD_EXISTING_PV,
self.STANDARD_HEATING_SYSTEM,
self.STANDARD_UPRN,
self.STANDARD_PROPERTY_TYPE,
self.STANDARD_YEAR_BUILT,
self.STANDARD_WALL_CONSTRUCTION,
self.STANDARD_HEATING_SYSTEM,
self.STANDARD_EXISTING_PV
] if v not in self.standardised_asset_list.columns
]
for v in missing_variables:
self.standardised_asset_list[v] = None
# Convert to string
self.standardised_asset_list[self.STANDARD_LANDLORD_PROPERTY_ID] = (
self.standardised_asset_list[self.STANDARD_LANDLORD_PROPERTY_ID].astype(str)
)
def merge_data(self, df: pd.DataFrame):
"""
Used to insert data into the standardised asset list, based on the domna property id
:return:
"""
if self.DOMNA_PROPERTY_ID not in df.columns:
raise ValueError(f"Dataframe must contain the column {self.DOMNA_PROPERTY_ID}")
if df[self.DOMNA_PROPERTY_ID].duplicated().sum():
raise ValueError(f"{self.DOMNA_PROPERTY_ID} contains duplicated IDs")
self.standardised_asset_list = self.standardised_asset_list.merge(
df, how="left", on=self.DOMNA_PROPERTY_ID
)
def extract_attributes(self):
# Used to extracty the typical attributes that we use to identify viable work
self.standardised_asset_list[self.ATTRIBUTE_HAS_SOLAR] = (
self.standardised_asset_list[self.FIND_EPC_DATA_NAMES["Solar photovoltaics"]] |
~self.standardised_asset_list[self.EPC_API_DATA_NAMES["photo-supply"]].isin(["0.0", 0, None, "", np.nan])
)
accepted_epc_property_types = ["House", "Flat", "Bungalow", "Maisonette"]
# The logic here is:
# 1) Take the property type provided by the HA themselves
# 2) In absence of that, take the EPC property type
# 3) Otherwise use None
self.standardised_asset_list[self.ATTRIBUTE_NUMBER_OF_FLOORS] = self.standardised_asset_list.apply(
lambda x: estimate_number_of_floors(
property_type=(
x[self.STANDARD_PROPERTY_TYPE].title() if
x[self.STANDARD_PROPERTY_TYPE].title() in accepted_epc_property_types else (
x[self.EPC_API_DATA_NAMES["property-type"]] if not
pd.isnull(x[self.EPC_API_DATA_NAMES["property-type"]]) else None
)
)
),
axis=1
)
self.standardised_asset_list[self.EPC_API_DATA_NAMES["total-floor-area"]] = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["total-floor-area"]].astype(float)
)
# Replace "" value with None
self.standardised_asset_list[self.EPC_API_DATA_NAMES["number-habitable-rooms"]] = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["number-habitable-rooms"]].replace("", None)
)
self.standardised_asset_list[self.EPC_API_DATA_NAMES["number-habitable-rooms"]] = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["number-habitable-rooms"]].astype(float)
)
# Estimate the perimeter
self.standardised_asset_list[self.ATTRIBUTE_ESTIMATED_PERIMETER] = self.standardised_asset_list.apply(
lambda x: estimate_perimeter(
floor_area=x[self.EPC_API_DATA_NAMES["total-floor-area"]] / x[self.ATTRIBUTE_NUMBER_OF_FLOORS],
num_rooms=x[self.EPC_API_DATA_NAMES["number-habitable-rooms"]] / x[self.ATTRIBUTE_NUMBER_OF_FLOORS],
), axis=1
)
self.standardised_asset_list[self.ATTRIBUTE_HEAT_LOSS_AREA] = self.standardised_asset_list.apply(
lambda x: estimate_external_wall_area(
num_floors=x[self.ATTRIBUTE_NUMBER_OF_FLOORS],
floor_height=(
float(x[self.EPC_API_DATA_NAMES["floor-height"]]) if
x[self.EPC_API_DATA_NAMES["floor-height"]] else 2.5
),
perimeter=x[self.ATTRIBUTE_ESTIMATED_PERIMETER],
built_form=x[self.EPC_API_DATA_NAMES["built-form"]]
),
axis=1
)
self.standardised_asset_list[self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS] = self.standardised_asset_list.apply(
lambda x: RoofAttributes(description=x[self.EPC_API_DATA_NAMES["roof-description"]]).process()[
"insulation_thickness"] if not pd.isnull(
x[self.EPC_API_DATA_NAMES["roof-description"]]) else None,
axis=1
)
self.standardised_asset_list[self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS] = (
self.standardised_asset_list[self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS].str.replace("+", "")
)
# We produce some additional fields
# 1) Is the SAP rating below C75
self.standardised_asset_list[self.ATTRIBUTE_SAP_THRESHOLD_AND_BELOW] = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]].astype(float) <=
self.FILLED_CAVITY_SAP_THRESHOLD
)
# 2) Flag anything where the EPC is older than 5 years
self.standardised_asset_list[self.ATTRIBUTE_EPC_PRE_YEAR_THRESHOLD] = (
pd.to_datetime(
self.standardised_asset_list[self.EPC_API_DATA_NAMES["inspection-date"]]
).dt.year < self.EPC_YEAR_THRESHOLD
)
self.process_age_band()
def process_age_band(self):
processed_age_band = []
for _, x in self.standardised_asset_list.iterrows():
if pd.isnull(x[self.EPC_API_DATA_NAMES["construction-age-band"]]) or (
x[self.EPC_API_DATA_NAMES["construction-age-band"]] in Definitions.DATA_ANOMALY_MATCHES
):
processed_age_band.append(
{
self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID],
"epc_year_lower_bound": None,
"epc_year_upper_bound": None,
"does_age_band_match_epc_age_band": "No EPC Age Band"
}
)
continue
# We exatract the upper and lower bounds
if x[self.EPC_API_DATA_NAMES["construction-age-band"]] in [
"England and Wales: 2007 onwards", "England and Wales: 2012 onwards"
]:
year_lower_bound = 2007 if x[self.EPC_API_DATA_NAMES[
"construction-age-band"]] == "England and Wales: 2007 onwards" else 2012
if pd.isnull(x[self.STANDARD_YEAR_BUILT]):
age_band_matches = "No Year Built From Landlord"
else:
age_band_matches = (
"EPC Age Band Matches Year Built" if x[self.STANDARD_YEAR_BUILT] >= year_lower_bound
else "EPC Age Band is older than Year Built"
)
processed_age_band.append(
{
self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID],
"epc_year_lower_bound": year_lower_bound,
"epc_year_upper_bound": None,
"does_age_band_match_epc_age_band": age_band_matches
}
)
continue
if x[self.EPC_API_DATA_NAMES["construction-age-band"]] == "England and Wales: before 1900":
if pd.isnull(x[self.STANDARD_YEAR_BUILT]):
age_band_matches = "No Year Built From Landlord"
else:
age_band_matches = (
"EPC Age Band Matches Year Built" if x[self.STANDARD_YEAR_BUILT] < 1900
else "EPC Age Band is newer than Year Built"
)
processed_age_band.append(
{
self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID],
"epc_year_lower_bound": None,
"epc_year_upper_bound": 1899,
"does_age_band_match_epc_age_band": age_band_matches
}
)
continue
if x[self.EPC_API_DATA_NAMES["construction-age-band"]].isdigit():
if pd.isnull(x[self.STANDARD_YEAR_BUILT]):
age_band_matches = "No Year Built From Landlord"
else:
age_band_matches = (
"EPC Age Band Matches Year Built" if x[self.STANDARD_YEAR_BUILT] == int(
x[self.EPC_API_DATA_NAMES["construction-age-band"]]
)
else "EPC Age Band is different from Year Built"
)
processed_age_band.append(
{
self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID],
"epc_year_lower_bound": int(x[self.EPC_API_DATA_NAMES["construction-age-band"]]),
"epc_year_upper_bound": int(x[self.EPC_API_DATA_NAMES["construction-age-band"]]),
"does_age_band_match_epc_age_band": age_band_matches
}
)
continue
# Oherwise, we extract the upper and lower bounds
age_band = x[self.EPC_API_DATA_NAMES["construction-age-band"]].split(": ")[1]
lower_date, upper_date = age_band.split("-")
age_band_matches = (
"EPC Age Band Matches Year Built" if (x[self.STANDARD_YEAR_BUILT] >= float(lower_date)) and (
x[self.STANDARD_YEAR_BUILT] <= float(upper_date)
)
else "EPC Age Band is older than Year Built" if x[self.STANDARD_YEAR_BUILT] > float(upper_date)
else "EPC Age Band is newer than Year Built"
)
processed_age_band.append(
{
self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID],
"epc_year_lower_bound": int(lower_date),
"epc_year_upper_bound": int(upper_date),
"does_age_band_match_epc_age_band": age_band_matches
}
)
processed_age_band = pd.DataFrame(processed_age_band)
self.standardised_asset_list = self.standardised_asset_list.merge(
processed_age_band, how="left"
)
def identify_worktypes(self, cleaned):
# If we have non-intrusives completed, we can use this to identify work types
######################################################
# Empty cavity:
######################################################
# 1) Has been flagged on the non-intrusives as being a cavity wall, empty or partially filled
# 2) The age is before 1995
# 3) We don't remove anything that haas access issues yet
if self.non_intrusives_present:
non_intrusives_wall_filter = (
(self.standardised_asset_list['non-intrusives: Construction'] == "CAVITY") &
self.standardised_asset_list['non-intrusives: Insulated'].isin(["EMPTY", "PARTIAL"])
)
elif self.old_format_non_intrusives_present:
non_intrusives_wall_filter = (
self.standardised_asset_list['non-intrusives: WFT Findings'].str.lower().isin(
["empty cavity", "partial fill"]
)
)
else:
# We set the filter to False, as we have no non-intrusives
non_intrusives_wall_filter = False
self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] = (
(~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"])) &
non_intrusives_wall_filter &
(self.standardised_asset_list[self.STANDARD_YEAR_BUILT] <= 2002) &
(
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["current-energy-efficiency"]
] <= self.EMPTY_CAVITY_SAP_THRESHOLD
)
)
# Let's also flag work that looks eligible without the SAP filter
self.standardised_asset_list["non_intrusive_indicates_empty_cavity_no_sap_filter"] = (
(~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"])) &
non_intrusives_wall_filter &
(self.standardised_asset_list[self.STANDARD_YEAR_BUILT] <= 2002)
)
# If non_intrusive_indicates_empty_cavity is True,
# set non_intrusive_indicates_empty_cavity_no_sap_filter to False
self.standardised_asset_list["non_intrusive_indicates_empty_cavity_no_sap_filter"] = np.where(
self.standardised_asset_list["non_intrusive_indicates_empty_cavity"],
False,
self.standardised_asset_list["non_intrusive_indicates_empty_cavity_no_sap_filter"]
)
self.standardised_asset_list["epc_indicates_empty_cavity"] = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["walls-description"]].str.lower().isin(
self.EPC_NO_WALL_INSULATION_DESCRIPTIONS
) & (
self.standardised_asset_list["epc_year_upper_bound"] <= 1995
) & (
~self.standardised_asset_list[self.ATTRIBUTE_EPC_PRE_YEAR_THRESHOLD]
) & (
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <= self.EMPTY_CAVITY_SAP_THRESHOLD
)
)
self.standardised_asset_list["epc_indicates_empty_cavity_no_sap_filter"] = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["walls-description"]].str.lower().isin(
self.EPC_NO_WALL_INSULATION_DESCRIPTIONS
) & (
self.standardised_asset_list["epc_year_upper_bound"] <= 1995
) & (
~self.standardised_asset_list[self.ATTRIBUTE_EPC_PRE_YEAR_THRESHOLD]
) & (
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["current-energy-efficiency"]] > self.EMPTY_CAVITY_SAP_THRESHOLD
)
)
self.standardised_asset_list["landlord_data_indicates_empty_cavity"] = (
self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin(["uninsulated cavity"]) &
(self.standardised_asset_list[self.STANDARD_YEAR_BUILT] <= 2002) &
(
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["current-energy-efficiency"]
] <= self.EMPTY_CAVITY_SAP_THRESHOLD
)
)
self.standardised_asset_list["landlord_data_indicates_empty_cavity_no_sap_filter"] = (
self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin(["uninsulated cavity"]) &
(self.standardised_asset_list[self.STANDARD_YEAR_BUILT] <= 2002) &
(
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["current-energy-efficiency"]
] > self.EMPTY_CAVITY_SAP_THRESHOLD
)
)
# If the EPC is esimtated, we defer to the non-intrusives
self.standardised_asset_list["epc_indicates_empty_cavity"] = np.where(
(
self.standardised_asset_list["epc_indicates_empty_cavity"] &
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
self.standardised_asset_list["estimated"]
),
False,
self.standardised_asset_list["epc_indicates_empty_cavity"]
)
# Finally, we create a flag to indicate that the cavity is empty, based on the criteria above
self.standardised_asset_list["cavity_is_empty"] = (
non_intrusives_wall_filter |
self.standardised_asset_list[self.EPC_API_DATA_NAMES["walls-description"]].str.lower().isin(
self.EPC_NO_WALL_INSULATION_DESCRIPTIONS
) |
self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin(["uninsulated cavity"])
)
######################################################
# Extraction
######################################################
# as needing a CIGA check. What is the logic we should be applying here?
if self.non_intrusives_present:
extraction_wall_filter = (
(self.standardised_asset_list["non-intrusives: Construction"] == "CAVITY") &
(self.standardised_asset_list["non-intrusives: Insulated"].isin(["RETRO DRILLED", "FILLED AT BUILD"])) &
(~self.standardised_asset_list['non-intrusives: Material'].isin(
["GREY LOOSE BEAD", "COMPACTED BEAD", "FIBRE BATT NO CAVITY", "EMPTY NARROW BELOW 30mm"]
)
)
)
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] = (
extraction_wall_filter & (
self.standardised_asset_list[self.ATTRIBUTE_SAP_THRESHOLD_AND_BELOW]
)
)
# Also include work without the SAP filter as optimistic
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction_no_sap_filter"] = (
extraction_wall_filter
)
elif self.old_format_non_intrusives_present:
print("Review these categories with Kieran")
extraction_wall_filter = (
self.standardised_asset_list['non-intrusives: WFT Findings'].str.lower().str.strip().isin(
["retro drilled", "retro filled", "fibre from build", "polybead"]
)
)
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] = (
extraction_wall_filter & (
self.standardised_asset_list[self.ATTRIBUTE_SAP_THRESHOLD_AND_BELOW]
)
)
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction_no_sap_filter"] = (
extraction_wall_filter
)
else:
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] = False
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction_no_sap_filter"] = False
# Adjust
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction_no_sap_filter"] = np.where(
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"],
False,
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction_no_sap_filter"]
)
######################################################
# Solar
######################################################
# Criteria:
# Check 1: Does the property have a valid heating system?
self.standardised_asset_list["solar_landlord_data_indicates_correct_heating_system"] = (
self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM].isin(
["air source heat pump", "ground source heat pump", "high heat retention storage heaters"]
)
)
self.standardised_asset_list["solar_landlord_data_indicates_needs_heating_upgrade"] = (
self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM].isin(
["electric storage heaters", "room heaters", "electric radiators"]
)
)
self.standardised_asset_list["solar_epc_data_indicates_correct_heating_system"] = (
(
self.standardised_asset_list[self.EPC_API_DATA_NAMES["mainheat-description"]]
.str.lower().str.contains("air source heat pump|ground source heat pump")
) | (
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["mainheat-description"]].str.lower().str.contains(
"electric storage heaters"
) & (
self.standardised_asset_list[self.EPC_API_DATA_NAMES[
"mainheatcont-description"]] == "Controls for high heat retention storage heaters"
)
)
)
self.standardised_asset_list["solar_epc_data_indicates_requires_heating_upgrade"] = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["mainheat-description"]].str.lower().str.contains(
"electric storage heaters|room heaters"
) & (
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["mainheatcont-description"]
] != "Controls for high heat retention storage heaters"
)
)
# Basic check - both of the previous two shouldn't be true simultaneously
if (
self.standardised_asset_list["solar_epc_data_indicates_correct_heating_system"] &
self.standardised_asset_list["solar_epc_data_indicates_requires_heating_upgrade"]
).sum():
raise ValueError("Both heating system checks are true - this should not be possible")
# Check 2: Does the property have solar already
if self.non_intrusives_present:
existing_solar_non_intrusives_check = (
self.standardised_asset_list["non-intrusives: PV, ACCESS ISSUE, SEE NOTES"] == "SOLAR PV ON ROOF"
)
elif self.old_format_non_intrusives_present:
existing_solar_non_intrusives_check = (
self.standardised_asset_list["non-intrusives: WFT Findings"].str.lower().str.strip().isin(
["solar pv on roof"]
)
)
else:
# We don't have an indication
existing_solar_non_intrusives_check = False
self.standardised_asset_list["property_has_solar"] = (
(self.standardised_asset_list[self.STANDARD_EXISTING_PV] == "already has PV") |
existing_solar_non_intrusives_check |
(self.standardised_asset_list[self.ATTRIBUTE_HAS_SOLAR])
)
# Check 3: Does the property meet the fabric condition
# Solar PV installs are subject to the minimum insulation requirements which means:
# 1) one of the following insulation measures must be installed as part of the same
# ECO4 project:
# • roof insulation (flat roof, pitched roof, room-in-roof)
# • exterior facing wall insulation (cavity wall, solid wall)
# • party cavity wall insulation
# • floor insulation (solid and underfloor)
#
# OR
#
# all measures (except any exempted measure referred to in paragraph 4.28)
# listed in paragraph a) must already be installed
#
# With this in mind, we look for 2 clases
# 1) The property is fully insulated apart from the loft (<200mm insulation)
# 2) THe property is fully insulated
print("Should we include cavity properties where they might be uninsulated?")
self.standardised_asset_list["solar_landlord_walls_insulated"] = (
self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin(
[
"filled cavity", "insulated solid brick", "insulated timber frame",
]
)
)
if self.non_intrusives_present:
self.standardised_asset_list["solar_non_intrusives_walls_insulated"] = (
self.standardised_asset_list["non-intrusives: Insulated"].isin(
["EWI", "RETRO DRILLED", "FILLED AT BUILD"]
)
)
elif self.old_format_non_intrusives_present:
self.standardised_asset_list["solar_non_intrusives_walls_insulated"] = (
self.standardised_asset_list["non-intrusives: WFT Findings"].str.lower().str.strip().isin(
["retro drilled", "retro filled", "ewi", "retro drilled/ solid"]
)
)
else:
self.standardised_asset_list["solar_non_intrusives_walls_insulated"] = False
# TODO: We don't have information about the roof from this landlord
# We merge on the u-value for average thermal transmittance
walls_uvalue_data = pd.DataFrame(cleaned["walls-description"])
walls_uvalue_data = walls_uvalue_data[
~pd.isnull(walls_uvalue_data["thermal_transmittance"])
][["original_description", "thermal_transmittance"]].rename(
columns={
"original_description": self.EPC_API_DATA_NAMES["walls-description"],
"thermal_transmittance": "walls_u_value"
}
)
self.standardised_asset_list = self.standardised_asset_list.merge(
walls_uvalue_data, how="left", on=self.EPC_API_DATA_NAMES["walls-description"]
)
self.standardised_asset_list["solar_epc_walls_insulated"] = (
(
self.standardised_asset_list[
self.EPC_API_DATA_NAMES[
"walls-description"]].str.lower().str.contains(
"|".join(
self.EPC_INSULATED_WALLS_SUBSTRINGS)
)
) | (
self.standardised_asset_list[
"walls_u_value"].apply(
lambda x: x <= 0.7 if not pd.isnull(x) else False
)
)
)
# We merge on the u-value for average thermal transmittance
roof_roof_data = pd.DataFrame(cleaned["roof-description"])
roof_roof_data = roof_roof_data[
["original_description", "thermal_transmittance", "is_pitched", "is_loft"]
].rename(
columns={
"original_description": self.EPC_API_DATA_NAMES["roof-description"],
"thermal_transmittance": "roof_u_value",
}
)
self.standardised_asset_list = self.standardised_asset_list.merge(
roof_roof_data, how="left", on=self.EPC_API_DATA_NAMES["roof-description"]
)
# If the u-value of a roof is less than 0.7 we consider it insulated
self.standardised_asset_list["solar_epc_roof_insulated"] = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["roof-description"]].str.lower().str.contains(
"|".join(self.EPC_INSULATED_ROOF_SUBSTRINGS),
) | (
self.standardised_asset_list[self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS].apply(
lambda x: int(x) >= 200 if str(x).isdigit() else False
)
) | (
self.standardised_asset_list["roof_u_value"].apply(
lambda x: x <= 0.7 if not pd.isnull(x) else False
)
)
)
self.standardised_asset_list["solar_epc_loft_needs_topup"] = (
self.standardised_asset_list[
self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS].apply(
lambda x: int(x) < 200 if str(x).isdigit() else False
) | (
(
self.standardised_asset_list["is_loft"] | self.standardised_asset_list["is_pitched"]
) & (
self.standardised_asset_list[self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS].isin(
["below average", "none"]
)
)
)
)
self.standardised_asset_list["epc_has_floor_recommendation"] = (
self.standardised_asset_list["epc_has_floor_recommendation"].fillna(False)
)
# We merge on the u-value for average thermal transmittance
floors_uvalue_data = pd.DataFrame(cleaned["floor-description"])
floors_uvalue_data = floors_uvalue_data[
~pd.isnull(floors_uvalue_data["thermal_transmittance"])
][["original_description", "thermal_transmittance"]].rename(
columns={
"original_description": self.EPC_API_DATA_NAMES["floor-description"],
"thermal_transmittance": "floor_u_value"
}
)
# Merge on
self.standardised_asset_list = self.standardised_asset_list.merge(
floors_uvalue_data, how="left", on=self.EPC_API_DATA_NAMES["floor-description"]
)
# We assume that a U-value of 0.5 or below is indicative of an insulated floor
self.standardised_asset_list["solar_epc_floor_is_solid_no_recommendation"] = (
(
(
self.standardised_asset_list[self.EPC_API_DATA_NAMES["floor-description"]].str
.lower().str.contains("solid")
) & (
~self.standardised_asset_list["epc_has_floor_recommendation"]
) & (
# We do not utilise estimated EPCs for this method because we will always find that
# "epc_has_floor_recommendation" is False
(self.standardised_asset_list["estimated"] == False)
)
) | (
(
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["floor-description"]].str.lower().str.contains("solid")
) & (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["floor-description"]].str.lower()
.str.contains(", insulated")
)
)
)
####################################
# Check solar eligibility
####################################
# Set up the filters to stop repetition
correct_heating_system = (
self.standardised_asset_list["solar_landlord_data_indicates_correct_heating_system"] |
self.standardised_asset_list["solar_epc_data_indicates_correct_heating_system"]
)
needs_heating_upgrade = (
self.standardised_asset_list["solar_landlord_data_indicates_needs_heating_upgrade"] |
self.standardised_asset_list["solar_epc_data_indicates_requires_heating_upgrade"]
)
# The requirements for walls are:
# 1) walls are insulated
# 2) property is a cavity (can be done insulated or not)
walls_meet_solar_requirements = (
self.standardised_asset_list["solar_landlord_walls_insulated"] |
self.standardised_asset_list["solar_epc_walls_insulated"] |
self.standardised_asset_list["solar_non_intrusives_walls_insulated"] |
self.standardised_asset_list["cavity_is_empty"] |
(self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].str.contains("cavity"))
)
not_a_flat = (
self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] != "flat"
)
self.standardised_asset_list["solar_eligible_solid_floor"] = (
# Property isn't a flag
not_a_flat &
# Landlord data or EPC data indicates the heating system is appropriate
correct_heating_system &
# The property doesn't currently have solar
~self.standardised_asset_list["property_has_solar"] &
# The walls are insulated
walls_meet_solar_requirements &
# Roof is insulated
self.standardised_asset_list["solar_epc_roof_insulated"] &
# Floor type check
self.standardised_asset_list["solar_epc_floor_is_solid_no_recommendation"] &
# SAP below threshold
self.standardised_asset_list[self.ATTRIBUTE_SAP_THRESHOLD_AND_BELOW]
)
self.standardised_asset_list["solar_eligible_solid_floor_sap_above_threshold"] = (
# Property isn't a flag
not_a_flat &
# Landlord data or EPC data indicates the heating system is appropriate
correct_heating_system &
# The property doesn't currently have solar
~self.standardised_asset_list["property_has_solar"] &
# The walls are insulated
walls_meet_solar_requirements &
# Roof is insulated
self.standardised_asset_list["solar_epc_roof_insulated"] &
# Floor type check
self.standardised_asset_list["solar_epc_floor_is_solid_no_recommendation"] &
# SAP above threshold
~self.standardised_asset_list[self.ATTRIBUTE_SAP_THRESHOLD_AND_BELOW]
)
# With heating upgrade
self.standardised_asset_list["solar_eligible_solid_floor_needs_heating_upgrade"] = (
not_a_flat &
# Needs heating upgrade
needs_heating_upgrade &
# The property doesn't currently have solar
~self.standardised_asset_list["property_has_solar"] &
# The walls are insulated
walls_meet_solar_requirements &
# Roof is insulated
self.standardised_asset_list["solar_epc_roof_insulated"] &
# Floor type check
self.standardised_asset_list["solar_epc_floor_is_solid_no_recommendation"] &
# SAP Below threshold
self.standardised_asset_list[self.ATTRIBUTE_SAP_THRESHOLD_AND_BELOW]
)
# With heating upgrade, above threshold
self.standardised_asset_list["solar_eligible_solid_floor_needs_heating_upgrade_sap_above_threshold"] = (
not_a_flat &
# Needs heating upgrade
needs_heating_upgrade &
# The property doesn't currently have solar
~self.standardised_asset_list["property_has_solar"] &
# The walls are insulated
walls_meet_solar_requirements &
# Roof is insulated
self.standardised_asset_list["solar_epc_roof_insulated"] &
# Floor type check
self.standardised_asset_list["solar_epc_floor_is_solid_no_recommendation"] &
# SAP above threshold
~self.standardised_asset_list[self.ATTRIBUTE_SAP_THRESHOLD_AND_BELOW]
)
# Because the EPC data can be contradictrory, we remove any overlap
self.standardised_asset_list["solar_eligible_solid_floor_needs_heating_upgrade"] = np.where(
self.standardised_asset_list["solar_eligible_solid_floor"],
False,
self.standardised_asset_list["solar_eligible_solid_floor_needs_heating_upgrade"]
)
self.standardised_asset_list["solar_eligible_solid_floor_needs_heating_upgrade_sap_above_threshold"] = np.where(
self.standardised_asset_list["solar_eligible_solid_floor_sap_above_threshold"],
False,
self.standardised_asset_list["solar_eligible_solid_floor_needs_heating_upgrade_sap_above_threshold"]
)
# We shouldn't have an overlap
if (
self.standardised_asset_list["solar_eligible_solid_floor"] &
self.standardised_asset_list["solar_eligible_solid_floor_needs_heating_upgrade"]
).sum():
raise ValueError("Both heating upgrade and no heating upgrade are true - this should not be possible")
# Solid floor but needs a loft top-up
self.standardised_asset_list["solar_eligible_solid_floor_needs_loft"] = (
not_a_flat &
# Landlord data or EPC data indicates the heating system is appropriate
correct_heating_system &
# The property doesn't currently have solar
~self.standardised_asset_list["property_has_solar"] &
# The walls are insulated
walls_meet_solar_requirements &
# Roof is insulated
self.standardised_asset_list["solar_epc_loft_needs_topup"] &
# Check floor
self.standardised_asset_list["solar_epc_floor_is_solid_no_recommendation"] &
# SAP below threshold
self.standardised_asset_list[self.ATTRIBUTE_SAP_THRESHOLD_AND_BELOW]
)
# Solid floor, needs loft, above SAP thresold
self.standardised_asset_list["solar_eligible_solid_floor_needs_loft_sap_above_threshold"] = (
not_a_flat &
# Landlord data or EPC data indicates the heating system is appropriate
correct_heating_system &
# The property doesn't currently have solar
~self.standardised_asset_list["property_has_solar"] &
# The walls are insulated
walls_meet_solar_requirements &
# Roof is insulated
self.standardised_asset_list["solar_epc_loft_needs_topup"] &
# Check floor
self.standardised_asset_list["solar_epc_floor_is_solid_no_recommendation"] &
# SAP above threshold
~self.standardised_asset_list[self.ATTRIBUTE_SAP_THRESHOLD_AND_BELOW]
)
# Needs loft & heating
self.standardised_asset_list["solar_eligible_solid_floor_needs_loft_needs_heating_upgrade"] = (
not_a_flat &
# Needs heating upgrade
needs_heating_upgrade &
# The property doesn't currently have solar
~self.standardised_asset_list["property_has_solar"] &
# The walls are insulated
walls_meet_solar_requirements &
# Roof is insulated
self.standardised_asset_list["solar_epc_loft_needs_topup"] &
# Floor type
self.standardised_asset_list["solar_epc_floor_is_solid_no_recommendation"] &
# SAP below threshold
self.standardised_asset_list[self.ATTRIBUTE_SAP_THRESHOLD_AND_BELOW]
)
self.standardised_asset_list[
"solar_eligible_solid_floor_needs_loft_needs_heating_upgrade_sap_above_threshold"
] = (
not_a_flat &
# Needs heating upgrade
needs_heating_upgrade &
# The property doesn't currently have solar
~self.standardised_asset_list["property_has_solar"] &
# The walls are insulated
walls_meet_solar_requirements &
# Roof is insulated
self.standardised_asset_list["solar_epc_loft_needs_topup"] &
# Floor type
self.standardised_asset_list["solar_epc_floor_is_solid_no_recommendation"] &
# SAP above threshold
~self.standardised_asset_list[self.ATTRIBUTE_SAP_THRESHOLD_AND_BELOW]
)
# Other floor type, fully insulated
self.standardised_asset_list["solar_eligible_other_floor"] = (
not_a_flat &
# Landlord data or EPC data indicates the heating system is appropriate
correct_heating_system &
# The property doesn't currently have solar
~self.standardised_asset_list["property_has_solar"] &
# The walls are insulated
walls_meet_solar_requirements &
# Roof is insulated
self.standardised_asset_list["solar_epc_roof_insulated"] &
# Floor type
~self.standardised_asset_list["solar_epc_floor_is_solid_no_recommendation"] &
# SAP below threshold
self.standardised_asset_list[self.ATTRIBUTE_SAP_THRESHOLD_AND_BELOW]
)
self.standardised_asset_list["solar_eligible_other_floor_sap_above_threshold"] = (
not_a_flat &
# Landlord data or EPC data indicates the heating system is appropriate
correct_heating_system &
# The property doesn't currently have solar
~self.standardised_asset_list["property_has_solar"] &
# The walls are insulated
walls_meet_solar_requirements &
# Roof is insulated
self.standardised_asset_list["solar_epc_roof_insulated"] &
# Floor type - other types
~self.standardised_asset_list["solar_epc_floor_is_solid_no_recommendation"] &
# SAP above threshold
~self.standardised_asset_list[self.ATTRIBUTE_SAP_THRESHOLD_AND_BELOW]
)
# With heating upgrade
self.standardised_asset_list["solar_eligible_other_floor_needs_heating_upgrade"] = (
not_a_flat &
# Needs heating upgrade
needs_heating_upgrade &
# The property doesn't currently have solar
~self.standardised_asset_list["property_has_solar"] &
# The walls are insulated
walls_meet_solar_requirements &
# Roof is insulated
self.standardised_asset_list["solar_epc_roof_insulated"] &
# Other floor types
~self.standardised_asset_list["solar_epc_floor_is_solid_no_recommendation"] &
# SAP below threshold
self.standardised_asset_list[self.ATTRIBUTE_SAP_THRESHOLD_AND_BELOW]
)
# With heating upgrade, SAP above threshold
self.standardised_asset_list["solar_eligible_other_floor_needs_heating_upgrade_sap_above_threshold"] = (
not_a_flat &
# Needs heating upgrade
needs_heating_upgrade &
# The property doesn't currently have solar
~self.standardised_asset_list["property_has_solar"] &
# The walls are insulated
walls_meet_solar_requirements &
# Roof is insulated
self.standardised_asset_list["solar_epc_roof_insulated"] &
# Other floor types
~self.standardised_asset_list["solar_epc_floor_is_solid_no_recommendation"] &
# SAP above threshold
~self.standardised_asset_list[self.ATTRIBUTE_SAP_THRESHOLD_AND_BELOW]
)
# Check for overlap
if (
self.standardised_asset_list["solar_eligible_other_floor_needs_heating_upgrade"] &
self.standardised_asset_list["solar_eligible_other_floor_needs_heating_upgrade_sap_above_threshold"]
).sum():
raise ValueError("Both heating upgrade and no heating upgrade are true - this should not be possible")
# Other floor type, needs loft top-up
self.standardised_asset_list["solar_eligible_other_floor_needs_loft"] = (
not_a_flat &
# Landlord data or EPC data indicates the heating system is appropriate
correct_heating_system &
# The property doesn't currently have solar
~self.standardised_asset_list["property_has_solar"] &
# The walls are insulated
walls_meet_solar_requirements &
# Roof need loft top-up
self.standardised_asset_list["solar_epc_loft_needs_topup"] &
# Other floor types
~self.standardised_asset_list["solar_epc_floor_is_solid_no_recommendation"] &
# SAP below threshold
self.standardised_asset_list[self.ATTRIBUTE_SAP_THRESHOLD_AND_BELOW]
)
# Other floor type, needs loft top-up, SAP above threshold
self.standardised_asset_list["solar_eligible_other_floor_needs_loft_sap_above_threshold"] = (
not_a_flat &
# Landlord data or EPC data indicates the heating system is appropriate
correct_heating_system &
# The property doesn't currently have solar
~self.standardised_asset_list["property_has_solar"] &
# The walls are insulated
walls_meet_solar_requirements &
# Roof need loft top-up
self.standardised_asset_list["solar_epc_loft_needs_topup"] &
# Other floor types
~self.standardised_asset_list["solar_epc_floor_is_solid_no_recommendation"] &
# SAP above threshold
~self.standardised_asset_list[self.ATTRIBUTE_SAP_THRESHOLD_AND_BELOW]
)
# With heating upgrade
self.standardised_asset_list["solar_eligible_other_floor_needs_loft_needs_heating_upgrade"] = (
not_a_flat &
# Landlord data or EPC data indicates the heating system is appropriate
needs_heating_upgrade &
# The property doesn't currently have solar
~self.standardised_asset_list["property_has_solar"] &
# The walls are insulated
walls_meet_solar_requirements &
# Roof need loft top-up
self.standardised_asset_list["solar_epc_loft_needs_topup"] &
# Other floor types
~self.standardised_asset_list["solar_epc_floor_is_solid_no_recommendation"] &
# SAP below threshold
self.standardised_asset_list[self.ATTRIBUTE_SAP_THRESHOLD_AND_BELOW]
)
self.standardised_asset_list[
"solar_eligible_other_floor_needs_loft_needs_heating_upgrade_sap_above_threshold"
] = (
not_a_flat &
# Landlord data or EPC data indicates the heating system is appropriate
needs_heating_upgrade &
# The property doesn't currently have solar
~self.standardised_asset_list["property_has_solar"] &
# The walls are insulated
walls_meet_solar_requirements &
# Roof need loft top-up
self.standardised_asset_list["solar_epc_loft_needs_topup"] &
# Other floor types
~self.standardised_asset_list["solar_epc_floor_is_solid_no_recommendation"] &
# SAP above threshold
~self.standardised_asset_list[self.ATTRIBUTE_SAP_THRESHOLD_AND_BELOW]
)
# Drop anything we don't need
self.standardised_asset_list = self.standardised_asset_list.drop(
columns=["walls_u_value", "roof_u_value", "floor_u_value"]
)
# Adjust flagged extraction jobs to remove anything for solar
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] = (
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] &
~self.standardised_asset_list["solar_eligible_solid_floor"] &
~self.standardised_asset_list["solar_eligible_solid_floor_needs_loft"]
# ~self.standardised_asset_list["solar_eligible_other_floor"] &
# ~self.standardised_asset_list["solar_eligible_other_floor_needs_loft"]
)
# Finally, we note why each property has been flagged
self.standardised_asset_list["cavity_reason"] = None
self.standardised_asset_list["cavity_reason"] = np.where(
self.standardised_asset_list["non_intrusive_indicates_empty_cavity"],
"Non-Intrusive Data Showed Empty Cavity",
self.standardised_asset_list["cavity_reason"]
)
self.standardised_asset_list["cavity_reason"] = np.where(
self.standardised_asset_list["non_intrusive_indicates_empty_cavity_no_sap_filter"],
"Non-Intrusive Data Showed Empty Cavity but all SAP scores allowed",
self.standardised_asset_list["cavity_reason"]
)
self.standardised_asset_list["cavity_reason"] = np.where(
(
self.standardised_asset_list["epc_indicates_empty_cavity"] &
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"]
),
"EPC Data Showed Empty Cavity",
self.standardised_asset_list["cavity_reason"]
)
self.standardised_asset_list["cavity_reason"] = np.where(
(
self.standardised_asset_list["epc_indicates_empty_cavity_no_sap_filter"] &
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity_no_sap_filter"]
),
"EPC Data Showed Empty Cavity but all SAP scores allowed",
self.standardised_asset_list["cavity_reason"]
)
# Landlord data
self.standardised_asset_list["cavity_reason"] = np.where(
(
self.standardised_asset_list["landlord_data_indicates_empty_cavity"] &
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
~self.standardised_asset_list["epc_indicates_empty_cavity"]
),
"Landlord Data Showed Empty Cavity",
self.standardised_asset_list["cavity_reason"]
)
self.standardised_asset_list["cavity_reason"] = np.where(
(
self.standardised_asset_list["landlord_data_indicates_empty_cavity_no_sap_filter"] &
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity_no_sap_filter"] &
~self.standardised_asset_list["epc_indicates_empty_cavity_no_sap_filter"]
),
"Landlord Data Showed Empty Cavity but all SAP scores allowed",
self.standardised_asset_list["cavity_reason"],
)
# Flag extraction
self.standardised_asset_list["cavity_reason"] = np.where(
(
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] &
pd.isnull(self.standardised_asset_list["cavity_reason"])
),
"Non-Intrusive Data Showed Cavity Extraction",
self.standardised_asset_list["cavity_reason"]
)
# extraction no sap filter
self.standardised_asset_list["cavity_reason"] = np.where(
(
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction_no_sap_filter"] &
pd.isnull(self.standardised_asset_list["cavity_reason"])
),
"Non-Intrusive Data Showed Cavity Extraction but all SAP scores allowed",
self.standardised_asset_list["cavity_reason"]
)
######################################################
# Flag solar
######################################################
self.standardised_asset_list["solar_reason"] = None
# Map of variables and fill values for the solar_reason variable
solar_reason_map = {
"solar_eligible_solid_floor": "Solar Eligible, Solid Floor",
"solar_eligible_solid_floor_sap_above_threshold": "Solar Eligible, Solid Floor, SAP Above Threshold",
"solar_eligible_solid_floor_needs_heating_upgrade": (
"Solar Eligible, Solid Floor, Needs Heating Upgrade"
),
"solar_eligible_solid_floor_needs_heating_upgrade_sap_above_threshold": (
"Solar Eligible, Solid Floor, Needs Heating Upgrade, SAP Above Threshold"
),
"solar_eligible_solid_floor_needs_loft": "Solar Eligible, Solid Floor, Needs Loft",
"solar_eligible_solid_floor_needs_loft_sap_above_threshold": (
"Solar Eligible, Solid Floor, Needs Loft, SAP Above Threshold"
),
"solar_eligible_solid_floor_needs_loft_needs_heating_upgrade": (
"Solar Eligible, Solid Floor, Needs Loft, Needs Heating Upgrade"
),
"solar_eligible_solid_floor_needs_loft_needs_heating_upgrade_sap_above_threshold": (
"Solar Eligible, Solid Floor, Needs Loft, Needs Heating Upgrade, SAP Above Threshold"
),
"solar_eligible_other_floor": "Solar Eligible, Other Floor",
"solar_eligible_other_floor_sap_above_threshold": "Solar Eligible, Other Floor, SAP Above Threshold",
"solar_eligible_other_floor_needs_heating_upgrade": "Solar Eligible, Other Floor, Needs Heating Upgrade",
"solar_eligible_other_floor_needs_heating_upgrade_sap_above_threshold": (
"Solar Eligible, Other Floor, Needs Heating Upgrade, SAP Above Threshold"
),
"solar_eligible_other_floor_needs_loft": "Solar Eligible, Other Floor, Needs Loft",
"solar_eligible_other_floor_needs_loft_sap_above_threshold": (
"Solar Eligible, Other Floor, Needs Loft, SAP Above Threshold"
),
"solar_eligible_other_floor_needs_loft_needs_heating_upgrade": (
"Solar Eligible, Other Floor, Needs Loft, Needs Heating Upgrade"
),
"solar_eligible_other_floor_needs_loft_needs_heating_upgrade_sap_above_threshold": (
"Solar Eligible, Other Floor, Needs Loft, Needs Heating Upgrade, SAP Above Threshold"
)
}
for variable, reason in solar_reason_map.items():
self.standardised_asset_list["solar_reason"] = np.where(
self.standardised_asset_list[variable],
reason,
self.standardised_asset_list["solar_reason"]
)
# Flag anything that has existing outcomes
if self.outcomes is not None:
self.standardised_asset_list["cavity_reason"] = np.where(
(
(self.standardised_asset_list["Surveyed"] > 0) |
(self.standardised_asset_list["Installer Refusal"] > 0)
),
None,
self.standardised_asset_list["cavity_reason"]
)
if self.master_surveyed is not None:
self.standardised_asset_list["cavity_reason"] = np.where(
(
(~pd.isnull(self.standardised_asset_list["SUBMISSION DATE"]))
),
None,
self.standardised_asset_list["cavity_reason"]
)
blocks_of_flats = self.standardised_asset_list[
self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] == "block of flats"
]
non_blocks_of_flats = self.standardised_asset_list[
self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] != "block of flats"
]
# Produce some aggregate figures
self.work_type_figures = {
**non_blocks_of_flats["cavity_reason"].value_counts().to_dict(),
**{
k + " (Block of flats)": v for k, v in
blocks_of_flats["solar_reason"].value_counts().to_dict().items()
},
**self.standardised_asset_list["solar_reason"].value_counts().to_dict()
}
# We prepare outcomes for output
if self.outcomes is not None:
logger.info("Preparing outcomes for output")
identified_work = self.standardised_asset_list[
~pd.isnull(self.standardised_asset_list["cavity_reason"]) |
~pd.isnull(self.standardised_asset_list["solar_reason"])
][self.DOMNA_PROPERTY_ID].values
self.outcomes_for_output = self.outcomes[
self.outcomes[self.DOMNA_PROPERTY_ID].isin(identified_work)
]
def flat_analysis(self):
# We need to deduce the building name - we strip out the house number
# We want to deduce if flats have 50% of the properties below C75
# We group by postcode and property type
grouped = self.standardised_asset_list.groupby(
[self.STANDARD_POSTCODE, self.STANDARD_PROPERTY_TYPE]
)
flat_data = []
for _, group in grouped:
if "flat" in group[self.STANDARD_PROPERTY_TYPE].values:
num_flats = group[self.STANDARD_PROPERTY_TYPE].shape[0]
num_below_c75 = group[
self.EPC_API_DATA_NAMES["current-energy-efficiency"]
].lt(self.FILLED_CAVITY_SAP_THRESHOLD).sum()
# Check if any flats are below C69
num_flats_below_c69 = group[
self.EPC_API_DATA_NAMES["current-energy-efficiency"]
].lt(69).sum()
flat_data.append(
{
"Postcode": group[self.STANDARD_POSTCODE].iloc[0],
"Property Type": "Flat",
"Number of Flats with EPC": num_flats,
"Number of Flats below C75": num_below_c75,
"Proportion of Flat EPCs below C75": round(100 * num_below_c75 / num_flats),
"Number of Flats Below C69": num_flats_below_c69,
}
)
flat_data = pd.DataFrame(flat_data)
self.flat_data = flat_data
@staticmethod
def split_full_name(x):
if pd.isnull(x):
return None, None, None
x = x.lower()
titles = ["mr", "mrs", "ms", "miss", "dr", "prof"]
# Remove titles
detected_title = [title for title in titles if x.startswith(title)]
if detected_title:
for title in detected_title:
x = x.replace(title, "")
x = x.strip()
first_name, last_name = x.split(" ")[0], x.split(" ")[-1]
title = detected_title[0].title() if detected_title else None
return title, first_name.title(), last_name.title()
def load_contact_details(
self,
local_filepath,
sheet_name,
landlord_property_id,
phone_number_column=None,
email_column=None,
fullname_column=None,
firstname_column=None,
lastname_column=None
):
self.contact_detail_fields = {
"landlord_property_id": landlord_property_id,
"phone_number": phone_number_column,
"email": email_column,
"fullname": fullname_column,
"firstname": firstname_column,
"lastname": lastname_column
}
details_colnames = [
phone_number_column, email_column, fullname_column, firstname_column, lastname_column
]
# We'll fill them
none_details = [x for x in details_colnames if x is None]
details_colnames = [x for x in details_colnames if x is not None]
contact_details = pd.read_excel(
local_filepath, sheet_name=sheet_name
)[[self.contact_detail_fields["landlord_property_id"]] + details_colnames]
contact_details = contact_details[
~pd.isnull(contact_details[self.contact_detail_fields["landlord_property_id"]])
]
# Fill anything we don't have
for detail in none_details:
contact_details[detail] = None
if fullname_column and not (firstname_column and lastname_column):
contact_details["title"], contact_details["first_name"], contact_details["last_name"] = zip(
*contact_details[fullname_column].apply(self.split_full_name)
)
else:
raise NotImplementedError("Implement me")
self.contact_details = contact_details
def prepare_for_crm(self, company_domain, crm_pipeline_name, first_dealstage, assigned_surveyors):
"""
This function prepares the data for upload into Hubspot
:return:
"""
# This is a placeholder for now
# This maps the opportunities as we reference them, to the product data as stored in Hubspot
product_lookup_table = {
"Non-Intrusive Data Showed Cavity Extraction": {
"name": "Extract & Fill - ECO4", "id": 100307905778, "unit_price": 500
},
"Non-Intrusive Data Showed Empty Cavity": {
"name": "Empty Cavity & Loft - ECO4", "id": 82733738177, "unit_price": 1000
},
"Non-Intrusive Data Showed Empty Cavity but all SAP scores allowed": {
"name": "Empty Cavity & Loft - ECO4", "id": 82733738177, "unit_price": 1000
},
"Non-Intrusive Data Showed Cavity Extraction but all SAP scores allowed": {
"name": "Extract & Fill - ECO4", "id": 100307905778, "unit_price": 500
},
"EPC Data Showed Empty Cavity": {
"name": "Empty Cavity & Loft - ECO4", "id": 82733738177, "unit_price": 1000
},
"Solid Floor, Insulated, No Solar": {
"name": "Solar PV - ECO4", "id": 82623589564, "unit_price": 1608
},
"Solid Floor, Insulated, Needs Loft": {
"name": "Solar PV - ECO4", "id": 82623589564, "unit_price": 1608
},
"Other Floor, Insulated, No Solar": {
"name": "Solar PV - ECO4", "id": 82623589564, "unit_price": 1608
},
"Other Floor, Insulated, Needs Loft": {
"name": "Solar PV - ECO4", "id": 82623589564, "unit_price": 1608
}
}
# We check if all products are covered in the lookup table
cavity_products = self.standardised_asset_list["cavity_reason"].unique()
solar_products = self.standardised_asset_list["solar_reason"].unique()
# Check if there any options not in out lookup table
if (
any(x for x in cavity_products if x not in product_lookup_table) or
any(x for x in solar_products if x not in product_lookup_table)
):
raise ValueError("We have products not referenced in the lookup table - check this")
programme_data = self.standardised_asset_list.copy()
# Exclusions - these are properties we won't treat for the moment
product_exclusions = [
"Other Floor, Insulated, No Solar",
"Other Floor, Insulated, Needs Loft"
]
if product_exclusions:
logger.warning("Excluding products: %s", product_exclusions)
programme_data = programme_data[programme_data["solar_reason"].isin(product_exclusions) == False]
# Merge on the contact details
programme_data = programme_data.merge(
self.contact_details,
how="left",
left_on=self.STANDARD_LANDLORD_PROPERTY_ID,
right_on=self.landlord_property_id,
)
programme_data["Company Domain Name <COMPANY domain>"] = company_domain
# Append the product data onto the programme data
programme_data["cavity_product"] = programme_data["cavity_reason"].map(
lambda x: product_lookup_table.get(x, {"name": None})["name"]
)
programme_data["solar_product"] = programme_data["solar_reason"].map(
lambda x: product_lookup_table.get(x, {"name": None})["name"]
)
programme_data["domna_product"] = programme_data["solar_reason"].copy()
programme_data["domna_product"] = np.where(
pd.isnull(programme_data["domna_product"]),
programme_data["solar_product"],
programme_data["domna_product"]
)
# We filter just on rows where we have a product
programme_data = programme_data[
~pd.isnull(programme_data["domna_product"])
]
programme_data = programme_data.drop(columns=["solar_product", "cavity_product"])
product_df = (
pd.DataFrame(product_lookup_table).T[["name", "id", "unit_price"]]
.reset_index()
.rename(
columns={
"name": "Name <LINE_ITEM name>",
"id": 'Product ID <LINE_ITEM hs_product_id>',
"unit_price": 'Unit price <LINE_ITEM price>',
"index": "domna_product"
}
)
)
product_df['Quantity <LINE_ITEM quantity>'] = 1
# Append on the product data
programme_data = programme_data.merge(
product_df,
how="left",
on="domna_product",
)
# Add in deal and pipeline information
programme_data["dealname"] = programme_data[self.STANDARD_FULL_ADDRESS] + " : " + programme_data[
"domna_product"]
programme_data['Pipeline <DEAL pipeline>'] = crm_pipeline_name
programme_data['Deal Stage <DEAL dealstage>'] = first_dealstage
programme_data['Associations: Listing'] = "Property Owner"
programme_data = programme_data.merge(
assigned_surveyors.rename(
columns={self.landlord_property_id: self.STANDARD_LANDLORD_PROPERTY_ID}
), how="left", on=self.STANDARD_LANDLORD_PROPERTY_ID
)
# This maps the hubspot schema to the template. Anything that is not covered in this will be flagged
schema_mappings = {
'Name <LISTING hs_name>': self.DOMNA_PROPERTY_ID, # TODO: Maybe change this?
'Company Domain Name <COMPANY domain>': 'Company Domain Name <COMPANY domain>',
'Email <CONTACT email>': (
self.contact_detail_fields["email"] if self.contact_detail_fields["email"] else None
), # TODO: Review
'First Name <CONTACT firstname>': (
self.contact_detail_fields["firstname"] if self.contact_detail_fields["firstname"] else None
), # TODO: Review
'Last Name <CONTACT lastname>': (
self.contact_detail_fields["lastname"] if self.contact_detail_fields["lastname"] else None
), # TODO: Review
'Phone <CONTACT phone>': (
self.contact_detail_fields["phone_number"] if self.contact_detail_fields["phone_number"] else None
), # TODO: Review
'Full Address <LISTING full_address>': self.STANDARD_FULL_ADDRESS,
'Address 1 <LISTING hs_address_1>': self.STANDARD_ADDRESS_1,
'Address 2 <LISTING hs_address_2>': None, # TODO: Don't have this for the moment
'Postcode <LISTING hs_zip>': self.STANDARD_POSTCODE,
'Property Type <LISTING property_type>': self.STANDARD_PROPERTY_TYPE,
'Property Sub Type <LISTING property_sub_type>': None, # TODO: Don't have this for the moment
'Bedroom(s) <LISTING hs_bedrooms>': None, # TODO: Don't have this for the moment
'Domna Property ID <LISTING domna_property_id>': self.DOMNA_PROPERTY_ID,
'National UPRN <LISTING national_uprn>': (
self.STANDARD_UPRN if self.STANDARD_UPRN is not None else self.EPC_API_DATA_NAMES["uprn"]
),
'Owner Property ID <LISTING owner_property_id>': self.STANDARD_LANDLORD_PROPERTY_ID,
'Wall Construction <LISTING wall_construction>': self.STANDARD_WALL_CONSTRUCTION,
'Heating System <LISTING heating_system>': self.STANDARD_HEATING_SYSTEM,
'Year Built <LISTING hs_year_built>': self.STANDARD_YEAR_BUILT,
'Boiler Make <LISTING boiler_make>': None, # TODO: Don't have this for the moment
'Boiler Model <LISTING boiler_model>': None, # TODO: Don't have this for the moment
'Non-Intrusives: Date Checked <LISTING non_intrusives__date_checked>': None,
# TODO: Don't have this for the moment
'Non-Intrusives: Wall Type <LISTING non_intrusives__wall_type>': (
"non-intrusives: Construction" if self.non_intrusives_present else None
),
'Non-intrusives: Insulation <LISTING non_intrusives__insulation>': (
"non-intrusives: Insulated" if self.non_intrusives_present else None
),
'Non-intrusives: Insulation Material <LISTING non_intrusives__insulation_material>': (
"non-intrusives: Material" if self.non_intrusives_present else None
),
'Non-Intrusives: CIGA Check Required <LISTING non_intrusives__ciga_check_required>': (
'non-intrusives: CIGA Check Required' if self.non_intrusives_present else None
),
'Non-Intrusives: PV Access Issues <LISTING non_intrusives__access_issues>': (
'non-intrusives: PV, ACCESS ISSUE, SEE NOTES' if self.non_intrusives_present else None
),
'Non-Intrusives: Roof Orientation <LISTING non_intrusives__roof_orientation>': (
'non-intrusives: OFF GAS - ROOF ORIENTATION' if self.non_intrusives_present else None
),
'Non-Intrusives: Surveyor Notes <LISTING non_intrusives__surveyor_notes>': (
'non-intrusives: Any further surveyor notes' if self.non_intrusives_present else None
),
'Non-Intrusives: Surveyor Name <LISTING non_intrusives__surveyor_name>': (
'non-intrusives: Surveyors Name' if self.non_intrusives_present else None
),
'CIGA: Date Requested <LISTING ciga__date_requested>': None, # TODO: Don't have this for the moment
'CIGA: Cavity Guarantee Found <LISTING ciga__cavity_guarantee_found>': None,
'Last EPC: Is Estimated <LISTING last_epc__is_estimated>': self.EPC_API_DATA_NAMES["estimated"],
'Last EPC: EPC Rating <LISTING last_epc__epc_rating>': self.EPC_API_DATA_NAMES["current-energy-rating"],
'Last EPC: SAP Rating <LISTING last_epc__sap_rating>': self.EPC_API_DATA_NAMES["current-energy-efficiency"],
'Last EPC: Main Heating Description <LISTING last_epc__main_heating_description>': self.EPC_API_DATA_NAMES[
"mainheat-description"],
'Last EPC: Heating Controls <LISTING last_epc__heating_controls>': self.EPC_API_DATA_NAMES[
"mainheatcont-description"],
'Last EPC: Lodgement Date <LISTING last_epc__lodgement_date>': self.EPC_API_DATA_NAMES["inspection-date"],
'Last EPC: Floor Area <LISTING last_epc__floor_area>': self.EPC_API_DATA_NAMES["total-floor-area"],
'Last EPC: Wall <LISTING last_epc__wall>': self.EPC_API_DATA_NAMES["walls-description"],
'Last EPC: Roof <LISTING last_epc__roof>': self.EPC_API_DATA_NAMES["roof-description"],
'Last EPC: Floor <LISTING last_epc__floor>': self.EPC_API_DATA_NAMES["floor-description"],
'Last EPC: Room Height <LISTING last_epc__room_height>': self.EPC_API_DATA_NAMES["floor-height"],
'Last EPC: Age Band <LISTING last_epc__age_band>': self.EPC_API_DATA_NAMES["construction-age-band"],
'Deal Stage <DEAL dealstage>': 'Deal Stage <DEAL dealstage>',
'Pipeline <DEAL pipeline>': 'Pipeline <DEAL pipeline>',
'Expected Commencement Date <DEAL expected_commencement_date>': None, # TODO: Need to set this,
'Deal Name <DEAL dealname>': "dealname", # Need to create this,
'Product ID <LINE_ITEM hs_product_id>': 'Product ID <LINE_ITEM hs_product_id>',
'Name <LINE_ITEM name>': 'Name <LINE_ITEM name>',
'Unit price <LINE_ITEM price>': 'Unit price <LINE_ITEM price>',
'Quantity <LINE_ITEM quantity>': 'Quantity <LINE_ITEM quantity>',
'Deal Owner': 'surveyor_email',
'Amount <DEAL amount>': 'Unit price <LINE_ITEM price>',
}
# We now create the finalised dataset to be uploaded into Hubspot
variables_required = list(schema_mappings.values())
variables_required = [v for v in variables_required if v is not None]
# We now flag anything that has a none value, which is information we haven't got right now
none_variables = [k for k, v in schema_mappings.items() if v is None]
# We'll add placeholder columns for the None variables
programme_data = programme_data[variables_required]
for col in none_variables:
programme_data[col] = None
programme_data = programme_data.rename(
columns={v: k for k, v in schema_mappings.items() if v is not None}
)
self.hubspot_data = programme_data
def flag_outcomes(
self,
outcomes_filepath,
outcomes_sheetname,
outcomes_postcode,
outcomes_houseno
):
if outcomes_filepath is None:
return
# ToDO: Parameterise for future use?
self.outcomes = pd.read_excel(outcomes_filepath, sheet_name=outcomes_sheetname)
self.outcomes["row_id"] = self.outcomes.index
logger.info("Matching outcomes to asset list")
# Merge the outcomes onto the asset list - we check we're able to match sufficiently well
lookup = []
nomatch = []
for _, x in tqdm(self.outcomes.iterrows(), total=len(self.outcomes)):
address_clean = x["Address"].lower().replace(",", "").replace(" ", " ")
matched = self.standardised_asset_list[
(self.standardised_asset_list[
self.STANDARD_FULL_ADDRESS
].str.lower().str.replace(",", "").str.replace(" ", " ") == address_clean)
]
if matched.shape[0] == 1:
lookup.append(
{
"row_id": x["row_id"],
self.DOMNA_PROPERTY_ID: matched[self.DOMNA_PROPERTY_ID].values[0]
}
)
continue
if "UPRN" in x:
matched = self.standardised_asset_list[
self.standardised_asset_list[self.STANDARD_LANDLORD_PROPERTY_ID] == x["UPRN"]
]
if matched.shape[0] == 1:
lookup.append(
{
"row_id": x["row_id"],
self.DOMNA_PROPERTY_ID: matched[self.DOMNA_PROPERTY_ID].values[0]
}
)
continue
matched = self.standardised_asset_list[
(self.standardised_asset_list[self.STANDARD_POSTCODE] == x[outcomes_postcode])
].copy()
if not matched.empty:
matched["houseno"] = matched.apply(
lambda x: SearchEpc.get_house_number(x[self.STANDARD_ADDRESS_1], x[self.STANDARD_POSTCODE]),
axis=1
)
matched = matched[
matched["houseno"].astype(str) == str(x[outcomes_houseno])
]
if matched.shape[0] == 1:
lookup.append(
{
"row_id": x["row_id"],
self.DOMNA_PROPERTY_ID: matched[self.DOMNA_PROPERTY_ID].values[0]
}
)
continue
elif not matched.empty:
# Use levenstein distance to match
matched["address"] = matched[self.STANDARD_ADDRESS_1] + " " + matched[self.STANDARD_POSTCODE]
best_match = process.extractOne(x["Address"], matched[self.STANDARD_FULL_ADDRESS].values)[0]
matched = matched[matched[self.STANDARD_FULL_ADDRESS] == best_match]
lookup.append(
{
"row_id": x["row_id"],
self.DOMNA_PROPERTY_ID: matched[self.DOMNA_PROPERTY_ID].values[0]
}
)
continue
nomatch.append(x["row_id"])
self.outcomes_no_match = self.outcomes[self.outcomes["row_id"].isin(nomatch)]
lookup = pd.DataFrame(lookup)
# We will have duplicated domna property IDs, where a surveyor has been to a property multiple times
# Where we have multiple rows, we want to make a call on what the action should be. For example,
# there may be properties that have been visited multiple times where the outcome was "See notes" implying
# that the surveyor had a detailed explanation as to why they couldn't gain access so if this has
# happened multiple times, in this case we judge that the work may not be viable
date_col = "Week Commencing" if "Week Commencing" in self.outcomes else "Survey Date"
lookup = lookup.merge(
self.outcomes[["row_id", "Outcome", "Notes", date_col]], how="left", on="row_id"
)
visit_counts = (
lookup.groupby(self.DOMNA_PROPERTY_ID)["row_id"]
.count()
.reset_index()
.rename(columns={"row_id": "visit_count"})
.sort_values("visit_count", ascending=False)
)
pivot_df = lookup.groupby(["domna_property_id", "Outcome"]).size().unstack(fill_value=0).reset_index()
pivot_df = pivot_df.merge(
visit_counts, how="left", on="domna_property_id"
)
if pivot_df[self.DOMNA_PROPERTY_ID].duplicated().sum():
raise Exception("We have duplicated property IDs in the outcomes data")
# We merge this data onto outcomes
self.outcomes["matched_to_asset_list"] = self.outcomes["row_id"].isin(lookup["row_id"].values)
self.outcomes = self.outcomes.merge(lookup[["row_id", "domna_property_id"]], how="left", on="row_id")
# We merge out pivoted outcomes onto the asset list
self.standardised_asset_list = self.standardised_asset_list.merge(
pivot_df, how="left", left_on=self.DOMNA_PROPERTY_ID, right_on="domna_property_id"
)
self.outcomes = self.outcomes.sort_values("domna_property_id", ascending=False)
def flag_survey_master(
self,
master_filepaths,
master_to_asset_list_filepath=None
):
# TODO: This probably needs further expansion
if not master_filepaths:
return
if master_to_asset_list_filepath is not None:
id_map = pd.read_csv(master_to_asset_list_filepath)
else:
id_map = pd.DataFrame()
logger.info("Getting masters and merging onto asset list")
master_surveyed = []
for filepath in master_filepaths:
master_data = pd.read_csv(filepath)
# Strip columns
master_data.columns = [c.strip() for c in master_data.columns]
if not id_map.empty:
master_data = master_data.merge(
id_map, how="left", on=['NO.', 'Street / Block Name', 'Post Code']
)
install_col = (
"INSTALLED OR CANCELLED" if "INSTALLED OR CANCELLED" in master_data.columns
else "INSTALL / CANCELLATION DATE"
)
# We just need to check if any were cancelled
master_to_append = master_data[
["UPRN", install_col, "SUBMISSION DATE"]
].rename(columns={"UPRN": self.STANDARD_LANDLORD_PROPERTY_ID, install_col: "survey_status"})
master_to_append["cancelled"] = master_to_append["survey_status"].str.lower().str.contains("cancel")
master_surveyed.append(master_to_append)
master_surveyed = pd.concat(master_surveyed)
master_surveyed = master_surveyed[~pd.isnull(master_surveyed[self.STANDARD_LANDLORD_PROPERTY_ID])]
master_surveyed = master_surveyed[
~master_surveyed[self.STANDARD_LANDLORD_PROPERTY_ID].isin(
["NOT ON ASSET LIST", "Missing From Asset List"]
)
]
master_surveyed[self.STANDARD_LANDLORD_PROPERTY_ID] = master_surveyed[
self.STANDARD_LANDLORD_PROPERTY_ID
].astype(str)
# We de-dupe crudely on landlord property id
self.master_surveyed = master_surveyed.drop_duplicates(subset=[self.STANDARD_LANDLORD_PROPERTY_ID])
self.standardised_asset_list = self.standardised_asset_list.merge(
self.master_surveyed, how="left", on=self.STANDARD_LANDLORD_PROPERTY_ID
)