Model/asset_list/AssetList.py

2841 lines
129 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import hashlib
import os
import re
import tiktoken
from pprint import pprint
from datetime import datetime
from numpy.ma.core import masked_not_equal
from openai import OpenAI
import numpy as np
import pandas as pd
from tqdm import tqdm
from thefuzz import process
from utils.logger import setup_logger
from backend.SearchEpc import SearchEpc
from BaseUtility import Definitions
import asset_list.mappings.property_type as property_type_mappings
import asset_list.mappings.walls as walls_mappings
import asset_list.mappings.heating_systems as heating_mappings
import asset_list.mappings.exising_pv as existing_pv_mappings
import asset_list.mappings.built_form as built_form_mappings
import asset_list.mappings.roof as roof_mappings
import asset_list.mappings.outcomes as outcomes_mappings
from recommendations.recommendation_utils import (
estimate_perimeter,
estimate_external_wall_area,
estimate_number_of_floors
)
from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes
logger = setup_logger()
# OpenAI API Key (set this in your environment variables for security)
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
class DataRemapper:
def __init__(self, standard_values, standard_map=None, max_tokens=1000):
"""
Initialize the remapper with standard values and a predefined mapping.
:param standard_values: Set of allowed standardized values.
:param standard_map: Dictionary of common remappings {raw_value: standard_value}.
"""
self.standard_values = standard_values
self.standard_map = standard_map
self.fuzzy_threshold = 90 # Adjust fuzzy matching sensitivity
self.ai_model = "gpt-4-turbo" # Use gpt-3.5-turbo for cheaper processing
# Tokenizer for counting tokens
self.tokenizer = tiktoken.encoding_for_model(self.ai_model)
# Track token usage and remap dictionary
self.total_tokens_used = 0
self.total_cost = 0
self.remap_dict = {} # {original_value: standardized_value}
self.max_tokens = max_tokens # Limit for OpenAI API
# Memoization for AI calls
self.ai_cache = {} # {tuple(unmapped_values): {original_value: standardized_value}}
# Capture the reponse for debugging
self.ai_response = None
# OpenAI pricing (as of Feb 2024)
self.pricing = {
"gpt-4-turbo": {"input": 0.01 / 1000, "output": 0.03 / 1000},
"gpt-3.5-turbo": {"input": 0.0015 / 1000, "output": 0.002 / 1000},
}
self.openai_client = OpenAI(api_key=OPENAI_API_KEY)
@staticmethod
def clean_string(text):
"""Basic text cleaning: remove extra spaces, punctuation, and normalize case."""
if not isinstance(text, str):
return None
text = text.strip().lower()
text = re.sub(r'[^\w\s]', '', text) # Remove punctuation
# Replace double strings
text = re.sub(r'\s+', ' ', text)
return text
def fuzzy_match(self, text):
"""Use fuzzy matching to find the closest standard value."""
match, score = process.extractOne(text, self.standard_values) if text else (None, 0)
return match if score >= self.fuzzy_threshold else None
def count_tokens(self, text):
"""Estimate the number of tokens in a given text."""
return len(self.tokenizer.encode(text)) if text else 0
def ai_standardize(self, unmapped_values):
"""Call OpenAI API **once** for all unmapped values to minimize cost, with memoization."""
if not unmapped_values:
return {}
unmapped_tuple = tuple(sorted(unmapped_values)) # Ensure consistency for memoization
if unmapped_tuple in self.ai_cache:
return self.ai_cache[unmapped_tuple] # Return memoized result
prompt = f"""
You are an expert in data classification. Standardize each of these values into one of the categories:
{list(self.standard_values)}.
Return only a JSON dictionary where:
- The keys are the original values.
- The values are the standardized ones.
Strictly return JSON **without markdown formatting** or extra text.
Example Output:
{{
"BLKHOUS": "block house",
"BEDSIT": "bedsit"
}}
Values to standardize:
{unmapped_values}
"""
# Count input tokens
input_tokens = self.count_tokens(prompt)
if input_tokens > self.max_tokens:
raise ValueError("Input tokens exceed the maximum limit.")
logger.info("Calling OpenAI API for standardization...")
response = self.openai_client.chat.completions.create(
model=self.ai_model,
messages=[{"role": "user", "content": prompt}],
max_tokens=self.max_tokens,
temperature=0.1,
)
output_text = response.choices[0].message.content.strip()
output_tokens = self.count_tokens(output_text) # Count output tokens
# Track total token usage
self.total_tokens_used += input_tokens + output_tokens
# Estimate cost
input_cost = input_tokens * self.pricing[self.ai_model]["input"]
output_cost = output_tokens * self.pricing[self.ai_model]["output"]
self.total_cost += input_cost + output_cost
try:
# Parse response as dictionary
mapping = eval(output_text) # OpenAI should return a valid dictionary
except:
mapping = {val: "unknown" for val in unmapped_values} # Fallback
# Memoize the AI response
self.ai_cache[unmapped_tuple] = mapping
# We store the raw AI response for debugging
logger.debug(f"AI Response: {mapping}")
self.ai_response = output_text
return mapping
def standardize_list(self, values_to_remap):
"""
Standardizes a list of values and returns a dictionary {original_value: standardized_value}.
:param values_to_remap: List of raw values to standardize.
:return: Dictionary {original_value: standardized_value}.
"""
unique_values = set(values_to_remap) # Process only unique values
unmapped_values = []
for value in unique_values:
if pd.isna(value): # Handle NaN values
self.remap_dict[value] = "unknown"
continue
cleaned_value = self.clean_string(value)
# Rule-Based Check (Predefined Mapping)
if cleaned_value in self.standard_map or value in self.standard_map:
self.remap_dict[value] = (
self.standard_map[cleaned_value] if cleaned_value in self.standard_map else self.standard_map[value]
)
continue
if value.lower() in self.standard_map:
self.remap_dict[value] = self.standard_map[value.lower()]
continue
# Exact Match in Standard Values
if cleaned_value in self.standard_values:
self.remap_dict[value] = cleaned_value
continue
# Fuzzy Matching
fuzzy_match = self.fuzzy_match(cleaned_value)
if fuzzy_match:
self.remap_dict[value] = fuzzy_match
continue
# Capture anything that wasn't mapped
unmapped_values.append(value)
# AI Model - remap anything unmapped (batch request)
ai_mapping = self.ai_standardize(unmapped_values)
self.remap_dict.update(ai_mapping)
return self.remap_dict
def report_usage(self):
"""Prints a summary of token usage and cost."""
print(f"\n🔹 Total Tokens Used: {self.total_tokens_used}")
print(f"💰 Estimated Cost: ${self.total_cost:.4f}")
class AssetList:
"""
This class is used to standardise asset lists so that we can process the core information in a consistent manner.
"""
EPC_API_DATA_NAMES = {
"uprn": "epc_os_uprn",
"address1": "epc_address1",
"address": "epc_address",
"postcode": "epc_postcode",
"inspection-date": "epc_inspection_date",
"current-energy-efficiency": "epc_sap_score_on_register",
"current-energy-rating": "epc_rating_on_register",
"property-type": "epc_property_type",
"built-form": "epc_archetype",
"total-floor-area": "epc_total_floor_area",
"construction-age-band": "epc_age_band",
"floor-height": "epc_floor_height",
"number-habitable-rooms": "epc_number_habitable_rooms",
"walls-description": "epc_wall_construction",
"roof-description": "epc_roof_construction",
"floor-description": "epc_floor_construction",
"mainheat-description": "epc_heating_type",
'mainheatcont-description': "epc_heating_controls",
"secondheat-description": "epc_secondary_heating",
"transaction-type": "epc_reason",
"energy-consumption-current": "epc_heat_demand",
"photo-supply": "epc_photo_supply",
"estimated": "estimated"
}
FIND_EPC_DATA_NAMES = {
"heating_text": "epc_estiamted_heating_kwh",
"hot_water_text": "epc_estimated_hotwater_kwh",
'Assessors name': "epc_assessor_name",
"Assessor's Telephone": "epc_assessor_telephone",
"Assessor's Email": "epc_assessor_email",
"Accreditation scheme": "epc_assessor_accreditation",
"Assessors ID": "epc_assessor_id",
"Solar photovoltaics": "epc_solar_pv"
}
DATETIME_REMAP = {
"Pre 1900": datetime(year=1899, month=12, day=31),
}
# These are the accepted methods we have for cleaning the address1 column
ADDRESS_1_CLEANING_METHODS = [
"first_two_words", # This method will split on the fist two words, where the separator is a space
"first_word", # This method will split on the first word, where the separator is a space
"house_number_extraction", # This method will use the NLP model in SearchEPC to extract the housenumber
# "address1_extraction" # This method will use the NLP model to extract address1
]
# Standard column Names
STANDARD_ADDRESS_1 = "domna_address_1"
STANDARD_POSTCODE = "domna_postcode"
STANDARD_FULL_ADDRESS = "domna_full_address"
STANDARD_YEAR_BUILT = "landlord_year_built"
STANDARD_UPRN = "ordnance_survey_uprn"
STANDARD_LANDLORD_PROPERTY_ID = "landlord_property_id"
STANDARD_PROPERTY_TYPE = "landlord_property_type"
STANDARD_BUILT_FORM = "landlord_built_form"
STANDARD_WALL_CONSTRUCTION = "landlord_wall_construction"
STANDARD_ROOF_CONSTRUCTION = "landlord_roof_construction"
STANDARD_HEATING_SYSTEM = "landlord_heating_system"
STANDARD_EXISTING_PV = "landlord_existing_pv"
STANDARD_SAP = "landlord_sap_rating"
DOMNA_PROPERTY_ID = "domna_property_id"
# Regular expression for identifying if the address might point to multiple units
MULTI_UNIT_REGEX = re.compile(r'\b([A-Za-z0-9]+)-([A-Za-z0-9]+)\b')
# List of columns relating to the non-intrusive data
NON_INTRUSIVES_COLNAMES = [
"Archetype", "Construction", "Insulated", "Material", "CIGA Check Required",
"PV, ACCESS ISSUE, SEE NOTES", "OFF GAS - ROOF ORIENTATION",
"Any further surveyor notes", 'Surveyors Name'
]
NON_INTRUSIVES_ELIGIBILITY_COLUMN = "Eligibility (Red/Yellow/Green)"
OLD_FORMAT_NON_INTRUSIVE_COLNAMES = ['WFT Findings', 'ECO Eligibility']
# This SAP threshold is a key search criteria for properties that may be eligible for extraction
FILLED_CAVITY_SAP_THRESHOLD = 75
# This SAP the
EMPTY_CAVITY_SAP_THRESHOLD = 75
# Any EPC deemed to have been conducted prior to this year is deemed to be unreliable
EPC_YEAR_THRESHOLD = pd.Timestamp.now().year - 5
# Properties before this year are more likely to have lower EPC ratings and more likely to qualify
EMPTY_CAVITY_YEAR_THRESHOLD = 2002
# Attributes - these are columns that we produce, calcualted based on other pieces of data
ATTRIBUTE_HAS_SOLAR = "attribute_has_solar"
ATTRIBUTE_NUMBER_OF_FLOORS = "attribute_est_number_floors"
ATTRIBUTE_ESTIMATED_PERIMETER = "attribute_est_perimter"
ATTRIBUTE_HEAT_LOSS_AREA = "attribute_heat_loss_area"
ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS = "attribute_epc_roof_insulation_thickness"
ATTRIBUTE_SAP_THRESHOLD_AND_BELOW = f"sap_rating_{FILLED_CAVITY_SAP_THRESHOLD}_and_below"
ATTRIBUTE_EPC_PRE_YEAR_THRESHOLD = f"epc_is_pre_{EPC_YEAR_THRESHOLD}"
# These are the descriptions that we look for in the EPC data that are indicative of no insulation
EPC_NO_WALL_INSULATION_DESCRIPTIONS = [
"cavity wall, as built, no insulation (assumed)",
"cavity wall, as built, partial insulation (assumed)",
"cavity wall, as built, partial insulation",
"cavity wall, as built, no insulation",
]
# List of strings that we look for in the EPC data, where substrings indicate that the wall is insulated
EPC_INSULATED_WALLS_SUBSTRINGS = [
", insulated", "with external insulation", "with internal insulation", "filled cavity"
]
# List of strings that we look for in the EPC data, where substrings indicate that the roof is insulated
EPC_INSULATED_ROOF_SUBSTRINGS = [
"(another dwelling above)", ", insulated", ", insulated (assumed) ",
", ceiling insulated",
]
# List of strings we look for in the EPC data, where substrings indicate that the cavity is empty
UNINSULATED_CAVITY_SUBSTRINGS = [
"cavity wall, as built, no insulation (assumed)",
"cavity wall, as built, no insulation",
"cavity wall, as built, partial insulation (assumed)",
"cavity wall, as built, partial insulation",
]
def __init__(
self,
local_filepath,
sheet_name,
address1_colname,
postcode_colname,
full_address_colname,
landlord_property_id=None,
full_address_cols_to_concat=None,
missing_postcodes_method=None,
address1_extraction_method=None,
landlord_year_built=None,
landlord_uprn=None,
landlord_property_type=None,
landlord_built_form=None,
landlord_wall_construction=None,
landlord_roof_construction=None,
landlord_heating_system=None,
landlord_existing_pv=None,
landlord_sap=None,
phase=False,
header=0
):
self.local_filepath = local_filepath
self.sheet_name = sheet_name
# Read in the data
if local_filepath.endswith(".xlsx"):
self.raw_asset_list = pd.read_excel(local_filepath, header=header, sheet_name=sheet_name)
else:
self.raw_asset_list = pd.read_csv(local_filepath)
self.standardised_asset_list = self.raw_asset_list.copy()
# Will be used to store aggregated figures against the various work types
self.work_type_figures = {}
self.flat_data = None
self.duplicated_addresses = None
self.contact_details = None
self.contact_detail_fields = None
self.outcomes = None
self.outcomes_no_match = pd.DataFrame()
self.outcomes_for_output = pd.DataFrame()
self.master_surveyed = None
self.unmatched_submissions = pd.DataFrame()
self.ecosurv = None
self.ecosurv_no_match = pd.DataFrame()
# When this is True, we intend to break the programme into multiple phases. We may need to review
# how this is structured in the future, as depending on how we get future data, we may need to
# remove some existing phases from the reporting, or specifically highlight the phase (1 to n-1)
# properties, assuming the current phase is n.
self.phase = phase
# We detect the presence of the non-intrusive columns
self.non_intrusives_present = "CIGA Check Required" in self.raw_asset_list.columns
# We detect if we have the old format of non-intruvies
self.old_format_non_intrusives_present = "WFT Findings" in self.raw_asset_list.columns
self.non_intrusives_eligibility = "Eligibility (Red/Yellow/Green)" in self.raw_asset_list.columns
# Names of columns
self.landlord_property_id = landlord_property_id
self.address1_colname = address1_colname
self.postcode_colname = postcode_colname
self.full_address_colname = full_address_colname
self.landlord_year_built = landlord_year_built
self.landlord_uprn = landlord_uprn
self.landlord_property_type = landlord_property_type
self.landlord_built_form = landlord_built_form
self.landlord_wall_construction = landlord_wall_construction
self.landlord_roof_construction = landlord_roof_construction
self.landlord_heating_system = landlord_heating_system
self.landlord_existing_pv = landlord_existing_pv
self.landlord_sap = landlord_sap
# parameters for cleaning
self.full_address_cols_to_concat = full_address_cols_to_concat
self.missing_postcodes_method = missing_postcodes_method
self.address1_extraction_method = address1_extraction_method
self.debug_information = {
"property_type": None,
"wall_construction": None,
"heating_system": None,
"existing_pv": None
}
self.variable_mappings = {}
self.hubspot_data = None
self.rename_map = {}
self.keep_variables = []
# Finally, we handle the case where the landlord's property ID is actually the OS UPRN
if (self.landlord_uprn == self.landlord_property_id) and (self.landlord_property_id is not None):
self.standardised_asset_list[self.STANDARD_UPRN] = self.standardised_asset_list[self.landlord_uprn].copy()
# Update the reference to landlord UPRn
self.landlord_uprn = self.STANDARD_UPRN
# Handle the case when full address and address 1 are the same
if self.full_address_colname == self.address1_colname:
self.full_address_colname = self.STANDARD_FULL_ADDRESS
self.standardised_asset_list[self.full_address_colname] = (
self.standardised_asset_list[self.address1_colname].copy()
)
# Handle the case where the property type column and built form are missing
if self.landlord_property_type is None and self.landlord_built_form is None:
if "Archetype" in self.raw_asset_list.columns:
# We use the non-intrusives as our property type and built form
self.landlord_property_type = self.STANDARD_PROPERTY_TYPE
self.landlord_built_form = self.STANDARD_BUILT_FORM
self.standardised_asset_list[self.landlord_property_type] = (
self.standardised_asset_list["Archetype"].copy()
)
self.standardised_asset_list[self.landlord_built_form] = (
self.standardised_asset_list["Archetype"].copy()
)
else:
# We use the EPC data as our property type and built form
self.landlord_property_type = self.STANDARD_PROPERTY_TYPE
self.landlord_built_form = self.STANDARD_BUILT_FORM
self.standardised_asset_list[self.landlord_property_type] = None
self.standardised_asset_list[self.landlord_built_form] = None
# Handle the case where the property type column is the same as the built type
if self.landlord_property_type == self.landlord_built_form:
self.landlord_built_form = self.STANDARD_BUILT_FORM
self.standardised_asset_list[self.landlord_built_form] = (
self.standardised_asset_list[self.landlord_property_type].copy()
)
# If landlord built form is None (which it often is) we use the built for from inspections
if (self.landlord_built_form is None) and self.non_intrusives_present:
self.landlord_built_form = self.STANDARD_BUILT_FORM
self.standardised_asset_list[self.landlord_built_form] = (
self.standardised_asset_list["Archetype"].copy()
)
def _extract_address1(self, asset_list, full_address_col, postcode_col, method="first_two_words"):
if method not in self.ADDRESS_1_CLEANING_METHODS:
raise ValueError(f"Method {method} for producing address1 not recognized")
if method == "first_two_words":
asset_list[self.address1_colname] = asset_list[full_address_col].str.split(" ").str[:2].str.join(" ")
return asset_list
if method == "first_word":
asset_list[self.address1_colname] = asset_list[full_address_col].str.split(" ").str[0]
return asset_list
if method == "house_number_extraction":
asset_list[self.address1_colname] = asset_list.apply(
lambda x: SearchEpc.get_house_number(address=x[full_address_col], postcode=x[postcode_col]),
axis=1
)
for _, x in asset_list.iterrows():
SearchEpc.get_house_number(address=x[full_address_col], postcode=x[postcode_col])
return asset_list
raise ValueError(f"Method {method} not recognized")
@staticmethod
def _address1_extraction(x):
pass
def create_property_id(self):
"""
This function creates the domna property ID, which is simply a hash of the full address and postcode
We want all figures to be positive
:return:
"""
# We'll remove punctuation and whitespace from the address, before hashing to produce an ID
def _make_hash(value):
"""Generates a stable SHA256 hash suffix and appends it to a cleaned version of the value."""
# Normalize and remove special characters for cleaner ID
cleaned_value = re.sub(r"[^\w\s-]", "", value).replace(" ", "_").lower()
# Generate SHA-256 hash and truncate it
short_hash = hashlib.sha256(value.encode()).hexdigest()[:12]
return f"{cleaned_value}-{short_hash}"
# Apply transformation
self.standardised_asset_list[self.DOMNA_PROPERTY_ID] = (
self.standardised_asset_list[self.full_address_colname] +
self.standardised_asset_list[self.postcode_colname]
).str.strip().str.replace(r"[^\w\s]", "", regex=True).str.replace(" ", "").str.lower().apply(_make_hash)
@staticmethod
def _strip_postcode_from_full_address(full_address, postcode):
cleaned = full_address.replace(postcode, "")
# Remove any trailing commas and spaces
cleaned = cleaned.rstrip(", ").strip(",").strip()
return cleaned
@classmethod
def _identify_multi_address(cls, address):
# We check if the address is comma separated
if "," in address:
address1_section = address.split(",")[0]
# We look for string in the form (x-y)
return bool(cls.MULTI_UNIT_REGEX.search(address1_section))
@staticmethod
def _convert_uprn(x):
"""
Used to convert UPRNS to integer strings
:param x: uprn to convert
:return: converted uprn
"""
if pd.isnull(x):
return x
# check if numeric
if np.isreal(x):
return str(int(x))
if str(x).isdigit():
return str(int(x))
return x
@staticmethod
def _clean_postcode(postcode):
# Remove double spaces
postcode = postcode.replace(" ", " ")
if " " not in postcode:
# Restructure it
return " ".join(
[postcode[:-3], postcode[-3:]]
)
return postcode
def init_standardise(self):
"""
This function is used to standardise the asset list
:return: standardised asset list
"""
# Remove rows without a postcode
if self.postcode_colname is not None:
self.standardised_asset_list = self.standardised_asset_list.dropna(subset=[self.postcode_colname])
# We also clean postcode columns where if there is not space, we create one
self.standardised_asset_list[self.postcode_colname] = self.standardised_asset_list[
self.postcode_colname
].apply(self._clean_postcode)
# We clean up portential non-breaking spaces, and double spaces
for col in [
c for c in [self.postcode_colname, self.full_address_colname, self.address1_colname] if
c is not None
]:
self.standardised_asset_list[col] = self.standardised_asset_list[col].astype(str)
self.standardised_asset_list[col] = self.standardised_asset_list[col].str.replace('\xa0', ' ', regex=False)
self.standardised_asset_list[col] = self.standardised_asset_list[col].str.replace(' ', ' ', regex=False)
if self.address1_colname is None:
if self.address1_extraction_method is None:
raise ValueError("Missing address 1 - please specify an extraction method")
self.address1_colname = self.STANDARD_ADDRESS_1
# If we do not have this, we produce it
self.standardised_asset_list = self._extract_address1(
asset_list=self.standardised_asset_list,
full_address_col=self.full_address_colname,
postcode_col=self.postcode_colname,
method=self.address1_extraction_method
)
if self.full_address_colname is None:
if not self.full_address_cols_to_concat:
raise ValueError("Missing full address - please specify columns to concatenate")
self.full_address_colname = self.STANDARD_FULL_ADDRESS
self.standardised_asset_list[self.full_address_colname] = (
self.standardised_asset_list[self.full_address_cols_to_concat].apply(
lambda x: ", ".join([y for y in x if not pd.isnull(y)]),
axis=1
)
)
else:
# Make sure to strip the postcode out of the full address
self.standardised_asset_list[self.full_address_colname] = self.standardised_asset_list.apply(
lambda x: self._strip_postcode_from_full_address(
full_address=x[self.full_address_colname],
postcode=x[self.postcode_colname]
),
axis=1
)
# We create the domna property id
self.create_property_id()
# Clean up the UPRN column, if the landlord has provided them
if self.landlord_uprn is not None:
self.standardised_asset_list[self.landlord_uprn] = (
self.standardised_asset_list[self.landlord_uprn].apply(self._convert_uprn)
)
# We keep just the columns we care about and will work through the various columns and standardise
variables = [
self.landlord_property_id,
self.DOMNA_PROPERTY_ID,
self.address1_colname,
self.postcode_colname,
self.full_address_colname,
self.landlord_uprn,
self.landlord_property_type,
self.landlord_built_form,
self.landlord_year_built,
self.landlord_wall_construction,
self.landlord_roof_construction,
self.landlord_heating_system,
self.landlord_existing_pv,
self.landlord_sap,
]
# Keep just non-null variables (e.g landlord may not provide uprn
self.keep_variables = [v for v in variables if v is not None]
self.rename_map = {
self.landlord_property_id: self.STANDARD_LANDLORD_PROPERTY_ID,
self.address1_colname: self.STANDARD_ADDRESS_1,
self.postcode_colname: self.STANDARD_POSTCODE,
self.full_address_colname: self.STANDARD_FULL_ADDRESS,
self.landlord_uprn: self.STANDARD_UPRN,
self.landlord_property_type: self.STANDARD_PROPERTY_TYPE,
self.landlord_built_form: self.STANDARD_BUILT_FORM,
self.landlord_year_built: self.STANDARD_YEAR_BUILT,
self.landlord_wall_construction: self.STANDARD_WALL_CONSTRUCTION,
self.landlord_roof_construction: self.STANDARD_ROOF_CONSTRUCTION,
self.landlord_heating_system: self.STANDARD_HEATING_SYSTEM,
self.landlord_existing_pv: self.STANDARD_EXISTING_PV,
self.landlord_sap: self.STANDARD_SAP,
}
self.rename_map = {k: v for k, v in self.rename_map.items() if k is not None}
non_intrusive_columns = []
if self.non_intrusives_present:
non_intrusive_columns = self.NON_INTRUSIVES_COLNAMES
if self.non_intrusives_eligibility:
non_intrusive_columns.append(self.NON_INTRUSIVES_ELIGIBILITY_COLUMN)
if self.old_format_non_intrusives_present:
# We check if we have the ECO Eligibility column, which we might not have
non_intrusive_columns = [
c for c in self.OLD_FORMAT_NON_INTRUSIVE_COLNAMES if c in self.standardised_asset_list.columns
]
if "Warmfront Finding" in self.standardised_asset_list.columns:
non_intrusive_columns.append("Warmfront Finding")
self.keep_variables += non_intrusive_columns
self.rename_map = {
**self.rename_map,
**dict(
zip(non_intrusive_columns, ["non-intrusives: " + c for c in non_intrusive_columns])
)
}
# We idenfiy addresses which are likely to be multi-addresses (i.g are rooms x-y)
self.standardised_asset_list["is_multi_address"] = self.standardised_asset_list[
self.full_address_colname
].apply(lambda x: self._identify_multi_address(x))
# We handle cleaning for walls, in the instance that the landlord provides us with EPC data and
# we see instances of "average thermal transmittance" in the description
if self.landlord_wall_construction is not None:
self.standardised_asset_list[self.landlord_wall_construction] = np.where(
self.standardised_asset_list[self.landlord_wall_construction].str.lower().str.contains(
"average thermal transmittance"
) == True,
"new build - average thermal transmittance",
self.standardised_asset_list[self.landlord_wall_construction]
)
else:
# We want to make sure that we have a column for wall construction
self.landlord_wall_construction = self.STANDARD_WALL_CONSTRUCTION
self.standardised_asset_list[self.landlord_wall_construction] = None
if self.landlord_roof_construction is None:
self.landlord_roof_construction = self.STANDARD_ROOF_CONSTRUCTION
self.standardised_asset_list[self.landlord_roof_construction] = None
# Clear our build year column
# We attempt to process the year built column
if self.landlord_year_built is not None:
# We check if we have a datetime - year built has not been renamed
if isinstance(self.standardised_asset_list[self.landlord_year_built].iloc[0], datetime):
# We treat any string columns - with common values we see
self.standardised_asset_list[self.landlord_year_built] = (
self.standardised_asset_list[self.landlord_year_built].replace(self.DATETIME_REMAP)
)
no_data_codes = {"No Data": None}
self.standardised_asset_list[self.landlord_year_built] = (
self.standardised_asset_list[self.landlord_year_built].replace(no_data_codes)
)
self.standardised_asset_list[self.landlord_year_built] = pd.to_datetime(
self.standardised_asset_list[self.landlord_year_built]
)
# Convert this to year
self.standardised_asset_list[self.landlord_year_built] = (
self.standardised_asset_list[self.landlord_year_built].dt.year
)
else:
# We attempt to convert the year built to a datetime, by detecting the format and converting
def extract_year(date_str):
"""
Extracts the year from a date string in the format '01-Jul-YYYY'.
Returns the extracted year as an integer or None if the format is incorrect.
"""
known_errors = [
"#MULTIVALUE",
"This cell has an external reference that can't be shown or edited. Editing this cell will "
"remove the external reference.",
"ND",
'PIMSS EMPTY',
"UNKNOWN"
]
if pd.isnull(date_str) or date_str in known_errors or (date_str == 0):
return None
if isinstance(date_str, str):
match = re.match(r"\d{1,2}-[A-Za-z]{3}-(\d{4})", date_str)
if match:
return int(match.group(1)) # Extract the year and convert to integer
if "-" in date_str:
# Count the number of times we have "-", as we've seen double ranges
# (when we have extensions) so the format is like this:
# 'G: 1983-1990, H: 1991-1995'
if date_str.count("-") == 2:
# We have a range
return int(date_str.split("-")[1].split(",")[0])
# We probably have a range
return int(date_str.split("-")[1].strip())
if isinstance(date_str, datetime):
return date_str.year
if isinstance(date_str, float):
if str(int(date_str)).isdigit() & (len(str(int(date_str))) == 4):
return int(date_str)
# Check if date_str is a year itself
if str(date_str).isdigit() & (len(str(date_str)) == 4):
return int(date_str)
# Remove any non-numeric characters
date_str = re.sub(r"\D", "", str(date_str))
if str(date_str).isdigit() & (len(str(date_str)) == 4):
return int(date_str)
raise NotImplementedError(f"Unhandled format for year built, value is {date_str} - implement me")
self.standardised_asset_list[self.landlord_year_built] = self.standardised_asset_list[
self.landlord_year_built
].apply(extract_year)
# We now create standard lookups
to_remap = {
self.landlord_property_type: {
"standard_values": property_type_mappings.STANDARD_PROPERTY_TYPES,
"standard_map": property_type_mappings.PROPERTY_MAPPING
},
self.landlord_built_form: {
"standard_values": built_form_mappings.STANDARD_BUILT_FORMS,
"standard_map": built_form_mappings.BUILT_FORM_MAPPINGS
},
self.landlord_wall_construction: {
"standard_values": walls_mappings.STANDARD_WALL_CONSTRUCTIONS,
"standard_map": walls_mappings.WALL_CONSTRUCTION_MAPPINGS
},
self.landlord_heating_system: {
"standard_values": heating_mappings.STANDARD_HEATING_SYSTEMS,
"standard_map": heating_mappings.HEATING_MAPPINGS
},
self.landlord_existing_pv: {
"standard_values": existing_pv_mappings.STANDARD_EXISTING_PV,
"standard_map": existing_pv_mappings.EXISTING_PV_MAPPINGS
},
self.landlord_roof_construction: {
"standard_values": roof_mappings.STANDARD_ROOF_CONSTRUCTIONS,
"standard_map": roof_mappings.ROOF_CONSTRUCTION_MAPPINGS
}
}
# Keep just entries where the key is not None
to_remap = {k: v for k, v in to_remap.items() if k is not None}
for variable, config in to_remap.items():
logger.info("Standardising variable: %s", variable)
# Strip each of these columns
self.standardised_asset_list[variable] = self.standardised_asset_list[variable].str.strip()
values_to_remap = self.standardised_asset_list[variable].unique()
# We want to map this to our standardised list of property types we're interested in
remapper = DataRemapper(standard_values=config["standard_values"], standard_map=config["standard_map"])
remap_dictionary = remapper.standardize_list(values_to_remap=values_to_remap.tolist())
self.variable_mappings[variable] = remap_dictionary
# We now print out the variable mappings, which can be reviewed by the user, before the final standardised
# asset list is returned
for variable, mapping in self.variable_mappings.items():
pprint(f"Variable: {variable}")
pprint(mapping)
# Print a space
print("\n")
pprint("=======================================")
def apply_standardiation(self, override_empty_mappings=False):
"""
This function applies the standardisation to the asset list
:param override_empty_mappings: If true, will override the check for empty mappings. This is only relevant
if there are no categories which need remapping which is highly unlikely
:return:
"""
if self.phase:
# We filter on just the properties that have had an inspection
self.standardised_asset_list = self.standardised_asset_list[
~self.standardised_asset_list['Surveyors Name'].isin(["YET TO BE SURVEYED"])
]
if not self.variable_mappings and not override_empty_mappings:
raise ValueError("Please run init_standardise first")
logger.info("Applying standardisation to asset list")
for variable, mapping in self.variable_mappings.items():
self.standardised_asset_list[variable + "_original_from_landlord"] = (
self.standardised_asset_list[variable].copy()
)
self.standardised_asset_list[variable] = self.standardised_asset_list[variable].map(mapping)
if self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated().sum():
# Drop the dupes
pprint(
f"There are {self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated().sum()} duplicated "
f"addresses - dropping"
)
# Keep a record of duplicates
self.duplicated_addresses = self.standardised_asset_list[
self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated()
][[self.DOMNA_PROPERTY_ID, self.address1_colname, self.postcode_colname]].copy()
self.standardised_asset_list = self.standardised_asset_list[
~self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated()
]
# Apply renames to our standard names
# Perform final variable selection and renaming:
# We add the original columns to the keep variables
self.keep_variables += [
k + "_original_from_landlord" for k in self.variable_mappings.keys()
]
self.standardised_asset_list = self.standardised_asset_list[self.keep_variables].rename(
columns=self.rename_map
)
# We fill any standard columns that are not in the data because they were not provided by the landlord
missing_variables = [
v for v in [
self.STANDARD_EXISTING_PV,
self.STANDARD_HEATING_SYSTEM,
self.STANDARD_UPRN,
self.STANDARD_PROPERTY_TYPE,
self.STANDARD_YEAR_BUILT,
self.STANDARD_WALL_CONSTRUCTION,
self.STANDARD_HEATING_SYSTEM,
self.STANDARD_EXISTING_PV
] if v not in self.standardised_asset_list.columns
]
for v in missing_variables:
self.standardised_asset_list[v] = None
# Convert to string
self.standardised_asset_list[self.STANDARD_LANDLORD_PROPERTY_ID] = (
self.standardised_asset_list[self.STANDARD_LANDLORD_PROPERTY_ID].astype(str)
)
def merge_data(self, df: pd.DataFrame):
"""
Used to insert data into the standardised asset list, based on the domna property id
:return:
"""
if self.DOMNA_PROPERTY_ID not in df.columns:
raise ValueError(f"Dataframe must contain the column {self.DOMNA_PROPERTY_ID}")
if df[self.DOMNA_PROPERTY_ID].duplicated().sum():
df = df.drop_duplicates(
subset=[self.DOMNA_PROPERTY_ID],
keep="first"
)
self.standardised_asset_list = self.standardised_asset_list.merge(
df, how="left", on=self.DOMNA_PROPERTY_ID
)
def extract_attributes(self, pull_epc=True):
# Used to extracty the typical attributes that we use to identify viable work
self.standardised_asset_list[self.ATTRIBUTE_HAS_SOLAR] = (
self.standardised_asset_list[self.FIND_EPC_DATA_NAMES["Solar photovoltaics"]] |
~self.standardised_asset_list[self.EPC_API_DATA_NAMES["photo-supply"]].isin(["0.0", 0, None, "", np.nan])
)
accepted_epc_property_types = ["House", "Flat", "Bungalow", "Maisonette"]
# The logic here is:
# 1) Take the property type provided by the HA themselves
# 2) In absence of that, take the EPC property type
# 3) Otherwise use None
self.standardised_asset_list[self.ATTRIBUTE_NUMBER_OF_FLOORS] = self.standardised_asset_list.apply(
lambda x: estimate_number_of_floors(
property_type=(
str(x[self.STANDARD_PROPERTY_TYPE]).title() if
str(x[self.STANDARD_PROPERTY_TYPE]).title() in accepted_epc_property_types else (
x[self.EPC_API_DATA_NAMES["property-type"]] if not
pd.isnull(x[self.EPC_API_DATA_NAMES["property-type"]]) else None
)
)
),
axis=1
)
self.standardised_asset_list[self.EPC_API_DATA_NAMES["total-floor-area"]] = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["total-floor-area"]].astype(float)
)
# Replace "" value with None
self.standardised_asset_list[self.EPC_API_DATA_NAMES["number-habitable-rooms"]] = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["number-habitable-rooms"]].replace("", None)
)
self.standardised_asset_list[self.EPC_API_DATA_NAMES["number-habitable-rooms"]] = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["number-habitable-rooms"]].astype(float)
)
# Estimate the perimeter
self.standardised_asset_list[self.ATTRIBUTE_ESTIMATED_PERIMETER] = self.standardised_asset_list.apply(
lambda x: estimate_perimeter(
floor_area=x[self.EPC_API_DATA_NAMES["total-floor-area"]] / x[self.ATTRIBUTE_NUMBER_OF_FLOORS],
num_rooms=x[self.EPC_API_DATA_NAMES["number-habitable-rooms"]] / x[self.ATTRIBUTE_NUMBER_OF_FLOORS],
), axis=1
)
self.standardised_asset_list[self.ATTRIBUTE_HEAT_LOSS_AREA] = self.standardised_asset_list.apply(
lambda x: estimate_external_wall_area(
num_floors=x[self.ATTRIBUTE_NUMBER_OF_FLOORS],
floor_height=(
float(x[self.EPC_API_DATA_NAMES["floor-height"]]) if
x[self.EPC_API_DATA_NAMES["floor-height"]] else 2.5
),
perimeter=x[self.ATTRIBUTE_ESTIMATED_PERIMETER],
built_form=x[self.EPC_API_DATA_NAMES["built-form"]]
),
axis=1
)
self.standardised_asset_list[self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS] = self.standardised_asset_list.apply(
lambda x: RoofAttributes(description=x[self.EPC_API_DATA_NAMES["roof-description"]]).process()[
"insulation_thickness"] if not pd.isnull(
x[self.EPC_API_DATA_NAMES["roof-description"]]) else None,
axis=1
)
self.standardised_asset_list[self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS] = (
self.standardised_asset_list[self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS].str.replace("+", "")
)
# We produce some additional fields
# 1) Is the SAP rating below C75
self.standardised_asset_list[self.ATTRIBUTE_SAP_THRESHOLD_AND_BELOW] = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]].astype(float) <=
self.FILLED_CAVITY_SAP_THRESHOLD
)
# 2) Flag anything where the EPC is older than 5 years
self.standardised_asset_list[self.ATTRIBUTE_EPC_PRE_YEAR_THRESHOLD] = (
pd.to_datetime(
self.standardised_asset_list[self.EPC_API_DATA_NAMES["inspection-date"]]
).dt.year < self.EPC_YEAR_THRESHOLD
)
self.process_age_band()
def process_age_band(self):
processed_age_band = []
for _, x in self.standardised_asset_list.iterrows():
if pd.isnull(x[self.EPC_API_DATA_NAMES["construction-age-band"]]) or (
x[self.EPC_API_DATA_NAMES["construction-age-band"]] in Definitions.DATA_ANOMALY_MATCHES
):
processed_age_band.append(
{
self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID],
"epc_year_lower_bound": None,
"epc_year_upper_bound": None,
"does_age_band_match_epc_age_band": "No EPC Age Band"
}
)
continue
# We exatract the upper and lower bounds
if x[self.EPC_API_DATA_NAMES["construction-age-band"]] in [
"England and Wales: 2007 onwards", "England and Wales: 2012 onwards"
]:
year_lower_bound = 2007 if x[self.EPC_API_DATA_NAMES[
"construction-age-band"]] == "England and Wales: 2007 onwards" else 2012
if pd.isnull(x[self.STANDARD_YEAR_BUILT]):
age_band_matches = "No Year Built From Landlord"
else:
age_band_matches = (
"EPC Age Band Matches Year Built" if x[self.STANDARD_YEAR_BUILT] >= year_lower_bound
else "EPC Age Band is older than Year Built"
)
processed_age_band.append(
{
self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID],
"epc_year_lower_bound": year_lower_bound,
"epc_year_upper_bound": None,
"does_age_band_match_epc_age_band": age_band_matches
}
)
continue
if x[self.EPC_API_DATA_NAMES["construction-age-band"]] == "England and Wales: before 1900":
if pd.isnull(x[self.STANDARD_YEAR_BUILT]):
age_band_matches = "No Year Built From Landlord"
else:
age_band_matches = (
"EPC Age Band Matches Year Built" if x[self.STANDARD_YEAR_BUILT] < 1900
else "EPC Age Band is newer than Year Built"
)
processed_age_band.append(
{
self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID],
"epc_year_lower_bound": None,
"epc_year_upper_bound": 1899,
"does_age_band_match_epc_age_band": age_band_matches
}
)
continue
if x[self.EPC_API_DATA_NAMES["construction-age-band"]].isdigit():
if pd.isnull(x[self.STANDARD_YEAR_BUILT]):
age_band_matches = "No Year Built From Landlord"
else:
age_band_matches = (
"EPC Age Band Matches Year Built" if x[self.STANDARD_YEAR_BUILT] == int(
x[self.EPC_API_DATA_NAMES["construction-age-band"]]
)
else "EPC Age Band is different from Year Built"
)
processed_age_band.append(
{
self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID],
"epc_year_lower_bound": int(x[self.EPC_API_DATA_NAMES["construction-age-band"]]),
"epc_year_upper_bound": int(x[self.EPC_API_DATA_NAMES["construction-age-band"]]),
"does_age_band_match_epc_age_band": age_band_matches
}
)
continue
# Oherwise, we extract the upper and lower bounds
age_band = x[self.EPC_API_DATA_NAMES["construction-age-band"]].split(": ")[1]
lower_date, upper_date = age_band.split("-")
if not x[self.STANDARD_YEAR_BUILT]:
age_band_matches = "No Year Built From Landlord"
else:
age_band_matches = (
"EPC Age Band Matches Year Built" if (x[self.STANDARD_YEAR_BUILT] >= float(lower_date)) and (
x[self.STANDARD_YEAR_BUILT] <= float(upper_date)
)
else "EPC Age Band is older than Year Built" if x[self.STANDARD_YEAR_BUILT] > float(upper_date)
else "EPC Age Band is newer than Year Built"
)
processed_age_band.append(
{
self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID],
"epc_year_lower_bound": int(lower_date),
"epc_year_upper_bound": int(upper_date),
"does_age_band_match_epc_age_band": age_band_matches
}
)
processed_age_band = pd.DataFrame(processed_age_band)
self.standardised_asset_list = self.standardised_asset_list.merge(
processed_age_band, how="left"
)
def identify_worktypes(self, cleaned):
if self.landlord_sap is not None:
# We add a SAP category for all work type identification
self.standardised_asset_list["SAP Category"] = np.where(
(
(self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <= 54) |
(self.standardised_asset_list[self.STANDARD_SAP] <= 54)
),
"SAP Rating 54 or less",
np.where(
(
(self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <= 68) |
(self.standardised_asset_list[self.STANDARD_SAP] <= 68)
),
"SAP Rating 55-68",
np.where(
(
(
self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <=
self.EMPTY_CAVITY_SAP_THRESHOLD
) | (self.standardised_asset_list[self.STANDARD_SAP] <= self.EMPTY_CAVITY_SAP_THRESHOLD)
),
f"SAP Rating 69-{self.EMPTY_CAVITY_SAP_THRESHOLD}",
f"SAP Rating {self.EMPTY_CAVITY_SAP_THRESHOLD + 1} or more"
),
)
)
else:
# We add a SAP category for all work type identification
# We break into 4 categories (54 or less, 55-68, 69-74, 75 or more)
self.standardised_asset_list["SAP Category"] = np.where(
(self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <= 54),
"SAP Rating 54 or less",
np.where(
(self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <= 68),
"SAP Rating 55-68",
np.where(
(
self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <=
self.EMPTY_CAVITY_SAP_THRESHOLD
),
f"SAP Rating 69-{self.EMPTY_CAVITY_SAP_THRESHOLD}",
f"SAP Rating {self.EMPTY_CAVITY_SAP_THRESHOLD + 1} or more"
),
)
)
# Before we being, we identify if a property has solar already as we use this
# for identifying cavity jobs
if self.non_intrusives_present:
existing_solar_non_intrusives_check = (
self.standardised_asset_list["non-intrusives: PV, ACCESS ISSUE, SEE NOTES"] == "SOLAR PV ON ROOF"
)
elif self.old_format_non_intrusives_present:
existing_solar_non_intrusives_check = (
self.standardised_asset_list["non-intrusives: WFT Findings"].str.lower().str.strip().isin(
["solar pv on roof"]
)
)
else:
# We don't have an indication
existing_solar_non_intrusives_check = False
self.standardised_asset_list["property_has_solar"] = (
(self.standardised_asset_list[self.STANDARD_EXISTING_PV] == "already has PV") |
existing_solar_non_intrusives_check |
(self.standardised_asset_list[self.ATTRIBUTE_HAS_SOLAR])
)
# If we have non-intrusives completed, we can use this to identify work types
######################################################
# Empty cavity:
######################################################
# 1) Has been flagged on the non-intrusives as being a cavity wall, empty or partially filled
# 2) The age is before 1995
# 3) We don't remove anything that haas access issues yet
if self.non_intrusives_present:
non_intrusives_wall_filter = (
(self.standardised_asset_list['non-intrusives: Construction'] == "CAVITY") &
self.standardised_asset_list['non-intrusives: Insulated'].isin(["EMPTY", "PARTIAL"])
)
elif self.old_format_non_intrusives_present:
non_intrusives_wall_filter = (
self.standardised_asset_list['non-intrusives: WFT Findings'].str.lower().str.strip().isin(
["empty cavity", "partial fill", "empty", "EMPTY CAVITY 70MM", "partial", "empty cav"]
) | (
(
self.standardised_asset_list['non-intrusives: WFT Findings']
.str.lower().str.strip().str.contains("empty cavity|partial fill") &
~self.standardised_asset_list['non-intrusives: WFT Findings']
.astype(str).str.lower().str.strip().str.contains("major access issues")
)
)
)
else:
# We set the filter to False, as we have no non-intrusives
non_intrusives_wall_filter = False
if self.landlord_year_built is None:
year_built_filter = self.standardised_asset_list["epc_year_upper_bound"] <= self.EMPTY_CAVITY_YEAR_THRESHOLD
else:
year_built_filter = (
(self.standardised_asset_list[self.STANDARD_YEAR_BUILT] <= self.EMPTY_CAVITY_YEAR_THRESHOLD) |
(self.standardised_asset_list["epc_year_upper_bound"] <= self.EMPTY_CAVITY_YEAR_THRESHOLD)
)
# Criteria:
# The property isn't a bedsit
# Non-intrusives indicate it needs a fill
# The EPC year is before 2002
# We also flag where the property has solar on the roof, because this is a signal of a high EPC rating
self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] = (
(~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"])) &
non_intrusives_wall_filter &
year_built_filter &
(
~self.standardised_asset_list["property_has_solar"]
)
)
self.standardised_asset_list["non_intrusive_indicates_empty_cavity_has_solar"] = (
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
(~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"])) &
non_intrusives_wall_filter &
year_built_filter &
(
# If the property has solar, there's a chance it won't qualify
self.standardised_asset_list["property_has_solar"]
)
)
# We also add a filter on anything that was generally identified by the non-intrusives
self.standardised_asset_list["non_intrusive_indicates_empty_cavity_no_year_filter"] = (
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity_has_solar"] &
(~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"])) &
non_intrusives_wall_filter
)
if (not self.non_intrusives_eligibility) and (not self.old_format_non_intrusives_present):
# If we have NO inspections data, we capture all of the wall types and don't filter on age of the EPC
self.standardised_asset_list["epc_indicates_empty_cavity"] = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["walls-description"]].str.lower().isin(
self.EPC_NO_WALL_INSULATION_DESCRIPTIONS
) & (
self.standardised_asset_list["epc_year_upper_bound"] <= self.EMPTY_CAVITY_YEAR_THRESHOLD
) & (
~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"])
)
)
else:
self.standardised_asset_list["epc_indicates_empty_cavity"] = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["walls-description"]].str.lower().isin(
self.EPC_NO_WALL_INSULATION_DESCRIPTIONS
) & (
self.standardised_asset_list["epc_year_upper_bound"] <= self.EMPTY_CAVITY_YEAR_THRESHOLD
) & (
~self.standardised_asset_list[self.ATTRIBUTE_EPC_PRE_YEAR_THRESHOLD]
) & (
~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"])
)
)
self.standardised_asset_list["landlord_data_indicates_empty_cavity"] = (
self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin(["uninsulated cavity"]) &
(
(self.standardised_asset_list[self.STANDARD_YEAR_BUILT] <= self.EMPTY_CAVITY_YEAR_THRESHOLD) |
(self.standardised_asset_list["epc_year_upper_bound"] <= self.EMPTY_CAVITY_YEAR_THRESHOLD)
) & (
~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"])
)
)
# Finally, we create a flag to indicate that the cavity is empty, based on the criteria above
self.standardised_asset_list["cavity_is_empty"] = (
non_intrusives_wall_filter |
self.standardised_asset_list[self.EPC_API_DATA_NAMES["walls-description"]].str.lower().isin(
self.EPC_NO_WALL_INSULATION_DESCRIPTIONS
) |
self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin(["uninsulated cavity"])
)
######################################################
# Extraction
######################################################
# as needing a CIGA check. What is the logic we should be applying here?
if self.non_intrusives_present:
extraction_wall_filter = (
(self.standardised_asset_list["non-intrusives: Construction"] == "CAVITY") &
(self.standardised_asset_list["non-intrusives: Insulated"].isin(["RETRO DRILLED", "FILLED AT BUILD"])) &
(~self.standardised_asset_list['non-intrusives: Material'].isin(
["GREY LOOSE BEAD", "COMPACTED BEAD", "FIBRE BATT NO CAVITY", "EMPTY NARROW BELOW 30mm"]
))
)
if self.non_intrusives_eligibility:
# If we have the eligibility column, we check if the wall is eligible
extraction_wall_filter = (
extraction_wall_filter &
~self.standardised_asset_list["non-intrusives: Eligibility (Red/Yellow/Green)"].isin(
["RED"]
)
)
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] = (
extraction_wall_filter & year_built_filter
)
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction_no_year_filter"] = (
extraction_wall_filter & ~year_built_filter
)
elif self.old_format_non_intrusives_present:
print("Review these categories!!!!")
extraction_wall_filter = (
self.standardised_asset_list['non-intrusives: WFT Findings'].str.lower().str.strip().isin(
[
'blown in yellow wool', 'retro drilled & filled', 'white fibre from build',
'foam filled from build', 'retro drilled gas in block', 'block in rock wool', 'rdf / tilehung',
'fibre from build', 'blown in rock wool', 'rdf / tile hung', 'retro drilled',
'rock wool from build', 'part rendered retro drilled', 'white fibtr from build.',
'retro drilled and filled', 'blown in white wool', 'blown in yellow fibre from build', 'rdf',
'polybead', 'foam filled', 'blown in white bead from build', 'blown in yellow fibre',
'retro drilled det', 'blown in rockwool', 'retro drilled det empty cav', 'retro drilled end',
'retro filled extension', 'retro filled', 'foam'
]
)
)
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] = (
extraction_wall_filter
)
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction_no_year_filter"] = False
else:
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] = False
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction_no_year_filter"] = False
######################################################
# Solar
######################################################
# Criteria:
# Check 1: Does the property have a valid heating system?
self.standardised_asset_list["solar_landlord_data_indicates_correct_heating_system"] = (
self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM].isin(
[
"air source heat pump",
"ground source heat pump",
"high heat retention storage heaters",
"electric boiler"
]
)
)
self.standardised_asset_list["solar_landlord_data_indicates_needs_heating_upgrade"] = (
self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM].isin(
["electric storage heaters", "room heaters", "electric radiators", "no heating", "electric fuel"]
)
)
self.standardised_asset_list["solar_epc_data_indicates_correct_heating_system"] = (
(
self.standardised_asset_list[self.EPC_API_DATA_NAMES["mainheat-description"]]
.str.lower().str.contains("air source heat pump|ground source heat pump|boiler and radiators, electric")
) | (
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["mainheat-description"]].str.lower().str.contains(
"electric storage heaters"
) & (
self.standardised_asset_list[self.EPC_API_DATA_NAMES[
"mainheatcont-description"]] == "Controls for high heat retention storage heaters"
)
)
)
self.standardised_asset_list["solar_epc_data_indicates_requires_heating_upgrade"] = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["mainheat-description"]].str.lower().str.contains(
"electric storage heaters|room heaters"
) & (
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["mainheatcont-description"]
] != "Controls for high heat retention storage heaters"
)
)
# Basic check - both of the previous two shouldn't be true simultaneously
if (
self.standardised_asset_list["solar_epc_data_indicates_correct_heating_system"] &
self.standardised_asset_list["solar_epc_data_indicates_requires_heating_upgrade"]
).sum():
logger.info("We have an example of both heating system checks being true - checking known cases")
known_edge_cases = ['Ground source heat pump, radiators, electric, Electric storage heaters']
error_cases = self.standardised_asset_list[
(
self.standardised_asset_list["solar_epc_data_indicates_correct_heating_system"] &
self.standardised_asset_list["solar_epc_data_indicates_requires_heating_upgrade"]
)
]
if all(error_cases[self.EPC_API_DATA_NAMES["mainheat-description"]].isin(known_edge_cases)):
logger.info("Within known edge cases")
else:
raise ValueError("Both heating system checks are true - this should not be possible")
# Check 3: Does the property meet the fabric condition
# Solar PV installs are subject to the minimum insulation requirements which means:
# 1) one of the following insulation measures must be installed as part of the same
# ECO4 project:
# • roof insulation (flat roof, pitched roof, room-in-roof)
# • exterior facing wall insulation (cavity wall, solid wall)
# • party cavity wall insulation
# • floor insulation (solid and underfloor)
#
# OR
#
# all measures (except any exempted measure referred to in paragraph 4.28)
# listed in paragraph a) must already be installed
#
# With this in mind, we look for 2 clases
# 1) The property is fully insulated apart from the loft (<200mm insulation)
# 2) THe property is fully insulated
print("Should we include cavity properties where they might be uninsulated?")
self.standardised_asset_list["solar_landlord_walls_insulated"] = (
self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin(
[
"filled cavity", "insulated solid brick", "insulated timber frame",
]
)
)
if self.non_intrusives_present:
self.standardised_asset_list["solar_non_intrusives_walls_insulated"] = (
self.standardised_asset_list["non-intrusives: Insulated"].isin(
["EWI", "RETRO DRILLED", "FILLED AT BUILD"]
)
)
elif self.old_format_non_intrusives_present:
self.standardised_asset_list["solar_non_intrusives_walls_insulated"] = (
self.standardised_asset_list["non-intrusives: WFT Findings"].str.lower().str.strip().isin(
[
"retro drilled", "retro filled", "ewi", "retro drilled/ solid", "retro drilled and filled",
]
) |
self.standardised_asset_list["non-intrusives: WFT Findings"].str.lower().str.strip().str.contains(
"retro drilled"
)
)
else:
self.standardised_asset_list["solar_non_intrusives_walls_insulated"] = False
# We merge on the u-value for average thermal transmittance
walls_uvalue_data = pd.DataFrame(cleaned["walls-description"])
walls_uvalue_data = walls_uvalue_data[
~pd.isnull(walls_uvalue_data["thermal_transmittance"])
][["original_description", "thermal_transmittance"]].rename(
columns={
"original_description": self.EPC_API_DATA_NAMES["walls-description"],
"thermal_transmittance": "walls_u_value"
}
)
self.standardised_asset_list = self.standardised_asset_list.merge(
walls_uvalue_data, how="left", on=self.EPC_API_DATA_NAMES["walls-description"]
)
self.standardised_asset_list["solar_epc_walls_insulated"] = (
(
self.standardised_asset_list[
self.EPC_API_DATA_NAMES["walls-description"]].str.lower().str.contains(
"|".join(self.EPC_INSULATED_WALLS_SUBSTRINGS)
)
) | (
self.standardised_asset_list["walls_u_value"].apply(lambda x: x <= 0.7 if not pd.isnull(x) else False)
)
)
# We merge on the u-value for average thermal transmittance
roof_data = pd.DataFrame(cleaned["roof-description"])[
["original_description", "thermal_transmittance", "is_pitched", "is_loft"]
].rename(
columns={
"original_description": self.EPC_API_DATA_NAMES["roof-description"],
"thermal_transmittance": "roof_u_value",
}
)
self.standardised_asset_list = self.standardised_asset_list.merge(
roof_data, how="left", on=self.EPC_API_DATA_NAMES["roof-description"]
)
# If the u-value of a roof is less than 0.7 we consider it insulated
self.standardised_asset_list["solar_epc_roof_insulated"] = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["roof-description"]].str.lower().str.contains(
"|".join(self.EPC_INSULATED_ROOF_SUBSTRINGS),
) | (
self.standardised_asset_list[self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS].apply(
lambda x: int(x) >= 200 if str(x).isdigit() else False
)
) | (
self.standardised_asset_list["roof_u_value"].apply(
lambda x: x <= 0.7 if not pd.isnull(x) else False
)
)
)
self.standardised_asset_list["solar_epc_loft_needs_topup"] = (
self.standardised_asset_list[
self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS].apply(
lambda x: int(x) < 200 if str(x).isdigit() else False
) | (
(
self.standardised_asset_list["is_loft"] | self.standardised_asset_list["is_pitched"]
) & (
self.standardised_asset_list[self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS].isin(
["below average", "none"]
)
)
)
)
self.standardised_asset_list["epc_has_floor_recommendation"] = (
self.standardised_asset_list["epc_has_floor_recommendation"].fillna(False)
)
# Check if the boiler is electric
# We check if it contains both the terms boiler & electric
self.standardised_asset_list["has_electric_boiler"] = (
(
self.standardised_asset_list[self.EPC_API_DATA_NAMES["mainheat-description"]]
.str.lower().isin(
["boiler and radiators, electric"])
) | (
self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM] == "electric boiler"
)
)
####################################
# Check solar eligibility
####################################
# Set up the filters to stop repetition
correct_heating_system = (
self.standardised_asset_list["solar_landlord_data_indicates_correct_heating_system"] |
self.standardised_asset_list["solar_epc_data_indicates_correct_heating_system"] |
self.standardised_asset_list["has_electric_boiler"]
)
needs_heating_upgrade = (
self.standardised_asset_list["solar_landlord_data_indicates_needs_heating_upgrade"] |
self.standardised_asset_list["solar_epc_data_indicates_requires_heating_upgrade"]
)
# The requirements for walls are:
# 1) walls are insulated
# 2) property is a cavity (can be done insulated or not)
walls_meet_solar_requirements = (
# The landlord is saying the walls are insulated
self.standardised_asset_list["solar_landlord_walls_insulated"] |
# EPC data is saying the walls are insulated
self.standardised_asset_list["solar_epc_walls_insulated"] |
# Non-intrusives are saying the walls are insulated
self.standardised_asset_list["solar_non_intrusives_walls_insulated"] |
# It's empty cavity
self.standardised_asset_list["cavity_is_empty"] |
# It's a cavity wall
(self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].str.contains("cavity"))
)
not_a_flat = (
self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] != "flat"
)
solar_roof_meets_criteria = (
self.standardised_asset_list["solar_epc_roof_insulated"] |
self.standardised_asset_list["solar_epc_loft_needs_topup"]
)
self.standardised_asset_list["solar_eligible"] = (
# Property isn't a flag
not_a_flat &
# Landlord data or EPC data indicates the heating system is appropriate
correct_heating_system &
# The property doesn't currently have solar
~self.standardised_asset_list["property_has_solar"] &
# The walls are insulated
walls_meet_solar_requirements &
# Roof meets criteria
solar_roof_meets_criteria
)
# With heating upgrade
self.standardised_asset_list["solar_eligible_needs_heating_upgrade"] = (
not_a_flat &
# Needs heating upgrade
needs_heating_upgrade &
# The property doesn't currently have solar
~self.standardised_asset_list["property_has_solar"] &
# The walls are insulated
walls_meet_solar_requirements &
# Roof meets criteria
solar_roof_meets_criteria
)
# We check for a specific sub-set of properties which are uninsulated solid wall properties that are EPC E
# or below (we'll use 57 as a threshold) - These are for a pilot with Net Zero Renewables
self.standardised_asset_list["solar_eligible_solid_wall_uninsulated"] = (
not_a_flat &
# Landlord data or EPC data indicates the heating system is appropriate - in this case, we can also take
# electric boilers
correct_heating_system &
# The property doesn't currently have solar
~self.standardised_asset_list["property_has_solar"] &
# The walls are uninsulated solid
~walls_meet_solar_requirements &
(self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <= 57)
)
# Drop anything we don't need
self.standardised_asset_list = self.standardised_asset_list.drop(
columns=["walls_u_value", "roof_u_value"]
)
# Adjust flagged extraction jobs to remove anything for solar
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] = (
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] &
~self.standardised_asset_list["solar_eligible"]
)
# Finally, we note why each property has been flagged
self.standardised_asset_list["cavity_reason"] = None
empty_cavity_map = {
"non_intrusive_indicates_empty_cavity": "Non-Intrusive Data Shows Empty Cavity: ",
"non_intrusive_indicates_empty_cavity_has_solar": "Non-Intrusive Data Shows Empty Cavity - property "
"already has solar: ",
"non_intrusive_indicates_empty_cavity_no_year_filter": f"Non-Intrusive Data Shows Empty Cavity, "
f"built after {self.EMPTY_CAVITY_YEAR_THRESHOLD}: ",
}
for variable, description in empty_cavity_map.items():
self.standardised_asset_list["cavity_reason"] = np.where(
self.standardised_asset_list[variable] &
pd.isnull(self.standardised_asset_list["cavity_reason"]),
description + self.standardised_asset_list["SAP Category"],
self.standardised_asset_list["cavity_reason"]
)
# We break the cavity reason into a few different categories, when the EPC is different from inspections
if self.old_format_non_intrusives_present:
self.standardised_asset_list["cavity_reason"] = np.where(
(
self.standardised_asset_list["epc_indicates_empty_cavity"] &
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
(self.standardised_asset_list['non-intrusives: WFT Findings'].str.lower().str.strip().isin(
[
"retro drilled and filled", "retro drilled", "retro filled", "retro drilled & filled",
]
)) &
pd.isnull(self.standardised_asset_list["cavity_reason"])
),
"EPC Shows Empty Cavity, inspections show retro drilled: " + self.standardised_asset_list[
"SAP Category"],
self.standardised_asset_list["cavity_reason"]
)
self.standardised_asset_list["cavity_reason"] = np.where(
(
self.standardised_asset_list["epc_indicates_empty_cavity"] &
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
self.standardised_asset_list['non_intrusive_indicates_cavity_extraction'] &
pd.isnull(self.standardised_asset_list["cavity_reason"])
),
"EPC Shows Empty Cavity, inspections show filled or other: " + self.standardised_asset_list[
"SAP Category"],
self.standardised_asset_list["cavity_reason"]
)
elif self.non_intrusives_present:
self.standardised_asset_list["cavity_reason"] = np.where(
(
self.standardised_asset_list["epc_indicates_empty_cavity"] &
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
(self.standardised_asset_list['non-intrusives: Insulated'] == "RETRO DRILLED") &
pd.isnull(self.standardised_asset_list["cavity_reason"])
),
"EPC Shows Empty Cavity, inspections show retro drilled: " + self.standardised_asset_list[
"SAP Category"],
self.standardised_asset_list["cavity_reason"]
)
self.standardised_asset_list["cavity_reason"] = np.where(
(
self.standardised_asset_list["epc_indicates_empty_cavity"] &
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
(self.standardised_asset_list['non-intrusives: Insulated'] == "FILLED AT BUILD") &
pd.isnull(self.standardised_asset_list["cavity_reason"])
),
"EPC Shows Empty Cavity, inspections show filled at build: " + self.standardised_asset_list[
"SAP Category"],
self.standardised_asset_list["cavity_reason"]
)
else:
self.standardised_asset_list["cavity_reason"] = np.where(
(
self.standardised_asset_list["epc_indicates_empty_cavity"] &
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
pd.isnull(self.standardised_asset_list["cavity_reason"])
),
"EPC Shows Empty Cavity: " + self.standardised_asset_list["SAP Category"],
self.standardised_asset_list["cavity_reason"]
)
self.standardised_asset_list["cavity_reason"] = np.where(
(
self.standardised_asset_list["epc_indicates_empty_cavity"] &
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
pd.isnull(self.standardised_asset_list["cavity_reason"])
),
"EPC Shows Empty Cavity, inspections show non-cavity build: " + self.standardised_asset_list[
"SAP Category"],
self.standardised_asset_list["cavity_reason"]
)
# Landlord data: The landlord's data indicates that the wall is an uninsulated cavity wall, but EPC and
# inspections show filled
self.standardised_asset_list["cavity_reason"] = np.where(
(
self.standardised_asset_list["landlord_data_indicates_empty_cavity"] &
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
~self.standardised_asset_list["epc_indicates_empty_cavity"] &
pd.isnull(self.standardised_asset_list["cavity_reason"])
),
"Landlord Data Shows Empty Cavity, EPC & Inspections Shows Filled or Non-cavity: " +
self.standardised_asset_list["SAP Category"],
self.standardised_asset_list["cavity_reason"]
)
# Flag extraction
self.standardised_asset_list["cavity_reason"] = np.where(
(
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] &
pd.isnull(self.standardised_asset_list["cavity_reason"])
),
"Non-Intrusive Data Shows Cavity Extraction: " + self.standardised_asset_list["SAP Category"],
self.standardised_asset_list["cavity_reason"]
)
self.standardised_asset_list["cavity_reason"] = np.where(
(
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction_no_year_filter"] &
pd.isnull(self.standardised_asset_list["cavity_reason"])
),
f"Non-Intrusive Data Shows Cavity Extraction, built after {self.EMPTY_CAVITY_YEAR_THRESHOLD}: " +
self.standardised_asset_list["SAP Category"],
self.standardised_asset_list["cavity_reason"]
)
######################################################
# Flag solar
######################################################
self.standardised_asset_list["solar_reason"] = None
# Map of variables and fill values for the solar_reason variable
# ordering of this map is important, where we flag our prioritised work types first
solar_reason_map = {
"solar_eligible": "Solar Eligible: ",
"solar_eligible_solid_wall_uninsulated": "Solar Eligible, Solid Wall Uninsulated, EPC E or Below: ",
"solar_eligible_needs_heating_upgrade": (
"Solar Eligible, Needs Heating Upgrade: "
)
}
for variable, reason in solar_reason_map.items():
self.standardised_asset_list["solar_reason"] = np.where(
self.standardised_asset_list[variable] & pd.isnull(self.standardised_asset_list["solar_reason"]),
reason + self.standardised_asset_list["SAP Category"],
self.standardised_asset_list["solar_reason"]
)
# Finally, anything flagged for solar should not be flagged for cavity - make them None
self.standardised_asset_list["cavity_reason"] = np.where(
(
~pd.isnull(self.standardised_asset_list["solar_reason"]) &
~pd.isnull(self.standardised_asset_list["cavity_reason"])
),
None,
self.standardised_asset_list["cavity_reason"]
)
# Flag anything that has existing outcomes
if (self.outcomes is not None) and ("surveyed" in self.standardised_asset_list.columns):
if "installer refusal" not in self.standardised_asset_list.columns:
self.standardised_asset_list["cavity_reason"] = np.where(
(
(self.standardised_asset_list["surveyed"] > 0)
),
None,
self.standardised_asset_list["cavity_reason"]
)
else:
for col in ["cavity_reason", "solar_reason"]:
self.standardised_asset_list[col] = np.where(
(
(self.standardised_asset_list["surveyed"] > 0) |
(self.standardised_asset_list["installer refusal"] > 0)
),
None,
self.standardised_asset_list[col]
)
if self.master_surveyed is not None:
for col in ["cavity_reason", "solar_reason"]:
self.standardised_asset_list[col] = np.where(
(
(~pd.isnull(self.standardised_asset_list["submission_date"]))
),
None,
self.standardised_asset_list[col]
)
if self.ecosurv is not None:
for col in ["cavity_reason", "solar_reason"]:
self.standardised_asset_list[col] = np.where(
(
(~pd.isnull(self.standardised_asset_list["ecosurv_reference"]))
),
None,
self.standardised_asset_list[col]
)
blocks_of_flats = self.standardised_asset_list[
self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] == "block of flats"
]
non_blocks_of_flats = self.standardised_asset_list[
self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] != "block of flats"
]
# Produce some aggregate figures
self.work_type_figures = {
**non_blocks_of_flats["cavity_reason"].value_counts().to_dict(),
**{
k + " (Block of flats)": v for k, v in
blocks_of_flats["solar_reason"].value_counts().to_dict().items()
},
**self.standardised_asset_list["solar_reason"].value_counts().to_dict()
}
# We prepare outcomes for output
if self.outcomes is not None:
logger.info("Preparing outcomes for output")
identified_work = self.standardised_asset_list[
~pd.isnull(self.standardised_asset_list["cavity_reason"]) |
~pd.isnull(self.standardised_asset_list["solar_reason"])
][self.DOMNA_PROPERTY_ID].values
if self.DOMNA_PROPERTY_ID in self.outcomes.columns:
self.outcomes_for_output = self.outcomes[
self.outcomes[self.DOMNA_PROPERTY_ID].isin(identified_work)
]
def flat_analysis(self):
# We need to deduce the building name - we strip out the house number
# We want to deduce if flats have 50% of the properties below C75
# We group by postcode and property type
grouped = self.standardised_asset_list.groupby(
[self.STANDARD_POSTCODE, self.STANDARD_PROPERTY_TYPE]
)
flat_data = []
for _, group in grouped:
if "flat" in group[self.STANDARD_PROPERTY_TYPE].values:
num_flats = group[self.STANDARD_PROPERTY_TYPE].shape[0]
num_below_c75 = group[
self.EPC_API_DATA_NAMES["current-energy-efficiency"]
].lt(self.FILLED_CAVITY_SAP_THRESHOLD).sum()
# Check if any flats are below C69
num_flats_below_c69 = group[
self.EPC_API_DATA_NAMES["current-energy-efficiency"]
].lt(69).sum()
flat_data.append(
{
"Postcode": group[self.STANDARD_POSTCODE].iloc[0],
"Property Type": "Flat",
"Number of Flats with EPC": num_flats,
"Number of Flats below C75": num_below_c75,
"Proportion of Flat EPCs below C75": round(100 * num_below_c75 / num_flats),
"Number of Flats Below C69": num_flats_below_c69,
}
)
flat_data = pd.DataFrame(flat_data)
self.flat_data = flat_data
@staticmethod
def split_full_name(x):
if pd.isnull(x):
return None, None, None
x = x.lower()
titles = ["mr", "mrs", "ms", "miss", "dr", "prof"]
# Remove titles
detected_title = [title for title in titles if x.startswith(title)]
if detected_title:
for title in detected_title:
x = x.replace(title, "")
x = x.strip()
first_name, last_name = x.split(" ")[0], x.split(" ")[-1]
title = detected_title[0].title() if detected_title else None
return title, first_name.title(), last_name.title()
def load_contact_details(
self,
local_filepath,
sheet_name,
landlord_property_id,
phone_number_column=None,
email_column=None,
fullname_column=None,
firstname_column=None,
lastname_column=None
):
self.contact_detail_fields = {
"landlord_property_id": landlord_property_id,
"phone_number": phone_number_column,
"email": email_column,
"fullname": fullname_column,
"firstname": firstname_column,
"lastname": lastname_column
}
details_colnames = [
phone_number_column, email_column, fullname_column, firstname_column, lastname_column
]
# We'll fill them
none_details = [x for x in details_colnames if x is None]
details_colnames = [x for x in details_colnames if x is not None]
contact_details = pd.read_excel(
local_filepath, sheet_name=sheet_name
)[[self.contact_detail_fields["landlord_property_id"]] + details_colnames]
contact_details = contact_details[
~pd.isnull(contact_details[self.contact_detail_fields["landlord_property_id"]])
]
# Fill anything we don't have
for detail in none_details:
contact_details[detail] = None
if fullname_column and not (firstname_column and lastname_column):
contact_details["title"], contact_details["first_name"], contact_details["last_name"] = zip(
*contact_details[fullname_column].apply(self.split_full_name)
)
else:
raise NotImplementedError("Implement me")
self.contact_details = contact_details
def prepare_for_crm(self, company_domain, crm_pipeline_name, first_dealstage, assigned_surveyors):
"""
This function prepares the data for upload into Hubspot
:return:
"""
# This is a placeholder for now
# This maps the opportunities as we reference them, to the product data as stored in Hubspot
product_lookup_table = {
"Non-Intrusive Data Showed Cavity Extraction": {
"name": "Extract & Fill - ECO4", "id": 100307905778, "unit_price": 500
},
"Non-Intrusive Data Showed Empty Cavity": {
"name": "Empty Cavity & Loft - ECO4", "id": 82733738177, "unit_price": 1000
},
"Non-Intrusive Data Showed Empty Cavity but all SAP scores allowed": {
"name": "Empty Cavity & Loft - ECO4", "id": 82733738177, "unit_price": 1000
},
"Non-Intrusive Data Showed Cavity Extraction but all SAP scores allowed": {
"name": "Extract & Fill - ECO4", "id": 100307905778, "unit_price": 500
},
"EPC Data Showed Empty Cavity": {
"name": "Empty Cavity & Loft - ECO4", "id": 82733738177, "unit_price": 1000
},
"Solid Floor, Insulated, No Solar": {
"name": "Solar PV - ECO4", "id": 82623589564, "unit_price": 1608
},
"Solid Floor, Insulated, Needs Loft": {
"name": "Solar PV - ECO4", "id": 82623589564, "unit_price": 1608
},
"Other Floor, Insulated, No Solar": {
"name": "Solar PV - ECO4", "id": 82623589564, "unit_price": 1608
},
"Other Floor, Insulated, Needs Loft": {
"name": "Solar PV - ECO4", "id": 82623589564, "unit_price": 1608
}
}
# We check if all products are covered in the lookup table
cavity_products = self.standardised_asset_list["cavity_reason"].unique()
solar_products = self.standardised_asset_list["solar_reason"].unique()
# Check if there any options not in out lookup table
if (
any(x for x in cavity_products if x not in product_lookup_table) or
any(x for x in solar_products if x not in product_lookup_table)
):
raise ValueError("We have products not referenced in the lookup table - check this")
programme_data = self.standardised_asset_list.copy()
# Exclusions - these are properties we won't treat for the moment
product_exclusions = [
"Other Floor, Insulated, No Solar",
"Other Floor, Insulated, Needs Loft"
]
if product_exclusions:
logger.warning("Excluding products: %s", product_exclusions)
programme_data = programme_data[programme_data["solar_reason"].isin(product_exclusions) == False]
# Merge on the contact details
programme_data = programme_data.merge(
self.contact_details,
how="left",
left_on=self.STANDARD_LANDLORD_PROPERTY_ID,
right_on=self.landlord_property_id,
)
programme_data["Company Domain Name <COMPANY domain>"] = company_domain
# Append the product data onto the programme data
programme_data["cavity_product"] = programme_data["cavity_reason"].map(
lambda x: product_lookup_table.get(x, {"name": None})["name"]
)
programme_data["solar_product"] = programme_data["solar_reason"].map(
lambda x: product_lookup_table.get(x, {"name": None})["name"]
)
programme_data["domna_product"] = programme_data["solar_reason"].copy()
programme_data["domna_product"] = np.where(
pd.isnull(programme_data["domna_product"]),
programme_data["solar_product"],
programme_data["domna_product"]
)
# We filter just on rows where we have a product
programme_data = programme_data[
~pd.isnull(programme_data["domna_product"])
]
programme_data = programme_data.drop(columns=["solar_product", "cavity_product"])
product_df = (
pd.DataFrame(product_lookup_table).T[["name", "id", "unit_price"]]
.reset_index()
.rename(
columns={
"name": "Name <LINE_ITEM name>",
"id": 'Product ID <LINE_ITEM hs_product_id>',
"unit_price": 'Unit price <LINE_ITEM price>',
"index": "domna_product"
}
)
)
product_df['Quantity <LINE_ITEM quantity>'] = 1
# Append on the product data
programme_data = programme_data.merge(
product_df,
how="left",
on="domna_product",
)
# Add in deal and pipeline information
programme_data["dealname"] = programme_data[self.STANDARD_FULL_ADDRESS] + " : " + programme_data[
"domna_product"]
programme_data['Pipeline <DEAL pipeline>'] = crm_pipeline_name
programme_data['Deal Stage <DEAL dealstage>'] = first_dealstage
programme_data['Associations: Listing'] = "Property Owner"
programme_data = programme_data.merge(
assigned_surveyors.rename(
columns={self.landlord_property_id: self.STANDARD_LANDLORD_PROPERTY_ID}
), how="left", on=self.STANDARD_LANDLORD_PROPERTY_ID
)
# This maps the hubspot schema to the template. Anything that is not covered in this will be flagged
schema_mappings = {
'Name <LISTING hs_name>': self.DOMNA_PROPERTY_ID, # TODO: Maybe change this?
'Company Domain Name <COMPANY domain>': 'Company Domain Name <COMPANY domain>',
'Email <CONTACT email>': (
self.contact_detail_fields["email"] if self.contact_detail_fields["email"] else None
), # TODO: Review
'First Name <CONTACT firstname>': (
self.contact_detail_fields["firstname"] if self.contact_detail_fields["firstname"] else None
), # TODO: Review
'Last Name <CONTACT lastname>': (
self.contact_detail_fields["lastname"] if self.contact_detail_fields["lastname"] else None
), # TODO: Review
'Phone <CONTACT phone>': (
self.contact_detail_fields["phone_number"] if self.contact_detail_fields["phone_number"] else None
), # TODO: Review
'Full Address <LISTING full_address>': self.STANDARD_FULL_ADDRESS,
'Address 1 <LISTING hs_address_1>': self.STANDARD_ADDRESS_1,
'Address 2 <LISTING hs_address_2>': None, # TODO: Don't have this for the moment
'Postcode <LISTING hs_zip>': self.STANDARD_POSTCODE,
'Property Type <LISTING property_type>': self.STANDARD_PROPERTY_TYPE,
'Property Sub Type <LISTING property_sub_type>': None, # TODO: Don't have this for the moment
'Bedroom(s) <LISTING hs_bedrooms>': None, # TODO: Don't have this for the moment
'Domna Property ID <LISTING domna_property_id>': self.DOMNA_PROPERTY_ID,
'National UPRN <LISTING national_uprn>': (
self.STANDARD_UPRN if self.STANDARD_UPRN is not None else self.EPC_API_DATA_NAMES["uprn"]
),
'Owner Property ID <LISTING owner_property_id>': self.STANDARD_LANDLORD_PROPERTY_ID,
'Wall Construction <LISTING wall_construction>': self.STANDARD_WALL_CONSTRUCTION,
'Heating System <LISTING heating_system>': self.STANDARD_HEATING_SYSTEM,
'Year Built <LISTING hs_year_built>': self.STANDARD_YEAR_BUILT,
'Boiler Make <LISTING boiler_make>': None, # TODO: Don't have this for the moment
'Boiler Model <LISTING boiler_model>': None, # TODO: Don't have this for the moment
'Non-Intrusives: Date Checked <LISTING non_intrusives__date_checked>': None,
# TODO: Don't have this for the moment
'Non-Intrusives: Wall Type <LISTING non_intrusives__wall_type>': (
"non-intrusives: Construction" if self.non_intrusives_present else None
),
'Non-intrusives: Insulation <LISTING non_intrusives__insulation>': (
"non-intrusives: Insulated" if self.non_intrusives_present else None
),
'Non-intrusives: Insulation Material <LISTING non_intrusives__insulation_material>': (
"non-intrusives: Material" if self.non_intrusives_present else None
),
'Non-Intrusives: CIGA Check Required <LISTING non_intrusives__ciga_check_required>': (
'non-intrusives: CIGA Check Required' if self.non_intrusives_present else None
),
'Non-Intrusives: PV Access Issues <LISTING non_intrusives__access_issues>': (
'non-intrusives: PV, ACCESS ISSUE, SEE NOTES' if self.non_intrusives_present else None
),
'Non-Intrusives: Roof Orientation <LISTING non_intrusives__roof_orientation>': (
'non-intrusives: OFF GAS - ROOF ORIENTATION' if self.non_intrusives_present else None
),
'Non-Intrusives: Surveyor Notes <LISTING non_intrusives__surveyor_notes>': (
'non-intrusives: Any further surveyor notes' if self.non_intrusives_present else None
),
'Non-Intrusives: Surveyor Name <LISTING non_intrusives__surveyor_name>': (
'non-intrusives: Surveyors Name' if self.non_intrusives_present else None
),
'CIGA: Date Requested <LISTING ciga__date_requested>': None, # TODO: Don't have this for the moment
'CIGA: Cavity Guarantee Found <LISTING ciga__cavity_guarantee_found>': None,
'Last EPC: Is Estimated <LISTING last_epc__is_estimated>': self.EPC_API_DATA_NAMES["estimated"],
'Last EPC: EPC Rating <LISTING last_epc__epc_rating>': self.EPC_API_DATA_NAMES["current-energy-rating"],
'Last EPC: SAP Rating <LISTING last_epc__sap_rating>': self.EPC_API_DATA_NAMES["current-energy-efficiency"],
'Last EPC: Main Heating Description <LISTING last_epc__main_heating_description>': self.EPC_API_DATA_NAMES[
"mainheat-description"],
'Last EPC: Heating Controls <LISTING last_epc__heating_controls>': self.EPC_API_DATA_NAMES[
"mainheatcont-description"],
'Last EPC: Lodgement Date <LISTING last_epc__lodgement_date>': self.EPC_API_DATA_NAMES["inspection-date"],
'Last EPC: Floor Area <LISTING last_epc__floor_area>': self.EPC_API_DATA_NAMES["total-floor-area"],
'Last EPC: Wall <LISTING last_epc__wall>': self.EPC_API_DATA_NAMES["walls-description"],
'Last EPC: Roof <LISTING last_epc__roof>': self.EPC_API_DATA_NAMES["roof-description"],
'Last EPC: Floor <LISTING last_epc__floor>': self.EPC_API_DATA_NAMES["floor-description"],
'Last EPC: Room Height <LISTING last_epc__room_height>': self.EPC_API_DATA_NAMES["floor-height"],
'Last EPC: Age Band <LISTING last_epc__age_band>': self.EPC_API_DATA_NAMES["construction-age-band"],
'Deal Stage <DEAL dealstage>': 'Deal Stage <DEAL dealstage>',
'Pipeline <DEAL pipeline>': 'Pipeline <DEAL pipeline>',
'Expected Commencement Date <DEAL expected_commencement_date>': None, # TODO: Need to set this,
'Deal Name <DEAL dealname>': "dealname", # Need to create this,
'Product ID <LINE_ITEM hs_product_id>': 'Product ID <LINE_ITEM hs_product_id>',
'Name <LINE_ITEM name>': 'Name <LINE_ITEM name>',
'Unit price <LINE_ITEM price>': 'Unit price <LINE_ITEM price>',
'Quantity <LINE_ITEM quantity>': 'Quantity <LINE_ITEM quantity>',
'Deal Owner': 'surveyor_email',
'Amount <DEAL amount>': 'Unit price <LINE_ITEM price>',
}
# We now create the finalised dataset to be uploaded into Hubspot
variables_required = list(schema_mappings.values())
variables_required = [v for v in variables_required if v is not None]
# We now flag anything that has a none value, which is information we haven't got right now
none_variables = [k for k, v in schema_mappings.items() if v is None]
# We'll add placeholder columns for the None variables
programme_data = programme_data[variables_required]
for col in none_variables:
programme_data[col] = None
programme_data = programme_data.rename(
columns={v: k for k, v in schema_mappings.items() if v is not None}
)
self.hubspot_data = programme_data
def flag_ecosurv(self, ecosurv_landlords=None, landlords_to_ignore=None):
"""
This class will match ecosurv data to the asset list
:return:
"""
if ecosurv_landlords is None:
return
# TODO: Fetch from Sharepoint
ecosurv_filepath = "/Users/khalimconn-kowlessar/Documents/hestia/Ecosurv/07.05.2025.csv"
logger.info("Getting Ecosurv data from %s", ecosurv_filepath)
self.ecosurv = pd.read_csv(ecosurv_filepath, encoding="cp437")
landlords = self.ecosurv["Landlord"].value_counts().reset_index(drop=False)
landlord_references = landlords[
landlords["Landlord"].str.lower().str.contains(ecosurv_landlords)
]
landlord_ecosurv_data = self.ecosurv[
self.ecosurv["Landlord"].isin(landlord_references["Landlord"].values)
]
if landlords_to_ignore is not None:
landlord_ecosurv_data = landlord_ecosurv_data[
~landlord_ecosurv_data["Landlord"].isin(landlords_to_ignore)
]
# Try and match to asset list
matched = []
unmatched = []
for _, row in tqdm(landlord_ecosurv_data.iterrows(), total=landlord_ecosurv_data.shape[0]):
postcode = row["Postcode"].lower()
df = self.standardised_asset_list[
(
self.standardised_asset_list[self.STANDARD_POSTCODE].str.replace(" ", "").str.lower() ==
postcode
)
].copy()
if df.empty:
unmatched.append(row["Reference"])
continue
if df.shape[0] > 1:
house_no = SearchEpc.get_house_number(row["Address Line 1"], row["Postcode"])
df["house_no"] = df.apply(
lambda x: SearchEpc.get_house_number(
str(x[self.STANDARD_ADDRESS_1]), x[self.STANDARD_POSTCODE]
),
axis=1
)
df = df[df["house_no"] == house_no]
if df.shape[0] > 1:
# We compare address line 1 to full address
if any(
df[self.STANDARD_FULL_ADDRESS].str.lower().str.contains(
row["Address Line 1"].lower(), na=False)
):
df = df[
df[self.STANDARD_FULL_ADDRESS].str.lower().str.contains(
row["Address Line 1"].lower(), na=False
)
]
if df.shape[0] > 1:
df = df[df[self.STANDARD_PROPERTY_TYPE] != "other"]
if df.shape[0] == 1:
matched.append(
{
self.STANDARD_LANDLORD_PROPERTY_ID: df[self.STANDARD_LANDLORD_PROPERTY_ID].values[0],
"ecosurv_reference": row["Reference"],
"ecosurv_address1": row["Address Line 1"],
"ecosurv_postcode": row["Postcode"],
}
)
continue
if df.shape[0] > 1:
unmatched.append(row["Reference"])
continue
logger.info("Matched %s properties to ecosurv data", len(matched))
logger.info("%s properties in Ecosurv remain unmatched", len(unmatched))
# We now match
matched = pd.DataFrame(matched)
# We'll possibly have duplicates here, where properties have been sold twice. Ww de-dupe
if matched[self.STANDARD_LANDLORD_PROPERTY_ID].duplicated().sum():
# It doesn't matter too much which record we take
matched = matched.drop_duplicates(subset=[self.STANDARD_LANDLORD_PROPERTY_ID])
self.standardised_asset_list = self.standardised_asset_list.merge(
matched,
how="left",
on=self.STANDARD_LANDLORD_PROPERTY_ID,
)
# We keep a record of submissions that were NOT matches
self.ecosurv_no_match = self.ecosurv[
self.ecosurv["Reference"].isin(unmatched)
].copy()
def flag_outcomes(
self,
outcomes_filepaths,
outcomes_sheetname,
outcomes_address,
outcomes_postcode,
outcomes_houseno,
outcomes_id
):
if not outcomes_filepaths:
return
self.outcomes = []
outcomes_no_match = []
lookup = []
for idx, outcomes_filepath in enumerate(outcomes_filepaths):
outcomes = pd.read_excel(outcomes_filepath, sheet_name=outcomes_sheetname[idx])
outcomes["row_id"] = outcomes.index
if outcomes_houseno[idx] is None:
outcomes_houseno = "houseno"
outcomes["houseno"] = outcomes[outcomes_address[idx]].apply(
lambda x: SearchEpc.get_house_number(x, outcomes[outcomes_postcode])
)
# We handle an edge case that occured for LHP
if "Notes / Outcomes" in outcomes.columns and "Outcome" not in outcomes.columns:
# We use the re-mapper to handle this:
outcomes["Notes / Outcomes"] = outcomes["Notes / Outcomes"].str.strip()
values_to_remap = outcomes["Notes / Outcomes"].unique()
# We want to map this to our standardised list of property types we're interested in
remapper = DataRemapper(
standard_values=outcomes_mappings.outcomes_values, standard_map=outcomes_mappings.outcomes_map
)
remap_dictionary = remapper.standardize_list(values_to_remap=values_to_remap.tolist())
# Perform the remap
outcomes["Outcome"] = outcomes["Notes / Outcomes"].map(remap_dictionary)
outcomes["Outcome"] = outcomes["Outcome"].str.lower()
logger.info("Matching outcomes to asset list")
# Merge the outcomes onto the asset list - we check we're able to match sufficiently well
lookup_i = []
nomatch_i = []
for _, x in tqdm(outcomes.iterrows(), total=len(outcomes)):
if pd.isnull(x[outcomes_address[idx]]) or not x[outcomes_address[idx]]:
continue
# Check if we have an id
oid = x[outcomes_id[idx]] if outcomes_id[idx] is not None else None
if oid is not None:
matched = self.standardised_asset_list[
(self.standardised_asset_list[
self.STANDARD_LANDLORD_PROPERTY_ID
].str.strip() == oid)
]
if matched.shape[0] == 1:
lookup_i.append(
{
"row_id": x["row_id"],
self.DOMNA_PROPERTY_ID: matched[self.DOMNA_PROPERTY_ID].values[0]
}
)
continue
address_clean = x[outcomes_address[idx]].lower().replace(",", "").replace(" ", " ")
matched = self.standardised_asset_list[
(self.standardised_asset_list[
self.STANDARD_FULL_ADDRESS
].str.lower().str.replace(",", "").str.replace(" ", " ") == address_clean)
]
if matched.shape[0] == 1:
lookup_i.append(
{
"row_id": x["row_id"],
self.DOMNA_PROPERTY_ID: matched[self.DOMNA_PROPERTY_ID].values[0]
}
)
continue
matched = self.standardised_asset_list[
(self.standardised_asset_list[self.STANDARD_POSTCODE].str.strip() == x[outcomes_postcode[idx]])
].copy()
if not matched.empty:
matched["houseno"] = matched.apply(
lambda x: SearchEpc.get_house_number(
str(x[self.STANDARD_ADDRESS_1]), str(x[self.STANDARD_POSTCODE])
),
axis=1
)
if pd.isnull(x[outcomes_houseno[idx]]):
house_no_to_match = SearchEpc.get_house_number(
str(x[outcomes_address[idx]]), str(x[outcomes_postcode[idx]])
)
if isinstance(house_no_to_match, str):
house_no_to_match = house_no_to_match.lower()
else:
house_no_to_match = str(x[outcomes_houseno[idx]]).strip()
matched = matched[matched["houseno"].astype(str) == house_no_to_match]
if matched.shape[0] == 1:
lookup_i.append(
{
"row_id": x["row_id"],
self.DOMNA_PROPERTY_ID: matched[self.DOMNA_PROPERTY_ID].values[0]
}
)
continue
elif not matched.empty:
# Use levenstein distance to match
matched["address"] = (
matched[self.STANDARD_ADDRESS_1] + " " + matched[self.STANDARD_POSTCODE]
)
best_match = process.extractOne(
x[outcomes_address[idx]], matched[self.STANDARD_FULL_ADDRESS].values
)[0]
matched = matched[matched[self.STANDARD_FULL_ADDRESS] == best_match]
lookup_i.append(
{
"row_id": x["row_id"],
self.DOMNA_PROPERTY_ID: matched[self.DOMNA_PROPERTY_ID].values[0]
}
)
continue
nomatch_i.append(x["row_id"])
outcomes_no_match_i = outcomes[outcomes["row_id"].isin(nomatch_i)]
lookup_i = pd.DataFrame(lookup_i)
outcomes_no_match.append(outcomes_no_match_i)
lookup.append(lookup_i)
self.outcomes.append(outcomes)
lookup = pd.concat(lookup)
self.outcomes_no_match = pd.concat(outcomes_no_match)
self.outcomes = pd.concat(self.outcomes)
if lookup.empty:
return
# We will have duplicated domna property IDs, where a surveyor has been to a property multiple times
# Where we have multiple rows, we want to make a call on what the action should be. For example,
# there may be properties that have been visited multiple times where the outcome was "See notes" implying
# that the surveyor had a detailed explanation as to why they couldn't gain access so if this has
# happened multiple times, in this case we judge that the work may not be viable
if "Week Commencing" in self.outcomes.columns:
date_col = "Week Commencing"
elif "Survey Date" in self.outcomes.columns:
date_col = "Survey Date"
elif "Date letters sent" in self.outcomes.columns:
date_col = "Date letters sent"
elif "Date Letter sent" in self.outcomes.columns:
date_col = "Date Letter sent"
else:
raise NotImplementedError("Invalid date in outcomes - implement me")
notes_col = "Notes" if "Notes" in outcomes.columns else "Notes / Outcomes"
lookup = lookup.merge(
self.outcomes[["row_id", "Outcome", notes_col, date_col]], how="left", on="row_id"
)
visit_counts = (
lookup.groupby(self.DOMNA_PROPERTY_ID)["row_id"]
.count()
.reset_index()
.rename(columns={"row_id": "visit_count"})
.sort_values("visit_count", ascending=False)
)
def extract_date(s):
if isinstance(s, str):
match = re.search(r"(\d{2}\.\d{2}\.\d{4})", s)
if match:
return pd.to_datetime(match.group(1), format="%d.%m.%Y", errors="coerce")
return pd.NaT
lookup['parsed_date'] = lookup[date_col].apply(extract_date)
def get_latest_note(group):
surveyed = group[group['Outcome'] == 'surveyed']
if not surveyed.empty:
return surveyed.sort_values('parsed_date', ascending=False).iloc[0]
else:
return group.sort_values('parsed_date', ascending=False).iloc[0]
latest_note = (
lookup.groupby('domna_property_id', group_keys=False).
apply(get_latest_note).
reset_index(drop=True)
)
latest_note = latest_note[["domna_property_id", notes_col]]
pivot_df = lookup.groupby(["domna_property_id", "Outcome"]).size().unstack(fill_value=0).reset_index()
pivot_df = pivot_df.merge(
visit_counts, how="left", on="domna_property_id"
)
# We want the latest note
if pivot_df[self.DOMNA_PROPERTY_ID].duplicated().sum():
raise Exception("We have duplicated property IDs in the outcomes data")
# We merge this data onto outcomes
self.outcomes["matched_to_asset_list"] = self.outcomes["row_id"].isin(lookup["row_id"].values)
self.outcomes = self.outcomes.merge(lookup[["row_id", "domna_property_id"]], how="left", on="row_id")
# We merge out pivoted outcomes onto the asset list
self.standardised_asset_list = self.standardised_asset_list.merge(
pivot_df, how="left", left_on=self.DOMNA_PROPERTY_ID, right_on="domna_property_id"
)
# Merge the latest note
self.standardised_asset_list = self.standardised_asset_list.merge(
latest_note.rename(columns={notes_col: "Latest Route March Note"}),
how="left", left_on=self.DOMNA_PROPERTY_ID, right_on="domna_property_id"
)
if self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated().sum():
raise ValueError("Duplicates appreared - something went wrong")
self.outcomes = self.outcomes.sort_values("domna_property_id", ascending=False)
def flag_survey_master(
self,
master_filepaths,
master_to_asset_list_filepath=None
):
# TODO: This probably needs further expansion
if not master_filepaths:
return
if master_to_asset_list_filepath is not None:
id_map = pd.read_csv(master_to_asset_list_filepath)
else:
id_map = pd.DataFrame()
logger.info("Getting masters and merging onto asset list")
master_surveyed = []
unmatched_submissions = []
for filepath in master_filepaths:
master_data = pd.read_csv(filepath)
# Strip columns
master_data.columns = [c.strip() for c in master_data.columns]
master_data.columns = [re.sub(r'\s+', ' ', c) for c in master_data.columns]
# Drop any unnamed columns
unnamed_columns = [c for c in master_data.columns if "Unnamed:" in c]
master_data = master_data.drop(columns=unnamed_columns)
if not id_map.empty:
master_data = master_data.merge(
id_map, how="left", on=['NO.', 'Street / Block Name', 'Post Code']
)
if "INSTALLED OR CANCELLED" in master_data.columns:
install_col = "INSTALLED OR CANCELLED"
elif "INSTALL / CANCELLATION DATE" in master_data.columns:
install_col = "INSTALL / CANCELLATION DATE"
elif 'INSTALL/ CANCELLATION DATE' in master_data.columns:
install_col = 'INSTALL/ CANCELLATION DATE'
else:
raise ValueError("No install or cancellation date")
submission_col = (
"SUBMISSION DATE" if "SUBMISSION DATE" in master_data.columns else "SUBMISSION DATE TO INSTALLERS"
)
# if "UPRN" in master_data.columns:
# # We just need to check if any were cancelled
# master_to_append = master_data[
# ["UPRN", install_col, submission_col]
# ].rename(
# columns={
# "UPRN": self.STANDARD_LANDLORD_PROPERTY_ID,
# install_col: "survey_status",
# submission_col: "submission_date"
# }
# )
# master_to_append["cancelled"] = master_to_append["survey_status"].str.lower().str.contains("cancel")
#
# master_surveyed.append(master_to_append)
# continue
master_data["row_id"] = master_data.index
self.standardised_asset_list["house_no"] = self.standardised_asset_list.apply(
lambda x: SearchEpc.get_house_number(
str(x[self.STANDARD_ADDRESS_1]), str(x[self.STANDARD_POSTCODE])
),
axis=1
)
scheme_col = (
"AFFORDABLE WARMTH OR EPC FOR HOUSING ASSOCIATION" if
"AFFORDABLE WARMTH OR EPC FOR HOUSING ASSOCIATION" in master_data.columns else "AFFORDABLE WARMTH"
)
postcode_col = "POSTCODE" if "POSTCODE" in master_data.columns else "Post Code"
house_no_col = 'NO.' if 'NO.' in master_data.columns else "NO"
property_type_col = (
"PROPERTY TYPE As per table emailed" if
"PROPERTY TYPE As per table emailed" in
master_data.columns else "PROPERTY TYPE As per table emailed"
)
measure_mix_col = "MEASURE COMBO"
# Otherwise, we need to match algorithmically
has_property_id = "UPRN" in master_data.columns
logger.info("Matching master data to asset list")
matched = []
unmatched = []
for _, row in tqdm(master_data.iterrows(), total=len(master_data)):
original_house_no = row[house_no_col]
original_street = row["Street / Block Name"]
original_postcode = row[postcode_col]
if pd.isnull(row[postcode_col]):
continue
# if has_property_id:
# submission_uprn = row["UPRN"]
#
# if not pd.isnull(submission_uprn):
# df = self.standardised_asset_list[
# self.standardised_asset_list[self.STANDARD_LANDLORD_PROPERTY_ID] == submission_uprn
# ]
postcode_no_space = row[postcode_col].strip().replace(" ", "").lower()
df = self.standardised_asset_list[
(
self.standardised_asset_list[self.STANDARD_POSTCODE]
.str.strip().str.lower().str.replace(" ", "") == postcode_no_space
)
]
house_no = row[house_no_col]
if isinstance(house_no, (float, int)):
house_no = str(int(house_no))
if house_no not in df["house_no"].values:
# Handle postcode errors
postal_region = row[postcode_col].split(" ")[0].lower()
df = self.standardised_asset_list[
(
self.standardised_asset_list[self.STANDARD_POSTCODE]
.str.strip().str.lower().str.startswith(postal_region)
)
]
if house_no not in df["house_no"].values:
unmatched.append(row["row_id"])
continue
df = df[df["house_no"] == house_no]
if df.shape[0] > 1:
df = df[
df[self.STANDARD_FULL_ADDRESS].str.lower().str.contains(row["Street / Block Name"].lower())
]
if df.shape[0] == 0:
unmatched.append(row["row_id"])
continue
matched.append(
{
"row_id": row["row_id"],
"original_house_no": original_house_no,
"original_street": original_street,
"original_postcode": original_postcode,
self.STANDARD_LANDLORD_PROPERTY_ID: df[self.STANDARD_LANDLORD_PROPERTY_ID].values[0],
}
)
if house_no in df["house_no"].values:
df = df[df["house_no"] == house_no]
if df.shape[0] != 1:
# Levenstein distance
if any(df[self.STANDARD_FULL_ADDRESS].str.contains(row["Street / Block Name"])):
df = df[
df[self.STANDARD_FULL_ADDRESS].str.contains(row["Street / Block Name"])
]
else:
# Levenstein distance
df = df[
df[self.STANDARD_FULL_ADDRESS].str.lower().apply(
lambda x: process.extractOne(
" ".join([row[house_no_col], row["Street / Block Name"], row["TOWN"]]).lower(),
x
)[1]
) > 90
]
if df.shape[0] == 0:
unmatched.append(row["row_id"])
continue
if any(df[self.STANDARD_FULL_ADDRESS].str.lower().str.contains(
" ".join([row[house_no_col], row["Street / Block Name"]]).lower()
)):
df = df[
df[self.STANDARD_FULL_ADDRESS].str.lower().str.contains(
" ".join([row[house_no_col], row["Street / Block Name"]]).lower()
)
]
if any(
df[self.STANDARD_PROPERTY_TYPE].str.contains(row[property_type_col].split(" ")[-1].lower())
):
# We ignore "block of flats" entries
df = df[
df[self.STANDARD_PROPERTY_TYPE].str.contains(
row[property_type_col].split(" ")[-1].lower()
) & (df[self.STANDARD_PROPERTY_TYPE] != "block of flats")
]
if df.shape[0] != 1:
# We have multiple matches
raise NotImplementedError("FIX ME")
matched.append(
{
"row_id": row["row_id"],
"original_house_no": original_house_no,
"original_street": original_street,
"original_postcode": original_postcode,
self.STANDARD_LANDLORD_PROPERTY_ID: df[self.STANDARD_LANDLORD_PROPERTY_ID].values[0],
}
)
self.standardised_asset_list = self.standardised_asset_list.drop(columns="house_no")
# We match the "UPRN" which is the landlords ID, onto the master sheet
matched = pd.DataFrame(matched)
master_to_append = master_data[[scheme_col, "row_id", install_col, submission_col, measure_mix_col]].merge(
matched, how="left", on="row_id"
).rename(
columns={
scheme_col: "funding_scheme",
measure_mix_col: "measure_mix",
install_col: "survey_status",
submission_col: "submission_date"
}
)
master_to_append["cancelled"] = master_to_append["survey_status"].str.lower().str.contains("cancel")
master_surveyed.append(master_to_append)
unmatched_df = master_data[
master_data["row_id"].isin(unmatched)
]
# The columns are massively different - we take just a few
unmatched_df = unmatched_df[
[
scheme_col, house_no_col, "Street / Block Name", postcode_col, install_col, submission_col
]
].rename(
columns={
scheme_col: "Funding Scheme",
house_no_col: "House Number",
postcode_col: "Postcode",
install_col: "survey_status",
submission_col: "submission_date"
}
)
unmatched_submissions.append(unmatched_df)
master_surveyed = pd.concat(master_surveyed)
master_surveyed = master_surveyed[~pd.isnull(master_surveyed[self.STANDARD_LANDLORD_PROPERTY_ID])]
master_surveyed = master_surveyed[
~master_surveyed[self.STANDARD_LANDLORD_PROPERTY_ID].isin(
["NOT ON ASSET LIST", "Missing From Asset List"]
)
]
master_surveyed[self.STANDARD_LANDLORD_PROPERTY_ID] = master_surveyed[
self.STANDARD_LANDLORD_PROPERTY_ID
].astype(str)
# We de-dupe crudely on landlord property id
self.master_surveyed = master_surveyed.drop_duplicates(subset=[self.STANDARD_LANDLORD_PROPERTY_ID])
self.standardised_asset_list = self.standardised_asset_list.merge(
self.master_surveyed, how="left", on=self.STANDARD_LANDLORD_PROPERTY_ID
)
# Finally, we keep a record of the unmatched
if unmatched_submissions:
self.unmatched_submissions = pd.concat(
unmatched_submissions
)