Model/asset_list/AssetList.py
2025-02-19 22:51:48 +00:00

542 lines
22 KiB
Python

import ast
import hashlib
import json
import os
import re
from datetime import datetime
from pprint import pprint

import numpy as np
import pandas as pd
import tiktoken
from fuzzywuzzy import process
from openai import OpenAI

from utils.logger import setup_logger
from backend.SearchEpc import SearchEpc
import asset_list.mappings.property_type as property_type_mappings
import asset_list.mappings.walls as walls_mappings
import asset_list.mappings.heating_systems as heating_mappings
import asset_list.mappings.exising_pv as existing_pv_mappings
# Module-wide logger used by the classes defined below.
logger = setup_logger()

# OpenAI API key, read from the environment rather than hard-coded (None when the variable is unset).
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
class DataRemapper:
    """
    Maps free-text category values onto a fixed set of standard values.

    Values are resolved in order of increasing cost: a predefined lookup table, an exact match against the
    standard values, a fuzzy match, and finally a single batched OpenAI request for whatever is left over.
    """

    # Label assigned to values that cannot be standardised (missing input or unusable AI output)
    UNKNOWN_VALUE = "unknown"

    # Sentinel returned by _match_locally when none of the local strategies produced a result
    _NO_MATCH = object()

    def __init__(self, standard_values, standard_map=None, max_tokens=1000):
        """
        Initialize the remapper with standard values and a predefined mapping.
        :param standard_values: Set of allowed standardized values.
        :param standard_map: Dictionary of common remappings {raw_value: standard_value}. Defaults to no
            predefined remappings.
        :param max_tokens: Upper bound applied both to the prompt size and to the completion length.
        """
        self.standard_values = standard_values
        # A missing map is treated as "no predefined remappings" so membership tests never hit None
        self.standard_map = standard_map if standard_map is not None else {}
        self.fuzzy_threshold = 90  # Adjust fuzzy matching sensitivity
        self.ai_model = "gpt-4-turbo"  # Use gpt-3.5-turbo for cheaper processing

        # Tokenizer for counting tokens
        self.tokenizer = tiktoken.encoding_for_model(self.ai_model)

        # Track token usage and remap dictionary
        self.total_tokens_used = 0
        self.total_cost = 0
        self.remap_dict = {}  # {original_value: standardized_value}
        self.max_tokens = max_tokens  # Limit for OpenAI API

        # Memoization for AI calls
        self.ai_cache = {}  # {tuple(unmapped_values): {original_value: standardized_value}}

        # Capture the response for debugging
        self.ai_response = None

        # OpenAI pricing (as of Feb 2024)
        self.pricing = {
            "gpt-4-turbo": {"input": 0.01 / 1000, "output": 0.03 / 1000},
            "gpt-3.5-turbo": {"input": 0.0015 / 1000, "output": 0.002 / 1000},
        }
        self.openai_client = OpenAI(api_key=OPENAI_API_KEY)

    @staticmethod
    def clean_string(text):
        """
        Basic text cleaning: trim, lower-case, remove punctuation and collapse repeated whitespace.
        :param text: Raw value to clean.
        :return: The cleaned string, or None when ``text`` is not a string.
        """
        if not isinstance(text, str):
            return None
        text = text.strip().lower()
        text = re.sub(r'[^\w\s]', '', text)  # Remove punctuation
        # Collapse runs of whitespace into a single space
        text = re.sub(r'\s+', ' ', text)
        return text

    def fuzzy_match(self, text):
        """
        Use fuzzy matching to find the closest standard value.
        :param text: Cleaned value to match.
        :return: The best matching standard value, or None when the score is below the threshold.
        """
        if not text:
            return None
        best = process.extractOne(text, self.standard_values)
        # extractOne yields None when there is nothing to choose from
        if best is None:
            return None
        match, score = best[0], best[1]
        return match if score >= self.fuzzy_threshold else None

    def count_tokens(self, text):
        """Estimate the number of tokens in a given text."""
        return len(self.tokenizer.encode(text)) if text else 0

    @classmethod
    def _parse_ai_mapping(cls, output_text, unmapped_values):
        """
        Turn the raw model output into a {original_value: standardized_value} dictionary.

        The text is parsed as data only (JSON first, then a Python literal) - it is never executed.
        :param output_text: Raw text returned by the model.
        :param unmapped_values: The values that were sent for standardisation.
        :return: Mapping covering every entry of ``unmapped_values``; entries the model did not return, or
            all entries when the output is not a dictionary, are mapped to "unknown".
        """
        text = (output_text or "").strip()

        # Models sometimes wrap the answer in a markdown code fence despite being told not to
        fenced = re.match(r"^`{3}(?:json)?\s*(.*?)\s*`{3}$", text, flags=re.DOTALL | re.IGNORECASE)
        if fenced:
            text = fenced.group(1)

        parsed = None
        for parser in (json.loads, ast.literal_eval):
            try:
                parsed = parser(text)
            except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
                continue
            break

        if not isinstance(parsed, dict):
            return {value: cls.UNKNOWN_VALUE for value in unmapped_values}  # Fallback

        mapping = dict(parsed)
        for value in unmapped_values:
            mapping.setdefault(value, cls.UNKNOWN_VALUE)
        return mapping

    def ai_standardize(self, unmapped_values):
        """
        Call OpenAI API **once** for all unmapped values to minimize cost, with memoization.
        :param unmapped_values: Values that could not be standardised locally.
        :return: Dictionary {original_value: standardized_value}; empty when there is nothing to map.
        :raises ValueError: If the prompt is larger than ``max_tokens``.
        """
        if not unmapped_values:
            return {}

        # Sorted (by text, so mixed types cannot break the sort) to get a stable memoization key
        unmapped_tuple = tuple(sorted(unmapped_values, key=str))
        if unmapped_tuple in self.ai_cache:
            return self.ai_cache[unmapped_tuple]  # Return memoized result

        prompt = (
            "\n"
            "You are an expert in data classification. Standardize each of these values into one of the "
            "categories:\n"
            f"{list(self.standard_values)}.\n"
            "Return only a JSON dictionary where:\n"
            "- The keys are the original values.\n"
            "- The values are the standardized ones.\n"
            "Strictly return JSON **without markdown formatting** or extra text.\n"
            "Example Output:\n"
            "{\n"
            '"BLKHOUS": "block house",\n'
            '"BEDSIT": "bedsit"\n'
            "}\n"
            "Values to standardize:\n"
            f"{unmapped_values}\n"
        )

        # Count input tokens
        input_tokens = self.count_tokens(prompt)
        if input_tokens > self.max_tokens:
            raise ValueError("Input tokens exceed the maximum limit.")

        logger.info("Calling OpenAI API for standardization...")
        response = self.openai_client.chat.completions.create(
            model=self.ai_model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=self.max_tokens,
            temperature=0.1,
        )
        # The message content can be absent, so fall back to an empty string
        output_text = (response.choices[0].message.content or "").strip()
        output_tokens = self.count_tokens(output_text)  # Count output tokens

        # Track total token usage
        self.total_tokens_used += input_tokens + output_tokens

        # Estimate cost
        input_cost = input_tokens * self.pricing[self.ai_model]["input"]
        output_cost = output_tokens * self.pricing[self.ai_model]["output"]
        self.total_cost += input_cost + output_cost

        # We store the raw AI response for debugging
        self.ai_response = output_text

        mapping = self._parse_ai_mapping(output_text, unmapped_values)

        # Memoize the AI response
        self.ai_cache[unmapped_tuple] = mapping
        logger.debug("AI Response: %s", mapping)
        return mapping

    def _match_locally(self, value):
        """
        Try to standardise a single value without calling the AI model.
        :param value: Raw (non-missing) value.
        :return: The standardized value, or ``_NO_MATCH`` when no local strategy applies.
        """
        standard_map = self.standard_map or {}
        cleaned_value = self.clean_string(value)

        # Rule-Based Check (Predefined Mapping): cleaned form first, then the raw value, then lower-cased
        lookup_keys = [cleaned_value, value]
        if isinstance(value, str):
            lookup_keys.append(value.lower())
        for key in lookup_keys:
            if key in standard_map:
                return standard_map[key]

        # Exact Match in Standard Values
        if cleaned_value in self.standard_values:
            return cleaned_value

        # Fuzzy Matching
        fuzzy_match = self.fuzzy_match(cleaned_value)
        if fuzzy_match:
            return fuzzy_match

        return self._NO_MATCH

    def standardize_list(self, values_to_remap):
        """
        Standardizes a list of values and returns a dictionary {original_value: standardized_value}.
        :param values_to_remap: List of raw values to standardize.
        :return: Dictionary {original_value: standardized_value}. Results accumulate on the instance, so the
            dictionary also contains values from earlier calls.
        """
        unmapped_values = []
        for value in set(values_to_remap):  # Process only unique values
            if pd.isna(value):  # Handle NaN values
                self.remap_dict[value] = self.UNKNOWN_VALUE
                continue

            standardized = self._match_locally(value)
            if standardized is self._NO_MATCH:
                # Capture anything that wasn't mapped
                unmapped_values.append(value)
            else:
                self.remap_dict[value] = standardized

        # AI Model - remap anything unmapped (batch request)
        ai_mapping = self.ai_standardize(unmapped_values)
        self.remap_dict.update(ai_mapping)
        return self.remap_dict

    def report_usage(self):
        """Prints a summary of token usage and cost."""
        print(f"\n🔹 Total Tokens Used: {self.total_tokens_used}")
        print(f"💰 Estimated Cost: ${self.total_cost:.4f}")
class AssetList:
    """
    This class is used to standardise asset lists so that we can process the core information in a consistent manner.

    Typical usage is to construct the object, call ``init_standardise`` to clean the addresses and build the
    category mappings (which are printed for review), then call ``apply_standardiation`` to apply them.
    """

    # Non-date values seen in year-built columns, and the date they are replaced with
    DATETIME_REMAP = {
        "Pre 1900": datetime(year=1899, month=12, day=31),
    }

    # These are the accepted methods we have for cleaning the address1 column
    ADDRESS_1_CLEANING_METHODS = [
        "first_two_words",  # This method will split on the first two words, where the separator is a space
        "first_word",  # This method will split on the first word, where the separator is a space
        "house_number_extraction",  # This method will use the NLP model in SearchEPC to extract the housenumber
        # "address1_extraction"  # This method will use the NLP model to extract address1
    ]

    # Standard column Names
    STANDARD_ADDRESS_1 = "domna_address_1"
    STANDARD_POSTCODE = "domna_postcode"
    STANDARD_FULL_ADDRESS = "domna_full_address"
    STANDARD_YEAR_BUILT = "domna_year_built"
    STANDARD_UPRN = "ordnance_survey_uprn"
    STANDARD_PROPERTY_TYPE = "landlord_property_type"
    STANDARD_WALL_CONSTRUCTION = "landlord_wall_construction"
    STANDARD_HEATING_SYSTEM = "landlord_heating_system"
    STANDARD_EXISTING_PV = "landlord_existing_pv"
    DOMNA_PROPERTY_ID = "domna_property_id"

    # Regular expression for identifying if the address might point to multiple units
    MULTI_UNIT_REGEX = re.compile(r'\b([A-Za-z0-9]+)-([A-Za-z0-9]+)\b')

    # List of columns relating to the non-intrusive data
    NON_INTRUSIVES_COLNAMES = [
        "Archetype", "Construction", "Insulated", "Material", "CIGA Check Required",
        "PV, ACCESS ISSUE, SEE NOTES", "OFF GAS - ROOF ORIENTATION",
        "Any further surveyor notes", 'Surveyors Name'
    ]

    def __init__(
            self,
            local_filepath,
            sheet_name,
            address1_colname,
            postcode_colname,
            full_address_colname,
            landlord_property_id=None,
            full_address_cols_to_concat=None,
            missing_postcodes_method=None,
            address1_extraction_method=None,
            landlord_year_built=None,
            landlord_uprn=None,
            landlord_property_type=None,
            landlord_wall_construction=None,
            landlord_heating_system=None,
            landlord_existing_pv=None,
            header=0
    ):
        """
        Read the asset list from an Excel sheet and record which source columns hold which information.
        :param local_filepath: Path of the Excel workbook.
        :param sheet_name: Sheet to read from the workbook.
        :param address1_colname: Column holding address line 1, or None to derive it from the full address.
        :param postcode_colname: Column holding the postcode.
        :param full_address_colname: Column holding the full address, or None to build it by concatenation.
        :param landlord_property_id: Optional column holding the landlord's own property identifier.
        :param full_address_cols_to_concat: Columns joined with ", " when there is no full address column.
        :param missing_postcodes_method: Stored on the instance; not used by the code in this class.
        :param address1_extraction_method: One of ADDRESS_1_CLEANING_METHODS, used when address1 is derived.
        :param landlord_year_built: Optional column holding the build date.
        :param landlord_uprn: Optional column holding the UPRN.
        :param landlord_property_type: Optional column holding the property type.
        :param landlord_wall_construction: Optional column holding the wall construction.
        :param landlord_heating_system: Optional column holding the heating system.
        :param landlord_existing_pv: Optional column describing existing PV.
        :param header: Row used as the header when reading the sheet.
        """
        self.local_filepath = local_filepath
        self.sheet_name = sheet_name

        # Read in the data
        self.raw_asset_list = pd.read_excel(local_filepath, header=header, sheet_name=sheet_name)
        self.standardised_asset_list = self.raw_asset_list.copy()

        # We detect the presence of the non-intrusive columns
        self.non_intrusives_present = "CIGA Check Required" in self.raw_asset_list.columns

        # Names of columns
        self.landlord_property_id = landlord_property_id
        self.address1_colname = address1_colname
        self.postcode_colname = postcode_colname
        self.full_address_colname = full_address_colname
        self.landlord_year_built = landlord_year_built
        self.landlord_uprn = landlord_uprn
        self.landlord_property_type = landlord_property_type
        self.landlord_wall_construction = landlord_wall_construction
        self.landlord_heating_system = landlord_heating_system
        self.landlord_existing_pv = landlord_existing_pv

        # parameters for cleaning
        self.full_address_cols_to_concat = full_address_cols_to_concat
        self.missing_postcodes_method = missing_postcodes_method
        self.address1_extraction_method = address1_extraction_method

        self.debug_information = {
            "property_type": None,
            "wall_construction": None,
            "heating_system": None,
            "existing_pv": None
        }
        # {column name: {original_value: standardized_value}}, filled in by init_standardise
        self.variable_mappings = {}

    def _extract_address1(self, asset_list, full_address_col, postcode_col, method="first_two_words"):
        """
        Derive address line 1 from the full address and store it in ``self.address1_colname``.
        :param asset_list: DataFrame to add the column to (modified in place).
        :param full_address_col: Name of the column holding the full address.
        :param postcode_col: Name of the postcode column (only read by "house_number_extraction").
        :param method: One of ADDRESS_1_CLEANING_METHODS.
        :return: The same DataFrame, with the address1 column added.
        :raises ValueError: If ``method`` is not a recognised extraction method.
        """
        if method not in self.ADDRESS_1_CLEANING_METHODS:
            raise ValueError(f"Method {method} for producing address1 not recognized")

        if method == "first_two_words":
            address1 = asset_list[full_address_col].str.split(" ").str[:2].str.join(" ")
        elif method == "first_word":
            address1 = asset_list[full_address_col].str.split(" ").str[0]
        else:
            # "house_number_extraction" - the only remaining accepted method
            address1 = asset_list.apply(
                lambda row: SearchEpc.get_house_number(address=row[full_address_col], postcode=row[postcode_col]),
                axis=1
            )

        asset_list[self.address1_colname] = address1
        return asset_list

    @staticmethod
    def _address1_extraction(x):
        """Placeholder - not implemented."""
        pass

    def create_property_id(self):
        """
        This function creates the domna property ID, which is derived from the full address and postcode.

        The two columns are concatenated, stripped of punctuation and spaces and lower-cased; the ID is that
        cleaned text followed by a truncated SHA-256 hash of it. The result is written to the
        DOMNA_PROPERTY_ID column of the standardised asset list.
        :return: None
        """

        def _make_hash(value):
            """Generates a stable SHA256 hash suffix and appends it to a cleaned version of the value."""
            # Normalize and remove special characters for cleaner ID
            cleaned_value = re.sub(r"[^\w\s-]", "", value).replace(" ", "_").lower()
            # Generate SHA-256 hash and truncate it
            short_hash = hashlib.sha256(value.encode()).hexdigest()[:12]
            return f"{cleaned_value}-{short_hash}"

        # We remove punctuation and whitespace from the address, before hashing to produce an ID
        combined = (
            self.standardised_asset_list[self.full_address_colname] +
            self.standardised_asset_list[self.postcode_colname]
        )
        normalised = (
            combined.str.strip().str.replace(r"[^\w\s]", "", regex=True).str.replace(" ", "").str.lower()
        )
        self.standardised_asset_list[self.DOMNA_PROPERTY_ID] = normalised.apply(_make_hash)

    @staticmethod
    def _strip_postcode_from_full_address(full_address, postcode):
        """
        Remove the postcode from a full address, along with any separators left dangling at the ends.
        :param full_address: Full address text.
        :param postcode: Postcode text to remove.
        :return: The address without the postcode.
        """
        cleaned = full_address.replace(postcode, "")
        # Remove any trailing commas and spaces
        cleaned = cleaned.rstrip(", ").strip(",").strip()
        return cleaned

    @classmethod
    def _identify_multi_address(cls, address):
        """
        Flag addresses whose first comma-separated section contains a range such as "1-3".
        :param address: Full address text.
        :return: True when a range is found in the first section, False otherwise (including when the
            address is not comma separated or is not a string).
        """
        if not isinstance(address, str) or "," not in address:
            return False
        address1_section = address.split(",")[0]
        # We look for string in the form (x-y)
        return bool(cls.MULTI_UNIT_REGEX.search(address1_section))

    def _clean_text_columns(self):
        """Cast the address/postcode columns to text, replacing non-breaking spaces and repeated spaces."""
        text_columns = [
            c for c in [self.postcode_colname, self.full_address_colname, self.address1_colname]
            if c is not None
        ]
        for col in text_columns:
            cleaned = self.standardised_asset_list[col].astype(str)
            cleaned = cleaned.str.replace('\xa0', ' ', regex=False)
            # Collapse any run of spaces into a single space
            cleaned = cleaned.str.replace(r' {2,}', ' ', regex=True)
            self.standardised_asset_list[col] = cleaned

    def _standardise_year_built(self):
        """
        Convert the year-built column (when configured) from dates to calendar years.
        :raises NotImplementedError: If the first value of the column is not a datetime.
        """
        column = self.landlord_year_built
        if column is None:
            return
        year_built = self.standardised_asset_list[column]
        if year_built.empty:
            # Nothing to inspect or convert
            return
        # We check if we have a datetime
        if not isinstance(year_built.iloc[0], datetime):
            raise NotImplementedError("Year built column must be a datetime - implement me")
        # We treat any string values - with common values we see - before converting to a year
        year_built = pd.to_datetime(year_built.replace(self.DATETIME_REMAP))
        self.standardised_asset_list[column] = year_built.dt.year

    def _build_remap_config(self):
        """
        Build the remapping configuration for the categorical columns that have been configured.
        :return: Dictionary {column name: {"standard_values": ..., "standard_map": ...}}.
        """
        to_remap = {}
        if self.landlord_property_type is not None:
            to_remap[self.landlord_property_type] = {
                "standard_values": property_type_mappings.STANDARD_PROPERTY_TYPES,
                "standard_map": property_type_mappings.PROPERTY_MAPPING
            }
        if self.landlord_wall_construction is not None:
            to_remap[self.landlord_wall_construction] = {
                "standard_values": walls_mappings.STANDARD_WALL_CONSTRUCTIONS,
                "standard_map": walls_mappings.WALL_CONSTRUCTION_MAPPINGS
            }
        if self.landlord_heating_system is not None:
            to_remap[self.landlord_heating_system] = {
                "standard_values": heating_mappings.STANDARD_HEATING_SYSTEMS,
                "standard_map": heating_mappings.HEATING_MAPPINGS
            }
        if self.landlord_existing_pv is not None:
            to_remap[self.landlord_existing_pv] = {
                "standard_values": existing_pv_mappings.STANDARD_EXISTING_PV,
                "standard_map": existing_pv_mappings.EXISTING_PV_MAPPINGS
            }
        return to_remap

    def init_standardise(self):
        """
        This function is used to standardise the asset list.

        It cleans the address columns, creates the domna property ID, keeps only the configured columns and
        builds (and prints, for review) the category mappings in ``self.variable_mappings``. The mappings are
        not applied here - see ``apply_standardiation``.
        :return: None; ``self.standardised_asset_list`` and ``self.variable_mappings`` are updated.
        :raises ValueError: If address1 or the full address is missing and cannot be derived.
        """
        # Remove rows without a postcode
        if self.postcode_colname is not None:
            self.standardised_asset_list = self.standardised_asset_list.dropna(
                subset=[self.postcode_colname]
            ).copy()

        # We clean up potential non-breaking spaces, and double spaces
        self._clean_text_columns()

        # A missing full address is built first, because address1 extraction reads the full address
        full_address_supplied = self.full_address_colname is not None
        if not full_address_supplied:
            if not self.full_address_cols_to_concat:
                raise ValueError("Missing full address - please specify columns to concatenate")
            self.full_address_colname = self.STANDARD_FULL_ADDRESS
            self.standardised_asset_list[self.full_address_colname] = (
                self.standardised_asset_list[self.full_address_cols_to_concat].apply(
                    lambda row: ", ".join(row), axis=1
                )
            )

        if self.address1_colname is None:
            if self.address1_extraction_method is None:
                raise ValueError("Missing address 1 - please specify an extraction method")
            self.address1_colname = self.STANDARD_ADDRESS_1
            # If we do not have this, we produce it
            self.standardised_asset_list = self._extract_address1(
                asset_list=self.standardised_asset_list,
                full_address_col=self.full_address_colname,
                postcode_col=self.postcode_colname,
                method=self.address1_extraction_method
            )

        if full_address_supplied:
            # Make sure to strip the postcode out of the full address
            self.standardised_asset_list[self.full_address_colname] = self.standardised_asset_list.apply(
                lambda row: self._strip_postcode_from_full_address(
                    full_address=row[self.full_address_colname],
                    postcode=row[self.postcode_colname]
                ),
                axis=1
            )

        # We create the domna property id
        self.create_property_id()

        # We keep just the columns we care about; optional columns that were not configured are skipped
        candidate_columns = [
            self.landlord_property_id,
            self.DOMNA_PROPERTY_ID,
            self.address1_colname,
            self.postcode_colname,
            self.full_address_colname,
            self.landlord_uprn,
            self.landlord_property_type,
            self.landlord_year_built,
            self.landlord_wall_construction,
            self.landlord_heating_system,
            self.landlord_existing_pv
        ]
        # dict.fromkeys keeps the order while dropping any column named twice
        variables = list(dict.fromkeys(c for c in candidate_columns if c is not None))

        rename = {}
        if self.non_intrusives_present:
            variables += self.NON_INTRUSIVES_COLNAMES
            rename = {c: "non-intrusives: " + c for c in self.NON_INTRUSIVES_COLNAMES}

        self.standardised_asset_list = self.standardised_asset_list[variables].rename(columns=rename)

        # We identify addresses which are likely to be multi-addresses (e.g. are rooms x-y)
        self.standardised_asset_list["is_multi_address"] = self.standardised_asset_list[
            self.full_address_colname
        ].apply(self._identify_multi_address)

        # We handle cleaning for walls, in the instance that the landlord provides us with EPC data and
        # we see instances of "average thermal transmittance" in the description
        wall_column = self.landlord_wall_construction
        if wall_column is not None:
            walls = self.standardised_asset_list[wall_column]
            # Cast to text so missing values simply fail the test instead of being treated as matches
            is_transmittance = walls.astype(str).str.lower().str.contains(
                "average thermal transmittance", regex=False
            )
            self.standardised_asset_list[wall_column] = walls.mask(
                is_transmittance, "new build - average thermal transmittance"
            )

        # We attempt to process the year built column
        self._standardise_year_built()

        # We now create standard lookups
        for variable, config in self._build_remap_config().items():
            logger.info("Standardising variable: %s", variable)
            values_to_remap = self.standardised_asset_list[variable].unique()
            # We want to map this to our standardised list of values we're interested in
            remapper = DataRemapper(standard_values=config["standard_values"], standard_map=config["standard_map"])
            remap_dictionary = remapper.standardize_list(values_to_remap=values_to_remap.tolist())
            self.variable_mappings[variable] = remap_dictionary

        # We now print out the variable mappings, which can be reviewed by the user, before the final standardised
        # asset list is returned
        for variable, mapping in self.variable_mappings.items():
            pprint(f"Variable: {variable}")
            pprint(mapping)
            # Print a space
            print("\n")
        pprint("=======================================")

    def apply_standardiation(self, override_empty_mappings=False):
        """
        This function applies the standardisation to the asset list and drops duplicated property IDs.
        :param override_empty_mappings: If true, will override the check for empty mappings. This is only relevant
            if there are no categories which need remapping which is highly unlikely
        :return: None; ``self.standardised_asset_list`` is updated.
        :raises ValueError: If no mappings exist and ``override_empty_mappings`` is false.
        """
        if not self.variable_mappings and not override_empty_mappings:
            raise ValueError("Please run init_standardise first")

        logger.info("Applying standardisation to asset list")
        for variable, mapping in self.variable_mappings.items():
            self.standardised_asset_list[variable] = self.standardised_asset_list[variable].map(mapping)

        # Rows repeating an earlier property ID are dropped; the first occurrence is kept
        is_duplicate = self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated()
        duplicate_count = int(is_duplicate.sum())
        if duplicate_count:
            pprint(f"There are {duplicate_count} duplicated addresses - dropping")
            self.standardised_asset_list = self.standardised_asset_list[~is_duplicate]

    def create_lookup_mappings(self):
        """Placeholder - not implemented."""
        pass