diff --git a/.idea/Model.iml b/.idea/Model.iml
index 96ad7a95..df6c4faa 100644
--- a/.idea/Model.iml
+++ b/.idea/Model.iml
@@ -7,7 +7,7 @@
-
+
diff --git a/.idea/misc.xml b/.idea/misc.xml
index fb10c6b0..50cad4ca 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -3,7 +3,7 @@
-
+
diff --git a/asset_list/DataMapper.py b/asset_list/DataMapper.py
new file mode 100644
index 00000000..ac1b8db3
--- /dev/null
+++ b/asset_list/DataMapper.py
@@ -0,0 +1,231 @@
+# NOTE(review): no imports for `os`, `re`, `pd`, `tiktoken`, `process`, `OpenAI` or `logger` are visible in this
+# hunk, although all of them are used below - confirm that they are brought into scope.
+
+# OpenAI API Key (set this in your environment variables for security)
+OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
+
+
+class DataRemapper:
+    """
+    Standardizes raw values onto a fixed set of allowed values.
+
+    Values are matched against the predefined mapping, then exactly and fuzzily against the standard values;
+    anything still unmapped is sent to the OpenAI API in a single batched request.
+    """
+
+    # Marker returned by _resolve_locally when none of the local rules produce a match
+    _UNRESOLVED = object()
+
+    def __init__(self, standard_values, standard_map=None, max_tokens=1000):
+        """
+        Initialize the remapper with standard values and a predefined mapping.
+
+        :param standard_values: Set of allowed standardized values.
+        :param standard_map: Dictionary of common remappings {raw_value: standard_value}. When omitted, no
+            predefined remappings are applied.
+        :param max_tokens: Largest prompt (in tokens) that will be sent; also passed to the API as max_tokens.
+        """
+        self.standard_values = standard_values
+        # Membership checks are run against the map, so a missing map has to behave like an empty one
+        self.standard_map = standard_map if standard_map is not None else {}
+        self.fuzzy_threshold = 90  # Adjust fuzzy matching sensitivity
+        self.ai_model = "gpt-4-turbo"  # Use gpt-3.5-turbo for cheaper processing
+
+        # Tokenizer for counting tokens
+        self.tokenizer = tiktoken.encoding_for_model(self.ai_model)
+
+        # Track token usage and remap dictionary
+        self.total_tokens_used = 0
+        self.total_cost = 0
+        self.remap_dict = {}  # {original_value: standardized_value}
+        self.max_tokens = max_tokens  # Limit for OpenAI API
+
+        # Memoization for AI calls
+        self.ai_cache = {}  # {tuple(unmapped_values): {original_value: standardized_value}}
+        # Capture the response for debugging
+        self.ai_response = None
+
+        # OpenAI pricing (as of Feb 2024)
+        self.pricing = {
+            "gpt-4-turbo": {"input": 0.01 / 1000, "output": 0.03 / 1000},
+            "gpt-3.5-turbo": {"input": 0.0015 / 1000, "output": 0.002 / 1000},
+        }
+
+        self.openai_client = OpenAI(api_key=OPENAI_API_KEY)
+
+    @staticmethod
+    def clean_string(text):
+        """Basic text cleaning: remove extra spaces, punctuation, and normalize case. Returns None for non-strings."""
+        if not isinstance(text, str):
+            return None
+        text = text.strip().lower()
+        text = re.sub(r'[^\w\s]', '', text)  # Remove punctuation
+        # Collapse runs of whitespace into a single space
+        text = re.sub(r'\s+', ' ', text)
+        return text
+
+    def fuzzy_match(self, text):
+        """Use fuzzy matching to find the closest standard value; None if nothing reaches the threshold."""
+        if not text:
+            return None
+        result = process.extractOne(text, self.standard_values)
+        if not result:
+            # No candidate was returned, e.g. because there are no standard values to compare against
+            return None
+        # Index instead of unpacking, so that a result with more than (match, score) entries is still accepted
+        match, score = result[0], result[1]
+        return match if score >= self.fuzzy_threshold else None
+
+    def count_tokens(self, text):
+        """Estimate the number of tokens in a given text."""
+        return len(self.tokenizer.encode(text)) if text else 0
+
+    @staticmethod
+    def _parse_mapping(output_text):
+        """Parse the model output into a dictionary without executing it; returns None if that fails."""
+        import ast
+        import json
+
+        # JSON is what the prompt asks for; literal_eval additionally accepts a Python-style dict literal
+        for parser in (json.loads, ast.literal_eval):
+            try:
+                parsed = parser(output_text)
+            except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
+                continue
+            if isinstance(parsed, dict):
+                return parsed
+        return None
+
+    def ai_standardize(self, unmapped_values):
+        """
+        Call OpenAI API **once** for all unmapped values to minimize cost, with memoization.
+
+        :param unmapped_values: Raw values that could not be standardized locally.
+        :return: Dictionary {original_value: standardized_value}. Every value is mapped to "unknown" when the
+            response cannot be parsed into a dictionary.
+        :raises ValueError: If the prompt contains more than max_tokens tokens.
+        """
+        if not unmapped_values:
+            return {}
+
+        # Sort on repr so that values of mixed types still produce a consistent memoization key
+        unmapped_tuple = tuple(sorted(unmapped_values, key=repr))
+        if unmapped_tuple in self.ai_cache:
+            return self.ai_cache[unmapped_tuple]  # Return memoized result
+
+        prompt = f"""
+        You are an expert in data classification. Standardize each of these values into one of the categories:
+        {list(self.standard_values)}.
+
+        Return only a JSON dictionary where:
+        - The keys are the original values.
+        - The values are the standardized ones.
+
+        Strictly return JSON **without markdown formatting** or extra text.
+
+        Example Output:
+        {{
+            "BLKHOUS": "block house",
+            "BEDSIT": "bedsit"
+        }}
+
+        Values to standardize:
+        {unmapped_values}
+        """
+
+        # Count input tokens
+        input_tokens = self.count_tokens(prompt)
+        if input_tokens > self.max_tokens:
+            raise ValueError("Input tokens exceed the maximum limit.")
+
+        logger.info("Calling OpenAI API for standardization...")
+        response = self.openai_client.chat.completions.create(
+            model=self.ai_model,
+            messages=[{"role": "user", "content": prompt}],
+            max_tokens=self.max_tokens,
+            temperature=0.1,
+        )
+
+        # The content can be empty, in which case parsing fails and the fallback below applies
+        output_text = (response.choices[0].message.content or "").strip()
+        output_tokens = self.count_tokens(output_text)  # Count output tokens
+
+        # Track total token usage
+        self.total_tokens_used += input_tokens + output_tokens
+
+        # Estimate cost
+        input_cost = input_tokens * self.pricing[self.ai_model]["input"]
+        output_cost = output_tokens * self.pricing[self.ai_model]["output"]
+        self.total_cost += input_cost + output_cost
+
+        # The response is untrusted text, so it is parsed as data and never evaluated as code
+        mapping = self._parse_mapping(output_text)
+        if mapping is None:
+            logger.warning("Could not parse the AI response into a dictionary; using 'unknown' for all values.")
+            mapping = {val: "unknown" for val in unmapped_values}  # Fallback
+
+        # Memoize the AI response
+        self.ai_cache[unmapped_tuple] = mapping
+        logger.debug(f"AI Response: {mapping}")
+        # We store the raw AI response for debugging
+        self.ai_response = output_text
+
+        return mapping
+
+    def _resolve_locally(self, value):
+        """Standardize a single value without the AI model; returns _UNRESOLVED when no local rule matches."""
+        cleaned_value = self.clean_string(value)
+
+        # Rule-Based Check (Predefined Mapping): cleaned value first, then the raw value, then its lower case form
+        lookup_keys = [cleaned_value, value]
+        if isinstance(value, str):
+            # Only strings can be lower-cased; other types are looked up as they are
+            lookup_keys.append(value.lower())
+        for key in lookup_keys:
+            if key in self.standard_map:
+                return self.standard_map[key]
+
+        # Exact Match in Standard Values
+        if cleaned_value in self.standard_values:
+            return cleaned_value
+
+        # Fuzzy Matching
+        fuzzy_match = self.fuzzy_match(cleaned_value)
+        if fuzzy_match:
+            return fuzzy_match
+
+        return self._UNRESOLVED
+
+    def standardize_list(self, values_to_remap):
+        """
+        Standardizes a list of values and returns a dictionary {original_value: standardized_value}.
+
+        :param values_to_remap: List of raw values to standardize.
+        :return: Dictionary {original_value: standardized_value}. This is the instance-level remap dictionary, so
+            it also holds the results of earlier calls.
+        """
+        unique_values = set(values_to_remap)  # Process only unique values
+
+        unmapped_values = []
+        for value in unique_values:
+            if pd.isna(value):  # Handle NaN values
+                self.remap_dict[value] = "unknown"
+                continue
+
+            standardized = self._resolve_locally(value)
+            if standardized is self._UNRESOLVED:
+                # Capture anything that wasn't mapped
+                unmapped_values.append(value)
+            else:
+                self.remap_dict[value] = standardized
+
+        # AI Model - remap anything unmapped (batch request)
+        ai_mapping = self.ai_standardize(unmapped_values)
+        self.remap_dict.update(ai_mapping)
+
+        return self.remap_dict
+
+    def report_usage(self):
+        """Prints a summary of token usage and cost."""
+        print(f"\n🔹 Total Tokens Used: {self.total_tokens_used}")
+        print(f"💰 Estimated Cost: ${self.total_cost:.4f}")
diff --git a/asset_list/app.py b/asset_list/app.py
index e9cd7c3f..088f1603 100644
--- a/asset_list/app.py
+++ b/asset_list/app.py
@@ -1,9 +1,6 @@
import os
-import time
import json
import pandas as pd
-import numpy as np
-from tqdm import tqdm
from pprint import pprint
import msgpack
from utils.s3 import read_from_s3
@@ -13,181 +10,15 @@ from asset_list.mappings.built_form import BUILT_FORM_MAPPINGS
from asset_list.mappings.walls import WALL_CONSTRUCTION_MAPPINGS
from asset_list.mappings.heating_systems import HEATING_MAPPINGS
from asset_list.mappings.exising_pv import EXISTING_PV_MAPPINGS
+from asset_list.utils import get_data
from dotenv import load_dotenv
from backend.SearchEpc import SearchEpc
-from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
-def get_data(
- df, manual_uprn_map, epc_api_only=False, row_id_name="row_id"
-):
- uprn_column = AssetList.STANDARD_UPRN
- fulladdress_column = AssetList.STANDARD_FULL_ADDRESS
- address1_column = AssetList.STANDARD_ADDRESS_1
- postcode_column = AssetList.STANDARD_POSTCODE
-
- # These re-map the standard property types to forms accepted by the EPC api, so we can predict EPCs
- property_type_map = {
- "house": "House",
- "flat": "Flat",
- "maisonette": "Maisonette",
- "bungalow": "Bungalow",
- "block house": "House",
- "coach house": "House",
- "bedsit": "Flat"
- }
-
- built_form_map = {
- "mid-terrace": "Mid-Terrace",
- "end-terrace": "End-Terrace",
- "semi-detached": "Semi-Detached",
- "detached": "Detached"
- }
-
- epc_data = []
- errors = []
- no_epc = []
- for _, home in tqdm(df.iterrows(), total=len(df)):
- try:
-
- # If we have a block of flats, we cannot retrieve this data
- if home[AssetList.STANDARD_PROPERTY_TYPE] == "block of flats":
- no_epc.append(home[row_id_name])
- continue
-
- postcode = home[postcode_column]
- house_number = str(home[address1_column]).strip()
- full_address = home[fulladdress_column].strip()
- house_no = SearchEpc.get_house_number(address=str(house_number), postcode=postcode)
- if house_no is None:
- house_no = house_number
- uprn = manual_uprn_map.get(full_address, None)
- if uprn is None and home.get(uprn_column):
- uprn = home[uprn_column]
-
- if pd.isnull(uprn):
- uprn = None
-
- property_type = property_type_map.get(home[AssetList.STANDARD_PROPERTY_TYPE], None)
- built_form = built_form_map.get(home[AssetList.STANDARD_BUILT_FORM])
-
- searcher = SearchEpc(
- address1=str(house_no),
- postcode=postcode,
- auth_token=EPC_AUTH_TOKEN,
- os_api_key="",
- property_type=None,
- fast=True,
- full_address=full_address,
- max_retries=5,
- uprn=uprn
- )
- # Force the skipping of estimating the EPC
- searcher.ordnance_survey_client.property_type = None
- searcher.ordnance_survey_client.built_form = None
-
- searcher.find_property(skip_os=True)
-
- # Check if we have a flat or appartment
- if searcher.newest_epc is None and uprn is None:
- # Try again:
- if SearchEpc.get_house_number(address=str(house_number), postcode=postcode) is None:
- # Backup
- add1 = full_address.split(",")
- if len(add1) > 1:
- add1 = add1[1].strip()
- else:
- # Try splitting on space
- add1 = full_address.split(" ")[0].strip()
-
- else:
- add1 = str(house_number)
- searcher = SearchEpc(
- address1=add1,
- postcode=postcode,
- auth_token=EPC_AUTH_TOKEN,
- os_api_key="",
- property_type=None,
- fast=True,
- full_address=full_address,
- max_retries=5
- )
-
- if (
- "flat" in house_number.lower() or "apartment" in house_number.lower() or "apt" in
- house_number.lower()
- ):
- searcher.ordnance_survey_client.property_type = "Flat"
-
- searcher.find_property(skip_os=True)
-
- # As a final resort, we estimate the EPC
- if property_type is not None and searcher.newest_epc is None:
- searcher.ordnance_survey_client.property_type = property_type
- searcher.ordnance_survey_client.built_form = built_form
- searcher.find_property(skip_os=True)
-
- if searcher.newest_epc is None:
- no_epc.append(home[row_id_name])
- continue
-
- # Look for EPC recommendatons
- try:
- property_recommendations = searcher.client.domestic.recommendations(searcher.newest_epc["lmk-key"])
- except:
- property_recommendations = {"rows": []}
-
- if epc_api_only:
- epc = {
- row_id_name: home[row_id_name],
- **searcher.newest_epc.copy(),
- "recommendations": property_recommendations["rows"]
- }
-
- epc_data.append(epc)
- continue
-
- # Retrieve data from FindMyEPC
- try:
- find_epc_searcher = RetrieveFindMyEpc(
- address=searcher.newest_epc["address"], postcode=searcher.newest_epc["postcode"]
- )
- find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
- except ValueError as e:
- if "No EPC found" in str(e) and "address1" in searcher.newest_epc:
- try:
- find_epc_searcher = RetrieveFindMyEpc(
- address=searcher.newest_epc["address1"], postcode=searcher.newest_epc["postcode"]
- )
- find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
- except ValueError as e:
- if "No EPC found" in str(e):
- find_epc_data = {}
- else:
- find_epc_data = {}
- except Exception as e:
- raise Exception(f"Error retrieving FindMyEPC data: {e}")
- time.sleep(np.random.uniform(0.1, 1))
-
- epc = {
- row_id_name: home[row_id_name],
- **searcher.newest_epc.copy(),
- "recommendations": property_recommendations["rows"],
- "find_my_epc_data": find_epc_data,
- }
-
- epc_data.append(epc)
- except Exception as e:
- errors.append(home[row_id_name])
- time.sleep(5)
-
- return epc_data, errors, no_epc
-
-
def extract_address1(asset_list, full_address_col, postcode_col, method="first_two_words"):
if method == "first_two_words":
asset_list["address1_extracted"] = asset_list[full_address_col].str.split(" ").str[:2].str.join(" ")
@@ -507,6 +338,14 @@ def app():
epc_data_chunk, errors_chunk, no_epc_chunk = get_data(
df=chunk,
row_id_name=asset_list.DOMNA_PROPERTY_ID,
+ # get_data takes the token as an argument rather than reading a module-level constant
+ epc_auth_token=EPC_AUTH_TOKEN,
+ uprn_column=AssetList.STANDARD_UPRN,
+ fulladdress_column=AssetList.STANDARD_FULL_ADDRESS,
+ address1_column=AssetList.STANDARD_ADDRESS_1,
+ postcode_column=AssetList.STANDARD_POSTCODE,
+ property_type_column=AssetList.STANDARD_PROPERTY_TYPE,
+ built_form_column=AssetList.STANDARD_BUILT_FORM,
manual_uprn_map=manual_uprn_map,
epc_api_only=epc_api_only
)
@@ -516,6 +355,14 @@ def app():
epc_data_failed, _, _ = get_data(
df=chunk_failed,
row_id_name=asset_list.DOMNA_PROPERTY_ID,
+ # The retry is given the same arguments as the first attempt
+ epc_auth_token=EPC_AUTH_TOKEN,
+ uprn_column=AssetList.STANDARD_UPRN,
+ fulladdress_column=AssetList.STANDARD_FULL_ADDRESS,
+ address1_column=AssetList.STANDARD_ADDRESS_1,
+ postcode_column=AssetList.STANDARD_POSTCODE,
+ property_type_column=AssetList.STANDARD_PROPERTY_TYPE,
+ built_form_column=AssetList.STANDARD_BUILT_FORM,
manual_uprn_map=manual_uprn_map,
epc_api_only=epc_api_only
)
diff --git a/asset_list/utils.py b/asset_list/utils.py
new file mode 100644
index 00000000..ff9db3f8
--- /dev/null
+++ b/asset_list/utils.py
@@ -0,0 +1,214 @@
+import time
+import numpy as np
+import pandas as pd
+from backend.SearchEpc import SearchEpc
+from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
+from tqdm import tqdm
+from utils.logger import setup_logger
+
+logger = setup_logger()
+
+
+def get_data(
+    df,
+    manual_uprn_map,
+    epc_auth_token,
+    uprn_column,
+    fulladdress_column,
+    address1_column,
+    postcode_column,
+    property_type_column=None,
+    built_form_column=None,
+    epc_api_only=False,
+    row_id_name="row_id",
+):
+    """
+    Retrieve EPC data, and unless epc_api_only is set also FindMyEPC data, for each row of an asset list.
+
+    :param df: DataFrame with one row per property.
+    :param manual_uprn_map: Dictionary {full_address: uprn}, which takes priority over the UPRN column.
+    :param epc_auth_token: Auth token passed to SearchEpc.
+    :param uprn_column: Name of the column containing the UPRN.
+    :param fulladdress_column: Name of the column containing the full address.
+    :param address1_column: Name of the column containing the first line of the address.
+    :param postcode_column: Name of the column containing the postcode.
+    :param property_type_column: Name of the column containing the standardized property type, or None when
+        there is no such column.
+    :param built_form_column: Name of the column containing the standardized built form, or None when there
+        is no such column.
+    :param epc_api_only: If True, the FindMyEPC lookup is skipped.
+    :param row_id_name: Name of the column that identifies each row.
+    :return: Tuple (epc_data, errors, no_epc): the retrieved records, the row ids that raised an error and the
+        row ids for which no EPC was found.
+    """
+    # These re-map the standard property types to forms accepted by the EPC api, so we can predict EPCs
+    property_type_map = {
+        "house": "House",
+        "flat": "Flat",
+        "maisonette": "Maisonette",
+        "bungalow": "Bungalow",
+        "block house": "House",
+        "coach house": "House",
+        "bedsit": "Flat"
+    }
+
+    built_form_map = {
+        "mid-terrace": "Mid-Terrace",
+        "end-terrace": "End-Terrace",
+        "semi-detached": "Semi-Detached",
+        "detached": "Detached"
+    }
+
+    epc_data = []
+    errors = []
+    no_epc = []
+    for _, home in tqdm(df.iterrows(), total=len(df)):
+        try:
+            # The property type and built form columns are optional
+            raw_property_type = home.get(property_type_column) if property_type_column is not None else None
+            raw_built_form = home.get(built_form_column) if built_form_column is not None else None
+
+            # If we have a block of flats, we cannot retrieve this data
+            if raw_property_type == "block of flats":
+                no_epc.append(home[row_id_name])
+                continue
+
+            postcode = home[postcode_column]
+            house_number = str(home[address1_column]).strip()
+            full_address = home[fulladdress_column].strip()
+            house_no = SearchEpc.get_house_number(address=str(house_number), postcode=postcode)
+            if house_no is None:
+                house_no = house_number
+
+            uprn = manual_uprn_map.get(full_address, None)
+            if uprn is None:
+                # Test for missing values before truthiness, because bool(pd.NA) raises a TypeError
+                candidate_uprn = home.get(uprn_column)
+                if not pd.isnull(candidate_uprn) and candidate_uprn:
+                    uprn = candidate_uprn
+
+            if pd.isnull(uprn):
+                uprn = None
+
+            property_type = property_type_map.get(raw_property_type, None)
+            built_form = built_form_map.get(raw_built_form)
+
+            searcher = SearchEpc(
+                address1=str(house_no),
+                postcode=postcode,
+                auth_token=epc_auth_token,
+                os_api_key="",
+                property_type=None,
+                fast=True,
+                full_address=full_address,
+                max_retries=5,
+                uprn=uprn
+            )
+            # Force the skipping of estimating the EPC
+            searcher.ordnance_survey_client.property_type = None
+            searcher.ordnance_survey_client.built_form = None
+
+            searcher.find_property(skip_os=True)
+
+            # Check if we have a flat or apartment
+            if searcher.newest_epc is None and uprn is None:
+                # Try again:
+                if SearchEpc.get_house_number(address=str(house_number), postcode=postcode) is None:
+                    # Backup
+                    add1 = full_address.split(",")
+                    if len(add1) > 1:
+                        add1 = add1[1].strip()
+                    else:
+                        # Try splitting on space
+                        add1 = full_address.split(" ")[0].strip()
+
+                else:
+                    add1 = str(house_number)
+                searcher = SearchEpc(
+                    address1=add1,
+                    postcode=postcode,
+                    auth_token=epc_auth_token,
+                    os_api_key="",
+                    property_type=None,
+                    fast=True,
+                    full_address=full_address,
+                    max_retries=5
+                )
+
+                if (
+                    "flat" in house_number.lower() or "apartment" in house_number.lower() or "apt" in
+                    house_number.lower()
+                ):
+                    searcher.ordnance_survey_client.property_type = "Flat"
+
+                searcher.find_property(skip_os=True)
+
+            # As a final resort, we estimate the EPC
+            if property_type is not None and searcher.newest_epc is None:
+                searcher.ordnance_survey_client.property_type = property_type
+                searcher.ordnance_survey_client.built_form = built_form
+                searcher.find_property(skip_os=True)
+
+            if searcher.newest_epc is None:
+                no_epc.append(home[row_id_name])
+                continue
+
+            # Look for EPC recommendations
+            try:
+                property_recommendations = searcher.client.domestic.recommendations(searcher.newest_epc["lmk-key"])
+            except Exception as e:
+                # Best effort: the EPC is still returned when its recommendations cannot be retrieved
+                logger.debug(f"Could not retrieve EPC recommendations: {e}")
+                property_recommendations = {"rows": []}
+
+            if epc_api_only:
+                epc = {
+                    row_id_name: home[row_id_name],
+                    **searcher.newest_epc.copy(),
+                    "recommendations": property_recommendations["rows"]
+                }
+
+                epc_data.append(epc)
+                continue
+
+            # Retrieve data from FindMyEPC
+            try:
+                find_epc_searcher = RetrieveFindMyEpc(
+                    address=searcher.newest_epc["address"], postcode=searcher.newest_epc["postcode"]
+                )
+                find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
+            except ValueError as e:
+                if "No EPC found" in str(e) and "address1" in searcher.newest_epc:
+                    # Retry using the address1 field of the EPC instead of its address field
+                    try:
+                        find_epc_searcher = RetrieveFindMyEpc(
+                            address=searcher.newest_epc["address1"], postcode=searcher.newest_epc["postcode"]
+                        )
+                        find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
+                    except ValueError as retry_error:
+                        if "No EPC found" in str(retry_error):
+                            find_epc_data = {}
+                        else:
+                            logger.error(f"Error retrieving FindMyEPC data: {retry_error}")
+                            raise Exception(f"Error retrieving FindMyEPC data: {retry_error}") from retry_error
+                else:
+                    find_epc_data = {}
+            except Exception as e:
+                raise Exception(f"Error retrieving FindMyEPC data: {e}") from e
+            time.sleep(np.random.uniform(0.1, 1))
+
+            epc = {
+                row_id_name: home[row_id_name],
+                **searcher.newest_epc.copy(),
+                "recommendations": property_recommendations["rows"],
+                "find_my_epc_data": find_epc_data,
+            }
+
+            epc_data.append(epc)
+        except Exception as e:
+            # Record the failure instead of discarding it, so that failed rows can be diagnosed
+            logger.error(f"Failed to retrieve data for row {home[row_id_name]}: {e}")
+            errors.append(home[row_id_name])
+            time.sleep(5)
+
+    return epc_data, errors, no_epc
diff --git a/backend/Property.py b/backend/Property.py
index eaffd54d..498fe0e0 100644
--- a/backend/Property.py
+++ b/backend/Property.py
@@ -226,25 +226,20 @@ class Property:
# as we collect more data from the energy assessment
n_bathrooms = kwargs.get("n_bathrooms", None)
- if n_bathrooms not in [None, ""]:
- # We add on a small value to ensure that the number of bathrooms is rounded up, in case the value is 0.5
- n_bathrooms = int(round(float(n_bathrooms) + 1e-5))
+ # We add on a small value to ensure that the number of bathrooms is rounded up, in case the value is 0.5
+ n_bathrooms = int(round(float(n_bathrooms) + 1e-5)) if n_bathrooms not in [None, ""] else None
n_bedrooms = kwargs.get("n_bedrooms", None)
- if n_bedrooms not in [None, ""]:
- n_bedrooms = int(round(float(n_bedrooms) + 1e-5))
+ n_bedrooms = int(round(float(n_bedrooms) + 1e-5)) if n_bedrooms not in [None, ""] else None
number_of_floors = kwargs.get("number_of_floors", None)
- if number_of_floors not in [None, ""]:
- number_of_floors = int(round(float(number_of_floors) + 1e-5))
+ number_of_floors = int(round(float(number_of_floors) + 1e-5)) if number_of_floors not in [None, ""] else None
insulation_floor_area = kwargs.get("insulation_floor_area", None)
- if insulation_floor_area not in [None, ""]:
- insulation_floor_area = float(insulation_floor_area)
+ insulation_floor_area = float(insulation_floor_area) if insulation_floor_area not in [None, ""] else None
insulation_wall_area = kwargs.get("insulation_wall_area", None)
- if insulation_wall_area not in [None, ""]:
- insulation_wall_area = float(insulation_wall_area)
+ insulation_wall_area = float(insulation_wall_area) if insulation_wall_area not in [None, ""] else None
return {
"n_bathrooms": n_bathrooms,
diff --git a/backend/SearchEpc.py b/backend/SearchEpc.py
index 0d921bec..d33b2e70 100644
--- a/backend/SearchEpc.py
+++ b/backend/SearchEpc.py
@@ -308,12 +308,20 @@ class SearchEpc:
self.data = output["response"]
return output["msg"]
+ if not self.uprn and not self.address1 and not self.postcode:
+ raise ValueError("No search parameters provided")
+
uprn_params = {"uprn": self.uprn} if self.uprn else {}
- address_params = {"address": self.address1, "postcode": self.postcode}
+ address_params = {}
+ if self.address1:
+ address_params["address"] = self.address1
+ if self.postcode:
+ address_params["postcode"] = self.postcode
# We attempt the search with uprn params
data = {"rows": []}
+ api_response = {}
if uprn_params:
api_response = self._get_epc(params=uprn_params, size=size)
if api_response["msg"]["status"] == 200:
@@ -321,14 +329,15 @@ class SearchEpc:
# If we were unsuccessful, we then make a second attempt to fetch the data. We find that
# properties are sometimes listed under the wrong UPRN
- api_response = self._get_epc(params=address_params, size=size)
- if api_response["msg"]["status"] == 200:
- # We update the data with the correct uprn
- if self.uprn:
- for x in api_response["response"]["rows"]:
- x["uprn"] = self.uprn
+ if address_params:
+ api_response = self._get_epc(params=address_params, size=size)
+ if api_response["msg"]["status"] == 200:
+ # We update the data with the correct uprn
+ if self.uprn:
+ for x in api_response["response"]["rows"]:
+ x["uprn"] = self.uprn
- data["rows"].extend(api_response["response"]["rows"])
+ data["rows"].extend(api_response["response"]["rows"])
# We no de-dupe on lmk-key to avoid duplicates
seen = set()
diff --git a/backend/apis/GoogleSolarApi.py b/backend/apis/GoogleSolarApi.py
index 183503d5..31ae39bd 100644
--- a/backend/apis/GoogleSolarApi.py
+++ b/backend/apis/GoogleSolarApi.py
@@ -9,8 +9,7 @@ from tqdm import tqdm
from math import sin, cos, sqrt, atan2, radians
from utils.logger import setup_logger
-from recommendations.Costs import Costs, MCS_SOLAR_PV_COST_DATA
-from etl.bill_savings.EnergyConsumptionModel import EnergyConsumptionModel
+from recommendations.Costs import Costs
from backend.ml_models.AnnualBillSavings import AnnualBillSavings
from backend.Property import Property
from backend.app.db.functions.solar_functions import get_solar_data, store_batch_data
@@ -54,6 +53,9 @@ class GoogleSolarApi:
# Max area of a roof space we allow panels for
PERCENTAGE_OF_ROOF_LIMIT = 0.8
+ # Error Messages
+ ENTITY_NOT_FOUND_ERROR = 'Requested entity was not found.'
+
def __init__(self, api_key, max_retries=5):
"""
Initialize the GoogleSolarApi class with the provided API key and maximum retries.
@@ -112,6 +114,13 @@ class GoogleSolarApi:
response.raise_for_status() # Raise an error for bad status codes
return response.json()
except requests.exceptions.RequestException as e:
+ # e.response is None for connection-level failures, and `and` (unlike `&`) only parses the body of a 404
+ if e.response is not None and e.response.status_code == 404 and (
+     e.response.json()["error"]["message"] == self.ENTITY_NOT_FOUND_ERROR
+ ):
+     logger.warning("No building insights found for the given location.")
+     return {"error": self.ENTITY_NOT_FOUND_ERROR}
+
attempt += 1
print(f"Attempt {attempt} failed: {e}")
time.sleep(2 ** attempt) # Exponential backoff
@@ -155,6 +164,11 @@ class GoogleSolarApi:
# If we have no data in the db, or updated_at is more than 6 months
if self.insights_data is None or is_outdated:
self.insights_data = self.get_building_insights(longitude, latitude, required_quality)
+ if self.insights_data.get("error") == self.ENTITY_NOT_FOUND_ERROR:
+ # We use default performance since in this case, we couldn't retrieve data. We don't store
+ self.panel_performance = self.default_panel_performance(property_instance=property_instance)
+
+ return
self.need_to_store = True
# Extract key data from the insights response
@@ -820,7 +834,6 @@ class GoogleSolarApi:
if unit["longitude"] is None or unit["latitude"] is None:
# At this point, we've checked that solar PV is valid, and so we provide some defaults
-
property_instance.set_solar_panel_configuration(
solar_panel_configuration={
"insights_data": None,
@@ -875,19 +888,19 @@ class GoogleSolarApi:
cost_instance = Costs(property_instance=property_instance)
- # We return a 2.4 and 4 kwp system
+ # We return a 1.6 and 3.2 kwp system
panel_performance = pd.DataFrame(
[
{
- 'n_panels': 10,
- 'yearly_dc_energy': 4000 * 0.99, # Assumed 99% efficient wattage -> dc
+ 'n_panels': 8,
+ 'yearly_dc_energy': 3200 * assumptions.MEDIAN_WATTAGE_TO_DC,
'total_cost': cost_instance.solar_pv(
- n_panels=10, has_battery=False, n_floors=property_instance.number_of_floors
+ n_panels=8, has_battery=False, n_floors=property_instance.number_of_floors
)["total"],
'weighted_ratio': None,
- 'panneled_roof_area': 10 * assumptions.RDSAP_AREA_PER_PANEL,
- 'array_wattage': 4000,
- 'initial_ac_kwh_per_year': 4000 * 0.95, # Assumed 95% efficient wattage -> ac
+ 'panneled_roof_area': 8 * assumptions.RDSAP_AREA_PER_PANEL,
+ 'array_wattage': 3200,
+ 'initial_ac_kwh_per_year': 3200 * assumptions.MEDIAN_WATTAGE_TO_AC,
'lifetime_ac_kwh': None,
'lifetime_dc_kwh': None,
'roi': None,
@@ -899,15 +912,15 @@ class GoogleSolarApi:
'rank': None
},
{
- 'n_panels': 6,
- 'yearly_dc_energy': 2400 * 0.99, # Assumed 99% efficient wattage -> dc
+ 'n_panels': 4,
+ 'yearly_dc_energy': 1600 * assumptions.MEDIAN_WATTAGE_TO_DC,
'total_cost': cost_instance.solar_pv(
- n_panels=6, has_battery=False, n_floors=property_instance.number_of_floors
+ n_panels=4, has_battery=False, n_floors=property_instance.number_of_floors
)["total"],
'weighted_ratio': None,
- 'panneled_roof_area': 6 * assumptions.RDSAP_AREA_PER_PANEL,
- 'array_wattage': 2400,
- 'initial_ac_kwh_per_year': 2400 * 0.95, # Assumed 95% efficient wattage -> ac
+ 'panneled_roof_area': 4 * assumptions.RDSAP_AREA_PER_PANEL,
+ 'array_wattage': 1600,
+ 'initial_ac_kwh_per_year': 1600 * assumptions.MEDIAN_WATTAGE_TO_AC,
'lifetime_ac_kwh': None,
'lifetime_dc_kwh': None,
'roi': None,
diff --git a/backend/app/assumptions.py b/backend/app/assumptions.py
index 8d0c05be..261e2b62 100644
--- a/backend/app/assumptions.py
+++ b/backend/app/assumptions.py
@@ -11,6 +11,9 @@ SOLAR_CONSUMPTION_WITH_BATTERY_PROPORTION = 0.7
# Typically, each solar panel takes up around 3.4 m2 of roof space under RdSAP. This was been verified in Elmhurst
RDSAP_AREA_PER_PANEL = 3.4
+# This is a median based on a sample of properties
+MEDIAN_WATTAGE_TO_AC = 0.965
+MEDIAN_WATTAGE_TO_DC = 0.99
SOCIAL_TENURES = ["Rented (social)", "rental (social)"]
diff --git a/etl/customers/mod/pilot/1. Create Sample.py b/etl/customers/mod/pilot/1. Create Sample.py
index e1f9b444..97480d51 100644
--- a/etl/customers/mod/pilot/1. Create Sample.py
+++ b/etl/customers/mod/pilot/1. Create Sample.py
@@ -1,4 +1,17 @@
+import os
import pandas as pd
+from tqdm import tqdm
+from dotenv import load_dotenv
+from backend.SearchEpc import SearchEpc
+from etl.spatial.OpenUprnClient import OpenUprnClient
+from asset_list.utils import get_data
+from utils.s3 import save_csv_to_s3
+
+PORTFOLIO_ID = 139
+USER_ID = 8
+
+load_dotenv(dotenv_path="backend/.env")
+EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
def app():
@@ -9,26 +22,182 @@ def app():
folder_path = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/MOD/Pilot Programme"
sample_list = pd.read_excel(f"{folder_path}/20250227_DIO_Accommodation_Sample_Properties.xlsx")
asset_data = pd.read_excel(f"{folder_path}/20250303_DIO_Accommodation_Property_Attribution.xlsx")
- asset_data["BLNDG_GOVERMENT_UPRN"] = asset_data["BLNDG_GOVERMENT_UPRN"].astype("Int64")
- asset_data["BLNDG_GOVERMENT_UPRN"].nunique()
- for _id in asset_data["ESTB_ID"].unique():
- data = asset_data[asset_data["ESTB_ID"] == _id]
- z = data["BLNDG_GOVERMENT_UPRN"]
+ sample_list = sample_list[sample_list["BLDNG_COUNTRY_NAME"].isin(["ENGLAND", "WALES"])]
- data["BLNDG_GOVERMENT_UPRN"].unique()
+ # Merge on the UPRN
+ sample_list = sample_list.merge(
+ asset_data[["BLDNG_ID", "BLNDG_GOVERMENT_UPRN"]].drop_duplicates(),
+ how="left", on="BLDNG_ID"
+ )
+ sample_list["BLNDG_GOVERMENT_UPRN"] = sample_list["BLNDG_GOVERMENT_UPRN"].astype("Int64")
- asset_data["BLNDG_GOVERMENT_UPRN"].unique()
+ # Use the EPC API to get corrected postcodes
+ model_asset_list = []
+ missed = []
+ for _, x in tqdm(sample_list.iterrows(), total=len(sample_list)):
- df = asset_data.groupby("BLNDG_GOVERMENT_UPRN")["ESTB_ID"].nunique().sort_values(ascending=False).reset_index()
+ if pd.isnull(x["BLNDG_GOVERMENT_UPRN"]):
+ continue
+ searcher = SearchEpc(
+ address1="",
+ postcode="",
+ uprn=x["BLNDG_GOVERMENT_UPRN"],
+ auth_token=EPC_AUTH_TOKEN,
+ os_api_key=""
+ )
+ searcher.find_property(skip_os=True)
+ newest_epc = searcher.newest_epc
+ if newest_epc is None:
+ missed.append(x["BLNDG_GOVERMENT_UPRN"])
+ continue
- example = asset_data[asset_data["BLNDG_GOVERMENT_UPRN"] == df.head(1)["BLNDG_GOVERMENT_UPRN"].values[0]]
+ model_asset_list.append(newest_epc)
- asset_data[asset_data["BLNDG_GOVERMENT_UPRN"]]
+ model_asset_list = pd.DataFrame(model_asset_list)
+ model_asset_list["uprn"] = model_asset_list["uprn"].astype(int)
- asset_data = asset_data[asset_data["ESTB_ID"].isin(sample_list["ESTB_ID"].values)]
- asset_data.drop_duplicates("ESTB_ID", inplace=True)
+ spatial_data = OpenUprnClient.get_spatial_data(
+ uprns=model_asset_list["uprn"].tolist(), bucket_name="retrofit-data-dev"
+ )
- [x for x in asset_data.columns if "uprn" in x.lower()]
+ # We determine if the building is listed, heritage or in a conservation area
- example = asset_data[asset_data["ESTB_ID"] == 1547072]
+ # Merge on the property features
+ features = asset_data.drop(
+ columns=["BUILDING_SYSTEM_ITEM_NAME", "OBSERVED_CONDITION_DESCRIPTION"]
+ ).drop_duplicates()
+
+ df = features.merge(
+ model_asset_list, how="inner", right_on="uprn", left_on="BLNDG_GOVERMENT_UPRN"
+ ).merge(
+ pd.DataFrame(spatial_data).rename(columns={"UPRN": "uprn"}), how="left", on="uprn"
+ )
+
+ # Store data locally
+ # df.to_csv(folder_path + "/MOD property data.csv", index=False)
+
+ # Produce an asset list for analysis
+
+ df["row_id"] = df.index
+
+ epc_data, errors, no_epc = get_data(
+ df=df,
+ manual_uprn_map={},
+ epc_auth_token=EPC_AUTH_TOKEN,
+ uprn_column="uprn",
+ fulladdress_column="address",
+ address1_column="address1",
+ postcode_column="postcode",
+ property_type_column=None,
+ built_form_column=None,
+ epc_api_only=False,
+ row_id_name="row_id",
+ )
+
+ # find_my_epc_data is an empty dict when no FindMyEPC record was found, so default to no recommendations
+ non_invasive_recommendations = [
+     {
+         "uprn": x["uprn"],
+         "recommendations": (x.get("find_my_epc_data") or {}).get("recommendations", []),
+     }
+     for x in epc_data
+ ]
+
+ asset_list = df[
+ ["uprn", "address1", "postcode", "NUMBER_OF_BEDROOMS", "BLDNG_STOREYS_QTY", ]
+ ].rename(
+ columns={
+ "address1": "address",
+ "NUMBER_OF_BEDROOMS": "n_bedrooms",
+ "BLDNG_STOREYS_QTY": "number_of_floors"
+ }
+ )
+
+ filename = f"{USER_ID}/{PORTFOLIO_ID}/asset_list.csv"
+ save_csv_to_s3(
+ dataframe=asset_list,
+ bucket_name="retrofit-plan-inputs-dev",
+ file_name=filename
+ )
+
+ # Store the non-invasive recommendations in s3
+ non_invasive_recommendations_filename = f"{USER_ID}/{PORTFOLIO_ID}/non_invasive_recommendations.csv"
+ save_csv_to_s3(
+ dataframe=pd.DataFrame(non_invasive_recommendations),
+ bucket_name="retrofit-plan-inputs-dev",
+ file_name=non_invasive_recommendations_filename
+ )
+
+ # Scenario 1 - EPC C
+ body = {
+ "portfolio_id": str(PORTFOLIO_ID),
+ "housing_type": "Private",
+ "goal": "Increasing EPC",
+ "goal_value": "C",
+ "trigger_file_path": filename,
+ "already_installed_file_path": "",
+ "patches_file_path": "",
+ "non_invasive_recommendations_file_path": non_invasive_recommendations_filename,
+ "valuation_file_path": "",
+ "scenario_name": "Hit EPC C",
+ "multi_plan": True,
+ "budget": None,
+ # "inclusions": [
+ # "cavity_wall_insulation",
+ # "loft_insulation",
+ # "windows",
+ # "solar_pv",
+ # "air_source_heat_pump"
+ # ]
+ }
+ print(body)
+
+ # Scenario 2 - EPC B
+ body = {
+ "portfolio_id": str(PORTFOLIO_ID),
+ "housing_type": "Private",
+ "goal": "Increasing EPC",
+ "goal_value": "B",
+ "trigger_file_path": filename,
+ "already_installed_file_path": "",
+ "patches_file_path": "",
+ "non_invasive_recommendations_file_path": non_invasive_recommendations_filename,
+ "valuation_file_path": "",
+ "scenario_name": "Hit EPC B",
+ "multi_plan": True,
+ "budget": None,
+ # "inclusions": [
+ # "cavity_wall_insulation",
+ # "loft_insulation",
+ # "windows",
+ # "solar_pv",
+ # "air_source_heat_pump"
+ # ]
+ }
+ print(body)
+
+ # Scenario 3 - EPC B, 3.5 COP ASHP
+ body = {
+ "portfolio_id": str(PORTFOLIO_ID),
+ "housing_type": "Private",
+ "goal": "Increasing EPC",
+ "goal_value": "B",
+ "trigger_file_path": filename,
+ "already_installed_file_path": "",
+ "patches_file_path": "",
+ "non_invasive_recommendations_file_path": non_invasive_recommendations_filename,
+ "valuation_file_path": "",
+ "scenario_name": "Hit EPC B - 3.5 COP ASHP",
+ "multi_plan": True,
+ "budget": None,
+ "ashp_cop": 3.5
+ # "inclusions": [
+ # "cavity_wall_insulation",
+ # "loft_insulation",
+ # "windows",
+ # "solar_pv",
+ # "air_source_heat_pump"
+ # ]
+ }
+ print(body)
diff --git a/recommendations/SolarPvRecommendations.py b/recommendations/SolarPvRecommendations.py
index a97dbcb3..77e8fd10 100644
--- a/recommendations/SolarPvRecommendations.py
+++ b/recommendations/SolarPvRecommendations.py
@@ -1,5 +1,6 @@
import numpy as np
import pandas as pd
+import backend.app.assumptions as assumptions
from recommendations.Costs import Costs
from recommendations.recommendation_utils import override_costs, estimate_pitched_roof_area
@@ -24,6 +25,25 @@ class SolarPvRecommendations:
SAP_POINTS_PER_5_PERCENT_ROOF_COVERAGE = 1
+ # Backup 4 and 8 panel systems (1600 W and 3200 W arrays). Every row must use the same keys, as a misspelt
+ # key becomes a column of its own and leaves array_wattage empty for that row
+ BACKUP_PANEL_PERFORMANCE = pd.DataFrame(
+     [
+         {
+             "n_panels": 4,
+             "array_wattage": 1600,
+             "initial_ac_kwh_per_year": assumptions.MEDIAN_WATTAGE_TO_AC * 1600,
+             "panneled_roof_area": 4 * assumptions.RDSAP_AREA_PER_PANEL
+         },
+         {
+             "n_panels": 8,
+             "array_wattage": 3200,
+             "initial_ac_kwh_per_year": assumptions.MEDIAN_WATTAGE_TO_AC * 3200,
+             "panneled_roof_area": 8 * assumptions.RDSAP_AREA_PER_PANEL
+         },
+     ]
+ )
+
def __init__(self, property_instance):
"""
:param property_instance: Instance of the Property class, for the home associated to property_id