# Mirror of https://github.com/Hestia-Homes/Model.git
# Synced 2026-06-08 11:17:27 +00:00
# 2822 lines, 128 KiB, Python
import ast
import hashlib
import json
import os
import re
from datetime import datetime
from pprint import pprint

import numpy as np
import pandas as pd
import tiktoken
from fuzzywuzzy import process
from numpy.ma.core import masked_not_equal
from openai import OpenAI
from tqdm import tqdm

from utils.logger import setup_logger
from backend.SearchEpc import SearchEpc
from BaseUtility import Definitions
import asset_list.mappings.property_type as property_type_mappings
import asset_list.mappings.walls as walls_mappings
import asset_list.mappings.heating_systems as heating_mappings
import asset_list.mappings.exising_pv as existing_pv_mappings
import asset_list.mappings.built_form as built_form_mappings
import asset_list.mappings.roof as roof_mappings
import asset_list.mappings.outcomes as outcomes_mappings
from recommendations.recommendation_utils import (
    estimate_perimeter,
    estimate_external_wall_area,
    estimate_number_of_floors
)
from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes
|
||
|
||
# Module-level logger shared by everything in this file
logger = setup_logger()

# OpenAI API Key (set this in your environment variables for security)
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
|
||
|
||
|
||
class DataRemapper:
    """
    Maps raw values onto a fixed set of standard values.

    ``standardize_list`` resolves each value in this order: predefined mapping, exact match of the
    cleaned value against the standard values, fuzzy match, and finally one batched OpenAI request
    for whatever is left. Token usage and an estimated cost of the OpenAI calls are accumulated on
    the instance and can be printed with ``report_usage``.
    """

    def __init__(self, standard_values, standard_map=None, max_tokens=1000):
        """
        Initialize the remapper with standard values and a predefined mapping.

        :param standard_values: Set of allowed standardized values.
        :param standard_map: Dictionary of common remappings {raw_value: standard_value}.
        :param max_tokens: Maximum prompt size (in tokens) and completion length sent to OpenAI.
        """
        self.standard_values = standard_values
        # Membership tests are run against the mapping, so a missing mapping becomes an empty dict
        self.standard_map = standard_map if standard_map is not None else {}
        self.fuzzy_threshold = 90  # Adjust fuzzy matching sensitivity
        self.ai_model = "gpt-4-turbo"  # Use gpt-3.5-turbo for cheaper processing

        # Tokenizer for counting tokens
        self.tokenizer = tiktoken.encoding_for_model(self.ai_model)

        # Track token usage and remap dictionary
        self.total_tokens_used = 0
        self.total_cost = 0
        self.remap_dict = {}  # {original_value: standardized_value}
        self.max_tokens = max_tokens  # Limit for OpenAI API

        # Memoization for AI calls
        self.ai_cache = {}  # {tuple(unmapped_values): {original_value: standardized_value}}
        # Capture the response for debugging
        self.ai_response = None

        # OpenAI pricing (as of Feb 2024)
        self.pricing = {
            "gpt-4-turbo": {"input": 0.01 / 1000, "output": 0.03 / 1000},
            "gpt-3.5-turbo": {"input": 0.0015 / 1000, "output": 0.002 / 1000},
        }

        self.openai_client = OpenAI(api_key=OPENAI_API_KEY)

    @staticmethod
    def clean_string(text):
        """
        Basic text cleaning: remove extra spaces, punctuation, and normalize case.

        :param text: value to clean
        :return: the cleaned string, or None when ``text`` is not a string
        """
        if not isinstance(text, str):
            return None
        text = text.strip().lower()
        text = re.sub(r'[^\w\s]', '', text)  # Remove punctuation
        # Collapse runs of whitespace into a single space
        text = re.sub(r'\s+', ' ', text)
        return text

    def fuzzy_match(self, text):
        """
        Use fuzzy matching to find the closest standard value.

        :param text: cleaned value to match
        :return: the best standard value if its score reaches ``fuzzy_threshold``, otherwise None
        """
        if not text:
            return None
        best = process.extractOne(text, self.standard_values)
        if best is None:
            # extractOne returns None when there are no choices to compare against
            return None
        # The result is (match, score) for sequences and (match, score, key) for mappings
        match, score = best[0], best[1]
        return match if score >= self.fuzzy_threshold else None

    def count_tokens(self, text):
        """Estimate the number of tokens in a given text."""
        return len(self.tokenizer.encode(text)) if text else 0

    @staticmethod
    def _parse_ai_mapping(output_text, unmapped_values):
        """
        Parse the model's reply into a dictionary {original_value: standardized_value}.

        The reply is untrusted text, so it is parsed as data (JSON first, then a Python literal) and
        never executed. NOTE(review): this replaces a previous eval() of the reply.

        :param output_text: raw text returned by the model
        :param unmapped_values: the values that were sent for standardization
        :return: the parsed dictionary, or every value mapped to "unknown" if the reply is not a dictionary
        """
        text = (output_text or "").strip()
        # The reply is sometimes wrapped in a markdown code fence despite the prompt
        fenced = re.match(r"^```(?:json)?\s*(.*?)\s*```$", text, flags=re.DOTALL)
        if fenced:
            text = fenced.group(1)

        for parser in (json.loads, ast.literal_eval):
            try:
                parsed = parser(text)
            except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
                continue
            if isinstance(parsed, dict):
                return parsed

        return {val: "unknown" for val in unmapped_values}  # Fallback

    def ai_standardize(self, unmapped_values):
        """
        Call OpenAI API **once** for all unmapped values to minimize cost, with memoization.

        :param unmapped_values: list of raw values that no other method could map
        :return: dictionary {original_value: standardized_value}
        :raises ValueError: if the prompt is longer than ``max_tokens`` tokens
        """
        if not unmapped_values:
            return {}

        # Sort so the same set of values always hits the same cache entry; key=str keeps mixed types sortable
        unmapped_tuple = tuple(sorted(unmapped_values, key=str))
        if unmapped_tuple in self.ai_cache:
            return self.ai_cache[unmapped_tuple]  # Return memoized result

        prompt = f"""
        You are an expert in data classification. Standardize each of these values into one of the categories:
        {list(self.standard_values)}.

        Return only a JSON dictionary where:
        - The keys are the original values.
        - The values are the standardized ones.

        Strictly return JSON **without markdown formatting** or extra text.

        Example Output:
        {{
        "BLKHOUS": "block house",
        "BEDSIT": "bedsit"
        }}

        Values to standardize:
        {unmapped_values}
        """

        # Count input tokens
        input_tokens = self.count_tokens(prompt)
        if input_tokens > self.max_tokens:
            raise ValueError("Input tokens exceed the maximum limit.")

        logger.info("Calling OpenAI API for standardization...")
        response = self.openai_client.chat.completions.create(
            model=self.ai_model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=self.max_tokens,
            temperature=0.1,
        )

        # content can be None, which is treated as an empty reply
        output_text = (response.choices[0].message.content or "").strip()
        output_tokens = self.count_tokens(output_text)  # Count output tokens

        # Track total token usage
        self.total_tokens_used += input_tokens + output_tokens

        # Estimate cost; a model missing from the pricing table is costed at zero rather than
        # raising after the request has already been made
        rates = self.pricing.get(self.ai_model, {"input": 0.0, "output": 0.0})
        self.total_cost += input_tokens * rates["input"] + output_tokens * rates["output"]

        mapping = self._parse_ai_mapping(output_text, unmapped_values)

        # Memoize the AI response
        self.ai_cache[unmapped_tuple] = mapping
        # We store the raw AI response for debugging
        logger.debug(f"AI Response: {mapping}")
        self.ai_response = output_text

        return mapping

    def standardize_list(self, values_to_remap):
        """
        Standardizes a list of values and returns a dictionary {original_value: standardized_value}.

        Results are accumulated in ``self.remap_dict`` across calls.

        :param values_to_remap: List of raw values to standardize.
        :return: Dictionary {original_value: standardized_value}.
        """
        standard_map = self.standard_map if self.standard_map is not None else {}
        unique_values = set(values_to_remap)  # Process only unique values

        unmapped_values = []
        for value in unique_values:
            if pd.isna(value):  # Handle NaN values
                self.remap_dict[value] = "unknown"
                continue

            cleaned_value = self.clean_string(value)

            # Rule-Based Check (Predefined Mapping): cleaned value first, then raw, then lower-cased raw
            if cleaned_value in standard_map:
                self.remap_dict[value] = standard_map[cleaned_value]
                continue
            if value in standard_map:
                self.remap_dict[value] = standard_map[value]
                continue
            if isinstance(value, str) and value.lower() in standard_map:
                self.remap_dict[value] = standard_map[value.lower()]
                continue

            # Exact Match in Standard Values
            if cleaned_value in self.standard_values:
                self.remap_dict[value] = cleaned_value
                continue

            # Fuzzy Matching
            fuzzy_match = self.fuzzy_match(cleaned_value)
            if fuzzy_match:
                self.remap_dict[value] = fuzzy_match
                continue

            # Capture anything that wasn't mapped
            unmapped_values.append(value)

        # AI Model - remap anything unmapped (batch request)
        ai_mapping = self.ai_standardize(unmapped_values)
        self.remap_dict.update(ai_mapping)

        return self.remap_dict

    def report_usage(self):
        """Prints a summary of token usage and cost."""
        print(f"\n🔹 Total Tokens Used: {self.total_tokens_used}")
        print(f"💰 Estimated Cost: ${self.total_cost:.4f}")
|
||
|
||
|
||
class AssetList:
|
||
"""
|
||
This class is used to standardise asset lists so that we can process the core information in a consistent manner.
|
||
"""
|
||
|
||
# Maps EPC API field names to the column names used in our outputs
EPC_API_DATA_NAMES = {
    "uprn": "epc_os_uprn",
    "address1": "epc_address1",
    "address": "epc_address",
    "postcode": "epc_postcode",
    "inspection-date": "epc_inspection_date",
    "current-energy-efficiency": "epc_sap_score_on_register",
    "current-energy-rating": "epc_rating_on_register",
    "property-type": "epc_property_type",
    "built-form": "epc_archetype",
    "total-floor-area": "epc_total_floor_area",
    "construction-age-band": "epc_age_band",
    "floor-height": "epc_floor_height",
    "number-habitable-rooms": "epc_number_habitable_rooms",
    "walls-description": "epc_wall_construction",
    "roof-description": "epc_roof_construction",
    "floor-description": "epc_floor_construction",
    "mainheat-description": "epc_heating_type",
    'mainheatcont-description': "epc_heating_controls",
    "secondheat-description": "epc_secondary_heating",
    "transaction-type": "epc_reason",
    "energy-consumption-current": "epc_heat_demand",
    "photo-supply": "epc_photo_supply",
    "estimated": "estimated"
}
# Maps field labels to output column names. NOTE(review): presumably the labels scraped from the
# "find an EPC" pages - confirm. The misspelt "epc_estiamted_heating_kwh" is a runtime column name,
# so it is kept as is.
FIND_EPC_DATA_NAMES = {
    "heating_text": "epc_estiamted_heating_kwh",
    "hot_water_text": "epc_estimated_hotwater_kwh",
    'Assessor’s name': "epc_assessor_name",
    "Assessor's Telephone": "epc_assessor_telephone",
    "Assessor's Email": "epc_assessor_email",
    "Accreditation scheme": "epc_assessor_accreditation",
    "Assessor’s ID": "epc_assessor_id",
    "Solar photovoltaics": "epc_solar_pv"
}

# String values seen in year-built columns, replaced by a concrete date before conversion
DATETIME_REMAP = {
    "Pre 1900": datetime(year=1899, month=12, day=31),
}

# These are the accepted methods we have for cleaning the address1 column
ADDRESS_1_CLEANING_METHODS = [
    "first_two_words",  # This method will split on the first two words, where the separator is a space
    "first_word",  # This method will split on the first word, where the separator is a space
    "house_number_extraction",  # This method will use the NLP model in SearchEPC to extract the housenumber
    # "address1_extraction"  # This method will use the NLP model to extract address1
]

# Standard column Names
STANDARD_ADDRESS_1 = "domna_address_1"
STANDARD_POSTCODE = "domna_postcode"
STANDARD_FULL_ADDRESS = "domna_full_address"
STANDARD_YEAR_BUILT = "landlord_year_built"
STANDARD_UPRN = "ordnance_survey_uprn"
STANDARD_LANDLORD_PROPERTY_ID = "landlord_property_id"
STANDARD_PROPERTY_TYPE = "landlord_property_type"
STANDARD_BUILT_FORM = "landlord_built_form"
STANDARD_WALL_CONSTRUCTION = "landlord_wall_construction"
STANDARD_ROOF_CONSTRUCTION = "landlord_roof_construction"
STANDARD_HEATING_SYSTEM = "landlord_heating_system"
STANDARD_EXISTING_PV = "landlord_existing_pv"
STANDARD_SAP = "landlord_sap_rating"

DOMNA_PROPERTY_ID = "domna_property_id"

# Regular expression for identifying if the address might point to multiple units
MULTI_UNIT_REGEX = re.compile(r'\b([A-Za-z0-9]+)-([A-Za-z0-9]+)\b')

# List of columns relating to the non-intrusive data
NON_INTRUSIVES_COLNAMES = [
    "Archetype", "Construction", "Insulated", "Material", "CIGA Check Required",
    "PV, ACCESS ISSUE, SEE NOTES", "OFF GAS - ROOF ORIENTATION",
    "Any further surveyor notes", 'Surveyors Name'
]

NON_INTRUSIVES_ELIGIBILITY_COLUMN = "Eligibility (Red/Yellow/Green)"

OLD_FORMAT_NON_INTRUSIVE_COLNAMES = ['WFT Findings', 'ECO Eligibility']

# This SAP threshold is a key search criteria for properties that may be eligible for extraction
FILLED_CAVITY_SAP_THRESHOLD = 75
# SAP threshold for empty cavities. NOTE(review): the original comment was cut off ("This SAP the") -
# confirm the intended meaning.
EMPTY_CAVITY_SAP_THRESHOLD = 75
# Any EPC deemed to have been conducted prior to this year is deemed to be unreliable.
# Evaluated once, when the class body is executed.
EPC_YEAR_THRESHOLD = pd.Timestamp.now().year - 5

# Properties before this year are more likely to have lower EPC ratings and more likely to qualify
EMPTY_CAVITY_YEAR_THRESHOLD = 2002

# Attributes - these are columns that we produce, calculated based on other pieces of data.
# The misspelt "attribute_est_perimter" is a runtime column name, so it is kept as is.
ATTRIBUTE_HAS_SOLAR = "attribute_has_solar"
ATTRIBUTE_NUMBER_OF_FLOORS = "attribute_est_number_floors"
ATTRIBUTE_ESTIMATED_PERIMETER = "attribute_est_perimter"
ATTRIBUTE_HEAT_LOSS_AREA = "attribute_heat_loss_area"
ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS = "attribute_epc_roof_insulation_thickness"
ATTRIBUTE_SAP_THRESHOLD_AND_BELOW = f"sap_rating_{FILLED_CAVITY_SAP_THRESHOLD}_and_below"
ATTRIBUTE_EPC_PRE_YEAR_THRESHOLD = f"epc_is_pre_{EPC_YEAR_THRESHOLD}"

# These are the descriptions that we look for in the EPC data that are indicative of no insulation
EPC_NO_WALL_INSULATION_DESCRIPTIONS = [
    "cavity wall, as built, no insulation (assumed)",
    "cavity wall, as built, partial insulation (assumed)",
    "cavity wall, as built, partial insulation",
    "cavity wall, as built, no insulation",
]

# List of strings that we look for in the EPC data, where substrings indicate that the wall is insulated
EPC_INSULATED_WALLS_SUBSTRINGS = [
    ", insulated", "with external insulation", "with internal insulation", "filled cavity"
]

# List of strings that we look for in the EPC data, where substrings indicate that the roof is insulated
EPC_INSULATED_ROOF_SUBSTRINGS = [
    "(another dwelling above)", ", insulated", ", insulated (assumed) ",
    ", ceiling insulated",
]

# List of strings we look for in the EPC data, where substrings indicate that the cavity is empty
UNINSULATED_CAVITY_SUBSTRINGS = [
    "cavity wall, as built, no insulation (assumed)",
    "cavity wall, as built, no insulation",
    "cavity wall, as built, partial insulation (assumed)",
    "cavity wall, as built, partial insulation",
]
|
||
|
||
def __init__(
    self,
    local_filepath,
    sheet_name,
    address1_colname,
    postcode_colname,
    full_address_colname,
    landlord_property_id=None,
    full_address_cols_to_concat=None,
    missing_postcodes_method=None,
    address1_extraction_method=None,
    landlord_year_built=None,
    landlord_uprn=None,
    landlord_property_type=None,
    landlord_built_form=None,
    landlord_wall_construction=None,
    landlord_roof_construction=None,
    landlord_heating_system=None,
    landlord_existing_pv=None,
    landlord_sap=None,
    phase=False,
    header=0
):
    """
    Read the asset list from disk and record which of its columns hold each field.

    Where two roles point at the same source column (UPRN / property ID, full address / address 1,
    property type / built form) the column is copied under a standard name so that each role has its
    own column.

    :param local_filepath: path to the asset list; Excel extensions are read with read_excel,
        anything else with read_csv
    :param sheet_name: sheet to read (Excel files only)
    :param address1_colname: name of the address line 1 column
    :param postcode_colname: name of the postcode column
    :param full_address_colname: name of the full address column
    :param landlord_property_id: name of the landlord's property ID column
    :param full_address_cols_to_concat: columns to join into a full address when there is none
    :param missing_postcodes_method: stored for use by later cleaning steps
    :param address1_extraction_method: how to derive address 1 when there is no such column
    :param landlord_year_built: name of the year built column
    :param landlord_uprn: name of the UPRN column
    :param landlord_property_type: name of the property type column
    :param landlord_built_form: name of the built form column
    :param landlord_wall_construction: name of the wall construction column
    :param landlord_roof_construction: name of the roof construction column
    :param landlord_heating_system: name of the heating system column
    :param landlord_existing_pv: name of the existing PV column
    :param landlord_sap: name of the SAP rating column
    :param phase: when True, we intend to break the programme into multiple phases
    :param header: row number of the column headers
    """
    self.local_filepath = local_filepath
    self.sheet_name = sheet_name
    # Read in the data. The extension check is case-insensitive and covers the legacy and
    # macro-enabled Excel formats; header is honoured for both file types.
    if str(local_filepath).lower().endswith((".xlsx", ".xlsm", ".xls")):
        self.raw_asset_list = pd.read_excel(local_filepath, header=header, sheet_name=sheet_name)
    else:
        self.raw_asset_list = pd.read_csv(local_filepath, header=header)
    self.standardised_asset_list = self.raw_asset_list.copy()
    # Will be used to store aggregated figures against the various work types
    self.work_type_figures = {}
    self.flat_data = None
    self.duplicated_addresses = None
    self.contact_details = None
    self.contact_detail_fields = None
    self.outcomes = None
    self.outcomes_no_match = pd.DataFrame()
    self.outcomes_for_output = pd.DataFrame()
    self.master_surveyed = None
    self.unmatched_submissions = pd.DataFrame()
    self.ecosurv = None
    self.ecosurv_no_match = pd.DataFrame()

    # When this is True, we intend to break the programme into multiple phases. We may need to review
    # how this is structured in the future, as depending on how we get future data, we may need to
    # remove some existing phases from the reporting, or specifically highlight the phase (1 to n-1)
    # properties, assuming the current phase is n.
    self.phase = phase

    columns = self.raw_asset_list.columns
    # We detect the presence of the non-intrusive columns
    self.non_intrusives_present = "CIGA Check Required" in columns
    # We detect if we have the old format of non-intrusives
    self.old_format_non_intrusives_present = "WFT Findings" in columns

    self.non_intrusives_eligibility = "Eligibility (Red/Yellow/Green)" in columns

    # Names of columns
    self.landlord_property_id = landlord_property_id
    self.address1_colname = address1_colname
    self.postcode_colname = postcode_colname
    self.full_address_colname = full_address_colname
    self.landlord_year_built = landlord_year_built
    self.landlord_uprn = landlord_uprn
    self.landlord_property_type = landlord_property_type
    self.landlord_built_form = landlord_built_form
    self.landlord_wall_construction = landlord_wall_construction
    self.landlord_roof_construction = landlord_roof_construction
    self.landlord_heating_system = landlord_heating_system
    self.landlord_existing_pv = landlord_existing_pv
    self.landlord_sap = landlord_sap

    # parameters for cleaning
    self.full_address_cols_to_concat = full_address_cols_to_concat
    self.missing_postcodes_method = missing_postcodes_method
    self.address1_extraction_method = address1_extraction_method

    self.debug_information = {
        "property_type": None,
        "wall_construction": None,
        "heating_system": None,
        "existing_pv": None
    }

    self.variable_mappings = {}
    self.hubspot_data = None

    self.rename_map = {}
    self.keep_variables = []

    # Finally, we handle the case where the landlord's property ID is actually the OS UPRN
    if (self.landlord_uprn == self.landlord_property_id) and (self.landlord_property_id is not None):
        self.standardised_asset_list[self.STANDARD_UPRN] = self.standardised_asset_list[self.landlord_uprn].copy()
        # The UPRN role now points at the copy, leaving the original column as the property ID
        self.landlord_uprn = self.STANDARD_UPRN

    # Handle the case when full address and address 1 are the same column. When both are None
    # there is no column to copy, so that case is left alone.
    if (self.address1_colname is not None) and (self.full_address_colname == self.address1_colname):
        self.full_address_colname = self.STANDARD_FULL_ADDRESS
        self.standardised_asset_list[self.full_address_colname] = (
            self.standardised_asset_list[self.address1_colname].copy()
        )

    # Handle the case where the property type column and built form are missing
    if self.landlord_property_type is None and self.landlord_built_form is None:
        self.landlord_property_type = self.STANDARD_PROPERTY_TYPE
        self.landlord_built_form = self.STANDARD_BUILT_FORM
        if "Archetype" in columns:
            # We use the non-intrusives as our property type and built form
            self.standardised_asset_list[self.landlord_property_type] = (
                self.standardised_asset_list["Archetype"].copy()
            )
            self.standardised_asset_list[self.landlord_built_form] = (
                self.standardised_asset_list["Archetype"].copy()
            )
        else:
            # No source for either field yet, so both columns start out empty
            self.standardised_asset_list[self.landlord_property_type] = None
            self.standardised_asset_list[self.landlord_built_form] = None

    # Handle the case where the property type column is the same as the built type
    if self.landlord_property_type == self.landlord_built_form:
        self.landlord_built_form = self.STANDARD_BUILT_FORM
        self.standardised_asset_list[self.landlord_built_form] = (
            self.standardised_asset_list[self.landlord_property_type].copy()
        )

    # If landlord built form is None (which it often is) we use the built form from inspections.
    # The Archetype column is checked explicitly because non_intrusives_present only tests for
    # "CIGA Check Required".
    if (self.landlord_built_form is None) and self.non_intrusives_present and ("Archetype" in columns):
        self.landlord_built_form = self.STANDARD_BUILT_FORM
        self.standardised_asset_list[self.landlord_built_form] = (
            self.standardised_asset_list["Archetype"].copy()
        )
|
||
|
||
def _extract_address1(self, asset_list, full_address_col, postcode_col, method="first_two_words"):
    """
    Derive the address 1 column from the full address and store it under ``self.address1_colname``.

    :param asset_list: dataframe to add the column to (modified in place)
    :param full_address_col: name of the full address column
    :param postcode_col: name of the postcode column (only used by "house_number_extraction")
    :param method: one of ``ADDRESS_1_CLEANING_METHODS``
    :return: the asset list with the address 1 column added
    :raises ValueError: if ``method`` is not recognised
    """
    if method not in self.ADDRESS_1_CLEANING_METHODS:
        raise ValueError(f"Method {method} for producing address1 not recognized")

    if method == "first_two_words":
        asset_list[self.address1_colname] = asset_list[full_address_col].str.split(" ").str[:2].str.join(" ")
    elif method == "first_word":
        asset_list[self.address1_colname] = asset_list[full_address_col].str.split(" ").str[0]
    elif method == "house_number_extraction":
        # One lookup per row. A second loop that repeated every lookup and discarded the result
        # has been removed.
        asset_list[self.address1_colname] = asset_list.apply(
            lambda row: SearchEpc.get_house_number(address=row[full_address_col], postcode=row[postcode_col]),
            axis=1
        )
    else:
        # Only reachable if a method is added to ADDRESS_1_CLEANING_METHODS without a branch here
        raise ValueError(f"Method {method} not recognized")

    return asset_list
|
||
|
||
@staticmethod
def _address1_extraction(x):
    """
    Placeholder for extracting address 1 from ``x``; not implemented.

    :param x: ignored
    :return: always None
    """
    return None
|
||
|
||
def create_property_id(self):
    """
    Create the domna property ID column on the standardised asset list.

    The ID is built from the full address concatenated with the postcode: the text is stripped of
    punctuation and spaces, lower-cased, and suffixed with the first 12 hex characters of its
    SHA-256 hash. The same address and postcode therefore always give the same ID.

    :return: None; the column ``DOMNA_PROPERTY_ID`` is written in place
    """

    def _make_hash(value):
        """Generates a stable SHA256 hash suffix and appends it to a cleaned version of the value."""
        # Normalize and remove special characters for cleaner ID
        cleaned_value = re.sub(r"[^\w\s-]", "", value).replace(" ", "_").lower()

        # Generate SHA-256 hash and truncate it
        short_hash = hashlib.sha256(value.encode()).hexdigest()[:12]

        return f"{cleaned_value}-{short_hash}"

    frame = self.standardised_asset_list
    combined = frame[self.full_address_colname] + frame[self.postcode_colname]
    # We remove punctuation and whitespace from the address, before hashing to produce an ID
    normalised = (
        combined.str.strip()
        .str.replace(r"[^\w\s]", "", regex=True)
        .str.replace(" ", "")
        .str.lower()
    )
    self.standardised_asset_list[self.DOMNA_PROPERTY_ID] = normalised.apply(_make_hash)
|
||
|
||
@staticmethod
def _strip_postcode_from_full_address(full_address, postcode):
    """
    Remove the postcode from a full address and tidy up what is left.

    The match is exact (case- and spacing-sensitive).

    :param full_address: full address string
    :param postcode: postcode string to remove
    :return: the address without the postcode and without leading/trailing commas or whitespace
    """
    without_postcode = full_address.replace(postcode, "")
    # Trailing commas/spaces first, then any leading comma, then surrounding whitespace
    return without_postcode.rstrip(", ").strip(",").strip()
|
||
|
||
@classmethod
def _identify_multi_address(cls, address):
    """
    Flag addresses whose first comma-separated section contains a range such as "1-4".

    :param address: full address string
    :return: True if the section before the first comma matches ``MULTI_UNIT_REGEX``; False
        otherwise, including when the address contains no comma
    """
    # Without a comma we cannot isolate the first section of the address, so we do not flag it
    if "," not in address:
        return False

    address1_section = address.split(",")[0]
    # We look for a string in the form (x-y)
    return bool(cls.MULTI_UNIT_REGEX.search(address1_section))
|
||
|
||
@staticmethod
def _convert_uprn(x):
    """
    Used to convert UPRNs to integer strings

    :param x: uprn to convert
    :return: the UPRN as a string of digits where it can be read as a whole number; otherwise
        ``x`` unchanged (nulls, non-numeric text, infinities)
    """
    if pd.isnull(x):
        return x

    # Strings are handled first: np.isreal is not reliable for text input
    if isinstance(x, str):
        digits = x.strip()
        # int() also drops any leading zeros, e.g. "00123" -> "123"
        return str(int(digits)) if digits.isdecimal() else x

    # Numeric values, e.g. 100012345.0 read from a spreadsheet
    if np.isreal(x):
        try:
            return str(int(x))
        except (OverflowError, ValueError):
            # Infinite values cannot be converted to an integer
            return x

    text = str(x)
    if text.isdecimal():
        return str(int(text))
    return x
|
||
|
||
@staticmethod
def _clean_postcode(postcode):
    """
    Normalise the spacing of a postcode.

    Runs of whitespace are collapsed to a single space and surrounding whitespace is removed. If
    the result has no space, one is inserted before the last three characters.

    :param postcode: postcode to clean
    :return: the cleaned postcode; non-string values are returned unchanged
    """
    if not isinstance(postcode, str):
        return postcode

    # Remove double (or longer) runs of whitespace, including non-breaking spaces
    postcode = " ".join(postcode.split())

    # Values of three characters or fewer have nothing to split a space in front of
    if " " in postcode or len(postcode) <= 3:
        return postcode

    # Restructure it
    return " ".join([postcode[:-3], postcode[-3:]])
|
||
|
||
def init_standardise(self):
|
||
"""
|
||
This function is used to standardise the asset list
|
||
:return: standardised asset list
|
||
"""
|
||
|
||
# Remove rows without a postcode
|
||
if self.postcode_colname is not None:
|
||
self.standardised_asset_list = self.standardised_asset_list.dropna(subset=[self.postcode_colname])
|
||
# We also clean postcode columns where if there is not space, we create one
|
||
self.standardised_asset_list[self.postcode_colname] = self.standardised_asset_list[
|
||
self.postcode_colname
|
||
].apply(self._clean_postcode)
|
||
|
||
# We clean up portential non-breaking spaces, and double spaces
|
||
for col in [
|
||
c for c in [self.postcode_colname, self.full_address_colname, self.address1_colname] if
|
||
c is not None
|
||
]:
|
||
self.standardised_asset_list[col] = self.standardised_asset_list[col].astype(str)
|
||
self.standardised_asset_list[col] = self.standardised_asset_list[col].str.replace('\xa0', ' ', regex=False)
|
||
self.standardised_asset_list[col] = self.standardised_asset_list[col].str.replace(' ', ' ', regex=False)
|
||
|
||
if self.address1_colname is None:
|
||
if self.address1_extraction_method is None:
|
||
raise ValueError("Missing address 1 - please specify an extraction method")
|
||
self.address1_colname = self.STANDARD_ADDRESS_1
|
||
# If we do not have this, we produce it
|
||
self.standardised_asset_list = self._extract_address1(
|
||
asset_list=self.standardised_asset_list,
|
||
full_address_col=self.full_address_colname,
|
||
postcode_col=self.postcode_colname,
|
||
method=self.address1_extraction_method
|
||
)
|
||
|
||
if self.full_address_colname is None:
|
||
if not self.full_address_cols_to_concat:
|
||
raise ValueError("Missing full address - please specify columns to concatenate")
|
||
self.full_address_colname = self.STANDARD_FULL_ADDRESS
|
||
self.standardised_asset_list[self.full_address_colname] = (
|
||
self.standardised_asset_list[self.full_address_cols_to_concat].apply(
|
||
lambda x: ", ".join([y for y in x if not pd.isnull(y)]),
|
||
axis=1
|
||
)
|
||
)
|
||
else:
|
||
|
||
# Make sure to strip the postcode out of the full address
|
||
self.standardised_asset_list[self.full_address_colname] = self.standardised_asset_list.apply(
|
||
lambda x: self._strip_postcode_from_full_address(
|
||
full_address=x[self.full_address_colname],
|
||
postcode=x[self.postcode_colname]
|
||
),
|
||
axis=1
|
||
)
|
||
|
||
# We create the domna property id
|
||
self.create_property_id()
|
||
|
||
# Clean up the UPRN column, if the landlord has provided them
|
||
if self.landlord_uprn is not None:
|
||
self.standardised_asset_list[self.landlord_uprn] = (
|
||
self.standardised_asset_list[self.landlord_uprn].apply(self._convert_uprn)
|
||
)
|
||
|
||
# We keep just the columns we care about and will work through the various columns and standardise
|
||
variables = [
|
||
self.landlord_property_id,
|
||
self.DOMNA_PROPERTY_ID,
|
||
self.address1_colname,
|
||
self.postcode_colname,
|
||
self.full_address_colname,
|
||
self.landlord_uprn,
|
||
self.landlord_property_type,
|
||
self.landlord_built_form,
|
||
self.landlord_year_built,
|
||
self.landlord_wall_construction,
|
||
self.landlord_roof_construction,
|
||
self.landlord_heating_system,
|
||
self.landlord_existing_pv,
|
||
self.landlord_sap,
|
||
]
|
||
# Keep just non-null variables (e.g landlord may not provide uprn
|
||
self.keep_variables = [v for v in variables if v is not None]
|
||
self.rename_map = {
|
||
self.landlord_property_id: self.STANDARD_LANDLORD_PROPERTY_ID,
|
||
self.address1_colname: self.STANDARD_ADDRESS_1,
|
||
self.postcode_colname: self.STANDARD_POSTCODE,
|
||
self.full_address_colname: self.STANDARD_FULL_ADDRESS,
|
||
self.landlord_uprn: self.STANDARD_UPRN,
|
||
self.landlord_property_type: self.STANDARD_PROPERTY_TYPE,
|
||
self.landlord_built_form: self.STANDARD_BUILT_FORM,
|
||
self.landlord_year_built: self.STANDARD_YEAR_BUILT,
|
||
self.landlord_wall_construction: self.STANDARD_WALL_CONSTRUCTION,
|
||
self.landlord_roof_construction: self.STANDARD_ROOF_CONSTRUCTION,
|
||
self.landlord_heating_system: self.STANDARD_HEATING_SYSTEM,
|
||
self.landlord_existing_pv: self.STANDARD_EXISTING_PV,
|
||
self.landlord_sap: self.STANDARD_SAP,
|
||
}
|
||
self.rename_map = {k: v for k, v in self.rename_map.items() if k is not None}
|
||
|
||
non_intrusive_columns = []
|
||
if self.non_intrusives_present:
|
||
non_intrusive_columns = self.NON_INTRUSIVES_COLNAMES
|
||
|
||
if self.non_intrusives_eligibility:
|
||
non_intrusive_columns.append(self.NON_INTRUSIVES_ELIGIBILITY_COLUMN)
|
||
|
||
if self.old_format_non_intrusives_present:
|
||
# We check if we have the ECO Eligibility column, which we might not have
|
||
non_intrusive_columns = [
|
||
c for c in self.OLD_FORMAT_NON_INTRUSIVE_COLNAMES if c in self.standardised_asset_list.columns
|
||
]
|
||
|
||
if "Warmfront Finding" in self.standardised_asset_list.columns:
|
||
non_intrusive_columns.append("Warmfront Finding")
|
||
|
||
self.keep_variables += non_intrusive_columns
|
||
|
||
self.rename_map = {
|
||
**self.rename_map,
|
||
**dict(
|
||
zip(non_intrusive_columns, ["non-intrusives: " + c for c in non_intrusive_columns])
|
||
)
|
||
}
|
||
|
||
# We idenfiy addresses which are likely to be multi-addresses (i.g are rooms x-y)
|
||
self.standardised_asset_list["is_multi_address"] = self.standardised_asset_list[
|
||
self.full_address_colname
|
||
].apply(lambda x: self._identify_multi_address(x))
|
||
|
||
# We handle cleaning for walls, in the instance that the landlord provides us with EPC data and
|
||
# we see instances of "average thermal transmittance" in the description
|
||
if self.landlord_wall_construction is not None:
|
||
self.standardised_asset_list[self.landlord_wall_construction] = np.where(
|
||
self.standardised_asset_list[self.landlord_wall_construction].str.lower().str.contains(
|
||
"average thermal transmittance"
|
||
) == True,
|
||
"new build - average thermal transmittance",
|
||
self.standardised_asset_list[self.landlord_wall_construction]
|
||
)
|
||
else:
|
||
# We want to make sure that we have a column for wall construction
|
||
self.landlord_wall_construction = self.STANDARD_WALL_CONSTRUCTION
|
||
self.standardised_asset_list[self.landlord_wall_construction] = None
|
||
|
||
if self.landlord_roof_construction is None:
|
||
self.landlord_roof_construction = self.STANDARD_ROOF_CONSTRUCTION
|
||
self.standardised_asset_list[self.landlord_roof_construction] = None
|
||
|
||
# Clear our build year column
|
||
# We attempt to process the year built column
|
||
if self.landlord_year_built is not None:
|
||
# We check if we have a datetime - year built has not been renamed
|
||
if isinstance(self.standardised_asset_list[self.landlord_year_built].iloc[0], datetime):
|
||
# We treat any string columns - with common values we see
|
||
self.standardised_asset_list[self.landlord_year_built] = (
|
||
self.standardised_asset_list[self.landlord_year_built].replace(self.DATETIME_REMAP)
|
||
)
|
||
|
||
no_data_codes = {"No Data": None}
|
||
self.standardised_asset_list[self.landlord_year_built] = (
|
||
self.standardised_asset_list[self.landlord_year_built].replace(no_data_codes)
|
||
)
|
||
|
||
self.standardised_asset_list[self.landlord_year_built] = pd.to_datetime(
|
||
self.standardised_asset_list[self.landlord_year_built]
|
||
)
|
||
# Convert this to year
|
||
self.standardised_asset_list[self.landlord_year_built] = (
|
||
self.standardised_asset_list[self.landlord_year_built].dt.year
|
||
)
|
||
else:
|
||
# We attempt to convert the year built to a datetime, by detecting the format and converting
|
||
|
||
def extract_year(date_str):
    """
    Extract a four-digit build year from one raw "year built" cell.

    Handled inputs:
      * missing values, blank strings, 0 and known spreadsheet error markers -> None
      * 'dd-Mon-YYYY' strings such as '01-Jul-1975' -> 1975
      * ranges such as '1983-1990' or 'G: 1983-1990, H: 1991-1995' -> upper year of the first range
      * other hyphenated strings such as '1990-07-01' or '01-07-1990' -> first four-digit group
      * datetime objects -> their year
      * four-digit numbers, or text containing exactly four digits in total -> that number

    :param date_str: raw cell value (str, number, datetime or missing)
    :return: the year as an int, or None when the cell holds no usable value
    :raises NotImplementedError: when the value is in a format that is not handled
    """
    known_errors = [
        "#MULTIVALUE",
        "This cell has an external reference that can't be shown or edited. Editing this cell will "
        "remove the external reference.",
        "ND",
        'PIMSS EMPTY',
        "UNKNOWN"
    ]
    # Compare case-insensitively so ' unknown ' is treated the same as 'UNKNOWN'
    known_error_keys = {entry.casefold() for entry in known_errors}

    if pd.isnull(date_str) or (date_str == 0):
        return None

    if isinstance(date_str, str):
        text = date_str.strip()
        if not text or text.casefold() in known_error_keys:
            return None

        match = re.match(r"\d{1,2}-[A-Za-z]{3}-(\d{4})", text)
        if match:
            return int(match.group(1))  # Extract the year and convert to integer

        if "-" in text:
            # We've seen double ranges (when we have extensions), so the format is like this:
            # 'G: 1983-1990, H: 1991-1995' - the segment after the first "-" then carries a
            # trailing ", H: 1991" which we cut off
            candidate = text.split("-")[1]
            if text.count("-") == 2:
                candidate = candidate.split(",")[0]
            candidate = candidate.strip()
            if re.fullmatch(r"\d{4}", candidate):
                return int(candidate)

            # The second segment is not a year, so this is not a range (e.g. '1990-07-01' would
            # otherwise give the month). Use the first standalone four-digit group instead.
            year_tokens = re.findall(r"(?<!\d)\d{4}(?!\d)", text)
            if year_tokens:
                return int(year_tokens[0])

    if isinstance(date_str, datetime):
        return date_str.year

    if isinstance(date_str, float):
        whole_part = str(int(date_str))
        if whole_part.isdigit() and len(whole_part) == 4:
            return int(date_str)

    # Check if date_str is a year itself
    as_text = str(date_str)
    if as_text.isdigit() and len(as_text) == 4:
        return int(date_str)

    # Remove any non-numeric characters
    digits_only = re.sub(r"\D", "", as_text)
    if digits_only.isdigit() and len(digits_only) == 4:
        return int(digits_only)

    # Report the value as it was received so the offending cell can be found
    raise NotImplementedError(f"Unhandled format for year built, value is {date_str} - implement me")
|
||
|
||
self.standardised_asset_list[self.landlord_year_built] = self.standardised_asset_list[
|
||
self.landlord_year_built
|
||
].apply(extract_year)
|
||
|
||
# We now create standard lookups
|
||
to_remap = {
|
||
self.landlord_property_type: {
|
||
"standard_values": property_type_mappings.STANDARD_PROPERTY_TYPES,
|
||
"standard_map": property_type_mappings.PROPERTY_MAPPING
|
||
},
|
||
self.landlord_built_form: {
|
||
"standard_values": built_form_mappings.STANDARD_BUILT_FORMS,
|
||
"standard_map": built_form_mappings.BUILT_FORM_MAPPINGS
|
||
},
|
||
self.landlord_wall_construction: {
|
||
"standard_values": walls_mappings.STANDARD_WALL_CONSTRUCTIONS,
|
||
"standard_map": walls_mappings.WALL_CONSTRUCTION_MAPPINGS
|
||
},
|
||
self.landlord_heating_system: {
|
||
"standard_values": heating_mappings.STANDARD_HEATING_SYSTEMS,
|
||
"standard_map": heating_mappings.HEATING_MAPPINGS
|
||
},
|
||
self.landlord_existing_pv: {
|
||
"standard_values": existing_pv_mappings.STANDARD_EXISTING_PV,
|
||
"standard_map": existing_pv_mappings.EXISTING_PV_MAPPINGS
|
||
},
|
||
self.landlord_roof_construction: {
|
||
"standard_values": roof_mappings.STANDARD_ROOF_CONSTRUCTIONS,
|
||
"standard_map": roof_mappings.ROOF_CONSTRUCTION_MAPPINGS
|
||
}
|
||
}
|
||
# Keep just entries where the key is not None
|
||
to_remap = {k: v for k, v in to_remap.items() if k is not None}
|
||
|
||
for variable, config in to_remap.items():
|
||
logger.info("Standardising variable: %s", variable)
|
||
# Strip each of these columns
|
||
self.standardised_asset_list[variable] = self.standardised_asset_list[variable].str.strip()
|
||
values_to_remap = self.standardised_asset_list[variable].unique()
|
||
# We want to map this to our standardised list of property types we're interested in
|
||
remapper = DataRemapper(standard_values=config["standard_values"], standard_map=config["standard_map"])
|
||
remap_dictionary = remapper.standardize_list(values_to_remap=values_to_remap.tolist())
|
||
self.variable_mappings[variable] = remap_dictionary
|
||
|
||
# We now print out the variable mappings, which can be reviewed by the user, before the final standardised
|
||
# asset list is returned
|
||
for variable, mapping in self.variable_mappings.items():
|
||
pprint(f"Variable: {variable}")
|
||
pprint(mapping)
|
||
# Print a space
|
||
print("\n")
|
||
pprint("=======================================")
|
||
|
||
def apply_standardiation(self, override_empty_mappings=False):
    """
    Apply the variable mappings to the asset list and reduce it to the standard columns.

    Steps, in order:
      1. when self.phase is set, drop rows whose 'Surveyors Name' is "YET TO BE SURVEYED"
      2. for every mapped variable, keep the landlord's raw value in
         "<variable>_original_from_landlord" and overwrite the variable with its mapped value
      3. drop rows that repeat an earlier domna property id, recording them in
         self.duplicated_addresses
      4. keep only self.keep_variables (plus the raw-value columns) and rename via self.rename_map
      5. add any standard column that is still absent, filled with None
      6. cast the landlord property id column to string

    :param override_empty_mappings: If true, will override the check for empty mappings. This is only
        relevant if there are no categories which need remapping which is highly unlikely
    :return: None - self.standardised_asset_list is replaced with the standardised frame
    :raises ValueError: if there are no variable mappings and override_empty_mappings is False
    """
    # Validate before touching any state, so a rejected call leaves the asset list unfiltered
    if not self.variable_mappings and not override_empty_mappings:
        raise ValueError("Please run init_standardise first")

    asset_list = self.standardised_asset_list

    if self.phase:
        # We filter on just the properties that have had an inspection. The copy makes the
        # column assignments below act on an independent frame rather than on a slice.
        asset_list = asset_list[
            ~asset_list['Surveyors Name'].isin(["YET TO BE SURVEYED"])
        ].copy()

    logger.info("Applying standardisation to asset list")

    for variable, mapping in self.variable_mappings.items():
        asset_list[variable + "_original_from_landlord"] = asset_list[variable].copy()
        asset_list[variable] = asset_list[variable].map(mapping)

    is_duplicate = asset_list[self.DOMNA_PROPERTY_ID].duplicated()
    if is_duplicate.any():
        pprint(
            f"There are {is_duplicate.sum()} duplicated "
            f"addresses - dropping"
        )

        # Keep a record of duplicates
        self.duplicated_addresses = asset_list.loc[
            is_duplicate, [self.DOMNA_PROPERTY_ID, self.address1_colname, self.postcode_colname]
        ].copy()

        # Drop the dupes
        asset_list = asset_list[~is_duplicate].copy()

    # We add the original columns to the keep variables. A new list is bound (rather than
    # extending in place) so a list shared with other objects is not modified, and a label
    # that is already present is not selected twice.
    original_columns = [k + "_original_from_landlord" for k in self.variable_mappings]
    self.keep_variables = list(self.keep_variables) + [
        column for column in original_columns if column not in self.keep_variables
    ]

    # Perform final variable selection and renaming
    asset_list = asset_list[self.keep_variables].rename(columns=self.rename_map)

    # We fill any standard columns that are not in the data because they were not provided by the landlord
    standard_columns = (
        self.STANDARD_EXISTING_PV,
        self.STANDARD_HEATING_SYSTEM,
        self.STANDARD_UPRN,
        self.STANDARD_PROPERTY_TYPE,
        self.STANDARD_YEAR_BUILT,
        self.STANDARD_WALL_CONSTRUCTION,
    )
    for column in standard_columns:
        if column not in asset_list.columns:
            asset_list[column] = None

    # Convert to string
    asset_list[self.STANDARD_LANDLORD_PROPERTY_ID] = (
        asset_list[self.STANDARD_LANDLORD_PROPERTY_ID].astype(str)
    )

    self.standardised_asset_list = asset_list
|
||
|
||
def merge_data(self, df: pd.DataFrame):
    """
    Left-join extra columns onto the standardised asset list, matching rows on the domna property id.

    :param df: frame holding the domna property id column plus the columns to attach; when an id
        is repeated, only its first row takes part in the join
    :return: None - self.standardised_asset_list is replaced by the joined frame
    :raises ValueError: when df has no domna property id column
    """
    join_key = self.DOMNA_PROPERTY_ID

    if join_key not in df.columns:
        raise ValueError(f"Dataframe must contain the column {self.DOMNA_PROPERTY_ID}")

    # One incoming row per id, so the left join cannot multiply asset-list rows
    incoming = df
    if incoming[join_key].duplicated().any():
        incoming = incoming.drop_duplicates(subset=[join_key], keep="first")

    self.standardised_asset_list = pd.merge(
        self.standardised_asset_list, incoming, how="left", on=join_key
    )
|
||
|
||
def extract_attributes(self, pull_epc=True):
    """
    Derive the typical attributes that we use to identify viable work.

    Adds the following columns to self.standardised_asset_list, in this order:
      * ATTRIBUTE_HAS_SOLAR - solar flag OR a non-empty, non-zero EPC photo-supply value
      * ATTRIBUTE_NUMBER_OF_FLOORS - from estimate_number_of_floors
      * ATTRIBUTE_ESTIMATED_PERIMETER - from estimate_perimeter, using per-floor area and rooms
      * ATTRIBUTE_HEAT_LOSS_AREA - from estimate_external_wall_area
      * ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS - parsed from the EPC roof description
      * ATTRIBUTE_SAP_THRESHOLD_AND_BELOW - EPC score <= FILLED_CAVITY_SAP_THRESHOLD
      * ATTRIBUTE_EPC_PRE_YEAR_THRESHOLD - EPC inspection year < EPC_YEAR_THRESHOLD
    It also converts the EPC total floor area and habitable room columns to float, and finishes
    by calling self.process_age_band().

    :param pull_epc: not used by this method; kept so existing callers keep working
    :return: None
    """
    default_floor_height = 2.5
    accepted_epc_property_types = ["House", "Flat", "Bungalow", "Maisonette"]

    assets = self.standardised_asset_list
    epc_names = self.EPC_API_DATA_NAMES
    floors_col = self.ATTRIBUTE_NUMBER_OF_FLOORS
    floor_area_col = epc_names["total-floor-area"]
    rooms_col = epc_names["number-habitable-rooms"]
    roof_description_col = epc_names["roof-description"]
    thickness_col = self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS

    def pick_property_type(row):
        # The logic here is:
        # 1) Take the property type provided by the HA themselves
        # 2) In absence of that, take the EPC property type
        # 3) Otherwise use None
        landlord_type = str(row[self.STANDARD_PROPERTY_TYPE]).title()
        if landlord_type in accepted_epc_property_types:
            return landlord_type
        epc_type = row[epc_names["property-type"]]
        return None if pd.isnull(epc_type) else epc_type

    def blank_to_nan_float(column):
        # mask() turns "" into NaN on every pandas version; replace("", None) forward-fills
        # instead of blanking on pandas releases before 1.4
        return column.mask(column == "").astype(float)

    def floor_height_or_default(value):
        # NaN is truthy, so a plain truthiness test would pass NaN on instead of the default
        if pd.isnull(value) or not value:
            return default_floor_height
        return float(value)

    def roof_insulation_thickness(row):
        description = row[roof_description_col]
        if pd.isnull(description):
            return None
        return RoofAttributes(description=description).process()["insulation_thickness"]

    assets[self.ATTRIBUTE_HAS_SOLAR] = (
        assets[self.FIND_EPC_DATA_NAMES["Solar photovoltaics"]] |
        ~assets[epc_names["photo-supply"]].isin(["0.0", 0, None, "", np.nan])
    )

    assets[floors_col] = assets.apply(
        lambda x: estimate_number_of_floors(property_type=pick_property_type(x)),
        axis=1
    )

    # Blank strings are treated as missing in both numeric EPC columns
    assets[floor_area_col] = blank_to_nan_float(assets[floor_area_col])
    assets[rooms_col] = blank_to_nan_float(assets[rooms_col])

    # Estimate the perimeter
    assets[self.ATTRIBUTE_ESTIMATED_PERIMETER] = assets.apply(
        lambda x: estimate_perimeter(
            floor_area=x[floor_area_col] / x[floors_col],
            num_rooms=x[rooms_col] / x[floors_col],
        ), axis=1
    )

    assets[self.ATTRIBUTE_HEAT_LOSS_AREA] = assets.apply(
        lambda x: estimate_external_wall_area(
            num_floors=x[floors_col],
            floor_height=floor_height_or_default(x[epc_names["floor-height"]]),
            perimeter=x[self.ATTRIBUTE_ESTIMATED_PERIMETER],
            built_form=x[epc_names["built-form"]]
        ),
        axis=1
    )

    assets[thickness_col] = assets.apply(roof_insulation_thickness, axis=1)
    # "+" is a regex metacharacter, so ask for a literal replacement explicitly
    assets[thickness_col] = assets[thickness_col].str.replace("+", "", regex=False)

    # We produce some additional fields
    # 1) Is the SAP rating at or below the filled cavity threshold
    assets[self.ATTRIBUTE_SAP_THRESHOLD_AND_BELOW] = (
        assets[epc_names["current-energy-efficiency"]].astype(float) <=
        self.FILLED_CAVITY_SAP_THRESHOLD
    )
    # 2) Flag anything where the EPC inspection year is before the year threshold
    assets[self.ATTRIBUTE_EPC_PRE_YEAR_THRESHOLD] = (
        pd.to_datetime(assets[epc_names["inspection-date"]]).dt.year < self.EPC_YEAR_THRESHOLD
    )

    self.process_age_band()
|
||
|
||
def process_age_band(self):
    """
    Turn each property's EPC construction age band into year bounds and compare them with the
    landlord's year built.

    Adds three columns to self.standardised_asset_list (joined on the domna property id):
      * epc_year_lower_bound / epc_year_upper_bound - None where the band is open-ended or absent
      * does_age_band_match_epc_age_band - one of:
          "No EPC Age Band"                           band missing or a known anomaly value
          "No Year Built From Landlord"               band present, year built missing
          "EPC Age Band Matches Year Built"           year built lies inside the band
          "EPC Age Band is older than Year Built"     year built is after the band
          "EPC Age Band is newer than Year Built"     year built is before the band
          "EPC Age Band is different from Year Built" band is a single year that differs

    Recognised band formats: a bare year ('2018'), '<prefix>: YYYY onwards',
    '<prefix>: before YYYY' and '<prefix>: YYYY-YYYY'.

    :return: None
    :raises ValueError: when a band is in none of the recognised formats
    """
    no_band = "No EPC Age Band"
    no_year = "No Year Built From Landlord"
    matches = "EPC Age Band Matches Year Built"
    band_older = "EPC Age Band is older than Year Built"
    band_newer = "EPC Age Band is newer than Year Built"
    band_different = "EPC Age Band is different from Year Built"
    output_columns = ["epc_year_lower_bound", "epc_year_upper_bound", "does_age_band_match_epc_age_band"]

    id_col = self.DOMNA_PROPERTY_ID
    band_col = self.EPC_API_DATA_NAMES["construction-age-band"]

    # Drop results of an earlier run so the join below cannot produce suffixed duplicates
    assets = self.standardised_asset_list.drop(
        columns=[c for c in output_columns if c in self.standardised_asset_list.columns]
    )

    processed_age_band = []
    for property_id, raw_band, year_built in zip(
            assets[id_col], assets[band_col], assets[self.STANDARD_YEAR_BUILT]
    ):
        lower_bound, upper_bound = None, None

        if pd.isnull(raw_band) or (raw_band in Definitions.DATA_ANOMALY_MATCHES):
            verdict = no_band
        else:
            # We extract the upper and lower bounds
            band = str(raw_band).strip()
            is_single_year = band.isdigit()
            onwards = re.search(r"(\d{4}) onwards$", band)
            before = re.search(r"before (\d{4})$", band)
            span = re.search(r"(\d{4})\s*-\s*(\d{4})$", band)

            if is_single_year:
                lower_bound = upper_bound = int(band)
            elif onwards:
                lower_bound = int(onwards.group(1))
            elif before:
                # 'before 1900' means the latest possible year is 1899
                upper_bound = int(before.group(1)) - 1
            elif span:
                lower_bound, upper_bound = int(span.group(1)), int(span.group(2))
            else:
                raise ValueError(f"Unhandled EPC construction age band: {raw_band!r}")

            # pd.isnull (not truthiness) because a missing year is usually NaN, which is truthy
            if pd.isnull(year_built):
                verdict = no_year
            elif is_single_year:
                verdict = matches if year_built == lower_bound else band_different
            elif lower_bound is not None and year_built < lower_bound:
                verdict = band_newer
            elif upper_bound is not None and year_built > upper_bound:
                verdict = band_older
            else:
                verdict = matches

        processed_age_band.append(
            {
                id_col: property_id,
                "epc_year_lower_bound": lower_bound,
                "epc_year_upper_bound": upper_bound,
                "does_age_band_match_epc_age_band": verdict
            }
        )

    if not processed_age_band:
        # Empty asset list: there is nothing to join, but the columns must still exist
        for column in output_columns:
            assets[column] = None
        self.standardised_asset_list = assets
        return

    self.standardised_asset_list = assets.merge(
        pd.DataFrame(processed_age_band), how="left", on=id_col
    )
|
||
|
||
def identify_worktypes(self, cleaned):
|
||
|
||
if self.landlord_sap is not None:
|
||
# We add a SAP category for all work type identification
|
||
self.standardised_asset_list["SAP Category"] = np.where(
|
||
(
|
||
(self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <= 54) |
|
||
(self.standardised_asset_list[self.STANDARD_SAP] <= 54)
|
||
),
|
||
"SAP Rating 54 or less",
|
||
np.where(
|
||
(
|
||
(self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <= 68) |
|
||
(self.standardised_asset_list[self.STANDARD_SAP] <= 68)
|
||
),
|
||
"SAP Rating 55-68",
|
||
np.where(
|
||
(
|
||
(
|
||
self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <=
|
||
self.EMPTY_CAVITY_SAP_THRESHOLD
|
||
) | (self.standardised_asset_list[self.STANDARD_SAP] <= self.EMPTY_CAVITY_SAP_THRESHOLD)
|
||
),
|
||
f"SAP Rating 69-{self.EMPTY_CAVITY_SAP_THRESHOLD}",
|
||
f"SAP Rating {self.EMPTY_CAVITY_SAP_THRESHOLD + 1} or more"
|
||
),
|
||
)
|
||
)
|
||
|
||
else:
|
||
# We add a SAP category for all work type identification
|
||
# We break into 4 categories (54 or less, 55-68, 69-74, 75 or more)
|
||
|
||
self.standardised_asset_list["SAP Category"] = np.where(
|
||
(self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <= 54),
|
||
"SAP Rating 54 or less",
|
||
np.where(
|
||
(self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <= 68),
|
||
"SAP Rating 55-68",
|
||
np.where(
|
||
(
|
||
self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <=
|
||
self.EMPTY_CAVITY_SAP_THRESHOLD
|
||
),
|
||
f"SAP Rating 69-{self.EMPTY_CAVITY_SAP_THRESHOLD}",
|
||
f"SAP Rating {self.EMPTY_CAVITY_SAP_THRESHOLD + 1} or more"
|
||
),
|
||
)
|
||
)
|
||
|
||
# Before we begin, we identify whether a property already has solar, as we use this
|
||
# for identifying cavity jobs
|
||
if self.non_intrusives_present:
|
||
existing_solar_non_intrusives_check = (
|
||
self.standardised_asset_list["non-intrusives: PV, ACCESS ISSUE, SEE NOTES"] == "SOLAR PV ON ROOF"
|
||
)
|
||
elif self.old_format_non_intrusives_present:
|
||
existing_solar_non_intrusives_check = (
|
||
self.standardised_asset_list["non-intrusives: WFT Findings"].str.lower().str.strip().isin(
|
||
["solar pv on roof"]
|
||
)
|
||
)
|
||
else:
|
||
# We don't have an indication
|
||
existing_solar_non_intrusives_check = False
|
||
|
||
self.standardised_asset_list["property_has_solar"] = (
|
||
(self.standardised_asset_list[self.STANDARD_EXISTING_PV] == "already has PV") |
|
||
existing_solar_non_intrusives_check |
|
||
(self.standardised_asset_list[self.ATTRIBUTE_HAS_SOLAR])
|
||
)
|
||
|
||
# If we have non-intrusives completed, we can use this to identify work types
|
||
######################################################
|
||
# Empty cavity:
|
||
######################################################
|
||
# 1) Has been flagged on the non-intrusives as being a cavity wall, empty or partially filled
|
||
# 2) The age is before 1995
|
||
# 3) We don't remove anything that has access issues yet
|
||
|
||
if self.non_intrusives_present:
|
||
non_intrusives_wall_filter = (
|
||
(self.standardised_asset_list['non-intrusives: Construction'] == "CAVITY") &
|
||
self.standardised_asset_list['non-intrusives: Insulated'].isin(["EMPTY", "PARTIAL"])
|
||
)
|
||
elif self.old_format_non_intrusives_present:
|
||
non_intrusives_wall_filter = (
|
||
self.standardised_asset_list['non-intrusives: WFT Findings'].str.lower().str.strip().isin(
|
||
["empty cavity", "partial fill", "empty", "EMPTY CAVITY 70MM", "partial"]
|
||
) | (
|
||
(
|
||
self.standardised_asset_list['non-intrusives: WFT Findings']
|
||
.str.lower().str.strip().str.contains("empty cavity|partial fill|empty|partial") &
|
||
~self.standardised_asset_list['non-intrusives: WFT Findings']
|
||
.astype(str).str.lower().str.strip().str.contains("major access issues")
|
||
)
|
||
)
|
||
)
|
||
else:
|
||
# We set the filter to False, as we have no non-intrusives
|
||
non_intrusives_wall_filter = False
|
||
|
||
if self.landlord_year_built is None:
|
||
year_built_filter = self.standardised_asset_list["epc_year_upper_bound"] <= self.EMPTY_CAVITY_YEAR_THRESHOLD
|
||
else:
|
||
year_built_filter = (
|
||
(self.standardised_asset_list[self.STANDARD_YEAR_BUILT] <= self.EMPTY_CAVITY_YEAR_THRESHOLD) |
|
||
(self.standardised_asset_list["epc_year_upper_bound"] <= self.EMPTY_CAVITY_YEAR_THRESHOLD)
|
||
)
|
||
|
||
# Criteria:
|
||
# The property isn't a bedsit
|
||
# Non-intrusives indicate it needs a fill
|
||
# The EPC year is before 2002
|
||
# We also flag where the property has solar on the roof, because this is a signal of a high EPC rating
|
||
self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] = (
|
||
(~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"])) &
|
||
non_intrusives_wall_filter &
|
||
year_built_filter &
|
||
(
|
||
~self.standardised_asset_list["property_has_solar"]
|
||
)
|
||
)
|
||
|
||
self.standardised_asset_list["non_intrusive_indicates_empty_cavity_has_solar"] = (
|
||
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
|
||
(~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"])) &
|
||
non_intrusives_wall_filter &
|
||
year_built_filter &
|
||
(
|
||
# If the property has solar, there's a chance it won't qualify
|
||
self.standardised_asset_list["property_has_solar"]
|
||
)
|
||
)
|
||
|
||
# We also add a filter on anything that was generally identified by the non-intrusives
|
||
self.standardised_asset_list["non_intrusive_indicates_empty_cavity_no_year_filter"] = (
|
||
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
|
||
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity_has_solar"] &
|
||
(~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"])) &
|
||
non_intrusives_wall_filter
|
||
)
|
||
|
||
if (not self.non_intrusives_eligibility) and (not self.old_format_non_intrusives_present):
|
||
# If we have NO inspections data, we capture all of the wall types and don't filter on age of the EPC
|
||
self.standardised_asset_list["epc_indicates_empty_cavity"] = (
|
||
self.standardised_asset_list[self.EPC_API_DATA_NAMES["walls-description"]].str.lower().isin(
|
||
self.EPC_NO_WALL_INSULATION_DESCRIPTIONS
|
||
) & (
|
||
self.standardised_asset_list["epc_year_upper_bound"] <= self.EMPTY_CAVITY_YEAR_THRESHOLD
|
||
) & (
|
||
~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"])
|
||
)
|
||
)
|
||
else:
|
||
self.standardised_asset_list["epc_indicates_empty_cavity"] = (
|
||
self.standardised_asset_list[self.EPC_API_DATA_NAMES["walls-description"]].str.lower().isin(
|
||
self.EPC_NO_WALL_INSULATION_DESCRIPTIONS
|
||
) & (
|
||
self.standardised_asset_list["epc_year_upper_bound"] <= self.EMPTY_CAVITY_YEAR_THRESHOLD
|
||
) & (
|
||
~self.standardised_asset_list[self.ATTRIBUTE_EPC_PRE_YEAR_THRESHOLD]
|
||
) & (
|
||
~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"])
|
||
)
|
||
)
|
||
|
||
self.standardised_asset_list["landlord_data_indicates_empty_cavity"] = (
|
||
self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin(["uninsulated cavity"]) &
|
||
(
|
||
(self.standardised_asset_list[self.STANDARD_YEAR_BUILT] <= self.EMPTY_CAVITY_YEAR_THRESHOLD) |
|
||
(self.standardised_asset_list["epc_year_upper_bound"] <= self.EMPTY_CAVITY_YEAR_THRESHOLD)
|
||
) & (
|
||
~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"])
|
||
)
|
||
)
|
||
|
||
# Finally, we create a flag to indicate that the cavity is empty, based on the criteria above
|
||
self.standardised_asset_list["cavity_is_empty"] = (
|
||
non_intrusives_wall_filter |
|
||
self.standardised_asset_list[self.EPC_API_DATA_NAMES["walls-description"]].str.lower().isin(
|
||
self.EPC_NO_WALL_INSULATION_DESCRIPTIONS
|
||
) |
|
||
self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin(["uninsulated cavity"])
|
||
)
|
||
|
||
######################################################
|
||
# Extraction
|
||
######################################################
|
||
# as needing a CIGA check. What is the logic we should be applying here?
|
||
|
||
if self.non_intrusives_present:
|
||
|
||
extraction_wall_filter = (
|
||
(self.standardised_asset_list["non-intrusives: Construction"] == "CAVITY") &
|
||
(self.standardised_asset_list["non-intrusives: Insulated"].isin(["RETRO DRILLED", "FILLED AT BUILD"])) &
|
||
(~self.standardised_asset_list['non-intrusives: Material'].isin(
|
||
["GREY LOOSE BEAD", "COMPACTED BEAD", "FIBRE BATT NO CAVITY", "EMPTY NARROW BELOW 30mm"]
|
||
))
|
||
)
|
||
|
||
if self.non_intrusives_eligibility:
|
||
# If we have the eligibility column, we check if the wall is eligible
|
||
extraction_wall_filter = (
|
||
extraction_wall_filter &
|
||
~self.standardised_asset_list["non-intrusives: Eligibility (Red/Yellow/Green)"].isin(
|
||
["RED"]
|
||
)
|
||
)
|
||
|
||
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] = (
|
||
extraction_wall_filter & year_built_filter
|
||
)
|
||
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction_no_year_filter"] = (
|
||
extraction_wall_filter & ~year_built_filter
|
||
)
|
||
|
||
elif self.old_format_non_intrusives_present:
|
||
print("Review these categories!!!!")
|
||
extraction_wall_filter = (
|
||
self.standardised_asset_list['non-intrusives: WFT Findings'].str.lower().str.strip().isin(
|
||
["retro drilled", "retro filled", "fibre from build", "polybead", "retro drilled and filled",
|
||
"retro drilled & filled", "blown in white wool", "blown in yellow wool"]
|
||
)
|
||
)
|
||
|
||
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] = (
|
||
extraction_wall_filter
|
||
)
|
||
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction_no_year_filter"] = False
|
||
|
||
else:
|
||
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] = False
|
||
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction_no_year_filter"] = False
|
||
|
||
######################################################
|
||
# Solar
|
||
######################################################
|
||
# Criteria:
|
||
# Check 1: Does the property have a valid heating system?
|
||
self.standardised_asset_list["solar_landlord_data_indicates_correct_heating_system"] = (
|
||
self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM].isin(
|
||
[
|
||
"air source heat pump",
|
||
"ground source heat pump",
|
||
"high heat retention storage heaters",
|
||
"electric boiler"
|
||
]
|
||
)
|
||
)
|
||
self.standardised_asset_list["solar_landlord_data_indicates_needs_heating_upgrade"] = (
|
||
self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM].isin(
|
||
["electric storage heaters", "room heaters", "electric radiators", "no heating", "electric fuel"]
|
||
)
|
||
)
|
||
|
||
self.standardised_asset_list["solar_epc_data_indicates_correct_heating_system"] = (
|
||
(
|
||
self.standardised_asset_list[self.EPC_API_DATA_NAMES["mainheat-description"]]
|
||
.str.lower().str.contains("air source heat pump|ground source heat pump|boiler and radiators, electric")
|
||
) | (
|
||
self.standardised_asset_list[
|
||
self.EPC_API_DATA_NAMES["mainheat-description"]].str.lower().str.contains(
|
||
"electric storage heaters"
|
||
) & (
|
||
self.standardised_asset_list[self.EPC_API_DATA_NAMES[
|
||
"mainheatcont-description"]] == "Controls for high heat retention storage heaters"
|
||
)
|
||
)
|
||
)
|
||
|
||
self.standardised_asset_list["solar_epc_data_indicates_requires_heating_upgrade"] = (
|
||
self.standardised_asset_list[self.EPC_API_DATA_NAMES["mainheat-description"]].str.lower().str.contains(
|
||
"electric storage heaters|room heaters"
|
||
) & (
|
||
self.standardised_asset_list[
|
||
self.EPC_API_DATA_NAMES["mainheatcont-description"]
|
||
] != "Controls for high heat retention storage heaters"
|
||
)
|
||
)
|
||
|
||
# Basic check - both of the previous two shouldn't be true simultaneously
|
||
if (
|
||
self.standardised_asset_list["solar_epc_data_indicates_correct_heating_system"] &
|
||
self.standardised_asset_list["solar_epc_data_indicates_requires_heating_upgrade"]
|
||
).sum():
|
||
raise ValueError("Both heating system checks are true - this should not be possible")
|
||
|
||
# Check 3: Does the property meet the fabric condition
|
||
# Solar PV installs are subject to the minimum insulation requirements which means:
|
||
# 1) one of the following insulation measures must be installed as part of the same
|
||
# ECO4 project:
|
||
# • roof insulation (flat roof, pitched roof, room-in-roof)
|
||
# • exterior facing wall insulation (cavity wall, solid wall)
|
||
# • party cavity wall insulation
|
||
# • floor insulation (solid and underfloor)
|
||
#
|
||
# OR
|
||
#
|
||
# all measures (except any exempted measure referred to in paragraph 4.28)
|
||
# listed in paragraph a) must already be installed
|
||
#
|
||
# With this in mind, we look for 2 classes
|
||
# 1) The property is fully insulated apart from the loft (<200mm insulation)
|
||
# 2) The property is fully insulated
|
||
|
||
print("Should we include cavity properties where they might be uninsulated?")
|
||
self.standardised_asset_list["solar_landlord_walls_insulated"] = (
|
||
self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin(
|
||
[
|
||
"filled cavity", "insulated solid brick", "insulated timber frame",
|
||
]
|
||
)
|
||
)
|
||
|
||
if self.non_intrusives_present:
|
||
self.standardised_asset_list["solar_non_intrusives_walls_insulated"] = (
|
||
self.standardised_asset_list["non-intrusives: Insulated"].isin(
|
||
["EWI", "RETRO DRILLED", "FILLED AT BUILD"]
|
||
)
|
||
)
|
||
elif self.old_format_non_intrusives_present:
|
||
self.standardised_asset_list["solar_non_intrusives_walls_insulated"] = (
|
||
self.standardised_asset_list["non-intrusives: WFT Findings"].str.lower().str.strip().isin(
|
||
[
|
||
"retro drilled", "retro filled", "ewi", "retro drilled/ solid", "retro drilled and filled",
|
||
]
|
||
) |
|
||
self.standardised_asset_list["non-intrusives: WFT Findings"].str.lower().str.strip().str.contains(
|
||
"retro drilled"
|
||
)
|
||
)
|
||
else:
|
||
self.standardised_asset_list["solar_non_intrusives_walls_insulated"] = False
|
||
|
||
# We merge on the u-value for average thermal transmittance
|
||
walls_uvalue_data = pd.DataFrame(cleaned["walls-description"])
|
||
walls_uvalue_data = walls_uvalue_data[
|
||
~pd.isnull(walls_uvalue_data["thermal_transmittance"])
|
||
][["original_description", "thermal_transmittance"]].rename(
|
||
columns={
|
||
"original_description": self.EPC_API_DATA_NAMES["walls-description"],
|
||
"thermal_transmittance": "walls_u_value"
|
||
}
|
||
)
|
||
self.standardised_asset_list = self.standardised_asset_list.merge(
|
||
walls_uvalue_data, how="left", on=self.EPC_API_DATA_NAMES["walls-description"]
|
||
)
|
||
|
||
self.standardised_asset_list["solar_epc_walls_insulated"] = (
|
||
(
|
||
self.standardised_asset_list[
|
||
self.EPC_API_DATA_NAMES["walls-description"]].str.lower().str.contains(
|
||
"|".join(self.EPC_INSULATED_WALLS_SUBSTRINGS)
|
||
)
|
||
) | (
|
||
self.standardised_asset_list["walls_u_value"].apply(lambda x: x <= 0.7 if not pd.isnull(x) else False)
|
||
)
|
||
)
|
||
|
||
# We merge on the u-value for average thermal transmittance
|
||
roof_data = pd.DataFrame(cleaned["roof-description"])[
|
||
["original_description", "thermal_transmittance", "is_pitched", "is_loft"]
|
||
].rename(
|
||
columns={
|
||
"original_description": self.EPC_API_DATA_NAMES["roof-description"],
|
||
"thermal_transmittance": "roof_u_value",
|
||
}
|
||
)
|
||
|
||
self.standardised_asset_list = self.standardised_asset_list.merge(
|
||
roof_data, how="left", on=self.EPC_API_DATA_NAMES["roof-description"]
|
||
)
|
||
|
||
# If the u-value of a roof is less than 0.7 we consider it insulated
|
||
self.standardised_asset_list["solar_epc_roof_insulated"] = (
|
||
self.standardised_asset_list[self.EPC_API_DATA_NAMES["roof-description"]].str.lower().str.contains(
|
||
"|".join(self.EPC_INSULATED_ROOF_SUBSTRINGS),
|
||
) | (
|
||
self.standardised_asset_list[self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS].apply(
|
||
lambda x: int(x) >= 200 if str(x).isdigit() else False
|
||
)
|
||
) | (
|
||
self.standardised_asset_list["roof_u_value"].apply(
|
||
lambda x: x <= 0.7 if not pd.isnull(x) else False
|
||
)
|
||
)
|
||
)
|
||
|
||
self.standardised_asset_list["solar_epc_loft_needs_topup"] = (
|
||
self.standardised_asset_list[
|
||
self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS].apply(
|
||
lambda x: int(x) < 200 if str(x).isdigit() else False
|
||
) | (
|
||
(
|
||
self.standardised_asset_list["is_loft"] | self.standardised_asset_list["is_pitched"]
|
||
) & (
|
||
self.standardised_asset_list[self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS].isin(
|
||
["below average", "none"]
|
||
)
|
||
)
|
||
)
|
||
)
|
||
|
||
self.standardised_asset_list["epc_has_floor_recommendation"] = (
|
||
self.standardised_asset_list["epc_has_floor_recommendation"].fillna(False)
|
||
)
|
||
|
||
# Check if the boiler is electric
|
||
# We check if it contains both the terms boiler & electric
|
||
self.standardised_asset_list["has_electric_boiler"] = (
|
||
(
|
||
self.standardised_asset_list[self.EPC_API_DATA_NAMES["mainheat-description"]]
|
||
.str.lower().isin(
|
||
["boiler and radiators, electric"])
|
||
) | (
|
||
self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM] == "electric boiler"
|
||
)
|
||
)
|
||
|
||
####################################
|
||
# Check solar eligibility
|
||
####################################
|
||
|
||
# Set up the filters to stop repetition
|
||
correct_heating_system = (
|
||
self.standardised_asset_list["solar_landlord_data_indicates_correct_heating_system"] |
|
||
self.standardised_asset_list["solar_epc_data_indicates_correct_heating_system"] |
|
||
self.standardised_asset_list["has_electric_boiler"]
|
||
)
|
||
|
||
needs_heating_upgrade = (
|
||
self.standardised_asset_list["solar_landlord_data_indicates_needs_heating_upgrade"] |
|
||
self.standardised_asset_list["solar_epc_data_indicates_requires_heating_upgrade"]
|
||
)
|
||
|
||
# The requirements for walls are:
|
||
# 1) walls are insulated
|
||
# 2) property is a cavity (can be done insulated or not)
|
||
|
||
walls_meet_solar_requirements = (
|
||
# The landlord is saying the walls are insulated
|
||
self.standardised_asset_list["solar_landlord_walls_insulated"] |
|
||
# EPC data is saying the walls are insulated
|
||
self.standardised_asset_list["solar_epc_walls_insulated"] |
|
||
# Non-intrusives are saying the walls are insulated
|
||
self.standardised_asset_list["solar_non_intrusives_walls_insulated"] |
|
||
# It's empty cavity
|
||
self.standardised_asset_list["cavity_is_empty"] |
|
||
# It's a cavity wall
|
||
(self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].str.contains("cavity"))
|
||
)
|
||
|
||
not_a_flat = (
|
||
self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] != "flat"
|
||
)
|
||
|
||
solar_roof_meets_criteria = (
|
||
self.standardised_asset_list["solar_epc_roof_insulated"] |
|
||
self.standardised_asset_list["solar_epc_loft_needs_topup"]
|
||
)
|
||
|
||
self.standardised_asset_list["solar_eligible"] = (
|
||
# Property isn't a flat
|
||
not_a_flat &
|
||
# Landlord data or EPC data indicates the heating system is appropriate
|
||
correct_heating_system &
|
||
# The property doesn't currently have solar
|
||
~self.standardised_asset_list["property_has_solar"] &
|
||
# The walls are insulated
|
||
walls_meet_solar_requirements &
|
||
# Roof meets criteria
|
||
solar_roof_meets_criteria
|
||
)
|
||
|
||
# With heating upgrade
|
||
self.standardised_asset_list["solar_eligible_needs_heating_upgrade"] = (
|
||
not_a_flat &
|
||
# Needs heating upgrade
|
||
needs_heating_upgrade &
|
||
# The property doesn't currently have solar
|
||
~self.standardised_asset_list["property_has_solar"] &
|
||
# The walls are insulated
|
||
walls_meet_solar_requirements &
|
||
# Roof meets criteria
|
||
solar_roof_meets_criteria
|
||
)
|
||
|
||
# We check for a specific sub-set of properties which are uninsulated solid wall properties that are EPC E
|
||
# or below (we'll use 57 as a threshold) - These are for a pilot with Net Zero Renewables
|
||
self.standardised_asset_list["solar_eligible_solid_wall_uninsulated"] = (
|
||
not_a_flat &
|
||
# Landlord data or EPC data indicates the heating system is appropriate - in this case, we can also take
|
||
# electric boilers
|
||
correct_heating_system &
|
||
# The property doesn't currently have solar
|
||
~self.standardised_asset_list["property_has_solar"] &
|
||
# The walls are uninsulated solid
|
||
~walls_meet_solar_requirements &
|
||
(self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <= 57)
|
||
)
|
||
|
||
# Drop anything we don't need
|
||
self.standardised_asset_list = self.standardised_asset_list.drop(
|
||
columns=["walls_u_value", "roof_u_value"]
|
||
)
|
||
|
||
# Adjust flagged extraction jobs to remove anything for solar
|
||
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] = (
|
||
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] &
|
||
~self.standardised_asset_list["solar_eligible"]
|
||
)
|
||
|
||
# Finally, we note why each property has been flagged
|
||
self.standardised_asset_list["cavity_reason"] = None
|
||
|
||
empty_cavity_map = {
|
||
"non_intrusive_indicates_empty_cavity": "Non-Intrusive Data Shows Empty Cavity: ",
|
||
"non_intrusive_indicates_empty_cavity_has_solar": "Non-Intrusive Data Shows Empty Cavity - property "
|
||
"already has solar: ",
|
||
"non_intrusive_indicates_empty_cavity_no_year_filter": f"Non-Intrusive Data Shows Empty Cavity, "
|
||
f"built after {self.EMPTY_CAVITY_YEAR_THRESHOLD}: ",
|
||
|
||
}
|
||
for variable, description in empty_cavity_map.items():
|
||
self.standardised_asset_list["cavity_reason"] = np.where(
|
||
self.standardised_asset_list[variable] &
|
||
pd.isnull(self.standardised_asset_list["cavity_reason"]),
|
||
description + self.standardised_asset_list["SAP Category"],
|
||
self.standardised_asset_list["cavity_reason"]
|
||
)
|
||
|
||
# We break the cavity reason into a few different categories, when the EPC is different from inspections
|
||
if self.old_format_non_intrusives_present:
|
||
self.standardised_asset_list["cavity_reason"] = np.where(
|
||
(
|
||
self.standardised_asset_list["epc_indicates_empty_cavity"] &
|
||
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
|
||
(self.standardised_asset_list['non-intrusives: WFT Findings'].str.lower().str.strip().isin(
|
||
[
|
||
"retro drilled and filled", "retro drilled", "retro filled", "retro drilled & filled",
|
||
]
|
||
)) &
|
||
pd.isnull(self.standardised_asset_list["cavity_reason"])
|
||
),
|
||
"EPC Shows Empty Cavity, inspections show retro drilled: " + self.standardised_asset_list[
|
||
"SAP Category"],
|
||
self.standardised_asset_list["cavity_reason"]
|
||
)
|
||
|
||
self.standardised_asset_list["cavity_reason"] = np.where(
|
||
(
|
||
self.standardised_asset_list["epc_indicates_empty_cavity"] &
|
||
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
|
||
self.standardised_asset_list['non_intrusive_indicates_cavity_extraction'] &
|
||
pd.isnull(self.standardised_asset_list["cavity_reason"])
|
||
),
|
||
"EPC Shows Empty Cavity, inspections show filled or other: " + self.standardised_asset_list[
|
||
"SAP Category"],
|
||
self.standardised_asset_list["cavity_reason"]
|
||
)
|
||
elif self.non_intrusives_present:
|
||
self.standardised_asset_list["cavity_reason"] = np.where(
|
||
(
|
||
self.standardised_asset_list["epc_indicates_empty_cavity"] &
|
||
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
|
||
(self.standardised_asset_list['non-intrusives: Insulated'] == "RETRO DRILLED") &
|
||
pd.isnull(self.standardised_asset_list["cavity_reason"])
|
||
),
|
||
"EPC Shows Empty Cavity, inspections show retro drilled: " + self.standardised_asset_list[
|
||
"SAP Category"],
|
||
self.standardised_asset_list["cavity_reason"]
|
||
)
|
||
|
||
self.standardised_asset_list["cavity_reason"] = np.where(
|
||
(
|
||
self.standardised_asset_list["epc_indicates_empty_cavity"] &
|
||
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
|
||
(self.standardised_asset_list['non-intrusives: Insulated'] == "FILLED AT BUILD") &
|
||
pd.isnull(self.standardised_asset_list["cavity_reason"])
|
||
),
|
||
"EPC Shows Empty Cavity, inspections show filled at build: " + self.standardised_asset_list[
|
||
"SAP Category"],
|
||
self.standardised_asset_list["cavity_reason"]
|
||
)
|
||
else:
|
||
self.standardised_asset_list["cavity_reason"] = np.where(
|
||
(
|
||
self.standardised_asset_list["epc_indicates_empty_cavity"] &
|
||
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
|
||
pd.isnull(self.standardised_asset_list["cavity_reason"])
|
||
),
|
||
"EPC Shows Empty Cavity: " + self.standardised_asset_list["SAP Category"],
|
||
self.standardised_asset_list["cavity_reason"]
|
||
)
|
||
|
||
self.standardised_asset_list["cavity_reason"] = np.where(
|
||
(
|
||
self.standardised_asset_list["epc_indicates_empty_cavity"] &
|
||
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
|
||
pd.isnull(self.standardised_asset_list["cavity_reason"])
|
||
),
|
||
"EPC Shows Empty Cavity, inspections show non-cavity build: " + self.standardised_asset_list[
|
||
"SAP Category"],
|
||
self.standardised_asset_list["cavity_reason"]
|
||
)
|
||
# Landlord data: The landlord's data indicates that the wall is an uninsulated cavity wall, but EPC and
|
||
# inspections show filled
|
||
self.standardised_asset_list["cavity_reason"] = np.where(
|
||
(
|
||
self.standardised_asset_list["landlord_data_indicates_empty_cavity"] &
|
||
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
|
||
~self.standardised_asset_list["epc_indicates_empty_cavity"] &
|
||
pd.isnull(self.standardised_asset_list["cavity_reason"])
|
||
),
|
||
"Landlord Data Shows Empty Cavity, EPC & Inspections Shows Filled or Non-cavity: " +
|
||
self.standardised_asset_list["SAP Category"],
|
||
self.standardised_asset_list["cavity_reason"]
|
||
)
|
||
|
||
# Flag extraction
|
||
self.standardised_asset_list["cavity_reason"] = np.where(
|
||
(
|
||
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] &
|
||
pd.isnull(self.standardised_asset_list["cavity_reason"])
|
||
),
|
||
"Non-Intrusive Data Shows Cavity Extraction: " + self.standardised_asset_list["SAP Category"],
|
||
self.standardised_asset_list["cavity_reason"]
|
||
)
|
||
|
||
self.standardised_asset_list["cavity_reason"] = np.where(
|
||
(
|
||
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction_no_year_filter"] &
|
||
pd.isnull(self.standardised_asset_list["cavity_reason"])
|
||
),
|
||
f"Non-Intrusive Data Shows Cavity Extraction, built after {self.EMPTY_CAVITY_YEAR_THRESHOLD}: " +
|
||
self.standardised_asset_list["SAP Category"],
|
||
self.standardised_asset_list["cavity_reason"]
|
||
)
|
||
|
||
######################################################
|
||
# Flag solar
|
||
######################################################
|
||
self.standardised_asset_list["solar_reason"] = None
|
||
|
||
# Map of variables and fill values for the solar_reason variable
|
||
# ordering of this map is important, where we flag our prioritised work types first
|
||
solar_reason_map = {
|
||
"solar_eligible": "Solar Eligible: ",
|
||
"solar_eligible_solid_wall_uninsulated": "Solar Eligible, Solid Wall Uninsulated, EPC E or Below: ",
|
||
"solar_eligible_needs_heating_upgrade": (
|
||
"Solar Eligible, Needs Heating Upgrade: "
|
||
)
|
||
}
|
||
|
||
for variable, reason in solar_reason_map.items():
|
||
self.standardised_asset_list["solar_reason"] = np.where(
|
||
self.standardised_asset_list[variable] & pd.isnull(self.standardised_asset_list["solar_reason"]),
|
||
reason + self.standardised_asset_list["SAP Category"],
|
||
self.standardised_asset_list["solar_reason"]
|
||
)
|
||
|
||
# Finally, anything flagged for solar should not be flagged for cavity - make them None
|
||
self.standardised_asset_list["cavity_reason"] = np.where(
|
||
(
|
||
~pd.isnull(self.standardised_asset_list["solar_reason"]) &
|
||
~pd.isnull(self.standardised_asset_list["cavity_reason"])
|
||
),
|
||
None,
|
||
self.standardised_asset_list["cavity_reason"]
|
||
)
|
||
|
||
# Flag anything that has existing outcomes
|
||
if (self.outcomes is not None) and ("surveyed" in self.standardised_asset_list.columns):
|
||
|
||
if "installer refusal" not in self.standardised_asset_list.columns:
|
||
self.standardised_asset_list["cavity_reason"] = np.where(
|
||
(
|
||
(self.standardised_asset_list["surveyed"] > 0)
|
||
),
|
||
None,
|
||
self.standardised_asset_list["cavity_reason"]
|
||
)
|
||
else:
|
||
for col in ["cavity_reason", "solar_reason"]:
|
||
self.standardised_asset_list[col] = np.where(
|
||
(
|
||
(self.standardised_asset_list["surveyed"] > 0) |
|
||
(self.standardised_asset_list["installer refusal"] > 0)
|
||
),
|
||
None,
|
||
self.standardised_asset_list[col]
|
||
)
|
||
|
||
if self.master_surveyed is not None:
|
||
for col in ["cavity_reason", "solar_reason"]:
|
||
self.standardised_asset_list[col] = np.where(
|
||
(
|
||
(~pd.isnull(self.standardised_asset_list["submission_date"]))
|
||
),
|
||
None,
|
||
self.standardised_asset_list[col]
|
||
)
|
||
|
||
if self.ecosurv is not None:
|
||
for col in ["cavity_reason", "solar_reason"]:
|
||
self.standardised_asset_list[col] = np.where(
|
||
(
|
||
(~pd.isnull(self.standardised_asset_list["ecosurv_reference"]))
|
||
),
|
||
None,
|
||
self.standardised_asset_list[col]
|
||
)
|
||
|
||
blocks_of_flats = self.standardised_asset_list[
|
||
self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] == "block of flats"
|
||
]
|
||
|
||
non_blocks_of_flats = self.standardised_asset_list[
|
||
self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] != "block of flats"
|
||
]
|
||
|
||
# Produce some aggregate figures
|
||
self.work_type_figures = {
|
||
**non_blocks_of_flats["cavity_reason"].value_counts().to_dict(),
|
||
**{
|
||
k + " (Block of flats)": v for k, v in
|
||
blocks_of_flats["solar_reason"].value_counts().to_dict().items()
|
||
},
|
||
**self.standardised_asset_list["solar_reason"].value_counts().to_dict()
|
||
}
|
||
|
||
# We prepare outcomes for output
|
||
if self.outcomes is not None:
|
||
logger.info("Preparing outcomes for output")
|
||
identified_work = self.standardised_asset_list[
|
||
~pd.isnull(self.standardised_asset_list["cavity_reason"]) |
|
||
~pd.isnull(self.standardised_asset_list["solar_reason"])
|
||
][self.DOMNA_PROPERTY_ID].values
|
||
|
||
if self.DOMNA_PROPERTY_ID in self.outcomes.columns:
|
||
self.outcomes_for_output = self.outcomes[
|
||
self.outcomes[self.DOMNA_PROPERTY_ID].isin(identified_work)
|
||
]
|
||
|
||
def flat_analysis(self):
    """
    Summarise the EPC performance of flats, one row per postcode.

    The standardised asset list is grouped by postcode and property type. For every
    group whose property type is "flat" we record the number of rows in the group, how
    many of them score below ``self.FILLED_CAVITY_SAP_THRESHOLD`` and below 69 on the
    current energy efficiency column, and the proportion below the threshold as a
    rounded percentage.

    The resulting DataFrame is stored on ``self.flat_data``.

    :return: None
    """
    postcode_col = self.STANDARD_POSTCODE
    property_type_col = self.STANDARD_PROPERTY_TYPE
    sap_col = self.EPC_API_DATA_NAMES["current-energy-efficiency"]

    summary_rows = []
    for _, members in self.standardised_asset_list.groupby([postcode_col, property_type_col]):
        # Property type is one of the grouping keys, so a group is either all flats or has none
        if "flat" not in members[property_type_col].values:
            continue

        sap_scores = members[sap_col]
        flat_count = members[property_type_col].shape[0]
        below_threshold = sap_scores.lt(self.FILLED_CAVITY_SAP_THRESHOLD).sum()
        below_c69 = sap_scores.lt(69).sum()

        summary_rows.append(
            {
                "Postcode": members[postcode_col].iloc[0],
                "Property Type": "Flat",
                "Number of Flats with EPC": flat_count,
                "Number of Flats below C75": below_threshold,
                "Proportion of Flat EPCs below C75": round(100 * below_threshold / flat_count),
                "Number of Flats Below C69": below_c69,
            }
        )

    self.flat_data = pd.DataFrame(summary_rows)
|
||
|
||
@staticmethod
|
||
def split_full_name(x):
|
||
if pd.isnull(x):
|
||
return None, None, None
|
||
x = x.lower()
|
||
titles = ["mr", "mrs", "ms", "miss", "dr", "prof"]
|
||
# Remove titles
|
||
detected_title = [title for title in titles if x.startswith(title)]
|
||
if detected_title:
|
||
for title in detected_title:
|
||
x = x.replace(title, "")
|
||
x = x.strip()
|
||
first_name, last_name = x.split(" ")[0], x.split(" ")[-1]
|
||
title = detected_title[0].title() if detected_title else None
|
||
return title, first_name.title(), last_name.title()
|
||
|
||
def load_contact_details(
    self,
    local_filepath,
    sheet_name,
    landlord_property_id,
    phone_number_column=None,
    email_column=None,
    fullname_column=None,
    firstname_column=None,
    lastname_column=None
):
    """
    Load tenant contact details from an Excel sheet and store them on the instance.

    The column names supplied are recorded in ``self.contact_detail_fields`` and the
    loaded rows (those with a non-null landlord property id) are stored on
    ``self.contact_details``. The full name column is split into ``title``,
    ``first_name`` and ``last_name`` columns.

    :param local_filepath: Path of the Excel workbook to read.
    :param sheet_name: Sheet within the workbook that holds the contact details.
    :param landlord_property_id: Name of the column holding the landlord's property id.
    :param phone_number_column: Name of the phone number column, if present.
    :param email_column: Name of the email column, if present.
    :param fullname_column: Name of the full name column, if present.
    :param firstname_column: Name of the first name column, if present.
    :param lastname_column: Name of the last name column, if present.
    :raises NotImplementedError: Unless a full name column is supplied without both a
        first name and a last name column - that is the only combination handled so far.
    :return: None
    """
    self.contact_detail_fields = {
        "landlord_property_id": landlord_property_id,
        "phone_number": phone_number_column,
        "email": email_column,
        "fullname": fullname_column,
        "firstname": firstname_column,
        "lastname": lastname_column
    }

    # Only the "split a full name" route exists; fail before reading the workbook
    if not (fullname_column and not (firstname_column and lastname_column)):
        raise NotImplementedError("Implement me")

    optional_columns = [
        phone_number_column, email_column, fullname_column, firstname_column, lastname_column
    ]
    supplied_columns = [column for column in optional_columns if column is not None]

    contact_details = pd.read_excel(
        local_filepath, sheet_name=sheet_name
    )[[landlord_property_id] + supplied_columns]
    # Copy the filtered rows so the columns added below go onto an independent frame
    # rather than onto a slice of the sheet
    contact_details = contact_details[
        ~pd.isnull(contact_details[landlord_property_id])
    ].copy()

    # Fill anything we don't have
    # NOTE(review): fields that were not supplied have no column name, so the placeholder
    # ends up as a single column labelled None - confirm whether anything relies on it
    if len(supplied_columns) < len(optional_columns):
        contact_details[None] = None

    name_parts = contact_details[fullname_column].apply(self.split_full_name).tolist()
    # zip(*[]) yields nothing to unpack, so an empty sheet needs explicit empty columns
    titles, first_names, last_names = zip(*name_parts) if name_parts else ((), (), ())
    contact_details["title"] = list(titles)
    contact_details["first_name"] = list(first_names)
    contact_details["last_name"] = list(last_names)

    self.contact_details = contact_details
|
||
|
||
def prepare_for_crm(self, company_domain, crm_pipeline_name, first_dealstage, assigned_surveyors):
    """
    This function prepares the data for upload into Hubspot.

    Every property with an identified opportunity (a solar reason, or failing that a
    cavity reason) becomes one row carrying the listing, contact, deal and line-item
    columns of the Hubspot import schema. The result is stored on ``self.hubspot_data``
    with its columns in the order of the schema mapping defined below.

    :param company_domain: Value written to the company domain column of every row.
    :param crm_pipeline_name: Value written to the deal pipeline column of every row.
    :param first_dealstage: Value written to the deal stage column of every row.
    :param assigned_surveyors: DataFrame merged onto the programme data by landlord
        property id. Presumably it supplies the ``surveyor_email`` column used for the
        deal owner - verify against the caller.
    :raises ValueError: If a cavity or solar reason is not in the product lookup table.
    :raises KeyError: If a column required by the Hubspot schema is not available.
    :return: None
    """
    # This is a placeholder for now

    # This maps the opportunities as we reference them, to the product data as stored in Hubspot
    product_lookup_table = {
        "Non-Intrusive Data Showed Cavity Extraction": {
            "name": "Extract & Fill - ECO4", "id": 100307905778, "unit_price": 500
        },
        "Non-Intrusive Data Showed Empty Cavity": {
            "name": "Empty Cavity & Loft - ECO4", "id": 82733738177, "unit_price": 1000
        },
        "Non-Intrusive Data Showed Empty Cavity but all SAP scores allowed": {
            "name": "Empty Cavity & Loft - ECO4", "id": 82733738177, "unit_price": 1000
        },
        "Non-Intrusive Data Showed Cavity Extraction but all SAP scores allowed": {
            "name": "Extract & Fill - ECO4", "id": 100307905778, "unit_price": 500
        },
        "EPC Data Showed Empty Cavity": {
            "name": "Empty Cavity & Loft - ECO4", "id": 82733738177, "unit_price": 1000
        },
        "Solid Floor, Insulated, No Solar": {
            "name": "Solar PV - ECO4", "id": 82623589564, "unit_price": 1608
        },
        "Solid Floor, Insulated, Needs Loft": {
            "name": "Solar PV - ECO4", "id": 82623589564, "unit_price": 1608
        },
        "Other Floor, Insulated, No Solar": {
            "name": "Solar PV - ECO4", "id": 82623589564, "unit_price": 1608
        },
        "Other Floor, Insulated, Needs Loft": {
            "name": "Solar PV - ECO4", "id": 82623589564, "unit_price": 1608
        }
    }

    # We check that all products are covered in the lookup table. Null reasons simply mean
    # that nothing was identified for the property, so they are not checked.
    for reason_column in ("cavity_reason", "solar_reason"):
        reasons = self.standardised_asset_list[reason_column].dropna().unique()
        if any(reason and reason not in product_lookup_table for reason in reasons):
            raise ValueError("We have products not referenced in the lookup table - check this")

    programme_data = self.standardised_asset_list.copy()

    # Exclusions - these are properties we won't treat for the moment
    product_exclusions = [
        "Other Floor, Insulated, No Solar",
        "Other Floor, Insulated, Needs Loft"
    ]
    if product_exclusions:
        logger.warning("Excluding products: %s", product_exclusions)

    programme_data = programme_data[~programme_data["solar_reason"].isin(product_exclusions)]

    # Merge on the contact details
    programme_data = programme_data.merge(
        self.contact_details,
        how="left",
        left_on=self.STANDARD_LANDLORD_PROPERTY_ID,
        right_on=self.landlord_property_id,
    )

    programme_data["Company Domain Name <COMPANY domain>"] = company_domain

    # The product is the solar reason where we have one, otherwise the cavity reason. The
    # fallback has to be the reason itself (not the Hubspot product name) because the
    # product data is merged on below using the lookup table's keys.
    programme_data["domna_product"] = np.where(
        pd.isnull(programme_data["solar_reason"]),
        programme_data["cavity_reason"],
        programme_data["solar_reason"]
    )
    # We filter just on rows where we have a product
    programme_data = programme_data[
        ~pd.isnull(programme_data["domna_product"])
    ]

    product_df = (
        pd.DataFrame(product_lookup_table).T[["name", "id", "unit_price"]]
        .reset_index()
        .rename(
            columns={
                "name": "Name <LINE_ITEM name>",
                "id": 'Product ID <LINE_ITEM hs_product_id>',
                "unit_price": 'Unit price <LINE_ITEM price>',
                "index": "domna_product"
            }
        )
    )

    product_df['Quantity <LINE_ITEM quantity>'] = 1

    # Append on the product data
    programme_data = programme_data.merge(
        product_df,
        how="left",
        on="domna_product",
    )

    # Add in deal and pipeline information
    programme_data["dealname"] = (
        programme_data[self.STANDARD_FULL_ADDRESS] + " : " + programme_data["domna_product"]
    )
    programme_data['Pipeline <DEAL pipeline>'] = crm_pipeline_name
    programme_data['Deal Stage <DEAL dealstage>'] = first_dealstage
    programme_data['Associations: Listing'] = "Property Owner"

    programme_data = programme_data.merge(
        assigned_surveyors.rename(
            columns={self.landlord_property_id: self.STANDARD_LANDLORD_PROPERTY_ID}
        ), how="left", on=self.STANDARD_LANDLORD_PROPERTY_ID
    )

    contact_fields = self.contact_detail_fields
    # The non-intrusive columns only exist when non-intrusive data was loaded
    has_non_intrusives = self.non_intrusives_present

    # This maps the hubspot schema to the template. Anything that is mapped to None is
    # information we haven't got right now, and is output as an empty placeholder column.
    # NOTE(review): the first/last name entries use the caller-supplied columns only; the
    # "first_name"/"last_name" values split out of a full name are not used - confirm.
    schema_mappings = {
        'Name <LISTING hs_name>': self.DOMNA_PROPERTY_ID,  # TODO: Maybe change this?
        'Company Domain Name <COMPANY domain>': 'Company Domain Name <COMPANY domain>',
        'Email <CONTACT email>': contact_fields["email"] or None,  # TODO: Review
        'First Name <CONTACT firstname>': contact_fields["firstname"] or None,  # TODO: Review
        'Last Name <CONTACT lastname>': contact_fields["lastname"] or None,  # TODO: Review
        'Phone <CONTACT phone>': contact_fields["phone_number"] or None,  # TODO: Review
        'Full Address <LISTING full_address>': self.STANDARD_FULL_ADDRESS,
        'Address 1 <LISTING hs_address_1>': self.STANDARD_ADDRESS_1,
        'Address 2 <LISTING hs_address_2>': None,  # TODO: Don't have this for the moment
        'Postcode <LISTING hs_zip>': self.STANDARD_POSTCODE,
        'Property Type <LISTING property_type>': self.STANDARD_PROPERTY_TYPE,
        'Property Sub Type <LISTING property_sub_type>': None,  # TODO: Don't have this for the moment
        'Bedroom(s) <LISTING hs_bedrooms>': None,  # TODO: Don't have this for the moment
        'Domna Property ID <LISTING domna_property_id>': self.DOMNA_PROPERTY_ID,
        'National UPRN <LISTING national_uprn>': (
            self.STANDARD_UPRN if self.STANDARD_UPRN is not None else self.EPC_API_DATA_NAMES["uprn"]
        ),
        'Owner Property ID <LISTING owner_property_id>': self.STANDARD_LANDLORD_PROPERTY_ID,
        'Wall Construction <LISTING wall_construction>': self.STANDARD_WALL_CONSTRUCTION,
        'Heating System <LISTING heating_system>': self.STANDARD_HEATING_SYSTEM,
        'Year Built <LISTING hs_year_built>': self.STANDARD_YEAR_BUILT,
        'Boiler Make <LISTING boiler_make>': None,  # TODO: Don't have this for the moment
        'Boiler Model <LISTING boiler_model>': None,  # TODO: Don't have this for the moment
        # TODO: Don't have this for the moment
        'Non-Intrusives: Date Checked <LISTING non_intrusives__date_checked>': None,
        'Non-Intrusives: Wall Type <LISTING non_intrusives__wall_type>': (
            "non-intrusives: Construction" if has_non_intrusives else None
        ),
        'Non-intrusives: Insulation <LISTING non_intrusives__insulation>': (
            "non-intrusives: Insulated" if has_non_intrusives else None
        ),
        'Non-intrusives: Insulation Material <LISTING non_intrusives__insulation_material>': (
            "non-intrusives: Material" if has_non_intrusives else None
        ),
        'Non-Intrusives: CIGA Check Required <LISTING non_intrusives__ciga_check_required>': (
            'non-intrusives: CIGA Check Required' if has_non_intrusives else None
        ),
        'Non-Intrusives: PV Access Issues <LISTING non_intrusives__access_issues>': (
            'non-intrusives: PV, ACCESS ISSUE, SEE NOTES' if has_non_intrusives else None
        ),
        'Non-Intrusives: Roof Orientation <LISTING non_intrusives__roof_orientation>': (
            'non-intrusives: OFF GAS - ROOF ORIENTATION' if has_non_intrusives else None
        ),
        'Non-Intrusives: Surveyor Notes <LISTING non_intrusives__surveyor_notes>': (
            'non-intrusives: Any further surveyor notes' if has_non_intrusives else None
        ),
        'Non-Intrusives: Surveyor Name <LISTING non_intrusives__surveyor_name>': (
            'non-intrusives: Surveyors Name' if has_non_intrusives else None
        ),
        'CIGA: Date Requested <LISTING ciga__date_requested>': None,  # TODO: Don't have this for the moment
        'CIGA: Cavity Guarantee Found <LISTING ciga__cavity_guarantee_found>': None,
        'Last EPC: Is Estimated <LISTING last_epc__is_estimated>': self.EPC_API_DATA_NAMES["estimated"],
        'Last EPC: EPC Rating <LISTING last_epc__epc_rating>': self.EPC_API_DATA_NAMES["current-energy-rating"],
        'Last EPC: SAP Rating <LISTING last_epc__sap_rating>': self.EPC_API_DATA_NAMES["current-energy-efficiency"],
        'Last EPC: Main Heating Description <LISTING last_epc__main_heating_description>': self.EPC_API_DATA_NAMES[
            "mainheat-description"],
        'Last EPC: Heating Controls <LISTING last_epc__heating_controls>': self.EPC_API_DATA_NAMES[
            "mainheatcont-description"],
        'Last EPC: Lodgement Date <LISTING last_epc__lodgement_date>': self.EPC_API_DATA_NAMES["inspection-date"],
        'Last EPC: Floor Area <LISTING last_epc__floor_area>': self.EPC_API_DATA_NAMES["total-floor-area"],
        'Last EPC: Wall <LISTING last_epc__wall>': self.EPC_API_DATA_NAMES["walls-description"],
        'Last EPC: Roof <LISTING last_epc__roof>': self.EPC_API_DATA_NAMES["roof-description"],
        'Last EPC: Floor <LISTING last_epc__floor>': self.EPC_API_DATA_NAMES["floor-description"],
        'Last EPC: Room Height <LISTING last_epc__room_height>': self.EPC_API_DATA_NAMES["floor-height"],
        'Last EPC: Age Band <LISTING last_epc__age_band>': self.EPC_API_DATA_NAMES["construction-age-band"],
        'Deal Stage <DEAL dealstage>': 'Deal Stage <DEAL dealstage>',
        'Pipeline <DEAL pipeline>': 'Pipeline <DEAL pipeline>',
        'Expected Commencement Date <DEAL expected_commencement_date>': None,  # TODO: Need to set this,
        'Deal Name <DEAL dealname>': "dealname",  # Need to create this,
        'Product ID <LINE_ITEM hs_product_id>': 'Product ID <LINE_ITEM hs_product_id>',
        'Name <LINE_ITEM name>': 'Name <LINE_ITEM name>',
        'Unit price <LINE_ITEM price>': 'Unit price <LINE_ITEM price>',
        'Quantity <LINE_ITEM quantity>': 'Quantity <LINE_ITEM quantity>',
        'Deal Owner': 'surveyor_email',
        'Amount <DEAL amount>': 'Unit price <LINE_ITEM price>',
    }

    # We now create the finalised dataset to be uploaded into Hubspot
    required_columns = [source for source in schema_mappings.values() if source is not None]
    missing_columns = [
        source for source in dict.fromkeys(required_columns)
        if source not in programme_data.columns
    ]
    if missing_columns:
        raise KeyError(f"Columns required for the Hubspot upload are missing: {missing_columns}")

    # Each Hubspot column is built from its own source. Several Hubspot columns share one
    # source column (e.g. the Domna property id, the unit price), so selecting and then
    # renaming with an inverted {source: target} mapping would keep only the last target
    # for each shared source and silently lose the others.
    hubspot_data = pd.DataFrame(index=programme_data.index)
    for hubspot_column, source_column in schema_mappings.items():
        hubspot_data[hubspot_column] = (
            programme_data[source_column] if source_column is not None else None
        )

    self.hubspot_data = hubspot_data
|
||
|
||
def flag_ecosurv(self, ecosurv_landlords=None, landlords_to_ignore=None, ecosurv_filepath=None):
    """
    This method will match Ecosurv data to the asset list.

    The Ecosurv export is loaded onto ``self.ecosurv`` and filtered to the requested
    landlords. Each remaining submission is matched to a single asset list row by
    postcode, then (when several rows share the postcode) by house number, by address
    line 1 appearing in the full address and finally by discarding properties of type
    "other". Matches add the ``ecosurv_reference``, ``ecosurv_address1`` and
    ``ecosurv_postcode`` columns to ``self.standardised_asset_list``; submissions that
    could not be matched to exactly one property are kept on ``self.ecosurv_no_match``.

    :param ecosurv_landlords: Pattern searched for (as a regular expression) in the
        lower-cased Ecosurv landlord name. When None, the method does nothing.
    :param landlords_to_ignore: Optional collection of exact Ecosurv landlord names to
        exclude after the pattern has been applied.
    :param ecosurv_filepath: Optional path of the Ecosurv CSV export. Defaults to the
        previously hard-coded local file.
    :return: None
    """
    if ecosurv_landlords is None:
        return

    # TODO: Fetch from Sharepoint
    if ecosurv_filepath is None:
        ecosurv_filepath = "/Users/khalimconn-kowlessar/Documents/hestia/Ecosurv/07.05.2025.csv"
    logger.info("Getting Ecosurv data from %s", ecosurv_filepath)
    self.ecosurv = pd.read_csv(ecosurv_filepath, encoding="cp437")

    # Rows without a landlord never match the pattern
    landlord_ecosurv_data = self.ecosurv[
        self.ecosurv["Landlord"].str.lower().str.contains(ecosurv_landlords, na=False)
    ]

    if landlords_to_ignore is not None:
        landlord_ecosurv_data = landlord_ecosurv_data[
            ~landlord_ecosurv_data["Landlord"].isin(landlords_to_ignore)
        ]

    asset_list = self.standardised_asset_list
    # The asset list doesn't change while we match, so we normalise its postcodes once
    asset_postcodes = asset_list[self.STANDARD_POSTCODE].str.replace(" ", "").str.lower()

    # Try and match to asset list
    matched = []
    unmatched = []
    for _, row in tqdm(landlord_ecosurv_data.iterrows(), total=landlord_ecosurv_data.shape[0]):
        if pd.isnull(row["Postcode"]):
            unmatched.append(row["Reference"])
            continue

        # Normalise in the same way as the asset list, otherwise postcodes that contain
        # a space can never match
        postcode = str(row["Postcode"]).replace(" ", "").lower()
        df = asset_list[asset_postcodes == postcode].copy()

        if df.empty:
            unmatched.append(row["Reference"])
            continue

        if df.shape[0] > 1:
            house_no = SearchEpc.get_house_number(row["Address Line 1"], row["Postcode"])
            df["house_no"] = df.apply(
                lambda x: SearchEpc.get_house_number(
                    str(x[self.STANDARD_ADDRESS_1]), x[self.STANDARD_POSTCODE]
                ),
                axis=1
            )
            df = df[df["house_no"] == house_no]

        if df.shape[0] > 1 and not pd.isnull(row["Address Line 1"]):
            # We compare address line 1 to full address. The address is matched literally:
            # as a regular expression, characters such as "(" or "." would either fail to
            # compile or match more than intended.
            address_matches = df[self.STANDARD_FULL_ADDRESS].str.lower().str.contains(
                str(row["Address Line 1"]).lower(), na=False, regex=False
            )
            if address_matches.any():
                df = df[address_matches]

        if df.shape[0] > 1:
            df = df[df[self.STANDARD_PROPERTY_TYPE] != "other"]

        if df.shape[0] == 1:
            matched.append(
                {
                    self.STANDARD_LANDLORD_PROPERTY_ID: df[self.STANDARD_LANDLORD_PROPERTY_ID].values[0],
                    "ecosurv_reference": row["Reference"],
                    "ecosurv_address1": row["Address Line 1"],
                    "ecosurv_postcode": row["Postcode"],
                }
            )
        else:
            # Either several candidates remain, or the narrowing above removed all of them
            unmatched.append(row["Reference"])

    logger.info("Matched %s properties to ecosurv data", len(matched))
    logger.info("%s properties in Ecosurv remain unmatched", len(unmatched))

    # We now match
    if matched:
        # We'll possibly have duplicates here, where properties have been sold twice. We
        # de-dupe - it doesn't matter too much which record we take
        matched = pd.DataFrame(matched).drop_duplicates(subset=[self.STANDARD_LANDLORD_PROPERTY_ID])

        self.standardised_asset_list = self.standardised_asset_list.merge(
            matched,
            how="left",
            on=self.STANDARD_LANDLORD_PROPERTY_ID,
        )
    else:
        # Nothing matched: add the columns empty, so they exist whether or not we matched
        for column in ("ecosurv_reference", "ecosurv_address1", "ecosurv_postcode"):
            self.standardised_asset_list[column] = None

    # We keep a record of submissions that were NOT matches
    self.ecosurv_no_match = self.ecosurv[
        self.ecosurv["Reference"].isin(unmatched)
    ].copy()
|
||
|
||
def flag_outcomes(
    self,
    outcomes_filepaths,
    outcomes_sheetname,
    outcomes_address,
    outcomes_postcode,
    outcomes_houseno,
    outcomes_id
):
    """
    Match surveyor outcome sheets to the standardised asset list and flag the results.

    Every argument other than ``self`` is a list with one entry per outcomes file; entry
    ``idx`` describes the file at ``outcomes_filepaths[idx]``.

    :param outcomes_filepaths: Paths of the outcomes Excel workbooks. If empty/None, nothing is done.
    :param outcomes_sheetname: Sheet to read from each workbook.
    :param outcomes_address: Name of the address column in each sheet.
    :param outcomes_postcode: Name of the postcode column in each sheet.
    :param outcomes_houseno: Name of the house number column in each sheet, or None to derive
        a "houseno" column from the address and postcode.
    :param outcomes_id: Name of the landlord property id column in each sheet, or None if absent.

    Each outcome row is matched, in order, by: landlord property id, exact (normalised) full
    address, postcode + house number, and finally a fuzzy address match among the postcode +
    house number candidates.

    Side effects:
        * ``self.outcomes`` - all outcome rows, concatenated across files. When at least one row
          matched, it also carries "matched_to_asset_list" and the matched property id.
        * ``self.outcomes_no_match`` - outcome rows that could not be matched.
        * ``self.standardised_asset_list`` - gains one count column per outcome value,
          "visit_count" and "Latest Route March Note".

    :raises NotImplementedError: if no known date column is present in the outcomes.
    :raises ValueError: if the merge produces duplicated property ids.
    """
    if not outcomes_filepaths:
        return

    id_col = self.DOMNA_PROPERTY_ID
    asset_list = self.standardised_asset_list

    def normalise_house_no(value):
        # Returns a lower-cased, stripped string, or None when there is no house number.
        # Integral floats (e.g. 12.0, as Excel numeric cells are read) become "12".
        if value is None or (not isinstance(value, str) and pd.isnull(value)):
            return None
        if isinstance(value, float) and value.is_integer():
            value = int(value)
        return str(value).strip().lower()

    def extract_date(value):
        # Strings are searched for a dd.mm.yyyy date; real datetimes are kept as they are.
        if isinstance(value, str):
            found = re.search(r"(\d{2}\.\d{2}\.\d{4})", value)
            if found:
                return pd.to_datetime(found.group(1), format="%d.%m.%Y", errors="coerce")
            return pd.NaT
        if isinstance(value, datetime):
            return pd.Timestamp(value)
        return pd.NaT

    # The asset list is not modified while matching, so we normalise it once up front
    asset_addresses = (
        asset_list[self.STANDARD_FULL_ADDRESS]
        .str.lower()
        .str.replace(",", "", regex=False)
        .str.replace(r"\s+", " ", regex=True)
        .str.strip()
    )
    asset_postcodes = asset_list[self.STANDARD_POSTCODE].str.strip()
    # Only needed (and only computed) when at least one file supplies an id column
    asset_landlord_ids = (
        asset_list[self.STANDARD_LANDLORD_PROPERTY_ID].str.strip()
        if any(col is not None for col in outcomes_id) else None
    )

    self.outcomes = []
    outcomes_no_match = []
    lookup = []
    # row_id must be unique across files, otherwise the merges on row_id below would
    # cross-match rows belonging to different files
    row_id_offset = 0
    for idx, outcomes_filepath in enumerate(outcomes_filepaths):
        outcomes = pd.read_excel(outcomes_filepath, sheet_name=outcomes_sheetname[idx])
        outcomes["row_id"] = range(row_id_offset, row_id_offset + len(outcomes))
        row_id_offset += len(outcomes)

        address_col = outcomes_address[idx]
        postcode_col = outcomes_postcode[idx]
        oid_col = outcomes_id[idx]

        # Use a per-file local so the caller's list of house number columns is left untouched
        houseno_col = outcomes_houseno[idx]
        if houseno_col is None:
            houseno_col = "houseno"
            outcomes[houseno_col] = [
                None if pd.isnull(address) else SearchEpc.get_house_number(str(address), str(postcode))
                for address, postcode in zip(outcomes[address_col], outcomes[postcode_col])
            ]

        # We handle an edge case that occurred for LHP
        if "Notes / Outcomes" in outcomes.columns and "Outcome" not in outcomes.columns:
            # We use the re-mapper to derive a standardised outcome from the free-text notes
            outcomes["Notes / Outcomes"] = outcomes["Notes / Outcomes"].str.strip()
            values_to_remap = outcomes["Notes / Outcomes"].unique()
            remapper = DataRemapper(
                standard_values=outcomes_mappings.outcomes_values, standard_map=outcomes_mappings.outcomes_map
            )
            remap_dictionary = remapper.standardize_list(values_to_remap=values_to_remap.tolist())
            outcomes["Outcome"] = outcomes["Notes / Outcomes"].map(remap_dictionary)

        outcomes["Outcome"] = outcomes["Outcome"].str.lower()

        logger.info("Matching outcomes to asset list")
        lookup_i = []
        nomatch_i = []
        for _, row in tqdm(outcomes.iterrows(), total=len(outcomes)):
            address = row[address_col]
            if pd.isnull(address) or not address:
                continue

            # 1) Landlord property id, when the sheet provides one
            oid = row[oid_col] if oid_col is not None else None
            if oid is not None:
                if isinstance(oid, str):
                    oid = oid.strip()
                candidates = asset_list[asset_landlord_ids == oid]
                if candidates.shape[0] == 1:
                    lookup_i.append({"row_id": row["row_id"], id_col: candidates[id_col].values[0]})
                    continue

            # 2) Exact match on the full address, ignoring case, commas and repeated whitespace
            address_clean = " ".join(str(address).lower().replace(",", "").split())
            candidates = asset_list[asset_addresses == address_clean]
            if candidates.shape[0] == 1:
                lookup_i.append({"row_id": row["row_id"], id_col: candidates[id_col].values[0]})
                continue

            # 3) Postcode, then house number
            postcode = row[postcode_col]
            if isinstance(postcode, str):
                postcode = postcode.strip()
            candidates = asset_list[asset_postcodes == postcode].copy()
            if not candidates.empty:
                candidates["houseno"] = [
                    normalise_house_no(SearchEpc.get_house_number(str(address_1), str(asset_postcode)))
                    for address_1, asset_postcode in zip(
                        candidates[self.STANDARD_ADDRESS_1], candidates[self.STANDARD_POSTCODE]
                    )
                ]

                house_no_to_match = normalise_house_no(row[houseno_col])
                if house_no_to_match is None:
                    house_no_to_match = normalise_house_no(
                        SearchEpc.get_house_number(str(address), str(row[postcode_col]))
                    )

                if house_no_to_match is not None:
                    candidates = candidates[candidates["houseno"] == house_no_to_match]
                    if candidates.shape[0] == 1:
                        lookup_i.append({"row_id": row["row_id"], id_col: candidates[id_col].values[0]})
                        continue
                    if not candidates.empty:
                        # 4) Several properties share the house number - take the closest
                        # full address by fuzzy (Levenshtein-based) matching
                        best_match = process.extractOne(
                            address, candidates[self.STANDARD_FULL_ADDRESS].values
                        )[0]
                        candidates = candidates[candidates[self.STANDARD_FULL_ADDRESS] == best_match]
                        lookup_i.append({"row_id": row["row_id"], id_col: candidates[id_col].values[0]})
                        continue

            nomatch_i.append(row["row_id"])

        outcomes_no_match.append(outcomes[outcomes["row_id"].isin(nomatch_i)])
        lookup.append(pd.DataFrame(lookup_i))
        self.outcomes.append(outcomes)

    lookup = pd.concat(lookup)
    self.outcomes_no_match = pd.concat(outcomes_no_match)
    self.outcomes = pd.concat(self.outcomes)

    if lookup.empty:
        return

    # We will have duplicated domna property IDs, where a surveyor has been to a property multiple times
    # Where we have multiple rows, we want to make a call on what the action should be. For example,
    # there may be properties that have been visited multiple times where the outcome was "See notes" implying
    # that the surveyor had a detailed explanation as to why they couldn't gain access so if this has
    # happened multiple times, in this case we judge that the work may not be viable
    date_col = next(
        (
            candidate
            for candidate in ("Week Commencing", "Survey Date", "Date letters sent", "Date Letter sent")
            if candidate in self.outcomes.columns
        ),
        None,
    )
    if date_col is None:
        raise NotImplementedError("Invalid date in outcomes - implement me")

    # Decide on the concatenated outcomes, not just on the last file that was read
    notes_col = "Notes" if "Notes" in self.outcomes.columns else "Notes / Outcomes"

    lookup = lookup.merge(
        self.outcomes[["row_id", "Outcome", notes_col, date_col]], how="left", on="row_id"
    )

    visit_counts = (
        lookup.groupby(id_col)["row_id"]
        .count()
        .reset_index()
        .rename(columns={"row_id": "visit_count"})
        .sort_values("visit_count", ascending=False)
    )

    lookup["parsed_date"] = lookup[date_col].apply(extract_date)

    # Latest note per property: "surveyed" visits take priority, then the most recent date
    # (rows without a usable date sort last)
    ranked = lookup.assign(_surveyed=lookup["Outcome"].eq("surveyed")).sort_values(
        ["_surveyed", "parsed_date"], ascending=[False, False]
    )
    latest_note = ranked.drop_duplicates(subset=[id_col])[[id_col, notes_col]].reset_index(drop=True)

    # One row per property, one count column per outcome value
    pivot_df = lookup.groupby([id_col, "Outcome"]).size().unstack(fill_value=0).reset_index()
    pivot_df = pivot_df.merge(visit_counts, how="left", on=id_col)

    if pivot_df[id_col].duplicated().sum():
        raise ValueError("We have duplicated property IDs in the outcomes data")

    # We merge this data onto outcomes
    self.outcomes["matched_to_asset_list"] = self.outcomes["row_id"].isin(lookup["row_id"].values)
    self.outcomes = self.outcomes.merge(lookup[["row_id", id_col]], how="left", on="row_id")

    # We merge our pivoted outcomes onto the asset list
    self.standardised_asset_list = self.standardised_asset_list.merge(
        pivot_df, how="left", left_on=id_col, right_on=id_col
    )
    # Merge the latest note
    self.standardised_asset_list = self.standardised_asset_list.merge(
        latest_note.rename(columns={notes_col: "Latest Route March Note"}),
        how="left", left_on=id_col, right_on=id_col
    )

    if self.standardised_asset_list[id_col].duplicated().sum():
        raise ValueError("Duplicates appreared - something went wrong")

    self.outcomes = self.outcomes.sort_values(id_col, ascending=False)
|
||
|
||
def flag_survey_master(
|
||
self,
|
||
master_filepaths,
|
||
master_to_asset_list_filepath=None
|
||
):
|
||
# TODO: This probably needs further expansion
|
||
|
||
if not master_filepaths:
|
||
return
|
||
|
||
if master_to_asset_list_filepath is not None:
|
||
id_map = pd.read_csv(master_to_asset_list_filepath)
|
||
else:
|
||
id_map = pd.DataFrame()
|
||
|
||
logger.info("Getting masters and merging onto asset list")
|
||
master_surveyed = []
|
||
unmatched_submissions = []
|
||
for filepath in master_filepaths:
|
||
master_data = pd.read_csv(filepath)
|
||
# Strip columns
|
||
master_data.columns = [c.strip() for c in master_data.columns]
|
||
master_data.columns = [re.sub(r'\s+', ' ', c) for c in master_data.columns]
|
||
# Drop any unnamed columns
|
||
unnamed_columns = [c for c in master_data.columns if "Unnamed:" in c]
|
||
master_data = master_data.drop(columns=unnamed_columns)
|
||
|
||
if not id_map.empty:
|
||
master_data = master_data.merge(
|
||
id_map, how="left", on=['NO.', 'Street / Block Name', 'Post Code']
|
||
)
|
||
|
||
if "INSTALLED OR CANCELLED" in master_data.columns:
|
||
install_col = "INSTALLED OR CANCELLED"
|
||
elif "INSTALL / CANCELLATION DATE" in master_data.columns:
|
||
install_col = "INSTALL / CANCELLATION DATE"
|
||
elif 'INSTALL/ CANCELLATION DATE' in master_data.columns:
|
||
install_col = 'INSTALL/ CANCELLATION DATE'
|
||
else:
|
||
raise ValueError("No install or cancellation date")
|
||
|
||
submission_col = (
|
||
"SUBMISSION DATE" if "SUBMISSION DATE" in master_data.columns else "SUBMISSION DATE TO INSTALLERS"
|
||
)
|
||
|
||
# if "UPRN" in master_data.columns:
|
||
# # We just need to check if any were cancelled
|
||
# master_to_append = master_data[
|
||
# ["UPRN", install_col, submission_col]
|
||
# ].rename(
|
||
# columns={
|
||
# "UPRN": self.STANDARD_LANDLORD_PROPERTY_ID,
|
||
# install_col: "survey_status",
|
||
# submission_col: "submission_date"
|
||
# }
|
||
# )
|
||
# master_to_append["cancelled"] = master_to_append["survey_status"].str.lower().str.contains("cancel")
|
||
#
|
||
# master_surveyed.append(master_to_append)
|
||
# continue
|
||
|
||
master_data["row_id"] = master_data.index
|
||
|
||
self.standardised_asset_list["house_no"] = self.standardised_asset_list.apply(
|
||
lambda x: SearchEpc.get_house_number(
|
||
str(x[self.STANDARD_ADDRESS_1]), str(x[self.STANDARD_POSTCODE])
|
||
),
|
||
axis=1
|
||
)
|
||
|
||
scheme_col = (
|
||
"AFFORDABLE WARMTH OR EPC FOR HOUSING ASSOCIATION" if
|
||
"AFFORDABLE WARMTH OR EPC FOR HOUSING ASSOCIATION" in master_data.columns else "AFFORDABLE WARMTH"
|
||
)
|
||
postcode_col = "POSTCODE" if "POSTCODE" in master_data.columns else "Post Code"
|
||
house_no_col = 'NO.' if 'NO.' in master_data.columns else "NO"
|
||
property_type_col = (
|
||
"PROPERTY TYPE As per table emailed" if
|
||
"PROPERTY TYPE As per table emailed" in
|
||
master_data.columns else "PROPERTY TYPE As per table emailed"
|
||
)
|
||
measure_mix_col = "MEASURE COMBO"
|
||
|
||
# Otherwise, we need to match algorithmically
|
||
has_property_id = "UPRN" in master_data.columns
|
||
logger.info("Matching master data to asset list")
|
||
matched = []
|
||
unmatched = []
|
||
for _, row in tqdm(master_data.iterrows(), total=len(master_data)):
|
||
|
||
original_house_no = row[house_no_col]
|
||
original_street = row["Street / Block Name"]
|
||
original_postcode = row[postcode_col]
|
||
|
||
if pd.isnull(row[postcode_col]):
|
||
continue
|
||
|
||
# if has_property_id:
|
||
# submission_uprn = row["UPRN"]
|
||
#
|
||
# if not pd.isnull(submission_uprn):
|
||
# df = self.standardised_asset_list[
|
||
# self.standardised_asset_list[self.STANDARD_LANDLORD_PROPERTY_ID] == submission_uprn
|
||
# ]
|
||
|
||
postcode_no_space = row[postcode_col].strip().replace(" ", "").lower()
|
||
|
||
df = self.standardised_asset_list[
|
||
(
|
||
self.standardised_asset_list[self.STANDARD_POSTCODE]
|
||
.str.strip().str.lower().str.replace(" ", "") == postcode_no_space
|
||
)
|
||
]
|
||
|
||
house_no = row[house_no_col]
|
||
if isinstance(house_no, (float, int)):
|
||
house_no = str(int(house_no))
|
||
|
||
if house_no not in df["house_no"].values:
|
||
# Handle postcode errors
|
||
postal_region = row[postcode_col].split(" ")[0].lower()
|
||
df = self.standardised_asset_list[
|
||
(
|
||
self.standardised_asset_list[self.STANDARD_POSTCODE]
|
||
.str.strip().str.lower().str.startswith(postal_region)
|
||
)
|
||
]
|
||
|
||
if house_no not in df["house_no"].values:
|
||
unmatched.append(row["row_id"])
|
||
continue
|
||
df = df[df["house_no"] == house_no]
|
||
if df.shape[0] > 1:
|
||
df = df[
|
||
df[self.STANDARD_FULL_ADDRESS].str.lower().str.contains(row["Street / Block Name"].lower())
|
||
]
|
||
if df.shape[0] == 0:
|
||
unmatched.append(row["row_id"])
|
||
continue
|
||
matched.append(
|
||
{
|
||
"row_id": row["row_id"],
|
||
"original_house_no": original_house_no,
|
||
"original_street": original_street,
|
||
"original_postcode": original_postcode,
|
||
self.STANDARD_LANDLORD_PROPERTY_ID: df[self.STANDARD_LANDLORD_PROPERTY_ID].values[0],
|
||
}
|
||
)
|
||
|
||
if house_no in df["house_no"].values:
|
||
df = df[df["house_no"] == house_no]
|
||
if df.shape[0] != 1:
|
||
# Levenstein distance
|
||
|
||
if any(df[self.STANDARD_FULL_ADDRESS].str.contains(row["Street / Block Name"])):
|
||
df = df[
|
||
df[self.STANDARD_FULL_ADDRESS].str.contains(row["Street / Block Name"])
|
||
]
|
||
else:
|
||
# Levenstein distance
|
||
df = df[
|
||
df[self.STANDARD_FULL_ADDRESS].str.lower().apply(
|
||
lambda x: process.extractOne(
|
||
" ".join([row[house_no_col], row["Street / Block Name"], row["TOWN"]]).lower(),
|
||
x
|
||
)[1]
|
||
) > 90
|
||
]
|
||
|
||
if df.shape[0] == 0:
|
||
unmatched.append(row["row_id"])
|
||
continue
|
||
|
||
if any(df[self.STANDARD_FULL_ADDRESS].str.lower().str.contains(
|
||
" ".join([row[house_no_col], row["Street / Block Name"]]).lower()
|
||
)):
|
||
df = df[
|
||
df[self.STANDARD_FULL_ADDRESS].str.lower().str.contains(
|
||
" ".join([row[house_no_col], row["Street / Block Name"]]).lower()
|
||
)
|
||
]
|
||
|
||
if any(
|
||
df[self.STANDARD_PROPERTY_TYPE].str.contains(row[property_type_col].split(" ")[-1].lower())
|
||
):
|
||
# We ignore "block of flats" entries
|
||
df = df[
|
||
df[self.STANDARD_PROPERTY_TYPE].str.contains(
|
||
row[property_type_col].split(" ")[-1].lower()
|
||
) & (df[self.STANDARD_PROPERTY_TYPE] != "block of flats")
|
||
]
|
||
|
||
if df.shape[0] != 1:
|
||
# We have multiple matches
|
||
raise NotImplementedError("FIX ME")
|
||
matched.append(
|
||
{
|
||
"row_id": row["row_id"],
|
||
"original_house_no": original_house_no,
|
||
"original_street": original_street,
|
||
"original_postcode": original_postcode,
|
||
self.STANDARD_LANDLORD_PROPERTY_ID: df[self.STANDARD_LANDLORD_PROPERTY_ID].values[0],
|
||
}
|
||
)
|
||
|
||
self.standardised_asset_list = self.standardised_asset_list.drop(columns="house_no")
|
||
|
||
# We match the "UPRN" which is the landlords ID, onto the master sheet
|
||
matched = pd.DataFrame(matched)
|
||
master_to_append = master_data[[scheme_col, "row_id", install_col, submission_col, measure_mix_col]].merge(
|
||
matched, how="left", on="row_id"
|
||
).rename(
|
||
columns={
|
||
scheme_col: "funding_scheme",
|
||
measure_mix_col: "measure_mix",
|
||
install_col: "survey_status",
|
||
submission_col: "submission_date"
|
||
}
|
||
)
|
||
master_to_append["cancelled"] = master_to_append["survey_status"].str.lower().str.contains("cancel")
|
||
master_surveyed.append(master_to_append)
|
||
unmatched_df = master_data[
|
||
master_data["row_id"].isin(unmatched)
|
||
]
|
||
|
||
# The columns are massively different - we take just a few
|
||
unmatched_df = unmatched_df[
|
||
[
|
||
scheme_col, house_no_col, "Street / Block Name", postcode_col, install_col, submission_col
|
||
]
|
||
].rename(
|
||
columns={
|
||
scheme_col: "Funding Scheme",
|
||
house_no_col: "House Number",
|
||
postcode_col: "Postcode",
|
||
install_col: "survey_status",
|
||
submission_col: "submission_date"
|
||
}
|
||
)
|
||
|
||
unmatched_submissions.append(unmatched_df)
|
||
|
||
master_surveyed = pd.concat(master_surveyed)
|
||
master_surveyed = master_surveyed[~pd.isnull(master_surveyed[self.STANDARD_LANDLORD_PROPERTY_ID])]
|
||
master_surveyed = master_surveyed[
|
||
~master_surveyed[self.STANDARD_LANDLORD_PROPERTY_ID].isin(
|
||
["NOT ON ASSET LIST", "Missing From Asset List"]
|
||
)
|
||
]
|
||
|
||
master_surveyed[self.STANDARD_LANDLORD_PROPERTY_ID] = master_surveyed[
|
||
self.STANDARD_LANDLORD_PROPERTY_ID
|
||
].astype(str)
|
||
|
||
# We de-dupe crudely on landlord property id
|
||
self.master_surveyed = master_surveyed.drop_duplicates(subset=[self.STANDARD_LANDLORD_PROPERTY_ID])
|
||
|
||
self.standardised_asset_list = self.standardised_asset_list.merge(
|
||
self.master_surveyed, how="left", on=self.STANDARD_LANDLORD_PROPERTY_ID
|
||
)
|
||
|
||
# Finally, we keep a record of the unmatched
|
||
if unmatched_submissions:
|
||
self.unmatched_submissions = pd.concat(
|
||
unmatched_submissions
|
||
)
|