L&G work and adding new AssetListEpcData class. Working on a remote assessment

This commit is contained in:
Khalim Conn-Kowlessar 2025-01-23 08:15:47 +00:00
parent ad3ba92475
commit edf9c00759
10 changed files with 429 additions and 52 deletions

View file

@ -133,9 +133,14 @@ class Property:
self.energy_cost_estimates = {}
self.energy_consumption_estimates = {}
# when storing the energy, we'll also
self.energy = {
"primary_energy_consumption": epc_record.get("energy_consumption_current"),
"co2_emissions": epc_record.get("co2_emissions_current"),
"epc_co2_emissions": epc_record.get("co2_emissions_current"),
# These will be added in once we estimate the amount of emissions from appliances - using the carbon
# intensity of electricity
"appliances_co2_emissions": None,
"co2_emissions": None
}
self.ventilation = {
"ventilation": epc_record.get("mechanical_ventilation"),
@ -725,6 +730,15 @@ class Property:
"unadjusted": unadjusted_kwh_estimates
}
# Update carbon with appliances
self.energy["appliances_co2_emissions"] = (
(unadjusted_kwh_estimates["appliances"] * assumptions.ELECTRICITY_CARBON_INTENSITY) / 1000
)
# Re-calculate total CO2 emissions
self.energy["co2_emissions"] = float(np.round(
self.energy["epc_co2_emissions"] + self.energy["appliances_co2_emissions"], 2
))
def set_spatial(self, spatial: pd.DataFrame):
"""
Sets whether the property is in a conservation area given the output of the ConservationAreaClient

View file

@ -1,7 +1,7 @@
# Assumes that the average efficiency of an air source heat pump is 250%, taking the median of the 200-400% range,
# which is often quoted as a sensible efficiency range for air source heat pumps.
# We assume that the ASHP efficiency is 280%, which is the minimum that Cotswolds Energy Group achieves, as
# they target this
PESSIMISTIC_ASHP_EFFICIENCY = 200
AVERAGE_ASHP_EFFICIENCY = 250
AVERAGE_ASHP_EFFICIENCY = 280
# Conservative estimate of the proportion of electricity that will be consumed, whereas the rest will
# be exported. These are averages based on Google research. E.g
@ -14,6 +14,9 @@ RDSAP_AREA_PER_PANEL = 3.4
SOCIAL_TENURES = ["Rented (social)", "rental (social)"]
# Carbon intensity of electricity, as of 16th Jan 2025
ELECTRICITY_CARBON_INTENSITY = 0.232
DESCRIPTIONS_TO_FUEL_TYPES = {
"Air source heat pump, radiators, electric": {
"fuel": "Electricity", "cop": AVERAGE_ASHP_EFFICIENCY / 100

View file

@ -121,7 +121,7 @@ def extract_portfolio_aggregation_data(
# We can now calculate multiple outputs based on default recommendations
carbon_savings = sum([r["co2_equivalent_savings"] for r in default_recommendations])
pre_retrofit_co2 = p.data["co2-emissions-current"]
pre_retrofit_co2 = p.energy["co2_emissions"]
post_retrofit_co2 = pre_retrofit_co2 - carbon_savings
pre_retrofit_energy_bill = sum(p.current_energy_bill.values())

View file

@ -0,0 +1,239 @@
import pandas as pd
from backend.app.utils import sap_to_epc
# Exploratory breakdown of the Basildon portfolio. The commented figures under each
# value_counts() call are the outputs recorded when this script was originally run.
PROPERTY_CSV_PATH = (
    "/Users/khalimconn-kowlessar/Documents/hestia/Customers/L&G/basildon_age_breakdowns/property_202501170837.csv"
)
EPC_CSV_PATH = (
    "/Users/khalimconn-kowlessar/Documents/hestia/Customers/L&G/basildon_age_breakdowns/basildon EPC Data.csv"
)


def _floor_area_bracket(area):
    """Return the floor area bracket label (<73, 73-98, 99-200 or 200+) for a floor area value."""
    if area < 73:
        return "<73"
    if area < 99:
        return "73-98"
    if area < 200:
        return "99-200"
    # Anything that fails all three comparisons ends up in the top bracket
    return "200+"


data = pd.read_csv(PROPERTY_CSV_PATH)

# Breakdown by construction age band
data["year_built"].value_counts()
# 1991-2002 139
# 2003-2006 50
# 1996-2002 42
# 1976-1982 37
# 1967-1975 37
# 1983-1990 33
# 1950-1966 26

# Breakdown by combined property type and built form
property_types = data["property_type"]
built_forms = data["built_form"]
data["full_property_type"] = property_types + ": " + built_forms
data["full_property_type"].value_counts()
# House: Mid-Terrace 136
# House: End-Terrace 83
# House: Semi-Detached 55
# Flat: Semi-Detached 24
# Flat: End-Terrace 19
# House: Detached 10
# Flat: Mid-Terrace 9
# Maisonette: Mid-Terrace 9
# Maisonette: Semi-Detached 8
# Maisonette: End-Terrace 6
# Flat: Detached 4
# Bungalow: Detached 1

epc_data = pd.read_csv(EPC_CSV_PATH)

# Classify floor area in <73m2, 73-98, 99-200, 200+
epc_data["floor_area_bracket"] = epc_data["total_floor_area"].apply(_floor_area_bracket)
# 73-98 185
# <73 156
# 99-200 23

# The wall type is the text before the first comma of the walls description
wall_descriptions = epc_data["walls"].str.split(",")
epc_data["wall_type"] = wall_descriptions.str[0]
epc_data["wall_type"].value_counts()
# Cavity wall 343
# Timber frame 15
# System built 6
# we pull some additional data
# We want:
# 1) The list of properties included in the portfolio, with uprn
# 2) The recommendations against each property with costs, and whether or not the recommendation was defaulted
# 3) The properties without recommendations and why
from tqdm import tqdm
import pandas as pd
import numpy as np
from sqlalchemy.orm import sessionmaker
from backend.app.db.connection import db_engine
from backend.app.db.models.recommendations import Recommendation, Plan, PlanRecommendations
from backend.app.db.models.portfolio import PropertyModel, PropertyDetailsEpcModel
def get_data(portfolio_id, scenario_ids):
    """
    Fetch the properties, plans and default recommendations needed for a portfolio export.

    :param portfolio_id: value matched against PropertyModel.portfolio_id
    :param scenario_ids: collection of scenario ids; only plans whose scenario_id is in it are fetched
    :return: tuple (properties_data, plans_data, recommendations_data). Each element is a list of
        dicts keyed by table column name; every recommendation dict also carries a "Scenario ID" key.
    """
    session = sessionmaker(bind=db_engine)()
    try:
        session.begin()

        # Get properties and their details for a specific portfolio
        properties_query = session.query(
            PropertyModel,
            PropertyDetailsEpcModel
        ).join(
            PropertyDetailsEpcModel, PropertyModel.id == PropertyDetailsEpcModel.property_id
        ).filter(
            PropertyModel.portfolio_id == portfolio_id  # Filter by portfolio ID
        ).all()

        # Transform properties data to include all fields dynamically. Where both tables share a
        # column name, the value from PropertyDetailsEpcModel wins because it is unpacked last.
        property_columns = PropertyModel.__table__.columns
        details_columns = PropertyDetailsEpcModel.__table__.columns
        properties_data = [
            {
                **{col.name: getattr(prop.PropertyModel, col.name) for col in property_columns},
                **{col.name: getattr(prop.PropertyDetailsEpcModel, col.name) for col in details_columns},
            }
            for prop in properties_query
        ]

        # Get plans linked to the requested scenarios
        plans_query = session.query(Plan).filter(Plan.scenario_id.in_(scenario_ids)).all()

        # Transform plans data to include all fields dynamically
        plans_data = [
            {col.name: getattr(plan, col.name) for col in Plan.__table__.columns}
            for plan in plans_query
        ]

        # Extract plan IDs for filtering recommendations through PlanRecommendations
        plan_ids = [plan["id"] for plan in plans_data]

        # Get recommendations through PlanRecommendations for those plans and that are default
        recommendations_query = session.query(
            Recommendation,
            Plan.scenario_id
        ).join(
            PlanRecommendations, Recommendation.id == PlanRecommendations.recommendation_id
        ).join(
            Plan, Plan.id == PlanRecommendations.plan_id  # Join with Plan to access scenario_id
        ).filter(
            PlanRecommendations.plan_id.in_(plan_ids),
            Recommendation.default == True  # noqa: E712 - SQL expression, filtering for default recommendations
        ).all()

        # Transform recommendations data to include all fields dynamically and include scenario_id
        recommendations_data = []
        for rec in recommendations_query:
            # Each result row exposes the ORM entity as `.Recommendation`; fall back to the row itself
            entity = getattr(rec, "Recommendation", rec)
            record = {col.name: getattr(entity, col.name) for col in Recommendation.__table__.columns}
            record["Scenario ID"] = rec.scenario_id
            recommendations_data.append(record)
    finally:
        # Always release the session, including when one of the queries above raises
        session.close()

    return properties_data, plans_data, recommendations_data
# Build the Basildon export: one row per property with the cost of each default
# recommendation, a flag per measure and the predicted post works SAP / EPC.

# (measure type in the database, cost column in the export, flag column in the export)
MEASURE_COLUMNS = [
    ("air_source_heat_pump", "Cost: Air Source Heat Pump", "Recommendation: Air Source Heat Pump"),
    ("cavity_wall_insulation", "Cost: Cavity Wall Insulation", "Recommendation: Cavity Wall Insulation"),
    ("double_glazing", "Cost: Double Glazing", "Recommendation: Double Glazing"),
    ("loft_insulation", "Cost: Loft Insulation", "Recommendation: Loft Insulation"),
    ("mechanical_ventilation", "Cost: Ventilation", "Recommendation: Ventilation"),
    ("solar_pv", "Cost: Solar PV", "Recommendation: Solar PV"),
]

properties_data, plans_data, recommendations_data = get_data(portfolio_id=124, scenario_ids=[199])
properties_df, plans_df, recommendations_df = (
    pd.DataFrame(records) for records in (properties_data, plans_data, recommendations_data)
)

# Only the default recommendations feed the export
default_recommendations_df = recommendations_df[recommendations_df["default"]]
recommended_measures_df = default_recommendations_df[["property_id", "measure_type", "estimated_cost"]]

# Sum up the sap points by property id
post_install_sap = default_recommendations_df.groupby("property_id")[["sap_points"]].sum().reset_index()

# One row per property, one cost column per measure; a missing measure gets a cost of 0
recommendations_measures_pivot = (
    recommended_measures_df
    .pivot(index="property_id", columns="measure_type", values="estimated_cost")
    .reset_index()
    .rename(columns={measure_type: cost_column for measure_type, cost_column, _ in MEASURE_COLUMNS})
    .fillna(0)
)

# A measure counts as recommended when it carries a positive cost
for _, cost_column, flag_column in MEASURE_COLUMNS:
    recommendations_measures_pivot[flag_column] = recommendations_measures_pivot[cost_column] > 0

property_columns = [
    "property_id", "uprn", "address", "postcode", "property_type", "walls", "roof", "heating", "windows",
    "current_epc_rating",
    "current_sap_points", "total_floor_area", "number_of_rooms",
]
df = properties_df[property_columns]
df = df.merge(recommendations_measures_pivot, how="left", on="property_id")
df = df.merge(post_install_sap, how="left", on="property_id")
df = df.drop(columns=["property_id"])

# Properties without any default recommendation gain no SAP points
df["sap_points"] = df["sap_points"].fillna(0)

export_column_names = {
    "uprn": "UPRN",
    "address": "Address",
    "postcode": "Postcode",
    "walls": "Walls",
    "roof": "Roof",
    "heating": "Heating",
    "windows": "Windows",
    "current_epc_rating": "Current EPC Rating",
    "current_sap_points": "Current SAP Points",
    "total_floor_area": "Total Floor Area",
    "number_of_rooms": "Number of Habitable Rooms",
    "floor_height": "Floor Height",
}
df = df.rename(columns=export_column_names)

# Rows that found no match in the pivot still hold a missing heat pump cost at this point
df["Has Recommendations"] = df["Cost: Air Source Heat Pump"].notna()

# We fill missings:
for _, cost_column, flag_column in MEASURE_COLUMNS:
    df[flag_column] = df[flag_column].fillna(False)
    df[cost_column] = df[cost_column].fillna(0)

# Calculate post SAP
df["Predicted Post Works SAP"] = (df["Current SAP Points"] + df["sap_points"]).round()
df["Predicted Post Works EPC"] = df["Predicted Post Works SAP"].apply(sap_to_epc)

df.to_csv("/Users/khalimconn-kowlessar/Documents/hestia/Customers/L&G/Basildon Data Export - 2.csv", index=False)

View file

@ -1,9 +1,15 @@
import os
import pandas as pd
from dotenv import load_dotenv
from utils.s3 import save_csv_to_s3
from etl.find_my_epc.AssetListEpcData import AssetListEpcData
PORTFOLIO_ID = 120
PORTFOLIO_ID = 126
USER_ID = 8
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
def app():
"""
@ -13,11 +19,20 @@ def app():
asset_list = [
{
"uprn": 100030334057,
"address": "5, Lynton Street",
"postcode": "DE22 3RW"
"address": "Garden Flat, 48 Bedminster Parade",
"postcode": "BS3 4HS",
"building_id": 1
},
{
"addresss": "Top Floor Flat, 48 Bedminster Parade",
"postcode": "BS3 4HS",
"building_id": 1
},
{
"address": "First Floor Flat, 48 Bedminster Parade",
"postcode": "BS3 4HS",
"building_id": 1
}
]
asset_list = pd.DataFrame(asset_list)
@ -29,40 +44,37 @@ def app():
file_name=filename
)
non_invasive_recommendations = [
{
"uprn": 100030334057,
"recommendations": [
{
"type": "internal_wall_insulation",
"sap_points": 9,
"survey": True
},
{
"type": "external_wall_insulation",
"sap_points": 9,
"survey": True
},
{
"type": "suspended_floor_insulation",
"sap_points": 2,
"survey": True
}
]
}
]
# Pull the non-invasive recommendations automatically
asset_list_epc_client = AssetListEpcData(
asset_list=asset_list,
epc_auth_token=EPC_AUTH_TOKEN
)
asset_list_epc_client.get_data()
asset_list_epc_client.get_non_invasive_recommendations()
# Store non-invasive recommendations in S3
non_invasive_recommendations_filename = f"{USER_ID}/{PORTFOLIO_ID}/non_invasive_recommendations.csv"
save_csv_to_s3(
dataframe=pd.DataFrame(non_invasive_recommendations),
dataframe=pd.DataFrame(asset_list_epc_client.non_invasive_recommendations),
bucket_name="retrofit-plan-inputs-dev",
file_name=non_invasive_recommendations_filename
)
valuation_data = [
{
"uprn": 100030334057,
"value": 133_000
"address": "Garden Flat, 48 Bedminster Parade",
"postcode": "BS3 4HS",
"value": 337_000
},
{
"addresss": "Top Floor Flat, 48 Bedminster Parade",
"postcode": "BS3 4HS",
"value": 337_000
},
{
"address": "First Floor Flat, 48 Bedminster Parade",
"postcode": "BS3 4HS",
"value": 337_000
}
]
# Store valuation data to s3

View file

@ -0,0 +1,89 @@
import time
import pandas as pd
from tqdm import tqdm
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
from backend.SearchEpc import SearchEpc
from utils.logger import setup_logger
logger = setup_logger()
class AssetListEpcData:

    def __init__(self, asset_list: pd.DataFrame, epc_auth_token: str):
        """
        This class handles pulling data associated with an asset list and performs common functions like
        getting EPC api data, retrieving data from the find my epc website and extracting non-intrusive
        recommendations
        :param asset_list: asset list to process; get_data reads the "address", "postcode" and "uprn"
            values of each row
        :param epc_auth_token: auth token passed to SearchEpc for each lookup
        """
        # Check the asset list contains the correct columns
        self.asset_list = self.check_asset_list(asset_list)
        self.epc_auth_token = epc_auth_token

        # Both are populated later: extracted_data by get_data, and
        # non_invasive_recommendations by get_non_invasive_recommendations
        self.extracted_data = None
        self.non_invasive_recommendations = None

    @staticmethod
    def check_asset_list(asset_list):
        """
        Placeholder for asset list validation; currently returns the asset list unchanged
        :param asset_list: asset list to validate
        :return: the same asset list
        """
        # TODO: Update this with pydantic
        return asset_list

    def get_non_invasive_recommendations(self):
        """
        Extracts non-invasive recommendations in a format that can be used by the engine. The result is
        stored on self.non_invasive_recommendations as a list of {"uprn", "recommendations"} dicts
        :return:
        :raises ValueError: if get_data has not been run yet
        """
        if self.extracted_data is None:
            raise ValueError("Please run get_data first")

        self.non_invasive_recommendations = [
            {
                "uprn": r["uprn"],
                "recommendations": r["recommendations"]
            } for r in self.extracted_data
        ]

    def get_data(self):
        """
        Looks up each home in the asset list via SearchEpc, then retrieves the newest find my epc data
        for the matched address. The results are stored on self.extracted_data, one dict per home for
        which an EPC was found
        :return:
        """
        logger.info("Retrieving data for given asset list")
        # Pull the additional data
        extracted_data = []
        for _, home in tqdm(self.asset_list.iterrows(), total=len(self.asset_list)):
            add1 = home["address"]
            pc = home["postcode"]

            # Retrieve the EPC data
            epc_searcher = SearchEpc(
                address1=add1,
                postcode=pc,
                uprn=home["uprn"],
                auth_token=self.epc_auth_token,
                os_api_key=""
            )
            epc_searcher.find_property(skip_os=True)

            # Homes without an EPC are left out of the extracted data
            if epc_searcher.newest_epc is None:
                continue

            find_epc_searcher = RetrieveFindMyEpc(
                address=epc_searcher.newest_epc["address1"],
                postcode=epc_searcher.newest_epc["postcode"]
            )
            find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
            # Pause between homes - presumably to avoid hammering the find my epc site (TODO confirm)
            time.sleep(0.5)

            # We need uprn
            extracted_data.append(
                {
                    "uprn": home["uprn"],
                    **find_epc_data,
                }
            )

        # Store the results on the instance; without this get_non_invasive_recommendations
        # would always raise as if get_data had never been run
        self.extracted_data = extracted_data
        logger.info("Data Extrction complete")

View file

@ -313,6 +313,9 @@ class RetrieveFindMyEpc:
"Heating controls (programmer and TRVs)": [
"roomstat_programmer_trvs", "time_temperature_zone_control"
],
"Heating controls (programmer and room thermostat)": [
"roomstat_programmer_trvs", "time_temperature_zone_control"
],
"Replacement warm air unit": [],
"Secondary glazing": ["secondary_glazing"]
}

View file

@ -27,8 +27,8 @@ def get_data(asset_list, fulladdress_column, address1_column, postcode_column, m
for _, home in tqdm(asset_list.iterrows(), total=len(asset_list)):
try:
postcode = home[postcode_column]
house_number = home[address1_column]
full_address = home[fulladdress_column]
house_number = home[address1_column].strip()
full_address = home[fulladdress_column].strip()
house_no = SearchEpc.get_house_number(address=str(house_number), postcode=postcode)
if house_no is None:
house_no = house_number
@ -56,7 +56,13 @@ def get_data(asset_list, fulladdress_column, address1_column, postcode_column, m
# Try again:
if SearchEpc.get_house_number(address=str(house_number), postcode=postcode) is None:
# Backup
add1 = full_address.split(",")[1].strip()
add1 = full_address.split(",")
if len(add1) > 1:
add1 = add1[1].strip()
else:
# Try splitting on space
add1 = full_address.split(" ")[0].strip()
else:
add1 = str(house_number)
searcher = SearchEpc(
@ -126,6 +132,10 @@ def extract_address1(asset_list, full_address_col, method="first_two_words"):
asset_list["address1_extracted"] = asset_list[full_address_col].str.split(" ").str[:2].str.join(" ")
return asset_list
if method == "first_word":
asset_list["address1_extracted"] = asset_list[full_address_col].str.split(" ").str[0]
return asset_list
raise ValueError(f"Method {method} not recognized")
@ -152,17 +162,19 @@ def app():
Property UPRN
"""
DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Route Marches"
DATA_FILENAME = "Full Below SAP C Stock - RN Copy.xlsx"
SHEET_NAME = "Electric Properties"
POSTCODE_COLUMN = "Postcode"
FULLADDRESS_COLUMN = "Address"
DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Southern"
DATA_FILENAME = "January 2025 Additions Query.xlsx"
SHEET_NAME = "Jan 2025 additions"
POSTCODE_COLUMN = "Post Code"
FULLADDRESS_COLUMN = "Street / Block Name"
ADDRESS1_COLUMN = None
ADDRESS1_METHOD = "first_two_words"
ADDRESS1_METHOD = "first_word"
ADDRESS_COLS_TO_CONCAT = []
# Maps addresses to uprn in problematic cases
MANUAL_UPRN_MAP = {}
MANUAL_UPRN_MAP = {
"Ardelagh Ardelagh Faris Lane Woodham Addlestone KT15 3DJ": 100061484560
}
asset_list = pd.read_excel(os.path.join(DATA_FOLDER, DATA_FILENAME), header=0, sheet_name=SHEET_NAME)
asset_list = asset_list[~pd.isnull(asset_list[POSTCODE_COLUMN])].reset_index()
@ -211,6 +223,9 @@ def app():
manual_uprn_map=MANUAL_UPRN_MAP
)
no_data = asset_list[asset_list["row_id"].isin(no_epc)]
print(no_data[[FULLADDRESS_COLUMN, POSTCODE_COLUMN]])
# Append the failed data to the main data
epc_data.extend(epc_data_failed)
@ -372,8 +387,6 @@ def app():
)
asset_list = asset_list.drop(columns=["row_id", "index"])
asset_list[asset_list["Assessors name"] == "Robin Bailey"]["Assessor's Email"].value_counts()
# Store as an excel
filename = os.path.join(DATA_FOLDER, ".".join(DATA_FILENAME.split(".")[:-1])) + " EPC Data Pull - Main.xlsx"
asset_list.to_excel(filename, index=False)

View file

@ -28,9 +28,6 @@ class Recommendations:
High level recommendations class, which sits above the measure specific recommendation classes
"""
# Constant for carbon intensity calculation, as of 16th Jan 2025
CARBON_INTENSITY = 0.232
def __init__(
self,
property_instance: Property,
@ -531,6 +528,9 @@ class Recommendations:
previous_phase_values = {
"sap": float(property_instance.data["current-energy-efficiency"]),
# For carbon, even though we generally use the updated figure which includes the carbon
# associated to appliances, for this scoring process we use the EPC carbon value. This means
# that we don't overestimate the impact since the model uses the EPC carbon value
"carbon": float(property_instance.data["co2-emissions-current"]),
"heat_demand": float(property_instance.data["energy-consumption-current"]),
}
@ -832,8 +832,8 @@ class Recommendations:
if rec["type"] == "solar_pv":
rec["kwh_savings"] = rec_impact["solar_kwh_savings"].values[0]
# Calculate carbon savings from this
emissions_kg = rec["kwh_savings"] * cls.CARBON_INTENSITY # Calculate emissions in kg
# Calculate carbon savings from this - emissions in kg and convert to tonnes
emissions_kg = rec["kwh_savings"] * assumptions.ELECTRICITY_CARBON_INTENSITY
emissions_tonnes = emissions_kg / 1000
rec["co2_equivalent_savings"] = emissions_tonnes

View file

@ -23,6 +23,10 @@ def prepare_input_measures(property_recommendations, goal):
# if the recommendation is a solar recommendation with a battery, we exclude it from the optimisation.
recs = [r for r in recs if ~r["has_battery"]]
recs_to_append = [rec for rec in recs if rec["energy_cost_savings"] >= 0]
if not recs_to_append:
continue
input_measures.append(
[
{