getting asset list class live

This commit is contained in:
Khalim Conn-Kowlessar 2025-02-19 22:12:29 +00:00
parent 0a643d80ad
commit ecf8e46c65
9 changed files with 420 additions and 25 deletions

6
.idea/terraform.xml generated Normal file
View file

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="TerraformProjectSettings">
<option name="toolPath" value="/opt/homebrew/bin/terraform" />
</component>
</project>

View file

@ -1,16 +1,200 @@
import os
import re
from datetime import datetime
from openai import OpenAI
import tiktoken
import numpy as np
import pandas as pd
from fuzzywuzzy import process
from utils.logger import setup_logger
from backend.SearchEpc import SearchEpc
import asset_list.mappings.property_type as property_type_mappings
import asset_list.mappings.walls as walls_mappings
import asset_list.mappings.heating_systems as heating_mappings
import asset_list.mappings.exising_pv as existing_pv_mappings
logger = setup_logger()
# OpenAI API Key (set this in your environment variables for security)
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
class DataRemapper:
def __init__(self, standard_values, standard_map=None, max_tokens=1000):
"""
Initialize the remapper with standard values and a predefined mapping.
:param standard_values: Set of allowed standardized values.
:param standard_map: Dictionary of common remappings {raw_value: standard_value}.
"""
self.standard_values = {v.lower() for v in standard_values} # Normalize to lowercase
self.standard_map = {k.lower(): v.lower() for k, v in (standard_map or {}).items()} # Predefined mappings
self.fuzzy_threshold = 90 # Adjust fuzzy matching sensitivity
self.ai_model = "gpt-4-turbo" # Use gpt-3.5-turbo for cheaper processing
# Tokenizer for counting tokens
self.tokenizer = tiktoken.encoding_for_model(self.ai_model)
# Track token usage and remap dictionary
self.total_tokens_used = 0
self.total_cost = 0
self.remap_dict = {} # {original_value: standardized_value}
self.max_tokens = 1000 # Limit for OpenAI API
# Memoization for AI calls
self.ai_cache = {} # {tuple(unmapped_values): {original_value: standardized_value}}
# Capture the reponse for debugging
self.ai_response = None
# OpenAI pricing (as of Feb 2024)
self.pricing = {
"gpt-4-turbo": {"input": 0.01 / 1000, "output": 0.03 / 1000},
"gpt-3.5-turbo": {"input": 0.0015 / 1000, "output": 0.002 / 1000},
}
self.openai_client = OpenAI(api_key=OPENAI_API_KEY)
@staticmethod
def clean_string(text):
"""Basic text cleaning: remove extra spaces, punctuation, and normalize case."""
if not isinstance(text, str):
return None
text = text.strip().lower()
text = re.sub(r'[^\w\s]', '', text) # Remove punctuation
return text
def fuzzy_match(self, text):
"""Use fuzzy matching to find the closest standard value."""
match, score = process.extractOne(text, self.standard_values) if text else (None, 0)
return match if score >= self.fuzzy_threshold else None
def count_tokens(self, text):
"""Estimate the number of tokens in a given text."""
return len(self.tokenizer.encode(text)) if text else 0
def ai_standardize(self, unmapped_values):
"""Call OpenAI API **once** for all unmapped values to minimize cost, with memoization."""
if not unmapped_values:
return {}
unmapped_tuple = tuple(sorted(unmapped_values)) # Ensure consistency for memoization
if unmapped_tuple in self.ai_cache:
return self.ai_cache[unmapped_tuple] # Return memoized result
prompt = f"""
You are an expert in data classification. Standardize each of these values into one of the categories:
{list(self.standard_values)}.
Return only a JSON dictionary where:
- The keys are the original values.
- The values are the standardized ones.
Strictly return JSON **without markdown formatting** or extra text.
Example Output:
{{
"BLKHOUS": "block house",
"BEDSIT": "bedsit"
}}
Values to standardize:
{unmapped_values}
"""
# Count input tokens
input_tokens = self.count_tokens(prompt)
if input_tokens > self.max_tokens:
raise ValueError("Input tokens exceed the maximum limit.")
response = self.openai_client.chat.completions.create(
model=self.ai_model,
messages=[{"role": "user", "content": prompt}],
max_tokens=self.max_tokens,
temperature=0.1,
)
output_text = response.choices[0].message.content.strip()
output_tokens = self.count_tokens(output_text) # Count output tokens
# Track total token usage
self.total_tokens_used += input_tokens + output_tokens
# Estimate cost
input_cost = input_tokens * self.pricing[self.ai_model]["input"]
output_cost = output_tokens * self.pricing[self.ai_model]["output"]
self.total_cost += input_cost + output_cost
try:
# Parse response as dictionary
mapping = eval(output_text) # OpenAI should return a valid dictionary
except:
mapping = {val: "unknown" for val in unmapped_values} # Fallback
# Memoize the AI response
self.ai_cache[unmapped_tuple] = mapping
# We store the raw AI response for debugging
logger.debug(f"AI Response: {mapping}")
self.ai_response = output_text
return mapping
def standardize_list(self, values_to_remap):
"""
Standardizes a list of values and returns a dictionary {original_value: standardized_value}.
:param values_to_remap: List of raw values to standardize.
:return: Dictionary {original_value: standardized_value}.
"""
unique_values = set(values_to_remap) # Process only unique values
unmapped_values = []
for value in unique_values:
if pd.isna(value): # Handle NaN values
self.remap_dict[value] = "unknown"
continue
cleaned_value = self.clean_string(value)
# Rule-Based Check (Predefined Mapping)
if cleaned_value in self.standard_map:
self.remap_dict[value] = self.standard_map[cleaned_value]
continue
# Exact Match in Standard Values
if cleaned_value in self.standard_values:
self.remap_dict[value] = cleaned_value
continue
# Fuzzy Matching
fuzzy_match = self.fuzzy_match(cleaned_value)
if fuzzy_match:
self.remap_dict[value] = fuzzy_match
continue
# Capture anything that wasn't mapped
unmapped_values.append(value)
# AI Model - remap anything unmapped (batch request)
ai_mapping = self.ai_standardize(unmapped_values)
self.remap_dict.update(ai_mapping)
return self.remap_dict
def report_usage(self):
"""Prints a summary of token usage and cost."""
print(f"\n🔹 Total Tokens Used: {self.total_tokens_used}")
print(f"💰 Estimated Cost: ${self.total_cost:.4f}")
class AssetList:
"""
This class is used to standardise asset lists so that we can process the core information in a consistent manner.
"""
DATETIME_REMAP = {
"Pre 1900": datetime(year=1899, month=12, day=31),
}
# These are the accepted methods we have for cleaning the address1 column
ADDRESS_1_CLEANING_METHODS = [
"first_two_words", # This method will split on the fist two words, where the separator is a space
@ -19,15 +203,6 @@ class AssetList:
# "address1_extraction" # This method will use the NLP model to extract address1
]
STANDARD_PROPERTY_TYPES = [
"house",
"flat",
"bungalow",
"maisonette",
"park home",
"block house",
]
# Standard column Names
STANDARD_ADDRESS_1 = "domna_address_1"
STANDARD_POSTCODE = "domna_postcode"
@ -44,6 +219,15 @@ class AssetList:
# Regular expression for identifying if the address might point to multiple units
MULTI_UNIT_REGEX = re.compile(r'\b([A-Za-z0-9]+)-([A-Za-z0-9]+)\b')
# List of columns relating to the non-intrusive data
NON_INTRUSIVES_COLNAMES = [
"Archetype", "Construction", "Insulated", "Material", "CIGA Check Required",
"PV, ACCESS ISSUE, SEE NOTES", "OFF GAS - ROOF ORIENTATION",
"Any further surveyor notes", 'Surveyors Name'
]
#### Mapping for wall construction
def __init__(
self,
local_filepath,
@ -96,6 +280,8 @@ class AssetList:
"existing_pv": None
}
self.variable_mappings = {}
def _extract_address1(self, asset_list, full_address_col, postcode_col, method="first_two_words"):
if method not in self.ADDRESS_1_CLEANING_METHODS:
@ -149,7 +335,7 @@ class AssetList:
# We look for string in the form (x-y)
return bool(cls.MULTI_UNIT_REGEX.search(address1_section))
def standardise(self):
def init_standardise(self):
"""
This function is used to standardise the asset list
:return: standardised asset list
@ -202,19 +388,110 @@ class AssetList:
self.create_property_id()
# We keep just the columns we care about and will work through the various columns and standardise
self.standardised_asset_list = self.standardised_asset_list[
[
self.landlord_property_id,
self.DOMNA_PROPERTY_ID,
self.address1_colname,
self.postcode_colname,
self.full_address_colname,
self.landlord_year_built,
self.landlord_uprn,
self.landlord_property_type,
]
variables = [
self.landlord_property_id,
self.DOMNA_PROPERTY_ID,
self.address1_colname,
self.postcode_colname,
self.full_address_colname,
self.landlord_uprn,
self.landlord_property_type,
self.landlord_year_built,
self.landlord_wall_construction,
self.landlord_heating_system,
self.landlord_existing_pv
]
rename = {}
if self.non_intrusives_present:
variables += self.NON_INTRUSIVES_COLNAMES
rename = {
**rename,
**dict(
zip(self.NON_INTRUSIVES_COLNAMES, ["non-intrusives: " + c for c in self.NON_INTRUSIVES_COLNAMES])
)
}
self.standardised_asset_list = self.standardised_asset_list[variables].rename(
columns=rename
)
# We idenfiy addresses which are likely to be multi-addresses (i.g are rooms x-y)
self.standardised_asset_list["is_multi_address"] = self.standardised_asset_list[
self.full_address_colname
].apply(lambda x: self._identify_multi_address(x))
raise NotImplementedError
# We handle cleaning for walls, in the instance that the landlord provides us with EPC data and
# we see instances of "average thermal transmittance" in the description
self.standardised_asset_list[self.landlord_wall_construction] = np.where(
self.standardised_asset_list[self.landlord_wall_construction].str.lower().str.contains(
"average thermal transmittance"
),
"new build - average thermal transmittance",
self.standardised_asset_list[self.landlord_wall_construction]
)
# Clear our build year column
# We attempt to process the year built column
if self.landlord_year_built is not None:
# We check if we have a datetime
if isinstance(self.standardised_asset_list[self.landlord_year_built].iloc[0], datetime):
# We treat any string columns - with common values we see
self.standardised_asset_list[self.landlord_year_built] = (
self.standardised_asset_list[self.landlord_year_built].replace(self.DATETIME_REMAP)
)
self.standardised_asset_list[self.landlord_year_built] = pd.to_datetime(
self.standardised_asset_list[self.landlord_year_built]
)
# Convert this to year
self.standardised_asset_list[self.landlord_year_built] = (
self.standardised_asset_list[self.landlord_year_built].dt.year
)
else:
raise NotImplementedError("Year built column must be a datetime - implement me")
# We now create standard lookups
to_remap = {
self.landlord_property_type: {
"standard_values": property_type_mappings.STANDARD_PROPERTY_TYPES,
"standard_map": property_type_mappings.PROPERTY_MAPPING
},
self.landlord_wall_construction: {
"standard_values": walls_mappings.STANDARD_WALL_CONSTRUCTIONS,
"standard_map": walls_mappings.WALL_CONSTRUCTION_MAPPINGS
},
self.landlord_heating_system: {
"standard_values": heating_mappings.STANDARD_HEATING_SYSTEMS,
"standard_map": heating_mappings.HEATING_MAPPINGS
},
self.landlord_existing_pv: {
"standard_values": existing_pv_mappings.STANDARD_EXISTING_PV,
"standard_map": existing_pv_mappings.EXISTING_PV_MAPPINGS
}
}
for variable, config in to_remap.items():
logger.info("Standardising variable: %s", variable)
values_to_remap = self.standardised_asset_list[variable].unique()
# We want to map this to our standardised list of property types we're interested in
remapper = DataRemapper(standard_values=config["standard_values"], standard_map=config["standard_map"])
remap_dictionary = remapper.standardize_list(values_to_remap=values_to_remap.tolist())
self.variable_mappings[variable] = remap_dictionary
# We now print out the variable mappings, which can be reviewed by the user, before the final standardised
# asset list is returned
def apply_standardiation(self, override_empty_mappings=False):
"""
This function applies the standardisation to the asset list
:param override_empty_mappings: If true, will override the check for empty mappings. This is only relevant
if there are no categories which need remapping which is highly unlikely
:return:
"""
if not self.variable_mappings and not override_empty_mappings:
raise ValueError("Please run init_standardise first")
def create_lookup_mappings(self):
pass

1
asset_list/app.py Normal file
View file

@ -0,0 +1 @@
import os

View file

@ -0,0 +1,8 @@
STANDARD_EXISTING_PV = {
"already has PV", "no PV", "unknown"
}
EXISTING_PV_MAPPINGS = {
"NO": "no PV",
"YES": "already has PV",
}

View file

@ -0,0 +1,46 @@
STANDARD_HEATING_SYSTEMS = {
"gas combi boiler",
"electric storage heaters",
"district heating",
"gas condensing boiler",
"oil boiler",
"gas condensing combi",
"air source heat pump",
"boiler - other fuel",
"ground source heat pump",
"electric radiators",
"other",
"electric boiler",
"unknown",
"communal gas boiler",
}
HEATING_MAPPINGS = {
"Combi - GAS": "gas combi boiler",
"E7 Storage Heaters": "electric storage heaters",
"District heating system": "district heating",
"Condensing Boiler - GAS": "gas condensing boiler",
"Boiler Oil/other": "oil boiler",
"Condensing Combi - Gas": "gas condensing combi",
"Air Source Source Heat Pump": "air source heat pump",
"Biomass Boiler": "boiler - other fuel",
"Ground Source Heat Pump": "ground source heat pump",
"Electric Oil filled radiators": "electric radiators",
"Solid Fuel": "other",
"LPG Boiler": "boiler - other fuel",
"Electric Boiler": "electric boiler",
"No data": "unknown",
"Boiler Communal/Commercial - GAS": "communal gas boiler",
"Eco Electric Radiators": "electric radiators",
"Gas fire": "other",
"Backboiler - Solid fuel": "other",
}
# array(['Combi - GAS', 'E7 Storage Heaters', 'District heating system',
# 'Condensing Boiler - GAS', 'Boiler Oil/other',
# 'Condensing Combi - Gas', 'Air Source Source Heat Pump',
# 'Biomass Boiler', 'Ground Source Heat Pump',
# 'Electric Oil filled radiators', 'Solid Fuel', 'LPG Boiler',
# 'Electric Boiler', 'No data', 'Boiler Communal/Commercial - GAS',
# 'Eco Electric Radiators', 'Gas fire', 'Backboiler - Solid fuel'],
# dtype=object)

View file

@ -0,0 +1,16 @@
# These are the standard categories for property types
STANDARD_PROPERTY_TYPES = {
"house", "flat", "maisonette", "bungalow", "park home", "block house", "bedsit", "coach house",
"unknown", "other"
}
# This is a basic mapping that we use to map values that we've seen commonly to standard values
PROPERTY_MAPPING = {
"HOUSE": "house",
"FLAT": "flat",
"MAISONET": "maisonette",
"BUNGALOW": "bungalow",
"BLKHOUS": "block house",
"BEDSIT": "bedsit",
"COACHSE": "coach house",
}

View file

@ -0,0 +1,38 @@
STANDARD_WALL_CONSTRUCTIONS = {
"uninsulated cavity", "filled cavity", "partial insulated cavity", "timber frame", "solid brick",
"system built", "granite or whinstone", "other", "unknown", "sandstone or limestone", "cob",
"new build - average thermal transmittance",
}
WALL_CONSTRUCTION_MAPPINGS = {
"New Build - Average Thermal Transmittance": "new build - average thermal transmittance",
'Average thermal transmittance 0.25 W/m?K': 'unknown',
'Cavity wall, as built, insulated (assumed)': 'filled cavity',
'Average thermal transmittance 0.31 W/m?K': 'unknown',
'Cavity wall, as built, no insulation (assumed)': 'uninsulated cavity',
'Average thermal transmittance 0.30 W/m?K': 'unknown', 'Average thermal transmittance 0.28 W/m-¦K': 'unknown',
'Average thermal transmittance 0.25 W/m-¦K': 'unknown', 'Average thermal transmittance 0.21 W/m-¦K': 'unknown',
'Average thermal transmittance 0.20 W/m-¦K': 'unknown', 'Average thermal transmittance 0.29 W/m?K': 'unknown',
'Average thermal transmittance 0.16 W/m?K': 'unknown',
'Average thermal transmittance 0.27 W/m&#0178;K': 'unknown',
'Average thermal transmittance 0.15 W/m-¦K': 'unknown', 'Average thermal transmittance 0.23 W/m-¦K': 'unknown',
'Average thermal transmittance 0.18 W/m?K': 'unknown',
'Granite or whin, with internal insulation': 'granite or whinstone',
'Average thermal transmittance 0.22 W/m-¦K': 'unknown', 'Average thermal transmittance 0.24 W/m?K': 'unknown',
'Average thermal transmittance 0.16 W/m-¦K': 'unknown', 'Average thermal transmittance 0.35 W/m?K': 'unknown',
'Average thermal transmittance 0.26 W/m-¦K': 'unknown', 'Average thermal transmittance 0.62 W/m?K': 'unknown',
'Average thermal transmittance 0.64 W/m?K': 'unknown', 'Average thermal transmittance 0.61 W/m?K': 'unknown',
'Sandstone or limestone, as built, no insulation (assumed)': 'sandstone or limestone',
'Average thermal transmittance 0.33 W/m?K': 'unknown', 'Cavity wall,': 'unknown',
'Cavity wall, as built, partial insulation (assumed)': 'partial insulated cavity',
'Average thermal transmittance 0.29 W/m-¦K': 'unknown', 'Average thermal transmittance 0.32 W/m-¦K': 'unknown',
'Average thermal transmittance 0.19 W/m-¦K': 'unknown', 'Average thermal transmittance 0.27 W/m?K': 'unknown',
'Average thermal transmittance 0.22 W/m?K': 'unknown', 'Average thermal transmittance 0.38 W/m?K': 'unknown',
'Average thermal transmittance 0.26 W/m?K': 'unknown', 'Average thermal transmittance 0.27 W/m-¦K': 'unknown',
'Average thermal transmittance 0.18 W/m-¦K': 'unknown', 'Average thermal transmittance = 0.27 W/m?K': 'unknown',
'Cavity wall, with external insulation': 'filled cavity', 'Average thermal transmittance 0.21 W/m?K': 'unknown',
'Average thermal transmittance 0.23 W/m?K': 'unknown', 'Average thermal transmittance 0.20 W/m?K': 'unknown',
'Average thermal transmittance 0.32 W/m?K': 'unknown', 'Average thermal transmittance 0.24 W/m-¦K': 'unknown',
'Cavity wall, with internal insulation': 'filled cavity',
'Average thermal transmittance 0.17 W/m-¦K': 'unknown', 'Average thermal transmittance 0.28 W/m?K': 'unknown'
}

View file

@ -5,4 +5,6 @@ pydantic-settings==2.6.0
epc-api-python==1.0.2
fuzzywuzzy
boto3
openpyxl
openpyxl
openai
tiktoken

View file

@ -364,10 +364,11 @@ def app():
landlord_heating_system="Heat Source",
landlord_existing_pv="PV (Y/N)"
)
self.standardised_asset_list(
# In here, we might want to pass some specific remaps
self.init_standardise(
)
self.apply_transformations()
# DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Colchester"
# DATA_FILENAME = "Warmfront data- Colchester Borough Homes (Complete).xlsx"
# SHEET_NAME = "Sheet1"