import hashlib import os import re import tiktoken from pprint import pprint from datetime import datetime from openai import OpenAI import numpy as np import pandas as pd from tqdm import tqdm from fuzzywuzzy import process from utils.logger import setup_logger from backend.SearchEpc import SearchEpc from BaseUtility import Definitions import asset_list.mappings.property_type as property_type_mappings import asset_list.mappings.walls as walls_mappings import asset_list.mappings.heating_systems as heating_mappings import asset_list.mappings.exising_pv as existing_pv_mappings import asset_list.mappings.built_form as built_form_mappings import asset_list.mappings.roof as roof_mappings from recommendations.recommendation_utils import ( estimate_perimeter, estimate_external_wall_area, estimate_number_of_floors ) from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes logger = setup_logger() # OpenAI API Key (set this in your environment variables for security) OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY") class DataRemapper: def __init__(self, standard_values, standard_map=None, max_tokens=1000): """ Initialize the remapper with standard values and a predefined mapping. :param standard_values: Set of allowed standardized values. :param standard_map: Dictionary of common remappings {raw_value: standard_value}. """ self.standard_values = standard_values self.standard_map = standard_map self.fuzzy_threshold = 90 # Adjust fuzzy matching sensitivity self.ai_model = "gpt-4-turbo" # Use gpt-3.5-turbo for cheaper processing # Tokenizer for counting tokens self.tokenizer = tiktoken.encoding_for_model(self.ai_model) # Track token usage and remap dictionary self.total_tokens_used = 0 self.total_cost = 0 self.remap_dict = {} # {original_value: standardized_value} self.max_tokens = max_tokens # Limit for OpenAI API # Memoization for AI calls self.ai_cache = {} # {tuple(unmapped_values): {original_value: standardized_value}} # Capture the reponse for debugging self.ai_response = None # OpenAI pricing (as of Feb 2024) self.pricing = { "gpt-4-turbo": {"input": 0.01 / 1000, "output": 0.03 / 1000}, "gpt-3.5-turbo": {"input": 0.0015 / 1000, "output": 0.002 / 1000}, } self.openai_client = OpenAI(api_key=OPENAI_API_KEY) @staticmethod def clean_string(text): """Basic text cleaning: remove extra spaces, punctuation, and normalize case.""" if not isinstance(text, str): return None text = text.strip().lower() text = re.sub(r'[^\w\s]', '', text) # Remove punctuation # Replace double strings text = re.sub(r'\s+', ' ', text) return text def fuzzy_match(self, text): """Use fuzzy matching to find the closest standard value.""" match, score = process.extractOne(text, self.standard_values) if text else (None, 0) return match if score >= self.fuzzy_threshold else None def count_tokens(self, text): """Estimate the number of tokens in a given text.""" return len(self.tokenizer.encode(text)) if text else 0 def ai_standardize(self, unmapped_values): """Call OpenAI API **once** for all unmapped values to minimize cost, with memoization.""" if not unmapped_values: return {} unmapped_tuple = tuple(sorted(unmapped_values)) # Ensure consistency for memoization if unmapped_tuple in self.ai_cache: return self.ai_cache[unmapped_tuple] # Return memoized result prompt = f""" You are an expert in data classification. Standardize each of these values into one of the categories: {list(self.standard_values)}. Return only a JSON dictionary where: - The keys are the original values. - The values are the standardized ones. Strictly return JSON **without markdown formatting** or extra text. Example Output: {{ "BLKHOUS": "block house", "BEDSIT": "bedsit" }} Values to standardize: {unmapped_values} """ # Count input tokens input_tokens = self.count_tokens(prompt) if input_tokens > self.max_tokens: raise ValueError("Input tokens exceed the maximum limit.") logger.info("Calling OpenAI API for standardization...") response = self.openai_client.chat.completions.create( model=self.ai_model, messages=[{"role": "user", "content": prompt}], max_tokens=self.max_tokens, temperature=0.1, ) output_text = response.choices[0].message.content.strip() output_tokens = self.count_tokens(output_text) # Count output tokens # Track total token usage self.total_tokens_used += input_tokens + output_tokens # Estimate cost input_cost = input_tokens * self.pricing[self.ai_model]["input"] output_cost = output_tokens * self.pricing[self.ai_model]["output"] self.total_cost += input_cost + output_cost try: # Parse response as dictionary mapping = eval(output_text) # OpenAI should return a valid dictionary except: mapping = {val: "unknown" for val in unmapped_values} # Fallback # Memoize the AI response self.ai_cache[unmapped_tuple] = mapping # We store the raw AI response for debugging logger.debug(f"AI Response: {mapping}") self.ai_response = output_text return mapping def standardize_list(self, values_to_remap): """ Standardizes a list of values and returns a dictionary {original_value: standardized_value}. :param values_to_remap: List of raw values to standardize. :return: Dictionary {original_value: standardized_value}. """ unique_values = set(values_to_remap) # Process only unique values unmapped_values = [] for value in unique_values: if pd.isna(value): # Handle NaN values self.remap_dict[value] = "unknown" continue cleaned_value = self.clean_string(value) # Rule-Based Check (Predefined Mapping) if cleaned_value in self.standard_map or value in self.standard_map: self.remap_dict[value] = ( self.standard_map[cleaned_value] if cleaned_value in self.standard_map else self.standard_map[value] ) continue if value.lower() in self.standard_map: self.remap_dict[value] = self.standard_map[value.lower()] continue # Exact Match in Standard Values if cleaned_value in self.standard_values: self.remap_dict[value] = cleaned_value continue # Fuzzy Matching fuzzy_match = self.fuzzy_match(cleaned_value) if fuzzy_match: self.remap_dict[value] = fuzzy_match continue # Capture anything that wasn't mapped unmapped_values.append(value) # AI Model - remap anything unmapped (batch request) ai_mapping = self.ai_standardize(unmapped_values) self.remap_dict.update(ai_mapping) return self.remap_dict def report_usage(self): """Prints a summary of token usage and cost.""" print(f"\n🔹 Total Tokens Used: {self.total_tokens_used}") print(f"💰 Estimated Cost: ${self.total_cost:.4f}") class AssetList: """ This class is used to standardise asset lists so that we can process the core information in a consistent manner. """ EPC_API_DATA_NAMES = { "uprn": "epc_os_uprn", "address1": "epc_address1", "address": "epc_address", "postcode": "epc_postcode", "inspection-date": "epc_inspection_date", "current-energy-efficiency": "epc_sap_score_on_register", "current-energy-rating": "epc_rating_on_register", "property-type": "epc_property_type", "built-form": "epc_archetype", "total-floor-area": "epc_total_floor_area", "construction-age-band": "epc_age_band", "floor-height": "epc_floor_height", "number-habitable-rooms": "epc_number_habitable_rooms", "walls-description": "epc_wall_construction", "roof-description": "epc_roof_construction", "floor-description": "epc_floor_construction", "mainheat-description": "epc_heating_type", 'mainheatcont-description': "epc_heating_controls", "secondheat-description": "epc_secondary_heating", "transaction-type": "epc_reason", "energy-consumption-current": "epc_heat_demand", "photo-supply": "epc_photo_supply", "estimated": "estimated" } FIND_EPC_DATA_NAMES = { "heating_text": "epc_estiamted_heating_kwh", "hot_water_text": "epc_estimated_hotwater_kwh", 'Assessor’s name': "epc_assessor_name", "Assessor's Telephone": "epc_assessor_telephone", "Assessor's Email": "epc_assessor_email", "Accreditation scheme": "epc_assessor_accreditation", "Assessor’s ID": "epc_assessor_id", "Solar photovoltaics": "epc_solar_pv" } DATETIME_REMAP = { "Pre 1900": datetime(year=1899, month=12, day=31), } # These are the accepted methods we have for cleaning the address1 column ADDRESS_1_CLEANING_METHODS = [ "first_two_words", # This method will split on the fist two words, where the separator is a space "first_word", # This method will split on the first word, where the separator is a space "house_number_extraction", # This method will use the NLP model in SearchEPC to extract the housenumber # "address1_extraction" # This method will use the NLP model to extract address1 ] # Standard column Names STANDARD_ADDRESS_1 = "domna_address_1" STANDARD_POSTCODE = "domna_postcode" STANDARD_FULL_ADDRESS = "domna_full_address" STANDARD_YEAR_BUILT = "landlord_year_built" STANDARD_UPRN = "ordnance_survey_uprn" STANDARD_LANDLORD_PROPERTY_ID = "landlord_property_id" STANDARD_PROPERTY_TYPE = "landlord_property_type" STANDARD_BUILT_FORM = "landlord_built_form" STANDARD_WALL_CONSTRUCTION = "landlord_wall_construction" STANDARD_ROOF_CONSTRUCTION = "landlord_roof_construction" STANDARD_HEATING_SYSTEM = "landlord_heating_system" STANDARD_EXISTING_PV = "landlord_existing_pv" STANDARD_SAP = "landlord_sap_rating" DOMNA_PROPERTY_ID = "domna_property_id" # Regular expression for identifying if the address might point to multiple units MULTI_UNIT_REGEX = re.compile(r'\b([A-Za-z0-9]+)-([A-Za-z0-9]+)\b') # List of columns relating to the non-intrusive data NON_INTRUSIVES_COLNAMES = [ "Archetype", "Construction", "Insulated", "Material", "CIGA Check Required", "PV, ACCESS ISSUE, SEE NOTES", "OFF GAS - ROOF ORIENTATION", "Any further surveyor notes", 'Surveyors Name' ] NON_INTRUSIVES_ELIGIBILITY_COLUMN = "Eligibility (Red/Yellow/Green)" OLD_FORMAT_NON_INTRUSIVE_COLNAMES = ['WFT Findings', 'ECO Eligibility'] # This SAP threshold is a key search criteria for properties that may be eligible for extraction FILLED_CAVITY_SAP_THRESHOLD = 75 # This SAP the EMPTY_CAVITY_SAP_THRESHOLD = 75 # Any EPC deemed to have been conducted prior to this year is deemed to be unreliable EPC_YEAR_THRESHOLD = pd.Timestamp.now().year - 5 # Properties before this year are more likely to have lower EPC ratings and more likely to qualify EMPTY_CAVITY_YEAR_THRESHOLD = 2002 # Attributes - these are columns that we produce, calcualted based on other pieces of data ATTRIBUTE_HAS_SOLAR = "attribute_has_solar" ATTRIBUTE_NUMBER_OF_FLOORS = "attribute_est_number_floors" ATTRIBUTE_ESTIMATED_PERIMETER = "attribute_est_perimter" ATTRIBUTE_HEAT_LOSS_AREA = "attribute_heat_loss_area" ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS = "attribute_epc_roof_insulation_thickness" ATTRIBUTE_SAP_THRESHOLD_AND_BELOW = f"sap_rating_{FILLED_CAVITY_SAP_THRESHOLD}_and_below" ATTRIBUTE_EPC_PRE_YEAR_THRESHOLD = f"epc_is_pre_{EPC_YEAR_THRESHOLD}" # These are the descriptions that we look for in the EPC data that are indicative of no insulation EPC_NO_WALL_INSULATION_DESCRIPTIONS = [ "cavity wall, as built, no insulation (assumed)", "cavity wall, as built, partial insulation (assumed)", "cavity wall, as built, partial insulation", "cavity wall, as built, no insulation", ] # List of strings that we look for in the EPC data, where substrings indicate that the wall is insulated EPC_INSULATED_WALLS_SUBSTRINGS = [ ", insulated", "with external insulation", "with internal insulation", "filled cavity" ] # List of strings that we look for in the EPC data, where substrings indicate that the roof is insulated EPC_INSULATED_ROOF_SUBSTRINGS = [ "(another dwelling above)", ", insulated", ", insulated (assumed) ", ", ceiling insulated", ] # List of strings we look for in the EPC data, where substrings indicate that the cavity is empty UNINSULATED_CAVITY_SUBSTRINGS = [ "cavity wall, as built, no insulation (assumed)", "cavity wall, as built, no insulation", "cavity wall, as built, partial insulation (assumed)", "cavity wall, as built, partial insulation", ] def __init__( self, local_filepath, sheet_name, address1_colname, postcode_colname, full_address_colname, landlord_property_id=None, full_address_cols_to_concat=None, missing_postcodes_method=None, address1_extraction_method=None, landlord_year_built=None, landlord_uprn=None, landlord_property_type=None, landlord_built_form=None, landlord_wall_construction=None, landlord_roof_construction=None, landlord_heating_system=None, landlord_existing_pv=None, landlord_sap=None, phase=False, header=0 ): self.local_filepath = local_filepath self.sheet_name = sheet_name # Read in the data if local_filepath.endswith(".xlsx"): self.raw_asset_list = pd.read_excel(local_filepath, header=header, sheet_name=sheet_name) else: self.raw_asset_list = pd.read_csv(local_filepath) self.standardised_asset_list = self.raw_asset_list.copy() # Will be used to store aggregated figures against the various work types self.work_type_figures = {} self.flat_data = None self.duplicated_addresses = None self.contact_details = None self.contact_detail_fields = None self.outcomes = None self.outcomes_no_match = pd.DataFrame() self.outcomes_for_output = pd.DataFrame() self.master_surveyed = None self.unmatched_submissions = pd.DataFrame() self.ecosurv = None self.ecosurv_no_match = pd.DataFrame() # When this is True, we intend to break the programme into multiple phases. We may need to review # how this is structured in the future, as depending on how we get future data, we may need to # remove some existing phases from the reporting, or specifically highlight the phase (1 to n-1) # properties, assuming the current phase is n. self.phase = phase # We detect the presence of the non-intrusive columns self.non_intrusives_present = "CIGA Check Required" in self.raw_asset_list.columns # We detect if we have the old format of non-intruvies self.old_format_non_intrusives_present = "WFT Findings" in self.raw_asset_list.columns self.non_intrusives_eligibility = "Eligibility (Red/Yellow/Green)" in self.raw_asset_list.columns # Names of columns self.landlord_property_id = landlord_property_id self.address1_colname = address1_colname self.postcode_colname = postcode_colname self.full_address_colname = full_address_colname self.landlord_year_built = landlord_year_built self.landlord_uprn = landlord_uprn self.landlord_property_type = landlord_property_type self.landlord_built_form = landlord_built_form self.landlord_wall_construction = landlord_wall_construction self.landlord_roof_construction = landlord_roof_construction self.landlord_heating_system = landlord_heating_system self.landlord_existing_pv = landlord_existing_pv self.landlord_sap = landlord_sap # parameters for cleaning self.full_address_cols_to_concat = full_address_cols_to_concat self.missing_postcodes_method = missing_postcodes_method self.address1_extraction_method = address1_extraction_method self.debug_information = { "property_type": None, "wall_construction": None, "heating_system": None, "existing_pv": None } self.variable_mappings = {} self.hubspot_data = None self.rename_map = {} self.keep_variables = [] # Finally, we handle the case where the landlord's property ID is actually the OS UPRN if (self.landlord_uprn == self.landlord_property_id) and (self.landlord_property_id is not None): self.standardised_asset_list[self.STANDARD_UPRN] = self.standardised_asset_list[self.landlord_uprn].copy() # Update the reference to landlord UPRn self.landlord_uprn = self.STANDARD_UPRN # Handle the case when full address and address 1 are the same if self.full_address_colname == self.address1_colname: self.full_address_colname = self.STANDARD_FULL_ADDRESS self.standardised_asset_list[self.full_address_colname] = ( self.standardised_asset_list[self.address1_colname].copy() ) # Handle the case where the property type column is the same as the built type if self.landlord_property_type == self.landlord_built_form: self.landlord_built_form = self.STANDARD_BUILT_FORM self.standardised_asset_list[self.landlord_built_form] = ( self.standardised_asset_list[self.landlord_property_type].copy() ) # If landlord built form is None (which it often is) we use the built for from inspections if (self.landlord_built_form is None) and self.non_intrusives_present: self.landlord_built_form = self.STANDARD_BUILT_FORM self.standardised_asset_list[self.landlord_built_form] = ( self.standardised_asset_list["Archetype"].copy() ) def _extract_address1(self, asset_list, full_address_col, postcode_col, method="first_two_words"): if method not in self.ADDRESS_1_CLEANING_METHODS: raise ValueError(f"Method {method} for producing address1 not recognized") if method == "first_two_words": asset_list[self.address1_colname] = asset_list[full_address_col].str.split(" ").str[:2].str.join(" ") return asset_list if method == "first_word": asset_list[self.address1_colname] = asset_list[full_address_col].str.split(" ").str[0] return asset_list if method == "house_number_extraction": asset_list[self.address1_colname] = asset_list.apply( lambda x: SearchEpc.get_house_number(address=x[full_address_col], postcode=x[postcode_col]), axis=1 ) for _, x in asset_list.iterrows(): SearchEpc.get_house_number(address=x[full_address_col], postcode=x[postcode_col]) return asset_list raise ValueError(f"Method {method} not recognized") @staticmethod def _address1_extraction(x): pass def create_property_id(self): """ This function creates the domna property ID, which is simply a hash of the full address and postcode We want all figures to be positive :return: """ # We'll remove punctuation and whitespace from the address, before hashing to produce an ID def _make_hash(value): """Generates a stable SHA256 hash suffix and appends it to a cleaned version of the value.""" # Normalize and remove special characters for cleaner ID cleaned_value = re.sub(r"[^\w\s-]", "", value).replace(" ", "_").lower() # Generate SHA-256 hash and truncate it short_hash = hashlib.sha256(value.encode()).hexdigest()[:12] return f"{cleaned_value}-{short_hash}" # Apply transformation self.standardised_asset_list[self.DOMNA_PROPERTY_ID] = ( self.standardised_asset_list[self.full_address_colname] + self.standardised_asset_list[self.postcode_colname] ).str.strip().str.replace(r"[^\w\s]", "", regex=True).str.replace(" ", "").str.lower().apply(_make_hash) @staticmethod def _strip_postcode_from_full_address(full_address, postcode): cleaned = full_address.replace(postcode, "") # Remove any trailing commas and spaces cleaned = cleaned.rstrip(", ").strip(",").strip() return cleaned @classmethod def _identify_multi_address(cls, address): # We check if the address is comma separated if "," in address: address1_section = address.split(",")[0] # We look for string in the form (x-y) return bool(cls.MULTI_UNIT_REGEX.search(address1_section)) @staticmethod def _convert_uprn(x): """ Used to convert UPRNS to integer strings :param x: uprn to convert :return: converted uprn """ if pd.isnull(x): return x # check if numeric if np.isreal(x): return str(int(x)) if str(x).isdigit(): return str(int(x)) return x @staticmethod def _clean_postcode(postcode): # Remove double spaces postcode = postcode.replace(" ", " ") if " " not in postcode: # Restructure it return " ".join( [postcode[:-3], postcode[-3:]] ) return postcode def init_standardise(self): """ This function is used to standardise the asset list :return: standardised asset list """ # Remove rows without a postcode if self.postcode_colname is not None: self.standardised_asset_list = self.standardised_asset_list.dropna(subset=[self.postcode_colname]) # We also clean postcode columns where if there is not space, we create one self.standardised_asset_list[self.postcode_colname] = self.standardised_asset_list[ self.postcode_colname ].apply(self._clean_postcode) # We clean up portential non-breaking spaces, and double spaces for col in [ c for c in [self.postcode_colname, self.full_address_colname, self.address1_colname] if c is not None ]: self.standardised_asset_list[col] = self.standardised_asset_list[col].astype(str) self.standardised_asset_list[col] = self.standardised_asset_list[col].str.replace('\xa0', ' ', regex=False) self.standardised_asset_list[col] = self.standardised_asset_list[col].str.replace(' ', ' ', regex=False) if self.address1_colname is None: if self.address1_extraction_method is None: raise ValueError("Missing address 1 - please specify an extraction method") self.address1_colname = self.STANDARD_ADDRESS_1 # If we do not have this, we produce it self.standardised_asset_list = self._extract_address1( asset_list=self.standardised_asset_list, full_address_col=self.full_address_colname, postcode_col=self.postcode_colname, method=self.address1_extraction_method ) if self.full_address_colname is None: if not self.full_address_cols_to_concat: raise ValueError("Missing full address - please specify columns to concatenate") self.full_address_colname = self.STANDARD_FULL_ADDRESS self.standardised_asset_list[self.full_address_colname] = ( self.standardised_asset_list[self.full_address_cols_to_concat].apply( lambda x: ", ".join([y for y in x if not pd.isnull(y)]), axis=1 ) ) else: # Make sure to strip the postcode out of the full address self.standardised_asset_list[self.full_address_colname] = self.standardised_asset_list.apply( lambda x: self._strip_postcode_from_full_address( full_address=x[self.full_address_colname], postcode=x[self.postcode_colname] ), axis=1 ) # We create the domna property id self.create_property_id() # Clean up the UPRN column, if the landlord has provided them if self.landlord_uprn is not None: self.standardised_asset_list[self.landlord_uprn] = ( self.standardised_asset_list[self.landlord_uprn].apply(self._convert_uprn) ) # We keep just the columns we care about and will work through the various columns and standardise variables = [ self.landlord_property_id, self.DOMNA_PROPERTY_ID, self.address1_colname, self.postcode_colname, self.full_address_colname, self.landlord_uprn, self.landlord_property_type, self.landlord_built_form, self.landlord_year_built, self.landlord_wall_construction, self.landlord_roof_construction, self.landlord_heating_system, self.landlord_existing_pv, self.landlord_sap, ] # Keep just non-null variables (e.g landlord may not provide uprn self.keep_variables = [v for v in variables if v is not None] self.rename_map = { self.landlord_property_id: self.STANDARD_LANDLORD_PROPERTY_ID, self.address1_colname: self.STANDARD_ADDRESS_1, self.postcode_colname: self.STANDARD_POSTCODE, self.full_address_colname: self.STANDARD_FULL_ADDRESS, self.landlord_uprn: self.STANDARD_UPRN, self.landlord_property_type: self.STANDARD_PROPERTY_TYPE, self.landlord_built_form: self.STANDARD_BUILT_FORM, self.landlord_year_built: self.STANDARD_YEAR_BUILT, self.landlord_wall_construction: self.STANDARD_WALL_CONSTRUCTION, self.landlord_roof_construction: self.STANDARD_ROOF_CONSTRUCTION, self.landlord_heating_system: self.STANDARD_HEATING_SYSTEM, self.landlord_existing_pv: self.STANDARD_EXISTING_PV, self.landlord_sap: self.STANDARD_SAP, } self.rename_map = {k: v for k, v in self.rename_map.items() if k is not None} non_intrusive_columns = [] if self.non_intrusives_present: non_intrusive_columns = self.NON_INTRUSIVES_COLNAMES if self.non_intrusives_eligibility: non_intrusive_columns.append(self.NON_INTRUSIVES_ELIGIBILITY_COLUMN) if self.old_format_non_intrusives_present: # We check if we have the ECO Eligibility column, which we might not have non_intrusive_columns = [ c for c in self.OLD_FORMAT_NON_INTRUSIVE_COLNAMES if c in self.standardised_asset_list.columns ] self.keep_variables += non_intrusive_columns self.rename_map = { **self.rename_map, **dict( zip(non_intrusive_columns, ["non-intrusives: " + c for c in non_intrusive_columns]) ) } # We idenfiy addresses which are likely to be multi-addresses (i.g are rooms x-y) self.standardised_asset_list["is_multi_address"] = self.standardised_asset_list[ self.full_address_colname ].apply(lambda x: self._identify_multi_address(x)) # We handle cleaning for walls, in the instance that the landlord provides us with EPC data and # we see instances of "average thermal transmittance" in the description if self.landlord_wall_construction is not None: self.standardised_asset_list[self.landlord_wall_construction] = np.where( self.standardised_asset_list[self.landlord_wall_construction].str.lower().str.contains( "average thermal transmittance" ) == True, "new build - average thermal transmittance", self.standardised_asset_list[self.landlord_wall_construction] ) else: # We want to make sure that we have a column for wall construction self.landlord_wall_construction = self.STANDARD_WALL_CONSTRUCTION self.standardised_asset_list[self.landlord_wall_construction] = None if self.landlord_roof_construction is None: self.landlord_roof_construction = self.STANDARD_ROOF_CONSTRUCTION self.standardised_asset_list[self.landlord_roof_construction] = None # Clear our build year column # We attempt to process the year built column if self.landlord_year_built is not None: # We check if we have a datetime - year built has not been renamed if isinstance(self.standardised_asset_list[self.landlord_year_built].iloc[0], datetime): # We treat any string columns - with common values we see self.standardised_asset_list[self.landlord_year_built] = ( self.standardised_asset_list[self.landlord_year_built].replace(self.DATETIME_REMAP) ) self.standardised_asset_list[self.landlord_year_built] = pd.to_datetime( self.standardised_asset_list[self.landlord_year_built] ) # Convert this to year self.standardised_asset_list[self.landlord_year_built] = ( self.standardised_asset_list[self.landlord_year_built].dt.year ) else: # We attempt to convert the year built to a datetime, by detecting the format and converting def extract_year(date_str): """ Extracts the year from a date string in the format '01-Jul-YYYY'. Returns the extracted year as an integer or None if the format is incorrect. """ known_errors = [ "#MULTIVALUE", "This cell has an external reference that can't be shown or edited. Editing this cell will " "remove the external reference.", "ND", 'PIMSS EMPTY' ] if pd.isnull(date_str) or date_str in known_errors or (date_str == 0): return None if isinstance(date_str, str): match = re.match(r"\d{1,2}-[A-Za-z]{3}-(\d{4})", date_str) if match: return int(match.group(1)) # Extract the year and convert to integer if "-" in date_str: # Count the number of times we have "-", as we've seen double ranges # (when we have extensions) so the format is like this: # 'G: 1983-1990, H: 1991-1995' if date_str.count("-") == 2: # We have a range return int(date_str.split("-")[1].split(",")[0]) # We probably have a range return int(date_str.split("-")[1].strip()) if isinstance(date_str, datetime): return date_str.year if isinstance(date_str, float): if str(int(date_str)).isdigit() & (len(str(int(date_str))) == 4): return int(date_str) # Check if date_str is a year itself if str(date_str).isdigit() & (len(str(date_str)) == 4): return int(date_str) # Remove any non-numeric characters date_str = re.sub(r"\D", "", str(date_str)) if str(date_str).isdigit() & (len(str(date_str)) == 4): return int(date_str) raise NotImplementedError(f"Unhandled format for year built, value is {date_str} - implement me") self.standardised_asset_list[self.landlord_year_built] = self.standardised_asset_list[ self.landlord_year_built ].apply(extract_year) # We now create standard lookups to_remap = { self.landlord_property_type: { "standard_values": property_type_mappings.STANDARD_PROPERTY_TYPES, "standard_map": property_type_mappings.PROPERTY_MAPPING }, self.landlord_built_form: { "standard_values": built_form_mappings.STANDARD_BUILT_FORMS, "standard_map": built_form_mappings.BUILT_FORM_MAPPINGS }, self.landlord_wall_construction: { "standard_values": walls_mappings.STANDARD_WALL_CONSTRUCTIONS, "standard_map": walls_mappings.WALL_CONSTRUCTION_MAPPINGS }, self.landlord_heating_system: { "standard_values": heating_mappings.STANDARD_HEATING_SYSTEMS, "standard_map": heating_mappings.HEATING_MAPPINGS }, self.landlord_existing_pv: { "standard_values": existing_pv_mappings.STANDARD_EXISTING_PV, "standard_map": existing_pv_mappings.EXISTING_PV_MAPPINGS }, self.landlord_roof_construction: { "standard_values": roof_mappings.STANDARD_ROOF_CONSTRUCTIONS, "standard_map": roof_mappings.ROOF_CONSTRUCTION_MAPPINGS } } # Keep just entries where the key is not None to_remap = {k: v for k, v in to_remap.items() if k is not None} for variable, config in to_remap.items(): logger.info("Standardising variable: %s", variable) # Strip each of these columns self.standardised_asset_list[variable] = self.standardised_asset_list[variable].str.strip() values_to_remap = self.standardised_asset_list[variable].unique() # We want to map this to our standardised list of property types we're interested in remapper = DataRemapper(standard_values=config["standard_values"], standard_map=config["standard_map"]) remap_dictionary = remapper.standardize_list(values_to_remap=values_to_remap.tolist()) self.variable_mappings[variable] = remap_dictionary # We now print out the variable mappings, which can be reviewed by the user, before the final standardised # asset list is returned for variable, mapping in self.variable_mappings.items(): pprint(f"Variable: {variable}") pprint(mapping) # Print a space print("\n") pprint("=======================================") def apply_standardiation(self, override_empty_mappings=False): """ This function applies the standardisation to the asset list :param override_empty_mappings: If true, will override the check for empty mappings. This is only relevant if there are no categories which need remapping which is highly unlikely :return: """ if self.phase: # We filter on just the properties that have had an inspection self.standardised_asset_list = self.standardised_asset_list[ ~self.standardised_asset_list['Surveyors Name'].isin(["YET TO BE SURVEYED"]) ] if not self.variable_mappings and not override_empty_mappings: raise ValueError("Please run init_standardise first") logger.info("Applying standardisation to asset list") for variable, mapping in self.variable_mappings.items(): self.standardised_asset_list[variable + "_original_from_landlord"] = ( self.standardised_asset_list[variable].copy() ) self.standardised_asset_list[variable] = self.standardised_asset_list[variable].map(mapping) if self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated().sum(): # Drop the dupes pprint( f"There are {self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated().sum()} duplicated " f"addresses - dropping" ) # Keep a record of duplicates self.duplicated_addresses = self.standardised_asset_list[ self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated() ][[self.DOMNA_PROPERTY_ID, self.address1_colname, self.postcode_colname]].copy() self.standardised_asset_list = self.standardised_asset_list[ ~self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated() ] # Apply renames to our standard names # Perform final variable selection and renaming: # We add the original columns to the keep variables self.keep_variables += [ k + "_original_from_landlord" for k in self.variable_mappings.keys() ] self.standardised_asset_list = self.standardised_asset_list[self.keep_variables].rename( columns=self.rename_map ) # We fill any standard columns that are not in the data because they were not provided by the landlord missing_variables = [ v for v in [ self.STANDARD_EXISTING_PV, self.STANDARD_HEATING_SYSTEM, self.STANDARD_UPRN, self.STANDARD_PROPERTY_TYPE, self.STANDARD_YEAR_BUILT, self.STANDARD_WALL_CONSTRUCTION, self.STANDARD_HEATING_SYSTEM, self.STANDARD_EXISTING_PV ] if v not in self.standardised_asset_list.columns ] for v in missing_variables: self.standardised_asset_list[v] = None # Convert to string self.standardised_asset_list[self.STANDARD_LANDLORD_PROPERTY_ID] = ( self.standardised_asset_list[self.STANDARD_LANDLORD_PROPERTY_ID].astype(str) ) def merge_data(self, df: pd.DataFrame): """ Used to insert data into the standardised asset list, based on the domna property id :return: """ if self.DOMNA_PROPERTY_ID not in df.columns: raise ValueError(f"Dataframe must contain the column {self.DOMNA_PROPERTY_ID}") if df[self.DOMNA_PROPERTY_ID].duplicated().sum(): raise ValueError(f"{self.DOMNA_PROPERTY_ID} contains duplicated IDs") self.standardised_asset_list = self.standardised_asset_list.merge( df, how="left", on=self.DOMNA_PROPERTY_ID ) def extract_attributes(self, pull_epc=True): # Used to extracty the typical attributes that we use to identify viable work self.standardised_asset_list[self.ATTRIBUTE_HAS_SOLAR] = ( self.standardised_asset_list[self.FIND_EPC_DATA_NAMES["Solar photovoltaics"]] | ~self.standardised_asset_list[self.EPC_API_DATA_NAMES["photo-supply"]].isin(["0.0", 0, None, "", np.nan]) ) accepted_epc_property_types = ["House", "Flat", "Bungalow", "Maisonette"] # The logic here is: # 1) Take the property type provided by the HA themselves # 2) In absence of that, take the EPC property type # 3) Otherwise use None self.standardised_asset_list[self.ATTRIBUTE_NUMBER_OF_FLOORS] = self.standardised_asset_list.apply( lambda x: estimate_number_of_floors( property_type=( str(x[self.STANDARD_PROPERTY_TYPE]).title() if str(x[self.STANDARD_PROPERTY_TYPE]).title() in accepted_epc_property_types else ( x[self.EPC_API_DATA_NAMES["property-type"]] if not pd.isnull(x[self.EPC_API_DATA_NAMES["property-type"]]) else None ) ) ), axis=1 ) self.standardised_asset_list[self.EPC_API_DATA_NAMES["total-floor-area"]] = ( self.standardised_asset_list[self.EPC_API_DATA_NAMES["total-floor-area"]].astype(float) ) # Replace "" value with None self.standardised_asset_list[self.EPC_API_DATA_NAMES["number-habitable-rooms"]] = ( self.standardised_asset_list[self.EPC_API_DATA_NAMES["number-habitable-rooms"]].replace("", None) ) self.standardised_asset_list[self.EPC_API_DATA_NAMES["number-habitable-rooms"]] = ( self.standardised_asset_list[self.EPC_API_DATA_NAMES["number-habitable-rooms"]].astype(float) ) # Estimate the perimeter self.standardised_asset_list[self.ATTRIBUTE_ESTIMATED_PERIMETER] = self.standardised_asset_list.apply( lambda x: estimate_perimeter( floor_area=x[self.EPC_API_DATA_NAMES["total-floor-area"]] / x[self.ATTRIBUTE_NUMBER_OF_FLOORS], num_rooms=x[self.EPC_API_DATA_NAMES["number-habitable-rooms"]] / x[self.ATTRIBUTE_NUMBER_OF_FLOORS], ), axis=1 ) self.standardised_asset_list[self.ATTRIBUTE_HEAT_LOSS_AREA] = self.standardised_asset_list.apply( lambda x: estimate_external_wall_area( num_floors=x[self.ATTRIBUTE_NUMBER_OF_FLOORS], floor_height=( float(x[self.EPC_API_DATA_NAMES["floor-height"]]) if x[self.EPC_API_DATA_NAMES["floor-height"]] else 2.5 ), perimeter=x[self.ATTRIBUTE_ESTIMATED_PERIMETER], built_form=x[self.EPC_API_DATA_NAMES["built-form"]] ), axis=1 ) self.standardised_asset_list[self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS] = self.standardised_asset_list.apply( lambda x: RoofAttributes(description=x[self.EPC_API_DATA_NAMES["roof-description"]]).process()[ "insulation_thickness"] if not pd.isnull( x[self.EPC_API_DATA_NAMES["roof-description"]]) else None, axis=1 ) self.standardised_asset_list[self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS] = ( self.standardised_asset_list[self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS].str.replace("+", "") ) # We produce some additional fields # 1) Is the SAP rating below C75 self.standardised_asset_list[self.ATTRIBUTE_SAP_THRESHOLD_AND_BELOW] = ( self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]].astype(float) <= self.FILLED_CAVITY_SAP_THRESHOLD ) # 2) Flag anything where the EPC is older than 5 years self.standardised_asset_list[self.ATTRIBUTE_EPC_PRE_YEAR_THRESHOLD] = ( pd.to_datetime( self.standardised_asset_list[self.EPC_API_DATA_NAMES["inspection-date"]] ).dt.year < self.EPC_YEAR_THRESHOLD ) self.process_age_band() def process_age_band(self): processed_age_band = [] for _, x in self.standardised_asset_list.iterrows(): if pd.isnull(x[self.EPC_API_DATA_NAMES["construction-age-band"]]) or ( x[self.EPC_API_DATA_NAMES["construction-age-band"]] in Definitions.DATA_ANOMALY_MATCHES ): processed_age_band.append( { self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID], "epc_year_lower_bound": None, "epc_year_upper_bound": None, "does_age_band_match_epc_age_band": "No EPC Age Band" } ) continue # We exatract the upper and lower bounds if x[self.EPC_API_DATA_NAMES["construction-age-band"]] in [ "England and Wales: 2007 onwards", "England and Wales: 2012 onwards" ]: year_lower_bound = 2007 if x[self.EPC_API_DATA_NAMES[ "construction-age-band"]] == "England and Wales: 2007 onwards" else 2012 if pd.isnull(x[self.STANDARD_YEAR_BUILT]): age_band_matches = "No Year Built From Landlord" else: age_band_matches = ( "EPC Age Band Matches Year Built" if x[self.STANDARD_YEAR_BUILT] >= year_lower_bound else "EPC Age Band is older than Year Built" ) processed_age_band.append( { self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID], "epc_year_lower_bound": year_lower_bound, "epc_year_upper_bound": None, "does_age_band_match_epc_age_band": age_band_matches } ) continue if x[self.EPC_API_DATA_NAMES["construction-age-band"]] == "England and Wales: before 1900": if pd.isnull(x[self.STANDARD_YEAR_BUILT]): age_band_matches = "No Year Built From Landlord" else: age_band_matches = ( "EPC Age Band Matches Year Built" if x[self.STANDARD_YEAR_BUILT] < 1900 else "EPC Age Band is newer than Year Built" ) processed_age_band.append( { self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID], "epc_year_lower_bound": None, "epc_year_upper_bound": 1899, "does_age_band_match_epc_age_band": age_band_matches } ) continue if x[self.EPC_API_DATA_NAMES["construction-age-band"]].isdigit(): if pd.isnull(x[self.STANDARD_YEAR_BUILT]): age_band_matches = "No Year Built From Landlord" else: age_band_matches = ( "EPC Age Band Matches Year Built" if x[self.STANDARD_YEAR_BUILT] == int( x[self.EPC_API_DATA_NAMES["construction-age-band"]] ) else "EPC Age Band is different from Year Built" ) processed_age_band.append( { self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID], "epc_year_lower_bound": int(x[self.EPC_API_DATA_NAMES["construction-age-band"]]), "epc_year_upper_bound": int(x[self.EPC_API_DATA_NAMES["construction-age-band"]]), "does_age_band_match_epc_age_band": age_band_matches } ) continue # Oherwise, we extract the upper and lower bounds age_band = x[self.EPC_API_DATA_NAMES["construction-age-band"]].split(": ")[1] lower_date, upper_date = age_band.split("-") if not x[self.STANDARD_YEAR_BUILT]: age_band_matches = "No Year Built From Landlord" else: age_band_matches = ( "EPC Age Band Matches Year Built" if (x[self.STANDARD_YEAR_BUILT] >= float(lower_date)) and ( x[self.STANDARD_YEAR_BUILT] <= float(upper_date) ) else "EPC Age Band is older than Year Built" if x[self.STANDARD_YEAR_BUILT] > float(upper_date) else "EPC Age Band is newer than Year Built" ) processed_age_band.append( { self.DOMNA_PROPERTY_ID: x[self.DOMNA_PROPERTY_ID], "epc_year_lower_bound": int(lower_date), "epc_year_upper_bound": int(upper_date), "does_age_band_match_epc_age_band": age_band_matches } ) processed_age_band = pd.DataFrame(processed_age_band) self.standardised_asset_list = self.standardised_asset_list.merge( processed_age_band, how="left" ) def identify_worktypes(self, cleaned): if self.landlord_sap is not None: # We add a SAP category for all work type identification self.standardised_asset_list["SAP Category"] = np.where( ( (self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <= 68) | (self.standardised_asset_list[self.STANDARD_SAP] <= 68) ), "SAP Rating 68 or less", np.where( ( ( self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <= self.EMPTY_CAVITY_SAP_THRESHOLD ) | (self.standardised_asset_list[self.STANDARD_SAP] <= self.EMPTY_CAVITY_SAP_THRESHOLD) ), f"SAP Rating 69-{self.EMPTY_CAVITY_SAP_THRESHOLD}", f"SAP Rating {self.EMPTY_CAVITY_SAP_THRESHOLD + 1} or more" ) ) else: # We add a SAP category for all work type identification # We break into 4 categories (54 or less, 55-68, 69-74, 75 or more) self.standardised_asset_list["SAP Category"] = np.where( (self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <= 54), "SAP Rating 54 or less", np.where( (self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <= 68), "SAP Rating 55-68", np.where( ( self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <= self.EMPTY_CAVITY_SAP_THRESHOLD ), f"SAP Rating 69-{self.EMPTY_CAVITY_SAP_THRESHOLD}", f"SAP Rating {self.EMPTY_CAVITY_SAP_THRESHOLD + 1} or more" ), ) ) # Before we being, we identify if a property has solar already as we use this # for identifying cavity jobs if self.non_intrusives_present: existing_solar_non_intrusives_check = ( self.standardised_asset_list["non-intrusives: PV, ACCESS ISSUE, SEE NOTES"] == "SOLAR PV ON ROOF" ) elif self.old_format_non_intrusives_present: existing_solar_non_intrusives_check = ( self.standardised_asset_list["non-intrusives: WFT Findings"].str.lower().str.strip().isin( ["solar pv on roof"] ) ) else: # We don't have an indication existing_solar_non_intrusives_check = False self.standardised_asset_list["property_has_solar"] = ( (self.standardised_asset_list[self.STANDARD_EXISTING_PV] == "already has PV") | existing_solar_non_intrusives_check | (self.standardised_asset_list[self.ATTRIBUTE_HAS_SOLAR]) ) # If we have non-intrusives completed, we can use this to identify work types ###################################################### # Empty cavity: ###################################################### # 1) Has been flagged on the non-intrusives as being a cavity wall, empty or partially filled # 2) The age is before 1995 # 3) We don't remove anything that haas access issues yet if self.non_intrusives_present: non_intrusives_wall_filter = ( (self.standardised_asset_list['non-intrusives: Construction'] == "CAVITY") & self.standardised_asset_list['non-intrusives: Insulated'].isin(["EMPTY", "PARTIAL"]) ) elif self.old_format_non_intrusives_present: non_intrusives_wall_filter = ( self.standardised_asset_list['non-intrusives: WFT Findings'].str.lower().str.strip().isin( ["empty cavity", "partial fill"] ) | ( ( self.standardised_asset_list['non-intrusives: WFT Findings'] .str.lower().str.strip().str.contains("empty cavity|partial fill") & ~self.standardised_asset_list['non-intrusives: WFT Findings'] .astype(str).str.lower().str.strip().str.contains("major access issues") ) ) ) else: # We set the filter to False, as we have no non-intrusives non_intrusives_wall_filter = False if self.landlord_year_built is None: year_built_filter = self.standardised_asset_list["epc_year_upper_bound"] <= self.EMPTY_CAVITY_YEAR_THRESHOLD else: year_built_filter = ( (self.standardised_asset_list[self.STANDARD_YEAR_BUILT] <= self.EMPTY_CAVITY_YEAR_THRESHOLD) | (self.standardised_asset_list["epc_year_upper_bound"] <= self.EMPTY_CAVITY_YEAR_THRESHOLD) ) # Criteria: # The property isn't a bedsit # Non-intrusives indicate it needs a fill # The EPC year is before 2002 # We also flag where the property has solar on the roof, because this is a signal of a high EPC rating self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] = ( (~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"])) & non_intrusives_wall_filter & year_built_filter & ( ~self.standardised_asset_list["property_has_solar"] ) ) self.standardised_asset_list["non_intrusive_indicates_empty_cavity_has_solar"] = ( pd.isnull(self.standardised_asset_list["non_intrusive_indicates_empty_cavity"]) & (~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"])) & non_intrusives_wall_filter & year_built_filter & ( # If the property has solar, there's a chance it won't qualify self.standardised_asset_list["property_has_solar"] ) ) # We also add a filter on anything that was generally identified by the non-intrusives self.standardised_asset_list["non_intrusive_indicates_empty_cavity_no_year_filter"] = ( pd.isnull(self.standardised_asset_list["non_intrusive_indicates_empty_cavity"]) & pd.isnull(self.standardised_asset_list["non_intrusive_indicates_empty_cavity_has_solar"]) & (~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"])) & non_intrusives_wall_filter ) self.standardised_asset_list["epc_indicates_empty_cavity"] = ( self.standardised_asset_list[self.EPC_API_DATA_NAMES["walls-description"]].str.lower().isin( self.EPC_NO_WALL_INSULATION_DESCRIPTIONS ) & ( self.standardised_asset_list["epc_year_upper_bound"] <= self.EMPTY_CAVITY_YEAR_THRESHOLD ) & ( ~self.standardised_asset_list[self.ATTRIBUTE_EPC_PRE_YEAR_THRESHOLD] ) & ( ~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"]) ) ) self.standardised_asset_list["landlord_data_indicates_empty_cavity"] = ( self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin(["uninsulated cavity"]) & ( (self.standardised_asset_list[self.STANDARD_YEAR_BUILT] <= self.EMPTY_CAVITY_YEAR_THRESHOLD) | (self.standardised_asset_list["epc_year_upper_bound"] <= self.EMPTY_CAVITY_YEAR_THRESHOLD) ) & ( ~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"]) ) ) # Finally, we create a flag to indicate that the cavity is empty, based on the criteria above self.standardised_asset_list["cavity_is_empty"] = ( non_intrusives_wall_filter | self.standardised_asset_list[self.EPC_API_DATA_NAMES["walls-description"]].str.lower().isin( self.EPC_NO_WALL_INSULATION_DESCRIPTIONS ) | self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin(["uninsulated cavity"]) ) ###################################################### # Extraction ###################################################### # as needing a CIGA check. What is the logic we should be applying here? if self.non_intrusives_present: extraction_wall_filter = ( (self.standardised_asset_list["non-intrusives: Construction"] == "CAVITY") & (self.standardised_asset_list["non-intrusives: Insulated"].isin(["RETRO DRILLED", "FILLED AT BUILD"])) & (~self.standardised_asset_list['non-intrusives: Material'].isin( ["GREY LOOSE BEAD", "COMPACTED BEAD", "FIBRE BATT NO CAVITY", "EMPTY NARROW BELOW 30mm"] )) ) if self.non_intrusives_eligibility: # If we have the eligibility column, we check if the wall is eligible extraction_wall_filter = ( extraction_wall_filter & ~self.standardised_asset_list["non-intrusives: Eligibility (Red/Yellow/Green)"].isin( ["RED"] ) ) self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] = ( extraction_wall_filter & year_built_filter ) elif self.old_format_non_intrusives_present: print("Review these categories!!!!") extraction_wall_filter = ( self.standardised_asset_list['non-intrusives: WFT Findings'].str.lower().str.strip().isin( ["retro drilled", "retro filled", "fibre from build", "polybead", "retro drilled and filled", "retro drilled & filled", "blown in white wool", "blown in yellow wool"] ) ) self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] = ( extraction_wall_filter ) else: self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] = False self.standardised_asset_list["non_intrusive_indicates_cavity_extraction_no_sap_filter"] = False ###################################################### # Solar ###################################################### # Criteria: # Check 1: Does the property have a valid heating system? self.standardised_asset_list["solar_landlord_data_indicates_correct_heating_system"] = ( self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM].isin( [ "air source heat pump", "ground source heat pump", "high heat retention storage heaters", "electric boiler" ] ) ) self.standardised_asset_list["solar_landlord_data_indicates_needs_heating_upgrade"] = ( self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM].isin( ["electric storage heaters", "room heaters", "electric radiators", "no heating"] ) ) self.standardised_asset_list["solar_epc_data_indicates_correct_heating_system"] = ( ( self.standardised_asset_list[self.EPC_API_DATA_NAMES["mainheat-description"]] .str.lower().str.contains("air source heat pump|ground source heat pump|boiler and radiators, electric") ) | ( self.standardised_asset_list[ self.EPC_API_DATA_NAMES["mainheat-description"]].str.lower().str.contains( "electric storage heaters" ) & ( self.standardised_asset_list[self.EPC_API_DATA_NAMES[ "mainheatcont-description"]] == "Controls for high heat retention storage heaters" ) ) ) self.standardised_asset_list["solar_epc_data_indicates_requires_heating_upgrade"] = ( self.standardised_asset_list[self.EPC_API_DATA_NAMES["mainheat-description"]].str.lower().str.contains( "electric storage heaters|room heaters" ) & ( self.standardised_asset_list[ self.EPC_API_DATA_NAMES["mainheatcont-description"] ] != "Controls for high heat retention storage heaters" ) ) # Basic check - both of the previous two shouldn't be true simultaneously if ( self.standardised_asset_list["solar_epc_data_indicates_correct_heating_system"] & self.standardised_asset_list["solar_epc_data_indicates_requires_heating_upgrade"] ).sum(): raise ValueError("Both heating system checks are true - this should not be possible") # Check 3: Does the property meet the fabric condition # Solar PV installs are subject to the minimum insulation requirements which means: # 1) one of the following insulation measures must be installed as part of the same # ECO4 project: # • roof insulation (flat roof, pitched roof, room-in-roof) # • exterior facing wall insulation (cavity wall, solid wall) # • party cavity wall insulation # • floor insulation (solid and underfloor) # # OR # # all measures (except any exempted measure referred to in paragraph 4.28) # listed in paragraph a) must already be installed # # With this in mind, we look for 2 clases # 1) The property is fully insulated apart from the loft (<200mm insulation) # 2) THe property is fully insulated print("Should we include cavity properties where they might be uninsulated?") self.standardised_asset_list["solar_landlord_walls_insulated"] = ( self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin( [ "filled cavity", "insulated solid brick", "insulated timber frame", ] ) ) if self.non_intrusives_present: self.standardised_asset_list["solar_non_intrusives_walls_insulated"] = ( self.standardised_asset_list["non-intrusives: Insulated"].isin( ["EWI", "RETRO DRILLED", "FILLED AT BUILD"] ) ) elif self.old_format_non_intrusives_present: self.standardised_asset_list["solar_non_intrusives_walls_insulated"] = ( self.standardised_asset_list["non-intrusives: WFT Findings"].str.lower().str.strip().isin( [ "retro drilled", "retro filled", "ewi", "retro drilled/ solid", "retro drilled and filled", ] ) | self.standardised_asset_list["non-intrusives: WFT Findings"].str.lower().str.strip().str.contains( "retro drilled" ) ) else: self.standardised_asset_list["solar_non_intrusives_walls_insulated"] = False # We merge on the u-value for average thermal transmittance walls_uvalue_data = pd.DataFrame(cleaned["walls-description"]) walls_uvalue_data = walls_uvalue_data[ ~pd.isnull(walls_uvalue_data["thermal_transmittance"]) ][["original_description", "thermal_transmittance"]].rename( columns={ "original_description": self.EPC_API_DATA_NAMES["walls-description"], "thermal_transmittance": "walls_u_value" } ) self.standardised_asset_list = self.standardised_asset_list.merge( walls_uvalue_data, how="left", on=self.EPC_API_DATA_NAMES["walls-description"] ) self.standardised_asset_list["solar_epc_walls_insulated"] = ( ( self.standardised_asset_list[ self.EPC_API_DATA_NAMES["walls-description"]].str.lower().str.contains( "|".join(self.EPC_INSULATED_WALLS_SUBSTRINGS) ) ) | ( self.standardised_asset_list["walls_u_value"].apply(lambda x: x <= 0.7 if not pd.isnull(x) else False) ) ) # We merge on the u-value for average thermal transmittance roof_roof_data = pd.DataFrame(cleaned["roof-description"])[ ["original_description", "thermal_transmittance", "is_pitched", "is_loft"] ].rename( columns={ "original_description": self.EPC_API_DATA_NAMES["roof-description"], "thermal_transmittance": "roof_u_value", } ) self.standardised_asset_list = self.standardised_asset_list.merge( roof_roof_data, how="left", on=self.EPC_API_DATA_NAMES["roof-description"] ) # If the u-value of a roof is less than 0.7 we consider it insulated self.standardised_asset_list["solar_epc_roof_insulated"] = ( self.standardised_asset_list[self.EPC_API_DATA_NAMES["roof-description"]].str.lower().str.contains( "|".join(self.EPC_INSULATED_ROOF_SUBSTRINGS), ) | ( self.standardised_asset_list[self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS].apply( lambda x: int(x) >= 200 if str(x).isdigit() else False ) ) | ( self.standardised_asset_list["roof_u_value"].apply( lambda x: x <= 0.7 if not pd.isnull(x) else False ) ) ) self.standardised_asset_list["solar_epc_loft_needs_topup"] = ( self.standardised_asset_list[ self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS].apply( lambda x: int(x) < 200 if str(x).isdigit() else False ) | ( ( self.standardised_asset_list["is_loft"] | self.standardised_asset_list["is_pitched"] ) & ( self.standardised_asset_list[self.ATTRIBUTE_EPC_ROOF_INSULATION_THICKNESS].isin( ["below average", "none"] ) ) ) ) self.standardised_asset_list["epc_has_floor_recommendation"] = ( self.standardised_asset_list["epc_has_floor_recommendation"].fillna(False) ) # Check if the boiler is electric # We check if it contains both the terms boiler & electric self.standardised_asset_list["has_electric_boiler"] = ( ( self.standardised_asset_list[self.EPC_API_DATA_NAMES["mainheat-description"]] .str.lower().isin( ["boiler and radiators, electric"]) ) | ( self.standardised_asset_list[self.STANDARD_HEATING_SYSTEM] == "electric boiler" ) ) #################################### # Check solar eligibility #################################### # Set up the filters to stop repetition correct_heating_system = ( self.standardised_asset_list["solar_landlord_data_indicates_correct_heating_system"] | self.standardised_asset_list["solar_epc_data_indicates_correct_heating_system"] | self.standardised_asset_list["has_electric_boiler"] ) needs_heating_upgrade = ( self.standardised_asset_list["solar_landlord_data_indicates_needs_heating_upgrade"] | self.standardised_asset_list["solar_epc_data_indicates_requires_heating_upgrade"] ) # The requirements for walls are: # 1) walls are insulated # 2) property is a cavity (can be done insulated or not) walls_meet_solar_requirements = ( # The landlord is saying the walls are insulated self.standardised_asset_list["solar_landlord_walls_insulated"] | # EPC data is saying the walls are insulated self.standardised_asset_list["solar_epc_walls_insulated"] | # Non-intrusives are saying the walls are insulated self.standardised_asset_list["solar_non_intrusives_walls_insulated"] | # It's empty cavity self.standardised_asset_list["cavity_is_empty"] | # It's a cavity wall (self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].str.contains("cavity")) ) not_a_flat = ( self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] != "flat" ) solar_roof_meets_criteria = ( self.standardised_asset_list["solar_epc_roof_insulated"] | self.standardised_asset_list["solar_epc_loft_needs_topup"] ) self.standardised_asset_list["solar_eligible"] = ( # Property isn't a flag not_a_flat & # Landlord data or EPC data indicates the heating system is appropriate correct_heating_system & # The property doesn't currently have solar ~self.standardised_asset_list["property_has_solar"] & # The walls are insulated walls_meet_solar_requirements & # Roof meets criteria solar_roof_meets_criteria ) # With heating upgrade self.standardised_asset_list["solar_eligible_needs_heating_upgrade"] = ( not_a_flat & # Needs heating upgrade needs_heating_upgrade & # The property doesn't currently have solar ~self.standardised_asset_list["property_has_solar"] & # The walls are insulated walls_meet_solar_requirements & # Roof meets criteria solar_roof_meets_criteria ) # We check for a specific sub-set of properties which are uninsulated solid wall properties that are EPC E # or below (we'll use 57 as a threshold) - These are for a pilot with Net Zero Renewables self.standardised_asset_list["solar_eligible_solid_wall_uninsulated"] = ( not_a_flat & # Landlord data or EPC data indicates the heating system is appropriate - in this case, we can also take # electric boilers correct_heating_system & # The property doesn't currently have solar ~self.standardised_asset_list["property_has_solar"] & # The walls are uninsulated solid ~walls_meet_solar_requirements & (self.standardised_asset_list[self.EPC_API_DATA_NAMES["current-energy-efficiency"]] <= 57) ) # Drop anything we don't need self.standardised_asset_list = self.standardised_asset_list.drop( columns=["walls_u_value", "roof_u_value"] ) # Adjust flagged extraction jobs to remove anything for solar self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] = ( self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] & ~self.standardised_asset_list["solar_eligible"] ) # Finally, we note why each property has been flagged self.standardised_asset_list["cavity_reason"] = None empty_cavity_map = { "non_intrusive_indicates_empty_cavity": "Non-Intrusive Data Shows Empty Cavity: ", "non_intrusive_indicates_empty_cavity_has_solar": "Non-Intrusive Data Shows Empty Cavity - property " "already has solar: ", "non_intrusive_indicates_empty_cavity_no_year_filter": f"Non-Intrusive Data Shows Empty Cavity, " f"built after {self.EMPTY_CAVITY_YEAR_THRESHOLD}: ", } for variable, description in empty_cavity_map.items(): self.standardised_asset_list["cavity_reason"] = np.where( self.standardised_asset_list[variable] & pd.isnull(self.standardised_asset_list["cavity_reason"]), description + self.standardised_asset_list["SAP Category"], self.standardised_asset_list["cavity_reason"] ) # We break the cavity reason into a few different categories, when the EPC is different from inspections if self.old_format_non_intrusives_present: self.standardised_asset_list["cavity_reason"] = np.where( ( self.standardised_asset_list["epc_indicates_empty_cavity"] & ~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] & (self.standardised_asset_list['non-intrusives: WFT Findings'].str.lower().str.strip().isin( [ "retro drilled and filled", "retro drilled", "retro filled", "retro drilled & filled", ] )) & pd.isnull(self.standardised_asset_list["cavity_reason"]) ), "EPC Shows Empty Cavity, inspections show retro drilled: " + self.standardised_asset_list[ "SAP Category"], self.standardised_asset_list["cavity_reason"] ) self.standardised_asset_list["cavity_reason"] = np.where( ( self.standardised_asset_list["epc_indicates_empty_cavity"] & ~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] & self.standardised_asset_list['non_intrusive_indicates_cavity_extraction'] & pd.isnull(self.standardised_asset_list["cavity_reason"]) ), "EPC Shows Empty Cavity, inspections show filled or other: " + self.standardised_asset_list[ "SAP Category"], self.standardised_asset_list["cavity_reason"] ) else: self.standardised_asset_list["cavity_reason"] = np.where( ( self.standardised_asset_list["epc_indicates_empty_cavity"] & ~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] & (self.standardised_asset_list['non-intrusives: Insulated'] == "RETRO DRILLED") & pd.isnull(self.standardised_asset_list["cavity_reason"]) ), "EPC Shows Empty Cavity, inspections show retro drilled: " + self.standardised_asset_list[ "SAP Category"], self.standardised_asset_list["cavity_reason"] ) self.standardised_asset_list["cavity_reason"] = np.where( ( self.standardised_asset_list["epc_indicates_empty_cavity"] & ~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] & (self.standardised_asset_list['non-intrusives: Insulated'] == "FILLED AT BUILD") & pd.isnull(self.standardised_asset_list["cavity_reason"]) ), "EPC Shows Empty Cavity, inspections show filled at build: " + self.standardised_asset_list[ "SAP Category"], self.standardised_asset_list["cavity_reason"] ) self.standardised_asset_list["cavity_reason"] = np.where( ( self.standardised_asset_list["epc_indicates_empty_cavity"] & ~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] & pd.isnull(self.standardised_asset_list["cavity_reason"]) ), "EPC Shows Empty Cavity, inspections show non-cavity build: " + self.standardised_asset_list[ "SAP Category"], self.standardised_asset_list["cavity_reason"] ) # Landlord data: The landlord's data indicates that the wall is an uninsulated cavity wall, but EPC and # inspections show filled self.standardised_asset_list["cavity_reason"] = np.where( ( self.standardised_asset_list["landlord_data_indicates_empty_cavity"] & ~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] & ~self.standardised_asset_list["epc_indicates_empty_cavity"] & pd.isnull(self.standardised_asset_list["cavity_reason"]) ), "Landlord Data Shows Empty Cavity, EPC & Inspections Shows Filled: " + self.standardised_asset_list[ "SAP Category"], self.standardised_asset_list["cavity_reason"] ) # Flag extraction self.standardised_asset_list["cavity_reason"] = np.where( ( self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] & pd.isnull(self.standardised_asset_list["cavity_reason"]) ), "Non-Intrusive Data Shows Cavity Extraction: " + self.standardised_asset_list["SAP Category"], self.standardised_asset_list["cavity_reason"] ) ###################################################### # Flag solar ###################################################### self.standardised_asset_list["solar_reason"] = None # Map of variables and fill values for the solar_reason variable solar_reason_map = { "solar_eligible": "Solar Eligible: ", "solar_eligible_needs_heating_upgrade": ( "Solar Eligible, Needs Heating Upgrade: " ), "solar_eligible_solid_wall_uninsulated": "Solar Eligible, Solid Wall Uninsulated, EPC E or Below: ", } for variable, reason in solar_reason_map.items(): self.standardised_asset_list["solar_reason"] = np.where( self.standardised_asset_list[variable], reason + self.standardised_asset_list["SAP Category"], self.standardised_asset_list["solar_reason"] ) # Flag anything that has existing outcomes if (self.outcomes is not None) and ("surveyed" in self.standardised_asset_list.columns): if "installer refusal" not in self.standardised_asset_list.columns: self.standardised_asset_list["cavity_reason"] = np.where( ( (self.standardised_asset_list["surveyed"] > 0) ), None, self.standardised_asset_list["cavity_reason"] ) else: for col in ["cavity_reason", "solar_reason"]: self.standardised_asset_list[col] = np.where( ( (self.standardised_asset_list["surveyed"] > 0) | (self.standardised_asset_list["installer refusal"] > 0) ), None, self.standardised_asset_list[col] ) if self.master_surveyed is not None: for col in ["cavity_reason", "solar_reason"]: self.standardised_asset_list[col] = np.where( ( (~pd.isnull(self.standardised_asset_list["submission_date"])) ), None, self.standardised_asset_list[col] ) if self.ecosurv is not None: for col in ["cavity_reason", "solar_reason"]: self.standardised_asset_list[col] = np.where( ( (~pd.isnull(self.standardised_asset_list["ecosurv_reference"])) ), None, self.standardised_asset_list[col] ) blocks_of_flats = self.standardised_asset_list[ self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] == "block of flats" ] non_blocks_of_flats = self.standardised_asset_list[ self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] != "block of flats" ] # Produce some aggregate figures self.work_type_figures = { **non_blocks_of_flats["cavity_reason"].value_counts().to_dict(), **{ k + " (Block of flats)": v for k, v in blocks_of_flats["solar_reason"].value_counts().to_dict().items() }, **self.standardised_asset_list["solar_reason"].value_counts().to_dict() } # We prepare outcomes for output if self.outcomes is not None: logger.info("Preparing outcomes for output") identified_work = self.standardised_asset_list[ ~pd.isnull(self.standardised_asset_list["cavity_reason"]) | ~pd.isnull(self.standardised_asset_list["solar_reason"]) ][self.DOMNA_PROPERTY_ID].values if self.DOMNA_PROPERTY_ID in self.outcomes.columns: self.outcomes_for_output = self.outcomes[ self.outcomes[self.DOMNA_PROPERTY_ID].isin(identified_work) ] def flat_analysis(self): # We need to deduce the building name - we strip out the house number # We want to deduce if flats have 50% of the properties below C75 # We group by postcode and property type grouped = self.standardised_asset_list.groupby( [self.STANDARD_POSTCODE, self.STANDARD_PROPERTY_TYPE] ) flat_data = [] for _, group in grouped: if "flat" in group[self.STANDARD_PROPERTY_TYPE].values: num_flats = group[self.STANDARD_PROPERTY_TYPE].shape[0] num_below_c75 = group[ self.EPC_API_DATA_NAMES["current-energy-efficiency"] ].lt(self.FILLED_CAVITY_SAP_THRESHOLD).sum() # Check if any flats are below C69 num_flats_below_c69 = group[ self.EPC_API_DATA_NAMES["current-energy-efficiency"] ].lt(69).sum() flat_data.append( { "Postcode": group[self.STANDARD_POSTCODE].iloc[0], "Property Type": "Flat", "Number of Flats with EPC": num_flats, "Number of Flats below C75": num_below_c75, "Proportion of Flat EPCs below C75": round(100 * num_below_c75 / num_flats), "Number of Flats Below C69": num_flats_below_c69, } ) flat_data = pd.DataFrame(flat_data) self.flat_data = flat_data @staticmethod def split_full_name(x): if pd.isnull(x): return None, None, None x = x.lower() titles = ["mr", "mrs", "ms", "miss", "dr", "prof"] # Remove titles detected_title = [title for title in titles if x.startswith(title)] if detected_title: for title in detected_title: x = x.replace(title, "") x = x.strip() first_name, last_name = x.split(" ")[0], x.split(" ")[-1] title = detected_title[0].title() if detected_title else None return title, first_name.title(), last_name.title() def load_contact_details( self, local_filepath, sheet_name, landlord_property_id, phone_number_column=None, email_column=None, fullname_column=None, firstname_column=None, lastname_column=None ): self.contact_detail_fields = { "landlord_property_id": landlord_property_id, "phone_number": phone_number_column, "email": email_column, "fullname": fullname_column, "firstname": firstname_column, "lastname": lastname_column } details_colnames = [ phone_number_column, email_column, fullname_column, firstname_column, lastname_column ] # We'll fill them none_details = [x for x in details_colnames if x is None] details_colnames = [x for x in details_colnames if x is not None] contact_details = pd.read_excel( local_filepath, sheet_name=sheet_name )[[self.contact_detail_fields["landlord_property_id"]] + details_colnames] contact_details = contact_details[ ~pd.isnull(contact_details[self.contact_detail_fields["landlord_property_id"]]) ] # Fill anything we don't have for detail in none_details: contact_details[detail] = None if fullname_column and not (firstname_column and lastname_column): contact_details["title"], contact_details["first_name"], contact_details["last_name"] = zip( *contact_details[fullname_column].apply(self.split_full_name) ) else: raise NotImplementedError("Implement me") self.contact_details = contact_details def prepare_for_crm(self, company_domain, crm_pipeline_name, first_dealstage, assigned_surveyors): """ This function prepares the data for upload into Hubspot :return: """ # This is a placeholder for now # This maps the opportunities as we reference them, to the product data as stored in Hubspot product_lookup_table = { "Non-Intrusive Data Showed Cavity Extraction": { "name": "Extract & Fill - ECO4", "id": 100307905778, "unit_price": 500 }, "Non-Intrusive Data Showed Empty Cavity": { "name": "Empty Cavity & Loft - ECO4", "id": 82733738177, "unit_price": 1000 }, "Non-Intrusive Data Showed Empty Cavity but all SAP scores allowed": { "name": "Empty Cavity & Loft - ECO4", "id": 82733738177, "unit_price": 1000 }, "Non-Intrusive Data Showed Cavity Extraction but all SAP scores allowed": { "name": "Extract & Fill - ECO4", "id": 100307905778, "unit_price": 500 }, "EPC Data Showed Empty Cavity": { "name": "Empty Cavity & Loft - ECO4", "id": 82733738177, "unit_price": 1000 }, "Solid Floor, Insulated, No Solar": { "name": "Solar PV - ECO4", "id": 82623589564, "unit_price": 1608 }, "Solid Floor, Insulated, Needs Loft": { "name": "Solar PV - ECO4", "id": 82623589564, "unit_price": 1608 }, "Other Floor, Insulated, No Solar": { "name": "Solar PV - ECO4", "id": 82623589564, "unit_price": 1608 }, "Other Floor, Insulated, Needs Loft": { "name": "Solar PV - ECO4", "id": 82623589564, "unit_price": 1608 } } # We check if all products are covered in the lookup table cavity_products = self.standardised_asset_list["cavity_reason"].unique() solar_products = self.standardised_asset_list["solar_reason"].unique() # Check if there any options not in out lookup table if ( any(x for x in cavity_products if x not in product_lookup_table) or any(x for x in solar_products if x not in product_lookup_table) ): raise ValueError("We have products not referenced in the lookup table - check this") programme_data = self.standardised_asset_list.copy() # Exclusions - these are properties we won't treat for the moment product_exclusions = [ "Other Floor, Insulated, No Solar", "Other Floor, Insulated, Needs Loft" ] if product_exclusions: logger.warning("Excluding products: %s", product_exclusions) programme_data = programme_data[programme_data["solar_reason"].isin(product_exclusions) == False] # Merge on the contact details programme_data = programme_data.merge( self.contact_details, how="left", left_on=self.STANDARD_LANDLORD_PROPERTY_ID, right_on=self.landlord_property_id, ) programme_data["Company Domain Name "] = company_domain # Append the product data onto the programme data programme_data["cavity_product"] = programme_data["cavity_reason"].map( lambda x: product_lookup_table.get(x, {"name": None})["name"] ) programme_data["solar_product"] = programme_data["solar_reason"].map( lambda x: product_lookup_table.get(x, {"name": None})["name"] ) programme_data["domna_product"] = programme_data["solar_reason"].copy() programme_data["domna_product"] = np.where( pd.isnull(programme_data["domna_product"]), programme_data["solar_product"], programme_data["domna_product"] ) # We filter just on rows where we have a product programme_data = programme_data[ ~pd.isnull(programme_data["domna_product"]) ] programme_data = programme_data.drop(columns=["solar_product", "cavity_product"]) product_df = ( pd.DataFrame(product_lookup_table).T[["name", "id", "unit_price"]] .reset_index() .rename( columns={ "name": "Name ", "id": 'Product ID ', "unit_price": 'Unit price ', "index": "domna_product" } ) ) product_df['Quantity '] = 1 # Append on the product data programme_data = programme_data.merge( product_df, how="left", on="domna_product", ) # Add in deal and pipeline information programme_data["dealname"] = programme_data[self.STANDARD_FULL_ADDRESS] + " : " + programme_data[ "domna_product"] programme_data['Pipeline '] = crm_pipeline_name programme_data['Deal Stage '] = first_dealstage programme_data['Associations: Listing'] = "Property Owner" programme_data = programme_data.merge( assigned_surveyors.rename( columns={self.landlord_property_id: self.STANDARD_LANDLORD_PROPERTY_ID} ), how="left", on=self.STANDARD_LANDLORD_PROPERTY_ID ) # This maps the hubspot schema to the template. Anything that is not covered in this will be flagged schema_mappings = { 'Name ': self.DOMNA_PROPERTY_ID, # TODO: Maybe change this? 'Company Domain Name ': 'Company Domain Name ', 'Email ': ( self.contact_detail_fields["email"] if self.contact_detail_fields["email"] else None ), # TODO: Review 'First Name ': ( self.contact_detail_fields["firstname"] if self.contact_detail_fields["firstname"] else None ), # TODO: Review 'Last Name ': ( self.contact_detail_fields["lastname"] if self.contact_detail_fields["lastname"] else None ), # TODO: Review 'Phone ': ( self.contact_detail_fields["phone_number"] if self.contact_detail_fields["phone_number"] else None ), # TODO: Review 'Full Address ': self.STANDARD_FULL_ADDRESS, 'Address 1 ': self.STANDARD_ADDRESS_1, 'Address 2 ': None, # TODO: Don't have this for the moment 'Postcode ': self.STANDARD_POSTCODE, 'Property Type ': self.STANDARD_PROPERTY_TYPE, 'Property Sub Type ': None, # TODO: Don't have this for the moment 'Bedroom(s) ': None, # TODO: Don't have this for the moment 'Domna Property ID ': self.DOMNA_PROPERTY_ID, 'National UPRN ': ( self.STANDARD_UPRN if self.STANDARD_UPRN is not None else self.EPC_API_DATA_NAMES["uprn"] ), 'Owner Property ID ': self.STANDARD_LANDLORD_PROPERTY_ID, 'Wall Construction ': self.STANDARD_WALL_CONSTRUCTION, 'Heating System ': self.STANDARD_HEATING_SYSTEM, 'Year Built ': self.STANDARD_YEAR_BUILT, 'Boiler Make ': None, # TODO: Don't have this for the moment 'Boiler Model ': None, # TODO: Don't have this for the moment 'Non-Intrusives: Date Checked ': None, # TODO: Don't have this for the moment 'Non-Intrusives: Wall Type ': ( "non-intrusives: Construction" if self.non_intrusives_present else None ), 'Non-intrusives: Insulation ': ( "non-intrusives: Insulated" if self.non_intrusives_present else None ), 'Non-intrusives: Insulation Material ': ( "non-intrusives: Material" if self.non_intrusives_present else None ), 'Non-Intrusives: CIGA Check Required ': ( 'non-intrusives: CIGA Check Required' if self.non_intrusives_present else None ), 'Non-Intrusives: PV Access Issues ': ( 'non-intrusives: PV, ACCESS ISSUE, SEE NOTES' if self.non_intrusives_present else None ), 'Non-Intrusives: Roof Orientation ': ( 'non-intrusives: OFF GAS - ROOF ORIENTATION' if self.non_intrusives_present else None ), 'Non-Intrusives: Surveyor Notes ': ( 'non-intrusives: Any further surveyor notes' if self.non_intrusives_present else None ), 'Non-Intrusives: Surveyor Name ': ( 'non-intrusives: Surveyors Name' if self.non_intrusives_present else None ), 'CIGA: Date Requested ': None, # TODO: Don't have this for the moment 'CIGA: Cavity Guarantee Found ': None, 'Last EPC: Is Estimated ': self.EPC_API_DATA_NAMES["estimated"], 'Last EPC: EPC Rating ': self.EPC_API_DATA_NAMES["current-energy-rating"], 'Last EPC: SAP Rating ': self.EPC_API_DATA_NAMES["current-energy-efficiency"], 'Last EPC: Main Heating Description ': self.EPC_API_DATA_NAMES[ "mainheat-description"], 'Last EPC: Heating Controls ': self.EPC_API_DATA_NAMES[ "mainheatcont-description"], 'Last EPC: Lodgement Date ': self.EPC_API_DATA_NAMES["inspection-date"], 'Last EPC: Floor Area ': self.EPC_API_DATA_NAMES["total-floor-area"], 'Last EPC: Wall ': self.EPC_API_DATA_NAMES["walls-description"], 'Last EPC: Roof ': self.EPC_API_DATA_NAMES["roof-description"], 'Last EPC: Floor ': self.EPC_API_DATA_NAMES["floor-description"], 'Last EPC: Room Height ': self.EPC_API_DATA_NAMES["floor-height"], 'Last EPC: Age Band ': self.EPC_API_DATA_NAMES["construction-age-band"], 'Deal Stage ': 'Deal Stage ', 'Pipeline ': 'Pipeline ', 'Expected Commencement Date ': None, # TODO: Need to set this, 'Deal Name ': "dealname", # Need to create this, 'Product ID ': 'Product ID ', 'Name ': 'Name ', 'Unit price ': 'Unit price ', 'Quantity ': 'Quantity ', 'Deal Owner': 'surveyor_email', 'Amount ': 'Unit price ', } # We now create the finalised dataset to be uploaded into Hubspot variables_required = list(schema_mappings.values()) variables_required = [v for v in variables_required if v is not None] # We now flag anything that has a none value, which is information we haven't got right now none_variables = [k for k, v in schema_mappings.items() if v is None] # We'll add placeholder columns for the None variables programme_data = programme_data[variables_required] for col in none_variables: programme_data[col] = None programme_data = programme_data.rename( columns={v: k for k, v in schema_mappings.items() if v is not None} ) self.hubspot_data = programme_data def flag_ecosurv(self, ecosurv_landlords=None): """ This class will match ecosurv data to the asset list :return: """ if ecosurv_landlords is None: return # TODO: Fetch from Sharepoint ecosurv_filepath = "/Users/khalimconn-kowlessar/Documents/hestia/Ecosurv/15.04.csv" logger.info("Getting Ecosurv data from %s", ecosurv_filepath) self.ecosurv = pd.read_csv( ecosurv_filepath, encoding="cp437" ) landlords = self.ecosurv["Landlord"].value_counts().reset_index(drop=False) landlord_references = landlords[ landlords["Landlord"].str.lower().str.contains(ecosurv_landlords) ] landlord_ecosurv_data = self.ecosurv[ self.ecosurv["Landlord"].isin(landlord_references["Landlord"].values) ] # Try and match to asset list matched = [] unmatched = [] for _, row in tqdm(landlord_ecosurv_data.iterrows(), total=landlord_ecosurv_data.shape[0]): postcode = row["Postcode"].lower() df = self.standardised_asset_list[ ( self.standardised_asset_list[self.STANDARD_POSTCODE].str.replace(" ", "").str.lower() == postcode ) ].copy() if df.empty: unmatched.append(row["Reference"]) continue if df.shape[0] > 1: house_no = SearchEpc.get_house_number(row["Address Line 1"], row["Postcode"]) df["house_no"] = df.apply( lambda x: SearchEpc.get_house_number( str(x[self.STANDARD_ADDRESS_1]), x[self.STANDARD_POSTCODE] ), axis=1 ) df = df[df["house_no"] == house_no] if df.shape[0] > 1: # We compare address line 1 to full address if any( df[self.STANDARD_FULL_ADDRESS].str.lower().str.contains( row["Address Line 1"].lower(), na=False) ): df = df[ df[self.STANDARD_FULL_ADDRESS].str.lower().str.contains( row["Address Line 1"].lower(), na=False ) ] if df.shape[0] > 1: df = df[df[self.STANDARD_PROPERTY_TYPE] != "other"] if df.shape[0] == 1: matched.append( { self.STANDARD_LANDLORD_PROPERTY_ID: df[self.STANDARD_LANDLORD_PROPERTY_ID].values[0], "ecosurv_reference": row["Reference"], "ecosurv_address1": row["Address Line 1"], "ecosurv_postcode": row["Postcode"], } ) continue if df.shape[0] > 1: unmatched.append(row["Reference"]) continue logger.info("Matched %s properties to ecosurv data", len(matched)) logger.info("%s properties in Ecosurv remain unmatched", len(unmatched)) # We now match matched = pd.DataFrame(matched) self.standardised_asset_list = self.standardised_asset_list.merge( matched, how="left", on=self.STANDARD_LANDLORD_PROPERTY_ID, ) # We keep a record of submissions that were NOT matches self.ecosurv_no_match = self.ecosurv[ self.ecosurv["Reference"].isin(unmatched) ].copy() def flag_outcomes( self, outcomes_filepath, outcomes_sheetname, outcomes_address, outcomes_postcode, outcomes_houseno, outcomes_id ): if outcomes_filepath is None: return self.outcomes = pd.read_excel(outcomes_filepath, sheet_name=outcomes_sheetname) self.outcomes["row_id"] = self.outcomes.index if outcomes_houseno is None: outcomes_houseno = "houseno" self.outcomes["houseno"] = self.outcomes[outcomes_address].apply( lambda x: SearchEpc.get_house_number(x, self.outcomes[outcomes_postcode]) ) logger.info("Matching outcomes to asset list") # Merge the outcomes onto the asset list - we check we're able to match sufficiently well lookup = [] nomatch = [] for _, x in tqdm(self.outcomes.iterrows(), total=len(self.outcomes)): if pd.isnull(x[outcomes_address]): continue # Check if we have an id oid = x[outcomes_id] if outcomes_id is not None else None if oid is not None: matched = self.standardised_asset_list[ (self.standardised_asset_list[ self.STANDARD_LANDLORD_PROPERTY_ID ].str.strip() == oid) ] if matched.shape[0] == 1: lookup.append( { "row_id": x["row_id"], self.DOMNA_PROPERTY_ID: matched[self.DOMNA_PROPERTY_ID].values[0] } ) continue address_clean = x[outcomes_address].lower().replace(",", "").replace(" ", " ") self.outcomes["Outcome"] = self.outcomes["Outcome"].str.lower() matched = self.standardised_asset_list[ (self.standardised_asset_list[ self.STANDARD_FULL_ADDRESS ].str.lower().str.replace(",", "").str.replace(" ", " ") == address_clean) ] if matched.shape[0] == 1: lookup.append( { "row_id": x["row_id"], self.DOMNA_PROPERTY_ID: matched[self.DOMNA_PROPERTY_ID].values[0] } ) continue matched = self.standardised_asset_list[ (self.standardised_asset_list[self.STANDARD_POSTCODE].str.strip() == x[outcomes_postcode]) ].copy() if not matched.empty: matched["houseno"] = matched.apply( lambda x: SearchEpc.get_house_number( str(x[self.STANDARD_ADDRESS_1]), str(x[self.STANDARD_POSTCODE]) ), axis=1 ) matched = matched[ matched["houseno"].astype(str) == str(x[outcomes_houseno]) ] if matched.shape[0] == 1: lookup.append( { "row_id": x["row_id"], self.DOMNA_PROPERTY_ID: matched[self.DOMNA_PROPERTY_ID].values[0] } ) continue elif not matched.empty: # Use levenstein distance to match matched["address"] = matched[self.STANDARD_ADDRESS_1] + " " + matched[self.STANDARD_POSTCODE] best_match = process.extractOne(x["Address"], matched[self.STANDARD_FULL_ADDRESS].values)[0] matched = matched[matched[self.STANDARD_FULL_ADDRESS] == best_match] lookup.append( { "row_id": x["row_id"], self.DOMNA_PROPERTY_ID: matched[self.DOMNA_PROPERTY_ID].values[0] } ) continue nomatch.append(x["row_id"]) self.outcomes_no_match = self.outcomes[self.outcomes["row_id"].isin(nomatch)] lookup = pd.DataFrame(lookup) if lookup.empty: return # We will have duplicated domna property IDs, where a surveyor has been to a property multiple times # Where we have multiple rows, we want to make a call on what the action should be. For example, # there may be properties that have been visited multiple times where the outcome was "See notes" implying # that the surveyor had a detailed explanation as to why they couldn't gain access so if this has # happened multiple times, in this case we judge that the work may not be viable date_col = "Week Commencing" if "Week Commencing" in self.outcomes else "Survey Date" lookup = lookup.merge( self.outcomes[["row_id", "Outcome", "Notes", date_col]], how="left", on="row_id" ) visit_counts = ( lookup.groupby(self.DOMNA_PROPERTY_ID)["row_id"] .count() .reset_index() .rename(columns={"row_id": "visit_count"}) .sort_values("visit_count", ascending=False) ) pivot_df = lookup.groupby(["domna_property_id", "Outcome"]).size().unstack(fill_value=0).reset_index() pivot_df = pivot_df.merge( visit_counts, how="left", on="domna_property_id" ) if pivot_df[self.DOMNA_PROPERTY_ID].duplicated().sum(): raise Exception("We have duplicated property IDs in the outcomes data") # We merge this data onto outcomes self.outcomes["matched_to_asset_list"] = self.outcomes["row_id"].isin(lookup["row_id"].values) self.outcomes = self.outcomes.merge(lookup[["row_id", "domna_property_id"]], how="left", on="row_id") # We merge out pivoted outcomes onto the asset list self.standardised_asset_list = self.standardised_asset_list.merge( pivot_df, how="left", left_on=self.DOMNA_PROPERTY_ID, right_on="domna_property_id" ) self.outcomes = self.outcomes.sort_values("domna_property_id", ascending=False) def flag_survey_master( self, master_filepaths, master_to_asset_list_filepath=None ): # TODO: This probably needs further expansion if not master_filepaths: return if master_to_asset_list_filepath is not None: id_map = pd.read_csv(master_to_asset_list_filepath) else: id_map = pd.DataFrame() logger.info("Getting masters and merging onto asset list") master_surveyed = [] unmatched_submissions = [] for filepath in master_filepaths: master_data = pd.read_csv(filepath) # Strip columns master_data.columns = [c.strip() for c in master_data.columns] if not id_map.empty: master_data = master_data.merge( id_map, how="left", on=['NO.', 'Street / Block Name', 'Post Code'] ) install_col = ( "INSTALLED OR CANCELLED" if "INSTALLED OR CANCELLED" in master_data.columns else "INSTALL / CANCELLATION DATE" ) submission_col = ( "SUBMISSION DATE" if "SUBMISSION DATE" in master_data.columns else "SUBMISSION DATE TO INSTALLERS" ) if "UPRN" in master_data.columns: # We just need to check if any were cancelled master_to_append = master_data[ ["UPRN", install_col, submission_col] ].rename( columns={ "UPRN": self.STANDARD_LANDLORD_PROPERTY_ID, install_col: "survey_status", submission_col: "submission_date" } ) master_to_append["cancelled"] = master_to_append["survey_status"].str.lower().str.contains("cancel") master_surveyed.append(master_to_append) continue master_data["row_id"] = master_data.index self.standardised_asset_list["house_no"] = self.standardised_asset_list.apply( lambda x: SearchEpc.get_house_number( str(x[self.STANDARD_ADDRESS_1]), str(x[self.STANDARD_POSTCODE]) ), axis=1 ) postcode_col = "POSTCODE" if "POSTCODE" in master_data.columns else "Post Code" house_no_col = 'NO.' if 'NO.' in master_data.columns else "NO" # Otherwise, we need to match algorithmically logger.info("Matching master data to asset list") matched = [] unmatched = [] for _, row in tqdm(master_data.iterrows(), total=len(master_data)): if pd.isnull(row[postcode_col]): continue postcode_no_space = row[postcode_col].strip().replace(" ", "").lower() df = self.standardised_asset_list[ ( self.standardised_asset_list[self.STANDARD_POSTCODE].str.strip().str.lower().str.replace(" ", "") == postcode_no_space ) ] house_no = row[house_no_col] if house_no in df["house_no"].values: df = df[df["house_no"] == house_no] if df.shape[0] != 1: # Levenstein distance if any(df[self.STANDARD_FULL_ADDRESS].str.contains(row["Street / Block Name"])): df = df[ df[self.STANDARD_FULL_ADDRESS].str.contains(row["Street / Block Name"]) ] else: # Levenstein distance df = df[ df[self.STANDARD_FULL_ADDRESS].str.lower().apply( lambda x: process.extractOne( " ".join([row[house_no_col], row["Street / Block Name"], row["TOWN"]]).lower(), x )[1] ) > 90 ] if df.shape[0] == 0: unmatched.append(row["row_id"]) continue if any(df[self.STANDARD_FULL_ADDRESS].str.lower().str.contains( " ".join([row[house_no_col], row["Street / Block Name"]]).lower() )): df = df[ df[self.STANDARD_FULL_ADDRESS].str.lower().str.contains( " ".join([row[house_no_col], row["Street / Block Name"]]).lower() ) ] if any( df[self.STANDARD_PROPERTY_TYPE].str.contains( row["PROPERTY TYPE As per table emailed"].split(" ")[-1].lower() ) ): # We ignore "block of flats" entries df = df[ df[self.STANDARD_PROPERTY_TYPE].str.contains( row["PROPERTY TYPE As per table emailed"].split(" ")[-1].lower() ) & (df[self.STANDARD_PROPERTY_TYPE] != "block of flats") ] if df.shape[0] != 1: # We have multiple matches raise NotImplementedError("FIX ME") matched.append( { "row_id": row["row_id"], self.STANDARD_LANDLORD_PROPERTY_ID: df[self.STANDARD_LANDLORD_PROPERTY_ID].values[0], } ) self.standardised_asset_list = self.standardised_asset_list.drop(columns="house_no") # We match the "UPRN" which is the landlords ID, onto the master sheet matched = pd.DataFrame(matched) master_to_append = master_data[["row_id", install_col, submission_col]].merge( matched, how="left", on="row_id" ).rename( columns={ install_col: "survey_status", submission_col: "submission_date" } ) master_to_append["cancelled"] = master_to_append["survey_status"].str.lower().str.contains("cancel") master_surveyed.append(master_to_append) unmatched_df = master_data[ master_data["row_id"].isin(unmatched) ] scheme_col = ( "AFFORDABLE WARMTH OR EPC FOR HOUSING ASSOCIATION" if "AFFORDABLE WARMTH OR EPC FOR HOUSING ASSOCIATION" in master_data.columns else "AFFORDABLE WARMTH" ) # The columns are massively different - we take just a few unmatched_df = unmatched_df[ [ scheme_col, house_no_col, "Street / Block Name", postcode_col, install_col, submission_col ] ].rename( columns={ scheme_col: "Funding Scheme", house_no_col: "House Number", postcode_col: "Postcode", install_col: "survey_status", submission_col: "submission_date" } ) unmatched_submissions.append(unmatched_df) master_surveyed = pd.concat(master_surveyed) master_surveyed = master_surveyed[~pd.isnull(master_surveyed[self.STANDARD_LANDLORD_PROPERTY_ID])] master_surveyed = master_surveyed[ ~master_surveyed[self.STANDARD_LANDLORD_PROPERTY_ID].isin( ["NOT ON ASSET LIST", "Missing From Asset List"] ) ] master_surveyed[self.STANDARD_LANDLORD_PROPERTY_ID] = master_surveyed[ self.STANDARD_LANDLORD_PROPERTY_ID ].astype(str) # We de-dupe crudely on landlord property id self.master_surveyed = master_surveyed.drop_duplicates(subset=[self.STANDARD_LANDLORD_PROPERTY_ID]) self.standardised_asset_list = self.standardised_asset_list.merge( self.master_surveyed, how="left", on=self.STANDARD_LANDLORD_PROPERTY_ID ) # Finally, we keep a record of the unmatched if unmatched_submissions: self.unmatched_submissions = pd.concat( unmatched_submissions )