Model/asset_list/AssetList.py
2025-09-18 16:23:18 +01:00

3711 lines
173 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import hashlib
import os
import re
import tiktoken
from pprint import pprint
from datetime import datetime
import asset_list.hubspot.config as hubspot_config
from openai import OpenAI
import numpy as np
import pandas as pd
from tqdm import tqdm
from thefuzz import process
from utils.logger import setup_logger
from backend.SearchEpc import SearchEpc
from BaseUtility import Definitions
import asset_list.mappings.property_type as property_type_mappings
import asset_list.mappings.walls as walls_mappings
import asset_list.mappings.heating_systems as heating_mappings
import asset_list.mappings.exising_pv as existing_pv_mappings
import asset_list.mappings.built_form as built_form_mappings
import asset_list.mappings.roof as roof_mappings
import asset_list.mappings.outcomes as outcomes_mappings
from recommendations.recommendation_utils import (
estimate_perimeter,
estimate_external_wall_area,
estimate_number_of_floors
)
from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes
from etl.epc_clean.epc_attributes.WallAttributes import WallAttributes
logger = setup_logger()
# OpenAI API Key (set this in your environment variables for security)
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
class DataRemapper:
def __init__(self, standard_values, standard_map=None, max_tokens=1000):
"""
Initialize the remapper with standard values and a predefined mapping.
:param standard_values: Set of allowed standardized values.
:param standard_map: Dictionary of common remappings {raw_value: standard_value}.
"""
self.standard_values = standard_values
self.standard_map = standard_map
self.fuzzy_threshold = 90 # Adjust fuzzy matching sensitivity
self.ai_model = "gpt-4-turbo" # Use gpt-3.5-turbo for cheaper processing
# Tokenizer for counting tokens
self.tokenizer = tiktoken.encoding_for_model(self.ai_model)
# Track token usage and remap dictionary
self.total_tokens_used = 0
self.total_cost = 0
self.remap_dict = {} # {original_value: standardized_value}
self.max_tokens = max_tokens # Limit for OpenAI API
# Memoization for AI calls
self.ai_cache = {} # {tuple(unmapped_values): {original_value: standardized_value}}
# Capture the reponse for debugging
self.ai_response = None
# OpenAI pricing (as of Feb 2024)
self.pricing = {
"gpt-4-turbo": {"input": 0.01 / 1000, "output": 0.03 / 1000},
"gpt-3.5-turbo": {"input": 0.0015 / 1000, "output": 0.002 / 1000},
}
self.openai_client = OpenAI(api_key=OPENAI_API_KEY)
@staticmethod
def clean_string(text):
"""Basic text cleaning: remove extra spaces, punctuation, and normalize case."""
if not isinstance(text, str):
return None
text = text.strip().lower()
text = re.sub(r'[^\w\s]', '', text) # Remove punctuation
# Replace double strings
text = re.sub(r'\s+', ' ', text)
return text
def fuzzy_match(self, text):
"""Use fuzzy matching to find the closest standard value."""
match, score = process.extractOne(text, self.standard_values) if text else (None, 0)
return match if score >= self.fuzzy_threshold else None
def count_tokens(self, text):
"""Estimate the number of tokens in a given text."""
return len(self.tokenizer.encode(text)) if text else 0
def ai_standardize(self, unmapped_values):
"""Call OpenAI API **once** for all unmapped values to minimize cost, with memoization."""
if not unmapped_values:
return {}
unmapped_tuple = tuple(sorted(unmapped_values)) # Ensure consistency for memoization
if unmapped_tuple in self.ai_cache:
return self.ai_cache[unmapped_tuple] # Return memoized result
prompt = f"""
You are an expert in data classification. Standardize each of these values into one of the categories:
{list(self.standard_values)}.
Return only a JSON dictionary where:
- The keys are the original values.
- The values are the standardized ones.
Strictly return JSON **without markdown formatting** or extra text.
Example Output:
{{
"BLKHOUS": "block house",
"BEDSIT": "bedsit"
}}
Values to standardize:
{unmapped_values}
"""
# Count input tokens
input_tokens = self.count_tokens(prompt)
if input_tokens > self.max_tokens:
raise ValueError("Input tokens exceed the maximum limit.")
logger.info("Calling OpenAI API for standardization...")
response = self.openai_client.chat.completions.create(
model=self.ai_model,
messages=[{"role": "user", "content": prompt}],
max_tokens=self.max_tokens,
temperature=0.1,
)
output_text = response.choices[0].message.content.strip()
output_tokens = self.count_tokens(output_text) # Count output tokens
# Track total token usage
self.total_tokens_used += input_tokens + output_tokens
# Estimate cost
input_cost = input_tokens * self.pricing[self.ai_model]["input"]
output_cost = output_tokens * self.pricing[self.ai_model]["output"]
self.total_cost += input_cost + output_cost
try:
# Parse response as dictionary
mapping = eval(output_text) # OpenAI should return a valid dictionary
except:
mapping = {val: "unknown" for val in unmapped_values} # Fallback
# Memoize the AI response
self.ai_cache[unmapped_tuple] = mapping
# We store the raw AI response for debugging
logger.debug(f"AI Response: {mapping}")
self.ai_response = output_text
return mapping
def standardize_list(self, values_to_remap):
"""
Standardizes a list of values and returns a dictionary {original_value: standardized_value}.
:param values_to_remap: List of raw values to standardize.
:return: Dictionary {original_value: standardized_value}.
"""
unique_values = set(values_to_remap) # Process only unique values
unmapped_values = []
for value in unique_values:
if pd.isna(value): # Handle NaN values
self.remap_dict[value] = "unknown"
continue
cleaned_value = self.clean_string(value)
# Rule-Based Check (Predefined Mapping)
if cleaned_value in self.standard_map or value in self.standard_map:
self.remap_dict[value] = (
self.standard_map[cleaned_value] if cleaned_value in self.standard_map else self.standard_map[value]
)
continue
if value.lower() in self.standard_map:
self.remap_dict[value] = self.standard_map[value.lower()]
continue
# Exact Match in Standard Values
if cleaned_value in self.standard_values:
self.remap_dict[value] = cleaned_value
continue
# Fuzzy Matching
fuzzy_match = self.fuzzy_match(cleaned_value)
if fuzzy_match:
self.remap_dict[value] = fuzzy_match
continue
# Capture anything that wasn't mapped
unmapped_values.append(value)
# AI Model - remap anything unmapped (batch request)
ai_mapping = self.ai_standardize(unmapped_values)
self.remap_dict.update(ai_mapping)
return self.remap_dict
def report_usage(self):
"""Prints a summary of token usage and cost."""
print(f"\n🔹 Total Tokens Used: {self.total_tokens_used}")
print(f"💰 Estimated Cost: ${self.total_cost:.4f}")
class AssetList:
"""
This class is used to standardise asset lists so that we can process the core information in a consistent manner.
"""
EPC_API_DATA_NAMES = {
"uprn": "epc_os_uprn",
"address1": "epc_address1",
"address": "epc_address",
"postcode": "epc_postcode",
"inspection-date": "epc_inspection_date",
"current-energy-efficiency": "epc_sap_score_on_register",
"current-energy-rating": "epc_rating_on_register",
"property-type": "epc_property_type",
"built-form": "epc_archetype",
"total-floor-area": "epc_total_floor_area",
"construction-age-band": "epc_age_band",
"floor-height": "epc_floor_height",
"number-habitable-rooms": "epc_number_habitable_rooms",
"walls-description": "epc_wall_construction",
"roof-description": "epc_roof_construction",
"floor-description": "epc_floor_construction",
"mainheat-description": "epc_heating_type",
'mainheatcont-description': "epc_heating_controls",
"secondheat-description": "epc_secondary_heating",
"transaction-type": "epc_reason",
"energy-consumption-current": "epc_heat_demand",
"photo-supply": "epc_photo_supply",
"estimated": "estimated"
}
FIND_EPC_DATA_NAMES = {
"heating_text": "epc_estiamted_heating_kwh",
"hot_water_text": "epc_estimated_hotwater_kwh",
'Assessors name': "epc_assessor_name",
"Assessor's Telephone": "epc_assessor_telephone",
"Assessor's Email": "epc_assessor_email",
"Accreditation scheme": "epc_assessor_accreditation",
"Assessors ID": "epc_assessor_id",
"Solar photovoltaics": "epc_solar_pv"
}
DATETIME_REMAP = {
"Pre 1900": datetime(year=1899, month=12, day=31),
}
# These are the accepted methods we have for cleaning the address1 column
ADDRESS_1_CLEANING_METHODS = [
"first_two_words", # This method will split on the fist two words, where the separator is a space
"first_word", # This method will split on the first word, where the separator is a space
"house_number_extraction", # This method will use the NLP model in SearchEPC to extract the housenumber
# "address1_extraction" # This method will use the NLP model to extract address1
]
# Standard column Names
STANDARD_ADDRESS_1 = "domna_address_1"
STANDARD_POSTCODE = "domna_postcode"
STANDARD_FULL_ADDRESS = "domna_full_address"
STANDARD_YEAR_BUILT = "landlord_year_built"
STANDARD_UPRN = "ordnance_survey_uprn"
STANDARD_LANDLORD_PROPERTY_ID = "landlord_property_id"
STANDARD_PROPERTY_TYPE = "landlord_property_type"
STANDARD_BUILT_FORM = "landlord_built_form"
STANDARD_WALL_CONSTRUCTION = "landlord_wall_construction"
STANDARD_ROOF_CONSTRUCTION = "landlord_roof_construction"
STANDARD_HEATING_SYSTEM = "landlord_heating_system"
STANDARD_EXISTING_PV = "landlord_existing_pv"
STANDARD_SAP = "landlord_sap_rating"
STANDARD_BLOCK_REFERENCE = "landlord_block_reference"
DOMNA_PROPERTY_ID = "domna_property_id"
# Regular expression for identifying if the address might point to multiple units
MULTI_UNIT_REGEX = re.compile(r'\b([A-Za-z0-9]+)-([A-Za-z0-9]+)\b')
# List of columns relating to the non-intrusive data
NON_INTRUSIVES_COLNAMES = [
"Archetype", "Construction", "Insulated", "Material", "CIGA Check Required",
"PV, ACCESS ISSUE, SEE NOTES", "OFF GAS - ROOF ORIENTATION",
"Any further surveyor notes", 'Surveyors Name'
]
NON_INTRUSIVES_NEW_FORMAT_COLNAMES = [
"Has the property been re-walled?", "Is the property tile hung?", "Does the property have a render?",
"Does the property have cladding?", "Gable Wall Obstructions",
"Does the property have foliage that needs removal?",
"Potential unsafe environment", "Date of Inspection", "Borescoped?"
]
# Another version of non-intrusives:
NON_INTRUSIVES_NEW_FORMAT_COLNAMES_V2 = [
'Archetype', 'Archetype 2', 'Construction', 'Insulated', 'Material', 'Borescoped?',
'CIGA Check Required', 'ROOF ORIENTATION', 'TILE HUNG', 'RENDERED',
'CLADDING', 'ACCESS ISSUES', 'FURTHER SURVEYOR NOTES', 'DATE',
'NAME OF SURVEYOR'
]
NON_INTRUSIVES_ELIGIBILITY_COLUMN = "Eligibility (Red/Yellow/Green)"
OLD_FORMAT_NON_INTRUSIVE_COLNAMES = ['WFT Findings', 'ECO Eligibility']
# This SAP threshold is a key search criteria for properties that may be eligible for extraction
FILLED_CAVITY_SAP_THRESHOLD = 75
# This SAP the
EMPTY_CAVITY_SAP_THRESHOLD = 75
# Any EPC deemed to have been conducted prior to this year is deemed to be unreliable
EPC_YEAR_THRESHOLD = pd.Timestamp.now().year - 5
# Properties before this year are more likely to have lower EPC ratings and more likely to qualify
EMPTY_CAVITY_YEAR_THRESHOLD = 2002
# Attributes - these are columns that we produce, calcualted based on other pieces of data
ATTRIBUTE_HAS_SOLAR = "attribute_has_solar"
ATTRIBUTE_NUMBER_OF_FLOORS = "attribute_est_number_floors"
ATTRIBUTE_ESTIMATED_PERIMETER = "attribute_est_perimter"
ATTRIBUTE_HEAT_LOSS_AREA = "attribute_heat_loss_area"
ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS = "attribute_epc_roof_insulation_thickness"
ATTRIBUTE_SAP_THRESHOLD_AND_BELOW = f"sap_rating_{FILLED_CAVITY_SAP_THRESHOLD}_and_below"
ATTRIBUTE_EPC_PRE_YEAR_THRESHOLD = f"epc_is_pre_{EPC_YEAR_THRESHOLD}"
# These are the descriptions that we look for in the EPC data that are indicative of no insulation
EPC_NO_WALL_INSULATION_DESCRIPTIONS = [
"cavity wall, as built, no insulation (assumed)",
"cavity wall, as built, partial insulation (assumed)",
"cavity wall, as built, partial insulation",
"cavity wall, as built, no insulation",
]
# List of strings that we look for in the EPC data, where substrings indicate that the wall is insulated
EPC_INSULATED_WALLS_SUBSTRINGS = [
", insulated", "with external insulation", "with internal insulation", "filled cavity"
]
# List of strings that we look for in the EPC data, where substrings indicate that the roof is insulated
EPC_INSULATED_ROOF_SUBSTRINGS = [
"(another dwelling above)", ", insulated", ", insulated (assumed) ",
", ceiling insulated",
]
# List of strings we look for in the EPC data, where substrings indicate that the cavity is empty
UNINSULATED_CAVITY_SUBSTRINGS = [
"cavity wall, as built, no insulation (assumed)",
"cavity wall, as built, no insulation",
"cavity wall, as built, partial insulation (assumed)",
"cavity wall, as built, partial insulation",
]
# Work type prefixes:
# Empties
EMPTY_CAVITY_NON_INTRUSIVE = "Non-Intrusive Data Shows Empty Cavity"
EMPTY_CAVITY_NON_INTRUSIVE_YEAR = 'Non-Intrusive Data Shows Empty Cavity, built after 2002'
EPC_EMPTY_INSPECTIONS_RETRO_DRILLED = "EPC Shows Empty Cavity, inspections show retro drilled"
EPC_EMPTY_INSPECTIONS_FILLED = "EPC Shows Empty Cavity, inspections show filled or other"
EPC_EMPTY_INSPECTIONS_FILLED_AT_BUILD = "EPC Shows Empty Cavity, inspections show filled at build"
EPC_EMPTY_INSPECTIONS_NON_CAVITY = "EPC Shows Empty Cavity, inspections show non-cavity build"
EPC_EMPTY = "EPC Shows Empty Cavity"
LANDLORD_EMPTY_INSPECTIONS_OTHER = ("Landlord Data Shows Empty Cavity, EPC & Inspections Shows Filled or "
"Non-cavity")
# Extraction
EXTRACTION_NON_INTRUSIVE = "Non-Intrusive Data Shows Cavity Extraction"
# Solar
SOLAR_ELIGIBLE = "Solar Eligible"
SOLAR_ELIGIBLE_SOLID_WALL_UNINSULATED = "Solar Eligible, Solid Wall Uninsulated, EPC E or Below"
SOLAR_ELIGIBLE_NEEDS_HEATING_UPGRADE = "Solar Eligible, Needs Heating Upgrade"
CRM_HISTORICAL_CAVITY_PRODUCT = {
"id": 156989182176, "unit_price": 0, "name": "Historical ECO Cavity"
}
CRM_PRODUCTS = {
"Empty Cavity - ECO4": {"id": 82733738177, "unit_price": 1000, "name": "Empty Cavity - ECO4"},
"Extract & Fill - ECO4": {"id": 100307905778, "unit_price": 500, "name": "Extract & Fill - ECO4"},
"Solar PV - ECO4": {"id": 82623589564, "unit_price": 1608, "name": "Solar PV - ECO4"},
"Solar PV + HHRSH - ECO4": {"id": 155529972924, "unit_price": 1608, "name": "Solar PV + HHRSH - ECO4"},
"Solar PV + Heating Upgrade - ECO4": {
"id": 109265426665, "unit_price": 1608, "name": "Solar PV + Heating Upgrade - ECO4"
},
"Historical ECO Cavity": CRM_HISTORICAL_CAVITY_PRODUCT
}
def __init__(
self,
local_filepath,
sheet_name,
address1_colname,
postcode_colname,
full_address_colname,
landlord_property_id=None,
full_address_cols_to_concat=None,
missing_postcodes_method=None,
address1_extraction_method=None,
landlord_year_built=None,
landlord_uprn=None,
landlord_property_type=None,
landlord_built_form=None,
landlord_wall_construction=None,
landlord_roof_construction=None,
landlord_heating_system=None,
landlord_existing_pv=None,
landlord_sap=None,
landlord_block_reference=None,
phase=False,
header=0
):
self.local_filepath = local_filepath
self.sheet_name = sheet_name
# Read in the data
if local_filepath.endswith(".xlsx"):
self.raw_asset_list = pd.read_excel(local_filepath, header=header, sheet_name=sheet_name)
else:
self.raw_asset_list = pd.read_csv(local_filepath)
self.standardised_asset_list = self.raw_asset_list.copy()
# Will be used to store aggregated figures against the various work types
self.work_type_figures = {}
self.block_analysis_df = None
self.duplicated_addresses = None
self.contact_details = None
self.contact_detail_fields = None
self.outcomes = None
self.outcomes_no_match = pd.DataFrame()
self.outcomes_for_output = pd.DataFrame()
self.master_surveyed = None
self.unmatched_submissions = pd.DataFrame()
self.ecosurv = None
self.ecosurv_no_match = pd.DataFrame()
self.geographical_areas = pd.DataFrame()
# When this is True, we intend to break the programme into multiple phases. We may need to review
# how this is structured in the future, as depending on how we get future data, we may need to
# remove some existing phases from the reporting, or specifically highlight the phase (1 to n-1)
# properties, assuming the current phase is n.
self.phase = phase
# We detect the presence of the non-intrusive columns
self.non_intrusives_present = "CIGA Check Required" in self.raw_asset_list.columns
# We detect if we have the old format of non-intruvies
self.old_format_non_intrusives_present = "WFT Findings" in self.raw_asset_list.columns
if self.old_format_non_intrusives_present:
self.non_intrusives_present = False
self.non_intrusives_eligibility = "Eligibility (Red/Yellow/Green)" in self.raw_asset_list.columns
self.new_format_non_insturives_present = (
"Has the property been re-walled?" in self.raw_asset_list.columns
)
self.new_format_non_insturives_present_v2 = 'TILE HUNG' in self.raw_asset_list.columns
# Names of columns
self.landlord_property_id = landlord_property_id
self.address1_colname = address1_colname
self.postcode_colname = postcode_colname
self.full_address_colname = full_address_colname
self.landlord_year_built = landlord_year_built
self.landlord_uprn = landlord_uprn
self.landlord_property_type = landlord_property_type
self.landlord_built_form = landlord_built_form
self.landlord_wall_construction = landlord_wall_construction
self.landlord_roof_construction = landlord_roof_construction
self.landlord_heating_system = landlord_heating_system
self.landlord_existing_pv = landlord_existing_pv
self.landlord_sap = landlord_sap
self.landlord_block_reference = landlord_block_reference
# parameters for cleaning
self.full_address_cols_to_concat = full_address_cols_to_concat
self.missing_postcodes_method = missing_postcodes_method
self.address1_extraction_method = address1_extraction_method
self.debug_information = {
"property_type": None,
"wall_construction": None,
"heating_system": None,
"existing_pv": None
}
self.variable_mappings = {}
self.hubspot_data = None
self.rename_map = {}
self.keep_variables = []
# Finally, we handle the case where the landlord's property ID is actually the OS UPRN
if (self.landlord_uprn == self.landlord_property_id) and (self.landlord_property_id is not None):
self.standardised_asset_list[self.STANDARD_UPRN] = self.standardised_asset_list[self.landlord_uprn].copy()
# Update the reference to landlord UPRn
self.landlord_uprn = self.STANDARD_UPRN
# Handle the case when full address and address 1 are the same
if self.full_address_colname == self.address1_colname:
self.full_address_colname = self.STANDARD_FULL_ADDRESS
self.standardised_asset_list[self.full_address_colname] = (
self.standardised_asset_list[self.address1_colname].copy()
)
# Handle the case where the property type column and built form are missing
if self.landlord_property_type is None and self.landlord_built_form is None:
if "Archetype" in self.raw_asset_list.columns:
# We use the non-intrusives as our property type and built form
self.landlord_property_type = self.STANDARD_PROPERTY_TYPE
self.landlord_built_form = self.STANDARD_BUILT_FORM
self.standardised_asset_list[self.landlord_property_type] = (
self.standardised_asset_list["Archetype"].copy()
)
self.standardised_asset_list[self.landlord_built_form] = (
self.standardised_asset_list["Archetype"].copy()
)
else:
# We use the EPC data as our property type and built form
self.landlord_property_type = self.STANDARD_PROPERTY_TYPE
self.landlord_built_form = self.STANDARD_BUILT_FORM
self.standardised_asset_list[self.landlord_property_type] = None
self.standardised_asset_list[self.landlord_built_form] = None
# Handle the case where the property type column is the same as the built type
if self.landlord_property_type == self.landlord_built_form:
self.landlord_built_form = self.STANDARD_BUILT_FORM
self.standardised_asset_list[self.landlord_built_form] = (
self.standardised_asset_list[self.landlord_property_type].copy()
)
# If landlord built form is None (which it often is) we use the built for from inspections
if (self.landlord_built_form is None) and self.non_intrusives_present:
self.landlord_built_form = self.STANDARD_BUILT_FORM
self.standardised_asset_list[self.landlord_built_form] = (
self.standardised_asset_list["Archetype"].copy()
)
self.prefixes_to_products = {
# Empty
self.EMPTY_CAVITY_NON_INTRUSIVE: self.CRM_PRODUCTS["Empty Cavity - ECO4"],
self.EPC_EMPTY_INSPECTIONS_RETRO_DRILLED: self.CRM_PRODUCTS["Empty Cavity - ECO4"],
self.EPC_EMPTY_INSPECTIONS_FILLED: self.CRM_PRODUCTS["Empty Cavity - ECO4"],
self.EPC_EMPTY_INSPECTIONS_FILLED_AT_BUILD: self.CRM_PRODUCTS["Empty Cavity - ECO4"],
self.EPC_EMPTY_INSPECTIONS_NON_CAVITY: self.CRM_PRODUCTS["Empty Cavity - ECO4"],
self.EPC_EMPTY: self.CRM_PRODUCTS["Empty Cavity - ECO4"],
self.LANDLORD_EMPTY_INSPECTIONS_OTHER: self.CRM_PRODUCTS["Empty Cavity - ECO4"],
# Extraction
self.EXTRACTION_NON_INTRUSIVE: self.CRM_PRODUCTS["Extract & Fill - ECO4"],
# Solar
self.SOLAR_ELIGIBLE: self.CRM_PRODUCTS["Solar PV - ECO4"],
self.SOLAR_ELIGIBLE_SOLID_WALL_UNINSULATED: self.CRM_PRODUCTS["Solar PV - ECO4"],
self.SOLAR_ELIGIBLE_NEEDS_HEATING_UPGRADE: self.CRM_PRODUCTS["Solar PV + Heating Upgrade - ECO4"],
}
def _extract_address1(self, asset_list, full_address_col, postcode_col, method="first_two_words"):
if method not in self.ADDRESS_1_CLEANING_METHODS:
raise ValueError(f"Method {method} for producing address1 not recognized")
if method == "first_two_words":
asset_list[self.address1_colname] = asset_list[full_address_col].str.split(" ").str[:2].str.join(" ")
return asset_list
if method == "first_word":
asset_list[self.address1_colname] = asset_list[full_address_col].str.split(" ").str[0]
return asset_list
if method == "house_number_extraction":
asset_list[self.address1_colname] = asset_list.apply(
lambda x: SearchEpc.get_house_number(address=x[full_address_col], postcode=x[postcode_col]),
axis=1
)
for _, x in asset_list.iterrows():
SearchEpc.get_house_number(address=x[full_address_col], postcode=x[postcode_col])
return asset_list
raise ValueError(f"Method {method} not recognized")
@staticmethod
def _address1_extraction(x):
pass
def create_property_id(self):
"""
This function creates the domna property ID, which is simply a hash of the full address and postcode
We want all figures to be positive
:return:
"""
# We'll remove punctuation and whitespace from the address, before hashing to produce an ID
def _make_hash(value):
"""Generates a stable SHA256 hash suffix and appends it to a cleaned version of the value."""
# Normalize and remove special characters for cleaner ID
cleaned_value = re.sub(r"[^\w\s-]", "", value).replace(" ", "_").lower()
# Generate SHA-256 hash and truncate it
short_hash = hashlib.sha256(value.encode()).hexdigest()[:12]
return f"{cleaned_value}-{short_hash}"
# Apply transformation
self.standardised_asset_list[self.DOMNA_PROPERTY_ID] = (
self.standardised_asset_list[self.full_address_colname] +
self.standardised_asset_list[self.postcode_colname]
).str.strip().str.replace(r"[^\w\s]", "", regex=True).str.replace(" ", "").str.lower().apply(_make_hash)
@staticmethod
def _strip_postcode_from_full_address(full_address, postcode):
cleaned = full_address.replace(postcode, "")
# Remove any trailing commas and spaces
cleaned = cleaned.rstrip(", ").strip(",").strip()
return cleaned
@classmethod
def _identify_multi_address(cls, address):
# We check if the address is comma separated
if "," in address:
address1_section = address.split(",")[0]
# We look for string in the form (x-y)
return bool(cls.MULTI_UNIT_REGEX.search(address1_section))
@staticmethod
def _convert_uprn(x):
"""
Used to convert UPRNS to integer strings
:param x: uprn to convert
:return: converted uprn
"""
if pd.isnull(x):
return x
# check if numeric
if np.isreal(x):
return str(int(x))
if str(x).isdigit():
return str(int(x))
return x
@staticmethod
def _clean_postcode(postcode):
# Remove double spaces
postcode = postcode.replace(" ", " ")
if " " not in postcode:
# Restructure it
return " ".join(
[postcode[:-3], postcode[-3:]]
)
return postcode
def init_standardise(self):
"""
This function is used to standardise the asset list
:return: standardised asset list
"""
# Remove rows without a postcode
if self.postcode_colname is not None:
self.standardised_asset_list = self.standardised_asset_list.dropna(subset=[self.postcode_colname])
# We also clean postcode columns where if there is not space, we create one
self.standardised_asset_list[self.postcode_colname] = self.standardised_asset_list[
self.postcode_colname
].apply(self._clean_postcode)
# We clean up portential non-breaking spaces, and double spaces
for col in [
c for c in [self.postcode_colname, self.full_address_colname, self.address1_colname] if
c is not None
]:
self.standardised_asset_list[col] = self.standardised_asset_list[col].astype(str)
self.standardised_asset_list[col] = self.standardised_asset_list[col].str.replace('\xa0', ' ', regex=False)
self.standardised_asset_list[col] = self.standardised_asset_list[col].str.replace(' ', ' ', regex=False)
if self.address1_colname is None:
if self.address1_extraction_method is None:
raise ValueError("Missing address 1 - please specify an extraction method")
self.address1_colname = self.STANDARD_ADDRESS_1
# If we do not have this, we produce it
self.standardised_asset_list = self._extract_address1(
asset_list=self.standardised_asset_list,
full_address_col=self.full_address_colname,
postcode_col=self.postcode_colname,
method=self.address1_extraction_method
)
if self.full_address_colname is None:
if not self.full_address_cols_to_concat:
raise ValueError("Missing full address - please specify columns to concatenate")
self.full_address_colname = self.STANDARD_FULL_ADDRESS
self.standardised_asset_list[self.full_address_colname] = (
self.standardised_asset_list[self.full_address_cols_to_concat].apply(
lambda x: ", ".join([y for y in x if not pd.isnull(y)]),
axis=1
)
)
else:
# Make sure to strip the postcode out of the full address
self.standardised_asset_list[self.full_address_colname] = self.standardised_asset_list.apply(
lambda x: self._strip_postcode_from_full_address(
full_address=x[self.full_address_colname],
postcode=x[self.postcode_colname]
),
axis=1
)
# We create the domna property id
self.create_property_id()
# Clean up the UPRN column, if the landlord has provided them
if self.landlord_uprn is not None:
self.standardised_asset_list[self.landlord_uprn] = (
self.standardised_asset_list[self.landlord_uprn].apply(self._convert_uprn)
)
# We keep just the columns we care about and will work through the various columns and standardise
variables = [
self.landlord_property_id,
self.DOMNA_PROPERTY_ID,
self.address1_colname,
self.postcode_colname,
self.full_address_colname,
self.landlord_uprn,
self.landlord_property_type,
self.landlord_built_form,
self.landlord_year_built,
self.landlord_wall_construction,
self.landlord_roof_construction,
self.landlord_heating_system,
self.landlord_existing_pv,
self.landlord_sap,
self.landlord_block_reference,
]
# Keep just non-null variables (e.g landlord may not provide uprn
self.keep_variables = [v for v in variables if v is not None]
self.rename_map = {
self.landlord_property_id: self.STANDARD_LANDLORD_PROPERTY_ID,
self.address1_colname: self.STANDARD_ADDRESS_1,
self.postcode_colname: self.STANDARD_POSTCODE,
self.full_address_colname: self.STANDARD_FULL_ADDRESS,
self.landlord_uprn: self.STANDARD_UPRN,
self.landlord_property_type: self.STANDARD_PROPERTY_TYPE,
self.landlord_built_form: self.STANDARD_BUILT_FORM,
self.landlord_year_built: self.STANDARD_YEAR_BUILT,
self.landlord_wall_construction: self.STANDARD_WALL_CONSTRUCTION,
self.landlord_roof_construction: self.STANDARD_ROOF_CONSTRUCTION,
self.landlord_heating_system: self.STANDARD_HEATING_SYSTEM,
self.landlord_existing_pv: self.STANDARD_EXISTING_PV,
self.landlord_sap: self.STANDARD_SAP,
self.landlord_block_reference: self.STANDARD_BLOCK_REFERENCE
}
self.rename_map = {k: v for k, v in self.rename_map.items() if k is not None}
non_intrusive_columns = []
if self.non_intrusives_present and not self.new_format_non_insturives_present_v2:
non_intrusive_columns = self.NON_INTRUSIVES_COLNAMES
if self.non_intrusives_eligibility:
non_intrusive_columns.append(self.NON_INTRUSIVES_ELIGIBILITY_COLUMN)
if self.new_format_non_insturives_present:
non_intrusive_columns += self.NON_INTRUSIVES_NEW_FORMAT_COLNAMES
if self.new_format_non_insturives_present_v2:
non_intrusive_columns += self.NON_INTRUSIVES_NEW_FORMAT_COLNAMES_V2
if self.old_format_non_intrusives_present:
# We check if we have the ECO Eligibility column, which we might not have
non_intrusive_columns = [
c for c in self.OLD_FORMAT_NON_INTRUSIVE_COLNAMES if c in self.standardised_asset_list.columns
]
if "Warmfront Finding" in self.standardised_asset_list.columns:
non_intrusive_columns.append("Warmfront Finding")
self.keep_variables += non_intrusive_columns
self.rename_map = {
**self.rename_map,
**dict(
zip(non_intrusive_columns, ["non-intrusives: " + c for c in non_intrusive_columns])
)
}
# We idenfiy addresses which are likely to be multi-addresses (i.g are rooms x-y)
self.standardised_asset_list["is_multi_address"] = self.standardised_asset_list[
self.full_address_colname
].apply(lambda x: self._identify_multi_address(x))
# We handle cleaning for walls, in the instance that the landlord provides us with EPC data and
# we see instances of "average thermal transmittance" in the description
if self.landlord_wall_construction is not None:
self.standardised_asset_list[self.landlord_wall_construction] = np.where(
self.standardised_asset_list[self.landlord_wall_construction].str.lower().str.contains(
"average thermal transmittance"
) == True,
"new build - average thermal transmittance",
self.standardised_asset_list[self.landlord_wall_construction]
)
else:
# We want to make sure that we have a column for wall construction
self.landlord_wall_construction = self.STANDARD_WALL_CONSTRUCTION
self.standardised_asset_list[self.landlord_wall_construction] = None
if self.landlord_roof_construction is None:
self.landlord_roof_construction = self.STANDARD_ROOF_CONSTRUCTION
self.standardised_asset_list[self.landlord_roof_construction] = None
# Clear our build year column
# We attempt to process the year built column
if self.landlord_year_built is not None:
# We check if we have a datetime - year built has not been renamed
if isinstance(self.standardised_asset_list[self.landlord_year_built].iloc[0], datetime):
# We treat any string columns - with common values we see
self.standardised_asset_list[self.landlord_year_built] = (
self.standardised_asset_list[self.landlord_year_built].replace(self.DATETIME_REMAP)
)
no_data_codes = {"No Data": None}
self.standardised_asset_list[self.landlord_year_built] = (
self.standardised_asset_list[self.landlord_year_built].replace(no_data_codes)
)
self.standardised_asset_list[self.landlord_year_built] = pd.to_datetime(
self.standardised_asset_list[self.landlord_year_built]
)
# Convert this to year
self.standardised_asset_list[self.landlord_year_built] = (
self.standardised_asset_list[self.landlord_year_built].dt.year
)
else:
# We attempt to convert the year built to a datetime, by detecting the format and converting
def extract_year(date_str):
known_errors = {
"#MULTIVALUE",
"ND",
"PIMSS EMPTY",
"UNKNOWN",
"This cell has an external reference that can't be shown or edited. Editing this cell will "
"remove the external reference.",
0
}
if pd.isnull(date_str) or date_str in known_errors:
return None
# Handle datetime
if isinstance(date_str, datetime):
return date_str.year
# Handle numeric year (float or int)
if isinstance(date_str, (int, float, np.int_)):
if 1000 <= int(date_str) <= 2100:
return int(date_str)
# Now handle string-based logic
if isinstance(date_str, str):
# Direct date match e.g. 01-Jul-2021
match = re.match(r"\d{1,2}-[A-Za-z]{3}-(\d{4})", date_str)
if match:
return int(match.group(1))
# Find all 4-digit years in string
years = [int(y) for y in re.findall(r"\b(?:19|20)\d{2}\b", date_str)]
if years:
return max(years) # Return most recent year
# If only numbers are present without format
numeric_str = re.sub(r"\D", "", date_str)
if len(numeric_str) == 4 and numeric_str.isdigit():
return int(numeric_str)
raise NotImplementedError(f"Unhandled format for year built, value is {date_str} - implement me")
self.standardised_asset_list[self.landlord_year_built] = self.standardised_asset_list[
self.landlord_year_built
].apply(extract_year)
# We now create standard lookups
to_remap = {
self.landlord_property_type: {
"standard_values": property_type_mappings.STANDARD_PROPERTY_TYPES,
"standard_map": property_type_mappings.PROPERTY_MAPPING
},
self.landlord_built_form: {
"standard_values": built_form_mappings.STANDARD_BUILT_FORMS,
"standard_map": built_form_mappings.BUILT_FORM_MAPPINGS
},
self.landlord_wall_construction: {
"standard_values": walls_mappings.STANDARD_WALL_CONSTRUCTIONS,
"standard_map": walls_mappings.WALL_CONSTRUCTION_MAPPINGS
},
self.landlord_heating_system: {
"standard_values": heating_mappings.STANDARD_HEATING_SYSTEMS,
"standard_map": heating_mappings.HEATING_MAPPINGS
},
self.landlord_existing_pv: {
"standard_values": existing_pv_mappings.STANDARD_EXISTING_PV,
"standard_map": existing_pv_mappings.EXISTING_PV_MAPPINGS
},
self.landlord_roof_construction: {
"standard_values": roof_mappings.STANDARD_ROOF_CONSTRUCTIONS,
"standard_map": roof_mappings.ROOF_CONSTRUCTION_MAPPINGS
}
}
# Keep just entries where the key is not None
to_remap = {k: v for k, v in to_remap.items() if k is not None}
for variable, config in to_remap.items():
logger.info("Standardising variable: %s", variable)
# Strip each of these columns
self.standardised_asset_list[variable] = self.standardised_asset_list[variable].str.strip()
values_to_remap = self.standardised_asset_list[variable].unique()
# We want to map this to our standardised list of property types we're interested in
remapper = DataRemapper(standard_values=config["standard_values"], standard_map=config["standard_map"])
remap_dictionary = remapper.standardize_list(values_to_remap=values_to_remap.tolist())
self.variable_mappings[variable] = remap_dictionary
# We now print out the variable mappings, which can be reviewed by the user, before the final standardised
# asset list is returned
for variable, mapping in self.variable_mappings.items():
pprint(f"Variable: {variable}")
pprint(mapping)
# Print a space
print("\n")
pprint("=======================================")
def apply_standardiation(self, override_empty_mappings=False):
"""
This function applies the standardisation to the asset list
:param override_empty_mappings: If true, will override the check for empty mappings. This is only relevant
if there are no categories which need remapping which is highly unlikely
:return:
"""
if self.phase:
# We filter on just the properties that have had an inspection
if self.new_format_non_insturives_present_v2:
self.standardised_asset_list = self.standardised_asset_list[
~self.standardised_asset_list['NAME OF SURVEYOR'].isin(
["YET TO BE SURVEYED", "", None]
)
]
self.standardised_asset_list = self.standardised_asset_list[
~pd.isnull(self.standardised_asset_list["NAME OF SURVEYOR"])
]
else:
self.standardised_asset_list = self.standardised_asset_list[
~self.standardised_asset_list['Surveyors Name'].isin(["YET TO BE SURVEYED"])
]
if not self.variable_mappings and not override_empty_mappings:
raise ValueError("Please run init_standardise first")
logger.info("Applying standardisation to asset list")
for variable, mapping in self.variable_mappings.items():
self.standardised_asset_list[variable + "_original_from_landlord"] = (
self.standardised_asset_list[variable].copy()
)
self.standardised_asset_list[variable] = self.standardised_asset_list[variable].map(mapping)
if self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated().sum():
# Drop the dupes
pprint(
f"There are {self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated().sum()} duplicated "
f"addresses - dropping"
)
# Keep a record of duplicates
self.duplicated_addresses = self.standardised_asset_list[
self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated()
][[self.DOMNA_PROPERTY_ID, self.address1_colname, self.postcode_colname]].copy()
self.standardised_asset_list = self.standardised_asset_list[
~self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated()
]
# Apply renames to our standard names
# Perform final variable selection and renaming:
# We add the original columns to the keep variables
self.keep_variables += [
k + "_original_from_landlord" for k in self.variable_mappings.keys()
]
self.standardised_asset_list = self.standardised_asset_list[self.keep_variables].rename(
columns=self.rename_map
)
# We fill any standard columns that are not in the data because they were not provided by the landlord
missing_variables = [
v for v in [
self.STANDARD_EXISTING_PV,
self.STANDARD_HEATING_SYSTEM,
self.STANDARD_UPRN,
self.STANDARD_PROPERTY_TYPE,
self.STANDARD_YEAR_BUILT,
self.STANDARD_WALL_CONSTRUCTION,
self.STANDARD_HEATING_SYSTEM,
self.STANDARD_BLOCK_REFERENCE,
] if v not in self.standardised_asset_list.columns
]
for v in missing_variables:
self.standardised_asset_list[v] = None
# Convert to string
self.standardised_asset_list[self.STANDARD_LANDLORD_PROPERTY_ID] = (
self.standardised_asset_list[self.STANDARD_LANDLORD_PROPERTY_ID].astype(str)
)
# CLean up the standard SAP column, that can be problematic
if self.landlord_sap is not None:
self.standardised_asset_list[self.STANDARD_SAP] = (
self.standardised_asset_list[self.STANDARD_SAP]
.astype(str)
.str.replace('\xa0', ' ', regex=False)
.str.strip()
)
self.standardised_asset_list[self.STANDARD_SAP] = np.where(
self.standardised_asset_list[self.STANDARD_SAP] == "",
None,
self.standardised_asset_list[self.STANDARD_SAP]
)
self.standardised_asset_list[self.STANDARD_SAP] = (
self.standardised_asset_list[self.STANDARD_SAP].astype(float)
)
# If it's zero, we set it to None
self.standardised_asset_list[self.STANDARD_SAP] = np.where(
self.standardised_asset_list[self.STANDARD_SAP] == 0,
None,
self.standardised_asset_list[self.STANDARD_SAP]
)
has_blocks_of_flats = (self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] == "block of flats").sum()
# Perform block splitting, ahead of fetching the EPC data
# If we blocks of flats, without a landlord block reference, we create this
self.fill_landlord_block_reference(has_blocks_of_flats)
# If we have blocks of flats, we split these out into individual units.
self.split_blocks()
def merge_data(self, df: pd.DataFrame):
"""
Used to insert data into the standardised asset list, based on the domna property id
:return:
"""
if self.DOMNA_PROPERTY_ID not in df.columns:
raise ValueError(f"Dataframe must contain the column {self.DOMNA_PROPERTY_ID}")
if df[self.DOMNA_PROPERTY_ID].duplicated().sum():
df = df.drop_duplicates(
subset=[self.DOMNA_PROPERTY_ID],
keep="first"
)
self.standardised_asset_list = self.standardised_asset_list.merge(
df, how="left", on=self.DOMNA_PROPERTY_ID
)
def extract_attributes(self, pull_epc=True):
# Used to extracty the typical attributes that we use to identify viable work
self.standardised_asset_list[self.ATTRIBUTE_HAS_SOLAR] = (
self.standardised_asset_list[self.FIND_EPC_DATA_NAMES["Solar photovoltaics"]] |
~self.standardised_asset_list[self.EPC_API_DATA_NAMES["photo-supply"]].isin(["0.0", 0, None, "", np.nan])
)
accepted_epc_property_types = ["House", "Flat", "Bungalow", "Maisonette"]
# The logic here is:
# 1) Take the property type provided by the HA themselves
# 2) In absence of that, take the EPC property type
# 3) Otherwise use None
self.standardised_asset_list[self.ATTRIBUTE_NUMBER_OF_FLOORS] = self.standardised_asset_list.apply(
lambda x: estimate_number_of_floors(
property_type=(
str(x[self.STANDARD_PROPERTY_TYPE]).title() if
str(x[self.STANDARD_PROPERTY_TYPE]).title() in accepted_epc_property_types else (
x[self.EPC_API_DATA_NAMES["property-type"]] if not
pd.isnull(x[self.EPC_API_DATA_NAMES["property-type"]]) else None
)
)
),
axis=1
)
self.standardised_asset_list[self.EPC_API_DATA_NAMES["total-floor-area"]] = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["total-floor-area"]].astype(float)
)
# Replace "" value with None
self.standardised_asset_list[self.EPC_API_DATA_NAMES["number-habitable-rooms"]] = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["number-habitable-rooms"]].replace("", None)
)
self.standardised_asset_list[self.EPC_API_DATA_NAMES["number-habitable-rooms"]] = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["number-habitable-rooms"]].astype(float)
)
# Estimate the perimeter
# Handle funky edge case
self.standardised_asset_list[self.EPC_API_DATA_NAMES["total-floor-area"]] = np.where(
(self.standardised_asset_list[self.EPC_API_DATA_NAMES["total-floor-area"]] == 0),
self.standardised_asset_list[self.EPC_API_DATA_NAMES["total-floor-area"]].mean(),
self.standardised_asset_list[self.EPC_API_DATA_NAMES["total-floor-area"]]
)
self.standardised_asset_list[self.ATTRIBUTE_ESTIMATED_PERIMETER] = self.standardised_asset_list.apply(
lambda x: estimate_perimeter(
floor_area=x[self.EPC_API_DATA_NAMES["total-floor-area"]] / x[self.ATTRIBUTE_NUMBER_OF_FLOORS],
num_rooms=x[self.EPC_API_DATA_NAMES["number-habitable-rooms"]] / x[self.ATTRIBUTE_NUMBER_OF_FLOORS],
), axis=1
)
self.standardised_asset_list[self.ATTRIBUTE_HEAT_LOSS_AREA] = self.standardised_asset_list.apply(
lambda x: estimate_external_wall_area(
num_floors=x[self.ATTRIBUTE_NUMBER_OF_FLOORS],
floor_height=(
float(x[self.EPC_API_DATA_NAMES["floor-height"]]) if
not pd.isnull(x[self.EPC_API_DATA_NAMES["floor-height"]]) else 2.5
),
perimeter=x[self.ATTRIBUTE_ESTIMATED_PERIMETER],
built_form=x[self.EPC_API_DATA_NAMES["built-form"]]
),
axis=1
)
self.standardised_asset_list[self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS] = self.standardised_asset_list.apply(
lambda x: RoofAttributes(description=x[self.EPC_API_DATA_NAMES["roof-description"]]).process()[
"insulation_thickness"] if not pd.isnull(
x[self.EPC_API_DATA_NAMES["roof-description"]]) else None,
axis=1
)
self.standardised_asset_list[self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS] = (
self.standardised_asset_list[self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS].str.replace("+", "")
)
# We produce some additional fields
# 1) Is the SAP rating below C75
self.standardised_asset_list[self.ATTRIBUTE_SAP_THRESHOLD_AND_BELOW] = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]].astype(float) <=
self.FILLED_CAVITY_SAP_THRESHOLD
)
# 2) Flag anything where the EPC is older than 5 years
self.standardised_asset_list[self.ATTRIBUTE_EPC_PRE_YEAR_THRESHOLD] = (
pd.to_datetime(
self.standardised_asset_list[self.EPC_API_DATA_NAMES["inspection-date"]]
).dt.year < self.EPC_YEAR_THRESHOLD
)
self.process_age_band()
def process_age_band(self):
processed_age_band = []
for _, x in self.standardised_asset_list.iterrows():
if pd.isnull(x[self.EPC_API_DATA_NAMES["construction-age-band"]]) or (
x[self.EPC_API_DATA_NAMES["construction-age-band"]] in Definitions.DATA_ANOMALY_MATCHES
):
processed_age_band.append(
{
self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID],
"epc_year_lower_bound": None,
"epc_year_upper_bound": None,
"does_age_band_match_epc_age_band": "No EPC Age Band"
}
)
continue
# We exatract the upper and lower bounds
if x[self.EPC_API_DATA_NAMES["construction-age-band"]] in [
"England and Wales: 2007 onwards", "England and Wales: 2012 onwards"
]:
year_lower_bound = 2007 if x[self.EPC_API_DATA_NAMES[
"construction-age-band"]] == "England and Wales: 2007 onwards" else 2012
if pd.isnull(x[self.STANDARD_YEAR_BUILT]):
age_band_matches = "No Year Built From Landlord"
else:
age_band_matches = (
"EPC Age Band Matches Year Built" if x[self.STANDARD_YEAR_BUILT] >= year_lower_bound
else "EPC Age Band is older than Year Built"
)
processed_age_band.append(
{
self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID],
"epc_year_lower_bound": year_lower_bound,
"epc_year_upper_bound": None,
"does_age_band_match_epc_age_band": age_band_matches
}
)
continue
if x[self.EPC_API_DATA_NAMES["construction-age-band"]] == "England and Wales: before 1900":
if pd.isnull(x[self.STANDARD_YEAR_BUILT]):
age_band_matches = "No Year Built From Landlord"
else:
age_band_matches = (
"EPC Age Band Matches Year Built" if x[self.STANDARD_YEAR_BUILT] < 1900
else "EPC Age Band is newer than Year Built"
)
processed_age_band.append(
{
self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID],
"epc_year_lower_bound": None,
"epc_year_upper_bound": 1899,
"does_age_band_match_epc_age_band": age_band_matches
}
)
continue
if x[self.EPC_API_DATA_NAMES["construction-age-band"]].isdigit():
if pd.isnull(x[self.STANDARD_YEAR_BUILT]):
age_band_matches = "No Year Built From Landlord"
else:
age_band_matches = (
"EPC Age Band Matches Year Built" if x[self.STANDARD_YEAR_BUILT] == int(
x[self.EPC_API_DATA_NAMES["construction-age-band"]]
)
else "EPC Age Band is different from Year Built"
)
processed_age_band.append(
{
self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID],
"epc_year_lower_bound": int(x[self.EPC_API_DATA_NAMES["construction-age-band"]]),
"epc_year_upper_bound": int(x[self.EPC_API_DATA_NAMES["construction-age-band"]]),
"does_age_band_match_epc_age_band": age_band_matches
}
)
continue
# Oherwise, we extract the upper and lower bounds
age_band = x[self.EPC_API_DATA_NAMES["construction-age-band"]].split(": ")[1]
lower_date, upper_date = age_band.split("-")
if not x[self.STANDARD_YEAR_BUILT]:
age_band_matches = "No Year Built From Landlord"
else:
age_band_matches = (
"EPC Age Band Matches Year Built" if (x[self.STANDARD_YEAR_BUILT] >= float(lower_date)) and (
x[self.STANDARD_YEAR_BUILT] <= float(upper_date)
)
else "EPC Age Band is older than Year Built" if x[self.STANDARD_YEAR_BUILT] > float(upper_date)
else "EPC Age Band is newer than Year Built"
)
processed_age_band.append(
{
self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID],
"epc_year_lower_bound": int(lower_date),
"epc_year_upper_bound": int(upper_date),
"does_age_band_match_epc_age_band": age_band_matches
}
)
processed_age_band = pd.DataFrame(processed_age_band)
self.standardised_asset_list = self.standardised_asset_list.merge(
processed_age_band, how="left"
)
def identify_worktypes(self):
if self.landlord_sap is not None:
# We add a SAP category for all work type identification
self.standardised_asset_list["SAP Category"] = np.where(
(
(self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <= 54) |
(self.standardised_asset_list[self.STANDARD_SAP] <= 54)
),
"SAP Rating 54 or less",
np.where(
(
(self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <= 68) |
(self.standardised_asset_list[self.STANDARD_SAP] <= 68)
),
"SAP Rating 55-68",
np.where(
(
(
self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <=
self.EMPTY_CAVITY_SAP_THRESHOLD
) | (self.standardised_asset_list[self.STANDARD_SAP] <= self.EMPTY_CAVITY_SAP_THRESHOLD)
),
f"SAP Rating 69-{self.EMPTY_CAVITY_SAP_THRESHOLD}",
f"SAP Rating {self.EMPTY_CAVITY_SAP_THRESHOLD + 1} or more"
),
)
)
self.standardised_asset_list["SAP Category"] = np.where(
pd.isnull(self.standardised_asset_list[self.STANDARD_SAP]) &
pd.isnull(self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]]),
"SAP Unknown",
self.standardised_asset_list["SAP Category"]
)
else:
# We add a SAP category for all work type identification
# We break into 4 categories (54 or less, 55-68, 69-74, 75 or more)
self.standardised_asset_list["SAP Category"] = np.where(
(self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <= 54),
"SAP Rating 54 or less",
np.where(
(self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <= 68),
"SAP Rating 55-68",
np.where(
(
self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <=
self.EMPTY_CAVITY_SAP_THRESHOLD
),
f"SAP Rating 69-{self.EMPTY_CAVITY_SAP_THRESHOLD}",
f"SAP Rating {self.EMPTY_CAVITY_SAP_THRESHOLD + 1} or more"
),
)
)
self.standardised_asset_list["SAP Category"] = np.where(
pd.isnull(self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]]),
"SAP Unknown",
self.standardised_asset_list["SAP Category"]
)
# Before we being, we identify if a property has solar already as we use this
# for identifying cavity jobs
if self.non_intrusives_present and not self.old_format_non_intrusives_present:
if self.new_format_non_insturives_present_v2:
existing_solar_non_intrusives_check = (
self.standardised_asset_list["non-intrusives: ROOF ORIENTATION"].str.strip().isin(
["ALREADY HAS SOLAR PV"]
)
)
else:
existing_solar_non_intrusives_check = (
self.standardised_asset_list["non-intrusives: PV, ACCESS ISSUE, SEE NOTES"] == "SOLAR PV ON ROOF"
)
elif self.old_format_non_intrusives_present:
existing_solar_non_intrusives_check = (
self.standardised_asset_list["non-intrusives: WFT Findings"].str.lower().str.strip().isin(
["solar pv on roof"]
)
)
else:
# We don't have an indication
existing_solar_non_intrusives_check = False
self.standardised_asset_list["property_has_solar"] = (
(self.standardised_asset_list[self.STANDARD_EXISTING_PV] == "already has PV") |
existing_solar_non_intrusives_check |
(self.standardised_asset_list[self.ATTRIBUTE_HAS_SOLAR])
)
# If we have non-intrusives completed, we can use this to identify work types
######################################################
# Empty cavity:
######################################################
# 1) Has been flagged on the non-intrusives as being a cavity wall, empty or partially filled
# 2) The age is before 1995
# 3) We don't remove anything that haas access issues yet
if self.non_intrusives_present:
if self.new_format_non_insturives_present_v2:
non_intrusives_wall_filter = (
(self.standardised_asset_list['non-intrusives: Construction'] == "CAVITY") &
self.standardised_asset_list['non-intrusives: Insulated'].isin(["EMPTY", "PARTIAL", "EMPTY CAVITY"])
)
else:
non_intrusives_wall_filter = (
(self.standardised_asset_list['non-intrusives: Construction'] == "CAVITY") &
self.standardised_asset_list['non-intrusives: Insulated'].isin(["EMPTY", "PARTIAL"])
)
elif self.old_format_non_intrusives_present:
non_intrusives_wall_filter = (
self.standardised_asset_list['non-intrusives: WFT Findings'].str.lower().str.strip().isin(
["empty cavity", "partial fill", "empty", "EMPTY CAVITY 70MM", "partial", "empty cav"]
) | (
(
self.standardised_asset_list['non-intrusives: WFT Findings']
.str.lower().str.strip().str.contains("empty cavity|partial fill") &
~self.standardised_asset_list['non-intrusives: WFT Findings']
.astype(str).str.lower().str.strip().str.contains("major access issues")
)
)
)
else:
# We set the filter to False, as we have no non-intrusives
non_intrusives_wall_filter = False
if self.landlord_year_built is None:
year_built_filter = self.standardised_asset_list["epc_year_upper_bound"] <= self.EMPTY_CAVITY_YEAR_THRESHOLD
else:
year_built_filter = (
(self.standardised_asset_list[self.STANDARD_YEAR_BUILT] <= self.EMPTY_CAVITY_YEAR_THRESHOLD) |
(self.standardised_asset_list["epc_year_upper_bound"] <= self.EMPTY_CAVITY_YEAR_THRESHOLD)
)
# Criteria:
# The property isn't a bedsit
# Non-intrusives indicate it needs a fill
# The EPC year is before 2002
# We also flag where the property has solar on the roof, because this is a signal of a high EPC rating
self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] = (
(~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"])) &
non_intrusives_wall_filter &
year_built_filter &
(
~self.standardised_asset_list["property_has_solar"]
)
)
self.standardised_asset_list["non_intrusive_indicates_empty_cavity_has_solar"] = (
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
(~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"])) &
non_intrusives_wall_filter &
year_built_filter &
(
# If the property has solar, there's a chance it won't qualify
self.standardised_asset_list["property_has_solar"]
)
)
# We also add a filter on anything that was generally identified by the non-intrusives
self.standardised_asset_list["non_intrusive_indicates_empty_cavity_no_year_filter"] = (
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity_has_solar"] &
(~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"])) &
non_intrusives_wall_filter
)
if (not self.non_intrusives_eligibility) and (not self.old_format_non_intrusives_present):
# If we have NO inspections data, we capture all of the wall types and don't filter on age of the EPC
self.standardised_asset_list["epc_indicates_empty_cavity"] = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["walls-description"]].str.lower().isin(
self.EPC_NO_WALL_INSULATION_DESCRIPTIONS
) & (
self.standardised_asset_list["epc_year_upper_bound"] <= self.EMPTY_CAVITY_YEAR_THRESHOLD
) & (
~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"])
)
)
else:
self.standardised_asset_list["epc_indicates_empty_cavity"] = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["walls-description"]].str.lower().isin(
self.EPC_NO_WALL_INSULATION_DESCRIPTIONS
) & (
self.standardised_asset_list["epc_year_upper_bound"] <= self.EMPTY_CAVITY_YEAR_THRESHOLD
) & (
~self.standardised_asset_list[self.ATTRIBUTE_EPC_PRE_YEAR_THRESHOLD]
) & (
~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"])
)
)
self.standardised_asset_list["landlord_data_indicates_empty_cavity"] = (
self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin(["uninsulated cavity"]) &
(
(self.standardised_asset_list[self.STANDARD_YEAR_BUILT] <= self.EMPTY_CAVITY_YEAR_THRESHOLD) |
(self.standardised_asset_list["epc_year_upper_bound"] <= self.EMPTY_CAVITY_YEAR_THRESHOLD)
) & (
~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"])
)
)
# Finally, we create a flag to indicate that the cavity is empty, based on the criteria above
self.standardised_asset_list["cavity_is_empty"] = (
non_intrusives_wall_filter |
self.standardised_asset_list[self.EPC_API_DATA_NAMES["walls-description"]].str.lower().isin(
self.EPC_NO_WALL_INSULATION_DESCRIPTIONS
) |
self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin(["uninsulated cavity"])
)
######################################################
# Extraction
######################################################
# as needing a CIGA check. What is the logic we should be applying here?
if self.non_intrusives_present:
extraction_wall_filter = (
(self.standardised_asset_list["non-intrusives: Construction"] == "CAVITY") &
(self.standardised_asset_list["non-intrusives: Insulated"].isin(["RETRO DRILLED", "FILLED AT BUILD"])) &
(~self.standardised_asset_list['non-intrusives: Material'].isin(
["GREY LOOSE BEAD", "COMPACTED BEAD", "FIBRE BATT NO CAVITY", "EMPTY NARROW BELOW 30mm"]
))
)
if self.non_intrusives_eligibility:
# If we have the eligibility column, we check if the wall is eligible
extraction_wall_filter = (
extraction_wall_filter &
~self.standardised_asset_list["non-intrusives: Eligibility (Red/Yellow/Green)"].isin(
["RED"]
)
)
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] = (
extraction_wall_filter & year_built_filter
)
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction_no_year_filter"] = (
extraction_wall_filter & ~year_built_filter
)
elif self.old_format_non_intrusives_present:
print("Review these categories!!!!")
extraction_wall_filter = (
self.standardised_asset_list['non-intrusives: WFT Findings'].str.lower().str.strip().isin(
[
'blown in yellow wool', 'retro drilled & filled', 'white fibre from build',
'foam filled from build', 'retro drilled gas in block', 'block in rock wool', 'rdf / tilehung',
'fibre from build', 'blown in rock wool', 'rdf / tile hung', 'retro drilled',
'rock wool from build', 'part rendered retro drilled', 'white fibtr from build.',
'retro drilled and filled', 'blown in white wool', 'blown in yellow fibre from build', 'rdf',
'polybead', 'foam filled', 'blown in white bead from build', 'blown in yellow fibre',
'retro drilled det', 'blown in rockwool', 'retro drilled det empty cav', 'retro drilled end',
'retro filled extension', 'retro filled', 'foam'
]
)
)
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] = (
extraction_wall_filter
)
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction_no_year_filter"] = False
else:
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] = False
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction_no_year_filter"] = False
######################################################
# Solar
######################################################
# Criteria:
# Check 1: Does the property have a valid heating system?
self.standardised_asset_list["solar_landlord_data_indicates_correct_heating_system"] = (
self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM].isin(
[
"air source heat pump",
"ground source heat pump",
"high heat retention storage heaters",
"electric boiler"
]
)
)
self.standardised_asset_list["solar_landlord_data_indicates_needs_heating_upgrade"] = (
self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM].isin(
["electric storage heaters", "room heaters", "electric radiators", "no heating", "electric fuel"]
)
)
self.standardised_asset_list["solar_epc_data_indicates_correct_heating_system"] = (
(
self.standardised_asset_list[self.EPC_API_DATA_NAMES["mainheat-description"]]
.str.lower().str.contains("air source heat pump|ground source heat pump|boiler and radiators, electric")
) | (
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["mainheat-description"]].str.lower().str.contains(
"electric storage heaters"
) & (
self.standardised_asset_list[self.EPC_API_DATA_NAMES[
"mainheatcont-description"]] == "Controls for high heat retention storage heaters"
)
)
)
# If the landlord has given us the heating system, we default to that on heating upgrades. Because of the
# poor heating in place, if the EPC indicates that this property had a low efficiency heating system but the
# landlord data suggests otherwise (e.g. there's a gas boiler), we default to what the landlord has told us
self.standardised_asset_list["solar_epc_data_indicates_requires_heating_upgrade"] = (
(
self.standardised_asset_list[self.EPC_API_DATA_NAMES["mainheat-description"]].str.lower().str.contains(
"electric storage heaters|room heaters"
) & (
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["mainheatcont-description"]
] != "Controls for high heat retention storage heaters"
)
) & (
~self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM].isin(
["district heating", "communal heating", "communal gas boiler"]
) & ~self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM].astype(str).str.contains("gas ")
)
)
# Basic check - both of the previous two shouldn't be true simultaneously
if (
self.standardised_asset_list["solar_epc_data_indicates_correct_heating_system"] &
self.standardised_asset_list["solar_epc_data_indicates_requires_heating_upgrade"]
).sum():
logger.info("We have an example of both heating system checks being true - checking known cases")
known_edge_cases = ['Ground source heat pump, radiators, electric, Electric storage heaters']
error_cases = self.standardised_asset_list[
(
self.standardised_asset_list["solar_epc_data_indicates_correct_heating_system"] &
self.standardised_asset_list["solar_epc_data_indicates_requires_heating_upgrade"]
)
]
if all(error_cases[self.EPC_API_DATA_NAMES["mainheat-description"]].isin(known_edge_cases)):
logger.info("Within known edge cases")
else:
raise ValueError("Both heating system checks are true - this should not be possible")
# Check 3: Does the property meet the fabric condition
# Solar PV installs are subject to the minimum insulation requirements which means:
# 1) one of the following insulation measures must be installed as part of the same
# ECO4 project:
# • roof insulation (flat roof, pitched roof, room-in-roof)
# • exterior facing wall insulation (cavity wall, solid wall)
# • party cavity wall insulation
# • floor insulation (solid and underfloor)
#
# OR
#
# all measures (except any exempted measure referred to in paragraph 4.28)
# listed in paragraph a) must already be installed
#
# With this in mind, we look for 2 clases
# 1) The property is fully insulated apart from the loft (<200mm insulation)
# 2) THe property is fully insulated
self.standardised_asset_list["solar_landlord_walls_insulated"] = (
self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin(
[
"filled cavity",
"insulated solid brick",
"insulated timber frame",
"uninsulated cavity",
"insulated system built",
"insulated granite or whinstone",
"insulated sandstone or limestone",
"new build - average thermal transmittance"
]
)
)
if self.non_intrusives_present:
self.standardised_asset_list["solar_non_intrusives_walls_insulated"] = (
self.standardised_asset_list["non-intrusives: Insulated"].isin(
["EWI", "RETRO DRILLED", "FILLED AT BUILD"]
)
)
elif self.old_format_non_intrusives_present:
self.standardised_asset_list["solar_non_intrusives_walls_insulated"] = (
self.standardised_asset_list["non-intrusives: WFT Findings"].str.lower().str.strip().isin(
[
"retro drilled", "retro filled", "ewi", "retro drilled/ solid", "retro drilled and filled",
]
) |
self.standardised_asset_list["non-intrusives: WFT Findings"].str.lower().str.strip().str.contains(
"retro drilled"
)
)
else:
self.standardised_asset_list["solar_non_intrusives_walls_insulated"] = False
self.standardised_asset_list["walls_u_value"] = self.standardised_asset_list[
self.EPC_API_DATA_NAMES["walls-description"]
].apply(lambda x: WallAttributes(x).process()["thermal_transmittance"] if not pd.isnull(x) else None)
self.standardised_asset_list["solar_epc_walls_insulated"] = (
(
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["walls-description"]].str.lower().str.contains(
"|".join(self.EPC_INSULATED_WALLS_SUBSTRINGS)
)
) | (
self.standardised_asset_list["walls_u_value"].apply(lambda x: x <= 0.7 if not pd.isnull(x) else False)
)
)
roof_data = []
for desc in self.standardised_asset_list[
self.EPC_API_DATA_NAMES["roof-description"]
].unique():
if pd.isnull(desc):
continue
roof_data.append(
{
self.EPC_API_DATA_NAMES["roof-description"]: desc,
**RoofAttributes(desc).process()
}
)
roof_data = pd.DataFrame(roof_data)
roof_data = roof_data.rename(columns={"thermal_transmittance": "roof_u_value"})
self.standardised_asset_list = self.standardised_asset_list.merge(
roof_data, how="left", on=self.EPC_API_DATA_NAMES["roof-description"]
)
# If the u-value of a roof is less than 0.7 we consider it insulated
self.standardised_asset_list["solar_epc_roof_insulated"] = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["roof-description"]].str.lower().str.contains(
"|".join(self.EPC_INSULATED_ROOF_SUBSTRINGS),
) | (
self.standardised_asset_list[self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS].apply(
lambda x: int(x) >= 200 if str(x).isdigit() else False
)
) | (
self.standardised_asset_list["roof_u_value"].apply(
lambda x: x <= 0.7 if not pd.isnull(x) else False
)
)
)
self.standardised_asset_list["solar_epc_loft_needs_topup"] = (
self.standardised_asset_list[
self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS].apply(
lambda x: int(x) < 200 if str(x).isdigit() else False
) | (
(
self.standardised_asset_list["is_loft"] | self.standardised_asset_list["is_pitched"]
) & (
self.standardised_asset_list[self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS].isin(
["below average", "none"]
)
)
)
)
self.standardised_asset_list["epc_has_floor_recommendation"] = (
self.standardised_asset_list["epc_has_floor_recommendation"].fillna(False)
)
# Check if the boiler is electric
# We check if it contains both the terms boiler & electric
self.standardised_asset_list["has_electric_boiler"] = (
(
self.standardised_asset_list[self.EPC_API_DATA_NAMES["mainheat-description"]]
.str.lower().isin(
["boiler and radiators, electric"])
) | (
self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM] == "electric boiler"
)
)
####################################
# Check solar eligibility
####################################
# Set up the filters to stop repetition
correct_heating_system = (
self.standardised_asset_list["solar_landlord_data_indicates_correct_heating_system"] |
self.standardised_asset_list["solar_epc_data_indicates_correct_heating_system"] |
self.standardised_asset_list["has_electric_boiler"]
)
needs_heating_upgrade = (
self.standardised_asset_list["solar_landlord_data_indicates_needs_heating_upgrade"] |
self.standardised_asset_list["solar_epc_data_indicates_requires_heating_upgrade"]
)
# The requirements for walls are:
# 1) walls are insulated
# 2) property is a cavity (can be done insulated or not)
walls_meet_solar_requirements = (
# The landlord is saying the walls are insulated
self.standardised_asset_list["solar_landlord_walls_insulated"] |
# EPC data is saying the walls are insulated
self.standardised_asset_list["solar_epc_walls_insulated"] |
# Non-intrusives are saying the walls are insulated
self.standardised_asset_list["solar_non_intrusives_walls_insulated"] |
# It's empty cavity
self.standardised_asset_list["cavity_is_empty"] |
# It's a cavity wall
self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin(
["filled cavity", "partial insulated cavity"]
)
)
# Determine if the client gave us property type in the first place
if all(self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] == "unknown"):
# Use EPC
not_a_flat = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["property-type"]] != "Flat"
)
else:
not_a_flat = (
self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] != "flat"
)
solar_roof_meets_criteria = (
self.standardised_asset_list["solar_epc_roof_insulated"] |
self.standardised_asset_list["solar_epc_loft_needs_topup"]
)
self.standardised_asset_list["solar_eligible"] = (
# Property isn't a flag
not_a_flat &
# Landlord data or EPC data indicates the heating system is appropriate
correct_heating_system &
# The property doesn't currently have solar
~self.standardised_asset_list["property_has_solar"] &
# The walls are insulated
walls_meet_solar_requirements &
# Roof meets criteria
solar_roof_meets_criteria
)
# With heating upgrade
self.standardised_asset_list["solar_eligible_needs_heating_upgrade"] = (
not_a_flat &
# Needs heating upgrade
needs_heating_upgrade &
# The property doesn't currently have solar
~self.standardised_asset_list["property_has_solar"] &
# The walls are insulated
walls_meet_solar_requirements &
# Roof meets criteria
solar_roof_meets_criteria
)
# We check for a specific sub-set of properties which are uninsulated solid wall properties that are EPC E
# or below (we'll use 57 as a threshold) - These are for a pilot with Net Zero Renewables
self.standardised_asset_list["solar_eligible_solid_wall_uninsulated"] = (
not_a_flat &
# Landlord data or EPC data indicates the heating system is appropriate - in this case, we can also take
# electric boilers
correct_heating_system &
# The property doesn't currently have solar
~self.standardised_asset_list["property_has_solar"] &
# The walls are uninsulated solid
~walls_meet_solar_requirements &
(self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <= 57)
)
# Drop anything we don't need
self.standardised_asset_list = self.standardised_asset_list.drop(
columns=["walls_u_value", "roof_u_value"]
)
# Adjust flagged extraction jobs to remove anything for solar
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] = (
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] &
~self.standardised_asset_list["solar_eligible"]
)
# Finally, we note why each property has been flagged
self.standardised_asset_list["cavity_reason"] = None
empty_cavity_map = {
"non_intrusive_indicates_empty_cavity": self.EMPTY_CAVITY_NON_INTRUSIVE + ": ",
"non_intrusive_indicates_empty_cavity_has_solar": f"{self.EMPTY_CAVITY_NON_INTRUSIVE} - property "
"already has solar: ",
"non_intrusive_indicates_empty_cavity_no_year_filter": f"{self.EMPTY_CAVITY_NON_INTRUSIVE}, "
f"built after {self.EMPTY_CAVITY_YEAR_THRESHOLD}: ",
}
for variable, description in empty_cavity_map.items():
self.standardised_asset_list["cavity_reason"] = np.where(
self.standardised_asset_list[variable] &
pd.isnull(self.standardised_asset_list["cavity_reason"]),
description + self.standardised_asset_list["SAP Category"],
self.standardised_asset_list["cavity_reason"]
)
# We break the cavity reason into a few different categories, when the EPC is different from inspections
if self.old_format_non_intrusives_present:
self.standardised_asset_list["cavity_reason"] = np.where(
(
self.standardised_asset_list["epc_indicates_empty_cavity"] &
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
(self.standardised_asset_list['non-intrusives: WFT Findings'].str.lower().str.strip().isin(
[
"retro drilled and filled", "retro drilled", "retro filled", "retro drilled & filled",
]
)) &
pd.isnull(self.standardised_asset_list["cavity_reason"])
),
f"{self.EPC_EMPTY_INSPECTIONS_RETRO_DRILLED}: " + self.standardised_asset_list[
"SAP Category"],
self.standardised_asset_list["cavity_reason"]
)
self.standardised_asset_list["cavity_reason"] = np.where(
(
self.standardised_asset_list["epc_indicates_empty_cavity"] &
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
self.standardised_asset_list['non_intrusive_indicates_cavity_extraction'] &
pd.isnull(self.standardised_asset_list["cavity_reason"])
),
f"{self.EPC_EMPTY_INSPECTIONS_FILLED}: " + self.standardised_asset_list[
"SAP Category"],
self.standardised_asset_list["cavity_reason"]
)
elif self.non_intrusives_present:
self.standardised_asset_list["cavity_reason"] = np.where(
(
self.standardised_asset_list["epc_indicates_empty_cavity"] &
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
(self.standardised_asset_list['non-intrusives: Insulated'] == "RETRO DRILLED") &
pd.isnull(self.standardised_asset_list["cavity_reason"])
),
f"{self.EPC_EMPTY_INSPECTIONS_RETRO_DRILLED}: " + self.standardised_asset_list[
"SAP Category"],
self.standardised_asset_list["cavity_reason"]
)
self.standardised_asset_list["cavity_reason"] = np.where(
(
self.standardised_asset_list["epc_indicates_empty_cavity"] &
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
(self.standardised_asset_list['non-intrusives: Insulated'] == "FILLED AT BUILD") &
pd.isnull(self.standardised_asset_list["cavity_reason"])
),
f"{self.EPC_EMPTY_INSPECTIONS_FILLED_AT_BUILD}: " + self.standardised_asset_list["SAP Category"],
self.standardised_asset_list["cavity_reason"]
)
else:
self.standardised_asset_list["cavity_reason"] = np.where(
(
self.standardised_asset_list["epc_indicates_empty_cavity"] &
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
pd.isnull(self.standardised_asset_list["cavity_reason"])
),
f"{self.EPC_EMPTY}: " + self.standardised_asset_list["SAP Category"],
self.standardised_asset_list["cavity_reason"]
)
self.standardised_asset_list["cavity_reason"] = np.where(
(
self.standardised_asset_list["epc_indicates_empty_cavity"] &
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
pd.isnull(self.standardised_asset_list["cavity_reason"])
),
f"{self.EPC_EMPTY_INSPECTIONS_NON_CAVITY}: " + self.standardised_asset_list["SAP Category"],
self.standardised_asset_list["cavity_reason"]
)
# Work type prefixes
# Landlord data: The landlord's data indicates that the wall is an uninsulated cavity wall, but EPC and
# inspections show filled
self.standardised_asset_list["cavity_reason"] = np.where(
(
self.standardised_asset_list["landlord_data_indicates_empty_cavity"] &
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
~self.standardised_asset_list["epc_indicates_empty_cavity"] &
pd.isnull(self.standardised_asset_list["cavity_reason"])
),
f"{self.LANDLORD_EMPTY_INSPECTIONS_OTHER}: " +
self.standardised_asset_list["SAP Category"],
self.standardised_asset_list["cavity_reason"]
)
# Flag extraction
self.standardised_asset_list["cavity_reason"] = np.where(
(
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] &
pd.isnull(self.standardised_asset_list["cavity_reason"])
),
f"{self.EXTRACTION_NON_INTRUSIVE}: " + self.standardised_asset_list["SAP Category"],
self.standardised_asset_list["cavity_reason"]
)
self.standardised_asset_list["cavity_reason"] = np.where(
(
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction_no_year_filter"] &
pd.isnull(self.standardised_asset_list["cavity_reason"])
),
f"{self.EXTRACTION_NON_INTRUSIVE}, built after {self.EMPTY_CAVITY_YEAR_THRESHOLD}: " +
self.standardised_asset_list["SAP Category"],
self.standardised_asset_list["cavity_reason"]
)
######################################################
# Flag solar
######################################################
self.standardised_asset_list["solar_reason"] = None
# Map of variables and fill values for the solar_reason variable
# ordering of this map is important, where we flag our prioritised work types first
solar_reason_map = {
"solar_eligible": f"{self.SOLAR_ELIGIBLE}: ",
"solar_eligible_solid_wall_uninsulated": f"{self.SOLAR_ELIGIBLE_SOLID_WALL_UNINSULATED}: ",
"solar_eligible_needs_heating_upgrade": f"{self.SOLAR_ELIGIBLE_NEEDS_HEATING_UPGRADE}: "
}
for variable, reason in solar_reason_map.items():
self.standardised_asset_list["solar_reason"] = np.where(
self.standardised_asset_list[variable] & pd.isnull(self.standardised_asset_list["solar_reason"]),
reason + self.standardised_asset_list["SAP Category"],
self.standardised_asset_list["solar_reason"]
)
# Finally, anything flagged for solar should not be flagged for cavity - make them None
self.standardised_asset_list["cavity_reason"] = np.where(
(
~pd.isnull(self.standardised_asset_list["solar_reason"]) &
~pd.isnull(self.standardised_asset_list["cavity_reason"])
),
None,
self.standardised_asset_list["cavity_reason"]
)
# Flag anything that has existing outcomes
if (self.outcomes is not None) and ("surveyed" in self.standardised_asset_list.columns):
if "installer refusal" not in self.standardised_asset_list.columns:
self.standardised_asset_list["cavity_reason"] = np.where(
(
(self.standardised_asset_list["surveyed"] > 0)
),
None,
self.standardised_asset_list["cavity_reason"]
)
else:
for col in ["cavity_reason", "solar_reason"]:
self.standardised_asset_list[col] = np.where(
(
(self.standardised_asset_list["surveyed"] > 0) |
(self.standardised_asset_list["installer refusal"] > 0)
),
None,
self.standardised_asset_list[col]
)
if self.master_surveyed is not None:
for col in ["cavity_reason", "solar_reason"]:
self.standardised_asset_list[col] = np.where(
(
(~pd.isnull(self.standardised_asset_list["submission_status"]))
),
None,
self.standardised_asset_list[col]
)
if self.ecosurv is not None and "ecosurv_install_status" in self.standardised_asset_list.columns:
# If we didn't match anything to ecosurv, the ecosurv_install_status won't exist
for col in ["cavity_reason", "solar_reason"]:
self.standardised_asset_list[col] = np.where(
(
(~pd.isnull(self.standardised_asset_list["ecosurv_install_status"]))
),
None,
self.standardised_asset_list[col]
)
# We prepare outcomes for output
if self.outcomes is not None:
logger.info("Preparing outcomes for output")
identified_work = self.standardised_asset_list[
~pd.isnull(self.standardised_asset_list["cavity_reason"]) |
~pd.isnull(self.standardised_asset_list["solar_reason"])
][self.DOMNA_PROPERTY_ID].values
if self.DOMNA_PROPERTY_ID in self.outcomes.columns:
self.outcomes_for_output = self.outcomes[
self.outcomes[self.DOMNA_PROPERTY_ID].isin(identified_work)
]
# Finally, direct operations feedback has suggested that if a property is a flat that has a SAP rating of
# 76 or above, we should exclude it because it's likely not going to be eligible for anyting
self.standardised_asset_list["cavity_reason"] = np.where(
(self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] == "flat") &
(self.standardised_asset_list["SAP Category"] == "SAP Rating 76 or more"),
self.standardised_asset_list["cavity_reason"] + " - (unlikely to quality)",
self.standardised_asset_list["cavity_reason"]
)
# Split cavity_reason on the colon and check if the first part is equal to one of the two options above
# that indicates empties
self.standardised_asset_list["identified_empty_cavity"] = (
self.standardised_asset_list["cavity_reason"].str.split(":").str[0].isin(
[self.EMPTY_CAVITY_NON_INTRUSIVE, self.EMPTY_CAVITY_NON_INTRUSIVE_YEAR, self.EPC_EMPTY]
)
)
def get_work_figures(self):
blocks_of_flats = self.standardised_asset_list[
self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] == "block of flats"
]
non_blocks_of_flats = self.standardised_asset_list[
self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] != "block of flats"
]
# Produce some aggregate figures
self.work_type_figures = {
**non_blocks_of_flats["cavity_reason"].value_counts().to_dict(),
**{
k + " (Block of flats)": v for k, v in
blocks_of_flats["solar_reason"].value_counts().to_dict().items()
},
**self.standardised_asset_list["solar_reason"].value_counts().to_dict()
}
pprint(self.work_type_figures)
def fill_landlord_block_reference(self, has_blocks_of_flats):
if not has_blocks_of_flats:
return
# If we have blocks of flats, we fill the landlord_block_reference field with address 1 + postcode
self.standardised_asset_list[self.STANDARD_BLOCK_REFERENCE] = np.where(
(self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] == "block of flats") & (
pd.isnull(self.standardised_asset_list[self.STANDARD_BLOCK_REFERENCE])
),
self.standardised_asset_list[self.STANDARD_ADDRESS_1] + " " +
self.standardised_asset_list[self.STANDARD_POSTCODE],
self.standardised_asset_list[self.STANDARD_BLOCK_REFERENCE]
)
def split_blocks(self):
"""
Where we have a single row that is a block of flats, we split this into multiple rows,
one for each unit. The data that we have will be copied across rows
:return:
"""
blocks = self.standardised_asset_list[
self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] == "block of flats"
].copy()
if blocks.empty:
return
RANGE_RE = re.compile(r'\b(\d+[A-Za-z]?)\s*[-]\s*(\d+[A-Za-z]?)\b')
NUM_RE = re.compile(r'\b\d+[A-Za-z]?\b') # captures 12, 12A, etc.
TO_RANGE_RE = re.compile(r'\b(\d+[A-Za-z]?)\s+(?:to|To|TO)\s+(\d+[A-Za-z]?)\b') # captures "13 to 15"
LETTER_RANGE_RE = re.compile(r'\b(\d+)([A-Za-z]?)\s*[-]\s*(\d+)([A-Za-z]?)\b') # captures "1A-3B"
expanded_rows = []
for _, row in blocks.iterrows():
addr = str(row[self.STANDARD_ADDRESS_1])
full_addr = row[self.STANDARD_FULL_ADDRESS]
# We also look for terms like "Odd", "even", "all" in the address to indicate if it should be just
# the odds, evens or all of the numbers
has_odd = (
"(odd)" in addr.lower() or
"(odd)" in full_addr.lower() or
"(odds)" in addr.lower() or
"(odds)" in full_addr.lower()
)
has_even = (
"(even)" in addr.lower() or
"(even)" in full_addr.lower() or
"(evens)" in addr.lower() or
"(evens)" in full_addr.lower()
)
# 1 ─ Range (e.g. 1-7)
m_range = RANGE_RE.search(addr)
to_range = TO_RANGE_RE.search(addr)
if m_range or to_range:
start, end = m_range.groups() if m_range else to_range.groups()
start, end = int(re.match(r'\d+', start)[0]), int(re.match(r'\d+', end)[0])
if start > end or (end - start) > 200:
raise ValueError(f"Suspicious range '{addr}'")
# We define the looping range on whether we have odd, even or all numbers
house_number_range = range(start, end + 1)
if has_odd:
house_number_range = [x for x in house_number_range if x % 2 != 0]
if has_even:
house_number_range = [x for x in house_number_range if x % 2 == 0]
for n in house_number_range:
new = row.copy()
range_text = m_range.group(0) if m_range else to_range.group(0)
new_addr = addr.replace(range_text, str(n))
# Build the new full address by also swapping out the range_text
original_full_address = new[self.STANDARD_FULL_ADDRESS]
new_full_address = original_full_address.replace(range_text, str(n))
new[self.STANDARD_ADDRESS_1] = str(n)
new[self.STANDARD_FULL_ADDRESS] = new_full_address
new[self.STANDARD_PROPERTY_TYPE] = "flat"
# Keep a record of the previous address 1
new["block_address1"] = addr
new["block_full_address"] = original_full_address
new["is_expended_block"] = True
# We update the full address
new[self.DOMNA_PROPERTY_ID] = f"{row[self.DOMNA_PROPERTY_ID]}-{new_addr}"
expanded_rows.append(new.to_dict())
continue
# 2 ─ Explicit list (e.g. 1, 2, 5 Block) or split by an ampersand (e.g. 1 & 2 Block)
nums = NUM_RE.findall(addr)
if len(nums) > 1 and (',' in addr or '&' in addr or ' and ' in addr.lower()):
for n in nums:
new = row.copy()
new_addr = re.sub(NUM_RE, n, addr, count=1) # replace the first number only
new[self.STANDARD_ADDRESS_1] = new_addr
new[self.DOMNA_PROPERTY_ID] = f"{row[self.DOMNA_PROPERTY_ID]}-{new_addr}"
expanded_rows.append(new.to_dict())
continue
# Check for a range of lettered addresses e.g 31A - 31D
letter_range = LETTER_RANGE_RE.search(full_addr)
if letter_range:
start_num, start_letter, end_num, end_letter = letter_range.groups()
start_num, end_num = int(start_num), int(end_num)
if start_num != end_num:
raise NotImplementedError(f"Unusual range - handle me")
# We define the looping range on whether we have odd, even or all numbers
house_number_range = range(start_num, end_num + 1)
if has_odd:
house_number_range = [x for x in house_number_range if x % 2 != 0]
if has_even:
house_number_range = [x for x in house_number_range if x % 2 == 0]
for n in house_number_range:
for letter in range(ord(start_letter), ord(end_letter) + 1):
new = row.copy()
new_addr = f"{n}{chr(letter)}"
new[self.STANDARD_ADDRESS_1] = new_addr
new[self.DOMNA_PROPERTY_ID] = f"{row[self.DOMNA_PROPERTY_ID]}-{new_addr}"
expanded_rows.append(new.to_dict())
continue
# 4 ─ Single number or no number, treat as individual dwelling
if (len(nums) == 1) or not nums:
expanded_rows.append(row.to_dict())
continue
# Anything else with digits is unrecognised
raise NotImplementedError(f"Unhandled block format: '{addr}'")
expanded_blocks = pd.DataFrame(expanded_rows)
# Check for duplicated domna ids
if expanded_blocks[self.DOMNA_PROPERTY_ID].duplicated().sum():
raise ValueError("expanded blocks has duplicated IDs")
# We drop the blocks from the standardised asset list and append on the expanded blocks
self.standardised_asset_list = self.standardised_asset_list[
self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] != "block of flats"
]
self.standardised_asset_list = pd.concat(
[self.standardised_asset_list, expanded_blocks],
ignore_index=True
)
# As a final clean up, for any blocks that are size 1, we don't includr a project code
sizes = (
expanded_blocks
.groupby(self.STANDARD_BLOCK_REFERENCE)[self.DOMNA_PROPERTY_ID]
.nunique()
.reset_index()
)
size_1 = sizes[sizes[self.DOMNA_PROPERTY_ID] <= 1]
# Remove the size 1 blocks from the standardised asset list
self.standardised_asset_list[self.STANDARD_BLOCK_REFERENCE] = np.where(
self.standardised_asset_list[self.STANDARD_BLOCK_REFERENCE].isin(
size_1[self.STANDARD_BLOCK_REFERENCE].values
),
None,
self.standardised_asset_list[self.STANDARD_BLOCK_REFERENCE]
)
def label_property_status(self):
"""
This function is designed to be run after identify_worktypes() has been run, and will create a "property_status"
column, which will note where each property is (to be surveyed, surveyed, installed), using the stages we
recognise within hubspot
:return:
"""
# For anything that is ready to go, that gets set to ready to be scheduled
self.standardised_asset_list["hubspot_status"] = np.where(
~pd.isnull(self.standardised_asset_list["cavity_reason"]) |
~pd.isnull(self.standardised_asset_list["solar_reason"]),
hubspot_config.HubspotProcessStatus.READY_TO_BE_SCHEDULED.label,
None
)
# we step through the process of flagging completed surveys
# We utilise submissions, ecosurv and outcomes to define the hubspot status
# We'll take the maximum of these three columns, based on the enum integer value
label_to_enum = {e.label: e for e in hubspot_config.HubspotProcessStatus}
def get_max_status_from_columns(row):
status_candidates = []
for col in ["submission_status", "ecosurv_install_status", "outcome_status"]:
label = row.get(col)
if label in label_to_enum:
status_candidates.append(label_to_enum[label])
if not status_candidates:
return row["hubspot_status"] # fallback to existing status if no updates
return max(status_candidates).label
self.standardised_asset_list["hubspot_status"] = self.standardised_asset_list.apply(
get_max_status_from_columns, axis=1
)
self.standardised_asset_list["project_code"] = None
# if we have any blocks, where work is eligible, we flag them now
# These blocks may be refecence via the landlord_block_reference field, or by property types being
# blocks of flats
has_landlord_block_reference = sum(~pd.isnull(self.standardised_asset_list[self.STANDARD_BLOCK_REFERENCE]))
if has_landlord_block_reference:
# For blocks that have a 50% allocation, we create project codes
self.block_analysis()
# find any block refs with more than 50% emptires
viable_empty_blocks = self.block_analysis_df[
self.block_analysis_df['Percentage of Empties'] >= 0.50
]
if not viable_empty_blocks.empty:
project_code_lookup = viable_empty_blocks[["Block Reference"]].copy()
self.standardised_asset_list = self.standardised_asset_list.merge(
project_code_lookup, how="left", left_on=self.STANDARD_BLOCK_REFERENCE, right_on="Block Reference"
)
self.standardised_asset_list["project_code"] = np.where(
~pd.isnull(self.standardised_asset_list["Block Reference"]),
self.standardised_asset_list["Block Reference"],
self.standardised_asset_list["project_code"]
)
self.standardised_asset_list = self.standardised_asset_list.drop(columns=["Block Reference"])
def analyse_geographies(self):
cavity_programme = (
self.standardised_asset_list[["domna_postcode", "cavity_reason"]]
.groupby(["domna_postcode"])["cavity_reason"]
.count()
.reset_index()
)
solar_programme = (
self.standardised_asset_list[["domna_postcode", "solar_reason"]]
.groupby(["domna_postcode"])["solar_reason"]
.count()
.reset_index()
)
postcodes = (
self.standardised_asset_list[["domna_postcode", "landlord_property_id"]]
.groupby("domna_postcode")["landlord_property_id"]
.count()
.reset_index()
.rename(columns={"landlord_property_id": "n_properties"})
)
geographical_areas = postcodes.merge(cavity_programme, how="left", on="domna_postcode").merge(
solar_programme, how="left", on="domna_postcode"
).fillna(0)
geographical_areas["coverage"] = (
(
geographical_areas["solar_reason"] + geographical_areas["cavity_reason"]
) / geographical_areas["n_properties"] * 100
)
geographical_areas = geographical_areas.sort_values("coverage", ascending=False)
self.geographical_areas = geographical_areas
def block_analysis(self):
# Reverse mapping: label -> enum
LABEL_TO_ENUM = {e.label: e for e in hubspot_config.HubspotProcessStatus}
# Threshold status - anything that is at this stage or beyond is considered surveyed
threshold = hubspot_config.HubspotProcessStatus.SURVEYED_COMPLETED_SIGNED_OFF.value
block_analysis = []
for block_reference, group in self.standardised_asset_list.groupby(self.STANDARD_BLOCK_REFERENCE):
cavity_breakdown = group["cavity_reason"].fillna("No Eligibility").value_counts(normalize=True) * 100
if all(cavity_breakdown.index == "No Eligibility"):
continue
# We check the % of empty vs not empty as right now, we're focused on empty
n_empties = (
(group["identified_empty_cavity"] == True) &
(~pd.isnull(group["cavity_reason"])) &
(~group["cavity_reason"].str.contains("(unlikely to quality)", case=False, na=False, regex=False))
).sum()
n_empties_high_confidence = (
(group["identified_empty_cavity"] == True) &
(~group["SAP Category"].isin(["SAP Rating 69-75", "SAP Rating 76 or more"])) &
(~pd.isnull(group["cavity_reason"])) &
(~group["cavity_reason"].str.contains("(unlikely to quality)", case=False, na=False, regex=False))
).sum()
# Average age of the EPCs
group["time_since_epc"] = (
pd.to_datetime("now") - pd.to_datetime(
group[self.EPC_API_DATA_NAMES["inspection-date"]])
).dt.days
average_age_of_epc = group["time_since_epc"].mean()
works = group["hubspot_status"]
above_threshold = works.map(LABEL_TO_ENUM.get).dropna()
count_above = (above_threshold >= threshold).sum()
proportion_surveyed = count_above / len(works)
proportion_empty = n_empties / len(works)
proportion_empty_high_confidence = n_empties_high_confidence / len(works)
# We auto-populate any blocks that have greater than 50% proportion empty
block_analysis.append(
{
"Block Reference": block_reference,
"Block Size": len(group),
"average_age_of_epc": average_age_of_epc,
"Proportion of properties suryeyed": proportion_surveyed,
"Percentage of Empties": proportion_empty,
"Percentage of Empties (high confidence)": proportion_empty_high_confidence,
**cavity_breakdown.to_dict(),
}
)
block_analysis = pd.DataFrame(block_analysis)
block_analysis = block_analysis.fillna(0)
# We flag which properties are eligible for works. We need at least 50%
block_analysis["Eligible for Works"] = (
block_analysis["Percentage of Empties"] >= 0.50
)
block_analysis = block_analysis.sort_values("Percentage of Empties", ascending=False)
# For properties that are NOT eligible, we should update the cavity reason
ineligible_blocks = block_analysis[
~block_analysis["Eligible for Works"]
]["Block Reference"].values
eligible_blocks = block_analysis[
block_analysis["Eligible for Works"]
]["Block Reference"].values
self.standardised_asset_list["cavity_reason"] = np.where(
self.standardised_asset_list[self.STANDARD_BLOCK_REFERENCE].isin(ineligible_blocks),
self.standardised_asset_list["cavity_reason"] + " (Flat in block with less than 50% eligible)",
self.standardised_asset_list["cavity_reason"]
)
# if the property is in a block of flats that eligible, but the property itself is not eligible, we flag this
# The criteria is:
# =The property should be in a block of flats
self.standardised_asset_list["cavity_reason"] = np.where(
self.standardised_asset_list[self.STANDARD_BLOCK_REFERENCE].isin(eligible_blocks),
self.standardised_asset_list["cavity_reason"]
+ " " + "(Flat in block with more than 50% eligible)",
self.standardised_asset_list["cavity_reason"]
)
self.block_analysis_df = block_analysis
@staticmethod
def split_full_name(x):
if pd.isnull(x):
return None, None, None
x = x.lower()
titles = ["mr", "mrs", "ms", "miss", "dr", "prof"]
# Remove titles
detected_title = [title for title in titles if x.startswith(title)]
if detected_title:
for title in detected_title:
x = x.replace(title, "")
x = x.strip()
first_name, last_name = x.split(" ")[0], x.split(" ")[-1]
title = detected_title[0].title() if detected_title else None
return title, first_name.title(), last_name.title()
def load_contact_details(
self,
local_filepath,
sheet_name,
landlord_property_id,
phone_number_column=None,
secondary_phone_number_column=None,
secondary_contact_full_name=None,
email_column=None,
fullname_column=None,
firstname_column=None,
lastname_column=None
):
self.contact_detail_fields = {
"landlord_property_id": landlord_property_id,
"phone_number": phone_number_column,
"secondary_phone_number": secondary_phone_number_column,
"secondary_contact_full_name": secondary_contact_full_name,
"email": email_column,
"fullname": fullname_column,
"firstname": firstname_column,
"lastname": lastname_column
}
details_colnames = [
phone_number_column, secondary_phone_number_column, email_column, fullname_column, firstname_column,
lastname_column
]
# We'll fill them
none_details = [x for x in details_colnames if x is None]
details_colnames = [x for x in details_colnames if x is not None]
if local_filepath is None:
# Create an empty DataFrame based on the fields in self.contact_detail_fields
self.contact_details = pd.DataFrame(columns=list(self.contact_detail_fields.keys()))
return
contact_details = pd.read_excel(
local_filepath, sheet_name=sheet_name
)[[self.contact_detail_fields["landlord_property_id"]] + details_colnames]
contact_details = contact_details[
~pd.isnull(contact_details[self.contact_detail_fields["landlord_property_id"]])
]
# Fill anything we don't have
for detail in none_details:
contact_details[detail] = None
if fullname_column and not (firstname_column and lastname_column):
contact_details["title"], contact_details["first_name"], contact_details["last_name"] = zip(
*contact_details[fullname_column].apply(self.split_full_name)
)
else:
contact_details["title"] = None
self.contact_details = contact_details
@classmethod
def load_standardised_asset_list(cls, filepath, sheet_name, header):
"""
This function is designed to load the standardised asset list from a file
:return:
"""
# This is a placeholder for now
# instantiate the class
instance = cls(
local_filepath=filepath,
sheet_name=sheet_name,
address1_colname=cls.STANDARD_ADDRESS_1,
postcode_colname=cls.STANDARD_POSTCODE,
full_address_colname=cls.STANDARD_FULL_ADDRESS,
landlord_property_id=cls.STANDARD_LANDLORD_PROPERTY_ID,
full_address_cols_to_concat=[],
missing_postcodes_method=None,
address1_extraction_method=None,
landlord_year_built=cls.STANDARD_YEAR_BUILT,
landlord_uprn=cls.STANDARD_UPRN,
landlord_property_type=cls.STANDARD_PROPERTY_TYPE,
landlord_built_form=cls.STANDARD_BUILT_FORM,
landlord_wall_construction=cls.STANDARD_WALL_CONSTRUCTION,
landlord_roof_construction=cls.STANDARD_ROOF_CONSTRUCTION,
landlord_heating_system=cls.STANDARD_HEATING_SYSTEM,
landlord_existing_pv=cls.STANDARD_EXISTING_PV,
landlord_sap=cls.STANDARD_SAP,
landlord_block_reference=cls.STANDARD_BLOCK_REFERENCE,
phase=False,
header=header
)
return instance
def prepare_for_crm(self, company_domain, installer_name, reconcile_programme=False):
"""
This function prepares the data for upload into Hubspot
:param company_domain: The company domain name to be used in the CRM
:param installer_name: The name of the installer to be used in the CRM
:param reconcile_programme: If True, will include all properties with a project code, regardless of status
:raises ValueError: If the installer name is not valid or if there are missing products
:return:
"""
# This maps the opportunities as we reference them, to the product data as stored in Hubspot
if not hubspot_config.Installer.is_valid_value(installer_name):
raise ValueError(f"Installer name {installer_name} is not valid. Please check the installer name.")
# We check if all products are covered in the lookup table
cavity_products = self.standardised_asset_list["cavity_reason"].unique().tolist()
cavity_products = [x for x in cavity_products if not pd.isnull(x)]
solar_products = self.standardised_asset_list["solar_reason"].unique().tolist()
solar_products = [x for x in solar_products if not pd.isnull(x)]
product_map = {}
for identified_product in cavity_products + solar_products:
if pd.isnull(identified_product):
continue
matched_product = None
for product_prefix, crm_product in self.prefixes_to_products.items():
if identified_product.startswith(product_prefix):
matched_product = crm_product
product_map[identified_product] = matched_product
# For each cavity and solar product, we iterate through the prexies and map to the products
programme_data = self.standardised_asset_list.copy()
programme_data["domna_full_address"] = (
programme_data["domna_full_address"].str.replace(";", ", ", regex=False).str.replace(" ", "")
)
# Format the two date columns
programme_data["survey_date"] = pd.to_datetime(programme_data["survey_date"], errors="coerce")
programme_data[self.EPC_API_DATA_NAMES["inspection-date"]] = pd.to_datetime(
programme_data[self.EPC_API_DATA_NAMES["inspection-date"]],
errors="coerce"
)
# Convert to dd/mm/yyyy format
programme_data["survey_date"] = programme_data["survey_date"].dt.strftime("%d/%m/%Y")
programme_data[self.EPC_API_DATA_NAMES["inspection-date"]] = (
programme_data[self.EPC_API_DATA_NAMES["inspection-date"]].dt.strftime("%d/%m/%Y")
)
# We take rows that have a survyor and a date for the survey
# We include properties under 2 circumstances:
# 1) The hubspot status is ready to be scheduled and there is an assigned surveyor and week for survey
# 2) The hubspot status is something else, meaning this has been included in an existing programme
# 3) reconcile programme is true, and therefore all proeprties with a project code will be included
if reconcile_programme:
programme_data = programme_data[~pd.isnull(programme_data["project_code"])]
else:
if programme_data["hubspot_status"].nunique() > 1:
logger.info("Multiple hubspot_status found - are you sure you don't want to reconcile the programme?")
ready_to_be_scheduled = (
(
programme_data["hubspot_status"] == hubspot_config.HubspotProcessStatus.READY_TO_BE_SCHEDULED.label
)
)
# completed_works = (
# (programme_data["hubspot_status"] !=
# hubspot_config.HubspotProcessStatus.READY_TO_BE_SCHEDULED.label) &
# (~pd.isnull(programme_data["hubspot_status"]))
# )
programme_data = programme_data[ready_to_be_scheduled]
# Merge on the contact details
programme_data = programme_data.merge(
self.contact_details,
how="left",
left_on=self.STANDARD_LANDLORD_PROPERTY_ID,
right_on=self.landlord_property_id,
)
programme_data["Company Domain Name <COMPANY domain>"] = company_domain
# Append the product data onto the programme data
programme_data["cavity_product"] = programme_data["cavity_reason"].map(
lambda x: product_map.get(x, {"name": None})["name"]
)
programme_data["solar_product"] = programme_data["solar_reason"].map(
lambda x: product_map.get(x, {"name": None})["name"]
)
# We check if we have any missings
cavity_missing = pd.isnull(programme_data[~pd.isnull(programme_data["cavity_reason"])]["cavity_product"]).sum()
solar_missing = pd.isnull(programme_data[~pd.isnull(programme_data["solar_reason"])]["solar_product"]).sum()
if cavity_missing > 0 or solar_missing > 0:
raise ValueError(
f"We have {cavity_missing} cavity products and {solar_missing} solar products that are not "
"mapped to a product in the lookup table. Please check the mapping."
)
programme_data["domna_product"] = programme_data["solar_product"].copy()
programme_data["domna_product"] = np.where(
pd.isnull(programme_data["domna_product"]),
programme_data["cavity_product"],
programme_data["domna_product"]
)
# We filter just on rows where we have a product
if reconcile_programme:
# We include historical works, which will include hisorical cavity so we set these as extraction (as
# this is the main work mix)
programme_data["domna_product"] = programme_data["domna_product"].fillna(
self.CRM_HISTORICAL_CAVITY_PRODUCT["name"]
)
else:
# We shouldn't have any missing products
# programme_data = programme_data[
# ~pd.isnull(programme_data["survey_date"])
# ]
if pd.isnull(programme_data["domna_product"]).sum():
raise ValueError("Missing products")
programme_data = programme_data.drop(columns=["solar_product", "cavity_product"])
product_df = (
pd.DataFrame(self.CRM_PRODUCTS).T[["name", "id", "unit_price"]]
.reset_index()
.rename(
columns={
"name": "Name <LINE_ITEM name>",
"id": 'Product ID <LINE_ITEM hs_product_id>',
"unit_price": 'Unit price <LINE_ITEM price>',
"index": "domna_product"
}
)
)
product_df['Quantity <LINE_ITEM quantity>'] = 1
# Append on the product data
programme_data = programme_data.merge(product_df, how="left", on="domna_product")
# Add in deal and pipeline information
programme_data["dealname"] = (
programme_data[self.STANDARD_FULL_ADDRESS] + ", " +
programme_data[self.STANDARD_POSTCODE] + " : " + programme_data["domna_product"]
)
programme_data['Pipeline <DEAL pipeline>'] = hubspot_config.CRM_PIPELINE_NAME
programme_data['Associations: Listing'] = "Property Owner"
# We determine which column we should use for the UPRN
if self.STANDARD_UPRN not in programme_data.columns:
uprn_column = self.EPC_API_DATA_NAMES["uprn"]
# If we're working form the EPC, we don't have this information if the EPC is estimated
programme_data[uprn_column] = np.where(
programme_data["estimated"] == True, None, programme_data[uprn_column]
)
else:
# Use the value that has the most coverage
uprn_column = "hubspot_uprn"
programme_data[uprn_column] = programme_data[self.STANDARD_UPRN].fillna(
programme_data[self.EPC_API_DATA_NAMES["uprn"]]
)
# Remove any negative URPSN which are not valid
programme_data[uprn_column] = np.where(
programme_data["estimated"].isin([1, True]),
None,
programme_data[uprn_column]
)
# Add in some columns if we have them
date_of_inspections = (
"Non-Intrusives: Date of Inspection" if
"Non-Intrusives: Date of Inspection" in programme_data.columns else None
)
# Ammend the property type and built form columns
programme_data["hubspot_property_type"] = programme_data[self.STANDARD_PROPERTY_TYPE].copy()
# We don't already have this
if self.STANDARD_BUILT_FORM in programme_data.columns:
programme_data["hubspot_built_form"] = programme_data[self.STANDARD_BUILT_FORM].copy()
else:
programme_data["hubspot_built_form"] = None
def _replace_property_description_data(programme_data, column_name):
"""
Helper function to replace property type or built form data with a specified value.
"""
if column_name == "hubspot_property_type":
valid_values = ["house", "bungalow", "flat", "maisonette"]
epc_fill_col = "property-type"
elif column_name == "hubspot_built_form":
valid_values = ["detached", "semi-detached", "mid-terrace", "end-terrace"]
epc_fill_col = "built-form"
else:
raise ValueError(f"Invalid column name: {column_name}. Must be 'hubspot_property_type' or "
f"'hubspot_built_form'.")
# Any vakue that is not house, bungalow, flat or maisonette is set to None
programme_data[column_name] = np.where(
~programme_data[column_name].isin(valid_values),
None,
programme_data[column_name]
)
# We fill with the EPC property type
programme_data[column_name] = np.where(
pd.isnull(programme_data[column_name]),
programme_data[self.EPC_API_DATA_NAMES[epc_fill_col]],
programme_data[column_name]
)
programme_data[column_name] = programme_data[column_name].fillna("unknown")
return programme_data
# Clean up the property type and built form columns
programme_data = _replace_property_description_data(programme_data, "hubspot_property_type")
programme_data = _replace_property_description_data(programme_data, "hubspot_built_form")
# We accomodate the old vs new inspections format
if "non-intrusives: WFT Findings" in programme_data.columns:
# We have the old format - we only have notes
non_intrusives_surveyor_notes = "non-intrusives: WFT Findings"
non_intrusives_construction = None
non_intrusives_insulated = None
non_intrusives_insulation_material = None
non_intrusives_ciga_check_required = None
non_intrusives_pv_access = None
non_intrusives_roof_orientation = None
non_intrusives_surveyor_name = None
else:
non_intrusives_surveyor_notes = 'non-intrusives: Any further surveyor notes'
non_intrusives_construction = "non-intrusives: Construction"
non_intrusives_insulated = "non-intrusives: Insulated"
non_intrusives_insulation_material = "non-intrusives: Material"
non_intrusives_ciga_check_required = 'non-intrusives: CIGA Check Required'
non_intrusives_pv_access = 'non-intrusives: PV, ACCESS ISSUE, SEE NOTES'
non_intrusives_roof_orientation = 'non-intrusives: OFF GAS - ROOF ORIENTATION'
non_intrusives_surveyor_name = 'non-intrusives: Surveyors Name'
# This maps the hubspot schema to the template. Anything that is not covered in this will be flagged
schema_mappings = {
'Company Domain Name <COMPANY domain>': 'Company Domain Name <COMPANY domain>',
'Email <CONTACT email>': (
self.contact_detail_fields["email"] if self.contact_detail_fields["email"] else None
), # TODO: Review
'First Name <CONTACT firstname>': (
self.contact_detail_fields["firstname"] if self.contact_detail_fields["firstname"] else None
), # TODO: Review
'Last Name <CONTACT lastname>': (
self.contact_detail_fields["lastname"] if self.contact_detail_fields["lastname"] else None
), # TODO: Review
'Phone <CONTACT phone>': (
self.contact_detail_fields["phone_number"] if self.contact_detail_fields["phone_number"] else None
), # TODO: Review
'Secondary Phone <CONTACT secondary_phone_number>': (
self.contact_detail_fields["secondary_phone_number"] if
self.contact_detail_fields["secondary_phone_number"] else None
),
"Secondary Contact Full Name <CONTACT secondary_contact_full_name>": (
self.contact_detail_fields["secondary_contact_full_name"] if
self.contact_detail_fields["secondary_contact_full_name"] else None
),
'Full Address <LISTING full_address>': self.STANDARD_FULL_ADDRESS,
'Address 1 <LISTING hs_address_1>': self.STANDARD_ADDRESS_1,
'Address 2 <LISTING hs_address_2>': None, # TODO: Don't have this for the moment
'Postcode <LISTING hs_zip>': self.STANDARD_POSTCODE,
'Property Type <LISTING property_type>': "hubspot_property_type",
'Property Sub Type <LISTING property_sub_type>': "hubspot_built_form",
'Bedroom(s) <LISTING hs_bedrooms>': None, # TODO: Don't have this for the moment
'Domna Property ID <LISTING domna_property_id>': self.DOMNA_PROPERTY_ID,
# We populate this with the column that we have
'National UPRN <LISTING national_uprn>': uprn_column,
'Owner Property ID <LISTING owner_property_id>': self.STANDARD_LANDLORD_PROPERTY_ID,
'Wall Construction <LISTING wall_construction>': self.STANDARD_WALL_CONSTRUCTION,
'Heating System <LISTING heating_system>': self.STANDARD_HEATING_SYSTEM,
'Year Built <LISTING hs_year_built>': self.STANDARD_YEAR_BUILT,
'Boiler Make <LISTING boiler_make>': None, # TODO: Don't have this for the moment
'Boiler Model <LISTING boiler_model>': None, # TODO: Don't have this for the moment
'Non-Intrusives: Date Checked <LISTING non_intrusives__date_checked>': date_of_inspections,
'Non-Intrusives: Wall Type <LISTING non_intrusives__wall_type>': non_intrusives_construction,
'Non-intrusives: Insulation <LISTING non_intrusives__insulation>': non_intrusives_insulated,
'Non-intrusives: Insulation Material <LISTING non_intrusives__insulation_material>':
non_intrusives_insulation_material,
'Non-Intrusives: CIGA Check Required <LISTING non_intrusives__ciga_check_required>':
non_intrusives_ciga_check_required,
'Non-Intrusives: PV Access Issues <LISTING non_intrusives__access_issues>': non_intrusives_pv_access,
'Non-Intrusives: Roof Orientation <LISTING non_intrusives__roof_orientation>':
non_intrusives_roof_orientation,
'Non-Intrusives: Surveyor Notes <LISTING non_intrusives__surveyor_notes>': non_intrusives_surveyor_notes,
'Non-Intrusives: Surveyor Name <LISTING non_intrusives__surveyor_name>': non_intrusives_surveyor_name,
'CIGA: Date Requested <LISTING ciga__date_requested>': None, # TODO: Don't have this for the moment
'CIGA: Cavity Guarantee Found <LISTING ciga__cavity_guarantee_found>': None,
'Last EPC: Is Estimated <LISTING last_epc__is_estimated>': self.EPC_API_DATA_NAMES["estimated"],
'Last EPC: EPC Rating <LISTING last_epc__epc_rating>': self.EPC_API_DATA_NAMES["current-energy-rating"],
'Last EPC: SAP Rating <LISTING last_epc__sap_rating>': self.EPC_API_DATA_NAMES["current-energy-efficiency"],
'Last EPC: Main Heating Description <LISTING last_epc__main_heating_description>': self.EPC_API_DATA_NAMES[
"mainheat-description"],
'Last EPC: Heating Controls <LISTING last_epc__heating_controls>': self.EPC_API_DATA_NAMES[
"mainheatcont-description"],
'Last EPC: Lodgement Date <LISTING last_epc__lodgement_date>': self.EPC_API_DATA_NAMES["inspection-date"],
'Last EPC: Floor Area <LISTING last_epc__floor_area>': self.EPC_API_DATA_NAMES["total-floor-area"],
'Last EPC: Wall <LISTING last_epc__wall>': self.EPC_API_DATA_NAMES["walls-description"],
'Last EPC: Roof <LISTING last_epc__roof>': self.EPC_API_DATA_NAMES["roof-description"],
'Last EPC: Floor <LISTING last_epc__floor>': self.EPC_API_DATA_NAMES["floor-description"],
'Last EPC: Room Height <LISTING last_epc__room_height>': self.EPC_API_DATA_NAMES["floor-height"],
'Last EPC: Age Band <LISTING last_epc__age_band>': self.EPC_API_DATA_NAMES["construction-age-band"],
'Pipeline <DEAL pipeline>': 'Pipeline <DEAL pipeline>',
'Expected Commencement Date <DEAL expected_commencement_date>': "survey_date",
'Deal Name <DEAL dealname>': "dealname", # Need to create this,
'Product ID <LINE_ITEM hs_product_id>': 'Product ID <LINE_ITEM hs_product_id>',
'Name <LINE_ITEM name>': 'Name <LINE_ITEM name>',
'Unit price <LINE_ITEM price>': 'Unit price <LINE_ITEM price>',
'Quantity <LINE_ITEM quantity>': 'Quantity <LINE_ITEM quantity>',
'Deal Owner': 'surveyor',
'Project Code <DEAL project_code>': 'project_code',
'Associations: Listing': 'Associations: Listing',
'Deal Stage <DEAL dealstage>': "hubspot_status",
}
# We sometimes columns if the landlord never provided them
missed_mapping_cols = [c for c in schema_mappings.values() if c not in programme_data.columns if c is not None]
for c in missed_mapping_cols:
programme_data[c] = None
# We now create the finalised dataset to be uploaded into Hubspot
variables_required = list(schema_mappings.values())
variables_required = [v for v in variables_required if v is not None]
# We now flag anything that has a none value, which is information we haven't got right now
none_variables = [k for k, v in schema_mappings.items() if v is None]
# We'll add placeholder columns for the None variables
programme_data = programme_data[variables_required]
for col in none_variables:
programme_data[col] = None
programme_data = programme_data.rename(
columns={v: k for k, v in schema_mappings.items() if v is not None}
)
programme_data['Postcode <DEAL postcode>'] = programme_data['Postcode <LISTING hs_zip>'].copy()
programme_data['Installer <DEAL installer>'] = installer_name
programme_data['Name <LISTING hs_name>'] = (
programme_data['Full Address <LISTING full_address>'] + " ," + programme_data['Postcode <LISTING hs_zip>']
)
# The listing owner email is the same as the surveyor email (deal owner), so they can see the listing
programme_data['Listing Owner Email <LISTING hubspot_owner_id>'] = programme_data['Deal Owner']
programme_data['Amount <DEAL amount>'] = 0
programme_data["Deal Owner"] = np.where(
~pd.isnull(programme_data["Deal Owner"]),
programme_data["Deal Owner"].astype(str).str.lower(),
programme_data["Deal Owner"]
)
# We make sure we have all of the columns that we need
missed_columns = [c for c in hubspot_config.CRM_UPLOAD_COLUMNS if c not in programme_data.columns]
if missed_columns:
raise ValueError(
f"We have the following columns that are not in the programme data: {missed_columns}. "
"Please check the mapping and ensure all required columns are present."
)
self.hubspot_data = programme_data
def flag_ecosurv(self, ecosurv_landlords=None, landlords_to_ignore=None):
"""
This class will match ecosurv data to the asset list
:return:
"""
if ecosurv_landlords is None:
return
# TODO: Fetch from Sharepoint
ecosurv_filepath = "/Users/khalimconn-kowlessar/Documents/hestia/Ecosurv/07.05.2025.csv"
logger.info("Getting Ecosurv data from %s", ecosurv_filepath)
self.ecosurv = pd.read_csv(ecosurv_filepath, encoding="cp437")
landlords = self.ecosurv["Landlord"].value_counts().reset_index(drop=False)
landlord_references = landlords[
landlords["Landlord"].str.lower().str.contains(ecosurv_landlords)
]
landlord_ecosurv_data = self.ecosurv[
self.ecosurv["Landlord"].isin(landlord_references["Landlord"].values)
]
if landlords_to_ignore is not None:
landlord_ecosurv_data = landlord_ecosurv_data[
~landlord_ecosurv_data["Landlord"].isin(landlords_to_ignore)
]
# Try and match to asset list
matched = []
unmatched = []
for _, row in tqdm(landlord_ecosurv_data.iterrows(), total=landlord_ecosurv_data.shape[0]):
postcode = row["Postcode"].lower()
df = self.standardised_asset_list[
(
self.standardised_asset_list[self.STANDARD_POSTCODE].str.replace(" ", "").str.lower() ==
postcode
)
].copy()
if df.empty:
unmatched.append(row["Reference"])
continue
if df.shape[0] > 1:
house_no = SearchEpc.get_house_number(row["Address Line 1"], row["Postcode"])
df["house_no"] = df.apply(
lambda x: SearchEpc.get_house_number(
str(x[self.STANDARD_ADDRESS_1]), x[self.STANDARD_POSTCODE]
),
axis=1
)
df = df[df["house_no"] == house_no]
if df.shape[0] > 1:
# We compare address line 1 to full address
if any(
df[self.STANDARD_FULL_ADDRESS].str.lower().str.contains(
row["Address Line 1"].lower(), na=False)
):
df = df[
df[self.STANDARD_FULL_ADDRESS].str.lower().str.contains(
row["Address Line 1"].lower(), na=False
)
]
if df.shape[0] > 1:
df = df[df[self.STANDARD_PROPERTY_TYPE] != "other"]
if df.shape[0] == 1:
matched.append(
{
self.STANDARD_LANDLORD_PROPERTY_ID: df[self.STANDARD_LANDLORD_PROPERTY_ID].values[0],
"ecosurv_reference": row["Reference"],
"ecosurv_address1": row["Address Line 1"],
"ecosurv_postcode": row["Postcode"],
}
)
continue
if df.shape[0] > 1:
unmatched.append(row["Reference"])
continue
logger.info("Matched %s properties to ecosurv data", len(matched))
logger.info("%s properties in Ecosurv remain unmatched", len(unmatched))
if not matched:
return
# We now match
matched = pd.DataFrame(matched)
# We'll possibly have duplicates here, where properties have been sold twice. Ww de-dupe
if matched[self.STANDARD_LANDLORD_PROPERTY_ID].duplicated().sum():
# It doesn't matter too much which record we take
matched = matched.drop_duplicates(subset=[self.STANDARD_LANDLORD_PROPERTY_ID])
# We merge on the status of the property
matched = matched.merge(
self.ecosurv[["Reference", "Status", "Lead Status", "Tags"]].rename(
columns={
"Reference": "ecosurv_reference",
"Status": "ecosurv_status",
"Lead Status": "ecosurv_lead_status",
"Tags": "ecosurv_tags",
"Installer": "ecosurv_installer"
}
), how="left", on="ecosurv_reference"
)
matched["ecosurv_install_status"] = hubspot_config.HubspotProcessStatus.SUBMITTED_TO_INSTALLER
# This mapping is ordered by process order, where lodgment is the final step so if we have an indication
# that the property is ready for lodgement, we set the status to that. We then proceed through the other
# statuses where the penultimate status is install complete
mapping = {
"Cancelled": hubspot_config.HubspotProcessStatus.INSTALLER_CANCELLED_FINALIZED,
"TrustMark: Lodged": hubspot_config.HubspotProcessStatus.LODGEMENT_COMPLETE,
"Retrofit: Complete": hubspot_config.HubspotProcessStatus.INSTALL_COMPLETE,
"Retrofit: Awaiting TrustMark": hubspot_config.HubspotProcessStatus.INSTALL_COMPLETE,
"Retrofit: Awaiting post checks": hubspot_config.HubspotProcessStatus.INSTALL_COMPLETE,
"Installer Notification Sent": hubspot_config.HubspotProcessStatus.SUBMITTED_TO_INSTALLER,
"Submitted to RC": hubspot_config.HubspotProcessStatus.SUBMITTED_TO_INSTALLER,
"COONEY": hubspot_config.HubspotProcessStatus.SUBMITTED_TO_INSTALLER,
"Signed off for install": hubspot_config.HubspotProcessStatus.SUBMITTED_TO_INSTALLER,
"Retrofit: Signed off for install": hubspot_config.HubspotProcessStatus.SUBMITTED_TO_INSTALLER,
"Audit": hubspot_config.HubspotProcessStatus.SUBMITTED_TO_INSTALLER,
"Accepted": hubspot_config.HubspotProcessStatus.SUBMITTED_TO_INSTALLER,
"Sold": hubspot_config.HubspotProcessStatus.SUBMITTED_TO_INSTALLER
}
def get_max_status(tag_str):
if pd.isna(tag_str):
return None
matched_statuses = []
for tag, status in mapping.items():
if tag in tag_str:
matched_statuses.append(status)
if not matched_statuses:
return None
return max(matched_statuses).label
matched["ecosurv_install_status"] = matched["ecosurv_tags"].apply(get_max_status)
self.standardised_asset_list = self.standardised_asset_list.merge(
matched,
how="left",
on=self.STANDARD_LANDLORD_PROPERTY_ID,
)
# We keep a record of submissions that were NOT matches
self.ecosurv_no_match = self.ecosurv[
self.ecosurv["Reference"].isin(unmatched)
].copy()
def flag_outcomes(
self,
outcomes_filepaths,
outcomes_sheetname,
outcomes_address,
outcomes_postcode,
outcomes_houseno,
outcomes_id
):
if not outcomes_filepaths:
return
self.outcomes = []
outcomes_no_match = []
lookup = []
for idx, outcomes_filepath in enumerate(outcomes_filepaths):
outcomes = pd.read_excel(outcomes_filepath, sheet_name=outcomes_sheetname[idx])
outcomes["row_id"] = outcomes.index
if outcomes_houseno[idx] is None:
outcomes_houseno[idx] = "houseno"
outcomes["houseno"] = outcomes[outcomes_address[idx]].apply(
lambda x: SearchEpc.get_house_number(x, outcomes[outcomes_postcode])
)
# We handle an edge case that occured for LHP
if "Notes / Outcomes" in outcomes.columns and "Outcome" not in outcomes.columns:
# We use the re-mapper to handle this:
outcomes["Notes / Outcomes"] = outcomes["Notes / Outcomes"].str.strip()
values_to_remap = outcomes["Notes / Outcomes"].unique()
# We want to map this to our standardised list of property types we're interested in
remapper = DataRemapper(
standard_values=outcomes_mappings.outcomes_values, standard_map=outcomes_mappings.outcomes_map
)
remap_dictionary = remapper.standardize_list(values_to_remap=values_to_remap.tolist())
# Perform the remap
outcomes["Outcome"] = outcomes["Notes / Outcomes"].map(remap_dictionary)
outcomes["Outcome"] = outcomes["Outcome"].str.lower().str.strip()
logger.info("Matching outcomes to asset list")
# Merge the outcomes onto the asset list - we check we're able to match sufficiently well
lookup_i = []
nomatch_i = []
for _, x in tqdm(outcomes.iterrows(), total=len(outcomes)):
if pd.isnull(x[outcomes_address[idx]]) or not x[outcomes_address[idx]]:
continue
# Check if we have an id
oid = x[outcomes_id[idx]] if outcomes_id[idx] is not None else None
if oid is not None:
matched = self.standardised_asset_list[
(self.standardised_asset_list[
self.STANDARD_LANDLORD_PROPERTY_ID
].str.strip() == oid)
]
if matched.shape[0] == 1:
lookup_i.append(
{
"row_id": x["row_id"],
self.DOMNA_PROPERTY_ID: matched[self.DOMNA_PROPERTY_ID].values[0]
}
)
continue
address_clean = x[outcomes_address[idx]].lower().replace(",", "").replace(" ", " ")
matched = self.standardised_asset_list[
(self.standardised_asset_list[
self.STANDARD_FULL_ADDRESS
].str.lower().str.replace(",", "").str.replace(" ", " ") == address_clean)
]
if matched.shape[0] == 1:
lookup_i.append(
{
"row_id": x["row_id"],
self.DOMNA_PROPERTY_ID: matched[self.DOMNA_PROPERTY_ID].values[0]
}
)
continue
matched = self.standardised_asset_list[
(self.standardised_asset_list[self.STANDARD_POSTCODE].str.strip() == x[outcomes_postcode[idx]])
].copy()
if not matched.empty:
matched["houseno"] = matched.apply(
lambda x: SearchEpc.get_house_number(
str(x[self.STANDARD_ADDRESS_1]), str(x[self.STANDARD_POSTCODE])
),
axis=1
)
if pd.isnull(x[outcomes_houseno[idx]]):
house_no_to_match = SearchEpc.get_house_number(
str(x[outcomes_address[idx]]), str(x[outcomes_postcode[idx]])
)
if isinstance(house_no_to_match, str):
house_no_to_match = house_no_to_match.lower()
else:
house_no_to_match = str(x[outcomes_houseno[idx]]).strip()
matched = matched[matched["houseno"].astype(str) == house_no_to_match]
if matched.shape[0] == 1:
lookup_i.append(
{
"row_id": x["row_id"],
self.DOMNA_PROPERTY_ID: matched[self.DOMNA_PROPERTY_ID].values[0]
}
)
continue
elif not matched.empty:
# Use levenstein distance to match
matched["address"] = (
matched[self.STANDARD_ADDRESS_1] + " " + matched[self.STANDARD_POSTCODE]
)
best_match = process.extractOne(
x[outcomes_address[idx]], matched[self.STANDARD_FULL_ADDRESS].values
)[0]
matched = matched[matched[self.STANDARD_FULL_ADDRESS] == best_match]
lookup_i.append(
{
"row_id": x["row_id"],
self.DOMNA_PROPERTY_ID: matched[self.DOMNA_PROPERTY_ID].values[0]
}
)
continue
nomatch_i.append(x["row_id"])
outcomes_no_match_i = outcomes[outcomes["row_id"].isin(nomatch_i)]
lookup_i = pd.DataFrame(lookup_i)
outcomes_no_match.append(outcomes_no_match_i)
lookup.append(lookup_i)
self.outcomes.append(outcomes)
lookup = pd.concat(lookup)
self.outcomes_no_match = pd.concat(outcomes_no_match)
self.outcomes = pd.concat(self.outcomes)
if lookup.empty:
return
# We will have duplicated domna property IDs, where a surveyor has been to a property multiple times
# Where we have multiple rows, we want to make a call on what the action should be. For example,
# there may be properties that have been visited multiple times where the outcome was "See notes" implying
# that the surveyor had a detailed explanation as to why they couldn't gain access so if this has
# happened multiple times, in this case we judge that the work may not be viable
if "Week Commencing" in self.outcomes.columns:
date_col = "Week Commencing"
elif "Survey Date" in self.outcomes.columns:
date_col = "Survey Date"
elif "Date letters sent" in self.outcomes.columns:
date_col = "Date letters sent"
elif "Date Letter sent" in self.outcomes.columns:
date_col = "Date Letter sent"
elif "WEEK COMMENCING" in self.outcomes.columns:
date_col = "WEEK COMMENCING"
else:
raise NotImplementedError("Invalid date in outcomes - implement me")
if "Notes" in self.outcomes.columns:
notes_col = "Notes"
elif "Notes / Outcomes" in self.outcomes.columns:
notes_col = "Notes / Outcomes"
elif "NOTES" in self.outcomes.columns:
notes_col = "NOTES"
else:
raise NotImplementedError("Invalid notes in outcomes - implement me")
lookup = lookup.merge(
self.outcomes[["row_id", "Outcome", notes_col, date_col]], how="left", on="row_id"
)
visit_counts = (
lookup.groupby(self.DOMNA_PROPERTY_ID)["row_id"]
.count()
.reset_index()
.rename(columns={"row_id": "visit_count"})
.sort_values("visit_count", ascending=False)
)
def extract_date(s):
if isinstance(s, str):
match = re.search(r"(\d{2}\.\d{2}\.\d{4})", s)
if match:
return pd.to_datetime(match.group(1), format="%d.%m.%Y", errors="coerce")
return pd.NaT
lookup['parsed_date'] = lookup[date_col].apply(extract_date)
def get_latest_note(group):
surveyed = group[group['Outcome'] == 'surveyed']
if not surveyed.empty:
return surveyed.sort_values('parsed_date', ascending=False).iloc[0]
else:
return group.sort_values('parsed_date', ascending=False).iloc[0]
latest_note = (
lookup.groupby('domna_property_id', group_keys=False).
apply(get_latest_note).
reset_index(drop=True)
)
latest_note = latest_note[["domna_property_id", notes_col, "Outcome"]].rename(
columns={"Notes": "latest_outcome_note", "Outcome": "latest_outcome"}
)
pivot_df = lookup.groupby(["domna_property_id", "Outcome"]).size().unstack(fill_value=0).reset_index()
pivot_df = pivot_df.merge(visit_counts, how="left", on="domna_property_id")
pivot_df = pivot_df.merge(latest_note, how="left", on="domna_property_id")
# We want the latest note
if pivot_df[self.DOMNA_PROPERTY_ID].duplicated().sum():
raise Exception("We have duplicated property IDs in the outcomes data")
# We merge this data onto outcomes
self.outcomes["matched_to_asset_list"] = self.outcomes["row_id"].isin(lookup["row_id"].values)
self.outcomes = self.outcomes.merge(lookup[["row_id", "domna_property_id"]], how="left", on="row_id")
# We flag the outcome status, based on the outcome
pivot_df["outcome_status"] = None
if "surveyed" in pivot_df.columns:
pivot_df["outcome_status"] = np.where(
pivot_df["surveyed"] > 0, hubspot_config.HubspotProcessStatus.SURVEYED_COMPLETED_SIGNED_OFF.label,
pivot_df["outcome_status"]
)
if "installer refusal" in pivot_df.columns:
pivot_df["outcome_status"] = np.where(
pivot_df["installer refusal"] > 0, hubspot_config.HubspotProcessStatus.NOT_VIABLE.label,
pivot_df["outcome_status"]
)
pivot_df["outcome_status"] = np.where(
pivot_df["latest_outcome"].isin(["see notes"]) &
(pivot_df["outcome_status"] != hubspot_config.HubspotProcessStatus.SURVEYED_COMPLETED_SIGNED_OFF.label),
hubspot_config.HubspotProcessStatus.SURVEYED_NO_ACCESS_NEEDS_SIGN_OFF.label,
pivot_df["outcome_status"]
)
# We merge out pivoted outcomes onto the asset list
self.standardised_asset_list = self.standardised_asset_list.merge(
pivot_df, how="left", left_on=self.DOMNA_PROPERTY_ID, right_on="domna_property_id"
)
if self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated().sum():
raise ValueError("Duplicates appreared - something went wrong")
self.outcomes = self.outcomes.sort_values("domna_property_id", ascending=False)
def flag_survey_master(
self,
master_filepaths,
master_id_colnames,
master_to_asset_list_filepath=None
):
# TODO: This probably needs further expansion
if not master_filepaths:
return
if master_to_asset_list_filepath is not None:
id_map = pd.read_csv(master_to_asset_list_filepath)
else:
id_map = pd.DataFrame()
logger.info("Getting masters and merging onto asset list")
master_surveyed = []
unmatched_submissions = []
for idx, filepath in enumerate(master_filepaths):
master_data = pd.read_csv(filepath)
# Strip columns
master_data.columns = [c.strip() for c in master_data.columns]
master_data.columns = [re.sub(r'\s+', ' ', c) for c in master_data.columns]
# Drop any unnamed columns
unnamed_columns = [c for c in master_data.columns if "Unnamed:" in c]
master_data = master_data.drop(columns=unnamed_columns)
if not id_map.empty:
master_data = master_data.merge(
id_map, how="left", on=['NO.', 'Street / Block Name', 'Post Code']
)
if "INSTALLED OR CANCELLED" in master_data.columns:
install_col = "INSTALLED OR CANCELLED"
elif "INSTALL / CANCELLATION DATE" in master_data.columns:
install_col = "INSTALL / CANCELLATION DATE"
elif 'INSTALL/ CANCELLATION DATE' in master_data.columns:
install_col = 'INSTALL/ CANCELLATION DATE'
elif "INSTALL/CANCELLATION DATE" in master_data.columns:
install_col = "INSTALL/CANCELLATION DATE"
elif 'Measure 1 Install Date' in master_data.columns:
install_col = 'Measure 1 Install Date'
else:
raise ValueError("No install or cancellation date")
if "SUBMISSION DATE" in master_data.columns:
submission_col = "SUBMISSION DATE"
elif "SUBMISSION DATE TO INSTALLERS" in master_data.columns:
submission_col = "SUBMISSION DATE TO INSTALLERS"
elif "Submission Date" in master_data.columns:
submission_col = "Submission Date"
else:
raise ValueError("No submission date column found in master data")
master_data["row_id"] = master_data.index
self.standardised_asset_list["house_no"] = self.standardised_asset_list.apply(
lambda x: SearchEpc.get_house_number(
str(x[self.STANDARD_ADDRESS_1]), str(x[self.STANDARD_POSTCODE])
),
axis=1
)
if "AFFORDABLE WARMTH OR EPC FOR HOUSING ASSOCIATION" in master_data.columns:
scheme_col = "AFFORDABLE WARMTH OR EPC FOR HOUSING ASSOCIATION"
elif "AFFORDABLE WARMTH" in master_data.columns:
scheme_col = "AFFORDABLE WARMTH"
elif "Scheme" in master_data.columns:
scheme_col = "Scheme"
elif "Affordable Warmth" in master_data.columns:
scheme_col = "Affordable Warmth"
else:
scheme_col = "OFFICE USE ONLY"
postcode_col = "POSTCODE" if "POSTCODE" in master_data.columns else "Post Code"
if 'NO.' in master_data.columns:
house_no_col = 'NO.'
elif "NO" in master_data.columns:
house_no_col = 'NO'
else:
house_no_col = "NUMBER"
if "PROPERTY TYPE As per table emailed" in master_data.columns:
property_type_col = "PROPERTY TYPE As per table emailed"
elif "PROPERTY TYPE As per table emailed" in master_data.columns:
property_type_col = "PROPERTY TYPE As per table emailed"
elif "PROPERTY TYPE" in master_data.columns:
property_type_col = "PROPERTY TYPE"
elif 'Property Type' in master_data.columns:
property_type_col = 'Property Type'
else:
property_type_col = "PROPERTY TYPE (SEE DEEMED SCORES SHEET) Eg. 3W_Flat_1 (As per Matrix)"
if "INSTALLERS NOTES ; REASONS FOR CANCELLATIONS" in master_data.columns:
installer_notes_col = "INSTALLERS NOTES ; REASONS FOR CANCELLATIONS"
elif "INSTALLERS NOTES" in master_data.columns:
installer_notes_col = "INSTALLERS NOTES"
elif 'Installers Notes' in master_data.columns:
installer_notes_col = 'Installers Notes'
elif 'NOTES ; REASONS FOR CANCELLATIONS OR WHERE INSTALL DATE WAS OBTAINED FROM' in master_data.columns:
installer_notes_col = 'NOTES ; REASONS FOR CANCELLATIONS OR WHERE INSTALL DATE WAS OBTAINED FROM'
elif ('INSTALLERS NOTES / REASONS FOR CANCELLATIONS / WHERE INSTALL DATE WAS RECEIVED FROM' in
master_data.columns):
installer_notes_col = ('INSTALLERS NOTES / REASONS FOR CANCELLATIONS / WHERE INSTALL DATE WAS RECEIVED '
'FROM')
else:
raise ValueError("No installer notes column found in master data")
if "INSTALLER" in master_data.columns:
installer_col = "INSTALLER"
elif "Installer" in master_data.columns:
installer_col = "Installer"
else:
raise ValueError("No installer column found in master data")
measure_mix_col = "MEASURE COMBO"
if "TOWN" in master_data.columns:
town_colname = "TOWN"
elif 'Town/Area' in master_data.columns:
town_colname = 'Town/Area'
else:
town_colname = "Town/City"
logger.info("Matching master data to asset list")
matched = []
unmatched = []
for _, row in tqdm(master_data.iterrows(), total=len(master_data)):
original_house_no = row[house_no_col]
original_street = row["Street / Block Name"]
original_postcode = row[postcode_col]
if pd.isnull(row[postcode_col]):
continue
if master_id_colnames[idx] is not None:
# Filter the standardised asset list on this
df = self.standardised_asset_list[
self.standardised_asset_list[self.STANDARD_LANDLORD_PROPERTY_ID] == row[master_id_colnames[idx]]
]
if df.shape[0] == 1:
matched.append(
{
"row_id": row["row_id"],
"original_house_no": original_house_no,
"original_street": original_street,
"original_postcode": original_postcode,
self.STANDARD_LANDLORD_PROPERTY_ID: df[self.STANDARD_LANDLORD_PROPERTY_ID].values[0],
}
)
continue
postcode_no_space = row[postcode_col].strip().replace(" ", "").lower()
df = self.standardised_asset_list[
(
self.standardised_asset_list[self.STANDARD_POSTCODE]
.str.strip().str.lower().str.replace(" ", "") == postcode_no_space
)
]
house_no = row[house_no_col]
if pd.isnull(house_no):
house_no = None
if isinstance(house_no, (float, int)):
house_no = str(int(house_no))
if house_no not in df["house_no"].values:
# Handle postcode errors
postal_region = row[postcode_col].split(" ")[0].lower()
df = self.standardised_asset_list[
(
self.standardised_asset_list[self.STANDARD_POSTCODE]
.str.strip().str.lower().str.startswith(postal_region)
)
]
if house_no not in df["house_no"].values:
unmatched.append(row["row_id"])
continue
df = df[df["house_no"] == house_no]
if df.shape[0] > 1:
df = df[
df[self.STANDARD_FULL_ADDRESS].str.lower().str.contains(row["Street / Block Name"].lower())
]
if df.shape[0] == 0:
unmatched.append(row["row_id"])
continue
matched.append(
{
"row_id": row["row_id"],
"original_house_no": original_house_no,
"original_street": original_street,
"original_postcode": original_postcode,
self.STANDARD_LANDLORD_PROPERTY_ID: df[self.STANDARD_LANDLORD_PROPERTY_ID].values[0],
}
)
continue
if house_no in df["house_no"].values:
df = df[df["house_no"] == house_no]
if df.shape[0] != 1:
# Levenstein distance
if any(df[self.STANDARD_FULL_ADDRESS].str.contains(row["Street / Block Name"])):
df = df[
df[self.STANDARD_FULL_ADDRESS].str.contains(row["Street / Block Name"])
]
else:
# Levenstein distance
df = df[
df[self.STANDARD_FULL_ADDRESS].str.lower().apply(
lambda x: process.extractOne(
" ".join(
[row[house_no_col], row["Street / Block Name"], row[town_colname]]).lower(),
x
)[1]
) > 90
]
if df.shape[0] == 0:
unmatched.append(row["row_id"])
continue
if any(df[self.STANDARD_FULL_ADDRESS].str.lower().str.contains(
" ".join([row[house_no_col], row["Street / Block Name"]]).lower()
)):
df = df[
df[self.STANDARD_FULL_ADDRESS].str.lower().str.contains(
" ".join([row[house_no_col], row["Street / Block Name"]]).lower()
)
]
if any(
df[self.STANDARD_PROPERTY_TYPE].str.contains(row[property_type_col].split(" ")[-1].lower())
):
# We ignore "block of flats" entries
df = df[
df[self.STANDARD_PROPERTY_TYPE].str.contains(
row[property_type_col].split(" ")[-1].lower()
) & (df[self.STANDARD_PROPERTY_TYPE] != "block of flats")
]
if df.shape[0] != 1:
# We have multiple matches - it's likely because the landlord has a duplicate
# that has been referenced in totally different ways so we just match to both
for _, x in df.iterrows():
matched.append(
{
"row_id": row["row_id"],
"original_house_no": original_house_no,
"original_street": original_street,
"original_postcode": original_postcode,
self.STANDARD_LANDLORD_PROPERTY_ID: x[self.STANDARD_LANDLORD_PROPERTY_ID],
}
)
continue
matched.append(
{
"row_id": row["row_id"],
"original_house_no": original_house_no,
"original_street": original_street,
"original_postcode": original_postcode,
self.STANDARD_LANDLORD_PROPERTY_ID: df[self.STANDARD_LANDLORD_PROPERTY_ID].values[0],
}
)
self.standardised_asset_list = self.standardised_asset_list.drop(columns="house_no")
# We match the "UPRN" which is the landlords ID, onto the master sheet
if measure_mix_col not in master_data.columns:
master_data[measure_mix_col] = "Measure mix not recorded"
matched = pd.DataFrame(matched)
if matched.empty:
continue
master_to_append = master_data[
[scheme_col, "row_id", install_col, submission_col, measure_mix_col, installer_notes_col, installer_col]
].merge(
matched, how="left", on="row_id"
).rename(
columns={
scheme_col: "funding_scheme",
measure_mix_col: "measure_mix",
install_col: "survey_status",
submission_col: "submission_date",
installer_notes_col: "submission_installer_notes",
installer_col: "submission_installer"
}
)
master_to_append["submission_cancelled"] = (
master_to_append["survey_status"].str.lower().str.contains("cancel")
)
master_to_append["submission_installed"] = (
master_to_append["survey_status"].str.lower().str.contains("installed")
)
master_surveyed.append(master_to_append)
unmatched_df = master_data[
master_data["row_id"].isin(unmatched)
]
# The columns are massively different - we take just a few
unmatched_df = unmatched_df[
[
scheme_col, house_no_col, "Street / Block Name", postcode_col, install_col, submission_col
]
].rename(
columns={
scheme_col: "Funding Scheme",
house_no_col: "House Number",
postcode_col: "Postcode",
install_col: "survey_status",
submission_col: "submission_date"
}
)
unmatched_submissions.append(unmatched_df)
master_surveyed = pd.concat(master_surveyed)
master_surveyed = master_surveyed[~pd.isnull(master_surveyed[self.STANDARD_LANDLORD_PROPERTY_ID])]
master_surveyed = master_surveyed[
~master_surveyed[self.STANDARD_LANDLORD_PROPERTY_ID].isin(
["NOT ON ASSET LIST", "Missing From Asset List"]
)
]
master_surveyed[self.STANDARD_LANDLORD_PROPERTY_ID] = master_surveyed[
self.STANDARD_LANDLORD_PROPERTY_ID
].astype(str)
# We de-dupe crudely on landlord property id
self.master_surveyed = master_surveyed.drop_duplicates(subset=[self.STANDARD_LANDLORD_PROPERTY_ID]).copy()
# We now add the submission status, based on the hubspot stages
self.master_surveyed["submission_status"] = hubspot_config.HubspotProcessStatus.SUBMITTED_TO_INSTALLER.label
self.master_surveyed["submission_status"] = np.where(
self.master_surveyed["submission_cancelled"] == True,
hubspot_config.HubspotProcessStatus.INSTALLER_CANCELLED_FINALIZED.label,
self.master_surveyed["submission_status"]
)
self.master_surveyed["submission_status"] = np.where(
self.master_surveyed["submission_installed"] == True,
hubspot_config.HubspotProcessStatus.INSTALL_COMPLETE.label,
self.master_surveyed["submission_status"]
)
self.standardised_asset_list = self.standardised_asset_list.merge(
self.master_surveyed, how="left", on=self.STANDARD_LANDLORD_PROPERTY_ID
)
# Make sure no dupes
if self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated().sum():
raise ValueError("duplicated ids!")
# Finally, we keep a record of the unmatched
if unmatched_submissions:
self.unmatched_submissions = pd.concat(
unmatched_submissions
)