class DataRemapper:
    """
    Map free-text category values onto a fixed set of standard values.

    Values are resolved in order of increasing cost: a predefined mapping, an exact match against
    the standard values, fuzzy matching, and finally a single batched OpenAI request for whatever
    is still unmapped.
    """

    # Category assigned to anything that cannot be resolved to a standard value
    FALLBACK_VALUE = "unknown"

    def __init__(self, standard_values, standard_map=None, max_tokens=1000):
        """
        Initialize the remapper with standard values and a predefined mapping.

        :param standard_values: Set of allowed standardized values.
        :param standard_map: Dictionary of common remappings {raw_value: standard_value}.
        :param max_tokens: Upper bound on prompt tokens, also passed as the completion limit.
        """
        self.standard_values = {v.lower() for v in standard_values}  # Normalize to lowercase
        self.standard_map = {k.lower(): v.lower() for k, v in (standard_map or {}).items()}  # Predefined mappings

        # Incoming values are compared after clean_string(), so the lookup keys must be cleaned
        # the same way - otherwise any key containing punctuation ("Combi - GAS") never matches.
        self._standard_lookup = {self.clean_string(v): v for v in self.standard_values}
        self._clean_map = {
            self.clean_string(k): v.lower() for k, v in (standard_map or {}).items()
        }

        self.fuzzy_threshold = 90  # Adjust fuzzy matching sensitivity
        self.ai_model = "gpt-4-turbo"  # Use gpt-3.5-turbo for cheaper processing

        # Track token usage and remap dictionary
        self.total_tokens_used = 0
        self.total_cost = 0
        self.remap_dict = {}  # {original_value: standardized_value}
        self.max_tokens = max_tokens  # Limit for OpenAI API

        # Memoization for AI calls
        self.ai_cache = {}  # {tuple(unmapped_values): {original_value: standardized_value}}
        # Capture the response for debugging
        self.ai_response = None

        # OpenAI pricing (as of Feb 2024)
        self.pricing = {
            "gpt-4-turbo": {"input": 0.01 / 1000, "output": 0.03 / 1000},
            "gpt-3.5-turbo": {"input": 0.0015 / 1000, "output": 0.002 / 1000},
        }

        # The tokenizer and API client are only needed when the AI fallback is used, so they are
        # created on first access; a fully rule-resolved run needs neither network nor API key.
        self._tokenizer = None
        self._openai_client = None

    @property
    def tokenizer(self):
        """Tokenizer for the configured model, created on first use."""
        if self._tokenizer is None:
            self._tokenizer = tiktoken.encoding_for_model(self.ai_model)
        return self._tokenizer

    @tokenizer.setter
    def tokenizer(self, value):
        self._tokenizer = value

    @property
    def openai_client(self):
        """OpenAI client, created on first use."""
        if self._openai_client is None:
            self._openai_client = OpenAI(api_key=OPENAI_API_KEY)
        return self._openai_client

    @openai_client.setter
    def openai_client(self, value):
        self._openai_client = value

    @staticmethod
    def clean_string(text):
        """
        Basic text cleaning: remove extra spaces, punctuation, and normalize case.

        :param text: Raw value to clean.
        :return: Cleaned string, or None if the input is not a string.
        """
        if not isinstance(text, str):
            return None
        text = re.sub(r'[^\w\s]', '', text.strip().lower())  # Remove punctuation
        # Removing punctuation can leave runs of spaces ("a - b" -> "a  b"); collapse them
        return re.sub(r'\s+', ' ', text).strip()

    def fuzzy_match(self, text):
        """
        Use fuzzy matching to find the closest standard value.

        :param text: Cleaned value to match.
        :return: The best standard value if its score reaches the threshold, otherwise None.
        """
        if not text or not self.standard_values:
            return None
        result = process.extractOne(text, self.standard_values)
        if result is None:  # No candidate survived the scorer's cutoff
            return None
        match, score = result[0], result[1]
        return match if score >= self.fuzzy_threshold else None

    def count_tokens(self, text):
        """Estimate the number of tokens in a given text."""
        return len(self.tokenizer.encode(text)) if text else 0

    def _parse_ai_mapping(self, output_text, unmapped_values):
        """
        Turn the model's reply into {original_value: standard_value}.

        The reply is parsed as data only (never executed). Every requested value gets an entry;
        anything the model omitted, or mapped to something outside the standard values, becomes
        the fallback category.

        :param output_text: Raw text returned by the model.
        :param unmapped_values: Values that were sent for standardization.
        :return: Dictionary {original_value: standardized_value}.
        """
        import ast
        import json

        text = (output_text or "").strip()
        # Models sometimes wrap JSON in a markdown fence despite being told not to
        fenced = re.match(r"^```[\w-]*\s*(.*?)\s*```$", text, flags=re.DOTALL)
        if fenced:
            text = fenced.group(1)

        parsed = None
        # JSON is what we ask for; literal_eval tolerates a Python-style (single-quoted) dict
        for parser in (json.loads, ast.literal_eval):
            try:
                parsed = parser(text)
            except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
                continue
            break
        if not isinstance(parsed, dict):
            parsed = {}

        mapping = {}
        for value in unmapped_values:
            # JSON keys are always strings, so non-string originals are looked up by their text
            suggestion = parsed.get(value, parsed.get(str(value)))
            cleaned = self.clean_string(suggestion)
            mapping[value] = self._standard_lookup.get(cleaned, self.FALLBACK_VALUE)
        return mapping

    def ai_standardize(self, unmapped_values):
        """
        Call OpenAI API **once** for all unmapped values to minimize cost, with memoization.

        :param unmapped_values: Values that no rule could resolve.
        :return: Dictionary {original_value: standardized_value}.
        :raises ValueError: If the prompt exceeds the configured token limit.
        """
        if not unmapped_values:
            return {}

        # key=repr keeps the sort well-defined when values of different types are mixed
        unmapped_tuple = tuple(sorted(unmapped_values, key=repr))  # Ensure consistency for memoization
        if unmapped_tuple in self.ai_cache:
            return self.ai_cache[unmapped_tuple]  # Return memoized result

        # Sorted so the same configuration always produces the same prompt
        prompt = f"""
        You are an expert in data classification. Standardize each of these values into one of the categories:
        {sorted(self.standard_values)}.

        Return only a JSON dictionary where:
        - The keys are the original values.
        - The values are the standardized ones.

        Strictly return JSON **without markdown formatting** or extra text.

        Example Output:
        {{
            "BLKHOUS": "block house",
            "BEDSIT": "bedsit"
        }}

        Values to standardize:
        {unmapped_values}
        """

        # Count input tokens
        input_tokens = self.count_tokens(prompt)
        if input_tokens > self.max_tokens:
            raise ValueError("Input tokens exceed the maximum limit.")

        response = self.openai_client.chat.completions.create(
            model=self.ai_model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=self.max_tokens,
            temperature=0.1,
        )

        # content can be None (e.g. refusals), so guard before stripping
        output_text = (response.choices[0].message.content or "").strip()
        output_tokens = self.count_tokens(output_text)  # Count output tokens

        # Track total token usage
        self.total_tokens_used += input_tokens + output_tokens

        # Estimate cost
        input_cost = input_tokens * self.pricing[self.ai_model]["input"]
        output_cost = output_tokens * self.pricing[self.ai_model]["output"]
        self.total_cost += input_cost + output_cost

        # We store the raw AI response for debugging
        self.ai_response = output_text
        mapping = self._parse_ai_mapping(output_text, unmapped_values)
        logger.debug("AI Response: %s", mapping)

        # Memoize the AI response
        self.ai_cache[unmapped_tuple] = mapping

        return mapping

    def standardize_list(self, values_to_remap):
        """
        Standardizes a list of values and returns a dictionary {original_value: standardized_value}.

        :param values_to_remap: List of raw values to standardize.
        :return: Dictionary {original_value: standardized_value}.
        """
        unmapped_values = []
        for value in set(values_to_remap):  # Process only unique values
            if pd.isna(value):  # Handle NaN values
                self.remap_dict[value] = self.FALLBACK_VALUE
                continue

            cleaned_value = self.clean_string(value)

            # Rule-Based Check (Predefined Mapping)
            if cleaned_value in self._clean_map:
                self.remap_dict[value] = self._clean_map[cleaned_value]
                continue

            # Exact Match in Standard Values
            if cleaned_value in self._standard_lookup:
                self.remap_dict[value] = self._standard_lookup[cleaned_value]
                continue

            # Fuzzy Matching
            fuzzy_match = self.fuzzy_match(cleaned_value)
            if fuzzy_match:
                self.remap_dict[value] = fuzzy_match
                continue

            # Capture anything that wasn't mapped
            unmapped_values.append(value)

        # AI Model - remap anything unmapped (batch request)
        ai_mapping = self.ai_standardize(unmapped_values)
        self.remap_dict.update(ai_mapping)

        return self.remap_dict

    def report_usage(self):
        """Prints a summary of token usage and cost."""
        print(f"\n🔹 Total Tokens Used: {self.total_tokens_used}")
        print(f"💰 Estimated Cost: ${self.total_cost:.4f}")
""" + DATETIME_REMAP = { + "Pre 1900": datetime(year=1899, month=12, day=31), + } + # These are the accepted methods we have for cleaning the address1 column ADDRESS_1_CLEANING_METHODS = [ "first_two_words", # This method will split on the fist two words, where the separator is a space @@ -19,15 +203,6 @@ class AssetList: # "address1_extraction" # This method will use the NLP model to extract address1 ] - STANDARD_PROPERTY_TYPES = [ - "house", - "flat", - "bungalow", - "maisonette", - "park home", - "block house", - ] - # Standard column Names STANDARD_ADDRESS_1 = "domna_address_1" STANDARD_POSTCODE = "domna_postcode" @@ -44,6 +219,15 @@ class AssetList: # Regular expression for identifying if the address might point to multiple units MULTI_UNIT_REGEX = re.compile(r'\b([A-Za-z0-9]+)-([A-Za-z0-9]+)\b') + # List of columns relating to the non-intrusive data + NON_INTRUSIVES_COLNAMES = [ + "Archetype", "Construction", "Insulated", "Material", "CIGA Check Required", + "PV, ACCESS ISSUE, SEE NOTES", "OFF GAS - ROOF ORIENTATION", + "Any further surveyor notes", 'Surveyors Name' + ] + + #### Mapping for wall construction + def __init__( self, local_filepath, @@ -96,6 +280,8 @@ class AssetList: "existing_pv": None } + self.variable_mappings = {} + def _extract_address1(self, asset_list, full_address_col, postcode_col, method="first_two_words"): if method not in self.ADDRESS_1_CLEANING_METHODS: @@ -149,7 +335,7 @@ class AssetList: # We look for string in the form (x-y) return bool(cls.MULTI_UNIT_REGEX.search(address1_section)) - def standardise(self): + def init_standardise(self): """ This function is used to standardise the asset list :return: standardised asset list @@ -202,19 +388,110 @@ class AssetList: self.create_property_id() # We keep just the columns we care about and will work through the various columns and standardise - self.standardised_asset_list = self.standardised_asset_list[ - [ - self.landlord_property_id, - self.DOMNA_PROPERTY_ID, - self.address1_colname, 
- self.postcode_colname, - self.full_address_colname, - self.landlord_year_built, - self.landlord_uprn, - self.landlord_property_type, - ] + variables = [ + self.landlord_property_id, + self.DOMNA_PROPERTY_ID, + self.address1_colname, + self.postcode_colname, + self.full_address_colname, + self.landlord_uprn, + self.landlord_property_type, + self.landlord_year_built, + self.landlord_wall_construction, + self.landlord_heating_system, + self.landlord_existing_pv ] + rename = {} + + if self.non_intrusives_present: + variables += self.NON_INTRUSIVES_COLNAMES + rename = { + **rename, + **dict( + zip(self.NON_INTRUSIVES_COLNAMES, ["non-intrusives: " + c for c in self.NON_INTRUSIVES_COLNAMES]) + ) + } + + self.standardised_asset_list = self.standardised_asset_list[variables].rename( + columns=rename + ) # We idenfiy addresses which are likely to be multi-addresses (i.g are rooms x-y) + self.standardised_asset_list["is_multi_address"] = self.standardised_asset_list[ + self.full_address_colname + ].apply(lambda x: self._identify_multi_address(x)) - raise NotImplementedError + # We handle cleaning for walls, in the instance that the landlord provides us with EPC data and + # we see instances of "average thermal transmittance" in the description + self.standardised_asset_list[self.landlord_wall_construction] = np.where( + self.standardised_asset_list[self.landlord_wall_construction].str.lower().str.contains( + "average thermal transmittance" + ), + "new build - average thermal transmittance", + self.standardised_asset_list[self.landlord_wall_construction] + ) + + # Clear our build year column + + # We attempt to process the year built column + if self.landlord_year_built is not None: + # We check if we have a datetime + if isinstance(self.standardised_asset_list[self.landlord_year_built].iloc[0], datetime): + # We treat any string columns - with common values we see + self.standardised_asset_list[self.landlord_year_built] = ( + 
self.standardised_asset_list[self.landlord_year_built].replace(self.DATETIME_REMAP) + ) + + self.standardised_asset_list[self.landlord_year_built] = pd.to_datetime( + self.standardised_asset_list[self.landlord_year_built] + ) + # Convert this to year + self.standardised_asset_list[self.landlord_year_built] = ( + self.standardised_asset_list[self.landlord_year_built].dt.year + ) + else: + raise NotImplementedError("Year built column must be a datetime - implement me") + + # We now create standard lookups + to_remap = { + self.landlord_property_type: { + "standard_values": property_type_mappings.STANDARD_PROPERTY_TYPES, + "standard_map": property_type_mappings.PROPERTY_MAPPING + }, + self.landlord_wall_construction: { + "standard_values": walls_mappings.STANDARD_WALL_CONSTRUCTIONS, + "standard_map": walls_mappings.WALL_CONSTRUCTION_MAPPINGS + }, + self.landlord_heating_system: { + "standard_values": heating_mappings.STANDARD_HEATING_SYSTEMS, + "standard_map": heating_mappings.HEATING_MAPPINGS + }, + self.landlord_existing_pv: { + "standard_values": existing_pv_mappings.STANDARD_EXISTING_PV, + "standard_map": existing_pv_mappings.EXISTING_PV_MAPPINGS + } + } + + for variable, config in to_remap.items(): + logger.info("Standardising variable: %s", variable) + values_to_remap = self.standardised_asset_list[variable].unique() + # We want to map this to our standardised list of property types we're interested in + remapper = DataRemapper(standard_values=config["standard_values"], standard_map=config["standard_map"]) + remap_dictionary = remapper.standardize_list(values_to_remap=values_to_remap.tolist()) + self.variable_mappings[variable] = remap_dictionary + + # We now print out the variable mappings, which can be reviewed by the user, before the final standardised + # asset list is returned + + def apply_standardiation(self, override_empty_mappings=False): + """ + This function applies the standardisation to the asset list + :param override_empty_mappings: If true, will 
override the check for empty mappings. This is only relevant + if there are no categories which need remapping which is highly unlikely + :return: + """ + if not self.variable_mappings and not override_empty_mappings: + raise ValueError("Please run init_standardise first") + + def create_lookup_mappings(self): + pass diff --git a/asset_list/app.py b/asset_list/app.py new file mode 100644 index 00000000..21b405d8 --- /dev/null +++ b/asset_list/app.py @@ -0,0 +1 @@ +import os diff --git a/asset_list/mappings/exising_pv.py b/asset_list/mappings/exising_pv.py new file mode 100644 index 00000000..1e45bd83 --- /dev/null +++ b/asset_list/mappings/exising_pv.py @@ -0,0 +1,8 @@ +STANDARD_EXISTING_PV = { + "already has PV", "no PV", "unknown" +} + +EXISTING_PV_MAPPINGS = { + "NO": "no PV", + "YES": "already has PV", +} diff --git a/asset_list/mappings/heating_systems.py b/asset_list/mappings/heating_systems.py new file mode 100644 index 00000000..4fce39ab --- /dev/null +++ b/asset_list/mappings/heating_systems.py @@ -0,0 +1,46 @@ +STANDARD_HEATING_SYSTEMS = { + "gas combi boiler", + "electric storage heaters", + "district heating", + "gas condensing boiler", + "oil boiler", + "gas condensing combi", + "air source heat pump", + "boiler - other fuel", + "ground source heat pump", + "electric radiators", + "other", + "electric boiler", + "unknown", + "communal gas boiler", +} + +HEATING_MAPPINGS = { + "Combi - GAS": "gas combi boiler", + "E7 Storage Heaters": "electric storage heaters", + "District heating system": "district heating", + "Condensing Boiler - GAS": "gas condensing boiler", + "Boiler Oil/other": "oil boiler", + "Condensing Combi - Gas": "gas condensing combi", + "Air Source Source Heat Pump": "air source heat pump", + "Biomass Boiler": "boiler - other fuel", + "Ground Source Heat Pump": "ground source heat pump", + "Electric Oil filled radiators": "electric radiators", + "Solid Fuel": "other", + "LPG Boiler": "boiler - other fuel", + "Electric Boiler": "electric 
boiler", + "No data": "unknown", + "Boiler Communal/Commercial - GAS": "communal gas boiler", + "Eco Electric Radiators": "electric radiators", + "Gas fire": "other", + "Backboiler - Solid fuel": "other", +} + +# array(['Combi - GAS', 'E7 Storage Heaters', 'District heating system', +# 'Condensing Boiler - GAS', 'Boiler Oil/other', +# 'Condensing Combi - Gas', 'Air Source Source Heat Pump', +# 'Biomass Boiler', 'Ground Source Heat Pump', +# 'Electric Oil filled radiators', 'Solid Fuel', 'LPG Boiler', +# 'Electric Boiler', 'No data', 'Boiler Communal/Commercial - GAS', +# 'Eco Electric Radiators', 'Gas fire', 'Backboiler - Solid fuel'], +# dtype=object) diff --git a/asset_list/mappings/property_type.py b/asset_list/mappings/property_type.py new file mode 100644 index 00000000..bcad9ede --- /dev/null +++ b/asset_list/mappings/property_type.py @@ -0,0 +1,16 @@ +# These are the standard categories for property types +STANDARD_PROPERTY_TYPES = { + "house", "flat", "maisonette", "bungalow", "park home", "block house", "bedsit", "coach house", + "unknown", "other" +} + +# This is a basic mapping that we use to map values that we've seen commonly to standard values +PROPERTY_MAPPING = { + "HOUSE": "house", + "FLAT": "flat", + "MAISONET": "maisonette", + "BUNGALOW": "bungalow", + "BLKHOUS": "block house", + "BEDSIT": "bedsit", + "COACHSE": "coach house", +} diff --git a/asset_list/mappings/walls.py b/asset_list/mappings/walls.py new file mode 100644 index 00000000..7dec7d12 --- /dev/null +++ b/asset_list/mappings/walls.py @@ -0,0 +1,38 @@ +STANDARD_WALL_CONSTRUCTIONS = { + "uninsulated cavity", "filled cavity", "partial insulated cavity", "timber frame", "solid brick", + "system built", "granite or whinstone", "other", "unknown", "sandstone or limestone", "cob", + "new build - average thermal transmittance", +} + +WALL_CONSTRUCTION_MAPPINGS = { + "New Build - Average Thermal Transmittance": "new build - average thermal transmittance", + 'Average thermal transmittance 0.25 
W/m?K': 'unknown', + 'Cavity wall, as built, insulated (assumed)': 'filled cavity', + 'Average thermal transmittance 0.31 W/m?K': 'unknown', + 'Cavity wall, as built, no insulation (assumed)': 'uninsulated cavity', + 'Average thermal transmittance 0.30 W/m?K': 'unknown', 'Average thermal transmittance 0.28 W/m-¦K': 'unknown', + 'Average thermal transmittance 0.25 W/m-¦K': 'unknown', 'Average thermal transmittance 0.21 W/m-¦K': 'unknown', + 'Average thermal transmittance 0.20 W/m-¦K': 'unknown', 'Average thermal transmittance 0.29 W/m?K': 'unknown', + 'Average thermal transmittance 0.16 W/m?K': 'unknown', + 'Average thermal transmittance 0.27 W/m²K': 'unknown', + 'Average thermal transmittance 0.15 W/m-¦K': 'unknown', 'Average thermal transmittance 0.23 W/m-¦K': 'unknown', + 'Average thermal transmittance 0.18 W/m?K': 'unknown', + 'Granite or whin, with internal insulation': 'granite or whinstone', + 'Average thermal transmittance 0.22 W/m-¦K': 'unknown', 'Average thermal transmittance 0.24 W/m?K': 'unknown', + 'Average thermal transmittance 0.16 W/m-¦K': 'unknown', 'Average thermal transmittance 0.35 W/m?K': 'unknown', + 'Average thermal transmittance 0.26 W/m-¦K': 'unknown', 'Average thermal transmittance 0.62 W/m?K': 'unknown', + 'Average thermal transmittance 0.64 W/m?K': 'unknown', 'Average thermal transmittance 0.61 W/m?K': 'unknown', + 'Sandstone or limestone, as built, no insulation (assumed)': 'sandstone or limestone', + 'Average thermal transmittance 0.33 W/m?K': 'unknown', 'Cavity wall,': 'unknown', + 'Cavity wall, as built, partial insulation (assumed)': 'partial insulated cavity', + 'Average thermal transmittance 0.29 W/m-¦K': 'unknown', 'Average thermal transmittance 0.32 W/m-¦K': 'unknown', + 'Average thermal transmittance 0.19 W/m-¦K': 'unknown', 'Average thermal transmittance 0.27 W/m?K': 'unknown', + 'Average thermal transmittance 0.22 W/m?K': 'unknown', 'Average thermal transmittance 0.38 W/m?K': 'unknown', + 'Average thermal transmittance 0.26 
W/m?K': 'unknown', 'Average thermal transmittance 0.27 W/m-¦K': 'unknown', + 'Average thermal transmittance 0.18 W/m-¦K': 'unknown', 'Average thermal transmittance = 0.27 W/m?K': 'unknown', + 'Cavity wall, with external insulation': 'filled cavity', 'Average thermal transmittance 0.21 W/m?K': 'unknown', + 'Average thermal transmittance 0.23 W/m?K': 'unknown', 'Average thermal transmittance 0.20 W/m?K': 'unknown', + 'Average thermal transmittance 0.32 W/m?K': 'unknown', 'Average thermal transmittance 0.24 W/m-¦K': 'unknown', + 'Cavity wall, with internal insulation': 'filled cavity', + 'Average thermal transmittance 0.17 W/m-¦K': 'unknown', 'Average thermal transmittance 0.28 W/m?K': 'unknown' +} diff --git a/asset_list/requirements.txt b/asset_list/requirements.txt index d6d64471..0c16c43a 100644 --- a/asset_list/requirements.txt +++ b/asset_list/requirements.txt @@ -5,4 +5,6 @@ pydantic-settings==2.6.0 epc-api-python==1.0.2 fuzzywuzzy boto3 -openpyxl \ No newline at end of file +openpyxl +openai +tiktoken \ No newline at end of file diff --git a/etl/route_march_data_pull/app.py b/etl/route_march_data_pull/app.py index fcf11765..ca5195d6 100644 --- a/etl/route_march_data_pull/app.py +++ b/etl/route_march_data_pull/app.py @@ -364,10 +364,11 @@ def app(): landlord_heating_system="Heat Source", landlord_existing_pv="PV (Y/N)" ) - self.standardised_asset_list( - # In here, we might want to pass some specific remaps + self.init_standardise( ) + self.apply_transformations() + # DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Colchester" # DATA_FILENAME = "Warmfront data- Colchester Borough Homes (Complete).xlsx" # SHEET_NAME = "Sheet1"