debuyggin solar api when no data found

This commit is contained in:
Khalim Conn-Kowlessar 2025-03-13 11:08:59 +00:00
parent dc2d108060
commit 0a7fb131ef
11 changed files with 629 additions and 220 deletions

2
.idea/Model.iml generated
View file

@ -7,7 +7,7 @@
<sourceFolder url="file://$MODULE_DIR$/open_uprn" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/recommendations" isTestSource="false" />
</content>
<orderEntry type="jdk" jdkName="AssetList" jdkType="Python SDK" />
<orderEntry type="jdk" jdkName="Fastapi-backend" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyNamespacePackagesService">

2
.idea/misc.xml generated
View file

@ -3,7 +3,7 @@
<component name="Black">
<option name="sdkName" value="Python 3.10 (backend)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="AssetList" project-jdk-type="Python SDK" />
<component name="ProjectRootManager" version="2" project-jdk-name="Fastapi-backend" project-jdk-type="Python SDK" />
<component name="PyCharmProfessionalAdvertiser">
<option name="shown" value="true" />
</component>

178
asset_list/DataMapper.py Normal file
View file

@ -0,0 +1,178 @@
# OpenAI API Key (set this in your environment variables for security)
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
class DataRemapper:
def __init__(self, standard_values, standard_map=None, max_tokens=1000):
"""
Initialize the remapper with standard values and a predefined mapping.
:param standard_values: Set of allowed standardized values.
:param standard_map: Dictionary of common remappings {raw_value: standard_value}.
"""
self.standard_values = standard_values
self.standard_map = standard_map
self.fuzzy_threshold = 90 # Adjust fuzzy matching sensitivity
self.ai_model = "gpt-4-turbo" # Use gpt-3.5-turbo for cheaper processing
# Tokenizer for counting tokens
self.tokenizer = tiktoken.encoding_for_model(self.ai_model)
# Track token usage and remap dictionary
self.total_tokens_used = 0
self.total_cost = 0
self.remap_dict = {} # {original_value: standardized_value}
self.max_tokens = max_tokens # Limit for OpenAI API
# Memoization for AI calls
self.ai_cache = {} # {tuple(unmapped_values): {original_value: standardized_value}}
# Capture the reponse for debugging
self.ai_response = None
# OpenAI pricing (as of Feb 2024)
self.pricing = {
"gpt-4-turbo": {"input": 0.01 / 1000, "output": 0.03 / 1000},
"gpt-3.5-turbo": {"input": 0.0015 / 1000, "output": 0.002 / 1000},
}
self.openai_client = OpenAI(api_key=OPENAI_API_KEY)
@staticmethod
def clean_string(text):
"""Basic text cleaning: remove extra spaces, punctuation, and normalize case."""
if not isinstance(text, str):
return None
text = text.strip().lower()
text = re.sub(r'[^\w\s]', '', text) # Remove punctuation
# Replace double strings
text = re.sub(r'\s+', ' ', text)
return text
def fuzzy_match(self, text):
"""Use fuzzy matching to find the closest standard value."""
match, score = process.extractOne(text, self.standard_values) if text else (None, 0)
return match if score >= self.fuzzy_threshold else None
def count_tokens(self, text):
"""Estimate the number of tokens in a given text."""
return len(self.tokenizer.encode(text)) if text else 0
def ai_standardize(self, unmapped_values):
"""Call OpenAI API **once** for all unmapped values to minimize cost, with memoization."""
if not unmapped_values:
return {}
unmapped_tuple = tuple(sorted(unmapped_values)) # Ensure consistency for memoization
if unmapped_tuple in self.ai_cache:
return self.ai_cache[unmapped_tuple] # Return memoized result
prompt = f"""
You are an expert in data classification. Standardize each of these values into one of the categories:
{list(self.standard_values)}.
Return only a JSON dictionary where:
- The keys are the original values.
- The values are the standardized ones.
Strictly return JSON **without markdown formatting** or extra text.
Example Output:
{{
"BLKHOUS": "block house",
"BEDSIT": "bedsit"
}}
Values to standardize:
{unmapped_values}
"""
# Count input tokens
input_tokens = self.count_tokens(prompt)
if input_tokens > self.max_tokens:
raise ValueError("Input tokens exceed the maximum limit.")
logger.info("Calling OpenAI API for standardization...")
response = self.openai_client.chat.completions.create(
model=self.ai_model,
messages=[{"role": "user", "content": prompt}],
max_tokens=self.max_tokens,
temperature=0.1,
)
output_text = response.choices[0].message.content.strip()
output_tokens = self.count_tokens(output_text) # Count output tokens
# Track total token usage
self.total_tokens_used += input_tokens + output_tokens
# Estimate cost
input_cost = input_tokens * self.pricing[self.ai_model]["input"]
output_cost = output_tokens * self.pricing[self.ai_model]["output"]
self.total_cost += input_cost + output_cost
try:
# Parse response as dictionary
mapping = eval(output_text) # OpenAI should return a valid dictionary
except:
mapping = {val: "unknown" for val in unmapped_values} # Fallback
# Memoize the AI response
self.ai_cache[unmapped_tuple] = mapping
# We store the raw AI response for debugging
logger.debug(f"AI Response: {mapping}")
self.ai_response = output_text
return mapping
def standardize_list(self, values_to_remap):
"""
Standardizes a list of values and returns a dictionary {original_value: standardized_value}.
:param values_to_remap: List of raw values to standardize.
:return: Dictionary {original_value: standardized_value}.
"""
unique_values = set(values_to_remap) # Process only unique values
unmapped_values = []
for value in unique_values:
if pd.isna(value): # Handle NaN values
self.remap_dict[value] = "unknown"
continue
cleaned_value = self.clean_string(value)
# Rule-Based Check (Predefined Mapping)
if cleaned_value in self.standard_map or value in self.standard_map:
self.remap_dict[value] = (
self.standard_map[cleaned_value] if cleaned_value in self.standard_map else self.standard_map[value]
)
continue
if value.lower() in self.standard_map:
self.remap_dict[value] = self.standard_map[value.lower()]
continue
# Exact Match in Standard Values
if cleaned_value in self.standard_values:
self.remap_dict[value] = cleaned_value
continue
# Fuzzy Matching
fuzzy_match = self.fuzzy_match(cleaned_value)
if fuzzy_match:
self.remap_dict[value] = fuzzy_match
continue
# Capture anything that wasn't mapped
unmapped_values.append(value)
# AI Model - remap anything unmapped (batch request)
ai_mapping = self.ai_standardize(unmapped_values)
self.remap_dict.update(ai_mapping)
return self.remap_dict
def report_usage(self):
"""Prints a summary of token usage and cost."""
print(f"\n🔹 Total Tokens Used: {self.total_tokens_used}")
print(f"💰 Estimated Cost: ${self.total_cost:.4f}")

View file

@ -1,9 +1,6 @@
import os
import time
import json
import pandas as pd
import numpy as np
from tqdm import tqdm
from pprint import pprint
import msgpack
from utils.s3 import read_from_s3
@ -13,181 +10,15 @@ from asset_list.mappings.built_form import BUILT_FORM_MAPPINGS
from asset_list.mappings.walls import WALL_CONSTRUCTION_MAPPINGS
from asset_list.mappings.heating_systems import HEATING_MAPPINGS
from asset_list.mappings.exising_pv import EXISTING_PV_MAPPINGS
from asset_list.utils import get_data
from dotenv import load_dotenv
from backend.SearchEpc import SearchEpc
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
def get_data(
df, manual_uprn_map, epc_api_only=False, row_id_name="row_id"
):
uprn_column = AssetList.STANDARD_UPRN
fulladdress_column = AssetList.STANDARD_FULL_ADDRESS
address1_column = AssetList.STANDARD_ADDRESS_1
postcode_column = AssetList.STANDARD_POSTCODE
# These re-map the standard property types to forms accepted by the EPC api, so we can predict EPCs
property_type_map = {
"house": "House",
"flat": "Flat",
"maisonette": "Maisonette",
"bungalow": "Bungalow",
"block house": "House",
"coach house": "House",
"bedsit": "Flat"
}
built_form_map = {
"mid-terrace": "Mid-Terrace",
"end-terrace": "End-Terrace",
"semi-detached": "Semi-Detached",
"detached": "Detached"
}
epc_data = []
errors = []
no_epc = []
for _, home in tqdm(df.iterrows(), total=len(df)):
try:
# If we have a block of flats, we cannot retrieve this data
if home[AssetList.STANDARD_PROPERTY_TYPE] == "block of flats":
no_epc.append(home[row_id_name])
continue
postcode = home[postcode_column]
house_number = str(home[address1_column]).strip()
full_address = home[fulladdress_column].strip()
house_no = SearchEpc.get_house_number(address=str(house_number), postcode=postcode)
if house_no is None:
house_no = house_number
uprn = manual_uprn_map.get(full_address, None)
if uprn is None and home.get(uprn_column):
uprn = home[uprn_column]
if pd.isnull(uprn):
uprn = None
property_type = property_type_map.get(home[AssetList.STANDARD_PROPERTY_TYPE], None)
built_form = built_form_map.get(home[AssetList.STANDARD_BUILT_FORM])
searcher = SearchEpc(
address1=str(house_no),
postcode=postcode,
auth_token=EPC_AUTH_TOKEN,
os_api_key="",
property_type=None,
fast=True,
full_address=full_address,
max_retries=5,
uprn=uprn
)
# Force the skipping of estimating the EPC
searcher.ordnance_survey_client.property_type = None
searcher.ordnance_survey_client.built_form = None
searcher.find_property(skip_os=True)
# Check if we have a flat or appartment
if searcher.newest_epc is None and uprn is None:
# Try again:
if SearchEpc.get_house_number(address=str(house_number), postcode=postcode) is None:
# Backup
add1 = full_address.split(",")
if len(add1) > 1:
add1 = add1[1].strip()
else:
# Try splitting on space
add1 = full_address.split(" ")[0].strip()
else:
add1 = str(house_number)
searcher = SearchEpc(
address1=add1,
postcode=postcode,
auth_token=EPC_AUTH_TOKEN,
os_api_key="",
property_type=None,
fast=True,
full_address=full_address,
max_retries=5
)
if (
"flat" in house_number.lower() or "apartment" in house_number.lower() or "apt" in
house_number.lower()
):
searcher.ordnance_survey_client.property_type = "Flat"
searcher.find_property(skip_os=True)
# As a final resort, we estimate the EPC
if property_type is not None and searcher.newest_epc is None:
searcher.ordnance_survey_client.property_type = property_type
searcher.ordnance_survey_client.built_form = built_form
searcher.find_property(skip_os=True)
if searcher.newest_epc is None:
no_epc.append(home[row_id_name])
continue
# Look for EPC recommendatons
try:
property_recommendations = searcher.client.domestic.recommendations(searcher.newest_epc["lmk-key"])
except:
property_recommendations = {"rows": []}
if epc_api_only:
epc = {
row_id_name: home[row_id_name],
**searcher.newest_epc.copy(),
"recommendations": property_recommendations["rows"]
}
epc_data.append(epc)
continue
# Retrieve data from FindMyEPC
try:
find_epc_searcher = RetrieveFindMyEpc(
address=searcher.newest_epc["address"], postcode=searcher.newest_epc["postcode"]
)
find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
except ValueError as e:
if "No EPC found" in str(e) and "address1" in searcher.newest_epc:
try:
find_epc_searcher = RetrieveFindMyEpc(
address=searcher.newest_epc["address1"], postcode=searcher.newest_epc["postcode"]
)
find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
except ValueError as e:
if "No EPC found" in str(e):
find_epc_data = {}
else:
find_epc_data = {}
except Exception as e:
raise Exception(f"Error retrieving FindMyEPC data: {e}")
time.sleep(np.random.uniform(0.1, 1))
epc = {
row_id_name: home[row_id_name],
**searcher.newest_epc.copy(),
"recommendations": property_recommendations["rows"],
"find_my_epc_data": find_epc_data,
}
epc_data.append(epc)
except Exception as e:
errors.append(home[row_id_name])
time.sleep(5)
return epc_data, errors, no_epc
def extract_address1(asset_list, full_address_col, postcode_col, method="first_two_words"):
if method == "first_two_words":
asset_list["address1_extracted"] = asset_list[full_address_col].str.split(" ").str[:2].str.join(" ")
@ -507,6 +338,12 @@ def app():
epc_data_chunk, errors_chunk, no_epc_chunk = get_data(
df=chunk,
row_id_name=asset_list.DOMNA_PROPERTY_ID,
uprn_column=AssetList.STANDARD_UPRN,
fulladdress_column=AssetList.STANDARD_FULL_ADDRESS,
address1_column=AssetList.STANDARD_ADDRESS_1,
postcode_column=AssetList.STANDARD_POSTCODE,
property_type_column=AssetList.STANDARD_PROPERTY_TYPE,
built_form_column=AssetList.STANDARD_BUILT_FORM,
manual_uprn_map=manual_uprn_map,
epc_api_only=epc_api_only
)
@ -516,6 +353,10 @@ def app():
epc_data_failed, _, _ = get_data(
df=chunk_failed,
row_id_name=asset_list.DOMNA_PROPERTY_ID,
uprn_column=AssetList.STANDARD_UPRN,
fulladdress_column=AssetList.STANDARD_FULL_ADDRESS,
address1_column=AssetList.STANDARD_ADDRESS_1,
postcode_column=AssetList.STANDARD_POSTCODE,
manual_uprn_map=manual_uprn_map,
epc_api_only=epc_api_only
)

183
asset_list/utils.py Normal file
View file

@ -0,0 +1,183 @@
import time
import numpy as np
import pandas as pd
from backend.SearchEpc import SearchEpc
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
from tqdm import tqdm
from utils.logger import setup_logger
logger = setup_logger()
def get_data(
df,
manual_uprn_map,
epc_auth_token,
uprn_column,
fulladdress_column,
address1_column,
postcode_column,
property_type_column,
built_form_column,
epc_api_only=False,
row_id_name="row_id",
):
# These re-map the standard property types to forms accepted by the EPC api, so we can predict EPCs
property_type_map = {
"house": "House",
"flat": "Flat",
"maisonette": "Maisonette",
"bungalow": "Bungalow",
"block house": "House",
"coach house": "House",
"bedsit": "Flat"
}
built_form_map = {
"mid-terrace": "Mid-Terrace",
"end-terrace": "End-Terrace",
"semi-detached": "Semi-Detached",
"detached": "Detached"
}
epc_data = []
errors = []
no_epc = []
for _, home in tqdm(df.iterrows(), total=len(df)):
try:
# If we have a block of flats, we cannot retrieve this data
if home.get(property_type_column) == "block of flats":
no_epc.append(home[row_id_name])
continue
postcode = home[postcode_column]
house_number = str(home[address1_column]).strip()
full_address = home[fulladdress_column].strip()
house_no = SearchEpc.get_house_number(address=str(house_number), postcode=postcode)
if house_no is None:
house_no = house_number
uprn = manual_uprn_map.get(full_address, None)
if uprn is None and home.get(uprn_column):
uprn = home[uprn_column]
if pd.isnull(uprn):
uprn = None
property_type = property_type_map.get(home.get(property_type_column), None)
built_form = built_form_map.get(home.get(built_form_column))
searcher = SearchEpc(
address1=str(house_no),
postcode=postcode,
auth_token=epc_auth_token,
os_api_key="",
property_type=None,
fast=True,
full_address=full_address,
max_retries=5,
uprn=uprn
)
# Force the skipping of estimating the EPC
searcher.ordnance_survey_client.property_type = None
searcher.ordnance_survey_client.built_form = None
searcher.find_property(skip_os=True)
# Check if we have a flat or appartment
if searcher.newest_epc is None and uprn is None:
# Try again:
if SearchEpc.get_house_number(address=str(house_number), postcode=postcode) is None:
# Backup
add1 = full_address.split(",")
if len(add1) > 1:
add1 = add1[1].strip()
else:
# Try splitting on space
add1 = full_address.split(" ")[0].strip()
else:
add1 = str(house_number)
searcher = SearchEpc(
address1=add1,
postcode=postcode,
auth_token=epc_auth_token,
os_api_key="",
property_type=None,
fast=True,
full_address=full_address,
max_retries=5
)
if (
"flat" in house_number.lower() or "apartment" in house_number.lower() or "apt" in
house_number.lower()
):
searcher.ordnance_survey_client.property_type = "Flat"
searcher.find_property(skip_os=True)
# As a final resort, we estimate the EPC
if property_type is not None and searcher.newest_epc is None:
searcher.ordnance_survey_client.property_type = property_type
searcher.ordnance_survey_client.built_form = built_form
searcher.find_property(skip_os=True)
if searcher.newest_epc is None:
no_epc.append(home[row_id_name])
continue
# Look for EPC recommendatons
try:
property_recommendations = searcher.client.domestic.recommendations(searcher.newest_epc["lmk-key"])
except:
property_recommendations = {"rows": []}
if epc_api_only:
epc = {
row_id_name: home[row_id_name],
**searcher.newest_epc.copy(),
"recommendations": property_recommendations["rows"]
}
epc_data.append(epc)
continue
# Retrieve data from FindMyEPC
try:
find_epc_searcher = RetrieveFindMyEpc(
address=searcher.newest_epc["address"], postcode=searcher.newest_epc["postcode"]
)
find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
except ValueError as e:
if "No EPC found" in str(e) and "address1" in searcher.newest_epc:
try:
find_epc_searcher = RetrieveFindMyEpc(
address=searcher.newest_epc["address1"], postcode=searcher.newest_epc["postcode"]
)
find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
except ValueError as e:
if "No EPC found" in str(e):
find_epc_data = {}
else:
logger.error(f"Error retrieving FindMyEPC data: {e}")
raise Exception(f"Error retrieving FindMyEPC data: {e}")
else:
find_epc_data = {}
except Exception as e:
raise Exception(f"Error retrieving FindMyEPC data: {e}")
time.sleep(np.random.uniform(0.1, 1))
epc = {
row_id_name: home[row_id_name],
**searcher.newest_epc.copy(),
"recommendations": property_recommendations["rows"],
"find_my_epc_data": find_epc_data,
}
epc_data.append(epc)
except Exception as e:
errors.append(home[row_id_name])
time.sleep(5)
return epc_data, errors, no_epc

View file

@ -226,25 +226,20 @@ class Property:
# as we collect more data from the energy assessment
n_bathrooms = kwargs.get("n_bathrooms", None)
if n_bathrooms not in [None, ""]:
# We add on a small value to ensure that the number of bathrooms is rounded up, in case the value is 0.5
n_bathrooms = int(round(float(n_bathrooms) + 1e-5))
# We add on a small value to ensure that the number of bathrooms is rounded up, in case the value is 0.5
n_bathrooms = int(round(float(n_bathrooms) + 1e-5)) if n_bathrooms not in [None, ""] else None
n_bedrooms = kwargs.get("n_bedrooms", None)
if n_bedrooms not in [None, ""]:
n_bedrooms = int(round(float(n_bedrooms) + 1e-5))
n_bedrooms = int(round(float(n_bedrooms) + 1e-5)) if n_bedrooms not in [None, ""] else None
number_of_floors = kwargs.get("number_of_floors", None)
if number_of_floors not in [None, ""]:
number_of_floors = int(round(float(number_of_floors) + 1e-5))
number_of_floors = int(round(float(number_of_floors) + 1e-5)) if number_of_floors not in [None, ""] else None
insulation_floor_area = kwargs.get("insulation_floor_area", None)
if insulation_floor_area not in [None, ""]:
insulation_floor_area = float(insulation_floor_area)
insulation_floor_area = float(insulation_floor_area) if insulation_floor_area not in [None, ""] else None
insulation_wall_area = kwargs.get("insulation_wall_area", None)
if insulation_wall_area not in [None, ""]:
insulation_wall_area = float(insulation_wall_area)
insulation_wall_area = float(insulation_wall_area) if insulation_wall_area not in [None, ""] else None
return {
"n_bathrooms": n_bathrooms,

View file

@ -308,12 +308,20 @@ class SearchEpc:
self.data = output["response"]
return output["msg"]
if not self.uprn and not self.address1 and not self.postcode:
raise ValueError("No search parameters provided")
uprn_params = {"uprn": self.uprn} if self.uprn else {}
address_params = {"address": self.address1, "postcode": self.postcode}
address_params = {}
if self.address1:
address_params["address"] = self.address1
if self.postcode:
address_params["postcode"] = self.postcode
# We attempt the search with uprn params
data = {"rows": []}
api_response = {}
if uprn_params:
api_response = self._get_epc(params=uprn_params, size=size)
if api_response["msg"]["status"] == 200:
@ -321,14 +329,15 @@ class SearchEpc:
# If we were unsuccessful, we then make a second attempt to fetch the data. We find that
# properties are sometimes listed under the wrong UPRN
api_response = self._get_epc(params=address_params, size=size)
if api_response["msg"]["status"] == 200:
# We update the data with the correct uprn
if self.uprn:
for x in api_response["response"]["rows"]:
x["uprn"] = self.uprn
if address_params:
api_response = self._get_epc(params=address_params, size=size)
if api_response["msg"]["status"] == 200:
# We update the data with the correct uprn
if self.uprn:
for x in api_response["response"]["rows"]:
x["uprn"] = self.uprn
data["rows"].extend(api_response["response"]["rows"])
data["rows"].extend(api_response["response"]["rows"])
# We no de-dupe on lmk-key to avoid duplicates
seen = set()

View file

@ -9,8 +9,7 @@ from tqdm import tqdm
from math import sin, cos, sqrt, atan2, radians
from utils.logger import setup_logger
from recommendations.Costs import Costs, MCS_SOLAR_PV_COST_DATA
from etl.bill_savings.EnergyConsumptionModel import EnergyConsumptionModel
from recommendations.Costs import Costs
from backend.ml_models.AnnualBillSavings import AnnualBillSavings
from backend.Property import Property
from backend.app.db.functions.solar_functions import get_solar_data, store_batch_data
@ -54,6 +53,9 @@ class GoogleSolarApi:
# Max area of a roof space we allow panels for
PERCENTAGE_OF_ROOF_LIMIT = 0.8
# Error Messages
ENTITY_NOT_FOUND_ERROR = 'Requested entity was not found.'
def __init__(self, api_key, max_retries=5):
"""
Initialize the GoogleSolarApi class with the provided API key and maximum retries.
@ -112,6 +114,13 @@ class GoogleSolarApi:
response.raise_for_status() # Raise an error for bad status codes
return response.json()
except requests.exceptions.RequestException as e:
if (
(e.response.status_code == 404) &
(e.response.json()["error"]["message"] == self.ENTITY_NOT_FOUND_ERROR)
):
logger.warning("No building insights found for the given location.")
return {"error": self.ENTITY_NOT_FOUND_ERROR}
attempt += 1
print(f"Attempt {attempt} failed: {e}")
time.sleep(2 ** attempt) # Exponential backoff
@ -155,6 +164,11 @@ class GoogleSolarApi:
# If we have no data in the db, or updated_at is more than 6 months
if self.insights_data is None or is_outdated:
self.insights_data = self.get_building_insights(longitude, latitude, required_quality)
if self.insights_data.get("error") == self.ENTITY_NOT_FOUND_ERROR:
# We use default performance since in this case, we couldn't retrieve data. We don't store
self.panel_performance = self.default_panel_performance(property_instance=property_instance)
return
self.need_to_store = True
# Extract key data from the insights response
@ -820,7 +834,6 @@ class GoogleSolarApi:
if unit["longitude"] is None or unit["latitude"] is None:
# At this point, we've checked that solar PV is valid, and so we provide some defaults
property_instance.set_solar_panel_configuration(
solar_panel_configuration={
"insights_data": None,
@ -875,19 +888,19 @@ class GoogleSolarApi:
cost_instance = Costs(property_instance=property_instance)
# We return a 2.4 and 4 kwp system
# We return a 1.6 and 3.2 kwp system
panel_performance = pd.DataFrame(
[
{
'n_panels': 10,
'yearly_dc_energy': 4000 * 0.99, # Assumed 99% efficient wattage -> dc
'n_panels': 8,
'yearly_dc_energy': 3200 * assumptions.MEDIAN_WATTAGE_TO_DC,
'total_cost': cost_instance.solar_pv(
n_panels=10, has_battery=False, n_floors=property_instance.number_of_floors
n_panels=8, has_battery=False, n_floors=property_instance.number_of_floors
)["total"],
'weighted_ratio': None,
'panneled_roof_area': 10 * assumptions.RDSAP_AREA_PER_PANEL,
'array_wattage': 4000,
'initial_ac_kwh_per_year': 4000 * 0.95, # Assumed 95% efficient wattage -> ac
'panneled_roof_area': 8 * assumptions.RDSAP_AREA_PER_PANEL,
'array_wattage': 3200,
'initial_ac_kwh_per_year': 3200 * assumptions.MEDIAN_WATTAGE_TO_AC,
'lifetime_ac_kwh': None,
'lifetime_dc_kwh': None,
'roi': None,
@ -899,15 +912,15 @@ class GoogleSolarApi:
'rank': None
},
{
'n_panels': 6,
'yearly_dc_energy': 2400 * 0.99, # Assumed 99% efficient wattage -> dc
'n_panels': 4,
'yearly_dc_energy': 1600 * assumptions.MEDIAN_WATTAGE_TO_DC,
'total_cost': cost_instance.solar_pv(
n_panels=6, has_battery=False, n_floors=property_instance.number_of_floors
)["total"],
'weighted_ratio': None,
'panneled_roof_area': 6 * assumptions.RDSAP_AREA_PER_PANEL,
'array_wattage': 2400,
'initial_ac_kwh_per_year': 2400 * 0.95, # Assumed 95% efficient wattage -> ac
'panneled_roof_area': 4 * assumptions.RDSAP_AREA_PER_PANEL,
'array_wattage': 1600,
'initial_ac_kwh_per_year': 1600 * assumptions.MEDIAN_WATTAGE_TO_AC,
'lifetime_ac_kwh': None,
'lifetime_dc_kwh': None,
'roi': None,

View file

@ -11,6 +11,9 @@ SOLAR_CONSUMPTION_WITH_BATTERY_PROPORTION = 0.7
# Typically, each solar panel takes up around 3.4 m2 of roof space under RdSAP. This was been verified in Elmhurst
RDSAP_AREA_PER_PANEL = 3.4
# This is a median based on a sample of properties
MEDIAN_WATTAGE_TO_AC = 0.965
MEDIAN_WATTAGE_TO_DC = 0.99
SOCIAL_TENURES = ["Rented (social)", "rental (social)"]

View file

@ -1,4 +1,17 @@
import os
import pandas as pd
from tqdm import tqdm
from dotenv import load_dotenv
from backend.SearchEpc import SearchEpc
from etl.spatial.OpenUprnClient import OpenUprnClient
from asset_list.utils import get_data
from utils.s3 import save_csv_to_s3
PORTFOLIO_ID = 139
USER_ID = 8
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
def app():
@ -9,26 +22,182 @@ def app():
folder_path = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/MOD/Pilot Programme"
sample_list = pd.read_excel(f"{folder_path}/20250227_DIO_Accommodation_Sample_Properties.xlsx")
asset_data = pd.read_excel(f"{folder_path}/20250303_DIO_Accommodation_Property_Attribution.xlsx")
asset_data["BLNDG_GOVERMENT_UPRN"] = asset_data["BLNDG_GOVERMENT_UPRN"].astype("Int64")
asset_data["BLNDG_GOVERMENT_UPRN"].nunique()
for _id in asset_data["ESTB_ID"].unique():
data = asset_data[asset_data["ESTB_ID"] == _id]
z = data["BLNDG_GOVERMENT_UPRN"]
sample_list = sample_list[sample_list["BLDNG_COUNTRY_NAME"].isin(["ENGLAND", "WALES"])]
data["BLNDG_GOVERMENT_UPRN"].unique()
# Merge on the UPRN
sample_list = sample_list.merge(
asset_data[["BLDNG_ID", "BLNDG_GOVERMENT_UPRN"]].drop_duplicates(),
how="left", on="BLDNG_ID"
)
sample_list["BLNDG_GOVERMENT_UPRN"] = sample_list["BLNDG_GOVERMENT_UPRN"].astype("Int64")
asset_data["BLNDG_GOVERMENT_UPRN"].unique()
# Use the EPC API to get corrected postcodes
model_asset_list = []
missed = []
for _, x in tqdm(sample_list.iterrows(), total=len(sample_list)):
df = asset_data.groupby("BLNDG_GOVERMENT_UPRN")["ESTB_ID"].nunique().sort_values(ascending=False).reset_index()
if pd.isnull(x["BLNDG_GOVERMENT_UPRN"]):
continue
searcher = SearchEpc(
address1="",
postcode="",
uprn=x["BLNDG_GOVERMENT_UPRN"],
auth_token=EPC_AUTH_TOKEN,
os_api_key=""
)
searcher.find_property(skip_os=True)
newest_epc = searcher.newest_epc
if newest_epc is None:
missed.append(x["BLNDG_GOVERMENT_UPRN"])
continue
example = asset_data[asset_data["BLNDG_GOVERMENT_UPRN"] == df.head(1)["BLNDG_GOVERMENT_UPRN"].values[0]]
model_asset_list.append(newest_epc)
asset_data[asset_data["BLNDG_GOVERMENT_UPRN"]]
model_asset_list = pd.DataFrame(model_asset_list)
model_asset_list["uprn"] = model_asset_list["uprn"].astype(int)
asset_data = asset_data[asset_data["ESTB_ID"].isin(sample_list["ESTB_ID"].values)]
asset_data.drop_duplicates("ESTB_ID", inplace=True)
spatial_data = OpenUprnClient.get_spatial_data(
uprns=model_asset_list["uprn"].tolist(), bucket_name="retrofit-data-dev"
)
[x for x in asset_data.columns if "uprn" in x.lower()]
# We determine if the building is listed, heritage or in a conservation area
example = asset_data[asset_data["ESTB_ID"] == 1547072]
# Merge on the property features
features = asset_data.drop(
columns=["BUILDING_SYSTEM_ITEM_NAME", "OBSERVED_CONDITION_DESCRIPTION"]
).drop_duplicates()
df = features.merge(
model_asset_list, how="inner", right_on="uprn", left_on="BLNDG_GOVERMENT_UPRN"
).merge(
pd.DataFrame(spatial_data).rename(columns={"UPRN": "uprn"}), how="left", on="uprn"
)
# Store data locally
# df.to_csv(folder_path + "/MOD property data.csv", index=False)
# Produce as asset list for analysis
df["row_id"] = df.index
epc_data, errors, no_epc = get_data(
df=df,
manual_uprn_map={},
epc_auth_token=EPC_AUTH_TOKEN,
uprn_column="uprn",
fulladdress_column="address",
address1_column="address1",
postcode_column="postcode",
property_type_column=None,
built_form_column=None,
epc_api_only=False,
row_id_name="row_id",
)
non_invasive_recommendations = []
for x in epc_data:
non_invasive_recommendations.append(
{
"uprn": x["uprn"],
"recommendations": x["find_my_epc_data"]["recommendations"]
}
)
asset_list = df[
["uprn", "address1", "postcode", "NUMBER_OF_BEDROOMS", "BLDNG_STOREYS_QTY", ]
].rename(
columns={
"address1": "address",
"NUMBER_OF_BEDROOMS": "n_bedrooms",
"BLDNG_STOREYS_QTY": "number_of_floors"
}
)
filename = f"{USER_ID}/{PORTFOLIO_ID}/asset_list.csv"
save_csv_to_s3(
dataframe=asset_list,
bucket_name="retrofit-plan-inputs-dev",
file_name=filename
)
# Store the non-invasive recommendations in s3
non_invasive_recommendations_filename = f"{USER_ID}/{PORTFOLIO_ID}/non_invasive_recommendations.csv"
save_csv_to_s3(
dataframe=pd.DataFrame(non_invasive_recommendations),
bucket_name="retrofit-plan-inputs-dev",
file_name=non_invasive_recommendations_filename
)
# Scenario 1 - EPC C
body = {
"portfolio_id": str(PORTFOLIO_ID),
"housing_type": "Private",
"goal": "Increasing EPC",
"goal_value": "C",
"trigger_file_path": filename,
"already_installed_file_path": "",
"patches_file_path": "",
"non_invasive_recommendations_file_path": non_invasive_recommendations_filename,
"valuation_file_path": "",
"scenario_name": "Hit EPC C",
"multi_plan": True,
"budget": None,
# "inclusions": [
# "cavity_wall_insulation",
# "loft_insulation",
# "windows",
# "solar_pv",
# "air_source_heat_pump"
# ]
}
print(body)
# Scenario 2 - EPC B
body = {
"portfolio_id": str(PORTFOLIO_ID),
"housing_type": "Private",
"goal": "Increasing EPC",
"goal_value": "B",
"trigger_file_path": filename,
"already_installed_file_path": "",
"patches_file_path": "",
"non_invasive_recommendations_file_path": non_invasive_recommendations_filename,
"valuation_file_path": "",
"scenario_name": "Hit EPC B",
"multi_plan": True,
"budget": None,
# "inclusions": [
# "cavity_wall_insulation",
# "loft_insulation",
# "windows",
# "solar_pv",
# "air_source_heat_pump"
# ]
}
print(body)
# Scenario 3 - EPC B, 3.5 COP ASHP
body = {
"portfolio_id": str(PORTFOLIO_ID),
"housing_type": "Private",
"goal": "Increasing EPC",
"goal_value": "B",
"trigger_file_path": filename,
"already_installed_file_path": "",
"patches_file_path": "",
"non_invasive_recommendations_file_path": non_invasive_recommendations_filename,
"valuation_file_path": "",
"scenario_name": "Hit EPC B - 3.5 COP ASHP",
"multi_plan": True,
"budget": None,
"ashp_cop": 3.5
# "inclusions": [
# "cavity_wall_insulation",
# "loft_insulation",
# "windows",
# "solar_pv",
# "air_source_heat_pump"
# ]
}
print(body)

View file

@ -1,5 +1,6 @@
import numpy as np
import pandas as pd
import backend.app.assumptions as assumptions
from recommendations.Costs import Costs
from recommendations.recommendation_utils import override_costs, estimate_pitched_roof_area
@ -24,6 +25,23 @@ class SolarPvRecommendations:
SAP_POINTS_PER_5_PERCENT_ROOF_COVERAGE = 1
BACKUP_PANEL_PERFORMANCE = pd.DataFrame(
[
{
"n_panels": 4,
"array_wattage": 1600,
"initial_ac_kwh_per_year": assumptions.MEDIAN_WATTAGE_TO_AC * 1600,
"panneled_roof_area": 4 * assumptions.RDSAP_AREA_PER_PANEL
},
{
"n_panels": 8,
"array_warrage": 3200,
"initial_ac_kwh_per_year": assumptions.MEDIAN_WATTAGE_TO_AC * 3200,
"panneled_roof_area": 8 * assumptions.RDSAP_AREA_PER_PANEL
},
]
)
def __init__(self, property_instance):
"""
:param property_instance: Instance of the Property class, for the home associated to property_id