mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
4614 lines
189 KiB
Python
4614 lines
189 KiB
Python
import hashlib
|
||
import os
|
||
import re
|
||
import tiktoken
|
||
from pprint import pprint
|
||
from datetime import datetime
|
||
import asset_list.hubspot.config as hubspot_config
|
||
|
||
from openai import OpenAI
|
||
import numpy as np
|
||
import pandas as pd
|
||
from tqdm import tqdm
|
||
from thefuzz import process
|
||
from utils.logger import setup_logger
|
||
from backend.SearchEpc import SearchEpc
|
||
from BaseUtility import Definitions
|
||
import asset_list.mappings.property_type as property_type_mappings
|
||
import asset_list.mappings.walls as walls_mappings
|
||
import asset_list.mappings.heating_systems as heating_mappings
|
||
import asset_list.mappings.exising_pv as existing_pv_mappings
|
||
import asset_list.mappings.built_form as built_form_mappings
|
||
import asset_list.mappings.roof as roof_mappings
|
||
import asset_list.mappings.outcomes as outcomes_mappings
|
||
|
||
from recommendations.recommendation_utils import (
|
||
estimate_perimeter,
|
||
estimate_external_wall_area,
|
||
estimate_number_of_floors,
|
||
)
|
||
|
||
from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes
|
||
from etl.epc_clean.epc_attributes.WallAttributes import WallAttributes
|
||
|
||
from dotenv import load_dotenv
|
||
|
||
logger = setup_logger()
|
||
load_dotenv(dotenv_path="../backend/.env")
|
||
|
||
|
||
# OpenAI API key: read exclusively from the environment (populated by the .env file loaded
# above, or by the deployment). A literal key must never be committed as a fallback default.
# NOTE(review): the key that used to be hard-coded on this line has been exposed in version
# control and should be treated as compromised - revoke/rotate it.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
|
||
|
||
|
||
class DataRemapper:
    """
    Maps raw values onto a fixed set of standard values.

    Each value is resolved, in order, by: the predefined ``standard_map``, an exact match of
    the cleaned value against ``standard_values``, fuzzy matching, and finally one batched
    OpenAI request for everything that is still unmapped. Token usage and an estimated cost
    are accumulated on the instance.
    """

    # Optional markdown code fence wrapped around the model's reply
    _CODE_FENCE = re.compile(r"^```[a-zA-Z]*\s*|\s*```$")

    def __init__(self, standard_values, standard_map=None, max_tokens=1000):
        """
        Initialize the remapper with standard values and a predefined mapping.

        :param standard_values: Set of allowed standardized values.
        :param standard_map: Dictionary of common remappings {raw_value: standard_value}.
            ``None`` is treated as an empty mapping.
        :param max_tokens: Upper bound applied both to the prompt size and to the
            ``max_tokens`` argument of the OpenAI request.
        """
        # NOTE: the API key is deliberately never printed or logged here.
        self.standard_values = standard_values
        self.standard_map = standard_map if standard_map is not None else {}
        self.fuzzy_threshold = 90  # Adjust fuzzy matching sensitivity
        self.ai_model = "gpt-4-turbo"  # Use gpt-3.5-turbo for cheaper processing

        # Tokenizer for counting tokens
        self.tokenizer = tiktoken.encoding_for_model(self.ai_model)

        # Track token usage and remap dictionary
        self.total_tokens_used = 0
        self.total_cost = 0
        self.remap_dict = {}  # {original_value: standardized_value}
        self.max_tokens = max_tokens  # Limit for OpenAI API

        # Memoization for AI calls: {tuple(unmapped_values): {original_value: standardized_value}}
        self.ai_cache = {}
        # Capture the response for debugging
        self.ai_response = None

        # OpenAI pricing (as of Feb 2024)
        self.pricing = {
            "gpt-4-turbo": {"input": 0.01 / 1000, "output": 0.03 / 1000},
            "gpt-3.5-turbo": {"input": 0.0015 / 1000, "output": 0.002 / 1000},
        }

        self.openai_client = OpenAI(api_key=OPENAI_API_KEY)

    @staticmethod
    def clean_string(text):
        """
        Basic text cleaning: trim, lower-case, drop punctuation and collapse whitespace runs.

        :param text: value to clean
        :return: the cleaned string, or None when ``text`` is not a string
        """
        if not isinstance(text, str):
            return None
        text = text.strip().lower()
        text = re.sub(r"[^\w\s]", "", text)  # Remove punctuation
        # Collapse runs of whitespace into a single space
        text = re.sub(r"\s+", " ", text)
        return text

    def fuzzy_match(self, text):
        """
        Use fuzzy matching to find the closest standard value.

        :param text: cleaned value to look up
        :return: the best candidate when its score reaches ``fuzzy_threshold``, otherwise None
        """
        if not text:
            return None
        best = process.extractOne(text, self.standard_values)
        if best is None:
            # Nothing to compare against
            return None
        # Index rather than unpack: the result may carry extra items after (match, score)
        match, score = best[0], best[1]
        return match if score >= self.fuzzy_threshold else None

    def count_tokens(self, text):
        """Estimate the number of tokens in a given text (0 for empty input)."""
        return len(self.tokenizer.encode(text)) if text else 0

    @staticmethod
    def _parse_ai_mapping(output_text):
        """
        Parse the model's reply into a dictionary without executing it.

        JSON is tried first; a Python-literal reply (e.g. single-quoted keys) is accepted via
        ``ast.literal_eval``, which only evaluates literals and so - unlike ``eval`` - cannot
        run arbitrary expressions contained in the reply.

        :param output_text: raw text returned by the model
        :return: the parsed dict, or None when the reply is not a dictionary
        """
        import ast
        import json

        if not isinstance(output_text, str):
            return None
        payload = DataRemapper._CODE_FENCE.sub("", output_text.strip()).strip()
        try:
            parsed = json.loads(payload)
        except ValueError:
            try:
                parsed = ast.literal_eval(payload)
            except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
                return None
        return parsed if isinstance(parsed, dict) else None

    def ai_standardize(self, unmapped_values):
        """
        Call OpenAI API **once** for all unmapped values to minimize cost, with memoization.

        :param unmapped_values: values that could not be mapped by the cheaper strategies
        :return: dictionary {original_value: standardized_value}; every value maps to
            "unknown" when the reply cannot be parsed
        :raises ValueError: when the prompt exceeds ``max_tokens``
        """
        if not unmapped_values:
            return {}

        # Sorted so the same batch always hits the same cache entry; key=str keeps the sort
        # well-defined when the batch mixes types
        unmapped_tuple = tuple(sorted(unmapped_values, key=str))
        if unmapped_tuple in self.ai_cache:
            return self.ai_cache[unmapped_tuple]  # Return memoized result

        prompt = f"""
        You are an expert in data classification. Standardize each of these values into one of the categories:
        {list(self.standard_values)}.

        Return only a JSON dictionary where:
        - The keys are the original values.
        - The values are the standardized ones.

        Strictly return JSON **without markdown formatting** or extra text.

        Example Output:
        {{
            "BLKHOUS": "block house",
            "BEDSIT": "bedsit"
        }}

        Values to standardize:
        {unmapped_values}
        """

        # Count input tokens
        input_tokens = self.count_tokens(prompt)
        if input_tokens > self.max_tokens:
            raise ValueError("Input tokens exceed the maximum limit.")

        logger.info("Calling OpenAI API for standardization...")
        response = self.openai_client.chat.completions.create(
            model=self.ai_model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=self.max_tokens,
            temperature=0.1,
        )

        # The message content can be None, so guard before stripping
        output_text = (response.choices[0].message.content or "").strip()
        output_tokens = self.count_tokens(output_text)  # Count output tokens

        # Track total token usage
        self.total_tokens_used += input_tokens + output_tokens

        # Estimate cost
        input_cost = input_tokens * self.pricing[self.ai_model]["input"]
        output_cost = output_tokens * self.pricing[self.ai_model]["output"]
        self.total_cost += input_cost + output_cost

        mapping = self._parse_ai_mapping(output_text)
        if mapping is None:
            logger.warning("Could not parse the AI response; marking values as unknown")
            mapping = {val: "unknown" for val in unmapped_values}  # Fallback

        # Memoize the AI response
        self.ai_cache[unmapped_tuple] = mapping
        logger.debug("AI Response: %s", mapping)
        # We store the raw AI response for debugging
        self.ai_response = output_text

        return mapping

    def standardize_list(self, values_to_remap):
        """
        Standardizes a list of values and returns a dictionary {original_value: standardized_value}.

        Results are accumulated in ``self.remap_dict`` across calls, and that same dictionary
        is returned.

        :param values_to_remap: List of raw values to standardize.
        :return: Dictionary {original_value: standardized_value}.
        """
        standard_map = self.standard_map if self.standard_map is not None else {}
        unmapped_values = []

        for value in set(values_to_remap):  # Process only unique values
            if pd.isna(value):  # Handle NaN values
                self.remap_dict[value] = "unknown"
                continue

            cleaned_value = self.clean_string(value)

            # Rule-Based Check (Predefined Mapping): cleaned form first, then the raw value
            if cleaned_value in standard_map:
                self.remap_dict[value] = standard_map[cleaned_value]
                continue
            if value in standard_map:
                self.remap_dict[value] = standard_map[value]
                continue
            # Only strings have a lower-cased form to try
            if isinstance(value, str) and value.lower() in standard_map:
                self.remap_dict[value] = standard_map[value.lower()]
                continue

            # Exact Match in Standard Values
            if cleaned_value in self.standard_values:
                self.remap_dict[value] = cleaned_value
                continue

            # Fuzzy Matching
            fuzzy_match = self.fuzzy_match(cleaned_value)
            if fuzzy_match:
                self.remap_dict[value] = fuzzy_match
                continue

            # Capture anything that wasn't mapped
            unmapped_values.append(value)

        # AI Model - remap anything unmapped (batch request)
        ai_mapping = self.ai_standardize(unmapped_values)
        self.remap_dict.update(ai_mapping)

        return self.remap_dict

    def report_usage(self):
        """Prints a summary of token usage and cost."""
        print(f"\n🔹 Total Tokens Used: {self.total_tokens_used}")
        print(f"💰 Estimated Cost: ${self.total_cost:.4f}")
|
||
|
||
|
||
class AssetList:
|
||
"""
|
||
This class is used to standardise asset lists so that we can process the core information in a consistent manner.
|
||
"""
|
||
|
||
EPC_API_DATA_NAMES = {
|
||
"uprn": "epc_os_uprn",
|
||
"address1": "epc_address1",
|
||
"address": "epc_address",
|
||
"postcode": "epc_postcode",
|
||
"inspection-date": "epc_inspection_date",
|
||
"current-energy-efficiency": "epc_sap_score_on_register",
|
||
"current-energy-rating": "epc_rating_on_register",
|
||
"property-type": "epc_property_type",
|
||
"built-form": "epc_archetype",
|
||
"total-floor-area": "epc_total_floor_area",
|
||
"construction-age-band": "epc_age_band",
|
||
"floor-height": "epc_floor_height",
|
||
"number-habitable-rooms": "epc_number_habitable_rooms",
|
||
"walls-description": "epc_wall_construction",
|
||
"roof-description": "epc_roof_construction",
|
||
"floor-description": "epc_floor_construction",
|
||
"mainheat-description": "epc_heating_type",
|
||
"mainheatcont-description": "epc_heating_controls",
|
||
"secondheat-description": "epc_secondary_heating",
|
||
"transaction-type": "epc_reason",
|
||
"energy-consumption-current": "epc_heat_demand",
|
||
"photo-supply": "epc_photo_supply",
|
||
"estimated": "estimated",
|
||
}
|
||
FIND_EPC_DATA_NAMES = {
|
||
"heating_text": "epc_estiamted_heating_kwh",
|
||
"hot_water_text": "epc_estimated_hotwater_kwh",
|
||
"Assessor’s name": "epc_assessor_name",
|
||
"Assessor's Telephone": "epc_assessor_telephone",
|
||
"Assessor's Email": "epc_assessor_email",
|
||
"Accreditation scheme": "epc_assessor_accreditation",
|
||
"Assessor’s ID": "epc_assessor_id",
|
||
"Solar photovoltaics": "epc_solar_pv",
|
||
}
|
||
|
||
DATETIME_REMAP = {
|
||
"Pre 1900": datetime(year=1899, month=12, day=31),
|
||
}
|
||
|
||
# These are the accepted methods we have for cleaning the address1 column
|
||
ADDRESS_1_CLEANING_METHODS = [
|
||
"first_two_words", # This method will split on the fist two words, where the separator is a space
|
||
"first_word", # This method will split on the first word, where the separator is a space
|
||
"house_number_extraction", # This method will use the NLP model in SearchEPC to extract the housenumber
|
||
# "address1_extraction" # This method will use the NLP model to extract address1
|
||
]
|
||
|
||
# Standard column Names
|
||
STANDARD_ADDRESS_1 = "domna_address_1"
|
||
STANDARD_POSTCODE = "domna_postcode"
|
||
STANDARD_FULL_ADDRESS = "domna_full_address"
|
||
STANDARD_YEAR_BUILT = "landlord_year_built"
|
||
STANDARD_UPRN = "ordnance_survey_uprn"
|
||
STANDARD_LANDLORD_PROPERTY_ID = "landlord_property_id"
|
||
STANDARD_PROPERTY_TYPE = "landlord_property_type"
|
||
STANDARD_BUILT_FORM = "landlord_built_form"
|
||
STANDARD_WALL_CONSTRUCTION = "landlord_wall_construction"
|
||
STANDARD_ROOF_CONSTRUCTION = "landlord_roof_construction"
|
||
STANDARD_HEATING_SYSTEM = "landlord_heating_system"
|
||
STANDARD_EXISTING_PV = "landlord_existing_pv"
|
||
STANDARD_SAP = "landlord_sap_rating"
|
||
STANDARD_BLOCK_REFERENCE = "landlord_block_reference"
|
||
|
||
DOMNA_PROPERTY_ID = "domna_property_id"
|
||
|
||
# Regular expression for identifying if the address might point to multiple units
|
||
MULTI_UNIT_REGEX = re.compile(r"\b([A-Za-z0-9]+)-([A-Za-z0-9]+)\b")
|
||
|
||
# List of columns relating to the non-intrusive data
|
||
NON_INTRUSIVES_COLNAMES = [
|
||
"Archetype",
|
||
"Construction",
|
||
"Insulated",
|
||
"Material",
|
||
"CIGA Check Required",
|
||
"PV, ACCESS ISSUE, SEE NOTES",
|
||
"OFF GAS - ROOF ORIENTATION",
|
||
"Any further surveyor notes",
|
||
"Surveyors Name",
|
||
]
|
||
|
||
NON_INTRUSIVES_NEW_FORMAT_COLNAMES = [
|
||
"Has the property been re-walled?",
|
||
"Is the property tile hung?",
|
||
"Does the property have a render?",
|
||
"Does the property have cladding?",
|
||
"Gable Wall Obstructions",
|
||
"Does the property have foliage that needs removal?",
|
||
"Potential unsafe environment",
|
||
"Date of Inspection",
|
||
"Borescoped?",
|
||
]
|
||
|
||
# Another version of non-intrusives:
|
||
NON_INTRUSIVES_NEW_FORMAT_COLNAMES_V2 = [
|
||
"Archetype",
|
||
"Archetype 2",
|
||
"Construction",
|
||
"Insulated",
|
||
"Material",
|
||
"Borescoped?",
|
||
"CIGA Check Required",
|
||
"ROOF ORIENTATION",
|
||
"TILE HUNG",
|
||
"RENDERED",
|
||
"CLADDING",
|
||
"ACCESS ISSUES",
|
||
"FURTHER SURVEYOR NOTES",
|
||
"DATE",
|
||
"NAME OF SURVEYOR",
|
||
]
|
||
|
||
# Solar non-intrusive fields
|
||
NON_INTRUSIVES_SOLAR_COLNAMES = [
|
||
"PV, ACCESS ISSUE, SEE NOTES",
|
||
"ROOF ORIENTATION",
|
||
"AREA (m²) OF ROOF WHERE PV WILL BE SITUATED ",
|
||
"SHADING",
|
||
"Roof Tiles - CONCRETE/SLATE/ROSEMARY",
|
||
"NO. OF PANELS (Typical size of 420W panel is 1mx1.7m and need 30cm all the way around panels)",
|
||
"SCAFFOLD REQUIRED? IF YES, ARE THERE ANY SURROUNDING ACCESS ISSUES - PLEASE DESCRIBE",
|
||
"IF PANELS ARE GOING ON REAR PLEASE CHECK FOR SPACE FOR SCAFFOLDING - DESCRIBE ANY ISSUES BELOW",
|
||
"DATE",
|
||
"NAME OF SURVEYOR",
|
||
]
|
||
|
||
NON_INTRUSIVES_ELIGIBILITY_COLUMN = "Eligibility (Red/Yellow/Green)"
|
||
|
||
OLD_FORMAT_NON_INTRUSIVE_COLNAMES = ["WFT Findings", "ECO Eligibility"]
|
||
|
||
# This SAP threshold is a key search criteria for properties that may be eligible for extraction
|
||
FILLED_CAVITY_SAP_THRESHOLD = 75
|
||
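# SAP threshold used when searching for empty-cavity properties (the original comment was truncated - confirm the intended wording)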
|
||
EMPTY_CAVITY_SAP_THRESHOLD = 75
|
||
# Any EPC deemed to have been conducted prior to this year is deemed to be unreliable
|
||
EPC_YEAR_THRESHOLD = pd.Timestamp.now().year - 5
|
||
|
||
# Properties before this year are more likely to have lower EPC ratings and more likely to qualify
|
||
EMPTY_CAVITY_YEAR_THRESHOLD = 2002
|
||
|
||
# Attributes - these are columns that we produce, calculated based on other pieces of data
|
||
ATTRIBUTE_HAS_SOLAR = "attribute_has_solar"
|
||
ATTRIBUTE_NUMBER_OF_FLOORS = "attribute_est_number_floors"
|
||
ATTRIBUTE_ESTIMATED_PERIMETER = "attribute_est_perimter"
|
||
ATTRIBUTE_HEAT_LOSS_AREA = "attribute_heat_loss_area"
|
||
ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS = "attribute_epc_roof_insulation_thickness"
|
||
ATTRIBUTE_SAP_THRESHOLD_AND_BELOW = (
|
||
f"sap_rating_{FILLED_CAVITY_SAP_THRESHOLD}_and_below"
|
||
)
|
||
ATTRIBUTE_EPC_PRE_YEAR_THRESHOLD = f"epc_is_pre_{EPC_YEAR_THRESHOLD}"
|
||
|
||
# These are the descriptions that we look for in the EPC data that are indicative of no insulation
|
||
EPC_NO_WALL_INSULATION_DESCRIPTIONS = [
|
||
"cavity wall, as built, no insulation (assumed)",
|
||
"cavity wall, as built, partial insulation (assumed)",
|
||
"cavity wall, as built, partial insulation",
|
||
"cavity wall, as built, no insulation",
|
||
]
|
||
|
||
# List of strings that we look for in the EPC data, where substrings indicate that the wall is insulated
|
||
EPC_INSULATED_WALLS_SUBSTRINGS = [
|
||
", insulated",
|
||
"with external insulation",
|
||
"with internal insulation",
|
||
"filled cavity",
|
||
]
|
||
|
||
# List of strings that we look for in the EPC data, where substrings indicate that the roof is insulated
|
||
EPC_INSULATED_ROOF_SUBSTRINGS = [
|
||
"(another dwelling above)",
|
||
", insulated",
|
||
", insulated (assumed) ",
|
||
", ceiling insulated",
|
||
]
|
||
|
||
# List of strings we look for in the EPC data, where substrings indicate that the cavity is empty
|
||
UNINSULATED_CAVITY_SUBSTRINGS = [
|
||
"cavity wall, as built, no insulation (assumed)",
|
||
"cavity wall, as built, no insulation",
|
||
"cavity wall, as built, partial insulation (assumed)",
|
||
"cavity wall, as built, partial insulation",
|
||
]
|
||
|
||
# Work type prefixes:
|
||
# Empties
|
||
EMPTY_CAVITY_NON_INTRUSIVE = "Non-Intrusive Data Shows Empty Cavity"
|
||
EMPTY_CAVITY_NON_INTRUSIVE_YEAR = (
|
||
"Non-Intrusive Data Shows Empty Cavity, built after 2002"
|
||
)
|
||
EPC_EMPTY_INSPECTIONS_RETRO_DRILLED = (
|
||
"EPC Shows Empty Cavity, inspections show retro drilled"
|
||
)
|
||
EPC_EMPTY_INSPECTIONS_FILLED = (
|
||
"EPC Shows Empty Cavity, inspections show filled or other"
|
||
)
|
||
EPC_EMPTY_INSPECTIONS_FILLED_AT_BUILD = (
|
||
"EPC Shows Empty Cavity, inspections show filled at build"
|
||
)
|
||
EPC_EMPTY_INSPECTIONS_NON_CAVITY = (
|
||
"EPC Shows Empty Cavity, inspections show non-cavity build"
|
||
)
|
||
EPC_EMPTY = "EPC Shows Empty Cavity"
|
||
LANDLORD_EMPTY_INSPECTIONS_OTHER = (
|
||
"Landlord Data Shows Empty Cavity, EPC & Inspections Shows Filled or "
|
||
"Non-cavity"
|
||
)
|
||
# Extraction
|
||
EXTRACTION_NON_INTRUSIVE = "Non-Intrusive Data Shows Cavity Extraction"
|
||
|
||
# Solar
|
||
SOLAR_ELIGIBLE = "Solar Eligible"
|
||
SOLAR_ELIGIBLE_SOLID_WALL_UNINSULATED = (
|
||
"Solar Eligible, Solid Wall Uninsulated, EPC E or Below"
|
||
)
|
||
SOLAR_ELIGIBLE_NEEDS_HEATING_UPGRADE = "Solar Eligible, Needs Heating Upgrade"
|
||
|
||
CRM_HISTORICAL_CAVITY_PRODUCT = {
|
||
"id": 156989182176,
|
||
"unit_price": 0,
|
||
"name": "Historical ECO Cavity",
|
||
}
|
||
|
||
CRM_PRODUCTS = {
|
||
"Empty Cavity - ECO4": {
|
||
"id": 82733738177,
|
||
"unit_price": 1000,
|
||
"name": "Empty Cavity - ECO4",
|
||
},
|
||
"Extract & Fill - ECO4": {
|
||
"id": 100307905778,
|
||
"unit_price": 500,
|
||
"name": "Extract & Fill - ECO4",
|
||
},
|
||
"Solar PV - ECO4": {
|
||
"id": 82623589564,
|
||
"unit_price": 1608,
|
||
"name": "Solar PV - ECO4",
|
||
},
|
||
"Solar PV + HHRSH - ECO4": {
|
||
"id": 155529972924,
|
||
"unit_price": 1608,
|
||
"name": "Solar PV + HHRSH - ECO4",
|
||
},
|
||
"Solar PV + Heating Upgrade - ECO4": {
|
||
"id": 109265426665,
|
||
"unit_price": 1608,
|
||
"name": "Solar PV + Heating Upgrade - ECO4",
|
||
},
|
||
"Historical ECO Cavity": CRM_HISTORICAL_CAVITY_PRODUCT,
|
||
}
|
||
|
||
def __init__(
|
||
self,
|
||
local_filepath,
|
||
sheet_name,
|
||
address1_colname,
|
||
postcode_colname,
|
||
full_address_colname,
|
||
landlord_property_id=None,
|
||
full_address_cols_to_concat=None,
|
||
missing_postcodes_method=None,
|
||
address1_extraction_method=None,
|
||
landlord_year_built=None,
|
||
landlord_uprn=None,
|
||
landlord_property_type=None,
|
||
landlord_built_form=None,
|
||
landlord_wall_construction=None,
|
||
landlord_roof_construction=None,
|
||
landlord_heating_system=None,
|
||
landlord_existing_pv=None,
|
||
landlord_sap=None,
|
||
landlord_block_reference=None,
|
||
phase=False,
|
||
header=0,
|
||
):
|
||
self.local_filepath = local_filepath
|
||
self.sheet_name = sheet_name
|
||
# Read in the data
|
||
if local_filepath.endswith(".xlsx"):
|
||
self.raw_asset_list = pd.read_excel(
|
||
local_filepath, header=header, sheet_name=sheet_name
|
||
)
|
||
else:
|
||
self.raw_asset_list = pd.read_csv(local_filepath)
|
||
self.standardised_asset_list = self.raw_asset_list.copy()
|
||
# Will be used to store aggregated figures against the various work types
|
||
self.work_type_figures = {}
|
||
self.block_analysis_df = None
|
||
self.duplicated_addresses = None
|
||
self.contact_details = None
|
||
self.contact_detail_fields = None
|
||
self.outcomes = None
|
||
self.outcomes_no_match = pd.DataFrame()
|
||
self.outcomes_for_output = pd.DataFrame()
|
||
self.master_surveyed = None
|
||
self.unmatched_submissions = pd.DataFrame()
|
||
self.ecosurv = None
|
||
self.ecosurv_no_match = pd.DataFrame()
|
||
self.geographical_areas = pd.DataFrame()
|
||
|
||
# When this is True, we intend to break the programme into multiple phases. We may need to review
|
||
# how this is structured in the future, as depending on how we get future data, we may need to
|
||
# remove some existing phases from the reporting, or specifically highlight the phase (1 to n-1)
|
||
# properties, assuming the current phase is n.
|
||
self.phase = phase
|
||
|
||
# We detect the presence of the non-intrusive columns
|
||
self.non_intrusives_present = (
|
||
"CIGA Check Required" in self.raw_asset_list.columns
|
||
)
|
||
# We detect if we have the old format of non-intruvies
|
||
self.old_format_non_intrusives_present = (
|
||
"WFT Findings" in self.raw_asset_list.columns
|
||
)
|
||
if self.old_format_non_intrusives_present:
|
||
self.non_intrusives_present = False
|
||
|
||
self.non_intrusives_eligibility = (
|
||
"Eligibility (Red/Yellow/Green)" in self.raw_asset_list.columns
|
||
)
|
||
|
||
self.new_format_non_insturives_present = (
|
||
"Has the property been re-walled?" in self.raw_asset_list.columns
|
||
)
|
||
|
||
self.new_format_non_insturives_present_v2 = (
|
||
"TILE HUNG" in self.raw_asset_list.columns
|
||
)
|
||
|
||
self.solar_non_intrusives_present = (
|
||
"AREA (m²) OF ROOF WHERE PV WILL BE SITUATED" in self.raw_asset_list.columns
|
||
)
|
||
|
||
# Names of columns
|
||
self.landlord_property_id = landlord_property_id
|
||
self.address1_colname = address1_colname
|
||
self.postcode_colname = postcode_colname
|
||
self.full_address_colname = full_address_colname
|
||
self.landlord_year_built = landlord_year_built
|
||
self.landlord_uprn = landlord_uprn
|
||
self.landlord_property_type = landlord_property_type
|
||
self.landlord_built_form = landlord_built_form
|
||
self.landlord_wall_construction = landlord_wall_construction
|
||
self.landlord_roof_construction = landlord_roof_construction
|
||
self.landlord_heating_system = landlord_heating_system
|
||
self.landlord_existing_pv = landlord_existing_pv
|
||
self.landlord_sap = landlord_sap
|
||
self.landlord_block_reference = landlord_block_reference
|
||
|
||
# parameters for cleaning
|
||
self.full_address_cols_to_concat = full_address_cols_to_concat
|
||
self.missing_postcodes_method = missing_postcodes_method
|
||
self.address1_extraction_method = address1_extraction_method
|
||
|
||
self.debug_information = {
|
||
"property_type": None,
|
||
"wall_construction": None,
|
||
"heating_system": None,
|
||
"existing_pv": None,
|
||
}
|
||
|
||
self.variable_mappings = {}
|
||
self.hubspot_data = None
|
||
|
||
self.rename_map = {}
|
||
self.keep_variables = []
|
||
|
||
# Finally, we handle the case where the landlord's property ID is actually the OS UPRN
|
||
if (self.landlord_uprn == self.landlord_property_id) and (
|
||
self.landlord_property_id is not None
|
||
):
|
||
self.standardised_asset_list[self.STANDARD_UPRN] = (
|
||
self.standardised_asset_list[self.landlord_uprn].copy()
|
||
)
|
||
# Update the reference to landlord UPRn
|
||
self.landlord_uprn = self.STANDARD_UPRN
|
||
|
||
# Handle the case when full address and address 1 are the same
|
||
if self.full_address_colname == self.address1_colname:
|
||
self.full_address_colname = self.STANDARD_FULL_ADDRESS
|
||
self.standardised_asset_list[self.full_address_colname] = (
|
||
self.standardised_asset_list[self.address1_colname].copy()
|
||
)
|
||
|
||
# Handle the case where the property type column and built form are missing
|
||
if self.landlord_property_type is None and self.landlord_built_form is None:
|
||
if "Archetype" in self.raw_asset_list.columns:
|
||
# We use the non-intrusives as our property type and built form
|
||
self.landlord_property_type = self.STANDARD_PROPERTY_TYPE
|
||
self.landlord_built_form = self.STANDARD_BUILT_FORM
|
||
self.standardised_asset_list[self.landlord_property_type] = (
|
||
self.standardised_asset_list["Archetype"].copy()
|
||
)
|
||
self.standardised_asset_list[self.landlord_built_form] = (
|
||
self.standardised_asset_list["Archetype"].copy()
|
||
)
|
||
else:
|
||
# We use the EPC data as our property type and built form
|
||
self.landlord_property_type = self.STANDARD_PROPERTY_TYPE
|
||
self.landlord_built_form = self.STANDARD_BUILT_FORM
|
||
self.standardised_asset_list[self.landlord_property_type] = None
|
||
self.standardised_asset_list[self.landlord_built_form] = None
|
||
|
||
# Handle the case where the property type column is the same as the built type
|
||
if self.landlord_property_type == self.landlord_built_form:
|
||
self.landlord_built_form = self.STANDARD_BUILT_FORM
|
||
self.standardised_asset_list[self.landlord_built_form] = (
|
||
self.standardised_asset_list[self.landlord_property_type].copy()
|
||
)
|
||
|
||
# If landlord built form is None (which it often is) we use the built for from inspections
|
||
if (self.landlord_built_form is None) and self.non_intrusives_present:
|
||
self.landlord_built_form = self.STANDARD_BUILT_FORM
|
||
self.standardised_asset_list[self.landlord_built_form] = (
|
||
self.standardised_asset_list["Archetype"].copy()
|
||
)
|
||
|
||
self.prefixes_to_products = {
|
||
# Empty
|
||
self.EMPTY_CAVITY_NON_INTRUSIVE: self.CRM_PRODUCTS["Empty Cavity - ECO4"],
|
||
self.EPC_EMPTY_INSPECTIONS_RETRO_DRILLED: self.CRM_PRODUCTS[
|
||
"Empty Cavity - ECO4"
|
||
],
|
||
self.EPC_EMPTY_INSPECTIONS_FILLED: self.CRM_PRODUCTS["Empty Cavity - ECO4"],
|
||
self.EPC_EMPTY_INSPECTIONS_FILLED_AT_BUILD: self.CRM_PRODUCTS[
|
||
"Empty Cavity - ECO4"
|
||
],
|
||
self.EPC_EMPTY_INSPECTIONS_NON_CAVITY: self.CRM_PRODUCTS[
|
||
"Empty Cavity - ECO4"
|
||
],
|
||
self.EPC_EMPTY: self.CRM_PRODUCTS["Empty Cavity - ECO4"],
|
||
self.LANDLORD_EMPTY_INSPECTIONS_OTHER: self.CRM_PRODUCTS[
|
||
"Empty Cavity - ECO4"
|
||
],
|
||
# Extraction
|
||
self.EXTRACTION_NON_INTRUSIVE: self.CRM_PRODUCTS["Extract & Fill - ECO4"],
|
||
# Solar
|
||
self.SOLAR_ELIGIBLE: self.CRM_PRODUCTS["Solar PV - ECO4"],
|
||
self.SOLAR_ELIGIBLE_SOLID_WALL_UNINSULATED: self.CRM_PRODUCTS[
|
||
"Solar PV - ECO4"
|
||
],
|
||
self.SOLAR_ELIGIBLE_NEEDS_HEATING_UPGRADE: self.CRM_PRODUCTS[
|
||
"Solar PV + Heating Upgrade - ECO4"
|
||
],
|
||
}
|
||
|
||
def _extract_address1(
    self, asset_list, full_address_col, postcode_col, method="first_two_words"
):
    """
    Derive the address 1 column (``self.address1_colname``) from the full address.

    :param asset_list: dataframe to add the column to (modified in place and returned)
    :param full_address_col: name of the column holding the full address
    :param postcode_col: name of the column holding the postcode
    :param method: one of ``ADDRESS_1_CLEANING_METHODS``
    :return: the asset list with the address 1 column populated
    :raises ValueError: when ``method`` is not a recognised extraction method
    """
    if method not in self.ADDRESS_1_CLEANING_METHODS:
        raise ValueError(f"Method {method} for producing address1 not recognized")

    if method == "first_two_words":
        asset_list[self.address1_colname] = (
            asset_list[full_address_col].str.split(" ").str[:2].str.join(" ")
        )
    elif method == "first_word":
        asset_list[self.address1_colname] = (
            asset_list[full_address_col].str.split(" ").str[0]
        )
    elif method == "house_number_extraction":
        # One pass only: the previous second loop repeated every extraction call and
        # threw the results away.
        # NOTE(review): assumes SearchEpc.get_house_number has no side effects - confirm
        asset_list[self.address1_colname] = asset_list.apply(
            lambda row: SearchEpc.get_house_number(
                address=row[full_address_col], postcode=row[postcode_col]
            ),
            axis=1,
        )
    else:
        # Listed as an accepted method but without an implementation here
        raise ValueError(f"Method {method} not recognized")

    return asset_list
|
||
|
||
@staticmethod
def _address1_extraction(x):
    """
    Unimplemented placeholder: accepts a value and returns None.

    NOTE(review): presumably the hook for the commented-out "address1_extraction" entry in
    ADDRESS_1_CLEANING_METHODS - confirm before relying on it.
    """
    return None
|
||
|
||
def create_property_id(self):
    """
    Populate the domna property ID column of the standardised asset list.

    The full address and postcode are concatenated, trimmed, stripped of punctuation and
    spaces and lower-cased; the ID is that normalised text followed by "-" and the first
    12 hex characters of its SHA-256 digest.
    :return: None - the column is written to ``self.standardised_asset_list``
    """

    def _slug_with_digest(text):
        """Return ``<slug>-<digest>`` for one normalised address string."""
        digest = hashlib.sha256(text.encode()).hexdigest()
        # Same clean-up as the original helper: drop special characters, underscores for spaces
        slug = re.sub(r"[^\w\s-]", "", text)
        slug = slug.replace(" ", "_").lower()
        return f"{slug}-{digest[:12]}"

    frame = self.standardised_asset_list
    combined = frame[self.full_address_colname] + frame[self.postcode_colname]

    # Normalise step by step: trim, remove punctuation, remove spaces, lower-case
    normalised = combined.str.strip()
    normalised = normalised.str.replace(r"[^\w\s]", "", regex=True)
    normalised = normalised.str.replace(" ", "")
    normalised = normalised.str.lower()

    self.standardised_asset_list[self.DOMNA_PROPERTY_ID] = normalised.apply(
        _slug_with_digest
    )
|
||
|
||
@staticmethod
def _strip_postcode_from_full_address(full_address, postcode):
    """
    Return the full address with every occurrence of the postcode removed.

    :param full_address: address text that may contain the postcode
    :param postcode: postcode text to remove (exact, case-sensitive match)
    :return: the remaining address without surrounding commas or whitespace
    """
    remainder = full_address.replace(postcode, "")
    # Trailing commas/spaces first, then any commas left at either end, then whitespace
    remainder = remainder.rstrip(", ")
    remainder = remainder.strip(",")
    return remainder.strip()
|
||
|
||
@classmethod
def _identify_multi_address(cls, address):
    """
    Flag addresses whose first comma-separated section looks like a range of units.

    :param address: full address text
    :return: True when the section before the first comma matches ``MULTI_UNIT_REGEX``
        (a form such as "1-3" or "A-C"); False otherwise, including for addresses without
        a comma and for non-string values
    """
    # Always answer with a bool: the no-comma case used to fall through and return None
    if not isinstance(address, str) or "," not in address:
        return False

    address1_section = address.split(",")[0]
    # We look for a string in the form (x-y)
    return bool(cls.MULTI_UNIT_REGEX.search(address1_section))
|
||
|
||
@staticmethod
def _convert_uprn(x):
    """
    Used to convert UPRNs to integer strings

    Numbers are truncated to an integer; digit strings (optionally padded with whitespace
    or carrying a ".0"-style suffix) are normalised to the same form. Anything else is
    returned unchanged.
    :param x: uprn to convert
    :return: converted uprn
    """
    import numbers

    if pd.isnull(x):
        return x

    # Explicit type check instead of np.isreal, whose answer for strings is not reliable
    if isinstance(x, numbers.Real):
        try:
            return str(int(x))
        except (OverflowError, ValueError):
            # Infinite values have no integer form - hand them back untouched
            return x

    text = str(x).strip()
    # isdecimal (not isdigit) so that only characters int() accepts get through
    if text.isdecimal():
        return str(int(text))

    # Float-formatted integers such as "100012345678.0"
    whole, dot, fraction = text.partition(".")
    if dot and whole.isdecimal() and fraction and set(fraction) == {"0"}:
        return str(int(whole))

    return x
|
||
|
||
@staticmethod
def _clean_postcode(postcode):
    """
    Normalise the spacing of a postcode.

    Surrounding whitespace is removed and internal whitespace runs are collapsed to one
    space. When there is no space at all, one is inserted before the final three characters.
    :param postcode: postcode to clean; non-string values are returned unchanged
    :return: the cleaned postcode
    """
    if not isinstance(postcode, str):
        return postcode

    # Remove double (and leading/trailing) spaces
    postcode = " ".join(postcode.split())

    # Restructure it - only when there is something to put in front of the last three characters
    if " " not in postcode and len(postcode) > 3:
        return f"{postcode[:-3]} {postcode[-3:]}"

    return postcode
|
||
|
||
def init_standardise(self):
    """
    Prepare the asset list for standardisation and build the value mappings.

    The method cleans the postcode/address columns, derives address 1 or the
    full address when the landlord did not supply them, creates the domna
    property id, tidies UPRN, wall and year built values, and assembles
    ``self.keep_variables`` and ``self.rename_map``. It then builds a
    remapping dictionary for each categorical variable, stores these in
    ``self.variable_mappings`` and prints them so they can be reviewed. The
    mappings themselves are not applied to the data by this method.

    :return: None - all results are stored on the instance
    :raises ValueError: if address 1 or the full address is missing and no
        way of deriving it has been configured
    :raises NotImplementedError: if a year built value is in an unhandled
        format
    """

    # Remove rows without a postcode
    if self.postcode_colname is not None:
        self.standardised_asset_list = self.standardised_asset_list.dropna(
            subset=[self.postcode_colname]
        )
        # We also clean the postcode column: where there is no space, we create one
        self.standardised_asset_list[self.postcode_colname] = (
            self.standardised_asset_list[self.postcode_colname].apply(
                self._clean_postcode
            )
        )

    # We clean up potential non-breaking spaces, and repeated spaces
    text_columns = [
        c
        for c in [
            self.postcode_colname,
            self.full_address_colname,
            self.address1_colname,
        ]
        if c is not None
    ]
    for col in text_columns:
        cleaned = self.standardised_asset_list[col].astype(str)
        cleaned = cleaned.str.replace("\xa0", " ", regex=False)
        # Collapse any run of two or more spaces down to a single space
        cleaned = cleaned.str.replace(r" {2,}", " ", regex=True)
        self.standardised_asset_list[col] = cleaned

    if self.address1_colname is None:
        if self.address1_extraction_method is None:
            raise ValueError(
                "Missing address 1 - please specify an extraction method"
            )
        self.address1_colname = self.STANDARD_ADDRESS_1
        # If we do not have this, we produce it
        self.standardised_asset_list = self._extract_address1(
            asset_list=self.standardised_asset_list,
            full_address_col=self.full_address_colname,
            postcode_col=self.postcode_colname,
            method=self.address1_extraction_method,
        )

    if self.full_address_colname is None:
        if not self.full_address_cols_to_concat:
            raise ValueError(
                "Missing full address - please specify columns to concatenate"
            )
        self.full_address_colname = self.STANDARD_FULL_ADDRESS
        # str() so that numeric parts (e.g. a house number column) can be joined
        self.standardised_asset_list[self.full_address_colname] = (
            self.standardised_asset_list[self.full_address_cols_to_concat].apply(
                lambda x: ", ".join([str(y) for y in x if not pd.isnull(y)]),
                axis=1,
            )
        )
    else:
        # Make sure to strip the postcode out of the full address
        self.standardised_asset_list[self.full_address_colname] = (
            self.standardised_asset_list.apply(
                lambda x: self._strip_postcode_from_full_address(
                    full_address=x[self.full_address_colname],
                    postcode=x[self.postcode_colname],
                ),
                axis=1,
            )
        )

    # We create the domna property id
    self.create_property_id()

    # Clean up the UPRN column, if the landlord has provided them
    if self.landlord_uprn is not None:
        self.standardised_asset_list[self.landlord_uprn] = (
            self.standardised_asset_list[self.landlord_uprn].apply(
                self._convert_uprn
            )
        )

    # We keep just the columns we care about and will work through the various
    # columns and standardise
    variables = [
        self.landlord_property_id,
        self.DOMNA_PROPERTY_ID,
        self.address1_colname,
        self.postcode_colname,
        self.full_address_colname,
        self.landlord_uprn,
        self.landlord_property_type,
        self.landlord_built_form,
        self.landlord_year_built,
        self.landlord_wall_construction,
        self.landlord_roof_construction,
        self.landlord_heating_system,
        self.landlord_existing_pv,
        self.landlord_sap,
        self.landlord_block_reference,
    ]
    # Keep just non-null variables (e.g. landlord may not provide uprn)
    self.keep_variables = [v for v in variables if v is not None]

    rename_candidates = {
        self.landlord_property_id: self.STANDARD_LANDLORD_PROPERTY_ID,
        self.address1_colname: self.STANDARD_ADDRESS_1,
        self.postcode_colname: self.STANDARD_POSTCODE,
        self.full_address_colname: self.STANDARD_FULL_ADDRESS,
        self.landlord_uprn: self.STANDARD_UPRN,
        self.landlord_property_type: self.STANDARD_PROPERTY_TYPE,
        self.landlord_built_form: self.STANDARD_BUILT_FORM,
        self.landlord_year_built: self.STANDARD_YEAR_BUILT,
        self.landlord_wall_construction: self.STANDARD_WALL_CONSTRUCTION,
        self.landlord_roof_construction: self.STANDARD_ROOF_CONSTRUCTION,
        self.landlord_heating_system: self.STANDARD_HEATING_SYSTEM,
        self.landlord_existing_pv: self.STANDARD_EXISTING_PV,
        self.landlord_sap: self.STANDARD_SAP,
        self.landlord_block_reference: self.STANDARD_BLOCK_REFERENCE,
    }
    self.rename_map = {k: v for k, v in rename_candidates.items() if k is not None}

    # NOTE(review): the flag checks below are treated as independent siblings;
    # confirm the nesting against the original file.
    non_intrusive_columns = []
    if (
        self.non_intrusives_present
        and not self.new_format_non_insturives_present_v2
    ):
        # Copy the list: the statements below extend it in place, which would
        # otherwise modify the shared NON_INTRUSIVES_COLNAMES list itself
        non_intrusive_columns = list(self.NON_INTRUSIVES_COLNAMES)

    if self.non_intrusives_eligibility:
        non_intrusive_columns.append(self.NON_INTRUSIVES_ELIGIBILITY_COLUMN)

    if self.new_format_non_insturives_present:
        non_intrusive_columns += self.NON_INTRUSIVES_NEW_FORMAT_COLNAMES

    if self.new_format_non_insturives_present_v2:
        non_intrusive_columns += self.NON_INTRUSIVES_NEW_FORMAT_COLNAMES_V2

    if self.solar_non_intrusives_present:
        non_intrusive_columns += self.NON_INTRUSIVES_SOLAR_COLNAMES

    if self.old_format_non_intrusives_present:
        # We check if we have the ECO Eligibility column, which we might not have
        non_intrusive_columns = [
            c
            for c in self.OLD_FORMAT_NON_INTRUSIVE_COLNAMES
            if c in self.standardised_asset_list.columns
        ]

    if "Warmfront Finding" in self.standardised_asset_list.columns:
        non_intrusive_columns.append("Warmfront Finding")

    self.keep_variables += non_intrusive_columns

    self.rename_map = {
        **self.rename_map,
        **{c: "non-intrusives: " + c for c in non_intrusive_columns},
    }

    # We identify addresses which are likely to be multi-addresses (e.g. are rooms x-y)
    self.standardised_asset_list["is_multi_address"] = self.standardised_asset_list[
        self.full_address_colname
    ].apply(self._identify_multi_address)

    # We handle cleaning for walls, in the instance that the landlord provides
    # us with EPC data and we see instances of "average thermal transmittance"
    # in the description
    if self.landlord_wall_construction is not None:
        walls = self.standardised_asset_list[self.landlord_wall_construction]
        # na=False: missing descriptions simply do not match
        is_average_transmittance = walls.str.lower().str.contains(
            "average thermal transmittance", regex=False, na=False
        )
        self.standardised_asset_list[self.landlord_wall_construction] = np.where(
            is_average_transmittance,
            "new build - average thermal transmittance",
            walls,
        )
    else:
        # We want to make sure that we have a column for wall construction
        self.landlord_wall_construction = self.STANDARD_WALL_CONSTRUCTION
        self.standardised_asset_list[self.landlord_wall_construction] = None

    if self.landlord_roof_construction is None:
        self.landlord_roof_construction = self.STANDARD_ROOF_CONSTRUCTION
        self.standardised_asset_list[self.landlord_roof_construction] = None

    # Clean our build year column
    # We attempt to process the year built column
    if self.landlord_year_built is not None:
        year_built = self.standardised_asset_list[self.landlord_year_built]
        # Guard against an empty asset list before peeking at the first value
        first_value = year_built.iloc[0] if len(year_built) else None

        # We check if we have a datetime - year built has not been renamed
        if isinstance(first_value, datetime):
            # We treat any string values - with common values we see
            year_built = year_built.replace(self.DATETIME_REMAP)

            no_data_codes = {"No Data": None}
            year_built = year_built.replace(no_data_codes)

            # Convert this to year
            self.standardised_asset_list[self.landlord_year_built] = (
                pd.to_datetime(year_built).dt.year
            )
        else:
            # We attempt to convert the year built to a year, by detecting the
            # format of each value and converting
            known_errors = {
                "#MULTIVALUE",
                "ND",
                "PIMSS EMPTY",
                "UNKNOWN",
                "This cell has an external reference that can't be shown or edited. Editing this cell will "
                "remove the external reference.",
                0,
            }

            def extract_year(date_str):
                """Return the year held in one year built value, or None."""
                if pd.isnull(date_str) or date_str in known_errors:
                    return None

                # Handle datetime
                if isinstance(date_str, datetime):
                    return date_str.year

                # Handle numeric year (float or int, including numpy scalars)
                if isinstance(date_str, (int, float, np.integer, np.floating)):
                    if 1000 <= int(date_str) <= 2100:
                        return int(date_str)

                # Now handle string-based logic
                if isinstance(date_str, str):
                    # Direct date match e.g. 01-Jul-2021
                    match = re.match(r"\d{1,2}-[A-Za-z]{3}-(\d{4})", date_str)
                    if match:
                        return int(match.group(1))

                    # Find all 4-digit years in string
                    years = [
                        int(y) for y in re.findall(r"\b(?:19|20)\d{2}\b", date_str)
                    ]
                    if years:
                        return max(years)  # Return most recent year

                    # If only numbers are present without format
                    numeric_str = re.sub(r"\D", "", date_str)
                    if len(numeric_str) == 4 and numeric_str.isdigit():
                        return int(numeric_str)

                raise NotImplementedError(
                    f"Unhandled format for year built, value is {date_str} - implement me"
                )

            self.standardised_asset_list[self.landlord_year_built] = (
                year_built.apply(extract_year)
            )

    # We now create standard lookups
    to_remap = {
        self.landlord_property_type: {
            "standard_values": property_type_mappings.STANDARD_PROPERTY_TYPES,
            "standard_map": property_type_mappings.PROPERTY_MAPPING,
        },
        self.landlord_built_form: {
            "standard_values": built_form_mappings.STANDARD_BUILT_FORMS,
            "standard_map": built_form_mappings.BUILT_FORM_MAPPINGS,
        },
        self.landlord_wall_construction: {
            "standard_values": walls_mappings.STANDARD_WALL_CONSTRUCTIONS,
            "standard_map": walls_mappings.WALL_CONSTRUCTION_MAPPINGS,
        },
        self.landlord_heating_system: {
            "standard_values": heating_mappings.STANDARD_HEATING_SYSTEMS,
            "standard_map": heating_mappings.HEATING_MAPPINGS,
        },
        self.landlord_existing_pv: {
            "standard_values": existing_pv_mappings.STANDARD_EXISTING_PV,
            "standard_map": existing_pv_mappings.EXISTING_PV_MAPPINGS,
        },
        self.landlord_roof_construction: {
            "standard_values": roof_mappings.STANDARD_ROOF_CONSTRUCTIONS,
            "standard_map": roof_mappings.ROOF_CONSTRUCTION_MAPPINGS,
        },
    }
    # Keep just entries where the key is not None
    to_remap = {k: v for k, v in to_remap.items() if k is not None}

    for variable, config in to_remap.items():
        logger.info("Standardising variable: %s", variable)
        # Strip each of these columns
        self.standardised_asset_list[variable] = self.standardised_asset_list[
            variable
        ].str.strip()
        values_to_remap = self.standardised_asset_list[variable].unique()
        # We want to map this to our standardised list of values for the variable
        remapper = DataRemapper(
            standard_values=config["standard_values"],
            standard_map=config["standard_map"],
        )
        remap_dictionary = remapper.standardize_list(
            values_to_remap=values_to_remap.tolist()
        )
        self.variable_mappings[variable] = remap_dictionary

    # We now print out the variable mappings, which can be reviewed by the
    # user, before the final standardised asset list is returned
    for variable, mapping in self.variable_mappings.items():
        pprint(f"Variable: {variable}")
        pprint(mapping)
        # Print a space
        print("\n")
        pprint("=======================================")
|
||
|
||
def apply_standardiation(self, override_empty_mappings=False):
    """
    This function applies the standardisation to the asset list.

    In order it: optionally filters to surveyed properties (when
    ``self.phase`` is set), applies ``self.variable_mappings`` while keeping
    the landlord's original values in ``<column>_original_from_landlord``,
    drops rows with a duplicated domna property id (recording them in
    ``self.duplicated_addresses``), selects and renames the kept columns, adds
    any standard columns that are absent, cleans the SAP column, and finally
    fills block references and splits blocks of flats.

    :param override_empty_mappings: If true, will override the check for empty
        mappings. This is only relevant if there are no categories which need
        remapping, which is highly unlikely
    :return: None - ``self.standardised_asset_list`` is updated in place
    :raises ValueError: if no variable mappings exist and the check has not
        been overridden
    """

    # Validate first so that a failure leaves the asset list untouched
    if not self.variable_mappings and not override_empty_mappings:
        raise ValueError("Please run init_standardise first")

    if self.phase:
        # We filter on just the properties that have had an inspection
        if (
            self.new_format_non_insturives_present_v2
            or self.solar_non_intrusives_present
        ):
            surveyor = self.standardised_asset_list["NAME OF SURVEYOR"]
            not_surveyed = surveyor.isin(
                ["YET TO BE SURVEYED", "", None]
            ) | pd.isnull(surveyor)
            self.standardised_asset_list = self.standardised_asset_list[
                ~not_surveyed
            ]
        else:
            self.standardised_asset_list = self.standardised_asset_list[
                ~self.standardised_asset_list["Surveyors Name"].isin(
                    ["YET TO BE SURVEYED"]
                )
            ]

    logger.info("Applying standardisation to asset list")

    for variable, mapping in self.variable_mappings.items():
        # Keep the landlord's raw value alongside the standardised one
        self.standardised_asset_list[variable + "_original_from_landlord"] = (
            self.standardised_asset_list[variable].copy()
        )
        self.standardised_asset_list[variable] = self.standardised_asset_list[
            variable
        ].map(mapping)

    is_duplicate = self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated()
    if is_duplicate.any():
        # Drop the dupes
        pprint(
            f"There are {is_duplicate.sum()} duplicated "
            f"addresses - dropping"
        )

        # Keep a record of duplicates
        self.duplicated_addresses = self.standardised_asset_list[is_duplicate][
            [
                self.DOMNA_PROPERTY_ID,
                self.full_address_colname,
                self.address1_colname,
                self.postcode_colname,
            ]
        ].copy()

        self.standardised_asset_list = self.standardised_asset_list[~is_duplicate]

    # Apply renames to our standard names
    # Perform final variable selection and renaming:

    # We add the original columns to the keep variables
    self.keep_variables += [
        k + "_original_from_landlord" for k in self.variable_mappings
    ]

    self.standardised_asset_list = self.standardised_asset_list[
        self.keep_variables
    ].rename(columns=self.rename_map)

    # We fill any standard columns that are not in the data because they were
    # not provided by the landlord
    expected_columns = [
        self.STANDARD_EXISTING_PV,
        self.STANDARD_HEATING_SYSTEM,
        self.STANDARD_UPRN,
        self.STANDARD_PROPERTY_TYPE,
        self.STANDARD_YEAR_BUILT,
        self.STANDARD_WALL_CONSTRUCTION,
        self.STANDARD_BLOCK_REFERENCE,
    ]
    for v in expected_columns:
        if v not in self.standardised_asset_list.columns:
            self.standardised_asset_list[v] = None

    # Convert to string
    self.standardised_asset_list[self.STANDARD_LANDLORD_PROPERTY_ID] = (
        self.standardised_asset_list[self.STANDARD_LANDLORD_PROPERTY_ID].astype(str)
    )

    # Clean up the standard SAP column, that can be problematic
    if self.landlord_sap is not None:
        sap_text = (
            self.standardised_asset_list[self.STANDARD_SAP]
            .astype(str)
            .str.replace("\xa0", " ", regex=False)
            .str.strip()
        )
        # Blanks and anything else that is not a number become NaN instead of
        # raising part-way through the run
        sap = pd.to_numeric(sap_text, errors="coerce").astype(float)

        unparsed = sap.isnull() & ~sap_text.str.lower().isin(["", "nan", "none"])
        if unparsed.any():
            logger.warning(
                "%s SAP values could not be read as numbers and were set to null",
                int(unparsed.sum()),
            )

        # If it's zero, we set it to None
        self.standardised_asset_list[self.STANDARD_SAP] = np.where(
            sap == 0, None, sap
        )

    has_blocks_of_flats = (
        self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE]
        == "block of flats"
    ).sum()

    # Perform block splitting, ahead of fetching the EPC data
    # If we have blocks of flats without a landlord block reference, we create this
    self.fill_landlord_block_reference(has_blocks_of_flats)

    # If we have blocks of flats, we split these out into individual units.
    self.split_blocks()
|
||
|
||
def merge_data(self, df: pd.DataFrame):
    """
    Left-join extra columns onto the standardised asset list, keyed on the
    domna property id.

    :param df: dataframe holding the domna property id column; where an id is
        repeated, only its first row is used
    :return: None - ``self.standardised_asset_list`` is replaced by the merged
        result
    :raises ValueError: if ``df`` does not contain the domna property id column
    """
    key = self.DOMNA_PROPERTY_ID

    if key not in df.columns:
        raise ValueError(
            f"Dataframe must contain the column {self.DOMNA_PROPERTY_ID}"
        )

    # Repeated ids would multiply rows in the join, so only the first is kept
    incoming = df
    if incoming[key].duplicated().any():
        incoming = incoming.drop_duplicates(subset=[key], keep="first")

    self.standardised_asset_list = self.standardised_asset_list.merge(
        incoming, how="left", on=key
    )
|
||
|
||
def extract_attributes(self, pull_epc=True):
    """
    Used to extract the typical attributes that we use to identify viable work.

    Adds the following to ``self.standardised_asset_list``: a has-solar flag,
    an estimated number of floors, an estimated perimeter, an estimated heat
    loss (external wall) area, the EPC roof insulation thickness, a flag for
    SAP ratings at or below ``FILLED_CAVITY_SAP_THRESHOLD`` and a flag for EPCs
    inspected before ``EPC_YEAR_THRESHOLD``. The EPC floor area and habitable
    room columns are converted to floats along the way, and
    ``process_age_band`` is called at the end.

    :param pull_epc: not used inside this method; kept so existing callers
        continue to work
    :return: None - the asset list is updated in place
    """
    # Same DataFrame object as self.standardised_asset_list; it is only ever
    # modified in place below, never rebound
    assets = self.standardised_asset_list

    floor_area_col = self.EPC_API_DATA_NAMES["total-floor-area"]
    rooms_col = self.EPC_API_DATA_NAMES["number-habitable-rooms"]
    epc_property_type_col = self.EPC_API_DATA_NAMES["property-type"]
    floor_height_col = self.EPC_API_DATA_NAMES["floor-height"]
    roof_col = self.EPC_API_DATA_NAMES["roof-description"]

    assets[self.ATTRIBUTE_HAS_SOLAR] = assets[
        self.FIND_EPC_DATA_NAMES["Solar photovoltaics"]
    ] | ~assets[self.EPC_API_DATA_NAMES["photo-supply"]].isin(
        ["0.0", 0, None, "", np.nan]
    )

    accepted_epc_property_types = ["House", "Flat", "Bungalow", "Maisonette"]

    def _resolve_property_type(row):
        """Pick the property type used to estimate the number of floors."""
        # The logic here is:
        # 1) Take the property type provided by the HA themselves
        # 2) In absence of that, take the EPC property type
        # 3) Otherwise use None
        landlord_type = str(row[self.STANDARD_PROPERTY_TYPE]).title()
        if landlord_type in accepted_epc_property_types:
            return landlord_type
        epc_type = row[epc_property_type_col]
        return None if pd.isnull(epc_type) else epc_type

    assets[self.ATTRIBUTE_NUMBER_OF_FLOORS] = assets.apply(
        lambda x: estimate_number_of_floors(
            property_type=_resolve_property_type(x)
        ),
        axis=1,
    )

    # Blank strings and other unreadable values become NaN. to_numeric is used
    # rather than replace("", None) followed by astype(float) because the
    # meaning of replace(..., None) differs between pandas versions.
    assets[floor_area_col] = pd.to_numeric(
        assets[floor_area_col], errors="coerce"
    ).astype(float)
    assets[rooms_col] = pd.to_numeric(assets[rooms_col], errors="coerce").astype(
        float
    )

    # Estimate the perimeter
    # Handle funky edge case: a floor area of zero is replaced with the mean
    # floor area of the asset list
    assets[floor_area_col] = np.where(
        assets[floor_area_col] == 0,
        assets[floor_area_col].mean(),
        assets[floor_area_col],
    )

    assets[self.ATTRIBUTE_ESTIMATED_PERIMETER] = assets.apply(
        lambda x: estimate_perimeter(
            floor_area=x[floor_area_col] / x[self.ATTRIBUTE_NUMBER_OF_FLOORS],
            num_rooms=x[rooms_col] / x[self.ATTRIBUTE_NUMBER_OF_FLOORS],
        ),
        axis=1,
    )

    assets[self.ATTRIBUTE_HEAT_LOSS_AREA] = assets.apply(
        lambda x: estimate_external_wall_area(
            num_floors=x[self.ATTRIBUTE_NUMBER_OF_FLOORS],
            # Fall back to 2.5 when the EPC has no floor height
            floor_height=(
                float(x[floor_height_col])
                if not pd.isnull(x[floor_height_col])
                else 2.5
            ),
            perimeter=x[self.ATTRIBUTE_ESTIMATED_PERIMETER],
            built_form=x[self.EPC_API_DATA_NAMES["built-form"]],
        ),
        axis=1,
    )

    assets[self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS] = assets.apply(
        lambda x: (
            RoofAttributes(description=x[roof_col]).process()["insulation_thickness"]
            if not pd.isnull(x[roof_col])
            else None
        ),
        axis=1,
    )

    # Remove the literal "+" (e.g. "300+"); regex=False so the result does not
    # depend on the pandas version's default for the regex argument
    assets[self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS] = assets[
        self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS
    ].str.replace("+", "", regex=False)

    # We produce some additional fields
    # 1) Is the SAP rating at or below the filled cavity SAP threshold
    assets[self.ATTRIBUTE_SAP_THRESHOLD_AND_BELOW] = (
        pd.to_numeric(
            assets[self.EPC_API_DATA_NAMES["current-energy-efficiency"]],
            errors="coerce",
        ).astype(float)
        <= self.FILLED_CAVITY_SAP_THRESHOLD
    )
    # 2) Flag anything where the EPC inspection year is before the year threshold
    assets[self.ATTRIBUTE_EPC_PRE_YEAR_THRESHOLD] = (
        pd.to_datetime(
            assets[self.EPC_API_DATA_NAMES["inspection-date"]]
        ).dt.year
        < self.EPC_YEAR_THRESHOLD
    )

    self.process_age_band()
|
||
|
||
def process_age_band(self):
    """
    Compare the EPC construction age band with the landlord's year built.

    For every row of ``self.standardised_asset_list`` the EPC construction age
    band is turned into ``epc_year_lower_bound`` / ``epc_year_upper_bound``
    and a text label, ``does_age_band_match_epc_age_band``, describing how the
    band relates to the landlord's year built. The three columns are merged
    back onto the asset list using the domna property id.

    Handled band formats: a bare year, "<prefix>: YYYY onwards",
    "<prefix>: before YYYY" and "<prefix>: YYYY-YYYY". Missing or anomalous
    bands, and bands in any other format, are labelled "No EPC Age Band".

    :return: None - ``self.standardised_asset_list`` is replaced by the merged
        result
    """
    band_col = self.EPC_API_DATA_NAMES["construction-age-band"]
    id_col = self.DOMNA_PROPERTY_ID

    no_band = "No EPC Age Band"
    no_year = "No Year Built From Landlord"
    band_matches = "EPC Age Band Matches Year Built"
    band_differs = "EPC Age Band is different from Year Built"
    # "older" = the band ends before the year built;
    # "newer" = the band starts after the year built
    band_older = "EPC Age Band is older than Year Built"
    band_newer = "EPC Age Band is newer than Year Built"

    def _classify(band, year_built):
        """Return (lower bound, upper bound, label) for a single property."""
        if pd.isnull(band) or band in Definitions.DATA_ANOMALY_MATCHES:
            return None, None, no_band

        # NaN is truthy, so it has to be tested explicitly; a year of 0 is
        # also treated as missing
        year_missing = pd.isnull(year_built) or not year_built
        band_text = str(band).strip()

        # A bare year, e.g. "1995"
        if band_text.isdecimal():
            year = int(band_text)
            if year_missing:
                return year, year, no_year
            return year, year, band_matches if year_built == year else band_differs

        # Drop the country prefix, e.g. "England and Wales: 1950-1966"
        period = band_text.split(": ", 1)[-1].strip()

        # Open-ended upper bound, e.g. "2007 onwards"
        onwards = re.fullmatch(r"(\d{4}) onwards", period)
        if onwards:
            lower = int(onwards.group(1))
            if year_missing:
                return lower, None, no_year
            return lower, None, band_matches if year_built >= lower else band_newer

        # Open-ended lower bound, e.g. "before 1900"
        before = re.fullmatch(r"before (\d{4})", period)
        if before:
            cutoff = int(before.group(1))
            if year_missing:
                return None, cutoff - 1, no_year
            return (
                None,
                cutoff - 1,
                band_matches if year_built < cutoff else band_older,
            )

        # Otherwise, we extract the upper and lower bounds, e.g. "1950-1966"
        bounded = re.fullmatch(r"(\d{4})\s*-\s*(\d{4})", period)
        if bounded:
            lower, upper = int(bounded.group(1)), int(bounded.group(2))
            if year_missing:
                return lower, upper, no_year
            if lower <= year_built <= upper:
                return lower, upper, band_matches
            return lower, upper, band_older if year_built > upper else band_newer

        # Unknown format: report it rather than failing the whole run
        logger.warning("Unhandled EPC construction age band: %s", band)
        return None, None, no_band

    records = []
    for _, row in self.standardised_asset_list.iterrows():
        lower_bound, upper_bound, label = _classify(
            row[band_col], row[self.STANDARD_YEAR_BUILT]
        )
        records.append(
            {
                id_col: row[id_col],
                "epc_year_lower_bound": lower_bound,
                "epc_year_upper_bound": upper_bound,
                "does_age_band_match_epc_age_band": label,
            }
        )

    # Explicit columns so that an empty asset list still yields the join key
    processed_age_band = pd.DataFrame(
        records,
        columns=[
            id_col,
            "epc_year_lower_bound",
            "epc_year_upper_bound",
            "does_age_band_match_epc_age_band",
        ],
    )

    # Join on the property id only, rather than on whichever column names the
    # two frames happen to share
    self.standardised_asset_list = self.standardised_asset_list.merge(
        processed_age_band, how="left", on=id_col
    )
|
||
|
||
def identify_worktypes(self):
|
||
|
||
if self.landlord_sap is not None:
|
||
# We add a SAP category for all work type identification
|
||
self.standardised_asset_list["SAP Category"] = np.where(
|
||
(
|
||
(
|
||
self.standardised_asset_list[
|
||
self.EPC_API_DATA_NAMES["current-energy-efficiency"]
|
||
]
|
||
<= 54
|
||
)
|
||
| (self.standardised_asset_list[self.STANDARD_SAP] <= 54)
|
||
),
|
||
"SAP Rating 54 or less",
|
||
np.where(
|
||
(
|
||
(
|
||
self.standardised_asset_list[
|
||
self.EPC_API_DATA_NAMES["current-energy-efficiency"]
|
||
]
|
||
<= 68
|
||
)
|
||
| (self.standardised_asset_list[self.STANDARD_SAP] <= 68)
|
||
),
|
||
"SAP Rating 55-68",
|
||
np.where(
|
||
(
|
||
(
|
||
self.standardised_asset_list[
|
||
self.EPC_API_DATA_NAMES["current-energy-efficiency"]
|
||
]
|
||
<= self.EMPTY_CAVITY_SAP_THRESHOLD
|
||
)
|
||
| (
|
||
self.standardised_asset_list[self.STANDARD_SAP]
|
||
<= self.EMPTY_CAVITY_SAP_THRESHOLD
|
||
)
|
||
),
|
||
f"SAP Rating 69-{self.EMPTY_CAVITY_SAP_THRESHOLD}",
|
||
f"SAP Rating {self.EMPTY_CAVITY_SAP_THRESHOLD + 1} or more",
|
||
),
|
||
),
|
||
)
|
||
|
||
self.standardised_asset_list["SAP Category"] = np.where(
|
||
pd.isnull(self.standardised_asset_list[self.STANDARD_SAP])
|
||
& pd.isnull(
|
||
self.standardised_asset_list[
|
||
self.EPC_API_DATA_NAMES["current-energy-efficiency"]
|
||
]
|
||
),
|
||
"SAP Unknown",
|
||
self.standardised_asset_list["SAP Category"],
|
||
)
|
||
|
||
else:
|
||
# We add a SAP category for all work type identification
|
||
# We break into 4 categories (54 or less, 55-68, 69-74, 75 or more)
|
||
|
||
self.standardised_asset_list["SAP Category"] = np.where(
|
||
(
|
||
self.standardised_asset_list[
|
||
self.EPC_API_DATA_NAMES["current-energy-efficiency"]
|
||
]
|
||
<= 54
|
||
),
|
||
"SAP Rating 54 or less",
|
||
np.where(
|
||
(
|
||
self.standardised_asset_list[
|
||
self.EPC_API_DATA_NAMES["current-energy-efficiency"]
|
||
]
|
||
<= 68
|
||
),
|
||
"SAP Rating 55-68",
|
||
np.where(
|
||
(
|
||
self.standardised_asset_list[
|
||
self.EPC_API_DATA_NAMES["current-energy-efficiency"]
|
||
]
|
||
<= self.EMPTY_CAVITY_SAP_THRESHOLD
|
||
),
|
||
f"SAP Rating 69-{self.EMPTY_CAVITY_SAP_THRESHOLD}",
|
||
f"SAP Rating {self.EMPTY_CAVITY_SAP_THRESHOLD + 1} or more",
|
||
),
|
||
),
|
||
)
|
||
self.standardised_asset_list["SAP Category"] = np.where(
|
||
pd.isnull(
|
||
self.standardised_asset_list[
|
||
self.EPC_API_DATA_NAMES["current-energy-efficiency"]
|
||
]
|
||
),
|
||
"SAP Unknown",
|
||
self.standardised_asset_list["SAP Category"],
|
||
)
|
||
|
||
# Before we begin, we identify if a property has solar already as we use this
|
||
# for identifying cavity jobs
|
||
if self.non_intrusives_present and not self.old_format_non_intrusives_present:
|
||
|
||
if (
|
||
self.new_format_non_insturives_present_v2
|
||
or self.solar_non_intrusives_present
|
||
):
|
||
existing_solar_non_intrusives_check = (
|
||
self.standardised_asset_list["non-intrusives: ROOF ORIENTATION"]
|
||
.str.strip()
|
||
.isin(["ALREADY HAS SOLAR PV", "ALREADY HAS PV"])
|
||
)
|
||
else:
|
||
existing_solar_non_intrusives_check = (
|
||
self.standardised_asset_list[
|
||
"non-intrusives: PV, ACCESS ISSUE, SEE NOTES"
|
||
]
|
||
== "SOLAR PV ON ROOF"
|
||
)
|
||
elif self.old_format_non_intrusives_present:
|
||
existing_solar_non_intrusives_check = (
|
||
self.standardised_asset_list["non-intrusives: WFT Findings"]
|
||
.str.lower()
|
||
.str.strip()
|
||
.isin(["solar pv on roof"])
|
||
)
|
||
else:
|
||
# We don't have an indication
|
||
existing_solar_non_intrusives_check = False
|
||
|
||
self.standardised_asset_list["property_has_solar"] = (
|
||
(
|
||
self.standardised_asset_list[self.STANDARD_EXISTING_PV]
|
||
== "already has PV"
|
||
)
|
||
| existing_solar_non_intrusives_check
|
||
| (self.standardised_asset_list[self.ATTRIBUTE_HAS_SOLAR])
|
||
)
|
||
|
||
# If we have non-intrusives completed, we can use this to identify work types
|
||
######################################################
|
||
# Empty cavity:
|
||
######################################################
|
||
# 1) Has been flagged on the non-intrusives as being a cavity wall, empty or partially filled
|
||
# 2) The year built / EPC age band upper bound is no later than EMPTY_CAVITY_YEAR_THRESHOLD
|
||
# 3) We don't remove anything that has access issues yet
|
||
|
||
if self.non_intrusives_present:
|
||
if self.new_format_non_insturives_present_v2:
|
||
non_intrusives_wall_filter = (
|
||
self.standardised_asset_list["non-intrusives: Construction"]
|
||
== "CAVITY"
|
||
) & self.standardised_asset_list["non-intrusives: Insulated"].isin(
|
||
["EMPTY", "PARTIAL", "EMPTY CAVITY"]
|
||
)
|
||
else:
|
||
non_intrusives_wall_filter = (
|
||
self.standardised_asset_list["non-intrusives: Construction"]
|
||
== "CAVITY"
|
||
) & self.standardised_asset_list["non-intrusives: Insulated"].isin(
|
||
["EMPTY", "PARTIAL"]
|
||
)
|
||
elif self.old_format_non_intrusives_present:
|
||
non_intrusives_wall_filter = self.standardised_asset_list[
|
||
"non-intrusives: WFT Findings"
|
||
].str.lower().str.strip().isin(
|
||
[
|
||
"empty cavity",
|
||
"partial fill",
|
||
"empty",
|
||
"EMPTY CAVITY 70MM",
|
||
"partial",
|
||
"empty cav",
|
||
]
|
||
) | (
|
||
(
|
||
self.standardised_asset_list["non-intrusives: WFT Findings"]
|
||
.str.lower()
|
||
.str.strip()
|
||
.str.contains("empty cavity|partial fill")
|
||
& ~self.standardised_asset_list["non-intrusives: WFT Findings"]
|
||
.astype(str)
|
||
.str.lower()
|
||
.str.strip()
|
||
.str.contains("major access issues")
|
||
)
|
||
)
|
||
else:
|
||
# We set the filter to False, as we have no non-intrusives
|
||
non_intrusives_wall_filter = False
|
||
|
||
if self.landlord_year_built is None:
|
||
year_built_filter = (
|
||
self.standardised_asset_list["epc_year_upper_bound"]
|
||
<= self.EMPTY_CAVITY_YEAR_THRESHOLD
|
||
)
|
||
else:
|
||
year_built_filter = (
|
||
self.standardised_asset_list[self.STANDARD_YEAR_BUILT]
|
||
<= self.EMPTY_CAVITY_YEAR_THRESHOLD
|
||
) | (
|
||
self.standardised_asset_list["epc_year_upper_bound"]
|
||
<= self.EMPTY_CAVITY_YEAR_THRESHOLD
|
||
)
|
||
|
||
# Criteria:
|
||
# The property isn't a bedsit
|
||
# Non-intrusives indicate it needs a fill
|
||
# The landlord year built or the EPC age band upper bound is no later than EMPTY_CAVITY_YEAR_THRESHOLD
|
||
# We also flag where the property has solar on the roof, because this is a signal of a high EPC rating
|
||
self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] = (
|
||
(
|
||
~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(
|
||
["bedsit"]
|
||
)
|
||
)
|
||
& non_intrusives_wall_filter
|
||
& year_built_filter
|
||
& (~self.standardised_asset_list["property_has_solar"])
|
||
)
|
||
|
||
self.standardised_asset_list[
|
||
"non_intrusive_indicates_empty_cavity_has_solar"
|
||
] = (
|
||
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"]
|
||
& (
|
||
~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(
|
||
["bedsit"]
|
||
)
|
||
)
|
||
& non_intrusives_wall_filter
|
||
& year_built_filter
|
||
& (
|
||
# If the property has solar, there's a chance it won't qualify
|
||
self.standardised_asset_list["property_has_solar"]
|
||
)
|
||
)
|
||
|
||
# We also add a filter on anything that was generally identified by the non-intrusives
|
||
self.standardised_asset_list[
|
||
"non_intrusive_indicates_empty_cavity_no_year_filter"
|
||
] = (
|
||
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"]
|
||
& ~self.standardised_asset_list[
|
||
"non_intrusive_indicates_empty_cavity_has_solar"
|
||
]
|
||
& (
|
||
~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(
|
||
["bedsit"]
|
||
)
|
||
)
|
||
& non_intrusives_wall_filter
|
||
)
|
||
|
||
if (not self.non_intrusives_eligibility) and (
|
||
not self.old_format_non_intrusives_present
|
||
):
|
||
# If we have NO inspections data, we capture all of the wall types and don't filter on age of the EPC
|
||
self.standardised_asset_list["epc_indicates_empty_cavity"] = (
|
||
self.standardised_asset_list[
|
||
self.EPC_API_DATA_NAMES["walls-description"]
|
||
]
|
||
.str.lower()
|
||
.isin(self.EPC_NO_WALL_INSULATION_DESCRIPTIONS)
|
||
& (
|
||
self.standardised_asset_list["epc_year_upper_bound"]
|
||
<= self.EMPTY_CAVITY_YEAR_THRESHOLD
|
||
)
|
||
& (
|
||
~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(
|
||
["bedsit"]
|
||
)
|
||
)
|
||
)
|
||
else:
|
||
self.standardised_asset_list["epc_indicates_empty_cavity"] = (
|
||
self.standardised_asset_list[
|
||
self.EPC_API_DATA_NAMES["walls-description"]
|
||
]
|
||
.str.lower()
|
||
.isin(self.EPC_NO_WALL_INSULATION_DESCRIPTIONS)
|
||
& (
|
||
self.standardised_asset_list["epc_year_upper_bound"]
|
||
<= self.EMPTY_CAVITY_YEAR_THRESHOLD
|
||
)
|
||
& (~self.standardised_asset_list[self.ATTRIBUTE_EPC_PRE_YEAR_THRESHOLD])
|
||
& (
|
||
~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(
|
||
["bedsit"]
|
||
)
|
||
)
|
||
)
|
||
|
||
self.standardised_asset_list["landlord_data_indicates_empty_cavity"] = (
|
||
self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin(
|
||
["uninsulated cavity"]
|
||
)
|
||
& (
|
||
(
|
||
self.standardised_asset_list[self.STANDARD_YEAR_BUILT]
|
||
<= self.EMPTY_CAVITY_YEAR_THRESHOLD
|
||
)
|
||
| (
|
||
self.standardised_asset_list["epc_year_upper_bound"]
|
||
<= self.EMPTY_CAVITY_YEAR_THRESHOLD
|
||
)
|
||
)
|
||
& (
|
||
~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(
|
||
["bedsit"]
|
||
)
|
||
)
|
||
)
|
||
|
||
# Finally, we create a flag to indicate that the cavity is empty, based on the criteria above
|
||
self.standardised_asset_list["cavity_is_empty"] = (
|
||
non_intrusives_wall_filter
|
||
| self.standardised_asset_list[self.EPC_API_DATA_NAMES["walls-description"]]
|
||
.str.lower()
|
||
.isin(self.EPC_NO_WALL_INSULATION_DESCRIPTIONS)
|
||
| self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin(
|
||
["uninsulated cavity"]
|
||
)
|
||
)
|
||
|
||
######################################################
|
||
# Extraction
|
||
######################################################
|
||
# as needing a CIGA check. What is the logic we should be applying here?
|
||
|
||
if self.non_intrusives_present:
|
||
|
||
extraction_wall_filter = (
|
||
(
|
||
self.standardised_asset_list["non-intrusives: Construction"]
|
||
== "CAVITY"
|
||
)
|
||
& (
|
||
self.standardised_asset_list["non-intrusives: Insulated"].isin(
|
||
["RETRO DRILLED", "FILLED AT BUILD"]
|
||
)
|
||
)
|
||
& (
|
||
~self.standardised_asset_list["non-intrusives: Material"].isin(
|
||
[
|
||
"GREY LOOSE BEAD",
|
||
"COMPACTED BEAD",
|
||
"FIBRE BATT NO CAVITY",
|
||
"EMPTY NARROW BELOW 30mm",
|
||
]
|
||
)
|
||
)
|
||
)
|
||
|
||
if self.non_intrusives_eligibility:
|
||
# If we have the eligibility column, we check if the wall is eligible
|
||
extraction_wall_filter = (
|
||
extraction_wall_filter
|
||
& ~self.standardised_asset_list[
|
||
"non-intrusives: Eligibility (Red/Yellow/Green)"
|
||
].isin(["RED"])
|
||
)
|
||
|
||
self.standardised_asset_list[
|
||
"non_intrusive_indicates_cavity_extraction"
|
||
] = (extraction_wall_filter & year_built_filter)
|
||
self.standardised_asset_list[
|
||
"non_intrusive_indicates_cavity_extraction_no_year_filter"
|
||
] = (extraction_wall_filter & ~year_built_filter)
|
||
|
||
elif self.old_format_non_intrusives_present:
|
||
print("Review these categories!!!!")
|
||
extraction_wall_filter = (
|
||
self.standardised_asset_list["non-intrusives: WFT Findings"]
|
||
.str.lower()
|
||
.str.strip()
|
||
.isin(
|
||
[
|
||
"blown in yellow wool",
|
||
"retro drilled & filled",
|
||
"white fibre from build",
|
||
"foam filled from build",
|
||
"retro drilled gas in block",
|
||
"block in rock wool",
|
||
"rdf / tilehung",
|
||
"fibre from build",
|
||
"blown in rock wool",
|
||
"rdf / tile hung",
|
||
"retro drilled",
|
||
"rock wool from build",
|
||
"part rendered retro drilled",
|
||
"white fibtr from build.",
|
||
"retro drilled and filled",
|
||
"blown in white wool",
|
||
"blown in yellow fibre from build",
|
||
"rdf",
|
||
"polybead",
|
||
"foam filled",
|
||
"blown in white bead from build",
|
||
"blown in yellow fibre",
|
||
"retro drilled det",
|
||
"blown in rockwool",
|
||
"retro drilled det empty cav",
|
||
"retro drilled end",
|
||
"retro filled extension",
|
||
"retro filled",
|
||
"foam",
|
||
]
|
||
)
|
||
)
|
||
|
||
self.standardised_asset_list[
|
||
"non_intrusive_indicates_cavity_extraction"
|
||
] = extraction_wall_filter
|
||
self.standardised_asset_list[
|
||
"non_intrusive_indicates_cavity_extraction_no_year_filter"
|
||
] = False
|
||
|
||
else:
|
||
self.standardised_asset_list[
|
||
"non_intrusive_indicates_cavity_extraction"
|
||
] = False
|
||
self.standardised_asset_list[
|
||
"non_intrusive_indicates_cavity_extraction_no_year_filter"
|
||
] = False
|
||
|
||
######################################################
|
||
# Solar
|
||
######################################################
|
||
# Criteria:
|
||
# Check 1: Does the property have a valid heating system?
|
||
self.standardised_asset_list[
|
||
"solar_landlord_data_indicates_correct_heating_system"
|
||
] = self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM].isin(
|
||
[
|
||
"air source heat pump",
|
||
"ground source heat pump",
|
||
"high heat retention storage heaters",
|
||
"electric boiler",
|
||
]
|
||
)
|
||
self.standardised_asset_list[
|
||
"solar_landlord_data_indicates_needs_heating_upgrade"
|
||
] = self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM].isin(
|
||
[
|
||
"electric storage heaters",
|
||
"room heaters",
|
||
"electric radiators",
|
||
"no heating",
|
||
"electric fuel",
|
||
]
|
||
)
|
||
|
||
self.standardised_asset_list[
|
||
"solar_epc_data_indicates_correct_heating_system"
|
||
] = (
|
||
self.standardised_asset_list[
|
||
self.EPC_API_DATA_NAMES["mainheat-description"]
|
||
]
|
||
.str.lower()
|
||
.str.contains(
|
||
"air source heat pump|ground source heat pump|boiler and radiators, electric"
|
||
)
|
||
) | (
|
||
self.standardised_asset_list[
|
||
self.EPC_API_DATA_NAMES["mainheat-description"]
|
||
]
|
||
.str.lower()
|
||
.str.contains("electric storage heaters")
|
||
& (
|
||
self.standardised_asset_list[
|
||
self.EPC_API_DATA_NAMES["mainheatcont-description"]
|
||
]
|
||
== "Controls for high heat retention storage heaters"
|
||
)
|
||
)
|
||
|
||
# If the landlord has given us the heating system, we default to that on heating upgrades. Because of the
|
||
# poor heating in place, if the EPC indicates that this property had a low efficiency heating system but the
|
||
# landlord data suggests otherwise (e.g. there's a gas boiler), we default to what the landlord has told us
|
||
self.standardised_asset_list[
|
||
"solar_epc_data_indicates_requires_heating_upgrade"
|
||
] = (
|
||
self.standardised_asset_list[
|
||
self.EPC_API_DATA_NAMES["mainheat-description"]
|
||
]
|
||
.str.lower()
|
||
.str.contains("electric storage heaters|room heaters")
|
||
& (
|
||
self.standardised_asset_list[
|
||
self.EPC_API_DATA_NAMES["mainheatcont-description"]
|
||
]
|
||
!= "Controls for high heat retention storage heaters"
|
||
)
|
||
) & (
|
||
~self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM].isin(
|
||
["district heating", "communal heating", "communal gas boiler"]
|
||
)
|
||
& ~self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM]
|
||
.astype(str)
|
||
.str.contains("gas ")
|
||
)
|
||
|
||
# Basic check - both of the previous two shouldn't be true simultaneously
|
||
if (
|
||
self.standardised_asset_list[
|
||
"solar_epc_data_indicates_correct_heating_system"
|
||
]
|
||
& self.standardised_asset_list[
|
||
"solar_epc_data_indicates_requires_heating_upgrade"
|
||
]
|
||
).sum():
|
||
logger.info(
|
||
"We have an example of both heating system checks being true - checking known cases"
|
||
)
|
||
known_edge_cases = [
|
||
"Ground source heat pump, radiators, electric, Electric storage heaters"
|
||
]
|
||
error_cases = self.standardised_asset_list[
|
||
(
|
||
self.standardised_asset_list[
|
||
"solar_epc_data_indicates_correct_heating_system"
|
||
]
|
||
& self.standardised_asset_list[
|
||
"solar_epc_data_indicates_requires_heating_upgrade"
|
||
]
|
||
)
|
||
]
|
||
if all(
|
||
error_cases[self.EPC_API_DATA_NAMES["mainheat-description"]].isin(
|
||
known_edge_cases
|
||
)
|
||
):
|
||
logger.info("Within known edge cases")
|
||
else:
|
||
raise ValueError(
|
||
"Both heating system checks are true - this should not be possible"
|
||
)
|
||
|
||
# Check 3: Does the property meet the fabric condition
|
||
# Solar PV installs are subject to the minimum insulation requirements which means:
|
||
# 1) one of the following insulation measures must be installed as part of the same
|
||
# ECO4 project:
|
||
# • roof insulation (flat roof, pitched roof, room-in-roof)
|
||
# • exterior facing wall insulation (cavity wall, solid wall)
|
||
# • party cavity wall insulation
|
||
# • floor insulation (solid and underfloor)
|
||
#
|
||
# OR
|
||
#
|
||
# all measures (except any exempted measure referred to in paragraph 4.28)
|
||
# listed in paragraph a) must already be installed
|
||
#
|
||
# With this in mind, we look for 2 cases
|
||
# 1) The property is fully insulated apart from the loft (<200mm insulation)
|
||
# 2) The property is fully insulated
|
||
self.standardised_asset_list[
|
||
"solar_landlord_walls_insulated"
|
||
] = self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin(
|
||
[
|
||
"filled cavity",
|
||
"insulated solid brick",
|
||
"insulated timber frame",
|
||
"uninsulated cavity",
|
||
"insulated system built",
|
||
"insulated granite or whinstone",
|
||
"insulated sandstone or limestone",
|
||
"new build - average thermal transmittance",
|
||
]
|
||
)
|
||
|
||
if self.non_intrusives_present:
|
||
self.standardised_asset_list["solar_non_intrusives_walls_insulated"] = (
|
||
self.standardised_asset_list["non-intrusives: Insulated"].isin(
|
||
["EWI", "RETRO DRILLED", "FILLED AT BUILD"]
|
||
)
|
||
)
|
||
elif self.old_format_non_intrusives_present:
|
||
self.standardised_asset_list[
|
||
"solar_non_intrusives_walls_insulated"
|
||
] = self.standardised_asset_list[
|
||
"non-intrusives: WFT Findings"
|
||
].str.lower().str.strip().isin(
|
||
[
|
||
"retro drilled",
|
||
"retro filled",
|
||
"ewi",
|
||
"retro drilled/ solid",
|
||
"retro drilled and filled",
|
||
]
|
||
) | self.standardised_asset_list[
|
||
"non-intrusives: WFT Findings"
|
||
].str.lower().str.strip().str.contains(
|
||
"retro drilled"
|
||
)
|
||
else:
|
||
self.standardised_asset_list["solar_non_intrusives_walls_insulated"] = False
|
||
|
||
self.standardised_asset_list["walls_u_value"] = self.standardised_asset_list[
|
||
self.EPC_API_DATA_NAMES["walls-description"]
|
||
].apply(
|
||
lambda x: (
|
||
WallAttributes(x).process()["thermal_transmittance"]
|
||
if not pd.isnull(x)
|
||
else None
|
||
)
|
||
)
|
||
|
||
self.standardised_asset_list["solar_epc_walls_insulated"] = (
|
||
self.standardised_asset_list[self.EPC_API_DATA_NAMES["walls-description"]]
|
||
.str.lower()
|
||
.str.contains("|".join(self.EPC_INSULATED_WALLS_SUBSTRINGS))
|
||
) | (
|
||
self.standardised_asset_list["walls_u_value"].apply(
|
||
lambda x: x <= 0.7 if not pd.isnull(x) else False
|
||
)
|
||
)
|
||
|
||
roof_data = []
|
||
for desc in self.standardised_asset_list[
|
||
self.EPC_API_DATA_NAMES["roof-description"]
|
||
].unique():
|
||
if pd.isnull(desc):
|
||
continue
|
||
roof_data.append(
|
||
{
|
||
self.EPC_API_DATA_NAMES["roof-description"]: desc,
|
||
**RoofAttributes(desc).process(),
|
||
}
|
||
)
|
||
roof_data = pd.DataFrame(roof_data)
|
||
roof_data = roof_data.rename(columns={"thermal_transmittance": "roof_u_value"})
|
||
self.standardised_asset_list = self.standardised_asset_list.merge(
|
||
roof_data, how="left", on=self.EPC_API_DATA_NAMES["roof-description"]
|
||
)
|
||
|
||
# If the u-value of a roof is 0.7 or less we consider it insulated
|
||
self.standardised_asset_list["solar_epc_roof_insulated"] = (
|
||
self.standardised_asset_list[self.EPC_API_DATA_NAMES["roof-description"]]
|
||
.str.lower()
|
||
.str.contains(
|
||
"|".join(self.EPC_INSULATED_ROOF_SUBSTRINGS),
|
||
)
|
||
| (
|
||
self.standardised_asset_list[
|
||
self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS
|
||
].apply(lambda x: int(x) >= 200 if str(x).isdigit() else False)
|
||
)
|
||
| (
|
||
self.standardised_asset_list["roof_u_value"].apply(
|
||
lambda x: x <= 0.7 if not pd.isnull(x) else False
|
||
)
|
||
)
|
||
)
|
||
|
||
self.standardised_asset_list[
|
||
"solar_epc_loft_needs_topup"
|
||
] = self.standardised_asset_list[
|
||
self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS
|
||
].apply(
|
||
lambda x: int(x) < 200 if str(x).isdigit() else False
|
||
) | (
|
||
(
|
||
self.standardised_asset_list["is_loft"]
|
||
| self.standardised_asset_list["is_pitched"]
|
||
)
|
||
& (
|
||
self.standardised_asset_list[
|
||
self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS
|
||
].isin(["below average", "none"])
|
||
)
|
||
)
|
||
|
||
self.standardised_asset_list["epc_has_floor_recommendation"] = (
|
||
self.standardised_asset_list["epc_has_floor_recommendation"].fillna(False)
|
||
)
|
||
|
||
# Check if the boiler is electric
|
||
# We check if it contains both the terms boiler & electric
|
||
self.standardised_asset_list["has_electric_boiler"] = (
|
||
self.standardised_asset_list[
|
||
self.EPC_API_DATA_NAMES["mainheat-description"]
|
||
]
|
||
.str.lower()
|
||
.isin(["boiler and radiators, electric"])
|
||
) | (
|
||
self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM]
|
||
== "electric boiler"
|
||
)
|
||
|
||
####################################
|
||
# Check solar eligibility
|
||
####################################
|
||
|
||
# Set up the filters to stop repetition
|
||
correct_heating_system = (
|
||
self.standardised_asset_list[
|
||
"solar_landlord_data_indicates_correct_heating_system"
|
||
]
|
||
| self.standardised_asset_list[
|
||
"solar_epc_data_indicates_correct_heating_system"
|
||
]
|
||
| self.standardised_asset_list["has_electric_boiler"]
|
||
)
|
||
|
||
needs_heating_upgrade = (
|
||
self.standardised_asset_list[
|
||
"solar_landlord_data_indicates_needs_heating_upgrade"
|
||
]
|
||
| self.standardised_asset_list[
|
||
"solar_epc_data_indicates_requires_heating_upgrade"
|
||
]
|
||
)
|
||
|
||
# The requirements for walls are:
|
||
# 1) walls are insulated
|
||
# 2) property is a cavity (can be done insulated or not)
|
||
|
||
walls_meet_solar_requirements = (
|
||
# The landlord is saying the walls are insulated
|
||
self.standardised_asset_list["solar_landlord_walls_insulated"]
|
||
|
|
||
# EPC data is saying the walls are insulated
|
||
self.standardised_asset_list["solar_epc_walls_insulated"]
|
||
|
|
||
# Non-intrusives are saying the walls are insulated
|
||
self.standardised_asset_list["solar_non_intrusives_walls_insulated"]
|
||
|
|
||
# It's empty cavity
|
||
self.standardised_asset_list["cavity_is_empty"]
|
||
|
|
||
# It's a cavity wall
|
||
self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin(
|
||
["filled cavity", "partial insulated cavity"]
|
||
)
|
||
)
|
||
|
||
# Determine if the client gave us property type in the first place
|
||
if all(self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] == "unknown"):
|
||
# Use EPC
|
||
not_a_flat = (
|
||
self.standardised_asset_list[self.EPC_API_DATA_NAMES["property-type"]]
|
||
!= "Flat"
|
||
)
|
||
else:
|
||
not_a_flat = (
|
||
self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] != "flat"
|
||
)
|
||
|
||
solar_roof_meets_criteria = (
|
||
self.standardised_asset_list["solar_epc_roof_insulated"]
|
||
| self.standardised_asset_list["solar_epc_loft_needs_topup"]
|
||
)
|
||
|
||
self.standardised_asset_list["solar_eligible"] = (
|
||
# Property isn't a flat
|
||
not_a_flat
|
||
&
|
||
# Landlord data or EPC data indicates the heating system is appropriate
|
||
correct_heating_system
|
||
&
|
||
# The property doesn't currently have solar
|
||
~self.standardised_asset_list["property_has_solar"]
|
||
&
|
||
# The walls are insulated
|
||
walls_meet_solar_requirements
|
||
&
|
||
# Roof meets criteria
|
||
solar_roof_meets_criteria
|
||
)
|
||
|
||
# With heating upgrade
|
||
self.standardised_asset_list["solar_eligible_needs_heating_upgrade"] = (
|
||
not_a_flat
|
||
&
|
||
# Needs heating upgrade
|
||
needs_heating_upgrade
|
||
&
|
||
# The property doesn't currently have solar
|
||
~self.standardised_asset_list["property_has_solar"]
|
||
&
|
||
# The walls are insulated
|
||
walls_meet_solar_requirements
|
||
&
|
||
# Roof meets criteria
|
||
solar_roof_meets_criteria
|
||
)
|
||
|
||
# We check for a specific sub-set of properties which are uninsulated solid wall properties that are EPC E
|
||
# or below (we'll use 57 as a threshold) - These are for a pilot with Net Zero Renewables
|
||
self.standardised_asset_list["solar_eligible_solid_wall_uninsulated"] = (
|
||
not_a_flat
|
||
&
|
||
# Landlord data or EPC data indicates the heating system is appropriate - in this case, we can also take
|
||
# electric boilers
|
||
correct_heating_system
|
||
&
|
||
# The property doesn't currently have solar
|
||
~self.standardised_asset_list["property_has_solar"]
|
||
&
|
||
# The walls are uninsulated solid
|
||
~walls_meet_solar_requirements
|
||
& (
|
||
self.standardised_asset_list[
|
||
self.EPC_API_DATA_NAMES["current-energy-efficiency"]
|
||
]
|
||
<= 57
|
||
)
|
||
)
|
||
|
||
# Drop anything we don't need
|
||
self.standardised_asset_list = self.standardised_asset_list.drop(
|
||
columns=["walls_u_value", "roof_u_value"]
|
||
)
|
||
|
||
# Adjust flagged extraction jobs to remove anything for solar
|
||
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] = (
|
||
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"]
|
||
& ~self.standardised_asset_list["solar_eligible"]
|
||
)
|
||
|
||
# Finally, we note why each property has been flagged
|
||
self.standardised_asset_list["cavity_reason"] = None
|
||
|
||
empty_cavity_map = {
|
||
"non_intrusive_indicates_empty_cavity": self.EMPTY_CAVITY_NON_INTRUSIVE
|
||
+ ": ",
|
||
"non_intrusive_indicates_empty_cavity_has_solar": f"{self.EMPTY_CAVITY_NON_INTRUSIVE} - property "
|
||
"already has solar: ",
|
||
"non_intrusive_indicates_empty_cavity_no_year_filter": f"{self.EMPTY_CAVITY_NON_INTRUSIVE}, "
|
||
f"built after {self.EMPTY_CAVITY_YEAR_THRESHOLD}: ",
|
||
}
|
||
for variable, description in empty_cavity_map.items():
|
||
self.standardised_asset_list["cavity_reason"] = np.where(
|
||
self.standardised_asset_list[variable]
|
||
& pd.isnull(self.standardised_asset_list["cavity_reason"]),
|
||
description + self.standardised_asset_list["SAP Category"],
|
||
self.standardised_asset_list["cavity_reason"],
|
||
)
|
||
|
||
# We break the cavity reason into a few different categories, when the EPC is different from inspections
|
||
if self.old_format_non_intrusives_present:
|
||
self.standardised_asset_list["cavity_reason"] = np.where(
|
||
(
|
||
self.standardised_asset_list["epc_indicates_empty_cavity"]
|
||
& ~self.standardised_asset_list[
|
||
"non_intrusive_indicates_empty_cavity"
|
||
]
|
||
& (
|
||
self.standardised_asset_list["non-intrusives: WFT Findings"]
|
||
.str.lower()
|
||
.str.strip()
|
||
.isin(
|
||
[
|
||
"retro drilled and filled",
|
||
"retro drilled",
|
||
"retro filled",
|
||
"retro drilled & filled",
|
||
]
|
||
)
|
||
)
|
||
& pd.isnull(self.standardised_asset_list["cavity_reason"])
|
||
),
|
||
f"{self.EPC_EMPTY_INSPECTIONS_RETRO_DRILLED}: "
|
||
+ self.standardised_asset_list["SAP Category"],
|
||
self.standardised_asset_list["cavity_reason"],
|
||
)
|
||
|
||
self.standardised_asset_list["cavity_reason"] = np.where(
|
||
(
|
||
self.standardised_asset_list["epc_indicates_empty_cavity"]
|
||
& ~self.standardised_asset_list[
|
||
"non_intrusive_indicates_empty_cavity"
|
||
]
|
||
& self.standardised_asset_list[
|
||
"non_intrusive_indicates_cavity_extraction"
|
||
]
|
||
& pd.isnull(self.standardised_asset_list["cavity_reason"])
|
||
),
|
||
f"{self.EPC_EMPTY_INSPECTIONS_FILLED}: "
|
||
+ self.standardised_asset_list["SAP Category"],
|
||
self.standardised_asset_list["cavity_reason"],
|
||
)
|
||
elif self.non_intrusives_present:
|
||
self.standardised_asset_list["cavity_reason"] = np.where(
|
||
(
|
||
self.standardised_asset_list["epc_indicates_empty_cavity"]
|
||
& ~self.standardised_asset_list[
|
||
"non_intrusive_indicates_empty_cavity"
|
||
]
|
||
& (
|
||
self.standardised_asset_list["non-intrusives: Insulated"]
|
||
== "RETRO DRILLED"
|
||
)
|
||
& pd.isnull(self.standardised_asset_list["cavity_reason"])
|
||
),
|
||
f"{self.EPC_EMPTY_INSPECTIONS_RETRO_DRILLED}: "
|
||
+ self.standardised_asset_list["SAP Category"],
|
||
self.standardised_asset_list["cavity_reason"],
|
||
)
|
||
|
||
self.standardised_asset_list["cavity_reason"] = np.where(
|
||
(
|
||
self.standardised_asset_list["epc_indicates_empty_cavity"]
|
||
& ~self.standardised_asset_list[
|
||
"non_intrusive_indicates_empty_cavity"
|
||
]
|
||
& (
|
||
self.standardised_asset_list["non-intrusives: Insulated"]
|
||
== "FILLED AT BUILD"
|
||
)
|
||
& pd.isnull(self.standardised_asset_list["cavity_reason"])
|
||
),
|
||
f"{self.EPC_EMPTY_INSPECTIONS_FILLED_AT_BUILD}: "
|
||
+ self.standardised_asset_list["SAP Category"],
|
||
self.standardised_asset_list["cavity_reason"],
|
||
)
|
||
else:
|
||
self.standardised_asset_list["cavity_reason"] = np.where(
|
||
(
|
||
self.standardised_asset_list["epc_indicates_empty_cavity"]
|
||
& ~self.standardised_asset_list[
|
||
"non_intrusive_indicates_empty_cavity"
|
||
]
|
||
& pd.isnull(self.standardised_asset_list["cavity_reason"])
|
||
),
|
||
f"{self.EPC_EMPTY}: " + self.standardised_asset_list["SAP Category"],
|
||
self.standardised_asset_list["cavity_reason"],
|
||
)
|
||
|
||
self.standardised_asset_list["cavity_reason"] = np.where(
|
||
(
|
||
self.standardised_asset_list["epc_indicates_empty_cavity"]
|
||
& ~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"]
|
||
& pd.isnull(self.standardised_asset_list["cavity_reason"])
|
||
),
|
||
f"{self.EPC_EMPTY_INSPECTIONS_NON_CAVITY}: "
|
||
+ self.standardised_asset_list["SAP Category"],
|
||
self.standardised_asset_list["cavity_reason"],
|
||
)
|
||
|
||
# Work type prefixes
|
||
|
||
# Landlord data: The landlord's data indicates that the wall is an uninsulated cavity wall, but EPC and
|
||
# inspections show filled
|
||
self.standardised_asset_list["cavity_reason"] = np.where(
|
||
(
|
||
self.standardised_asset_list["landlord_data_indicates_empty_cavity"]
|
||
& ~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"]
|
||
& ~self.standardised_asset_list["epc_indicates_empty_cavity"]
|
||
& pd.isnull(self.standardised_asset_list["cavity_reason"])
|
||
),
|
||
f"{self.LANDLORD_EMPTY_INSPECTIONS_OTHER}: "
|
||
+ self.standardised_asset_list["SAP Category"],
|
||
self.standardised_asset_list["cavity_reason"],
|
||
)
|
||
|
||
# Flag extraction
|
||
self.standardised_asset_list["cavity_reason"] = np.where(
|
||
(
|
||
self.standardised_asset_list[
|
||
"non_intrusive_indicates_cavity_extraction"
|
||
]
|
||
& pd.isnull(self.standardised_asset_list["cavity_reason"])
|
||
),
|
||
f"{self.EXTRACTION_NON_INTRUSIVE}: "
|
||
+ self.standardised_asset_list["SAP Category"],
|
||
self.standardised_asset_list["cavity_reason"],
|
||
)
|
||
|
||
self.standardised_asset_list["cavity_reason"] = np.where(
|
||
(
|
||
self.standardised_asset_list[
|
||
"non_intrusive_indicates_cavity_extraction_no_year_filter"
|
||
]
|
||
& pd.isnull(self.standardised_asset_list["cavity_reason"])
|
||
),
|
||
f"{self.EXTRACTION_NON_INTRUSIVE}, built after {self.EMPTY_CAVITY_YEAR_THRESHOLD}: "
|
||
+ self.standardised_asset_list["SAP Category"],
|
||
self.standardised_asset_list["cavity_reason"],
|
||
)
|
||
|
||
######################################################
|
||
# Flag solar
|
||
######################################################
|
||
self.standardised_asset_list["solar_reason"] = None
|
||
|
||
# Map of variables and fill values for the solar_reason variable
|
||
# ordering of this map is important, where we flag our prioritised work types first
|
||
solar_reason_map = {
|
||
"solar_eligible": f"{self.SOLAR_ELIGIBLE}: ",
|
||
"solar_eligible_solid_wall_uninsulated": f"{self.SOLAR_ELIGIBLE_SOLID_WALL_UNINSULATED}: ",
|
||
"solar_eligible_needs_heating_upgrade": f"{self.SOLAR_ELIGIBLE_NEEDS_HEATING_UPGRADE}: ",
|
||
}
|
||
|
||
for variable, reason in solar_reason_map.items():
|
||
self.standardised_asset_list["solar_reason"] = np.where(
|
||
self.standardised_asset_list[variable]
|
||
& pd.isnull(self.standardised_asset_list["solar_reason"]),
|
||
reason + self.standardised_asset_list["SAP Category"],
|
||
self.standardised_asset_list["solar_reason"],
|
||
)
|
||
|
||
# Finally, anything flagged for solar should not be flagged for cavity - make them None
|
||
self.standardised_asset_list["cavity_reason"] = np.where(
|
||
(
|
||
~pd.isnull(self.standardised_asset_list["solar_reason"])
|
||
& ~pd.isnull(self.standardised_asset_list["cavity_reason"])
|
||
),
|
||
None,
|
||
self.standardised_asset_list["cavity_reason"],
|
||
)
|
||
|
||
# Flag anything that has existing outcomes
|
||
if (self.outcomes is not None) and (
|
||
"surveyed" in self.standardised_asset_list.columns
|
||
):
|
||
|
||
if "installer refusal" not in self.standardised_asset_list.columns:
|
||
self.standardised_asset_list["cavity_reason"] = np.where(
|
||
((self.standardised_asset_list["surveyed"] > 0)),
|
||
None,
|
||
self.standardised_asset_list["cavity_reason"],
|
||
)
|
||
else:
|
||
for col in ["cavity_reason", "solar_reason"]:
|
||
self.standardised_asset_list[col] = np.where(
|
||
(
|
||
(self.standardised_asset_list["surveyed"] > 0)
|
||
| (self.standardised_asset_list["installer refusal"] > 0)
|
||
),
|
||
None,
|
||
self.standardised_asset_list[col],
|
||
)
|
||
|
||
if self.master_surveyed is not None:
|
||
for col in ["cavity_reason", "solar_reason"]:
|
||
self.standardised_asset_list[col] = np.where(
|
||
((~pd.isnull(self.standardised_asset_list["submission_status"]))),
|
||
None,
|
||
self.standardised_asset_list[col],
|
||
)
|
||
|
||
if (
|
||
self.ecosurv is not None
|
||
and "ecosurv_install_status" in self.standardised_asset_list.columns
|
||
):
|
||
# If we didn't match anything to ecosurv, the ecosurv_install_status won't exist
|
||
for col in ["cavity_reason", "solar_reason"]:
|
||
self.standardised_asset_list[col] = np.where(
|
||
(
|
||
(
|
||
~pd.isnull(
|
||
self.standardised_asset_list["ecosurv_install_status"]
|
||
)
|
||
)
|
||
),
|
||
None,
|
||
self.standardised_asset_list[col],
|
||
)
|
||
|
||
# We prepare outcomes for output
|
||
if self.outcomes is not None:
|
||
logger.info("Preparing outcomes for output")
|
||
identified_work = self.standardised_asset_list[
|
||
~pd.isnull(self.standardised_asset_list["cavity_reason"])
|
||
| ~pd.isnull(self.standardised_asset_list["solar_reason"])
|
||
][self.DOMNA_PROPERTY_ID].values
|
||
|
||
if self.DOMNA_PROPERTY_ID in self.outcomes.columns:
|
||
self.outcomes_for_output = self.outcomes[
|
||
self.outcomes[self.DOMNA_PROPERTY_ID].isin(identified_work)
|
||
]
|
||
|
||
# Finally, direct operations feedback has suggested that if a property is a flat that has a SAP rating of
|
||
# 76 or above, we should exclude it because it's likely not going to be eligible for anyting
|
||
self.standardised_asset_list["cavity_reason"] = np.where(
|
||
(self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] == "flat")
|
||
& (self.standardised_asset_list["SAP Category"] == "SAP Rating 76 or more"),
|
||
self.standardised_asset_list["cavity_reason"] + " - (unlikely to quality)",
|
||
self.standardised_asset_list["cavity_reason"],
|
||
)
|
||
|
||
# Split cavity_reason on the colon and check if the first part is equal to one of the two options above
|
||
# that indicates empties
|
||
self.standardised_asset_list["identified_empty_cavity"] = (
|
||
self.standardised_asset_list["cavity_reason"]
|
||
.str.split(":")
|
||
.str[0]
|
||
.isin(
|
||
[
|
||
self.EMPTY_CAVITY_NON_INTRUSIVE,
|
||
self.EMPTY_CAVITY_NON_INTRUSIVE_YEAR,
|
||
self.EPC_EMPTY,
|
||
]
|
||
)
|
||
)
|
||
|
||
def get_work_figures(self):
    """
    Build, store and pretty-print aggregate counts of the identified work types.

    The result is written to ``self.work_type_figures`` as a ``{reason: count}`` dict assembled, in order, from:
      1. ``cavity_reason`` counts for every row that is not a block of flats
      2. ``solar_reason`` counts for block-of-flats rows, with " (Block of flats)" appended to each key
      3. ``solar_reason`` counts across the whole asset list
    If a key appears more than once, the later source wins.
    :return:
    """
    assets = self.standardised_asset_list
    is_block = assets[self.STANDARD_PROPERTY_TYPE] == "block of flats"

    figures = {}

    # 1) Cavity work on everything that is not a block of flats
    figures.update(assets[~is_block]["cavity_reason"].value_counts().to_dict())

    # 2) Block-level figures, labelled so they can be told apart from unit-level figures
    # NOTE(review): this reads "solar_reason" for blocks, while the unit-level figures above use
    # "cavity_reason" - confirm that solar (and not cavity) is what is intended here
    block_counts = assets[is_block]["solar_reason"].value_counts().to_dict()
    for reason, count in block_counts.items():
        figures[reason + " (Block of flats)"] = count

    # 3) Solar work across the full asset list
    figures.update(assets["solar_reason"].value_counts().to_dict())

    self.work_type_figures = figures
    pprint(self.work_type_figures)
|
||
|
||
def fill_landlord_block_reference(self, has_blocks_of_flats):
    """
    Populate missing block references for rows that are blocks of flats.

    Where a row has property type "block of flats" and no block reference, the reference is set to
    "<address 1> <postcode>". All other rows keep their existing value.
    :param has_blocks_of_flats: Falsy if the asset list has no blocks of flats, in which case nothing is done
    :return:
    """
    if not has_blocks_of_flats:
        return

    assets = self.standardised_asset_list
    reference_col = self.STANDARD_BLOCK_REFERENCE

    # Only blocks that do not already carry a landlord-supplied reference are filled
    needs_reference = (
        assets[self.STANDARD_PROPERTY_TYPE] == "block of flats"
    ) & pd.isnull(assets[reference_col])

    # Fallback reference: address 1 + postcode
    fallback_reference = (
        assets[self.STANDARD_ADDRESS_1] + " " + assets[self.STANDARD_POSTCODE]
    )

    assets[reference_col] = np.where(
        needs_reference, fallback_reference, assets[reference_col]
    )
|
||
|
||
def split_blocks(self):
    """
    Where we have a single row that is a block of flats, we split this into multiple rows,
    one for each unit. The data that we have will be copied across rows.

    Address 1 of each block row is parsed, in this order:
      1. A numeric range ("1-7", "13 to 15"), optionally restricted by "(odd)"/"(odds)"/"(even)"/"(evens)"
         found in address 1 or the full address. One "flat" row is produced per number.
      2. An explicit list of numbers separated by ",", "&" or " and ". One row per number.
      3. A lettered range on a single number in the full address ("31A - 31D"). One row per letter.
      4. A single number, or no number at all: the row is kept as it is.

    The block rows are then replaced in ``self.standardised_asset_list`` by the expanded rows, and the block
    reference is cleared for any expanded block that ends up containing at most one property.

    :raises ValueError: If a numeric range is reversed or spans more than 200 numbers, or if the expansion
        produces duplicated property IDs
    :raises NotImplementedError: If the address format is not recognised
    :return:
    """
    type_col = self.STANDARD_PROPERTY_TYPE
    addr_col = self.STANDARD_ADDRESS_1
    full_addr_col = self.STANDARD_FULL_ADDRESS
    id_col = self.DOMNA_PROPERTY_ID
    block_ref_col = self.STANDARD_BLOCK_REFERENCE

    blocks = self.standardised_asset_list[
        self.standardised_asset_list[type_col] == "block of flats"
    ].copy()

    if blocks.empty:
        return

    RANGE_RE = re.compile(r"\b(\d+[A-Za-z]?)\s*[-–]\s*(\d+[A-Za-z]?)\b")
    NUM_RE = re.compile(r"\b\d+[A-Za-z]?\b")  # captures 12, 12A, etc.
    TO_RANGE_RE = re.compile(
        r"\b(\d+[A-Za-z]?)\s+(?:to|To|TO)\s+(\d+[A-Za-z]?)\b"
    )  # captures "13 to 15"
    LETTER_RANGE_RE = re.compile(
        r"\b(\d+)([A-Za-z]?)\s*[-–]\s*(\d+)([A-Za-z]?)\b"
    )  # captures "1A-3B"

    def apply_parity(numbers, odd_only, even_only):
        # Restrict the candidate house numbers to odds and/or evens when the address says so
        if odd_only:
            numbers = [x for x in numbers if x % 2 != 0]
        if even_only:
            numbers = [x for x in numbers if x % 2 == 0]
        return numbers

    expanded_rows = []

    for _, row in blocks.iterrows():
        addr = str(row[addr_col])
        # The full address may be missing; use an empty string for searching so that we don't
        # crash on .lower()/.replace(), but keep the raw value for writing back to the row
        raw_full_addr = row[full_addr_col]
        full_addr_missing = pd.isnull(raw_full_addr)
        full_addr = "" if full_addr_missing else str(raw_full_addr)

        # We also look for terms like "Odd", "even" in the address to indicate if it should be just
        # the odds, evens or all of the numbers
        searchable = (addr.lower(), full_addr.lower())
        has_odd = any(
            tag in text for text in searchable for tag in ("(odd)", "(odds)")
        )
        has_even = any(
            tag in text for text in searchable for tag in ("(even)", "(evens)")
        )

        # 1 ─ Range (e.g. 1-7 or 13 to 15); the hyphenated form takes priority
        range_match = RANGE_RE.search(addr) or TO_RANGE_RE.search(addr)
        if range_match:
            start, end = range_match.groups()
            # Strip any letter suffix (12A -> 12) before building the numeric range
            start = int(re.match(r"\d+", start)[0])
            end = int(re.match(r"\d+", end)[0])
            if start > end or (end - start) > 200:
                raise ValueError(f"Suspicious range '{addr}'")

            # The matched range text is the same for every unit, so we only extract it once
            range_text = range_match.group(0)

            for n in apply_parity(range(start, end + 1), has_odd, has_even):
                new = row.copy()
                new_addr = addr.replace(range_text, str(n))
                # Build the new full address by also swapping out the range_text
                new_full_address = (
                    raw_full_addr
                    if full_addr_missing
                    else full_addr.replace(range_text, str(n))
                )
                new[addr_col] = str(n)
                new[full_addr_col] = new_full_address
                new[type_col] = "flat"
                # Keep a record of the previous address 1 and full address
                new["block_address1"] = addr
                new["block_full_address"] = raw_full_addr
                new["is_expended_block"] = True
                new[id_col] = f"{row[id_col]}-{new_addr}"
                expanded_rows.append(new.to_dict())
            continue

        # 2 ─ Explicit list (e.g. 1, 2, 5 Block) or split by an ampersand (e.g. 1 & 2 Block)
        nums = NUM_RE.findall(addr)
        if len(nums) > 1 and (
            "," in addr or "&" in addr or " and " in addr.lower()
        ):
            for n in nums:
                new = row.copy()
                # replace the first number only
                new_addr = re.sub(NUM_RE, n, addr, count=1)
                new[addr_col] = new_addr
                new[id_col] = f"{row[id_col]}-{new_addr}"
                expanded_rows.append(new.to_dict())
            continue

        # 3 ─ Check for a range of lettered addresses e.g 31A - 31D
        letter_range = LETTER_RANGE_RE.search(full_addr)
        if letter_range:
            start_num, start_letter, end_num, end_letter = letter_range.groups()
            start_num, end_num = int(start_num), int(end_num)
            if start_num != end_num:
                raise NotImplementedError("Unusual range - handle me")

            # The letters are optional in the pattern. Without both of them there is nothing to expand
            # (and ord("") would fail), so we fall through to the handling below
            if start_letter and end_letter:
                for n in apply_parity(
                    range(start_num, end_num + 1), has_odd, has_even
                ):
                    for letter in range(ord(start_letter), ord(end_letter) + 1):
                        new = row.copy()
                        new_addr = f"{n}{chr(letter)}"
                        new[addr_col] = new_addr
                        new[id_col] = f"{row[id_col]}-{new_addr}"
                        expanded_rows.append(new.to_dict())
                continue

        # 4 ─ Single number or no number, treat as individual dwelling
        if (len(nums) == 1) or not nums:
            expanded_rows.append(row.to_dict())
            continue

        # Anything else with digits is unrecognised
        raise NotImplementedError(f"Unhandled block format: '{addr}'")

    expanded_blocks = pd.DataFrame(expanded_rows)

    # Check for duplicated domna ids
    if expanded_blocks[id_col].duplicated().sum():
        raise ValueError("expanded blocks has duplicated IDs")

    # We drop the blocks from the standardised asset list and append on the expanded blocks
    self.standardised_asset_list = self.standardised_asset_list[
        self.standardised_asset_list[type_col] != "block of flats"
    ]
    self.standardised_asset_list = pd.concat(
        [self.standardised_asset_list, expanded_blocks], ignore_index=True
    )

    # As a final clean up, any block of size 1 should not carry a block reference
    sizes = expanded_blocks.groupby(block_ref_col)[id_col].nunique().reset_index()
    size_1 = sizes[sizes[id_col] <= 1]
    self.standardised_asset_list[block_ref_col] = np.where(
        self.standardised_asset_list[block_ref_col].isin(
            size_1[block_ref_col].values
        ),
        None,
        self.standardised_asset_list[block_ref_col],
    )
|
||
|
||
def label_property_status(self):
    """
    Create the "hubspot_status" and "project_code" columns on the standardised asset list.

    Intended to run after the work types have been identified (it reads "cavity_reason" and "solar_reason").
      - Rows with a cavity or solar reason start as READY_TO_BE_SCHEDULED, everything else as None
      - Where "submission_status", "ecosurv_install_status" or "outcome_status" hold a recognised
        HubspotProcessStatus label, the maximum of those statuses replaces the initial value
      - If any row has a block reference, block_analysis() is run and every block with at least 50% empties
        has its block reference used as the project code
    :return:
    """
    process_status = hubspot_config.HubspotProcessStatus

    # Anything with identified work is ready to go, so it starts as ready to be scheduled
    has_work = ~pd.isnull(self.standardised_asset_list["cavity_reason"]) | ~pd.isnull(
        self.standardised_asset_list["solar_reason"]
    )
    self.standardised_asset_list["hubspot_status"] = np.where(
        has_work, process_status.READY_TO_BE_SCHEDULED.label, None
    )

    # Submissions, ecosurv and outcomes can each move the status on; we keep the furthest stage reached
    label_to_enum = {member.label: member for member in process_status}
    status_columns = ("submission_status", "ecosurv_install_status", "outcome_status")

    def get_max_status_from_columns(row):
        recognised = []
        for column in status_columns:
            candidate = row.get(column)
            if candidate in label_to_enum:
                recognised.append(label_to_enum[candidate])
        if recognised:
            return max(recognised).label
        # Nothing recognised in the source columns, so the existing status stands
        return row["hubspot_status"]

    self.standardised_asset_list["hubspot_status"] = self.standardised_asset_list.apply(
        get_max_status_from_columns, axis=1
    )

    self.standardised_asset_list["project_code"] = None

    # Blocks are referenced through the block reference column; without any there is nothing more to do
    n_with_block_reference = (
        ~pd.isnull(self.standardised_asset_list[self.STANDARD_BLOCK_REFERENCE])
    ).sum()
    if not n_with_block_reference:
        return

    # Blocks with at least a 50% share of empties get a project code
    self.block_analysis()
    viable_empty_blocks = self.block_analysis_df[
        self.block_analysis_df["Percentage of Empties"] >= 0.50
    ]
    if viable_empty_blocks.empty:
        return

    project_code_lookup = viable_empty_blocks[["Block Reference"]].copy()
    merged = self.standardised_asset_list.merge(
        project_code_lookup,
        how="left",
        left_on=self.STANDARD_BLOCK_REFERENCE,
        right_on="Block Reference",
    )
    self.standardised_asset_list = merged

    # A non-null "Block Reference" after the merge means the row belongs to a viable block
    self.standardised_asset_list["project_code"] = np.where(
        ~pd.isnull(self.standardised_asset_list["Block Reference"]),
        self.standardised_asset_list["Block Reference"],
        self.standardised_asset_list["project_code"],
    )
    self.standardised_asset_list = self.standardised_asset_list.drop(
        columns=["Block Reference"]
    )
|
||
|
||
def analyse_geographies(self):
    """
    Summarise, per postcode, how many properties there are and how many have cavity or solar work.

    The result is stored on ``self.geographical_areas`` with one row per "domna_postcode" and the columns
    "n_properties", "cavity_reason" and "solar_reason" (all non-null counts) plus "coverage", which is
    (solar + cavity) / n_properties * 100. Rows are sorted by coverage, highest first.
    :return:
    """
    assets = self.standardised_asset_list

    def count_by_postcode(column):
        # Number of non-null values of `column` within each postcode
        return (
            assets[["domna_postcode", column]]
            .groupby(["domna_postcode"])[column]
            .count()
            .reset_index()
        )

    # Start from the number of properties in each postcode...
    summary = count_by_postcode("landlord_property_id").rename(
        columns={"landlord_property_id": "n_properties"}
    )
    # ...then attach the number of cavity and solar opportunities
    for reason_column in ("cavity_reason", "solar_reason"):
        summary = summary.merge(
            count_by_postcode(reason_column), how="left", on="domna_postcode"
        )
    summary = summary.fillna(0)

    total_opportunities = summary["solar_reason"] + summary["cavity_reason"]
    summary["coverage"] = total_opportunities / summary["n_properties"] * 100

    self.geographical_areas = summary.sort_values("coverage", ascending=False)
|
||
|
||
def block_analysis(self):
    """
    Analyse cavity eligibility per block and annotate each flat's "cavity_reason" with the outcome.

    Rows are grouped by the block reference. Blocks in which no property has a cavity reason are skipped.
    For every remaining block we record its size, the mean EPC age in days, the proportion of properties at
    or beyond the surveyed threshold, the proportion of identified empties (overall, and "high confidence",
    i.e. excluding SAP 69+) and the percentage breakdown of cavity reasons.

    Side effects:
      - ``self.block_analysis_df`` is set to the per-block summary, sorted by "Percentage of Empties"
      - "cavity_reason" on ``self.standardised_asset_list`` gets a suffix saying whether the flat's block has
        more or less than 50% eligible

    If no block has any eligibility, ``self.block_analysis_df`` is set to an empty frame that still carries
    the summary columns and "cavity_reason" is left untouched.
    :return:
    """
    process_status = hubspot_config.HubspotProcessStatus

    # Reverse mapping: label -> enum
    label_to_enum = {e.label: e for e in process_status}

    # Threshold status - anything that is at this stage or beyond is considered surveyed
    threshold = process_status.SURVEYED_COMPLETED_SIGNED_OFF.value

    # Columns that every summary row carries; used to keep an empty result well-formed
    summary_columns = [
        "Block Reference",
        "Block Size",
        "average_age_of_epc",
        "Proportion of properties suryeyed",
        "Percentage of Empties",
        "Percentage of Empties (high confidence)",
    ]

    block_rows = []
    for block_reference, group in self.standardised_asset_list.groupby(
        self.STANDARD_BLOCK_REFERENCE
    ):
        cavity_breakdown = (
            group["cavity_reason"]
            .fillna("No Eligibility")
            .value_counts(normalize=True)
            * 100
        )

        if all(cavity_breakdown.index == "No Eligibility"):
            continue

        # We check the % of empty vs not empty as right now, we're focused on empty.
        # Properties flagged as unlikely to qualify are not counted
        counted_as_empty = (
            (group["identified_empty_cavity"] == True)  # noqa: E712 - element-wise comparison
            & (~pd.isnull(group["cavity_reason"]))
            & (
                ~group["cavity_reason"].str.contains(
                    "(unlikely to quality)", case=False, na=False, regex=False
                )
            )
        )
        n_empties = counted_as_empty.sum()

        # High confidence additionally excludes the top two SAP bands
        n_empties_high_confidence = (
            counted_as_empty
            & ~group["SAP Category"].isin(
                ["SAP Rating 69-75", "SAP Rating 76 or more"]
            )
        ).sum()

        # Average age of the EPCs, in days. Kept as a local series so that we don't write a
        # scratch column onto the groupby slice
        time_since_epc = (
            pd.to_datetime("now")
            - pd.to_datetime(group[self.EPC_API_DATA_NAMES["inspection-date"]])
        ).dt.days
        average_age_of_epc = time_since_epc.mean()

        works = group["hubspot_status"]
        # NOTE(review): this compares enum members against the threshold's .value, so it relies on
        # HubspotProcessStatus members being comparable with that value - confirm in hubspot_config
        above_threshold = works.map(label_to_enum.get).dropna()
        count_above = (above_threshold >= threshold).sum()
        proportion_surveyed = count_above / len(works)
        proportion_empty = n_empties / len(works)
        proportion_empty_high_confidence = n_empties_high_confidence / len(works)

        block_rows.append(
            {
                "Block Reference": block_reference,
                "Block Size": len(group),
                "average_age_of_epc": average_age_of_epc,
                "Proportion of properties suryeyed": proportion_surveyed,
                "Percentage of Empties": proportion_empty,
                "Percentage of Empties (high confidence)": proportion_empty_high_confidence,
                **cavity_breakdown.to_dict(),
            }
        )

    if not block_rows:
        # No block has any eligibility. Building the frame from an empty list would give us a frame
        # without columns, and the column lookups below (and in callers) would raise a KeyError
        self.block_analysis_df = pd.DataFrame(
            columns=summary_columns + ["Eligible for Works"]
        )
        return

    block_analysis = pd.DataFrame(block_rows)
    block_analysis = block_analysis.fillna(0)

    # We flag which blocks are eligible for works. We need at least 50%
    block_analysis["Eligible for Works"] = (
        block_analysis["Percentage of Empties"] >= 0.50
    )
    block_analysis = block_analysis.sort_values(
        "Percentage of Empties", ascending=False
    )

    ineligible_blocks = block_analysis[~block_analysis["Eligible for Works"]][
        "Block Reference"
    ].values
    eligible_blocks = block_analysis[block_analysis["Eligible for Works"]][
        "Block Reference"
    ].values

    # For properties in blocks that are NOT eligible, we update the cavity reason
    self.standardised_asset_list["cavity_reason"] = np.where(
        self.standardised_asset_list[self.STANDARD_BLOCK_REFERENCE].isin(
            ineligible_blocks
        ),
        self.standardised_asset_list["cavity_reason"]
        + " (Flat in block with less than 50% eligible)",
        self.standardised_asset_list["cavity_reason"],
    )

    # Properties in blocks that ARE eligible are flagged as well
    self.standardised_asset_list["cavity_reason"] = np.where(
        self.standardised_asset_list[self.STANDARD_BLOCK_REFERENCE].isin(
            eligible_blocks
        ),
        self.standardised_asset_list["cavity_reason"]
        + " "
        + "(Flat in block with more than 50% eligible)",
        self.standardised_asset_list["cavity_reason"],
    )

    self.block_analysis_df = block_analysis
|
||
|
||
@staticmethod
|
||
def split_full_name(x):
|
||
if pd.isnull(x):
|
||
return None, None, None
|
||
x = x.lower()
|
||
titles = ["mr", "mrs", "ms", "miss", "dr", "prof"]
|
||
# Remove titles
|
||
detected_title = [title for title in titles if x.startswith(title)]
|
||
if detected_title:
|
||
for title in detected_title:
|
||
x = x.replace(title, "")
|
||
x = x.strip()
|
||
first_name, last_name = x.split(" ")[0], x.split(" ")[-1]
|
||
title = detected_title[0].title() if detected_title else None
|
||
return title, first_name.title(), last_name.title()
|
||
|
||
def load_contact_details(
    self,
    local_filepath,
    sheet_name,
    landlord_property_id,
    phone_number_column=None,
    secondary_phone_number_column=None,
    secondary_contact_full_name=None,
    email_column=None,
    fullname_column=None,
    firstname_column=None,
    lastname_column=None,
):
    """
    Load tenant contact details from an Excel sheet into ``self.contact_details``.

    The mapping of standard contact fields to the landlord's column names is stored on
    ``self.contact_detail_fields``. Rows without a landlord property ID are dropped. If only a full name
    column is supplied, it is split into "title", "first_name" and "last_name" using split_full_name();
    otherwise "title" is set to None.

    :param local_filepath: Path of the Excel file. If None, ``self.contact_details`` becomes an empty frame
        whose columns are the standard contact field names
    :param sheet_name: Sheet to read
    :param landlord_property_id: Name of the landlord property ID column in the sheet
    :param phone_number_column: Name of the phone number column, if there is one
    :param secondary_phone_number_column: Name of the secondary phone number column, if there is one
    :param secondary_contact_full_name: Name of the secondary contact's full name column, if there is one
    :param email_column: Name of the email column, if there is one
    :param fullname_column: Name of the full name column, if there is one
    :param firstname_column: Name of the first name column, if there is one
    :param lastname_column: Name of the last name column, if there is one
    :return:
    """
    self.contact_detail_fields = {
        "landlord_property_id": landlord_property_id,
        "phone_number": phone_number_column,
        "secondary_phone_number": secondary_phone_number_column,
        "secondary_contact_full_name": secondary_contact_full_name,
        "email": email_column,
        "fullname": fullname_column,
        "firstname": firstname_column,
        "lastname": lastname_column,
    }

    # NOTE(review): secondary_contact_full_name is recorded above but is not part of the columns read
    # from the sheet - confirm whether that is intended
    requested_columns = [
        phone_number_column,
        secondary_phone_number_column,
        email_column,
        fullname_column,
        firstname_column,
        lastname_column,
    ]
    none_details = [x for x in requested_columns if x is None]
    details_colnames = [x for x in requested_columns if x is not None]

    if local_filepath is None:
        # Create an empty DataFrame based on the fields in self.contact_detail_fields
        self.contact_details = pd.DataFrame(
            columns=list(self.contact_detail_fields.keys())
        )
        return

    id_column = self.contact_detail_fields["landlord_property_id"]
    contact_details = pd.read_excel(local_filepath, sheet_name=sheet_name)[
        [id_column] + details_colnames
    ]
    # Take a copy of the filtered rows so the column assignments below act on our own frame
    contact_details = contact_details[~pd.isnull(contact_details[id_column])].copy()

    # Fill anything we don't have
    # NOTE(review): every entry of none_details is None, so this creates a single column labelled None
    # rather than one column per missing detail - kept as is, confirm what downstream code expects
    for detail in none_details:
        contact_details[detail] = None

    if fullname_column and not (firstname_column and lastname_column):
        # Built as plain lists so that a sheet with no usable rows gives empty columns; unpacking
        # zip(*...) raises a ValueError when there are no rows to unpack
        name_parts = [
            self.split_full_name(full_name)
            for full_name in contact_details[fullname_column]
        ]
        contact_details["title"] = [parts[0] for parts in name_parts]
        contact_details["first_name"] = [parts[1] for parts in name_parts]
        contact_details["last_name"] = [parts[2] for parts in name_parts]
    else:
        contact_details["title"] = None

    self.contact_details = contact_details
|
||
|
||
@classmethod
def load_standardised_asset_list(cls, filepath, sheet_name, header):
    """
    Alternate constructor for a file that is already in the standardised format.

    Every column argument of the constructor is pointed at the corresponding STANDARD_* column name, and
    the options that only apply to raw landlord files (address concatenation, missing postcode handling,
    address 1 extraction, phase) are switched off.
    :param filepath: Path of the standardised asset list file
    :param sheet_name: Sheet to read
    :param header: Header setting, handed straight to the constructor
    :return: The new instance
    """
    # The file already uses our standard names, so each column maps onto itself
    standard_columns = {
        "address1_colname": cls.STANDARD_ADDRESS_1,
        "postcode_colname": cls.STANDARD_POSTCODE,
        "full_address_colname": cls.STANDARD_FULL_ADDRESS,
        "landlord_property_id": cls.STANDARD_LANDLORD_PROPERTY_ID,
        "landlord_year_built": cls.STANDARD_YEAR_BUILT,
        "landlord_uprn": cls.STANDARD_UPRN,
        "landlord_property_type": cls.STANDARD_PROPERTY_TYPE,
        "landlord_built_form": cls.STANDARD_BUILT_FORM,
        "landlord_wall_construction": cls.STANDARD_WALL_CONSTRUCTION,
        "landlord_roof_construction": cls.STANDARD_ROOF_CONSTRUCTION,
        "landlord_heating_system": cls.STANDARD_HEATING_SYSTEM,
        "landlord_existing_pv": cls.STANDARD_EXISTING_PV,
        "landlord_sap": cls.STANDARD_SAP,
        "landlord_block_reference": cls.STANDARD_BLOCK_REFERENCE,
    }

    # Nothing needs to be derived or cleaned for a standardised file
    disabled_options = {
        "full_address_cols_to_concat": [],
        "missing_postcodes_method": None,
        "address1_extraction_method": None,
        "phase": False,
    }

    return cls(
        local_filepath=filepath,
        sheet_name=sheet_name,
        header=header,
        **standard_columns,
        **disabled_options,
    )
|
||
|
||
def prepare_for_crm(
|
||
self, company_domain, installer_name, reconcile_programme=False
|
||
):
|
||
"""
|
||
This function prepares the data for upload into Hubspot
|
||
:param company_domain: The company domain name to be used in the CRM
|
||
:param installer_name: The name of the installer to be used in the CRM
|
||
:param reconcile_programme: If True, will include all properties with a project code, regardless of status
|
||
:raises ValueError: If the installer name is not valid or if there are missing products
|
||
:return:
|
||
"""
|
||
# This maps the opportunities as we reference them, to the product data as stored in Hubspot
|
||
if not hubspot_config.Installer.is_valid_value(installer_name):
|
||
raise ValueError(
|
||
f"Installer name {installer_name} is not valid. Please check the installer name."
|
||
)
|
||
|
||
# We check if all products are covered in the lookup table
|
||
cavity_products = (
|
||
self.standardised_asset_list["cavity_reason"].unique().tolist()
|
||
)
|
||
cavity_products = [x for x in cavity_products if not pd.isnull(x)]
|
||
solar_products = self.standardised_asset_list["solar_reason"].unique().tolist()
|
||
solar_products = [x for x in solar_products if not pd.isnull(x)]
|
||
|
||
product_map = {}
|
||
for identified_product in cavity_products + solar_products:
|
||
if pd.isnull(identified_product):
|
||
continue
|
||
|
||
matched_product = None
|
||
for product_prefix, crm_product in self.prefixes_to_products.items():
|
||
if identified_product.startswith(product_prefix):
|
||
matched_product = crm_product
|
||
|
||
product_map[identified_product] = matched_product
|
||
|
||
# For each cavity and solar product, we iterate through the prexies and map to the products
|
||
|
||
programme_data = self.standardised_asset_list.copy()
|
||
programme_data["domna_full_address"] = (
|
||
programme_data["domna_full_address"]
|
||
.str.replace(";", ", ", regex=False)
|
||
.str.replace(" ", "")
|
||
)
|
||
|
||
# Format the two date columns
|
||
programme_data["survey_date"] = pd.to_datetime(
|
||
programme_data["survey_date"], errors="coerce"
|
||
)
|
||
programme_data[self.EPC_API_DATA_NAMES["inspection-date"]] = pd.to_datetime(
|
||
programme_data[self.EPC_API_DATA_NAMES["inspection-date"]], errors="coerce"
|
||
)
|
||
# Convert to dd/mm/yyyy format
|
||
programme_data["survey_date"] = programme_data["survey_date"].dt.strftime(
|
||
"%d/%m/%Y"
|
||
)
|
||
programme_data[self.EPC_API_DATA_NAMES["inspection-date"]] = programme_data[
|
||
self.EPC_API_DATA_NAMES["inspection-date"]
|
||
].dt.strftime("%d/%m/%Y")
|
||
|
||
# We take rows that have a survyor and a date for the survey
|
||
# We include properties under 2 circumstances:
|
||
# 1) The hubspot status is ready to be scheduled and there is an assigned surveyor and week for survey
|
||
# 2) The hubspot status is something else, meaning this has been included in an existing programme
|
||
# 3) reconcile programme is true, and therefore all proeprties with a project code will be included
|
||
|
||
if reconcile_programme:
|
||
programme_data = programme_data[~pd.isnull(programme_data["project_code"])]
|
||
else:
|
||
|
||
if programme_data["hubspot_status"].nunique() > 1:
|
||
logger.info(
|
||
"Multiple hubspot_status found - are you sure you don't want to reconcile the programme?"
|
||
)
|
||
|
||
ready_to_be_scheduled = (
|
||
programme_data["hubspot_status"]
|
||
== hubspot_config.HubspotProcessStatus.READY_TO_BE_SCHEDULED.label
|
||
)
|
||
# completed_works = (
|
||
# (programme_data["hubspot_status"] !=
|
||
# hubspot_config.HubspotProcessStatus.READY_TO_BE_SCHEDULED.label) &
|
||
# (~pd.isnull(programme_data["hubspot_status"]))
|
||
# )
|
||
programme_data = programme_data[ready_to_be_scheduled]
|
||
|
||
# Merge on the contact details
|
||
programme_data = programme_data.merge(
|
||
self.contact_details,
|
||
how="left",
|
||
left_on=self.STANDARD_LANDLORD_PROPERTY_ID,
|
||
right_on=self.landlord_property_id,
|
||
)
|
||
|
||
programme_data["Company Domain Name <COMPANY domain>"] = company_domain
|
||
# Append the product data onto the programme data
|
||
programme_data["cavity_product"] = programme_data["cavity_reason"].map(
|
||
lambda x: product_map.get(x, {"name": None})["name"]
|
||
)
|
||
programme_data["solar_product"] = programme_data["solar_reason"].map(
|
||
lambda x: product_map.get(x, {"name": None})["name"]
|
||
)
|
||
|
||
# We check if we have any missings
|
||
cavity_missing = pd.isnull(
|
||
programme_data[~pd.isnull(programme_data["cavity_reason"])][
|
||
"cavity_product"
|
||
]
|
||
).sum()
|
||
solar_missing = pd.isnull(
|
||
programme_data[~pd.isnull(programme_data["solar_reason"])]["solar_product"]
|
||
).sum()
|
||
|
||
if cavity_missing > 0 or solar_missing > 0:
|
||
raise ValueError(
|
||
f"We have {cavity_missing} cavity products and {solar_missing} solar products that are not "
|
||
"mapped to a product in the lookup table. Please check the mapping."
|
||
)
|
||
|
||
programme_data["domna_product"] = programme_data["solar_product"].copy()
|
||
programme_data["domna_product"] = np.where(
|
||
pd.isnull(programme_data["domna_product"]),
|
||
programme_data["cavity_product"],
|
||
programme_data["domna_product"],
|
||
)
|
||
# We filter just on rows where we have a product
|
||
if reconcile_programme:
|
||
# We include historical works, which will include hisorical cavity so we set these as extraction (as
|
||
# this is the main work mix)
|
||
programme_data["domna_product"] = programme_data["domna_product"].fillna(
|
||
self.CRM_HISTORICAL_CAVITY_PRODUCT["name"]
|
||
)
|
||
else:
|
||
# We shouldn't have any missing products
|
||
# programme_data = programme_data[
|
||
# ~pd.isnull(programme_data["survey_date"])
|
||
# ]
|
||
|
||
if pd.isnull(programme_data["domna_product"]).sum():
|
||
raise ValueError("Missing products")
|
||
programme_data = programme_data.drop(
|
||
columns=["solar_product", "cavity_product"]
|
||
)
|
||
|
||
product_df = (
|
||
pd.DataFrame(self.CRM_PRODUCTS)
|
||
.T[["name", "id", "unit_price"]]
|
||
.reset_index()
|
||
.rename(
|
||
columns={
|
||
"name": "Name <LINE_ITEM name>",
|
||
"id": "Product ID <LINE_ITEM hs_product_id>",
|
||
"unit_price": "Unit price <LINE_ITEM price>",
|
||
"index": "domna_product",
|
||
}
|
||
)
|
||
)
|
||
|
||
product_df["Quantity <LINE_ITEM quantity>"] = 1
|
||
|
||
# Append on the product data
|
||
programme_data = programme_data.merge(
|
||
product_df, how="left", on="domna_product"
|
||
)
|
||
|
||
# Add in deal and pipeline information
|
||
programme_data["dealname"] = (
|
||
programme_data[self.STANDARD_FULL_ADDRESS]
|
||
+ ", "
|
||
+ programme_data[self.STANDARD_POSTCODE]
|
||
+ " : "
|
||
+ programme_data["domna_product"]
|
||
)
|
||
programme_data["Pipeline <DEAL pipeline>"] = hubspot_config.CRM_PIPELINE_NAME
|
||
programme_data["Associations: Listing"] = "Property Owner"
|
||
|
||
# We determine which column we should use for the UPRN
|
||
if self.STANDARD_UPRN not in programme_data.columns:
|
||
uprn_column = self.EPC_API_DATA_NAMES["uprn"]
|
||
# If we're working form the EPC, we don't have this information if the EPC is estimated
|
||
programme_data[uprn_column] = np.where(
|
||
programme_data["estimated"] == True, None, programme_data[uprn_column]
|
||
)
|
||
else:
|
||
# Use the value that has the most coverage
|
||
uprn_column = "hubspot_uprn"
|
||
programme_data[uprn_column] = programme_data[self.STANDARD_UPRN].fillna(
|
||
programme_data[self.EPC_API_DATA_NAMES["uprn"]]
|
||
)
|
||
|
||
# Remove any negative URPSN which are not valid
|
||
programme_data[uprn_column] = np.where(
|
||
programme_data["estimated"].isin([1, True]),
|
||
None,
|
||
programme_data[uprn_column],
|
||
)
|
||
|
||
# Add in some columns if we have them
|
||
date_of_inspections = (
|
||
"Non-Intrusives: Date of Inspection"
|
||
if "Non-Intrusives: Date of Inspection" in programme_data.columns
|
||
else None
|
||
)
|
||
|
||
# Ammend the property type and built form columns
|
||
programme_data["hubspot_property_type"] = programme_data[
|
||
self.STANDARD_PROPERTY_TYPE
|
||
].copy()
|
||
# We don't already have this
|
||
if self.STANDARD_BUILT_FORM in programme_data.columns:
|
||
programme_data["hubspot_built_form"] = programme_data[
|
||
self.STANDARD_BUILT_FORM
|
||
].copy()
|
||
else:
|
||
programme_data["hubspot_built_form"] = None
|
||
|
||
def _replace_property_description_data(programme_data, column_name):
    """
    Restrict a HubSpot property description column to its permitted values.

    Values outside the permitted list are blanked, blanks are back-filled from the
    corresponding EPC column, and anything still missing becomes "unknown".

    :param programme_data: DataFrame holding ``column_name`` and the EPC fallback column
    :param column_name: either "hubspot_property_type" or "hubspot_built_form"
    :return: the same DataFrame with ``column_name`` cleaned
    :raises ValueError: if ``column_name`` is not one of the two supported columns
    """
    # Permitted values and the EPC field used as the fallback, keyed by target column
    column_rules = {
        "hubspot_property_type": (
            ["house", "bungalow", "flat", "maisonette"],
            "property-type",
        ),
        "hubspot_built_form": (
            ["detached", "semi-detached", "mid-terrace", "end-terrace"],
            "built-form",
        ),
    }
    if column_name not in column_rules:
        raise ValueError(
            f"Invalid column name: {column_name}. Must be 'hubspot_property_type' or "
            f"'hubspot_built_form'."
        )
    valid_values, epc_fill_col = column_rules[column_name]

    # Keep permitted values only; everything else is set to None
    is_permitted = programme_data[column_name].isin(valid_values)
    programme_data[column_name] = np.where(
        is_permitted, programme_data[column_name], None
    )

    # Back-fill the blanks with the value recorded on the EPC
    is_blank = pd.isnull(programme_data[column_name])
    epc_values = programme_data[self.EPC_API_DATA_NAMES[epc_fill_col]]
    programme_data[column_name] = np.where(
        is_blank, epc_values, programme_data[column_name]
    )

    # Whatever is still missing has no usable source
    programme_data[column_name] = programme_data[column_name].fillna("unknown")

    return programme_data
|
||
|
||
# Clean up the property type and built form columns
|
||
programme_data = _replace_property_description_data(
|
||
programme_data, "hubspot_property_type"
|
||
)
|
||
programme_data = _replace_property_description_data(
|
||
programme_data, "hubspot_built_form"
|
||
)
|
||
|
||
# We accommodate the old vs new inspections format
|
||
if "non-intrusives: WFT Findings" in programme_data.columns:
|
||
# We have the old format - we only have notes
|
||
non_intrusives_surveyor_notes = "non-intrusives: WFT Findings"
|
||
non_intrusives_construction = None
|
||
non_intrusives_insulated = None
|
||
non_intrusives_insulation_material = None
|
||
non_intrusives_ciga_check_required = None
|
||
non_intrusives_pv_access = None
|
||
non_intrusives_roof_orientation = None
|
||
non_intrusives_surveyor_name = None
|
||
else:
|
||
non_intrusives_surveyor_notes = "non-intrusives: Any further surveyor notes"
|
||
non_intrusives_construction = "non-intrusives: Construction"
|
||
non_intrusives_insulated = "non-intrusives: Insulated"
|
||
non_intrusives_insulation_material = "non-intrusives: Material"
|
||
non_intrusives_ciga_check_required = "non-intrusives: CIGA Check Required"
|
||
non_intrusives_pv_access = "non-intrusives: PV, ACCESS ISSUE, SEE NOTES"
|
||
non_intrusives_roof_orientation = (
|
||
"non-intrusives: OFF GAS - ROOF ORIENTATION"
|
||
)
|
||
non_intrusives_surveyor_name = "non-intrusives: Surveyors Name"
|
||
|
||
# This maps the hubspot schema to the template. Anything that is not covered in this will be flagged
|
||
schema_mappings = {
|
||
"Company Domain Name <COMPANY domain>": "Company Domain Name <COMPANY domain>",
|
||
"Email <CONTACT email>": (
|
||
self.contact_detail_fields["email"]
|
||
if self.contact_detail_fields["email"]
|
||
else None
|
||
), # TODO: Review
|
||
"First Name <CONTACT firstname>": (
|
||
self.contact_detail_fields["firstname"]
|
||
if self.contact_detail_fields["firstname"]
|
||
else None
|
||
), # TODO: Review
|
||
"Last Name <CONTACT lastname>": (
|
||
self.contact_detail_fields["lastname"]
|
||
if self.contact_detail_fields["lastname"]
|
||
else None
|
||
), # TODO: Review
|
||
"Phone <CONTACT phone>": (
|
||
self.contact_detail_fields["phone_number"]
|
||
if self.contact_detail_fields["phone_number"]
|
||
else None
|
||
), # TODO: Review
|
||
"Secondary Phone <CONTACT secondary_phone_number>": (
|
||
self.contact_detail_fields["secondary_phone_number"]
|
||
if self.contact_detail_fields["secondary_phone_number"]
|
||
else None
|
||
),
|
||
"Secondary Contact Full Name <CONTACT secondary_contact_full_name>": (
|
||
self.contact_detail_fields["secondary_contact_full_name"]
|
||
if self.contact_detail_fields["secondary_contact_full_name"]
|
||
else None
|
||
),
|
||
"Full Address <LISTING full_address>": self.STANDARD_FULL_ADDRESS,
|
||
"Address 1 <LISTING hs_address_1>": self.STANDARD_ADDRESS_1,
|
||
"Address 2 <LISTING hs_address_2>": None, # TODO: Don't have this for the moment
|
||
"Postcode <LISTING hs_zip>": self.STANDARD_POSTCODE,
|
||
"Property Type <LISTING property_type>": "hubspot_property_type",
|
||
"Property Sub Type <LISTING property_sub_type>": "hubspot_built_form",
|
||
"Bedroom(s) <LISTING hs_bedrooms>": None, # TODO: Don't have this for the moment
|
||
"Domna Property ID <LISTING domna_property_id>": self.DOMNA_PROPERTY_ID,
|
||
# We populate this with the column that we have
|
||
"National UPRN <LISTING national_uprn>": uprn_column,
|
||
"Owner Property ID <LISTING owner_property_id>": self.STANDARD_LANDLORD_PROPERTY_ID,
|
||
"Wall Construction <LISTING wall_construction>": self.STANDARD_WALL_CONSTRUCTION,
|
||
"Heating System <LISTING heating_system>": self.STANDARD_HEATING_SYSTEM,
|
||
"Year Built <LISTING hs_year_built>": self.STANDARD_YEAR_BUILT,
|
||
"Boiler Make <LISTING boiler_make>": None, # TODO: Don't have this for the moment
|
||
"Boiler Model <LISTING boiler_model>": None, # TODO: Don't have this for the moment
|
||
"Non-Intrusives: Date Checked <LISTING non_intrusives__date_checked>": date_of_inspections,
|
||
"Non-Intrusives: Wall Type <LISTING non_intrusives__wall_type>": non_intrusives_construction,
|
||
"Non-intrusives: Insulation <LISTING non_intrusives__insulation>": non_intrusives_insulated,
|
||
"Non-intrusives: Insulation Material <LISTING non_intrusives__insulation_material>": non_intrusives_insulation_material,
|
||
"Non-Intrusives: CIGA Check Required <LISTING non_intrusives__ciga_check_required>": non_intrusives_ciga_check_required,
|
||
"Non-Intrusives: PV Access Issues <LISTING non_intrusives__access_issues>": non_intrusives_pv_access,
|
||
"Non-Intrusives: Roof Orientation <LISTING non_intrusives__roof_orientation>": non_intrusives_roof_orientation,
|
||
"Non-Intrusives: Surveyor Notes <LISTING non_intrusives__surveyor_notes>": non_intrusives_surveyor_notes,
|
||
"Non-Intrusives: Surveyor Name <LISTING non_intrusives__surveyor_name>": non_intrusives_surveyor_name,
|
||
"CIGA: Date Requested <LISTING ciga__date_requested>": None, # TODO: Don't have this for the moment
|
||
"CIGA: Cavity Guarantee Found <LISTING ciga__cavity_guarantee_found>": None,
|
||
"Last EPC: Is Estimated <LISTING last_epc__is_estimated>": self.EPC_API_DATA_NAMES[
|
||
"estimated"
|
||
],
|
||
"Last EPC: EPC Rating <LISTING last_epc__epc_rating>": self.EPC_API_DATA_NAMES[
|
||
"current-energy-rating"
|
||
],
|
||
"Last EPC: SAP Rating <LISTING last_epc__sap_rating>": self.EPC_API_DATA_NAMES[
|
||
"current-energy-efficiency"
|
||
],
|
||
"Last EPC: Main Heating Description <LISTING last_epc__main_heating_description>": self.EPC_API_DATA_NAMES[
|
||
"mainheat-description"
|
||
],
|
||
"Last EPC: Heating Controls <LISTING last_epc__heating_controls>": self.EPC_API_DATA_NAMES[
|
||
"mainheatcont-description"
|
||
],
|
||
"Last EPC: Lodgement Date <LISTING last_epc__lodgement_date>": self.EPC_API_DATA_NAMES[
|
||
"inspection-date"
|
||
],
|
||
"Last EPC: Floor Area <LISTING last_epc__floor_area>": self.EPC_API_DATA_NAMES[
|
||
"total-floor-area"
|
||
],
|
||
"Last EPC: Wall <LISTING last_epc__wall>": self.EPC_API_DATA_NAMES[
|
||
"walls-description"
|
||
],
|
||
"Last EPC: Roof <LISTING last_epc__roof>": self.EPC_API_DATA_NAMES[
|
||
"roof-description"
|
||
],
|
||
"Last EPC: Floor <LISTING last_epc__floor>": self.EPC_API_DATA_NAMES[
|
||
"floor-description"
|
||
],
|
||
"Last EPC: Room Height <LISTING last_epc__room_height>": self.EPC_API_DATA_NAMES[
|
||
"floor-height"
|
||
],
|
||
"Last EPC: Age Band <LISTING last_epc__age_band>": self.EPC_API_DATA_NAMES[
|
||
"construction-age-band"
|
||
],
|
||
"Pipeline <DEAL pipeline>": "Pipeline <DEAL pipeline>",
|
||
"Expected Commencement Date <DEAL expected_commencement_date>": "survey_date",
|
||
"Deal Name <DEAL dealname>": "dealname", # Need to create this,
|
||
"Product ID <LINE_ITEM hs_product_id>": "Product ID <LINE_ITEM hs_product_id>",
|
||
"Name <LINE_ITEM name>": "Name <LINE_ITEM name>",
|
||
"Unit price <LINE_ITEM price>": "Unit price <LINE_ITEM price>",
|
||
"Quantity <LINE_ITEM quantity>": "Quantity <LINE_ITEM quantity>",
|
||
"Deal Owner": "surveyor",
|
||
"Project Code <DEAL project_code>": "project_code",
|
||
"Associations: Listing": "Associations: Listing",
|
||
"Deal Stage <DEAL dealstage>": "hubspot_status",
|
||
}
|
||
|
||
# We sometimes miss columns if the landlord never provided them
|
||
missed_mapping_cols = [
|
||
c
|
||
for c in schema_mappings.values()
|
||
if c not in programme_data.columns
|
||
if c is not None
|
||
]
|
||
for c in missed_mapping_cols:
|
||
programme_data[c] = None
|
||
|
||
# We now create the finalised dataset to be uploaded into Hubspot
|
||
variables_required = list(schema_mappings.values())
|
||
variables_required = [v for v in variables_required if v is not None]
|
||
# We now flag anything that has a none value, which is information we haven't got right now
|
||
none_variables = [k for k, v in schema_mappings.items() if v is None]
|
||
# We'll add placeholder columns for the None variables
|
||
programme_data = programme_data[variables_required]
|
||
for col in none_variables:
|
||
programme_data[col] = None
|
||
|
||
programme_data = programme_data.rename(
|
||
columns={v: k for k, v in schema_mappings.items() if v is not None}
|
||
)
|
||
|
||
programme_data["Postcode <DEAL postcode>"] = programme_data[
|
||
"Postcode <LISTING hs_zip>"
|
||
].copy()
|
||
programme_data["Installer <DEAL installer>"] = installer_name
|
||
programme_data["Name <LISTING hs_name>"] = (
|
||
programme_data["Full Address <LISTING full_address>"]
|
||
+ " ,"
|
||
+ programme_data["Postcode <LISTING hs_zip>"]
|
||
)
|
||
# The listing owner email is the same as the surveyor email (deal owner), so they can see the listing
|
||
programme_data["Listing Owner Email <LISTING hubspot_owner_id>"] = (
|
||
programme_data["Deal Owner"]
|
||
)
|
||
programme_data["Amount <DEAL amount>"] = 0
|
||
programme_data["Deal Owner"] = np.where(
|
||
~pd.isnull(programme_data["Deal Owner"]),
|
||
programme_data["Deal Owner"].astype(str).str.lower(),
|
||
programme_data["Deal Owner"],
|
||
)
|
||
|
||
# We make sure we have all of the columns that we need
|
||
missed_columns = [
|
||
c
|
||
for c in hubspot_config.CRM_UPLOAD_COLUMNS
|
||
if c not in programme_data.columns
|
||
]
|
||
if missed_columns:
|
||
raise ValueError(
|
||
f"We have the following columns that are not in the programme data: {missed_columns}. "
|
||
"Please check the mapping and ensure all required columns are present."
|
||
)
|
||
|
||
self.hubspot_data = programme_data
|
||
|
||
def flag_ecosurv(
    self, ecosurv_landlords=None, landlords_to_ignore=None, ecosurv_filepath=None
):
    """
    Match Ecosurv submissions to the standardised asset list and flag their status.

    Reads the Ecosurv export, keeps the rows for the requested landlord(s), tries to
    tie each row to exactly one asset-list property (postcode, then house number,
    then address line 1, then dropping "other" property types) and merges the
    Ecosurv reference, statuses, tags and a derived ``ecosurv_install_status`` onto
    ``self.standardised_asset_list``.

    Side effects: sets ``self.ecosurv`` (the raw export) and ``self.ecosurv_no_match``
    (export rows that could not be tied to a single property), and replaces
    ``self.standardised_asset_list`` with the merged frame when anything matched.

    :param ecosurv_landlords: pattern matched (as a regular expression) against the
        lower-cased "Landlord" column; when None the method does nothing
    :param landlords_to_ignore: optional collection of exact landlord names to exclude
    :param ecosurv_filepath: optional path to the Ecosurv CSV export; defaults to the
        previously hard-coded location
    :return: None
    """
    if ecosurv_landlords is None:
        return

    # TODO: Fetch from Sharepoint
    if ecosurv_filepath is None:
        ecosurv_filepath = (
            "/Users/khalimconn-kowlessar/Documents/hestia/Ecosurv/07.05.2025.csv"
        )
    logger.info("Getting Ecosurv data from %s", ecosurv_filepath)
    self.ecosurv = pd.read_csv(ecosurv_filepath, encoding="cp437")

    # Filter on the landlord name directly; this avoids depending on how
    # value_counts().reset_index() names its columns in a given pandas version
    landlord_mask = (
        self.ecosurv["Landlord"]
        .str.lower()
        .str.contains(ecosurv_landlords, na=False)
    )
    landlord_ecosurv_data = self.ecosurv[landlord_mask]

    if landlords_to_ignore is not None:
        landlord_ecosurv_data = landlord_ecosurv_data[
            ~landlord_ecosurv_data["Landlord"].isin(landlords_to_ignore)
        ]

    asset_list = self.standardised_asset_list
    # Normalise the asset list postcodes once rather than once per Ecosurv row
    asset_postcodes = (
        asset_list[self.STANDARD_POSTCODE]
        .str.replace(" ", "", regex=False)
        .str.lower()
    )

    # Try and match to asset list
    matched = []
    unmatched = []
    for _, row in tqdm(
        landlord_ecosurv_data.iterrows(), total=landlord_ecosurv_data.shape[0]
    ):
        reference = row["Reference"]
        raw_postcode = row["Postcode"]
        address_1 = row["Address Line 1"]

        if pd.isnull(raw_postcode):
            unmatched.append(reference)
            continue

        # Both sides are compared without spaces and in lower case
        postcode = str(raw_postcode).replace(" ", "").lower()
        df = asset_list[asset_postcodes == postcode].copy()

        if df.empty:
            unmatched.append(reference)
            continue

        if df.shape[0] > 1:
            # Narrow down on the house number
            house_no = SearchEpc.get_house_number(address_1, raw_postcode)
            df["house_no"] = df.apply(
                lambda x: SearchEpc.get_house_number(
                    str(x[self.STANDARD_ADDRESS_1]), x[self.STANDARD_POSTCODE]
                ),
                axis=1,
            )
            df = df[df["house_no"] == house_no]

        if df.shape[0] > 1 and isinstance(address_1, str):
            # We compare address line 1 to full address. The address is matched
            # literally: it is data, not a pattern, and may contain regex characters
            address_hits = (
                df[self.STANDARD_FULL_ADDRESS]
                .str.lower()
                .str.contains(address_1.lower(), na=False, regex=False)
            )
            if address_hits.any():
                df = df[address_hits]

        if df.shape[0] > 1:
            df = df[df[self.STANDARD_PROPERTY_TYPE] != "other"]

        if df.shape[0] == 1:
            matched.append(
                {
                    self.STANDARD_LANDLORD_PROPERTY_ID: df[
                        self.STANDARD_LANDLORD_PROPERTY_ID
                    ].values[0],
                    "ecosurv_reference": reference,
                    "ecosurv_address1": address_1,
                    "ecosurv_postcode": raw_postcode,
                }
            )
        else:
            # Zero candidates left after filtering, or still ambiguous
            unmatched.append(reference)

    logger.info("Matched %s properties to ecosurv data", len(matched))
    logger.info("%s properties in Ecosurv remain unmatched", len(unmatched))

    # We keep a record of submissions that were NOT matches (also when nothing matched)
    self.ecosurv_no_match = self.ecosurv[
        self.ecosurv["Reference"].isin(unmatched)
    ].copy()

    if not matched:
        return

    # We now match
    matched = pd.DataFrame(matched)
    # We'll possibly have duplicates here, where properties have been sold twice. We de-dupe.
    # It doesn't matter too much which record we take
    matched = matched.drop_duplicates(subset=[self.STANDARD_LANDLORD_PROPERTY_ID])

    # We merge on the status of the property. The status table is de-duplicated on the
    # reference so that the left merge cannot multiply rows.
    # NOTE(review): "Installer" is in the rename map but is not one of the selected
    # columns, so no ecosurv_installer column is produced - confirm whether it is wanted.
    ecosurv_statuses = (
        self.ecosurv[["Reference", "Status", "Lead Status", "Tags"]]
        .rename(
            columns={
                "Reference": "ecosurv_reference",
                "Status": "ecosurv_status",
                "Lead Status": "ecosurv_lead_status",
                "Tags": "ecosurv_tags",
                "Installer": "ecosurv_installer",
            }
        )
        .drop_duplicates(subset=["ecosurv_reference"])
    )
    matched = matched.merge(ecosurv_statuses, how="left", on="ecosurv_reference")

    # This mapping is ordered by process order, where lodgment is the final step so if we have an indication
    # that the property is ready for lodgement, we set the status to that. We then proceed through the other
    # statuses where the penultimate status is install complete
    mapping = {
        "Cancelled": hubspot_config.HubspotProcessStatus.INSTALLER_CANCELLED_FINALIZED,
        "TrustMark: Lodged": hubspot_config.HubspotProcessStatus.LODGEMENT_COMPLETE,
        "Retrofit: Complete": hubspot_config.HubspotProcessStatus.INSTALL_COMPLETE,
        "Retrofit: Awaiting TrustMark": hubspot_config.HubspotProcessStatus.INSTALL_COMPLETE,
        "Retrofit: Awaiting post checks": hubspot_config.HubspotProcessStatus.INSTALL_COMPLETE,
        "Installer Notification Sent": hubspot_config.HubspotProcessStatus.SUBMITTED_TO_INSTALLER,
        "Submitted to RC": hubspot_config.HubspotProcessStatus.SUBMITTED_TO_INSTALLER,
        "COONEY": hubspot_config.HubspotProcessStatus.SUBMITTED_TO_INSTALLER,
        "Signed off for install": hubspot_config.HubspotProcessStatus.SUBMITTED_TO_INSTALLER,
        "Retrofit: Signed off for install": hubspot_config.HubspotProcessStatus.SUBMITTED_TO_INSTALLER,
        "Audit": hubspot_config.HubspotProcessStatus.SUBMITTED_TO_INSTALLER,
        "Accepted": hubspot_config.HubspotProcessStatus.SUBMITTED_TO_INSTALLER,
        "Sold": hubspot_config.HubspotProcessStatus.SUBMITTED_TO_INSTALLER,
    }

    def get_max_status(tag_str):
        """Return the label of the highest status whose tag text appears in ``tag_str``."""
        if pd.isna(tag_str):
            return None
        tag_text = str(tag_str)
        matched_statuses = [
            status for tag, status in mapping.items() if tag in tag_text
        ]
        if not matched_statuses:
            return None
        return max(matched_statuses).label

    # NOTE(review): a default of SUBMITTED_TO_INSTALLER used to be assigned just before
    # this line but was always overwritten here, so rows without a recognised tag end
    # up with None. That behaviour is kept; confirm whether a default was intended.
    matched["ecosurv_install_status"] = matched["ecosurv_tags"].apply(
        get_max_status
    )

    self.standardised_asset_list = self.standardised_asset_list.merge(
        matched,
        how="left",
        on=self.STANDARD_LANDLORD_PROPERTY_ID,
    )
|
||
|
||
def flag_outcomes(
    self,
    outcomes_filepaths,
    outcomes_sheetname,
    outcomes_address,
    outcomes_postcode,
    outcomes_houseno,
    outcomes_id,
):
    """
    Match surveyor outcome sheets to the asset list and derive an outcome status.

    Each outcomes workbook is read, every row is tied to an asset-list property
    (by landlord property ID, then exact cleaned address, then postcode + house
    number, then fuzzy address match), and per-property visit counts, outcome
    counts, the latest outcome/note and an ``outcome_status`` are merged onto
    ``self.standardised_asset_list``.

    All list arguments are parallel: element ``i`` describes workbook ``i``.

    Side effects: sets ``self.outcomes`` (all rows, with ``row_id``,
    ``matched_to_asset_list`` and the matched property ID),
    ``self.outcomes_no_match`` and updates ``self.standardised_asset_list``.

    :param outcomes_filepaths: paths of the outcomes workbooks; nothing happens if empty
    :param outcomes_sheetname: sheet name to read from each workbook
    :param outcomes_address: name of the address column in each workbook
    :param outcomes_postcode: name of the postcode column in each workbook
    :param outcomes_houseno: name of the house number column in each workbook, or None
        to derive it from the address (the caller's list is not modified)
    :param outcomes_id: name of the landlord property ID column in each workbook, or None
    :raises NotImplementedError: if no known date or notes column is present
    :raises Exception: if the pivoted outcomes contain duplicated property IDs
    :raises ValueError: if merging introduces duplicated properties in the asset list
    """
    if not outcomes_filepaths:
        return

    domna_id = self.DOMNA_PROPERTY_ID
    asset_list = self.standardised_asset_list

    def _clean_address(value):
        """Lower-case, drop commas and collapse repeated whitespace."""
        return " ".join(str(value).lower().replace(",", "").split())

    def _clean_house_number(value):
        """Return a comparable lower-case house number string, or None if missing."""
        if isinstance(value, str):
            return value.strip().lower() or None
        if pd.isnull(value):
            return None
        # Excel columns containing blanks are read as floats: 12.0 must compare as "12"
        if isinstance(value, float) and value.is_integer():
            value = int(value)
        return str(value).strip().lower()

    def _lookup_entry(row_id, candidates):
        """Build the row_id -> property ID record for the first candidate."""
        return {"row_id": row_id, domna_id: candidates[domna_id].values[0]}

    # Asset list keys are normalised once, not once per outcome row
    asset_addresses = (
        asset_list[self.STANDARD_FULL_ADDRESS]
        .str.lower()
        .str.replace(",", "", regex=False)
        .str.replace(r"\s+", " ", regex=True)
        .str.strip()
    )
    asset_postcodes = asset_list[self.STANDARD_POSTCODE].str.strip()
    asset_ids = (
        asset_list[self.STANDARD_LANDLORD_PROPERTY_ID].str.strip()
        if any(col is not None for col in outcomes_id)
        else None
    )

    self.outcomes = []
    outcomes_no_match = []
    lookup = []
    # row_id has to be unique across ALL workbooks because the frames are
    # concatenated and later merged on it
    row_offset = 0
    for idx, outcomes_filepath in enumerate(outcomes_filepaths):
        address_col = outcomes_address[idx]
        postcode_col = outcomes_postcode[idx]
        houseno_col = outcomes_houseno[idx]
        id_col = outcomes_id[idx]

        outcomes = pd.read_excel(
            outcomes_filepath, sheet_name=outcomes_sheetname[idx]
        )
        outcomes["row_id"] = np.arange(row_offset, row_offset + len(outcomes))
        row_offset += len(outcomes)

        if houseno_col is None:
            # Derive the house number row by row from the address and postcode
            houseno_col = "houseno"
            outcomes[houseno_col] = [
                None
                if pd.isnull(address)
                else SearchEpc.get_house_number(str(address), str(postcode))
                for address, postcode in zip(
                    outcomes[address_col], outcomes[postcode_col]
                )
            ]

        # We handle an edge case that occurred for LHP
        if (
            "Notes / Outcomes" in outcomes.columns
            and "Outcome" not in outcomes.columns
        ):
            # We use the re-mapper to handle this:
            outcomes["Notes / Outcomes"] = outcomes["Notes / Outcomes"].str.strip()
            values_to_remap = outcomes["Notes / Outcomes"].unique()
            # We want to map this to our standardised list of outcomes
            remapper = DataRemapper(
                standard_values=outcomes_mappings.outcomes_values,
                standard_map=outcomes_mappings.outcomes_map,
            )
            remap_dictionary = remapper.standardize_list(
                values_to_remap=values_to_remap.tolist()
            )
            # Perform the remap
            outcomes["Outcome"] = outcomes["Notes / Outcomes"].map(remap_dictionary)

        outcomes["Outcome"] = outcomes["Outcome"].str.lower().str.strip()

        logger.info("Matching outcomes to asset list")
        # Merge the outcomes onto the asset list - we check we're able to match sufficiently well
        lookup_i = []
        nomatch_i = []
        for _, x in tqdm(outcomes.iterrows(), total=len(outcomes)):
            address = x[address_col]
            if pd.isnull(address) or not address:
                continue

            # 1) Landlord property ID, when the sheet carries one
            oid = x[id_col] if id_col is not None else None
            if oid is not None:
                if isinstance(oid, str):
                    oid = oid.strip()
                candidates = asset_list[asset_ids == oid]
                if candidates.shape[0] == 1:
                    lookup_i.append(_lookup_entry(x["row_id"], candidates))
                    continue

            # 2) Exact match on the cleaned full address
            candidates = asset_list[asset_addresses == _clean_address(address)]
            if candidates.shape[0] == 1:
                lookup_i.append(_lookup_entry(x["row_id"], candidates))
                continue

            # 3) Postcode, narrowed down by house number
            row_postcode = x[postcode_col]
            if isinstance(row_postcode, str):
                row_postcode = row_postcode.strip()
            candidates = asset_list[asset_postcodes == row_postcode].copy()
            if not candidates.empty:
                candidates["houseno"] = [
                    _clean_house_number(
                        SearchEpc.get_house_number(str(addr_1), str(postcode))
                    )
                    for addr_1, postcode in zip(
                        candidates[self.STANDARD_ADDRESS_1],
                        candidates[self.STANDARD_POSTCODE],
                    )
                ]

                row_house_no = x[houseno_col]
                if pd.isnull(row_house_no):
                    row_house_no = SearchEpc.get_house_number(
                        str(address), str(x[postcode_col])
                    )
                house_no_to_match = _clean_house_number(row_house_no)

                if house_no_to_match is None:
                    # Without a house number there is nothing to narrow down on
                    candidates = candidates.iloc[0:0]
                else:
                    candidates = candidates[
                        candidates["houseno"] == house_no_to_match
                    ]

                if candidates.shape[0] == 1:
                    lookup_i.append(_lookup_entry(x["row_id"], candidates))
                    continue

                if not candidates.empty:
                    # 4) Several properties share the house number: use levenshtein
                    # distance on the full address to pick the closest one
                    best = process.extractOne(
                        address,
                        candidates[self.STANDARD_FULL_ADDRESS].values,
                    )
                    if best is not None:
                        candidates = candidates[
                            candidates[self.STANDARD_FULL_ADDRESS] == best[0]
                        ]
                        if not candidates.empty:
                            lookup_i.append(
                                _lookup_entry(x["row_id"], candidates)
                            )
                            continue

            nomatch_i.append(x["row_id"])

        outcomes_no_match.append(outcomes[outcomes["row_id"].isin(nomatch_i)])
        lookup.append(pd.DataFrame(lookup_i))
        self.outcomes.append(outcomes)

    lookup = pd.concat(lookup)
    self.outcomes_no_match = pd.concat(outcomes_no_match)
    self.outcomes = pd.concat(self.outcomes)

    if lookup.empty:
        return

    # We will have duplicated domna property IDs, where a surveyor has been to a property multiple times
    # Where we have multiple rows, we want to make a call on what the action should be. For example,
    # there may be properties that have been visited multiple times where the outcome was "See notes" implying
    # that the surveyor had a detailed explanation as to why they couldn't gain access so if this has
    # happened multiple times, in this case we judge that the work may not be viable

    date_col = next(
        (
            candidate
            for candidate in (
                "Week Commencing",
                "Survey Date",
                "Date letters sent",
                "Date Letter sent",
                "WEEK COMMENCING",
            )
            if candidate in self.outcomes.columns
        ),
        None,
    )
    if date_col is None:
        raise NotImplementedError("Invalid date in outcomes - implement me")

    notes_col = next(
        (
            candidate
            for candidate in ("Notes", "Notes / Outcomes", "NOTES")
            if candidate in self.outcomes.columns
        ),
        None,
    )
    if notes_col is None:
        raise NotImplementedError("Invalid notes in outcomes - implement me")

    lookup = lookup.merge(
        self.outcomes[["row_id", "Outcome", notes_col, date_col]],
        how="left",
        on="row_id",
    )

    visit_counts = (
        lookup.groupby(domna_id)["row_id"]
        .count()
        .reset_index()
        .rename(columns={"row_id": "visit_count"})
        .sort_values("visit_count", ascending=False)
    )

    def extract_date(s):
        """Parse a visit date from a datetime cell or from "dd.mm.yyyy" text."""
        if isinstance(s, datetime):
            # Real date cells from Excel arrive as datetime/Timestamp objects
            return pd.Timestamp(s)
        if isinstance(s, str):
            match = re.search(r"(\d{2}\.\d{2}\.\d{4})", s)
            if match:
                return pd.to_datetime(
                    match.group(1), format="%d.%m.%Y", errors="coerce"
                )
        return pd.NaT

    lookup["parsed_date"] = lookup[date_col].apply(extract_date)

    # Per property we want the latest "surveyed" visit if there is one, otherwise the
    # latest visit of any kind. Sorting surveyed rows first and then by date (newest
    # first, undated last) lets us keep the first row per property.
    ranked = lookup.assign(
        _is_surveyed=lookup["Outcome"].eq("surveyed")
    ).sort_values(
        ["_is_surveyed", "parsed_date"],
        ascending=[False, False],
        na_position="last",
    )
    latest_note = (
        ranked.drop_duplicates(subset=[domna_id], keep="first")[
            [domna_id, notes_col, "Outcome"]
        ]
        .rename(
            columns={notes_col: "latest_outcome_note", "Outcome": "latest_outcome"}
        )
        .reset_index(drop=True)
    )

    # One row per property with a count column per outcome value
    pivot_df = (
        lookup.groupby([domna_id, "Outcome"])
        .size()
        .unstack(fill_value=0)
        .reset_index()
    )
    pivot_df = pivot_df.merge(visit_counts, how="left", on=domna_id)
    pivot_df = pivot_df.merge(latest_note, how="left", on=domna_id)

    if pivot_df[domna_id].duplicated().sum():
        raise Exception("We have duplicated property IDs in the outcomes data")

    # We merge this data onto outcomes
    self.outcomes["matched_to_asset_list"] = self.outcomes["row_id"].isin(
        lookup["row_id"].values
    )
    self.outcomes = self.outcomes.merge(
        lookup[["row_id", domna_id]], how="left", on="row_id"
    )

    # We flag the outcome status, based on the outcome
    pivot_df["outcome_status"] = None

    if "surveyed" in pivot_df.columns:
        pivot_df["outcome_status"] = np.where(
            pivot_df["surveyed"] > 0,
            hubspot_config.HubspotProcessStatus.SURVEYED_COMPLETED_SIGNED_OFF.label,
            pivot_df["outcome_status"],
        )

    # An installer refusal takes precedence over a completed survey
    if "installer refusal" in pivot_df.columns:
        pivot_df["outcome_status"] = np.where(
            pivot_df["installer refusal"] > 0,
            hubspot_config.HubspotProcessStatus.NOT_VIABLE.label,
            pivot_df["outcome_status"],
        )

    pivot_df["outcome_status"] = np.where(
        pivot_df["latest_outcome"].isin(["see notes"])
        & (
            pivot_df["outcome_status"]
            != hubspot_config.HubspotProcessStatus.SURVEYED_COMPLETED_SIGNED_OFF.label
        ),
        hubspot_config.HubspotProcessStatus.SURVEYED_NO_ACCESS_NEEDS_SIGN_OFF.label,
        pivot_df["outcome_status"],
    )

    # We merge our pivoted outcomes onto the asset list
    self.standardised_asset_list = self.standardised_asset_list.merge(
        pivot_df,
        how="left",
        on=domna_id,
    )

    if self.standardised_asset_list[domna_id].duplicated().sum():
        raise ValueError("Duplicates appreared - something went wrong")

    self.outcomes = self.outcomes.sort_values(domna_id, ascending=False)
|
||
|
||
def flag_survey_master(
    self, master_filepaths, master_id_colnames, master_to_asset_list_filepath=None
):
    """
    Match survey "master" submission sheets to the asset list and flag their status.

    Each master CSV is read, its headers are normalised, and every row with a
    postcode is matched to a row of ``self.standardised_asset_list``:

    1. by landlord property id, when a master id column is supplied and exactly
       one asset carries that id;
    2. otherwise by postcode + house number, falling back to the postal region
       (first part of the postcode) when the exact postcode has no such house
       number, and using street / fuzzy address / property type to disambiguate.

    Results are stored on ``self.master_surveyed`` (de-duplicated on the landlord
    property id, with a ``submission_status`` column) and left-merged onto
    ``self.standardised_asset_list``. Rows that could not be matched are stored
    on ``self.unmatched_submissions`` when there are any.

    :param master_filepaths: paths of the master CSV files; nothing is done when
        this is empty or None.
    :param master_id_colnames: sequence parallel to ``master_filepaths``; each
        entry is either None or the name of the master column holding the
        landlord property id.
    :param master_to_asset_list_filepath: optional CSV that is left-merged onto
        every master on "NO.", "Street / Block Name" and "Post Code".
    :raises ValueError: if a master lacks an install/cancellation, submission
        date, installer notes or installer column; if no row of any master could
        be matched; or if the final merge duplicates asset list rows.
    """
    # TODO: This probably needs further expansion

    if not master_filepaths:
        return

    def first_present(columns, candidates, default=None, error=None):
        """Return the first header of `candidates` present in `columns`."""
        for name in candidates:
            if name in columns:
                return name
        if error is not None:
            raise ValueError(error)
        return default

    def as_text(value):
        """Render a cell as text; missing values become an empty string."""
        return "" if pd.isnull(value) else str(value)

    def contains_literal(series, needle):
        """Literal, NaN-safe substring test (spreadsheet text is not a regex)."""
        return series.str.contains(needle, regex=False, na=False)

    def fuzzy_score(query, address):
        """Score `query` against one whole address."""
        # The choices argument must be a collection: a bare string would be
        # iterated character by character and never score above the threshold.
        best = process.extractOne(query, [address])
        return best[1] if best else 0

    street_col = "Street / Block Name"
    measure_mix_col = "MEASURE COMBO"
    landlord_id_col = self.STANDARD_LANDLORD_PROPERTY_ID
    full_address_col = self.STANDARD_FULL_ADDRESS
    standard_type_col = self.STANDARD_PROPERTY_TYPE

    if master_to_asset_list_filepath is not None:
        id_map = pd.read_csv(master_to_asset_list_filepath)
    else:
        id_map = pd.DataFrame()

    logger.info("Getting masters and merging onto asset list")

    # The asset list does not change while the masters are processed, so the
    # house numbers and normalised postcodes are derived once, on a working
    # copy, instead of once per file / per row on the stored asset list.
    assets = self.standardised_asset_list.assign(
        house_no=self.standardised_asset_list.apply(
            lambda x: SearchEpc.get_house_number(
                str(x[self.STANDARD_ADDRESS_1]), str(x[self.STANDARD_POSTCODE])
            ),
            axis=1,
        )
    )
    asset_postcodes = assets[self.STANDARD_POSTCODE].str.strip().str.lower()
    asset_postcodes_compact = asset_postcodes.str.replace(" ", "")

    master_surveyed = []
    unmatched_submissions = []
    for idx, filepath in enumerate(master_filepaths):
        master_data = pd.read_csv(filepath)
        # Strip the headers and collapse internal runs of whitespace
        master_data.columns = [
            re.sub(r"\s+", " ", c.strip()) for c in master_data.columns
        ]
        # Drop any unnamed columns
        master_data = master_data.drop(
            columns=[c for c in master_data.columns if "Unnamed:" in c]
        )

        if not id_map.empty:
            master_data = master_data.merge(
                id_map, how="left", on=["NO.", street_col, "Post Code"]
            )

        # The masters use different headers for the same information; for each
        # field we take the first known header that is present.
        columns = master_data.columns
        install_col = first_present(
            columns,
            (
                "INSTALLED OR CANCELLED",
                "INSTALL / CANCELLATION DATE",
                "INSTALL/ CANCELLATION DATE",
                "INSTALL/CANCELLATION DATE",
                "Measure 1 Install Date",
            ),
            error="No install or cancellation date",
        )
        submission_col = first_present(
            columns,
            ("SUBMISSION DATE", "SUBMISSION DATE TO INSTALLERS", "Submission Date"),
            error="No submission date column found in master data",
        )
        scheme_col = first_present(
            columns,
            (
                "AFFORDABLE WARMTH OR EPC FOR HOUSING ASSOCIATION",
                "AFFORDABLE WARMTH",
                "Scheme",
                "Affordable Warmth",
            ),
            default="OFFICE USE ONLY",
        )
        postcode_col = first_present(columns, ("POSTCODE",), default="Post Code")
        house_no_col = first_present(columns, ("NO.", "NO"), default="NUMBER")
        property_type_col = first_present(
            columns,
            ("PROPERTY TYPE As per table emailed", "PROPERTY TYPE", "Property Type"),
            default=(
                "PROPERTY TYPE (SEE DEEMED SCORES SHEET) Eg. 3W_Flat_1 "
                "(As per Matrix)"
            ),
        )
        installer_notes_col = first_present(
            columns,
            (
                "INSTALLERS NOTES ; REASONS FOR CANCELLATIONS",
                "INSTALLERS NOTES",
                "Installers Notes",
                "NOTES ; REASONS FOR CANCELLATIONS OR WHERE INSTALL DATE WAS "
                "OBTAINED FROM",
                "INSTALLERS NOTES / REASONS FOR CANCELLATIONS / WHERE INSTALL "
                "DATE WAS RECEIVED FROM",
            ),
            error="No installer notes column found in master data",
        )
        installer_col = first_present(
            columns,
            ("INSTALLER", "Installer"),
            error="No installer column found in master data",
        )
        town_colname = first_present(
            columns, ("TOWN", "Town/Area"), default="Town/City"
        )

        master_data["row_id"] = master_data.index
        master_id_col = master_id_colnames[idx]

        logger.info("Matching master data to asset list")
        matched = []
        unmatched = []
        for _, row in tqdm(master_data.iterrows(), total=len(master_data)):
            row_id = row["row_id"]
            postcode = row[postcode_col]
            # Every match record carries the original address for traceability
            base_record = {
                "row_id": row_id,
                "original_house_no": row[house_no_col],
                "original_street": row[street_col],
                "original_postcode": postcode,
            }

            if pd.isnull(postcode):
                continue

            if master_id_col is not None:
                # Filter the standardised asset list on the landlord's id
                by_id = assets[assets[landlord_id_col] == row[master_id_col]]
                if by_id.shape[0] == 1:
                    matched.append(
                        {
                            **base_record,
                            landlord_id_col: by_id[landlord_id_col].values[0],
                        }
                    )
                    continue

            postcode_no_space = postcode.strip().replace(" ", "").lower()
            candidates = assets[asset_postcodes_compact == postcode_no_space]

            # Numeric house numbers (e.g. 12.0) are compared as "12"
            house_no = row[house_no_col]
            if pd.isnull(house_no):
                house_no = None
            elif isinstance(house_no, (float, int)):
                house_no = str(int(house_no))
            house_no_text = as_text(house_no)
            street_text = as_text(row[street_col])

            if house_no not in candidates["house_no"].values:
                # Handle postcode errors: retry on the postal region only
                postal_region = postcode.split(" ")[0].lower()
                candidates = assets[
                    asset_postcodes.str.startswith(postal_region, na=False)
                ]

                if house_no not in candidates["house_no"].values:
                    unmatched.append(row_id)
                    continue
                candidates = candidates[candidates["house_no"] == house_no]
                if candidates.shape[0] > 1:
                    if not street_text:
                        # Several candidates and no street to tell them apart
                        unmatched.append(row_id)
                        continue
                    candidates = candidates[
                        contains_literal(
                            candidates[full_address_col].str.lower(),
                            street_text.lower(),
                        )
                    ]
                if candidates.shape[0] == 0:
                    unmatched.append(row_id)
                    continue
                matched.append(
                    {
                        **base_record,
                        landlord_id_col: candidates[landlord_id_col].values[0],
                    }
                )
                continue

            # From here on the house number is known to exist at this postcode
            candidates = candidates[candidates["house_no"] == house_no]
            if candidates.shape[0] != 1:
                if not street_text:
                    # Nothing to disambiguate on
                    unmatched.append(row_id)
                    continue

                street_hits = contains_literal(
                    candidates[full_address_col], street_text
                )
                if street_hits.any():
                    candidates = candidates[street_hits]
                else:
                    # Levenshtein distance against the whole address
                    query = " ".join(
                        [
                            house_no_text,
                            street_text,
                            as_text(row.get(town_colname)),
                        ]
                    ).lower()
                    scores = (
                        candidates[full_address_col]
                        .str.lower()
                        .fillna("")
                        .apply(lambda address: fuzzy_score(query, address))
                    )
                    candidates = candidates[scores > 90]

                if candidates.shape[0] == 0:
                    unmatched.append(row_id)
                    continue

                # Prefer addresses containing "<house no> <street>" verbatim
                house_and_street = " ".join([house_no_text, street_text]).lower()
                exact_hits = contains_literal(
                    candidates[full_address_col].str.lower(), house_and_street
                )
                if exact_hits.any():
                    candidates = candidates[exact_hits]

                # The last word of the master's property type is compared with
                # the standardised property type
                property_kind = (
                    as_text(row.get(property_type_col)).split(" ")[-1].lower()
                )
                if property_kind:
                    type_hits = contains_literal(
                        candidates[standard_type_col], property_kind
                    )
                    if type_hits.any():
                        # We ignore "block of flats" entries
                        candidates = candidates[
                            type_hits
                            & (candidates[standard_type_col] != "block of flats")
                        ]

                if candidates.shape[0] == 0:
                    # Every candidate was a "block of flats" entry
                    unmatched.append(row_id)
                    continue

                if candidates.shape[0] != 1:
                    # We have multiple matches - it's likely because the landlord
                    # has a duplicate that has been referenced in totally
                    # different ways so we just match to both
                    matched.extend(
                        {**base_record, landlord_id_col: landlord_id}
                        for landlord_id in candidates[landlord_id_col].values
                    )
                    continue

            matched.append(
                {
                    **base_record,
                    landlord_id_col: candidates[landlord_id_col].values[0],
                }
            )

        # We match the "UPRN" which is the landlords ID, onto the master sheet

        if measure_mix_col not in master_data.columns:
            master_data[measure_mix_col] = "Measure mix not recorded"

        matched = pd.DataFrame(matched)
        if matched.empty:
            logger.warning(
                "No rows of %s could be matched to the asset list", filepath
            )
            continue

        master_to_append = (
            master_data[
                [
                    scheme_col,
                    "row_id",
                    install_col,
                    submission_col,
                    measure_mix_col,
                    installer_notes_col,
                    installer_col,
                ]
            ]
            .merge(matched, how="left", on="row_id")
            .rename(
                columns={
                    scheme_col: "funding_scheme",
                    measure_mix_col: "measure_mix",
                    install_col: "survey_status",
                    submission_col: "submission_date",
                    installer_notes_col: "submission_installer_notes",
                    installer_col: "submission_installer",
                }
            )
        )
        # Coerce to text so a non-text status column does not break ``.str``;
        # missing statuses stay missing rather than becoming False.
        status = master_to_append["survey_status"]
        status_text = status.astype(str).str.lower()
        master_to_append["submission_cancelled"] = status_text.str.contains(
            "cancel"
        ).where(status.notna())
        master_to_append["submission_installed"] = status_text.str.contains(
            "installed"
        ).where(status.notna())
        master_surveyed.append(master_to_append)

        # The columns are massively different - we take just a few
        unmatched_df = master_data[master_data["row_id"].isin(unmatched)]
        unmatched_df = unmatched_df[
            [
                scheme_col,
                house_no_col,
                street_col,
                postcode_col,
                install_col,
                submission_col,
            ]
        ].rename(
            columns={
                scheme_col: "Funding Scheme",
                house_no_col: "House Number",
                postcode_col: "Postcode",
                install_col: "survey_status",
                submission_col: "submission_date",
            }
        )

        unmatched_submissions.append(unmatched_df)

    if not master_surveyed:
        # pd.concat would fail here with an opaque "No objects to concatenate"
        raise ValueError("No master rows could be matched to the asset list")

    master_surveyed = pd.concat(master_surveyed)
    # Master rows without a match carry no landlord id after the left merge
    master_surveyed = master_surveyed[~pd.isnull(master_surveyed[landlord_id_col])]
    master_surveyed = master_surveyed[
        ~master_surveyed[landlord_id_col].isin(
            ["NOT ON ASSET LIST", "Missing From Asset List"]
        )
    ]

    master_surveyed[landlord_id_col] = master_surveyed[landlord_id_col].astype(str)

    # We de-dupe crudely on landlord property id
    self.master_surveyed = master_surveyed.drop_duplicates(
        subset=[landlord_id_col]
    ).copy()

    # We now add the submission status, based on the hubspot stages; later
    # assignments take precedence, so "installed" wins over "cancelled".
    self.master_surveyed["submission_status"] = (
        hubspot_config.HubspotProcessStatus.SUBMITTED_TO_INSTALLER.label
    )
    self.master_surveyed["submission_status"] = np.where(
        self.master_surveyed["submission_cancelled"].eq(True),
        hubspot_config.HubspotProcessStatus.INSTALLER_CANCELLED_FINALIZED.label,
        self.master_surveyed["submission_status"],
    )
    self.master_surveyed["submission_status"] = np.where(
        self.master_surveyed["submission_installed"].eq(True),
        hubspot_config.HubspotProcessStatus.INSTALL_COMPLETE.label,
        self.master_surveyed["submission_status"],
    )

    self.standardised_asset_list = self.standardised_asset_list.merge(
        self.master_surveyed, how="left", on=landlord_id_col
    )

    # Make sure no dupes
    if self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated().sum():
        raise ValueError("duplicated ids!")

    # Finally, we keep a record of the unmatched
    if unmatched_submissions:
        self.unmatched_submissions = pd.concat(unmatched_submissions)
|