import hashlib import os import re import tiktoken from pprint import pprint from datetime import datetime import asset_list.hubspot.config as hubspot_config from openai import OpenAI import numpy as np import pandas as pd from tqdm import tqdm from thefuzz import process from utils.logger import setup_logger from backend.SearchEpc import SearchEpc from BaseUtility import Definitions import asset_list.mappings.property_type as property_type_mappings import asset_list.mappings.walls as walls_mappings import asset_list.mappings.heating_systems as heating_mappings import asset_list.mappings.exising_pv as existing_pv_mappings import asset_list.mappings.built_form as built_form_mappings import asset_list.mappings.roof as roof_mappings import asset_list.mappings.outcomes as outcomes_mappings from recommendations.recommendation_utils import ( estimate_perimeter, estimate_external_wall_area, estimate_number_of_floors, ) from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes from etl.epc_clean.epc_attributes.WallAttributes import WallAttributes from dotenv import load_dotenv logger = setup_logger() load_dotenv(dotenv_path="../backend/.env") # OpenAI API Key (set this in your environment variables for security) OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY") class DataRemapper: def __init__(self, standard_values, standard_map=None, max_tokens=1000): """ Initialize the remapper with standard values and a predefined mapping. :param standard_values: Set of allowed standardized values. :param standard_map: Dictionary of common remappings {raw_value: standard_value}. """ self.standard_values = standard_values self.standard_map = standard_map self.fuzzy_threshold = 90 # Adjust fuzzy matching sensitivity self.ai_model = "gpt-4-turbo" # Use gpt-3.5-turbo for cheaper processing # Tokenizer for counting tokens self.tokenizer = tiktoken.encoding_for_model(self.ai_model) # Track token usage and remap dictionary self.total_tokens_used = 0 self.total_cost = 0 self.remap_dict = {} # {original_value: standardized_value} self.max_tokens = max_tokens # Limit for OpenAI API # Memoization for AI calls self.ai_cache = ( {} ) # {tuple(unmapped_values): {original_value: standardized_value}} # Capture the reponse for debugging self.ai_response = None # OpenAI pricing (as of Feb 2024) self.pricing = { "gpt-4-turbo": {"input": 0.01 / 1000, "output": 0.03 / 1000}, "gpt-3.5-turbo": {"input": 0.0015 / 1000, "output": 0.002 / 1000}, } self.openai_client = OpenAI(api_key=OPENAI_API_KEY) @staticmethod def clean_string(text): """Basic text cleaning: remove extra spaces, punctuation, and normalize case.""" if not isinstance(text, str): return None text = text.strip().lower() text = re.sub(r"[^\w\s]", "", text) # Remove punctuation # Replace double strings text = re.sub(r"\s+", " ", text) return text def fuzzy_match(self, text): """Use fuzzy matching to find the closest standard value.""" match, score = ( process.extractOne(text, self.standard_values) if text else (None, 0) ) return match if score >= self.fuzzy_threshold else None def count_tokens(self, text): """Estimate the number of tokens in a given text.""" return len(self.tokenizer.encode(text)) if text else 0 def ai_standardize(self, unmapped_values): """Call OpenAI API **once** for all unmapped values to minimize cost, with memoization.""" if not unmapped_values: return {} unmapped_tuple = tuple( sorted(unmapped_values) ) # Ensure consistency for memoization if unmapped_tuple in self.ai_cache: return self.ai_cache[unmapped_tuple] # Return memoized result prompt = f""" You are an expert in data classification. Standardize each of these values into one of the categories: {list(self.standard_values)}. Return only a JSON dictionary where: - The keys are the original values. - The values are the standardized ones. Strictly return JSON **without markdown formatting** or extra text. Example Output: {{ "BLKHOUS": "block house", "BEDSIT": "bedsit" }} Values to standardize: {unmapped_values} """ # Count input tokens input_tokens = self.count_tokens(prompt) if input_tokens > self.max_tokens: raise ValueError("Input tokens exceed the maximum limit.") logger.info("Calling OpenAI API for standardization...") response = self.openai_client.chat.completions.create( model=self.ai_model, messages=[{"role": "user", "content": prompt}], max_tokens=self.max_tokens, temperature=0.1, ) output_text = response.choices[0].message.content.strip() output_tokens = self.count_tokens(output_text) # Count output tokens # Track total token usage self.total_tokens_used += input_tokens + output_tokens # Estimate cost input_cost = input_tokens * self.pricing[self.ai_model]["input"] output_cost = output_tokens * self.pricing[self.ai_model]["output"] self.total_cost += input_cost + output_cost try: # Parse response as dictionary mapping = eval(output_text) # OpenAI should return a valid dictionary except: mapping = {val: "unknown" for val in unmapped_values} # Fallback # Memoize the AI response self.ai_cache[unmapped_tuple] = mapping # We store the raw AI response for debugging logger.debug(f"AI Response: {mapping}") self.ai_response = output_text return mapping def standardize_list(self, values_to_remap): """ Standardizes a list of values and returns a dictionary {original_value: standardized_value}. :param values_to_remap: List of raw values to standardize. :return: Dictionary {original_value: standardized_value}. """ unique_values = set(values_to_remap) # Process only unique values unmapped_values = [] for value in unique_values: if pd.isna(value): # Handle NaN values self.remap_dict[value] = "unknown" continue cleaned_value = self.clean_string(value) # Rule-Based Check (Predefined Mapping) if cleaned_value in self.standard_map or value in self.standard_map: self.remap_dict[value] = ( self.standard_map[cleaned_value] if cleaned_value in self.standard_map else self.standard_map[value] ) continue if value.lower() in self.standard_map: self.remap_dict[value] = self.standard_map[value.lower()] continue # Exact Match in Standard Values if cleaned_value in self.standard_values: self.remap_dict[value] = cleaned_value continue # Fuzzy Matching fuzzy_match = self.fuzzy_match(cleaned_value) if fuzzy_match: self.remap_dict[value] = fuzzy_match continue # Capture anything that wasn't mapped unmapped_values.append(value) # AI Model - remap anything unmapped (batch request) ai_mapping = self.ai_standardize(unmapped_values) self.remap_dict.update(ai_mapping) return self.remap_dict def report_usage(self): """Prints a summary of token usage and cost.""" print(f"\n🔹 Total Tokens Used: {self.total_tokens_used}") print(f"💰 Estimated Cost: ${self.total_cost:.4f}") class AssetList: """ This class is used to standardise asset lists so that we can process the core information in a consistent manner. """ EPC_API_DATA_NAMES = { "uprn": "epc_os_uprn", "address1": "epc_address1", "address": "epc_address", "postcode": "epc_postcode", "inspection-date": "epc_inspection_date", "current-energy-efficiency": "epc_sap_score_on_register", "current-energy-rating": "epc_rating_on_register", "property-type": "epc_property_type", "built-form": "epc_archetype", "total-floor-area": "epc_total_floor_area", "construction-age-band": "epc_age_band", "floor-height": "epc_floor_height", "number-habitable-rooms": "epc_number_habitable_rooms", "walls-description": "epc_wall_construction", "roof-description": "epc_roof_construction", "floor-description": "epc_floor_construction", "mainheat-description": "epc_heating_type", "mainheatcont-description": "epc_heating_controls", "secondheat-description": "epc_secondary_heating", "transaction-type": "epc_reason", "energy-consumption-current": "epc_heat_demand", "photo-supply": "epc_photo_supply", "estimated": "estimated", } FIND_EPC_DATA_NAMES = { "heating_text": "epc_estiamted_heating_kwh", "hot_water_text": "epc_estimated_hotwater_kwh", "Assessor’s name": "epc_assessor_name", "Assessor's Telephone": "epc_assessor_telephone", "Assessor's Email": "epc_assessor_email", "Accreditation scheme": "epc_assessor_accreditation", "Assessor’s ID": "epc_assessor_id", "Solar photovoltaics": "epc_solar_pv", } DATETIME_REMAP = { "Pre 1900": datetime(year=1899, month=12, day=31), } # These are the accepted methods we have for cleaning the address1 column ADDRESS_1_CLEANING_METHODS = [ "first_two_words", # This method will split on the fist two words, where the separator is a space "first_word", # This method will split on the first word, where the separator is a space "house_number_extraction", # This method will use the NLP model in SearchEPC to extract the housenumber # "address1_extraction" # This method will use the NLP model to extract address1 ] # Standard column Names STANDARD_ADDRESS_1 = "domna_address_1" STANDARD_POSTCODE = "domna_postcode" STANDARD_FULL_ADDRESS = "domna_full_address" STANDARD_YEAR_BUILT = "landlord_year_built" STANDARD_UPRN = "ordnance_survey_uprn" STANDARD_LANDLORD_PROPERTY_ID = "landlord_property_id" STANDARD_PROPERTY_TYPE = "landlord_property_type" STANDARD_BUILT_FORM = "landlord_built_form" STANDARD_WALL_CONSTRUCTION = "landlord_wall_construction" STANDARD_ROOF_CONSTRUCTION = "landlord_roof_construction" STANDARD_HEATING_SYSTEM = "landlord_heating_system" STANDARD_EXISTING_PV = "landlord_existing_pv" STANDARD_SAP = "landlord_sap_rating" STANDARD_BLOCK_REFERENCE = "landlord_block_reference" DOMNA_PROPERTY_ID = "domna_property_id" # Regular expression for identifying if the address might point to multiple units MULTI_UNIT_REGEX = re.compile(r"\b([A-Za-z0-9]+)-([A-Za-z0-9]+)\b") # List of columns relating to the non-intrusive data NON_INTRUSIVES_COLNAMES = [ "Archetype", "Construction", "Insulated", "Material", "CIGA Check Required", "PV, ACCESS ISSUE, SEE NOTES", "OFF GAS - ROOF ORIENTATION", "Any further surveyor notes", "Surveyors Name", ] NON_INTRUSIVES_NEW_FORMAT_COLNAMES = [ "Has the property been re-walled?", "Is the property tile hung?", "Does the property have a render?", "Does the property have cladding?", "Gable Wall Obstructions", "Does the property have foliage that needs removal?", "Potential unsafe environment", "Date of Inspection", "Borescoped?", ] # Another version of non-intrusives: NON_INTRUSIVES_NEW_FORMAT_COLNAMES_V2 = [ "Archetype", "Archetype 2", "Construction", "Insulated", "Material", "Borescoped?", "CIGA Check Required", "ROOF ORIENTATION", "TILE HUNG", "RENDERED", "CLADDING", "ACCESS ISSUES", "FURTHER SURVEYOR NOTES", "DATE", "NAME OF SURVEYOR", ] # Solar non-intrusive fields NON_INTRUSIVES_SOLAR_COLNAMES = [ "PV, ACCESS ISSUE, SEE NOTES", "ROOF ORIENTATION", "AREA (m²) OF ROOF WHERE PV WILL BE SITUATED ", "SHADING", "Roof Tiles - CONCRETE/SLATE/ROSEMARY", "NO. OF PANELS (Typical size of 420W panel is 1mx1.7m and need 30cm all the way around panels)", "SCAFFOLD REQUIRED? IF YES, ARE THERE ANY SURROUNDING ACCESS ISSUES - PLEASE DESCRIBE", "IF PANELS ARE GOING ON REAR PLEASE CHECK FOR SPACE FOR SCAFFOLDING - DESCRIBE ANY ISSUES BELOW", "DATE", "NAME OF SURVEYOR", ] NON_INTRUSIVES_ELIGIBILITY_COLUMN = "Eligibility (Red/Yellow/Green)" OLD_FORMAT_NON_INTRUSIVE_COLNAMES = ["WFT Findings", "ECO Eligibility"] # This SAP threshold is a key search criteria for properties that may be eligible for extraction FILLED_CAVITY_SAP_THRESHOLD = 75 # This SAP the EMPTY_CAVITY_SAP_THRESHOLD = 75 # Any EPC deemed to have been conducted prior to this year is deemed to be unreliable EPC_YEAR_THRESHOLD = pd.Timestamp.now().year - 5 # Properties before this year are more likely to have lower EPC ratings and more likely to qualify EMPTY_CAVITY_YEAR_THRESHOLD = 2002 # Attributes - these are columns that we produce, calcualted based on other pieces of data ATTRIBUTE_HAS_SOLAR = "attribute_has_solar" ATTRIBUTE_NUMBER_OF_FLOORS = "attribute_est_number_floors" ATTRIBUTE_ESTIMATED_PERIMETER = "attribute_est_perimter" ATTRIBUTE_HEAT_LOSS_AREA = "attribute_heat_loss_area" ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS = "attribute_epc_roof_insulation_thickness" ATTRIBUTE_SAP_THRESHOLD_AND_BELOW = ( f"sap_rating_{FILLED_CAVITY_SAP_THRESHOLD}_and_below" ) ATTRIBUTE_EPC_PRE_YEAR_THRESHOLD = f"epc_is_pre_{EPC_YEAR_THRESHOLD}" # These are the descriptions that we look for in the EPC data that are indicative of no insulation EPC_NO_WALL_INSULATION_DESCRIPTIONS = [ "cavity wall, as built, no insulation (assumed)", "cavity wall, as built, partial insulation (assumed)", "cavity wall, as built, partial insulation", "cavity wall, as built, no insulation", ] # List of strings that we look for in the EPC data, where substrings indicate that the wall is insulated EPC_INSULATED_WALLS_SUBSTRINGS = [ ", insulated", "with external insulation", "with internal insulation", "filled cavity", ] # List of strings that we look for in the EPC data, where substrings indicate that the roof is insulated EPC_INSULATED_ROOF_SUBSTRINGS = [ "(another dwelling above)", ", insulated", ", insulated (assumed) ", ", ceiling insulated", ] # List of strings we look for in the EPC data, where substrings indicate that the cavity is empty UNINSULATED_CAVITY_SUBSTRINGS = [ "cavity wall, as built, no insulation (assumed)", "cavity wall, as built, no insulation", "cavity wall, as built, partial insulation (assumed)", "cavity wall, as built, partial insulation", ] # Work type prefixes: # Empties EMPTY_CAVITY_NON_INTRUSIVE = "Non-Intrusive Data Shows Empty Cavity" EMPTY_CAVITY_NON_INTRUSIVE_YEAR = ( "Non-Intrusive Data Shows Empty Cavity, built after 2002" ) EPC_EMPTY_INSPECTIONS_RETRO_DRILLED = ( "EPC Shows Empty Cavity, inspections show retro drilled" ) EPC_EMPTY_INSPECTIONS_FILLED = ( "EPC Shows Empty Cavity, inspections show filled or other" ) EPC_EMPTY_INSPECTIONS_FILLED_AT_BUILD = ( "EPC Shows Empty Cavity, inspections show filled at build" ) EPC_EMPTY_INSPECTIONS_NON_CAVITY = ( "EPC Shows Empty Cavity, inspections show non-cavity build" ) EPC_EMPTY = "EPC Shows Empty Cavity" LANDLORD_EMPTY_INSPECTIONS_OTHER = ( "Landlord Data Shows Empty Cavity, EPC & Inspections Shows Filled or " "Non-cavity" ) # Extraction EXTRACTION_NON_INTRUSIVE = "Non-Intrusive Data Shows Cavity Extraction" # Solar SOLAR_ELIGIBLE = "Solar Eligible" SOLAR_ELIGIBLE_SOLID_WALL_UNINSULATED = ( "Solar Eligible, Solid Wall Uninsulated, EPC E or Below" ) SOLAR_ELIGIBLE_NEEDS_HEATING_UPGRADE = "Solar Eligible, Needs Heating Upgrade" CRM_HISTORICAL_CAVITY_PRODUCT = { "id": 156989182176, "unit_price": 0, "name": "Historical ECO Cavity", } CRM_PRODUCTS = { "Empty Cavity - ECO4": { "id": 82733738177, "unit_price": 1000, "name": "Empty Cavity - ECO4", }, "Extract & Fill - ECO4": { "id": 100307905778, "unit_price": 500, "name": "Extract & Fill - ECO4", }, "Solar PV - ECO4": { "id": 82623589564, "unit_price": 1608, "name": "Solar PV - ECO4", }, "Solar PV + HHRSH - ECO4": { "id": 155529972924, "unit_price": 1608, "name": "Solar PV + HHRSH - ECO4", }, "Solar PV + Heating Upgrade - ECO4": { "id": 109265426665, "unit_price": 1608, "name": "Solar PV + Heating Upgrade - ECO4", }, "Historical ECO Cavity": CRM_HISTORICAL_CAVITY_PRODUCT, } def __init__( self, local_filepath, sheet_name, address1_colname, postcode_colname, full_address_colname, landlord_property_id=None, full_address_cols_to_concat=None, missing_postcodes_method=None, address1_extraction_method=None, landlord_year_built=None, landlord_uprn=None, landlord_property_type=None, landlord_built_form=None, landlord_wall_construction=None, landlord_roof_construction=None, landlord_heating_system=None, landlord_existing_pv=None, landlord_sap=None, landlord_block_reference=None, phase=False, header=0, ): self.local_filepath = local_filepath self.sheet_name = sheet_name # Read in the data if local_filepath.endswith(".xlsx"): self.raw_asset_list = pd.read_excel( local_filepath, header=header, sheet_name=sheet_name ) else: self.raw_asset_list = pd.read_csv(local_filepath) self.standardised_asset_list = self.raw_asset_list.copy() # Will be used to store aggregated figures against the various work types self.work_type_figures = {} self.block_analysis_df = None self.duplicated_addresses = None self.contact_details = None self.contact_detail_fields = None self.outcomes = None self.outcomes_no_match = pd.DataFrame() self.outcomes_for_output = pd.DataFrame() self.master_surveyed = None self.unmatched_submissions = pd.DataFrame() self.ecosurv = None self.ecosurv_no_match = pd.DataFrame() self.geographical_areas = pd.DataFrame() # When this is True, we intend to break the programme into multiple phases. We may need to review # how this is structured in the future, as depending on how we get future data, we may need to # remove some existing phases from the reporting, or specifically highlight the phase (1 to n-1) # properties, assuming the current phase is n. self.phase = phase # We detect the presence of the non-intrusive columns self.non_intrusives_present = ( "CIGA Check Required" in self.raw_asset_list.columns ) # We detect if we have the old format of non-intruvies self.old_format_non_intrusives_present = ( "WFT Findings" in self.raw_asset_list.columns ) if self.old_format_non_intrusives_present: self.non_intrusives_present = False self.non_intrusives_eligibility = ( "Eligibility (Red/Yellow/Green)" in self.raw_asset_list.columns ) self.new_format_non_insturives_present = ( "Has the property been re-walled?" in self.raw_asset_list.columns ) self.new_format_non_insturives_present_v2 = ( "TILE HUNG" in self.raw_asset_list.columns ) self.solar_non_intrusives_present = ( "AREA (m²) OF ROOF WHERE PV WILL BE SITUATED" in self.raw_asset_list.columns ) # Names of columns self.landlord_property_id = landlord_property_id self.address1_colname = address1_colname self.postcode_colname = postcode_colname self.full_address_colname = full_address_colname self.landlord_year_built = landlord_year_built self.landlord_uprn = landlord_uprn self.landlord_property_type = landlord_property_type self.landlord_built_form = landlord_built_form self.landlord_wall_construction = landlord_wall_construction self.landlord_roof_construction = landlord_roof_construction self.landlord_heating_system = landlord_heating_system self.landlord_existing_pv = landlord_existing_pv self.landlord_sap = landlord_sap self.landlord_block_reference = landlord_block_reference # parameters for cleaning self.full_address_cols_to_concat = full_address_cols_to_concat self.missing_postcodes_method = missing_postcodes_method self.address1_extraction_method = address1_extraction_method self.debug_information = { "property_type": None, "wall_construction": None, "heating_system": None, "existing_pv": None, } self.variable_mappings = {} self.hubspot_data = None self.rename_map = {} self.keep_variables = [] # Finally, we handle the case where the landlord's property ID is actually the OS UPRN if (self.landlord_uprn == self.landlord_property_id) and ( self.landlord_property_id is not None ): self.standardised_asset_list[self.STANDARD_UPRN] = ( self.standardised_asset_list[self.landlord_uprn].copy() ) # Update the reference to landlord UPRn self.landlord_uprn = self.STANDARD_UPRN # Handle the case when full address and address 1 are the same if self.full_address_colname == self.address1_colname: self.full_address_colname = self.STANDARD_FULL_ADDRESS self.standardised_asset_list[self.full_address_colname] = ( self.standardised_asset_list[self.address1_colname].copy() ) # Handle the case where the property type column and built form are missing if self.landlord_property_type is None and self.landlord_built_form is None: if "Archetype" in self.raw_asset_list.columns: # We use the non-intrusives as our property type and built form self.landlord_property_type = self.STANDARD_PROPERTY_TYPE self.landlord_built_form = self.STANDARD_BUILT_FORM self.standardised_asset_list[self.landlord_property_type] = ( self.standardised_asset_list["Archetype"].copy() ) self.standardised_asset_list[self.landlord_built_form] = ( self.standardised_asset_list["Archetype"].copy() ) else: # We use the EPC data as our property type and built form self.landlord_property_type = self.STANDARD_PROPERTY_TYPE self.landlord_built_form = self.STANDARD_BUILT_FORM self.standardised_asset_list[self.landlord_property_type] = None self.standardised_asset_list[self.landlord_built_form] = None # Handle the case where the property type column is the same as the built type if self.landlord_property_type == self.landlord_built_form: self.landlord_built_form = self.STANDARD_BUILT_FORM self.standardised_asset_list[self.landlord_built_form] = ( self.standardised_asset_list[self.landlord_property_type].copy() ) # If landlord built form is None (which it often is) we use the built for from inspections if (self.landlord_built_form is None) and self.non_intrusives_present: self.landlord_built_form = self.STANDARD_BUILT_FORM self.standardised_asset_list[self.landlord_built_form] = ( self.standardised_asset_list["Archetype"].copy() ) self.prefixes_to_products = { # Empty self.EMPTY_CAVITY_NON_INTRUSIVE: self.CRM_PRODUCTS["Empty Cavity - ECO4"], self.EPC_EMPTY_INSPECTIONS_RETRO_DRILLED: self.CRM_PRODUCTS[ "Empty Cavity - ECO4" ], self.EPC_EMPTY_INSPECTIONS_FILLED: self.CRM_PRODUCTS["Empty Cavity - ECO4"], self.EPC_EMPTY_INSPECTIONS_FILLED_AT_BUILD: self.CRM_PRODUCTS[ "Empty Cavity - ECO4" ], self.EPC_EMPTY_INSPECTIONS_NON_CAVITY: self.CRM_PRODUCTS[ "Empty Cavity - ECO4" ], self.EPC_EMPTY: self.CRM_PRODUCTS["Empty Cavity - ECO4"], self.LANDLORD_EMPTY_INSPECTIONS_OTHER: self.CRM_PRODUCTS[ "Empty Cavity - ECO4" ], # Extraction self.EXTRACTION_NON_INTRUSIVE: self.CRM_PRODUCTS["Extract & Fill - ECO4"], # Solar self.SOLAR_ELIGIBLE: self.CRM_PRODUCTS["Solar PV - ECO4"], self.SOLAR_ELIGIBLE_SOLID_WALL_UNINSULATED: self.CRM_PRODUCTS[ "Solar PV - ECO4" ], self.SOLAR_ELIGIBLE_NEEDS_HEATING_UPGRADE: self.CRM_PRODUCTS[ "Solar PV + Heating Upgrade - ECO4" ], } def _extract_address1( self, asset_list, full_address_col, postcode_col, method="first_two_words" ): if method not in self.ADDRESS_1_CLEANING_METHODS: raise ValueError(f"Method {method} for producing address1 not recognized") if method == "first_two_words": asset_list[self.address1_colname] = ( asset_list[full_address_col].str.split(" ").str[:2].str.join(" ") ) return asset_list if method == "first_word": asset_list[self.address1_colname] = ( asset_list[full_address_col].str.split(" ").str[0] ) return asset_list if method == "house_number_extraction": asset_list[self.address1_colname] = asset_list.apply( lambda x: SearchEpc.get_house_number( address=x[full_address_col], postcode=x[postcode_col] ), axis=1, ) for _, x in asset_list.iterrows(): SearchEpc.get_house_number( address=x[full_address_col], postcode=x[postcode_col] ) return asset_list raise ValueError(f"Method {method} not recognized") @staticmethod def _address1_extraction(x): pass def create_property_id(self): """ This function creates the domna property ID, which is simply a hash of the full address and postcode We want all figures to be positive :return: """ # We'll remove punctuation and whitespace from the address, before hashing to produce an ID def _make_hash(value): """Generates a stable SHA256 hash suffix and appends it to a cleaned version of the value.""" # Normalize and remove special characters for cleaner ID cleaned_value = re.sub(r"[^\w\s-]", "", value).replace(" ", "_").lower() # Generate SHA-256 hash and truncate it short_hash = hashlib.sha256(value.encode()).hexdigest()[:12] return f"{cleaned_value}-{short_hash}" # Apply transformation self.standardised_asset_list[self.DOMNA_PROPERTY_ID] = ( ( self.standardised_asset_list[self.full_address_colname] + self.standardised_asset_list[self.postcode_colname] ) .str.strip() .str.replace(r"[^\w\s]", "", regex=True) .str.replace(" ", "") .str.lower() .apply(_make_hash) ) @staticmethod def _strip_postcode_from_full_address(full_address, postcode): cleaned = full_address.replace(postcode, "") # Remove any trailing commas and spaces cleaned = cleaned.rstrip(", ").strip(",").strip() return cleaned @classmethod def _identify_multi_address(cls, address): # We check if the address is comma separated if "," in address: address1_section = address.split(",")[0] # We look for string in the form (x-y) return bool(cls.MULTI_UNIT_REGEX.search(address1_section)) @staticmethod def _convert_uprn(x): """ Used to convert UPRNS to integer strings :param x: uprn to convert :return: converted uprn """ if pd.isnull(x): return x # check if numeric if np.isreal(x): return str(int(x)) if str(x).isdigit(): return str(int(x)) return x @staticmethod def _clean_postcode(postcode): # Remove double spaces postcode = postcode.replace(" ", " ") if " " not in postcode: # Restructure it return " ".join([postcode[:-3], postcode[-3:]]) return postcode def init_standardise(self): """ This function is used to standardise the asset list :return: standardised asset list """ # Remove rows without a postcode if self.postcode_colname is not None: self.standardised_asset_list = self.standardised_asset_list.dropna( subset=[self.postcode_colname] ) # We also clean postcode columns where if there is not space, we create one self.standardised_asset_list[self.postcode_colname] = ( self.standardised_asset_list[self.postcode_colname].apply( self._clean_postcode ) ) # We clean up portential non-breaking spaces, and double spaces for col in [ c for c in [ self.postcode_colname, self.full_address_colname, self.address1_colname, ] if c is not None ]: self.standardised_asset_list[col] = self.standardised_asset_list[ col ].astype(str) self.standardised_asset_list[col] = self.standardised_asset_list[ col ].str.replace("\xa0", " ", regex=False) self.standardised_asset_list[col] = self.standardised_asset_list[ col ].str.replace(" ", " ", regex=False) if self.address1_colname is None: if self.address1_extraction_method is None: raise ValueError( "Missing address 1 - please specify an extraction method" ) self.address1_colname = self.STANDARD_ADDRESS_1 # If we do not have this, we produce it self.standardised_asset_list = self._extract_address1( asset_list=self.standardised_asset_list, full_address_col=self.full_address_colname, postcode_col=self.postcode_colname, method=self.address1_extraction_method, ) if self.full_address_colname is None: if not self.full_address_cols_to_concat: raise ValueError( "Missing full address - please specify columns to concatenate" ) self.full_address_colname = self.STANDARD_FULL_ADDRESS self.standardised_asset_list[self.full_address_colname] = ( self.standardised_asset_list[self.full_address_cols_to_concat].apply( lambda x: ", ".join([y for y in x if not pd.isnull(y)]), axis=1 ) ) else: # Make sure to strip the postcode out of the full address self.standardised_asset_list[self.full_address_colname] = ( self.standardised_asset_list.apply( lambda x: self._strip_postcode_from_full_address( full_address=x[self.full_address_colname], postcode=x[self.postcode_colname], ), axis=1, ) ) # We create the domna property id self.create_property_id() # Clean up the UPRN column, if the landlord has provided them if self.landlord_uprn is not None: self.standardised_asset_list[self.landlord_uprn] = ( self.standardised_asset_list[self.landlord_uprn].apply( self._convert_uprn ) ) # We keep just the columns we care about and will work through the various columns and standardise variables = [ self.landlord_property_id, self.DOMNA_PROPERTY_ID, self.address1_colname, self.postcode_colname, self.full_address_colname, self.landlord_uprn, self.landlord_property_type, self.landlord_built_form, self.landlord_year_built, self.landlord_wall_construction, self.landlord_roof_construction, self.landlord_heating_system, self.landlord_existing_pv, self.landlord_sap, self.landlord_block_reference, ] # Keep just non-null variables (e.g landlord may not provide uprn self.keep_variables = [v for v in variables if v is not None] self.rename_map = { self.landlord_property_id: self.STANDARD_LANDLORD_PROPERTY_ID, self.address1_colname: self.STANDARD_ADDRESS_1, self.postcode_colname: self.STANDARD_POSTCODE, self.full_address_colname: self.STANDARD_FULL_ADDRESS, self.landlord_uprn: self.STANDARD_UPRN, self.landlord_property_type: self.STANDARD_PROPERTY_TYPE, self.landlord_built_form: self.STANDARD_BUILT_FORM, self.landlord_year_built: self.STANDARD_YEAR_BUILT, self.landlord_wall_construction: self.STANDARD_WALL_CONSTRUCTION, self.landlord_roof_construction: self.STANDARD_ROOF_CONSTRUCTION, self.landlord_heating_system: self.STANDARD_HEATING_SYSTEM, self.landlord_existing_pv: self.STANDARD_EXISTING_PV, self.landlord_sap: self.STANDARD_SAP, self.landlord_block_reference: self.STANDARD_BLOCK_REFERENCE, } self.rename_map = {k: v for k, v in self.rename_map.items() if k is not None} non_intrusive_columns = [] if ( self.non_intrusives_present and not self.new_format_non_insturives_present_v2 ): non_intrusive_columns = self.NON_INTRUSIVES_COLNAMES if self.non_intrusives_eligibility: non_intrusive_columns.append(self.NON_INTRUSIVES_ELIGIBILITY_COLUMN) if self.new_format_non_insturives_present: non_intrusive_columns += self.NON_INTRUSIVES_NEW_FORMAT_COLNAMES if self.new_format_non_insturives_present_v2: non_intrusive_columns += self.NON_INTRUSIVES_NEW_FORMAT_COLNAMES_V2 if self.solar_non_intrusives_present: non_intrusive_columns += self.NON_INTRUSIVES_SOLAR_COLNAMES if self.old_format_non_intrusives_present: # We check if we have the ECO Eligibility column, which we might not have non_intrusive_columns = [ c for c in self.OLD_FORMAT_NON_INTRUSIVE_COLNAMES if c in self.standardised_asset_list.columns ] if "Warmfront Finding" in self.standardised_asset_list.columns: non_intrusive_columns.append("Warmfront Finding") self.keep_variables += non_intrusive_columns self.rename_map = { **self.rename_map, **dict( zip( non_intrusive_columns, ["non-intrusives: " + c for c in non_intrusive_columns], ) ), } # We idenfiy addresses which are likely to be multi-addresses (i.g are rooms x-y) self.standardised_asset_list["is_multi_address"] = self.standardised_asset_list[ self.full_address_colname ].apply(lambda x: self._identify_multi_address(x)) # We handle cleaning for walls, in the instance that the landlord provides us with EPC data and # we see instances of "average thermal transmittance" in the description if self.landlord_wall_construction is not None: self.standardised_asset_list[self.landlord_wall_construction] = np.where( self.standardised_asset_list[self.landlord_wall_construction] .str.lower() .str.contains("average thermal transmittance") == True, "new build - average thermal transmittance", self.standardised_asset_list[self.landlord_wall_construction], ) else: # We want to make sure that we have a column for wall construction self.landlord_wall_construction = self.STANDARD_WALL_CONSTRUCTION self.standardised_asset_list[self.landlord_wall_construction] = None if self.landlord_roof_construction is None: self.landlord_roof_construction = self.STANDARD_ROOF_CONSTRUCTION self.standardised_asset_list[self.landlord_roof_construction] = None # Clear our build year column # We attempt to process the year built column if self.landlord_year_built is not None: # We check if we have a datetime - year built has not been renamed if isinstance( self.standardised_asset_list[self.landlord_year_built].iloc[0], datetime ): # We treat any string columns - with common values we see self.standardised_asset_list[self.landlord_year_built] = ( self.standardised_asset_list[self.landlord_year_built].replace( self.DATETIME_REMAP ) ) no_data_codes = {"No Data": None} self.standardised_asset_list[self.landlord_year_built] = ( self.standardised_asset_list[self.landlord_year_built].replace( no_data_codes ) ) self.standardised_asset_list[self.landlord_year_built] = pd.to_datetime( self.standardised_asset_list[self.landlord_year_built] ) # Convert this to year self.standardised_asset_list[self.landlord_year_built] = ( self.standardised_asset_list[self.landlord_year_built].dt.year ) else: # We attempt to convert the year built to a datetime, by detecting the format and converting def extract_year(date_str): known_errors = { "#MULTIVALUE", "ND", "PIMSS EMPTY", "UNKNOWN", "This cell has an external reference that can't be shown or edited. Editing this cell will " "remove the external reference.", 0, } if pd.isnull(date_str) or date_str in known_errors: return None # Handle datetime if isinstance(date_str, datetime): return date_str.year # Handle numeric year (float or int) if isinstance(date_str, (int, float, np.int_)): if 1000 <= int(date_str) <= 2100: return int(date_str) # Now handle string-based logic if isinstance(date_str, str): # Direct date match e.g. 01-Jul-2021 match = re.match(r"\d{1,2}-[A-Za-z]{3}-(\d{4})", date_str) if match: return int(match.group(1)) # Find all 4-digit years in string years = [ int(y) for y in re.findall(r"\b(?:19|20)\d{2}\b", date_str) ] if years: return max(years) # Return most recent year # If only numbers are present without format numeric_str = re.sub(r"\D", "", date_str) if len(numeric_str) == 4 and numeric_str.isdigit(): return int(numeric_str) raise NotImplementedError( f"Unhandled format for year built, value is {date_str} - implement me" ) self.standardised_asset_list[self.landlord_year_built] = ( self.standardised_asset_list[self.landlord_year_built].apply( extract_year ) ) # We now create standard lookups to_remap = { self.landlord_property_type: { "standard_values": property_type_mappings.STANDARD_PROPERTY_TYPES, "standard_map": property_type_mappings.PROPERTY_MAPPING, }, self.landlord_built_form: { "standard_values": built_form_mappings.STANDARD_BUILT_FORMS, "standard_map": built_form_mappings.BUILT_FORM_MAPPINGS, }, self.landlord_wall_construction: { "standard_values": walls_mappings.STANDARD_WALL_CONSTRUCTIONS, "standard_map": walls_mappings.WALL_CONSTRUCTION_MAPPINGS, }, self.landlord_heating_system: { "standard_values": heating_mappings.STANDARD_HEATING_SYSTEMS, "standard_map": heating_mappings.HEATING_MAPPINGS, }, self.landlord_existing_pv: { "standard_values": existing_pv_mappings.STANDARD_EXISTING_PV, "standard_map": existing_pv_mappings.EXISTING_PV_MAPPINGS, }, self.landlord_roof_construction: { "standard_values": roof_mappings.STANDARD_ROOF_CONSTRUCTIONS, "standard_map": roof_mappings.ROOF_CONSTRUCTION_MAPPINGS, }, } # Keep just entries where the key is not None to_remap = {k: v for k, v in to_remap.items() if k is not None} for variable, config in to_remap.items(): logger.info("Standardising variable: %s", variable) # Strip each of these columns self.standardised_asset_list[variable] = self.standardised_asset_list[ variable ].str.strip() values_to_remap = self.standardised_asset_list[variable].unique() # We want to map this to our standardised list of property types we're interested in remapper = DataRemapper( standard_values=config["standard_values"], standard_map=config["standard_map"], ) remap_dictionary = remapper.standardize_list( values_to_remap=values_to_remap.tolist() ) self.variable_mappings[variable] = remap_dictionary # We now print out the variable mappings, which can be reviewed by the user, before the final standardised # asset list is returned for variable, mapping in self.variable_mappings.items(): pprint(f"Variable: {variable}") pprint(mapping) # Print a space print("\n") pprint("=======================================") def apply_standardiation(self, override_empty_mappings=False): """ This function applies the standardisation to the asset list :param override_empty_mappings: If true, will override the check for empty mappings. This is only relevant if there are no categories which need remapping which is highly unlikely :return: """ if self.phase: # We filter on just the properties that have had an inspection if ( self.new_format_non_insturives_present_v2 or self.solar_non_intrusives_present ): self.standardised_asset_list = self.standardised_asset_list[ ~self.standardised_asset_list["NAME OF SURVEYOR"].isin( ["YET TO BE SURVEYED", "", None] ) ] self.standardised_asset_list = self.standardised_asset_list[ ~pd.isnull(self.standardised_asset_list["NAME OF SURVEYOR"]) ] else: self.standardised_asset_list = self.standardised_asset_list[ ~self.standardised_asset_list["Surveyors Name"].isin( ["YET TO BE SURVEYED"] ) ] if not self.variable_mappings and not override_empty_mappings: raise ValueError("Please run init_standardise first") logger.info("Applying standardisation to asset list") for variable, mapping in self.variable_mappings.items(): self.standardised_asset_list[variable + "_original_from_landlord"] = ( self.standardised_asset_list[variable].copy() ) self.standardised_asset_list[variable] = self.standardised_asset_list[ variable ].map(mapping) if self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated().sum(): # Drop the dupes pprint( f"There are {self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated().sum()} duplicated " f"addresses - dropping" ) # Keep a record of duplicates self.duplicated_addresses = self.standardised_asset_list[ self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated() ][ [ self.DOMNA_PROPERTY_ID, self.full_address_colname, self.address1_colname, self.postcode_colname, ] ].copy() df = self.standardised_asset_list[ self.standardised_asset_list[self.DOMNA_PROPERTY_ID].isin( self.duplicated_addresses[self.DOMNA_PROPERTY_ID] ) ][ [ self.landlord_property_id, self.DOMNA_PROPERTY_ID, self.full_address_colname, self.address1_colname, self.postcode_colname, ] ].copy() df = df.sort_values(by=[self.DOMNA_PROPERTY_ID]) self.standardised_asset_list = self.standardised_asset_list[ ~self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated() ] # Apply renames to our standard names # Perform final variable selection and renaming: # We add the original columns to the keep variables self.keep_variables += [ k + "_original_from_landlord" for k in self.variable_mappings.keys() ] self.standardised_asset_list = self.standardised_asset_list[ self.keep_variables ].rename(columns=self.rename_map) # We fill any standard columns that are not in the data because they were not provided by the landlord missing_variables = [ v for v in [ self.STANDARD_EXISTING_PV, self.STANDARD_HEATING_SYSTEM, self.STANDARD_UPRN, self.STANDARD_PROPERTY_TYPE, self.STANDARD_YEAR_BUILT, self.STANDARD_WALL_CONSTRUCTION, self.STANDARD_HEATING_SYSTEM, self.STANDARD_BLOCK_REFERENCE, ] if v not in self.standardised_asset_list.columns ] for v in missing_variables: self.standardised_asset_list[v] = None # Convert to string self.standardised_asset_list[self.STANDARD_LANDLORD_PROPERTY_ID] = ( self.standardised_asset_list[self.STANDARD_LANDLORD_PROPERTY_ID].astype(str) ) # CLean up the standard SAP column, that can be problematic if self.landlord_sap is not None: self.standardised_asset_list[self.STANDARD_SAP] = ( self.standardised_asset_list[self.STANDARD_SAP] .astype(str) .str.replace("\xa0", " ", regex=False) .str.strip() ) self.standardised_asset_list[self.STANDARD_SAP] = np.where( self.standardised_asset_list[self.STANDARD_SAP] == "", None, self.standardised_asset_list[self.STANDARD_SAP], ) self.standardised_asset_list[self.STANDARD_SAP] = ( self.standardised_asset_list[self.STANDARD_SAP].astype(float) ) # If it's zero, we set it to None self.standardised_asset_list[self.STANDARD_SAP] = np.where( self.standardised_asset_list[self.STANDARD_SAP] == 0, None, self.standardised_asset_list[self.STANDARD_SAP], ) has_blocks_of_flats = ( self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] == "block of flats" ).sum() # Perform block splitting, ahead of fetching the EPC data # If we blocks of flats, without a landlord block reference, we create this self.fill_landlord_block_reference(has_blocks_of_flats) # If we have blocks of flats, we split these out into individual units. self.split_blocks() def merge_data(self, df: pd.DataFrame): """ Used to insert data into the standardised asset list, based on the domna property id :return: """ if self.DOMNA_PROPERTY_ID not in df.columns: raise ValueError( f"Dataframe must contain the column {self.DOMNA_PROPERTY_ID}" ) if df[self.DOMNA_PROPERTY_ID].duplicated().sum(): df = df.drop_duplicates(subset=[self.DOMNA_PROPERTY_ID], keep="first") self.standardised_asset_list = self.standardised_asset_list.merge( df, how="left", on=self.DOMNA_PROPERTY_ID ) def extract_attributes(self, pull_epc=True): # Used to extracty the typical attributes that we use to identify viable work self.standardised_asset_list[ self.ATTRIBUTE_HAS_SOLAR ] = self.standardised_asset_list[ self.FIND_EPC_DATA_NAMES["Solar photovoltaics"] ] | ~self.standardised_asset_list[ self.EPC_API_DATA_NAMES["photo-supply"] ].isin( ["0.0", 0, None, "", np.nan] ) accepted_epc_property_types = ["House", "Flat", "Bungalow", "Maisonette"] # The logic here is: # 1) Take the property type provided by the HA themselves # 2) In absence of that, take the EPC property type # 3) Otherwise use None self.standardised_asset_list[self.ATTRIBUTE_NUMBER_OF_FLOORS] = ( self.standardised_asset_list.apply( lambda x: estimate_number_of_floors( property_type=( str(x[self.STANDARD_PROPERTY_TYPE]).title() if str(x[self.STANDARD_PROPERTY_TYPE]).title() in accepted_epc_property_types else ( x[self.EPC_API_DATA_NAMES["property-type"]] if not pd.isnull( x[self.EPC_API_DATA_NAMES["property-type"]] ) else None ) ) ), axis=1, ) ) self.standardised_asset_list[self.EPC_API_DATA_NAMES["total-floor-area"]] = ( self.standardised_asset_list[ self.EPC_API_DATA_NAMES["total-floor-area"] ].astype(float) ) # Replace "" value with None self.standardised_asset_list[ self.EPC_API_DATA_NAMES["number-habitable-rooms"] ] = self.standardised_asset_list[ self.EPC_API_DATA_NAMES["number-habitable-rooms"] ].replace( "", None ) self.standardised_asset_list[ self.EPC_API_DATA_NAMES["number-habitable-rooms"] ] = self.standardised_asset_list[ self.EPC_API_DATA_NAMES["number-habitable-rooms"] ].astype( float ) # Estimate the perimeter # Handle funky edge case self.standardised_asset_list[self.EPC_API_DATA_NAMES["total-floor-area"]] = ( np.where( ( self.standardised_asset_list[ self.EPC_API_DATA_NAMES["total-floor-area"] ] == 0 ), self.standardised_asset_list[ self.EPC_API_DATA_NAMES["total-floor-area"] ].mean(), self.standardised_asset_list[ self.EPC_API_DATA_NAMES["total-floor-area"] ], ) ) self.standardised_asset_list[self.ATTRIBUTE_ESTIMATED_PERIMETER] = ( self.standardised_asset_list.apply( lambda x: estimate_perimeter( floor_area=x[self.EPC_API_DATA_NAMES["total-floor-area"]] / x[self.ATTRIBUTE_NUMBER_OF_FLOORS], num_rooms=x[self.EPC_API_DATA_NAMES["number-habitable-rooms"]] / x[self.ATTRIBUTE_NUMBER_OF_FLOORS], ), axis=1, ) ) self.standardised_asset_list[self.ATTRIBUTE_HEAT_LOSS_AREA] = ( self.standardised_asset_list.apply( lambda x: estimate_external_wall_area( num_floors=x[self.ATTRIBUTE_NUMBER_OF_FLOORS], floor_height=( float(x[self.EPC_API_DATA_NAMES["floor-height"]]) if not pd.isnull(x[self.EPC_API_DATA_NAMES["floor-height"]]) else 2.5 ), perimeter=x[self.ATTRIBUTE_ESTIMATED_PERIMETER], built_form=x[self.EPC_API_DATA_NAMES["built-form"]], ), axis=1, ) ) col = self.EPC_API_DATA_NAMES["roof-description"] self.standardised_asset_list[self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS] = ( self.standardised_asset_list.apply( lambda x: ( RoofAttributes(description=x[col]).process()["insulation_thickness"] if not pd.isnull(x[col]) else None ), axis=1, ) ) self.standardised_asset_list[self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS] = ( self.standardised_asset_list[ self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS ].str.replace("+", "") ) # We produce some additional fields # 1) Is the SAP rating below C75 self.standardised_asset_list[self.ATTRIBUTE_SAP_THRESHOLD_AND_BELOW] = ( self.standardised_asset_list[ self.EPC_API_DATA_NAMES["current-energy-efficiency"] ].astype(float) <= self.FILLED_CAVITY_SAP_THRESHOLD ) # 2) Flag anything where the EPC is older than 5 years self.standardised_asset_list[self.ATTRIBUTE_EPC_PRE_YEAR_THRESHOLD] = ( pd.to_datetime( self.standardised_asset_list[self.EPC_API_DATA_NAMES["inspection-date"]] ).dt.year < self.EPC_YEAR_THRESHOLD ) self.process_age_band() def process_age_band(self): processed_age_band = [] for _, x in self.standardised_asset_list.iterrows(): if pd.isnull(x[self.EPC_API_DATA_NAMES["construction-age-band"]]) or ( x[self.EPC_API_DATA_NAMES["construction-age-band"]] in Definitions.DATA_ANOMALY_MATCHES ): processed_age_band.append( { self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID], "epc_year_lower_bound": None, "epc_year_upper_bound": None, "does_age_band_match_epc_age_band": "No EPC Age Band", } ) continue # We exatract the upper and lower bounds if x[self.EPC_API_DATA_NAMES["construction-age-band"]] in [ "England and Wales: 2007 onwards", "England and Wales: 2012 onwards", ]: year_lower_bound = ( 2007 if x[self.EPC_API_DATA_NAMES["construction-age-band"]] == "England and Wales: 2007 onwards" else 2012 ) if pd.isnull(x[self.STANDARD_YEAR_BUILT]): age_band_matches = "No Year Built From Landlord" else: age_band_matches = ( "EPC Age Band Matches Year Built" if x[self.STANDARD_YEAR_BUILT] >= year_lower_bound else "EPC Age Band is older than Year Built" ) processed_age_band.append( { self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID], "epc_year_lower_bound": year_lower_bound, "epc_year_upper_bound": None, "does_age_band_match_epc_age_band": age_band_matches, } ) continue if ( x[self.EPC_API_DATA_NAMES["construction-age-band"]] == "England and Wales: before 1900" ): if pd.isnull(x[self.STANDARD_YEAR_BUILT]): age_band_matches = "No Year Built From Landlord" else: age_band_matches = ( "EPC Age Band Matches Year Built" if x[self.STANDARD_YEAR_BUILT] < 1900 else "EPC Age Band is newer than Year Built" ) processed_age_band.append( { self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID], "epc_year_lower_bound": None, "epc_year_upper_bound": 1899, "does_age_band_match_epc_age_band": age_band_matches, } ) continue if x[self.EPC_API_DATA_NAMES["construction-age-band"]].isdigit(): if pd.isnull(x[self.STANDARD_YEAR_BUILT]): age_band_matches = "No Year Built From Landlord" else: age_band_matches = ( "EPC Age Band Matches Year Built" if x[self.STANDARD_YEAR_BUILT] == int(x[self.EPC_API_DATA_NAMES["construction-age-band"]]) else "EPC Age Band is different from Year Built" ) processed_age_band.append( { self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID], "epc_year_lower_bound": int( x[self.EPC_API_DATA_NAMES["construction-age-band"]] ), "epc_year_upper_bound": int( x[self.EPC_API_DATA_NAMES["construction-age-band"]] ), "does_age_band_match_epc_age_band": age_band_matches, } ) continue # Oherwise, we extract the upper and lower bounds age_band = x[self.EPC_API_DATA_NAMES["construction-age-band"]].split(": ")[ 1 ] lower_date, upper_date = age_band.split("-") if not x[self.STANDARD_YEAR_BUILT]: age_band_matches = "No Year Built From Landlord" else: age_band_matches = ( "EPC Age Band Matches Year Built" if (x[self.STANDARD_YEAR_BUILT] >= float(lower_date)) and (x[self.STANDARD_YEAR_BUILT] <= float(upper_date)) else ( "EPC Age Band is older than Year Built" if x[self.STANDARD_YEAR_BUILT] > float(upper_date) else "EPC Age Band is newer than Year Built" ) ) processed_age_band.append( { self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID], "epc_year_lower_bound": int(lower_date), "epc_year_upper_bound": int(upper_date), "does_age_band_match_epc_age_band": age_band_matches, } ) processed_age_band = pd.DataFrame(processed_age_band) self.standardised_asset_list = self.standardised_asset_list.merge( processed_age_band, how="left" ) def identify_worktypes(self): if self.landlord_sap is not None: # We add a SAP category for all work type identification self.standardised_asset_list["SAP Category"] = np.where( ( ( self.standardised_asset_list[ self.EPC_API_DATA_NAMES["current-energy-efficiency"] ] <= 54 ) | (self.standardised_asset_list[self.STANDARD_SAP] <= 54) ), "SAP Rating 54 or less", np.where( ( ( self.standardised_asset_list[ self.EPC_API_DATA_NAMES["current-energy-efficiency"] ] <= 68 ) | (self.standardised_asset_list[self.STANDARD_SAP] <= 68) ), "SAP Rating 55-68", np.where( ( ( self.standardised_asset_list[ self.EPC_API_DATA_NAMES["current-energy-efficiency"] ] <= self.EMPTY_CAVITY_SAP_THRESHOLD ) | ( self.standardised_asset_list[self.STANDARD_SAP] <= self.EMPTY_CAVITY_SAP_THRESHOLD ) ), f"SAP Rating 69-{self.EMPTY_CAVITY_SAP_THRESHOLD}", f"SAP Rating {self.EMPTY_CAVITY_SAP_THRESHOLD + 1} or more", ), ), ) self.standardised_asset_list["SAP Category"] = np.where( pd.isnull(self.standardised_asset_list[self.STANDARD_SAP]) & pd.isnull( self.standardised_asset_list[ self.EPC_API_DATA_NAMES["current-energy-efficiency"] ] ), "SAP Unknown", self.standardised_asset_list["SAP Category"], ) else: # We add a SAP category for all work type identification # We break into 4 categories (54 or less, 55-68, 69-74, 75 or more) self.standardised_asset_list["SAP Category"] = np.where( ( self.standardised_asset_list[ self.EPC_API_DATA_NAMES["current-energy-efficiency"] ] <= 54 ), "SAP Rating 54 or less", np.where( ( self.standardised_asset_list[ self.EPC_API_DATA_NAMES["current-energy-efficiency"] ] <= 68 ), "SAP Rating 55-68", np.where( ( self.standardised_asset_list[ self.EPC_API_DATA_NAMES["current-energy-efficiency"] ] <= self.EMPTY_CAVITY_SAP_THRESHOLD ), f"SAP Rating 69-{self.EMPTY_CAVITY_SAP_THRESHOLD}", f"SAP Rating {self.EMPTY_CAVITY_SAP_THRESHOLD + 1} or more", ), ), ) self.standardised_asset_list["SAP Category"] = np.where( pd.isnull( self.standardised_asset_list[ self.EPC_API_DATA_NAMES["current-energy-efficiency"] ] ), "SAP Unknown", self.standardised_asset_list["SAP Category"], ) # Before we being, we identify if a property has solar already as we use this # for identifying cavity jobs if self.non_intrusives_present and not self.old_format_non_intrusives_present: if ( self.new_format_non_insturives_present_v2 or self.solar_non_intrusives_present ): existing_solar_non_intrusives_check = ( self.standardised_asset_list["non-intrusives: ROOF ORIENTATION"] .str.strip() .isin(["ALREADY HAS SOLAR PV", "ALREADY HAS PV"]) ) else: existing_solar_non_intrusives_check = ( self.standardised_asset_list[ "non-intrusives: PV, ACCESS ISSUE, SEE NOTES" ] == "SOLAR PV ON ROOF" ) elif self.old_format_non_intrusives_present: existing_solar_non_intrusives_check = ( self.standardised_asset_list["non-intrusives: WFT Findings"] .str.lower() .str.strip() .isin(["solar pv on roof"]) ) else: # We don't have an indication existing_solar_non_intrusives_check = False self.standardised_asset_list["property_has_solar"] = ( ( self.standardised_asset_list[self.STANDARD_EXISTING_PV] == "already has PV" ) | existing_solar_non_intrusives_check | (self.standardised_asset_list[self.ATTRIBUTE_HAS_SOLAR]) ) # If we have non-intrusives completed, we can use this to identify work types ###################################################### # Empty cavity: ###################################################### # 1) Has been flagged on the non-intrusives as being a cavity wall, empty or partially filled # 2) The age is before 1995 # 3) We don't remove anything that haas access issues yet if self.non_intrusives_present: if self.new_format_non_insturives_present_v2: non_intrusives_wall_filter = ( self.standardised_asset_list["non-intrusives: Construction"] == "CAVITY" ) & self.standardised_asset_list["non-intrusives: Insulated"].isin( ["EMPTY", "PARTIAL", "EMPTY CAVITY"] ) else: non_intrusives_wall_filter = ( self.standardised_asset_list["non-intrusives: Construction"] == "CAVITY" ) & self.standardised_asset_list["non-intrusives: Insulated"].isin( ["EMPTY", "PARTIAL"] ) elif self.old_format_non_intrusives_present: non_intrusives_wall_filter = self.standardised_asset_list[ "non-intrusives: WFT Findings" ].str.lower().str.strip().isin( [ "empty cavity", "partial fill", "empty", "EMPTY CAVITY 70MM", "partial", "empty cav", ] ) | ( ( self.standardised_asset_list["non-intrusives: WFT Findings"] .str.lower() .str.strip() .str.contains("empty cavity|partial fill") & ~self.standardised_asset_list["non-intrusives: WFT Findings"] .astype(str) .str.lower() .str.strip() .str.contains("major access issues") ) ) else: # We set the filter to False, as we have no non-intrusives non_intrusives_wall_filter = False if self.landlord_year_built is None: year_built_filter = ( self.standardised_asset_list["epc_year_upper_bound"] <= self.EMPTY_CAVITY_YEAR_THRESHOLD ) else: year_built_filter = ( self.standardised_asset_list[self.STANDARD_YEAR_BUILT] <= self.EMPTY_CAVITY_YEAR_THRESHOLD ) | ( self.standardised_asset_list["epc_year_upper_bound"] <= self.EMPTY_CAVITY_YEAR_THRESHOLD ) # Criteria: # The property isn't a bedsit # Non-intrusives indicate it needs a fill # The EPC year is before 2002 # We also flag where the property has solar on the roof, because this is a signal of a high EPC rating self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] = ( ( ~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin( ["bedsit"] ) ) & non_intrusives_wall_filter & year_built_filter & (~self.standardised_asset_list["property_has_solar"]) ) self.standardised_asset_list[ "non_intrusive_indicates_empty_cavity_has_solar" ] = ( ~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] & ( ~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin( ["bedsit"] ) ) & non_intrusives_wall_filter & year_built_filter & ( # If the property has solar, there's a chance it won't qualify self.standardised_asset_list["property_has_solar"] ) ) # We also add a filter on anything that was generally identified by the non-intrusives self.standardised_asset_list[ "non_intrusive_indicates_empty_cavity_no_year_filter" ] = ( ~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] & ~self.standardised_asset_list[ "non_intrusive_indicates_empty_cavity_has_solar" ] & ( ~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin( ["bedsit"] ) ) & non_intrusives_wall_filter ) if (not self.non_intrusives_eligibility) and ( not self.old_format_non_intrusives_present ): # If we have NO inspections data, we capture all of the wall types and don't filter on age of the EPC self.standardised_asset_list["epc_indicates_empty_cavity"] = ( self.standardised_asset_list[ self.EPC_API_DATA_NAMES["walls-description"] ] .str.lower() .isin(self.EPC_NO_WALL_INSULATION_DESCRIPTIONS) & ( self.standardised_asset_list["epc_year_upper_bound"] <= self.EMPTY_CAVITY_YEAR_THRESHOLD ) & ( ~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin( ["bedsit"] ) ) ) else: self.standardised_asset_list["epc_indicates_empty_cavity"] = ( self.standardised_asset_list[ self.EPC_API_DATA_NAMES["walls-description"] ] .str.lower() .isin(self.EPC_NO_WALL_INSULATION_DESCRIPTIONS) & ( self.standardised_asset_list["epc_year_upper_bound"] <= self.EMPTY_CAVITY_YEAR_THRESHOLD ) & (~self.standardised_asset_list[self.ATTRIBUTE_EPC_PRE_YEAR_THRESHOLD]) & ( ~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin( ["bedsit"] ) ) ) self.standardised_asset_list["landlord_data_indicates_empty_cavity"] = ( self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin( ["uninsulated cavity"] ) & ( ( self.standardised_asset_list[self.STANDARD_YEAR_BUILT] <= self.EMPTY_CAVITY_YEAR_THRESHOLD ) | ( self.standardised_asset_list["epc_year_upper_bound"] <= self.EMPTY_CAVITY_YEAR_THRESHOLD ) ) & ( ~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin( ["bedsit"] ) ) ) # Finally, we create a flag to indicate that the cavity is empty, based on the criteria above self.standardised_asset_list["cavity_is_empty"] = ( non_intrusives_wall_filter | self.standardised_asset_list[self.EPC_API_DATA_NAMES["walls-description"]] .str.lower() .isin(self.EPC_NO_WALL_INSULATION_DESCRIPTIONS) | self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin( ["uninsulated cavity"] ) ) ###################################################### # Extraction ###################################################### # as needing a CIGA check. What is the logic we should be applying here? if self.non_intrusives_present: extraction_wall_filter = ( ( self.standardised_asset_list["non-intrusives: Construction"] == "CAVITY" ) & ( self.standardised_asset_list["non-intrusives: Insulated"].isin( ["RETRO DRILLED", "FILLED AT BUILD"] ) ) & ( ~self.standardised_asset_list["non-intrusives: Material"].isin( [ "GREY LOOSE BEAD", "COMPACTED BEAD", "FIBRE BATT NO CAVITY", "EMPTY NARROW BELOW 30mm", ] ) ) ) if self.non_intrusives_eligibility: # If we have the eligibility column, we check if the wall is eligible extraction_wall_filter = ( extraction_wall_filter & ~self.standardised_asset_list[ "non-intrusives: Eligibility (Red/Yellow/Green)" ].isin(["RED"]) ) self.standardised_asset_list[ "non_intrusive_indicates_cavity_extraction" ] = (extraction_wall_filter & year_built_filter) self.standardised_asset_list[ "non_intrusive_indicates_cavity_extraction_no_year_filter" ] = (extraction_wall_filter & ~year_built_filter) elif self.old_format_non_intrusives_present: print("Review these categories!!!!") extraction_wall_filter = ( self.standardised_asset_list["non-intrusives: WFT Findings"] .str.lower() .str.strip() .isin( [ "blown in yellow wool", "retro drilled & filled", "white fibre from build", "foam filled from build", "retro drilled gas in block", "block in rock wool", "rdf / tilehung", "fibre from build", "blown in rock wool", "rdf / tile hung", "retro drilled", "rock wool from build", "part rendered retro drilled", "white fibtr from build.", "retro drilled and filled", "blown in white wool", "blown in yellow fibre from build", "rdf", "polybead", "foam filled", "blown in white bead from build", "blown in yellow fibre", "retro drilled det", "blown in rockwool", "retro drilled det empty cav", "retro drilled end", "retro filled extension", "retro filled", "foam", ] ) ) self.standardised_asset_list[ "non_intrusive_indicates_cavity_extraction" ] = extraction_wall_filter self.standardised_asset_list[ "non_intrusive_indicates_cavity_extraction_no_year_filter" ] = False else: self.standardised_asset_list[ "non_intrusive_indicates_cavity_extraction" ] = False self.standardised_asset_list[ "non_intrusive_indicates_cavity_extraction_no_year_filter" ] = False ###################################################### # Solar ###################################################### # Criteria: # Check 1: Does the property have a valid heating system? self.standardised_asset_list[ "solar_landlord_data_indicates_correct_heating_system" ] = self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM].isin( [ "air source heat pump", "ground source heat pump", "high heat retention storage heaters", "electric boiler", ] ) self.standardised_asset_list[ "solar_landlord_data_indicates_needs_heating_upgrade" ] = self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM].isin( [ "electric storage heaters", "room heaters", "electric radiators", "no heating", "electric fuel", ] ) self.standardised_asset_list[ "solar_epc_data_indicates_correct_heating_system" ] = ( self.standardised_asset_list[ self.EPC_API_DATA_NAMES["mainheat-description"] ] .str.lower() .str.contains( "air source heat pump|ground source heat pump|boiler and radiators, electric" ) ) | ( self.standardised_asset_list[ self.EPC_API_DATA_NAMES["mainheat-description"] ] .str.lower() .str.contains("electric storage heaters") & ( self.standardised_asset_list[ self.EPC_API_DATA_NAMES["mainheatcont-description"] ] == "Controls for high heat retention storage heaters" ) ) # If the landlord has given us the heating system, we default to that on heating upgrades. Because of the # poor heating in place, if the EPC indicates that this property had a low efficiency heating system but the # landlord data suggests otherwise (e.g. there's a gas boiler), we default to what the landlord has told us self.standardised_asset_list[ "solar_epc_data_indicates_requires_heating_upgrade" ] = ( self.standardised_asset_list[ self.EPC_API_DATA_NAMES["mainheat-description"] ] .str.lower() .str.contains("electric storage heaters|room heaters") & ( self.standardised_asset_list[ self.EPC_API_DATA_NAMES["mainheatcont-description"] ] != "Controls for high heat retention storage heaters" ) ) & ( ~self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM].isin( ["district heating", "communal heating", "communal gas boiler"] ) & ~self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM] .astype(str) .str.contains("gas ") ) # Basic check - both of the previous two shouldn't be true simultaneously if ( self.standardised_asset_list[ "solar_epc_data_indicates_correct_heating_system" ] & self.standardised_asset_list[ "solar_epc_data_indicates_requires_heating_upgrade" ] ).sum(): logger.info( "We have an example of both heating system checks being true - checking known cases" ) known_edge_cases = [ "Ground source heat pump, radiators, electric, Electric storage heaters" ] error_cases = self.standardised_asset_list[ ( self.standardised_asset_list[ "solar_epc_data_indicates_correct_heating_system" ] & self.standardised_asset_list[ "solar_epc_data_indicates_requires_heating_upgrade" ] ) ] if all( error_cases[self.EPC_API_DATA_NAMES["mainheat-description"]].isin( known_edge_cases ) ): logger.info("Within known edge cases") else: raise ValueError( "Both heating system checks are true - this should not be possible" ) # Check 3: Does the property meet the fabric condition # Solar PV installs are subject to the minimum insulation requirements which means: # 1) one of the following insulation measures must be installed as part of the same # ECO4 project: # • roof insulation (flat roof, pitched roof, room-in-roof) # • exterior facing wall insulation (cavity wall, solid wall) # • party cavity wall insulation # • floor insulation (solid and underfloor) # # OR # # all measures (except any exempted measure referred to in paragraph 4.28) # listed in paragraph a) must already be installed # # With this in mind, we look for 2 clases # 1) The property is fully insulated apart from the loft (<200mm insulation) # 2) THe property is fully insulated self.standardised_asset_list[ "solar_landlord_walls_insulated" ] = self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin( [ "filled cavity", "insulated solid brick", "insulated timber frame", "uninsulated cavity", "insulated system built", "insulated granite or whinstone", "insulated sandstone or limestone", "new build - average thermal transmittance", ] ) if self.non_intrusives_present: self.standardised_asset_list["solar_non_intrusives_walls_insulated"] = ( self.standardised_asset_list["non-intrusives: Insulated"].isin( ["EWI", "RETRO DRILLED", "FILLED AT BUILD"] ) ) elif self.old_format_non_intrusives_present: self.standardised_asset_list[ "solar_non_intrusives_walls_insulated" ] = self.standardised_asset_list[ "non-intrusives: WFT Findings" ].str.lower().str.strip().isin( [ "retro drilled", "retro filled", "ewi", "retro drilled/ solid", "retro drilled and filled", ] ) | self.standardised_asset_list[ "non-intrusives: WFT Findings" ].str.lower().str.strip().str.contains( "retro drilled" ) else: self.standardised_asset_list["solar_non_intrusives_walls_insulated"] = False self.standardised_asset_list["walls_u_value"] = self.standardised_asset_list[ self.EPC_API_DATA_NAMES["walls-description"] ].apply( lambda x: ( WallAttributes(x).process()["thermal_transmittance"] if not pd.isnull(x) else None ) ) self.standardised_asset_list["solar_epc_walls_insulated"] = ( self.standardised_asset_list[ self.EPC_API_DATA_NAMES[ "walls-description"]] .str.lower() .str.contains("|".join( self.EPC_INSULATED_WALLS_SUBSTRINGS)) ) | ( self.standardised_asset_list[ "walls_u_value"].apply( lambda x: x <= 0.7 if not pd.isnull( x) else False ) ) roof_data = [] for desc in self.standardised_asset_list[ self.EPC_API_DATA_NAMES["roof-description"] ].unique(): if pd.isnull(desc): continue roof_data.append( { self.EPC_API_DATA_NAMES["roof-description"]: desc, **RoofAttributes(desc).process(), } ) roof_data = pd.DataFrame(roof_data) roof_data = roof_data.rename(columns={"thermal_transmittance": "roof_u_value"}) self.standardised_asset_list = self.standardised_asset_list.merge( roof_data, how="left", on=self.EPC_API_DATA_NAMES["roof-description"] ) # If the u-value of a roof is less than 0.7 we consider it insulated self.standardised_asset_list["solar_epc_roof_insulated"] = ( self.standardised_asset_list[self.EPC_API_DATA_NAMES["roof-description"]] .str.lower() .str.contains( "|".join(self.EPC_INSULATED_ROOF_SUBSTRINGS), ) | ( self.standardised_asset_list[ self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS ].apply(lambda x: int(x) >= 200 if str(x).isdigit() else False) ) | ( self.standardised_asset_list["roof_u_value"].apply( lambda x: x <= 0.7 if not pd.isnull(x) else False ) ) ) self.standardised_asset_list[ "solar_epc_loft_needs_topup" ] = self.standardised_asset_list[ self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS ].apply( lambda x: int(x) < 200 if str(x).isdigit() else False ) | ( ( self.standardised_asset_list["is_loft"] | self.standardised_asset_list["is_pitched"] ) & ( self.standardised_asset_list[ self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS ].isin(["below average", "none"]) ) ) self.standardised_asset_list["epc_has_floor_recommendation"] = ( self.standardised_asset_list["epc_has_floor_recommendation"].fillna(False) ) # Check if the boiler is electric # We check if it contains both the terms boiler & electric self.standardised_asset_list["has_electric_boiler"] = ( self.standardised_asset_list[ self.EPC_API_DATA_NAMES["mainheat-description"] ] .str.lower() .isin(["boiler and radiators, electric"]) ) | ( self.standardised_asset_list[ self.STANDARD_HEATING_SYSTEM] == "electric boiler" ) #################################### # Check solar eligibility #################################### # Set up the filters to stop repetition correct_heating_system = ( self.standardised_asset_list[ "solar_landlord_data_indicates_correct_heating_system" ] | self.standardised_asset_list[ "solar_epc_data_indicates_correct_heating_system" ] | self.standardised_asset_list["has_electric_boiler"] ) needs_heating_upgrade = ( self.standardised_asset_list[ "solar_landlord_data_indicates_needs_heating_upgrade" ] | self.standardised_asset_list[ "solar_epc_data_indicates_requires_heating_upgrade" ] ) # The requirements for walls are: # 1) walls are insulated # 2) property is a cavity (can be done insulated or not) walls_meet_solar_requirements = ( # The landlord is saying the walls are insulated self.standardised_asset_list["solar_landlord_walls_insulated"] | # EPC data is saying the walls are insulated self.standardised_asset_list["solar_epc_walls_insulated"] | # Non-intrusives are saying the walls are insulated self.standardised_asset_list["solar_non_intrusives_walls_insulated"] | # It's empty cavity self.standardised_asset_list["cavity_is_empty"] | # It's a cavity wall self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin( ["filled cavity", "partial insulated cavity"] ) ) # Determine if the client gave us property type in the first place if all(self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] == "unknown"): # Use EPC not_a_flat = ( self.standardised_asset_list[self.EPC_API_DATA_NAMES["property-type"]] != "Flat" ) else: not_a_flat = ( self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] != "flat" ) solar_roof_meets_criteria = ( self.standardised_asset_list["solar_epc_roof_insulated"] | self.standardised_asset_list["solar_epc_loft_needs_topup"] ) self.standardised_asset_list["solar_eligible"] = ( # Property isn't a flag not_a_flat & # Landlord data or EPC data indicates the heating system is appropriate correct_heating_system & # The property doesn't currently have solar ~self.standardised_asset_list["property_has_solar"] & # The walls are insulated walls_meet_solar_requirements & # Roof meets criteria solar_roof_meets_criteria ) # With heating upgrade self.standardised_asset_list["solar_eligible_needs_heating_upgrade"] = ( not_a_flat & # Needs heating upgrade needs_heating_upgrade & # The property doesn't currently have solar ~self.standardised_asset_list["property_has_solar"] & # The walls are insulated walls_meet_solar_requirements & # Roof meets criteria solar_roof_meets_criteria ) # We check for a specific sub-set of properties which are uninsulated solid wall properties that are EPC E # or below (we'll use 57 as a threshold) - These are for a pilot with Net Zero Renewables self.standardised_asset_list["solar_eligible_solid_wall_uninsulated"] = ( not_a_flat & # Landlord data or EPC data indicates the heating system is appropriate - in this case, we can also take # electric boilers correct_heating_system & # The property doesn't currently have solar ~self.standardised_asset_list["property_has_solar"] & # The walls are uninsulated solid ~walls_meet_solar_requirements & ( self.standardised_asset_list[ self.EPC_API_DATA_NAMES["current-energy-efficiency"] ] <= 57 ) ) # Drop anything we don't need self.standardised_asset_list = self.standardised_asset_list.drop( columns=["walls_u_value", "roof_u_value"] ) # Adjust flagged extraction jobs to remove anything for solar self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] = ( self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] & ~self.standardised_asset_list["solar_eligible"] ) # Finally, we note why each property has been flagged self.standardised_asset_list["cavity_reason"] = None empty_cavity_map = { "non_intrusive_indicates_empty_cavity": self.EMPTY_CAVITY_NON_INTRUSIVE + ": ", "non_intrusive_indicates_empty_cavity_has_solar": f"{self.EMPTY_CAVITY_NON_INTRUSIVE} - property " "already has solar: ", "non_intrusive_indicates_empty_cavity_no_year_filter": f"{self.EMPTY_CAVITY_NON_INTRUSIVE}, " f"built after {self.EMPTY_CAVITY_YEAR_THRESHOLD}: ", } for variable, description in empty_cavity_map.items(): self.standardised_asset_list["cavity_reason"] = np.where( self.standardised_asset_list[variable] & pd.isnull(self.standardised_asset_list["cavity_reason"]), description + self.standardised_asset_list["SAP Category"], self.standardised_asset_list["cavity_reason"], ) # We break the cavity reason into a few different categories, when the EPC is different from inspections if self.old_format_non_intrusives_present: self.standardised_asset_list["cavity_reason"] = np.where( ( self.standardised_asset_list["epc_indicates_empty_cavity"] & ~self.standardised_asset_list[ "non_intrusive_indicates_empty_cavity" ] & ( self.standardised_asset_list["non-intrusives: WFT Findings"] .str.lower() .str.strip() .isin( [ "retro drilled and filled", "retro drilled", "retro filled", "retro drilled & filled", ] ) ) & pd.isnull(self.standardised_asset_list["cavity_reason"]) ), f"{self.EPC_EMPTY_INSPECTIONS_RETRO_DRILLED}: " + self.standardised_asset_list["SAP Category"], self.standardised_asset_list["cavity_reason"], ) self.standardised_asset_list["cavity_reason"] = np.where( ( self.standardised_asset_list["epc_indicates_empty_cavity"] & ~self.standardised_asset_list[ "non_intrusive_indicates_empty_cavity" ] & self.standardised_asset_list[ "non_intrusive_indicates_cavity_extraction" ] & pd.isnull(self.standardised_asset_list["cavity_reason"]) ), f"{self.EPC_EMPTY_INSPECTIONS_FILLED}: " + self.standardised_asset_list["SAP Category"], self.standardised_asset_list["cavity_reason"], ) elif self.non_intrusives_present: self.standardised_asset_list["cavity_reason"] = np.where( ( self.standardised_asset_list["epc_indicates_empty_cavity"] & ~self.standardised_asset_list[ "non_intrusive_indicates_empty_cavity" ] & ( self.standardised_asset_list["non-intrusives: Insulated"] == "RETRO DRILLED" ) & pd.isnull(self.standardised_asset_list["cavity_reason"]) ), f"{self.EPC_EMPTY_INSPECTIONS_RETRO_DRILLED}: " + self.standardised_asset_list["SAP Category"], self.standardised_asset_list["cavity_reason"], ) self.standardised_asset_list["cavity_reason"] = np.where( ( self.standardised_asset_list["epc_indicates_empty_cavity"] & ~self.standardised_asset_list[ "non_intrusive_indicates_empty_cavity" ] & ( self.standardised_asset_list["non-intrusives: Insulated"] == "FILLED AT BUILD" ) & pd.isnull(self.standardised_asset_list["cavity_reason"]) ), f"{self.EPC_EMPTY_INSPECTIONS_FILLED_AT_BUILD}: " + self.standardised_asset_list["SAP Category"], self.standardised_asset_list["cavity_reason"], ) else: self.standardised_asset_list["cavity_reason"] = np.where( ( self.standardised_asset_list["epc_indicates_empty_cavity"] & ~self.standardised_asset_list[ "non_intrusive_indicates_empty_cavity" ] & pd.isnull(self.standardised_asset_list["cavity_reason"]) ), f"{self.EPC_EMPTY}: " + self.standardised_asset_list["SAP Category"], self.standardised_asset_list["cavity_reason"], ) self.standardised_asset_list["cavity_reason"] = np.where( ( self.standardised_asset_list["epc_indicates_empty_cavity"] & ~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] & pd.isnull(self.standardised_asset_list["cavity_reason"]) ), f"{self.EPC_EMPTY_INSPECTIONS_NON_CAVITY}: " + self.standardised_asset_list["SAP Category"], self.standardised_asset_list["cavity_reason"], ) # Work type prefixes # Landlord data: The landlord's data indicates that the wall is an uninsulated cavity wall, but EPC and # inspections show filled self.standardised_asset_list["cavity_reason"] = np.where( ( self.standardised_asset_list["landlord_data_indicates_empty_cavity"] & ~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] & ~self.standardised_asset_list["epc_indicates_empty_cavity"] & pd.isnull(self.standardised_asset_list["cavity_reason"]) ), f"{self.LANDLORD_EMPTY_INSPECTIONS_OTHER}: " + self.standardised_asset_list["SAP Category"], self.standardised_asset_list["cavity_reason"], ) # Flag extraction self.standardised_asset_list["cavity_reason"] = np.where( ( self.standardised_asset_list[ "non_intrusive_indicates_cavity_extraction" ] & pd.isnull(self.standardised_asset_list["cavity_reason"]) ), f"{self.EXTRACTION_NON_INTRUSIVE}: " + self.standardised_asset_list["SAP Category"], self.standardised_asset_list["cavity_reason"], ) self.standardised_asset_list["cavity_reason"] = np.where( ( self.standardised_asset_list[ "non_intrusive_indicates_cavity_extraction_no_year_filter" ] & pd.isnull(self.standardised_asset_list["cavity_reason"]) ), f"{self.EXTRACTION_NON_INTRUSIVE}, built after {self.EMPTY_CAVITY_YEAR_THRESHOLD}: " + self.standardised_asset_list["SAP Category"], self.standardised_asset_list["cavity_reason"], ) ###################################################### # Flag solar ###################################################### self.standardised_asset_list["solar_reason"] = None # Map of variables and fill values for the solar_reason variable # ordering of this map is important, where we flag our prioritised work types first solar_reason_map = { "solar_eligible": f"{self.SOLAR_ELIGIBLE}: ", "solar_eligible_solid_wall_uninsulated": f"{self.SOLAR_ELIGIBLE_SOLID_WALL_UNINSULATED}: ", "solar_eligible_needs_heating_upgrade": f"{self.SOLAR_ELIGIBLE_NEEDS_HEATING_UPGRADE}: ", } for variable, reason in solar_reason_map.items(): self.standardised_asset_list["solar_reason"] = np.where( self.standardised_asset_list[variable] & pd.isnull(self.standardised_asset_list["solar_reason"]), reason + self.standardised_asset_list["SAP Category"], self.standardised_asset_list["solar_reason"], ) # Finally, anything flagged for solar should not be flagged for cavity - make them None self.standardised_asset_list["cavity_reason"] = np.where( ( ~pd.isnull(self.standardised_asset_list["solar_reason"]) & ~pd.isnull(self.standardised_asset_list["cavity_reason"]) ), None, self.standardised_asset_list["cavity_reason"], ) # Flag anything that has existing outcomes if (self.outcomes is not None) and ( "surveyed" in self.standardised_asset_list.columns ): if "installer refusal" not in self.standardised_asset_list.columns: self.standardised_asset_list["cavity_reason"] = np.where( ((self.standardised_asset_list["surveyed"] > 0)), None, self.standardised_asset_list["cavity_reason"], ) else: for col in ["cavity_reason", "solar_reason"]: self.standardised_asset_list[col] = np.where( ( (self.standardised_asset_list["surveyed"] > 0) | (self.standardised_asset_list["installer refusal"] > 0) ), None, self.standardised_asset_list[col], ) if self.master_surveyed is not None: for col in ["cavity_reason", "solar_reason"]: self.standardised_asset_list[col] = np.where( ((~pd.isnull(self.standardised_asset_list["submission_status"]))), None, self.standardised_asset_list[col], ) if ( self.ecosurv is not None and "ecosurv_install_status" in self.standardised_asset_list.columns ): # If we didn't match anything to ecosurv, the ecosurv_install_status won't exist for col in ["cavity_reason", "solar_reason"]: self.standardised_asset_list[col] = np.where( ( ( ~pd.isnull( self.standardised_asset_list["ecosurv_install_status"] ) ) ), None, self.standardised_asset_list[col], ) # We prepare outcomes for output if self.outcomes is not None: logger.info("Preparing outcomes for output") identified_work = self.standardised_asset_list[ ~pd.isnull(self.standardised_asset_list["cavity_reason"]) | ~pd.isnull(self.standardised_asset_list["solar_reason"]) ][self.DOMNA_PROPERTY_ID].values if self.DOMNA_PROPERTY_ID in self.outcomes.columns: self.outcomes_for_output = self.outcomes[ self.outcomes[self.DOMNA_PROPERTY_ID].isin(identified_work) ] # Finally, direct operations feedback has suggested that if a property is a flat that has a SAP rating of # 76 or above, we should exclude it because it's likely not going to be eligible for anyting self.standardised_asset_list["cavity_reason"] = np.where( (self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] == "flat") & (self.standardised_asset_list["SAP Category"] == "SAP Rating 76 or more"), self.standardised_asset_list["cavity_reason"] + " - (unlikely to quality)", self.standardised_asset_list["cavity_reason"], ) # Split cavity_reason on the colon and check if the first part is equal to one of the two options above # that indicates empties self.standardised_asset_list["identified_empty_cavity"] = ( self.standardised_asset_list["cavity_reason"] .str.split(":") .str[0] .isin( [ self.EMPTY_CAVITY_NON_INTRUSIVE, self.EMPTY_CAVITY_NON_INTRUSIVE_YEAR, self.EPC_EMPTY, ] ) ) def get_work_figures(self): blocks_of_flats = self.standardised_asset_list[ self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] == "block of flats" ] non_blocks_of_flats = self.standardised_asset_list[ self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] != "block of flats" ] # Produce some aggregate figures self.work_type_figures = { **non_blocks_of_flats["cavity_reason"].value_counts().to_dict(), **{ k + " (Block of flats)": v for k, v in blocks_of_flats["solar_reason"] .value_counts() .to_dict() .items() }, **self.standardised_asset_list["solar_reason"].value_counts().to_dict(), } pprint(self.work_type_figures) def fill_landlord_block_reference(self, has_blocks_of_flats): if not has_blocks_of_flats: return # If we have blocks of flats, we fill the landlord_block_reference field with address 1 + postcode self.standardised_asset_list[self.STANDARD_BLOCK_REFERENCE] = np.where( ( self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] == "block of flats" ) & (pd.isnull(self.standardised_asset_list[self.STANDARD_BLOCK_REFERENCE])), self.standardised_asset_list[self.STANDARD_ADDRESS_1] + " " + self.standardised_asset_list[self.STANDARD_POSTCODE], self.standardised_asset_list[self.STANDARD_BLOCK_REFERENCE], ) def split_blocks(self): """ Where we have a single row that is a block of flats, we split this into multiple rows, one for each unit. The data that we have will be copied across rows :return: """ blocks = self.standardised_asset_list[ self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] == "block of flats" ].copy() if blocks.empty: return RANGE_RE = re.compile(r"\b(\d+[A-Za-z]?)\s*[-–]\s*(\d+[A-Za-z]?)\b") NUM_RE = re.compile(r"\b\d+[A-Za-z]?\b") # captures 12, 12A, etc. TO_RANGE_RE = re.compile( r"\b(\d+[A-Za-z]?)\s+(?:to|To|TO)\s+(\d+[A-Za-z]?)\b" ) # captures "13 to 15" LETTER_RANGE_RE = re.compile( r"\b(\d+)([A-Za-z]?)\s*[-–]\s*(\d+)([A-Za-z]?)\b" ) # captures "1A-3B" expanded_rows = [] for _, row in blocks.iterrows(): addr = str(row[self.STANDARD_ADDRESS_1]) full_addr = row[self.STANDARD_FULL_ADDRESS] # We also look for terms like "Odd", "even", "all" in the address to indicate if it should be just # the odds, evens or all of the numbers has_odd = ( "(odd)" in addr.lower() or "(odd)" in full_addr.lower() or "(odds)" in addr.lower() or "(odds)" in full_addr.lower() ) has_even = ( "(even)" in addr.lower() or "(even)" in full_addr.lower() or "(evens)" in addr.lower() or "(evens)" in full_addr.lower() ) # 1 ─ Range (e.g. 1-7) m_range = RANGE_RE.search(addr) to_range = TO_RANGE_RE.search(addr) if m_range or to_range: start, end = m_range.groups() if m_range else to_range.groups() start, end = int(re.match(r"\d+", start)[0]), int( re.match(r"\d+", end)[0] ) if start > end or (end - start) > 200: raise ValueError(f"Suspicious range '{addr}'") # We define the looping range on whether we have odd, even or all numbers house_number_range = range(start, end + 1) if has_odd: house_number_range = [x for x in house_number_range if x % 2 != 0] if has_even: house_number_range = [x for x in house_number_range if x % 2 == 0] for n in house_number_range: new = row.copy() range_text = m_range.group(0) if m_range else to_range.group(0) new_addr = addr.replace(range_text, str(n)) # Build the new full address by also swapping out the range_text original_full_address = new[self.STANDARD_FULL_ADDRESS] new_full_address = original_full_address.replace(range_text, str(n)) new[self.STANDARD_ADDRESS_1] = str(n) new[self.STANDARD_FULL_ADDRESS] = new_full_address new[self.STANDARD_PROPERTY_TYPE] = "flat" # Keep a record of the previous address 1 new["block_address1"] = addr new["block_full_address"] = original_full_address new["is_expended_block"] = True # We update the full address new[self.DOMNA_PROPERTY_ID] = ( f"{row[self.DOMNA_PROPERTY_ID]}-{new_addr}" ) expanded_rows.append(new.to_dict()) continue # 2 ─ Explicit list (e.g. 1, 2, 5 Block) or split by an ampersand (e.g. 1 & 2 Block) nums = NUM_RE.findall(addr) if len(nums) > 1 and ( "," in addr or "&" in addr or " and " in addr.lower() ): for n in nums: new = row.copy() new_addr = re.sub( NUM_RE, n, addr, count=1 ) # replace the first number only new[self.STANDARD_ADDRESS_1] = new_addr new[self.DOMNA_PROPERTY_ID] = ( f"{row[self.DOMNA_PROPERTY_ID]}-{new_addr}" ) expanded_rows.append(new.to_dict()) continue # Check for a range of lettered addresses e.g 31A - 31D letter_range = LETTER_RANGE_RE.search(full_addr) if letter_range: start_num, start_letter, end_num, end_letter = letter_range.groups() start_num, end_num = int(start_num), int(end_num) if start_num != end_num: raise NotImplementedError(f"Unusual range - handle me") # We define the looping range on whether we have odd, even or all numbers house_number_range = range(start_num, end_num + 1) if has_odd: house_number_range = [x for x in house_number_range if x % 2 != 0] if has_even: house_number_range = [x for x in house_number_range if x % 2 == 0] for n in house_number_range: for letter in range(ord(start_letter), ord(end_letter) + 1): new = row.copy() new_addr = f"{n}{chr(letter)}" new[self.STANDARD_ADDRESS_1] = new_addr new[self.DOMNA_PROPERTY_ID] = ( f"{row[self.DOMNA_PROPERTY_ID]}-{new_addr}" ) expanded_rows.append(new.to_dict()) continue # 4 ─ Single number or no number, treat as individual dwelling if (len(nums) == 1) or not nums: expanded_rows.append(row.to_dict()) continue # Anything else with digits is unrecognised raise NotImplementedError(f"Unhandled block format: '{addr}'") expanded_blocks = pd.DataFrame(expanded_rows) # Check for duplicated domna ids if expanded_blocks[self.DOMNA_PROPERTY_ID].duplicated().sum(): raise ValueError("expanded blocks has duplicated IDs") # We drop the blocks from the standardised asset list and append on the expanded blocks self.standardised_asset_list = self.standardised_asset_list[ self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] != "block of flats" ] self.standardised_asset_list = pd.concat( [self.standardised_asset_list, expanded_blocks], ignore_index=True ) # As a final clean up, for any blocks that are size 1, we don't includr a project code sizes = ( expanded_blocks.groupby(self.STANDARD_BLOCK_REFERENCE)[ self.DOMNA_PROPERTY_ID ] .nunique() .reset_index() ) size_1 = sizes[sizes[self.DOMNA_PROPERTY_ID] <= 1] # Remove the size 1 blocks from the standardised asset list self.standardised_asset_list[self.STANDARD_BLOCK_REFERENCE] = np.where( self.standardised_asset_list[self.STANDARD_BLOCK_REFERENCE].isin( size_1[self.STANDARD_BLOCK_REFERENCE].values ), None, self.standardised_asset_list[self.STANDARD_BLOCK_REFERENCE], ) def label_property_status(self): """ This function is designed to be run after identify_worktypes() has been run, and will create a "property_status" column, which will note where each property is (to be surveyed, surveyed, installed), using the stages we recognise within hubspot :return: """ # For anything that is ready to go, that gets set to ready to be scheduled self.standardised_asset_list["hubspot_status"] = np.where( ~pd.isnull(self.standardised_asset_list["cavity_reason"]) | ~pd.isnull(self.standardised_asset_list["solar_reason"]), hubspot_config.HubspotProcessStatus.READY_TO_BE_SCHEDULED.label, None, ) # we step through the process of flagging completed surveys # We utilise submissions, ecosurv and outcomes to define the hubspot status # We'll take the maximum of these three columns, based on the enum integer value label_to_enum = {e.label: e for e in hubspot_config.HubspotProcessStatus} def get_max_status_from_columns(row): status_candidates = [] for col in [ "submission_status", "ecosurv_install_status", "outcome_status", ]: label = row.get(col) if label in label_to_enum: status_candidates.append(label_to_enum[label]) if not status_candidates: return row[ "hubspot_status" ] # fallback to existing status if no updates return max(status_candidates).label self.standardised_asset_list["hubspot_status"] = ( self.standardised_asset_list.apply(get_max_status_from_columns, axis=1) ) self.standardised_asset_list["project_code"] = None # if we have any blocks, where work is eligible, we flag them now # These blocks may be refecence via the landlord_block_reference field, or by property types being # blocks of flats has_landlord_block_reference = sum( ~pd.isnull(self.standardised_asset_list[self.STANDARD_BLOCK_REFERENCE]) ) if has_landlord_block_reference: # For blocks that have a 50% allocation, we create project codes self.block_analysis() # find any block refs with more than 50% emptires viable_empty_blocks = self.block_analysis_df[ self.block_analysis_df["Percentage of Empties"] >= 0.50 ] if not viable_empty_blocks.empty: project_code_lookup = viable_empty_blocks[["Block Reference"]].copy() self.standardised_asset_list = self.standardised_asset_list.merge( project_code_lookup, how="left", left_on=self.STANDARD_BLOCK_REFERENCE, right_on="Block Reference", ) self.standardised_asset_list["project_code"] = np.where( ~pd.isnull(self.standardised_asset_list["Block Reference"]), self.standardised_asset_list["Block Reference"], self.standardised_asset_list["project_code"], ) self.standardised_asset_list = self.standardised_asset_list.drop( columns=["Block Reference"] ) def analyse_geographies(self): cavity_programme = ( self.standardised_asset_list[["domna_postcode", "cavity_reason"]] .groupby(["domna_postcode"])["cavity_reason"] .count() .reset_index() ) solar_programme = ( self.standardised_asset_list[["domna_postcode", "solar_reason"]] .groupby(["domna_postcode"])["solar_reason"] .count() .reset_index() ) postcodes = ( self.standardised_asset_list[["domna_postcode", "landlord_property_id"]] .groupby("domna_postcode")["landlord_property_id"] .count() .reset_index() .rename(columns={"landlord_property_id": "n_properties"}) ) geographical_areas = ( postcodes.merge(cavity_programme, how="left", on="domna_postcode") .merge(solar_programme, how="left", on="domna_postcode") .fillna(0) ) geographical_areas["coverage"] = ( (geographical_areas["solar_reason"] + geographical_areas["cavity_reason"]) / geographical_areas["n_properties"] * 100 ) geographical_areas = geographical_areas.sort_values("coverage", ascending=False) self.geographical_areas = geographical_areas def block_analysis(self): # Reverse mapping: label -> enum LABEL_TO_ENUM = {e.label: e for e in hubspot_config.HubspotProcessStatus} # Threshold status - anything that is at this stage or beyond is considered surveyed threshold = ( hubspot_config.HubspotProcessStatus.SURVEYED_COMPLETED_SIGNED_OFF.value ) block_analysis = [] for block_reference, group in self.standardised_asset_list.groupby( self.STANDARD_BLOCK_REFERENCE ): cavity_breakdown = ( group["cavity_reason"] .fillna("No Eligibility") .value_counts(normalize=True) * 100 ) if all(cavity_breakdown.index == "No Eligibility"): continue # We check the % of empty vs not empty as right now, we're focused on empty n_empties = ( (group["identified_empty_cavity"] == True) & (~pd.isnull(group["cavity_reason"])) & ( ~group["cavity_reason"].str.contains( "(unlikely to quality)", case=False, na=False, regex=False ) ) ).sum() n_empties_high_confidence = ( (group["identified_empty_cavity"] == True) & ( ~group["SAP Category"].isin( ["SAP Rating 69-75", "SAP Rating 76 or more"] ) ) & (~pd.isnull(group["cavity_reason"])) & ( ~group["cavity_reason"].str.contains( "(unlikely to quality)", case=False, na=False, regex=False ) ) ).sum() # Average age of the EPCs group["time_since_epc"] = ( pd.to_datetime("now") - pd.to_datetime(group[self.EPC_API_DATA_NAMES["inspection-date"]]) ).dt.days average_age_of_epc = group["time_since_epc"].mean() works = group["hubspot_status"] above_threshold = works.map(LABEL_TO_ENUM.get).dropna() count_above = (above_threshold >= threshold).sum() proportion_surveyed = count_above / len(works) proportion_empty = n_empties / len(works) proportion_empty_high_confidence = n_empties_high_confidence / len(works) # We auto-populate any blocks that have greater than 50% proportion empty block_analysis.append( { "Block Reference": block_reference, "Block Size": len(group), "average_age_of_epc": average_age_of_epc, "Proportion of properties suryeyed": proportion_surveyed, "Percentage of Empties": proportion_empty, "Percentage of Empties (high confidence)": proportion_empty_high_confidence, **cavity_breakdown.to_dict(), } ) block_analysis = pd.DataFrame(block_analysis) block_analysis = block_analysis.fillna(0) # We flag which properties are eligible for works. We need at least 50% block_analysis["Eligible for Works"] = ( block_analysis["Percentage of Empties"] >= 0.50 ) block_analysis = block_analysis.sort_values( "Percentage of Empties", ascending=False ) # For properties that are NOT eligible, we should update the cavity reason ineligible_blocks = block_analysis[~block_analysis["Eligible for Works"]][ "Block Reference" ].values eligible_blocks = block_analysis[block_analysis["Eligible for Works"]][ "Block Reference" ].values self.standardised_asset_list["cavity_reason"] = np.where( self.standardised_asset_list[self.STANDARD_BLOCK_REFERENCE].isin( ineligible_blocks ), self.standardised_asset_list["cavity_reason"] + " (Flat in block with less than 50% eligible)", self.standardised_asset_list["cavity_reason"], ) # if the property is in a block of flats that eligible, but the property itself is not eligible, we flag this # The criteria is: # =The property should be in a block of flats self.standardised_asset_list["cavity_reason"] = np.where( self.standardised_asset_list[self.STANDARD_BLOCK_REFERENCE].isin( eligible_blocks ), self.standardised_asset_list["cavity_reason"] + " " + "(Flat in block with more than 50% eligible)", self.standardised_asset_list["cavity_reason"], ) self.block_analysis_df = block_analysis @staticmethod def split_full_name(x): if pd.isnull(x): return None, None, None x = x.lower() titles = ["mr", "mrs", "ms", "miss", "dr", "prof"] # Remove titles detected_title = [title for title in titles if x.startswith(title)] if detected_title: for title in detected_title: x = x.replace(title, "") x = x.strip() first_name, last_name = x.split(" ")[0], x.split(" ")[-1] title = detected_title[0].title() if detected_title else None return title, first_name.title(), last_name.title() def load_contact_details( self, local_filepath, sheet_name, landlord_property_id, phone_number_column=None, secondary_phone_number_column=None, secondary_contact_full_name=None, email_column=None, fullname_column=None, firstname_column=None, lastname_column=None, ): self.contact_detail_fields = { "landlord_property_id": landlord_property_id, "phone_number": phone_number_column, "secondary_phone_number": secondary_phone_number_column, "secondary_contact_full_name": secondary_contact_full_name, "email": email_column, "fullname": fullname_column, "firstname": firstname_column, "lastname": lastname_column, } details_colnames = [ phone_number_column, secondary_phone_number_column, email_column, fullname_column, firstname_column, lastname_column, ] # We'll fill them none_details = [x for x in details_colnames if x is None] details_colnames = [x for x in details_colnames if x is not None] if local_filepath is None: # Create an empty DataFrame based on the fields in self.contact_detail_fields self.contact_details = pd.DataFrame( columns=list(self.contact_detail_fields.keys()) ) return contact_details = pd.read_excel(local_filepath, sheet_name=sheet_name)[ [self.contact_detail_fields["landlord_property_id"]] + details_colnames ] contact_details = contact_details[ ~pd.isnull( contact_details[self.contact_detail_fields["landlord_property_id"]] ) ] # Fill anything we don't have for detail in none_details: contact_details[detail] = None if fullname_column and not (firstname_column and lastname_column): ( contact_details["title"], contact_details["first_name"], contact_details["last_name"], ) = zip(*contact_details[fullname_column].apply(self.split_full_name)) else: contact_details["title"] = None self.contact_details = contact_details @classmethod def load_standardised_asset_list(cls, filepath, sheet_name, header): """ This function is designed to load the standardised asset list from a file :return: """ # This is a placeholder for now # instantiate the class instance = cls( local_filepath=filepath, sheet_name=sheet_name, address1_colname=cls.STANDARD_ADDRESS_1, postcode_colname=cls.STANDARD_POSTCODE, full_address_colname=cls.STANDARD_FULL_ADDRESS, landlord_property_id=cls.STANDARD_LANDLORD_PROPERTY_ID, full_address_cols_to_concat=[], missing_postcodes_method=None, address1_extraction_method=None, landlord_year_built=cls.STANDARD_YEAR_BUILT, landlord_uprn=cls.STANDARD_UPRN, landlord_property_type=cls.STANDARD_PROPERTY_TYPE, landlord_built_form=cls.STANDARD_BUILT_FORM, landlord_wall_construction=cls.STANDARD_WALL_CONSTRUCTION, landlord_roof_construction=cls.STANDARD_ROOF_CONSTRUCTION, landlord_heating_system=cls.STANDARD_HEATING_SYSTEM, landlord_existing_pv=cls.STANDARD_EXISTING_PV, landlord_sap=cls.STANDARD_SAP, landlord_block_reference=cls.STANDARD_BLOCK_REFERENCE, phase=False, header=header, ) return instance def prepare_for_crm( self, company_domain, installer_name, reconcile_programme=False ): """ This function prepares the data for upload into Hubspot :param company_domain: The company domain name to be used in the CRM :param installer_name: The name of the installer to be used in the CRM :param reconcile_programme: If True, will include all properties with a project code, regardless of status :raises ValueError: If the installer name is not valid or if there are missing products :return: """ # This maps the opportunities as we reference them, to the product data as stored in Hubspot if not hubspot_config.Installer.is_valid_value(installer_name): raise ValueError( f"Installer name {installer_name} is not valid. Please check the installer name." ) # We check if all products are covered in the lookup table cavity_products = ( self.standardised_asset_list["cavity_reason"].unique().tolist() ) cavity_products = [x for x in cavity_products if not pd.isnull(x)] solar_products = self.standardised_asset_list["solar_reason"].unique().tolist() solar_products = [x for x in solar_products if not pd.isnull(x)] product_map = {} for identified_product in cavity_products + solar_products: if pd.isnull(identified_product): continue matched_product = None for product_prefix, crm_product in self.prefixes_to_products.items(): if identified_product.startswith(product_prefix): matched_product = crm_product product_map[identified_product] = matched_product # For each cavity and solar product, we iterate through the prexies and map to the products programme_data = self.standardised_asset_list.copy() programme_data["domna_full_address"] = ( programme_data["domna_full_address"] .str.replace(";", ", ", regex=False) .str.replace(" ", "") ) # Format the two date columns programme_data["survey_date"] = pd.to_datetime( programme_data["survey_date"], errors="coerce" ) programme_data[self.EPC_API_DATA_NAMES["inspection-date"]] = pd.to_datetime( programme_data[self.EPC_API_DATA_NAMES["inspection-date"]], errors="coerce" ) # Convert to dd/mm/yyyy format programme_data["survey_date"] = programme_data["survey_date"].dt.strftime( "%d/%m/%Y" ) programme_data[self.EPC_API_DATA_NAMES["inspection-date"]] = programme_data[ self.EPC_API_DATA_NAMES["inspection-date"] ].dt.strftime("%d/%m/%Y") # We take rows that have a survyor and a date for the survey # We include properties under 2 circumstances: # 1) The hubspot status is ready to be scheduled and there is an assigned surveyor and week for survey # 2) The hubspot status is something else, meaning this has been included in an existing programme # 3) reconcile programme is true, and therefore all proeprties with a project code will be included if reconcile_programme: programme_data = programme_data[~pd.isnull(programme_data["project_code"])] else: if programme_data["hubspot_status"].nunique() > 1: logger.info( "Multiple hubspot_status found - are you sure you don't want to reconcile the programme?" ) ready_to_be_scheduled = ( programme_data["hubspot_status"] == hubspot_config.HubspotProcessStatus.READY_TO_BE_SCHEDULED.label ) # completed_works = ( # (programme_data["hubspot_status"] != # hubspot_config.HubspotProcessStatus.READY_TO_BE_SCHEDULED.label) & # (~pd.isnull(programme_data["hubspot_status"])) # ) programme_data = programme_data[ready_to_be_scheduled] # Merge on the contact details programme_data = programme_data.merge( self.contact_details, how="left", left_on=self.STANDARD_LANDLORD_PROPERTY_ID, right_on=self.landlord_property_id, ) programme_data["Company Domain Name "] = company_domain # Append the product data onto the programme data programme_data["cavity_product"] = programme_data["cavity_reason"].map( lambda x: product_map.get(x, {"name": None})["name"] ) programme_data["solar_product"] = programme_data["solar_reason"].map( lambda x: product_map.get(x, {"name": None})["name"] ) # We check if we have any missings cavity_missing = pd.isnull( programme_data[~pd.isnull(programme_data["cavity_reason"])][ "cavity_product" ] ).sum() solar_missing = pd.isnull( programme_data[~pd.isnull(programme_data["solar_reason"])]["solar_product"] ).sum() if cavity_missing > 0 or solar_missing > 0: raise ValueError( f"We have {cavity_missing} cavity products and {solar_missing} solar products that are not " "mapped to a product in the lookup table. Please check the mapping." ) programme_data["domna_product"] = programme_data["solar_product"].copy() programme_data["domna_product"] = np.where( pd.isnull(programme_data["domna_product"]), programme_data["cavity_product"], programme_data["domna_product"], ) # We filter just on rows where we have a product if reconcile_programme: # We include historical works, which will include hisorical cavity so we set these as extraction (as # this is the main work mix) programme_data["domna_product"] = programme_data["domna_product"].fillna( self.CRM_HISTORICAL_CAVITY_PRODUCT["name"] ) else: # We shouldn't have any missing products # programme_data = programme_data[ # ~pd.isnull(programme_data["survey_date"]) # ] if pd.isnull(programme_data["domna_product"]).sum(): raise ValueError("Missing products") programme_data = programme_data.drop( columns=["solar_product", "cavity_product"] ) product_df = ( pd.DataFrame(self.CRM_PRODUCTS) .T[["name", "id", "unit_price"]] .reset_index() .rename( columns={ "name": "Name ", "id": "Product ID ", "unit_price": "Unit price ", "index": "domna_product", } ) ) product_df["Quantity "] = 1 # Append on the product data programme_data = programme_data.merge( product_df, how="left", on="domna_product" ) # Add in deal and pipeline information programme_data["dealname"] = ( programme_data[self.STANDARD_FULL_ADDRESS] + ", " + programme_data[self.STANDARD_POSTCODE] + " : " + programme_data["domna_product"] ) programme_data["Pipeline "] = hubspot_config.CRM_PIPELINE_NAME programme_data["Associations: Listing"] = "Property Owner" # We determine which column we should use for the UPRN if self.STANDARD_UPRN not in programme_data.columns: uprn_column = self.EPC_API_DATA_NAMES["uprn"] # If we're working form the EPC, we don't have this information if the EPC is estimated programme_data[uprn_column] = np.where( programme_data["estimated"] == True, None, programme_data[uprn_column] ) else: # Use the value that has the most coverage uprn_column = "hubspot_uprn" programme_data[uprn_column] = programme_data[self.STANDARD_UPRN].fillna( programme_data[self.EPC_API_DATA_NAMES["uprn"]] ) # Remove any negative URPSN which are not valid programme_data[uprn_column] = np.where( programme_data["estimated"].isin([1, True]), None, programme_data[uprn_column], ) # Add in some columns if we have them date_of_inspections = ( "Non-Intrusives: Date of Inspection" if "Non-Intrusives: Date of Inspection" in programme_data.columns else None ) # Ammend the property type and built form columns programme_data["hubspot_property_type"] = programme_data[ self.STANDARD_PROPERTY_TYPE ].copy() # We don't already have this if self.STANDARD_BUILT_FORM in programme_data.columns: programme_data["hubspot_built_form"] = programme_data[ self.STANDARD_BUILT_FORM ].copy() else: programme_data["hubspot_built_form"] = None def _replace_property_description_data(programme_data, column_name): """ Helper function to replace property type or built form data with a specified value. """ if column_name == "hubspot_property_type": valid_values = ["house", "bungalow", "flat", "maisonette"] epc_fill_col = "property-type" elif column_name == "hubspot_built_form": valid_values = [ "detached", "semi-detached", "mid-terrace", "end-terrace", ] epc_fill_col = "built-form" else: raise ValueError( f"Invalid column name: {column_name}. Must be 'hubspot_property_type' or " f"'hubspot_built_form'." ) # Any vakue that is not house, bungalow, flat or maisonette is set to None programme_data[column_name] = np.where( ~programme_data[column_name].isin(valid_values), None, programme_data[column_name], ) # We fill with the EPC property type programme_data[column_name] = np.where( pd.isnull(programme_data[column_name]), programme_data[self.EPC_API_DATA_NAMES[epc_fill_col]], programme_data[column_name], ) programme_data[column_name] = programme_data[column_name].fillna("unknown") return programme_data # Clean up the property type and built form columns programme_data = _replace_property_description_data( programme_data, "hubspot_property_type" ) programme_data = _replace_property_description_data( programme_data, "hubspot_built_form" ) # We accomodate the old vs new inspections format if "non-intrusives: WFT Findings" in programme_data.columns: # We have the old format - we only have notes non_intrusives_surveyor_notes = "non-intrusives: WFT Findings" non_intrusives_construction = None non_intrusives_insulated = None non_intrusives_insulation_material = None non_intrusives_ciga_check_required = None non_intrusives_pv_access = None non_intrusives_roof_orientation = None non_intrusives_surveyor_name = None else: non_intrusives_surveyor_notes = "non-intrusives: Any further surveyor notes" non_intrusives_construction = "non-intrusives: Construction" non_intrusives_insulated = "non-intrusives: Insulated" non_intrusives_insulation_material = "non-intrusives: Material" non_intrusives_ciga_check_required = "non-intrusives: CIGA Check Required" non_intrusives_pv_access = "non-intrusives: PV, ACCESS ISSUE, SEE NOTES" non_intrusives_roof_orientation = ( "non-intrusives: OFF GAS - ROOF ORIENTATION" ) non_intrusives_surveyor_name = "non-intrusives: Surveyors Name" # This maps the hubspot schema to the template. Anything that is not covered in this will be flagged schema_mappings = { "Company Domain Name ": "Company Domain Name ", "Email ": ( self.contact_detail_fields["email"] if self.contact_detail_fields["email"] else None ), # TODO: Review "First Name ": ( self.contact_detail_fields["firstname"] if self.contact_detail_fields["firstname"] else None ), # TODO: Review "Last Name ": ( self.contact_detail_fields["lastname"] if self.contact_detail_fields["lastname"] else None ), # TODO: Review "Phone ": ( self.contact_detail_fields["phone_number"] if self.contact_detail_fields["phone_number"] else None ), # TODO: Review "Secondary Phone ": ( self.contact_detail_fields["secondary_phone_number"] if self.contact_detail_fields["secondary_phone_number"] else None ), "Secondary Contact Full Name ": ( self.contact_detail_fields["secondary_contact_full_name"] if self.contact_detail_fields["secondary_contact_full_name"] else None ), "Full Address ": self.STANDARD_FULL_ADDRESS, "Address 1 ": self.STANDARD_ADDRESS_1, "Address 2 ": None, # TODO: Don't have this for the moment "Postcode ": self.STANDARD_POSTCODE, "Property Type ": "hubspot_property_type", "Property Sub Type ": "hubspot_built_form", "Bedroom(s) ": None, # TODO: Don't have this for the moment "Domna Property ID ": self.DOMNA_PROPERTY_ID, # We populate this with the column that we have "National UPRN ": uprn_column, "Owner Property ID ": self.STANDARD_LANDLORD_PROPERTY_ID, "Wall Construction ": self.STANDARD_WALL_CONSTRUCTION, "Heating System ": self.STANDARD_HEATING_SYSTEM, "Year Built ": self.STANDARD_YEAR_BUILT, "Boiler Make ": None, # TODO: Don't have this for the moment "Boiler Model ": None, # TODO: Don't have this for the moment "Non-Intrusives: Date Checked ": date_of_inspections, "Non-Intrusives: Wall Type ": non_intrusives_construction, "Non-intrusives: Insulation ": non_intrusives_insulated, "Non-intrusives: Insulation Material ": non_intrusives_insulation_material, "Non-Intrusives: CIGA Check Required ": non_intrusives_ciga_check_required, "Non-Intrusives: PV Access Issues ": non_intrusives_pv_access, "Non-Intrusives: Roof Orientation ": non_intrusives_roof_orientation, "Non-Intrusives: Surveyor Notes ": non_intrusives_surveyor_notes, "Non-Intrusives: Surveyor Name ": non_intrusives_surveyor_name, "CIGA: Date Requested ": None, # TODO: Don't have this for the moment "CIGA: Cavity Guarantee Found ": None, "Last EPC: Is Estimated ": self.EPC_API_DATA_NAMES[ "estimated" ], "Last EPC: EPC Rating ": self.EPC_API_DATA_NAMES[ "current-energy-rating" ], "Last EPC: SAP Rating ": self.EPC_API_DATA_NAMES[ "current-energy-efficiency" ], "Last EPC: Main Heating Description ": self.EPC_API_DATA_NAMES[ "mainheat-description" ], "Last EPC: Heating Controls ": self.EPC_API_DATA_NAMES[ "mainheatcont-description" ], "Last EPC: Lodgement Date ": self.EPC_API_DATA_NAMES[ "inspection-date" ], "Last EPC: Floor Area ": self.EPC_API_DATA_NAMES[ "total-floor-area" ], "Last EPC: Wall ": self.EPC_API_DATA_NAMES[ "walls-description" ], "Last EPC: Roof ": self.EPC_API_DATA_NAMES[ "roof-description" ], "Last EPC: Floor ": self.EPC_API_DATA_NAMES[ "floor-description" ], "Last EPC: Room Height ": self.EPC_API_DATA_NAMES[ "floor-height" ], "Last EPC: Age Band ": self.EPC_API_DATA_NAMES[ "construction-age-band" ], "Pipeline ": "Pipeline ", "Expected Commencement Date ": "survey_date", "Deal Name ": "dealname", # Need to create this, "Product ID ": "Product ID ", "Name ": "Name ", "Unit price ": "Unit price ", "Quantity ": "Quantity ", "Deal Owner": "surveyor", "Project Code ": "project_code", "Associations: Listing": "Associations: Listing", "Deal Stage ": "hubspot_status", } # We sometimes columns if the landlord never provided them missed_mapping_cols = [ c for c in schema_mappings.values() if c not in programme_data.columns if c is not None ] for c in missed_mapping_cols: programme_data[c] = None # We now create the finalised dataset to be uploaded into Hubspot variables_required = list(schema_mappings.values()) variables_required = [v for v in variables_required if v is not None] # We now flag anything that has a none value, which is information we haven't got right now none_variables = [k for k, v in schema_mappings.items() if v is None] # We'll add placeholder columns for the None variables programme_data = programme_data[variables_required] for col in none_variables: programme_data[col] = None programme_data = programme_data.rename( columns={v: k for k, v in schema_mappings.items() if v is not None} ) programme_data["Postcode "] = programme_data[ "Postcode " ].copy() programme_data["Installer "] = installer_name programme_data["Name "] = ( programme_data["Full Address "] + " ," + programme_data["Postcode "] ) # The listing owner email is the same as the surveyor email (deal owner), so they can see the listing programme_data["Listing Owner Email "] = ( programme_data["Deal Owner"] ) programme_data["Amount "] = 0 programme_data["Deal Owner"] = np.where( ~pd.isnull(programme_data["Deal Owner"]), programme_data["Deal Owner"].astype(str).str.lower(), programme_data["Deal Owner"], ) # We make sure we have all of the columns that we need missed_columns = [ c for c in hubspot_config.CRM_UPLOAD_COLUMNS if c not in programme_data.columns ] if missed_columns: raise ValueError( f"We have the following columns that are not in the programme data: {missed_columns}. " "Please check the mapping and ensure all required columns are present." ) self.hubspot_data = programme_data def flag_ecosurv(self, ecosurv_landlords=None, landlords_to_ignore=None): """ This class will match ecosurv data to the asset list :return: """ if ecosurv_landlords is None: return # TODO: Fetch from Sharepoint ecosurv_filepath = ( "/Users/khalimconn-kowlessar/Documents/hestia/Ecosurv/07.05.2025.csv" ) logger.info("Getting Ecosurv data from %s", ecosurv_filepath) self.ecosurv = pd.read_csv(ecosurv_filepath, encoding="cp437") landlords = self.ecosurv["Landlord"].value_counts().reset_index(drop=False) landlord_references = landlords[ landlords["Landlord"].str.lower().str.contains(ecosurv_landlords) ] landlord_ecosurv_data = self.ecosurv[ self.ecosurv["Landlord"].isin(landlord_references["Landlord"].values) ] if landlords_to_ignore is not None: landlord_ecosurv_data = landlord_ecosurv_data[ ~landlord_ecosurv_data["Landlord"].isin(landlords_to_ignore) ] # Try and match to asset list matched = [] unmatched = [] for _, row in tqdm( landlord_ecosurv_data.iterrows(), total=landlord_ecosurv_data.shape[0] ): postcode = row["Postcode"].lower() df = self.standardised_asset_list[ ( self.standardised_asset_list[self.STANDARD_POSTCODE] .str.replace(" ", "") .str.lower() == postcode ) ].copy() if df.empty: unmatched.append(row["Reference"]) continue if df.shape[0] > 1: house_no = SearchEpc.get_house_number( row["Address Line 1"], row["Postcode"] ) df["house_no"] = df.apply( lambda x: SearchEpc.get_house_number( str(x[self.STANDARD_ADDRESS_1]), x[self.STANDARD_POSTCODE] ), axis=1, ) df = df[df["house_no"] == house_no] if df.shape[0] > 1: # We compare address line 1 to full address if any( df[self.STANDARD_FULL_ADDRESS] .str.lower() .str.contains(row["Address Line 1"].lower(), na=False) ): df = df[ df[self.STANDARD_FULL_ADDRESS] .str.lower() .str.contains(row["Address Line 1"].lower(), na=False) ] if df.shape[0] > 1: df = df[df[self.STANDARD_PROPERTY_TYPE] != "other"] if df.shape[0] == 1: matched.append( { self.STANDARD_LANDLORD_PROPERTY_ID: df[ self.STANDARD_LANDLORD_PROPERTY_ID ].values[0], "ecosurv_reference": row["Reference"], "ecosurv_address1": row["Address Line 1"], "ecosurv_postcode": row["Postcode"], } ) continue if df.shape[0] > 1: unmatched.append(row["Reference"]) continue logger.info("Matched %s properties to ecosurv data", len(matched)) logger.info("%s properties in Ecosurv remain unmatched", len(unmatched)) if not matched: return # We now match matched = pd.DataFrame(matched) # We'll possibly have duplicates here, where properties have been sold twice. Ww de-dupe if matched[self.STANDARD_LANDLORD_PROPERTY_ID].duplicated().sum(): # It doesn't matter too much which record we take matched = matched.drop_duplicates( subset=[self.STANDARD_LANDLORD_PROPERTY_ID] ) # We merge on the status of the property matched = matched.merge( self.ecosurv[["Reference", "Status", "Lead Status", "Tags"]].rename( columns={ "Reference": "ecosurv_reference", "Status": "ecosurv_status", "Lead Status": "ecosurv_lead_status", "Tags": "ecosurv_tags", "Installer": "ecosurv_installer", } ), how="left", on="ecosurv_reference", ) matched["ecosurv_install_status"] = ( hubspot_config.HubspotProcessStatus.SUBMITTED_TO_INSTALLER ) # This mapping is ordered by process order, where lodgment is the final step so if we have an indication # that the property is ready for lodgement, we set the status to that. We then proceed through the other # statuses where the penultimate status is install complete mapping = { "Cancelled": hubspot_config.HubspotProcessStatus.INSTALLER_CANCELLED_FINALIZED, "TrustMark: Lodged": hubspot_config.HubspotProcessStatus.LODGEMENT_COMPLETE, "Retrofit: Complete": hubspot_config.HubspotProcessStatus.INSTALL_COMPLETE, "Retrofit: Awaiting TrustMark": hubspot_config.HubspotProcessStatus.INSTALL_COMPLETE, "Retrofit: Awaiting post checks": hubspot_config.HubspotProcessStatus.INSTALL_COMPLETE, "Installer Notification Sent": hubspot_config.HubspotProcessStatus.SUBMITTED_TO_INSTALLER, "Submitted to RC": hubspot_config.HubspotProcessStatus.SUBMITTED_TO_INSTALLER, "COONEY": hubspot_config.HubspotProcessStatus.SUBMITTED_TO_INSTALLER, "Signed off for install": hubspot_config.HubspotProcessStatus.SUBMITTED_TO_INSTALLER, "Retrofit: Signed off for install": hubspot_config.HubspotProcessStatus.SUBMITTED_TO_INSTALLER, "Audit": hubspot_config.HubspotProcessStatus.SUBMITTED_TO_INSTALLER, "Accepted": hubspot_config.HubspotProcessStatus.SUBMITTED_TO_INSTALLER, "Sold": hubspot_config.HubspotProcessStatus.SUBMITTED_TO_INSTALLER, } def get_max_status(tag_str): if pd.isna(tag_str): return None matched_statuses = [] for tag, status in mapping.items(): if tag in tag_str: matched_statuses.append(status) if not matched_statuses: return None return max(matched_statuses).label matched["ecosurv_install_status"] = matched["ecosurv_tags"].apply( get_max_status ) self.standardised_asset_list = self.standardised_asset_list.merge( matched, how="left", on=self.STANDARD_LANDLORD_PROPERTY_ID, ) # We keep a record of submissions that were NOT matches self.ecosurv_no_match = self.ecosurv[ self.ecosurv["Reference"].isin(unmatched) ].copy() def flag_outcomes( self, outcomes_filepaths, outcomes_sheetname, outcomes_address, outcomes_postcode, outcomes_houseno, outcomes_id, ): if not outcomes_filepaths: return self.outcomes = [] outcomes_no_match = [] lookup = [] for idx, outcomes_filepath in enumerate(outcomes_filepaths): outcomes = pd.read_excel( outcomes_filepath, sheet_name=outcomes_sheetname[idx] ) outcomes["row_id"] = outcomes.index if outcomes_houseno[idx] is None: outcomes_houseno[idx] = "houseno" outcomes["houseno"] = outcomes[outcomes_address[idx]].apply( lambda x: SearchEpc.get_house_number(x, outcomes[outcomes_postcode]) ) # We handle an edge case that occured for LHP if ( "Notes / Outcomes" in outcomes.columns and "Outcome" not in outcomes.columns ): # We use the re-mapper to handle this: outcomes["Notes / Outcomes"] = outcomes["Notes / Outcomes"].str.strip() values_to_remap = outcomes["Notes / Outcomes"].unique() # We want to map this to our standardised list of property types we're interested in remapper = DataRemapper( standard_values=outcomes_mappings.outcomes_values, standard_map=outcomes_mappings.outcomes_map, ) remap_dictionary = remapper.standardize_list( values_to_remap=values_to_remap.tolist() ) # Perform the remap outcomes["Outcome"] = outcomes["Notes / Outcomes"].map(remap_dictionary) outcomes["Outcome"] = outcomes["Outcome"].str.lower().str.strip() logger.info("Matching outcomes to asset list") # Merge the outcomes onto the asset list - we check we're able to match sufficiently well lookup_i = [] nomatch_i = [] for _, x in tqdm(outcomes.iterrows(), total=len(outcomes)): if pd.isnull(x[outcomes_address[idx]]) or not x[outcomes_address[idx]]: continue # Check if we have an id oid = x[outcomes_id[idx]] if outcomes_id[idx] is not None else None if oid is not None: matched = self.standardised_asset_list[ ( self.standardised_asset_list[ self.STANDARD_LANDLORD_PROPERTY_ID ].str.strip() == oid ) ] if matched.shape[0] == 1: lookup_i.append( { "row_id": x["row_id"], self.DOMNA_PROPERTY_ID: matched[ self.DOMNA_PROPERTY_ID ].values[0], } ) continue address_clean = ( x[outcomes_address[idx]].lower().replace(",", "").replace(" ", " ") ) matched = self.standardised_asset_list[ ( self.standardised_asset_list[self.STANDARD_FULL_ADDRESS] .str.lower() .str.replace(",", "") .str.replace(" ", " ") == address_clean ) ] if matched.shape[0] == 1: lookup_i.append( { "row_id": x["row_id"], self.DOMNA_PROPERTY_ID: matched[ self.DOMNA_PROPERTY_ID ].values[0], } ) continue matched = self.standardised_asset_list[ ( self.standardised_asset_list[self.STANDARD_POSTCODE].str.strip() == x[outcomes_postcode[idx]] ) ].copy() if not matched.empty: matched["houseno"] = matched.apply( lambda x: SearchEpc.get_house_number( str(x[self.STANDARD_ADDRESS_1]), str(x[self.STANDARD_POSTCODE]), ), axis=1, ) if pd.isnull(x[outcomes_houseno[idx]]): house_no_to_match = SearchEpc.get_house_number( str(x[outcomes_address[idx]]), str(x[outcomes_postcode[idx]]), ) if isinstance(house_no_to_match, str): house_no_to_match = house_no_to_match.lower() else: house_no_to_match = str(x[outcomes_houseno[idx]]).strip() matched = matched[ matched["houseno"].astype(str) == house_no_to_match ] if matched.shape[0] == 1: lookup_i.append( { "row_id": x["row_id"], self.DOMNA_PROPERTY_ID: matched[ self.DOMNA_PROPERTY_ID ].values[0], } ) continue elif not matched.empty: # Use levenstein distance to match matched["address"] = ( matched[self.STANDARD_ADDRESS_1] + " " + matched[self.STANDARD_POSTCODE] ) best_match = process.extractOne( x[outcomes_address[idx]], matched[self.STANDARD_FULL_ADDRESS].values, )[0] matched = matched[ matched[self.STANDARD_FULL_ADDRESS] == best_match ] lookup_i.append( { "row_id": x["row_id"], self.DOMNA_PROPERTY_ID: matched[ self.DOMNA_PROPERTY_ID ].values[0], } ) continue nomatch_i.append(x["row_id"]) outcomes_no_match_i = outcomes[outcomes["row_id"].isin(nomatch_i)] lookup_i = pd.DataFrame(lookup_i) outcomes_no_match.append(outcomes_no_match_i) lookup.append(lookup_i) self.outcomes.append(outcomes) lookup = pd.concat(lookup) self.outcomes_no_match = pd.concat(outcomes_no_match) self.outcomes = pd.concat(self.outcomes) if lookup.empty: return # We will have duplicated domna property IDs, where a surveyor has been to a property multiple times # Where we have multiple rows, we want to make a call on what the action should be. For example, # there may be properties that have been visited multiple times where the outcome was "See notes" implying # that the surveyor had a detailed explanation as to why they couldn't gain access so if this has # happened multiple times, in this case we judge that the work may not be viable if "Week Commencing" in self.outcomes.columns: date_col = "Week Commencing" elif "Survey Date" in self.outcomes.columns: date_col = "Survey Date" elif "Date letters sent" in self.outcomes.columns: date_col = "Date letters sent" elif "Date Letter sent" in self.outcomes.columns: date_col = "Date Letter sent" elif "WEEK COMMENCING" in self.outcomes.columns: date_col = "WEEK COMMENCING" else: raise NotImplementedError("Invalid date in outcomes - implement me") if "Notes" in self.outcomes.columns: notes_col = "Notes" elif "Notes / Outcomes" in self.outcomes.columns: notes_col = "Notes / Outcomes" elif "NOTES" in self.outcomes.columns: notes_col = "NOTES" else: raise NotImplementedError("Invalid notes in outcomes - implement me") lookup = lookup.merge( self.outcomes[["row_id", "Outcome", notes_col, date_col]], how="left", on="row_id", ) visit_counts = ( lookup.groupby(self.DOMNA_PROPERTY_ID)["row_id"] .count() .reset_index() .rename(columns={"row_id": "visit_count"}) .sort_values("visit_count", ascending=False) ) def extract_date(s): if isinstance(s, str): match = re.search(r"(\d{2}\.\d{2}\.\d{4})", s) if match: return pd.to_datetime( match.group(1), format="%d.%m.%Y", errors="coerce" ) return pd.NaT lookup["parsed_date"] = lookup[date_col].apply(extract_date) def get_latest_note(group): surveyed = group[group["Outcome"] == "surveyed"] if not surveyed.empty: return surveyed.sort_values("parsed_date", ascending=False).iloc[0] else: return group.sort_values("parsed_date", ascending=False).iloc[0] latest_note = ( lookup.groupby("domna_property_id", group_keys=False) .apply(get_latest_note) .reset_index(drop=True) ) latest_note = latest_note[["domna_property_id", notes_col, "Outcome"]].rename( columns={"Notes": "latest_outcome_note", "Outcome": "latest_outcome"} ) pivot_df = ( lookup.groupby(["domna_property_id", "Outcome"]) .size() .unstack(fill_value=0) .reset_index() ) pivot_df = pivot_df.merge(visit_counts, how="left", on="domna_property_id") pivot_df = pivot_df.merge(latest_note, how="left", on="domna_property_id") # We want the latest note if pivot_df[self.DOMNA_PROPERTY_ID].duplicated().sum(): raise Exception("We have duplicated property IDs in the outcomes data") # We merge this data onto outcomes self.outcomes["matched_to_asset_list"] = self.outcomes["row_id"].isin( lookup["row_id"].values ) self.outcomes = self.outcomes.merge( lookup[["row_id", "domna_property_id"]], how="left", on="row_id" ) # We flag the outcome status, based on the outcome pivot_df["outcome_status"] = None if "surveyed" in pivot_df.columns: pivot_df["outcome_status"] = np.where( pivot_df["surveyed"] > 0, hubspot_config.HubspotProcessStatus.SURVEYED_COMPLETED_SIGNED_OFF.label, pivot_df["outcome_status"], ) if "installer refusal" in pivot_df.columns: pivot_df["outcome_status"] = np.where( pivot_df["installer refusal"] > 0, hubspot_config.HubspotProcessStatus.NOT_VIABLE.label, pivot_df["outcome_status"], ) pivot_df["outcome_status"] = np.where( pivot_df["latest_outcome"].isin(["see notes"]) & ( pivot_df["outcome_status"] != hubspot_config.HubspotProcessStatus.SURVEYED_COMPLETED_SIGNED_OFF.label ), hubspot_config.HubspotProcessStatus.SURVEYED_NO_ACCESS_NEEDS_SIGN_OFF.label, pivot_df["outcome_status"], ) # We merge out pivoted outcomes onto the asset list self.standardised_asset_list = self.standardised_asset_list.merge( pivot_df, how="left", left_on=self.DOMNA_PROPERTY_ID, right_on="domna_property_id", ) if self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated().sum(): raise ValueError("Duplicates appreared - something went wrong") self.outcomes = self.outcomes.sort_values("domna_property_id", ascending=False) def flag_survey_master( self, master_filepaths, master_id_colnames, master_to_asset_list_filepath=None ): # TODO: This probably needs further expansion if not master_filepaths: return if master_to_asset_list_filepath is not None: id_map = pd.read_csv(master_to_asset_list_filepath) else: id_map = pd.DataFrame() logger.info("Getting masters and merging onto asset list") master_surveyed = [] unmatched_submissions = [] for idx, filepath in enumerate(master_filepaths): master_data = pd.read_csv(filepath) # Strip columns master_data.columns = [c.strip() for c in master_data.columns] master_data.columns = [re.sub(r"\s+", " ", c) for c in master_data.columns] # Drop any unnamed columns unnamed_columns = [c for c in master_data.columns if "Unnamed:" in c] master_data = master_data.drop(columns=unnamed_columns) if not id_map.empty: master_data = master_data.merge( id_map, how="left", on=["NO.", "Street / Block Name", "Post Code"] ) if "INSTALLED OR CANCELLED" in master_data.columns: install_col = "INSTALLED OR CANCELLED" elif "INSTALL / CANCELLATION DATE" in master_data.columns: install_col = "INSTALL / CANCELLATION DATE" elif "INSTALL/ CANCELLATION DATE" in master_data.columns: install_col = "INSTALL/ CANCELLATION DATE" elif "INSTALL/CANCELLATION DATE" in master_data.columns: install_col = "INSTALL/CANCELLATION DATE" elif "Measure 1 Install Date" in master_data.columns: install_col = "Measure 1 Install Date" else: raise ValueError("No install or cancellation date") if "SUBMISSION DATE" in master_data.columns: submission_col = "SUBMISSION DATE" elif "SUBMISSION DATE TO INSTALLERS" in master_data.columns: submission_col = "SUBMISSION DATE TO INSTALLERS" elif "Submission Date" in master_data.columns: submission_col = "Submission Date" else: raise ValueError("No submission date column found in master data") master_data["row_id"] = master_data.index self.standardised_asset_list["house_no"] = ( self.standardised_asset_list.apply( lambda x: SearchEpc.get_house_number( str(x[self.STANDARD_ADDRESS_1]), str(x[self.STANDARD_POSTCODE]) ), axis=1, ) ) if ( "AFFORDABLE WARMTH OR EPC FOR HOUSING ASSOCIATION" in master_data.columns ): scheme_col = "AFFORDABLE WARMTH OR EPC FOR HOUSING ASSOCIATION" elif "AFFORDABLE WARMTH" in master_data.columns: scheme_col = "AFFORDABLE WARMTH" elif "Scheme" in master_data.columns: scheme_col = "Scheme" elif "Affordable Warmth" in master_data.columns: scheme_col = "Affordable Warmth" else: scheme_col = "OFFICE USE ONLY" postcode_col = ( "POSTCODE" if "POSTCODE" in master_data.columns else "Post Code" ) if "NO." in master_data.columns: house_no_col = "NO." elif "NO" in master_data.columns: house_no_col = "NO" else: house_no_col = "NUMBER" if "PROPERTY TYPE As per table emailed" in master_data.columns: property_type_col = "PROPERTY TYPE As per table emailed" elif "PROPERTY TYPE As per table emailed" in master_data.columns: property_type_col = "PROPERTY TYPE As per table emailed" elif "PROPERTY TYPE" in master_data.columns: property_type_col = "PROPERTY TYPE" elif "Property Type" in master_data.columns: property_type_col = "Property Type" else: property_type_col = "PROPERTY TYPE (SEE DEEMED SCORES SHEET) Eg. 3W_Flat_1 (As per Matrix)" if "INSTALLERS NOTES ; REASONS FOR CANCELLATIONS" in master_data.columns: installer_notes_col = "INSTALLERS NOTES ; REASONS FOR CANCELLATIONS" elif "INSTALLERS NOTES" in master_data.columns: installer_notes_col = "INSTALLERS NOTES" elif "Installers Notes" in master_data.columns: installer_notes_col = "Installers Notes" elif ( "NOTES ; REASONS FOR CANCELLATIONS OR WHERE INSTALL DATE WAS OBTAINED FROM" in master_data.columns ): installer_notes_col = "NOTES ; REASONS FOR CANCELLATIONS OR WHERE INSTALL DATE WAS OBTAINED FROM" elif ( "INSTALLERS NOTES / REASONS FOR CANCELLATIONS / WHERE INSTALL DATE WAS RECEIVED FROM" in master_data.columns ): installer_notes_col = ( "INSTALLERS NOTES / REASONS FOR CANCELLATIONS / WHERE INSTALL DATE WAS RECEIVED " "FROM" ) else: raise ValueError("No installer notes column found in master data") if "INSTALLER" in master_data.columns: installer_col = "INSTALLER" elif "Installer" in master_data.columns: installer_col = "Installer" else: raise ValueError("No installer column found in master data") measure_mix_col = "MEASURE COMBO" if "TOWN" in master_data.columns: town_colname = "TOWN" elif "Town/Area" in master_data.columns: town_colname = "Town/Area" else: town_colname = "Town/City" logger.info("Matching master data to asset list") matched = [] unmatched = [] for _, row in tqdm(master_data.iterrows(), total=len(master_data)): original_house_no = row[house_no_col] original_street = row["Street / Block Name"] original_postcode = row[postcode_col] if pd.isnull(row[postcode_col]): continue if master_id_colnames[idx] is not None: # Filter the standardised asset list on this df = self.standardised_asset_list[ self.standardised_asset_list[self.STANDARD_LANDLORD_PROPERTY_ID] == row[master_id_colnames[idx]] ] if df.shape[0] == 1: matched.append( { "row_id": row["row_id"], "original_house_no": original_house_no, "original_street": original_street, "original_postcode": original_postcode, self.STANDARD_LANDLORD_PROPERTY_ID: df[ self.STANDARD_LANDLORD_PROPERTY_ID ].values[0], } ) continue postcode_no_space = row[postcode_col].strip().replace(" ", "").lower() df = self.standardised_asset_list[ ( self.standardised_asset_list[self.STANDARD_POSTCODE] .str.strip() .str.lower() .str.replace(" ", "") == postcode_no_space ) ] house_no = row[house_no_col] if pd.isnull(house_no): house_no = None if isinstance(house_no, (float, int)): house_no = str(int(house_no)) if house_no not in df["house_no"].values: # Handle postcode errors postal_region = row[postcode_col].split(" ")[0].lower() df = self.standardised_asset_list[ ( self.standardised_asset_list[self.STANDARD_POSTCODE] .str.strip() .str.lower() .str.startswith(postal_region) ) ] if house_no not in df["house_no"].values: unmatched.append(row["row_id"]) continue df = df[df["house_no"] == house_no] if df.shape[0] > 1: df = df[ df[self.STANDARD_FULL_ADDRESS] .str.lower() .str.contains(row["Street / Block Name"].lower()) ] if df.shape[0] == 0: unmatched.append(row["row_id"]) continue matched.append( { "row_id": row["row_id"], "original_house_no": original_house_no, "original_street": original_street, "original_postcode": original_postcode, self.STANDARD_LANDLORD_PROPERTY_ID: df[ self.STANDARD_LANDLORD_PROPERTY_ID ].values[0], } ) continue if house_no in df["house_no"].values: df = df[df["house_no"] == house_no] if df.shape[0] != 1: # Levenstein distance if any( df[self.STANDARD_FULL_ADDRESS].str.contains( row["Street / Block Name"] ) ): df = df[ df[self.STANDARD_FULL_ADDRESS].str.contains( row["Street / Block Name"] ) ] else: # Levenstein distance df = df[ df[self.STANDARD_FULL_ADDRESS] .str.lower() .apply( lambda x: process.extractOne( " ".join( [ row[house_no_col], row["Street / Block Name"], row[town_colname], ] ).lower(), x, )[1] ) > 90 ] if df.shape[0] == 0: unmatched.append(row["row_id"]) continue if any( df[self.STANDARD_FULL_ADDRESS] .str.lower() .str.contains( " ".join( [row[house_no_col], row["Street / Block Name"]] ).lower() ) ): df = df[ df[self.STANDARD_FULL_ADDRESS] .str.lower() .str.contains( " ".join( [row[house_no_col], row["Street / Block Name"]] ).lower() ) ] if any( df[self.STANDARD_PROPERTY_TYPE].str.contains( row[property_type_col].split(" ")[-1].lower() ) ): # We ignore "block of flats" entries df = df[ df[self.STANDARD_PROPERTY_TYPE].str.contains( row[property_type_col].split(" ")[-1].lower() ) & (df[self.STANDARD_PROPERTY_TYPE] != "block of flats") ] if df.shape[0] != 1: # We have multiple matches - it's likely because the landlord has a duplicate # that has been referenced in totally different ways so we just match to both for _, x in df.iterrows(): matched.append( { "row_id": row["row_id"], "original_house_no": original_house_no, "original_street": original_street, "original_postcode": original_postcode, self.STANDARD_LANDLORD_PROPERTY_ID: x[ self.STANDARD_LANDLORD_PROPERTY_ID ], } ) continue matched.append( { "row_id": row["row_id"], "original_house_no": original_house_no, "original_street": original_street, "original_postcode": original_postcode, self.STANDARD_LANDLORD_PROPERTY_ID: df[ self.STANDARD_LANDLORD_PROPERTY_ID ].values[0], } ) self.standardised_asset_list = self.standardised_asset_list.drop( columns="house_no" ) # We match the "UPRN" which is the landlords ID, onto the master sheet if measure_mix_col not in master_data.columns: master_data[measure_mix_col] = "Measure mix not recorded" matched = pd.DataFrame(matched) if matched.empty: continue master_to_append = ( master_data[ [ scheme_col, "row_id", install_col, submission_col, measure_mix_col, installer_notes_col, installer_col, ] ] .merge(matched, how="left", on="row_id") .rename( columns={ scheme_col: "funding_scheme", measure_mix_col: "measure_mix", install_col: "survey_status", submission_col: "submission_date", installer_notes_col: "submission_installer_notes", installer_col: "submission_installer", } ) ) master_to_append["submission_cancelled"] = ( master_to_append["survey_status"].str.lower().str.contains("cancel") ) master_to_append["submission_installed"] = ( master_to_append["survey_status"].str.lower().str.contains("installed") ) master_surveyed.append(master_to_append) unmatched_df = master_data[master_data["row_id"].isin(unmatched)] # The columns are massively different - we take just a few unmatched_df = unmatched_df[ [ scheme_col, house_no_col, "Street / Block Name", postcode_col, install_col, submission_col, ] ].rename( columns={ scheme_col: "Funding Scheme", house_no_col: "House Number", postcode_col: "Postcode", install_col: "survey_status", submission_col: "submission_date", } ) unmatched_submissions.append(unmatched_df) master_surveyed = pd.concat(master_surveyed) master_surveyed = master_surveyed[ ~pd.isnull(master_surveyed[self.STANDARD_LANDLORD_PROPERTY_ID]) ] master_surveyed = master_surveyed[ ~master_surveyed[self.STANDARD_LANDLORD_PROPERTY_ID].isin( ["NOT ON ASSET LIST", "Missing From Asset List"] ) ] master_surveyed[self.STANDARD_LANDLORD_PROPERTY_ID] = master_surveyed[ self.STANDARD_LANDLORD_PROPERTY_ID ].astype(str) # We de-dupe crudely on landlord property id self.master_surveyed = master_surveyed.drop_duplicates( subset=[self.STANDARD_LANDLORD_PROPERTY_ID] ).copy() # We now add the submission status, based on the hubspot stages self.master_surveyed["submission_status"] = ( hubspot_config.HubspotProcessStatus.SUBMITTED_TO_INSTALLER.label ) self.master_surveyed["submission_status"] = np.where( self.master_surveyed["submission_cancelled"] == True, hubspot_config.HubspotProcessStatus.INSTALLER_CANCELLED_FINALIZED.label, self.master_surveyed["submission_status"], ) self.master_surveyed["submission_status"] = np.where( self.master_surveyed["submission_installed"] == True, hubspot_config.HubspotProcessStatus.INSTALL_COMPLETE.label, self.master_surveyed["submission_status"], ) self.standardised_asset_list = self.standardised_asset_list.merge( self.master_surveyed, how="left", on=self.STANDARD_LANDLORD_PROPERTY_ID ) # Make sure no dupes if self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated().sum(): raise ValueError("duplicated ids!") # Finally, we keep a record of the unmatched if unmatched_submissions: self.unmatched_submissions = pd.concat(unmatched_submissions)