Adding in mvp valuation and energy cost savings for keyzy demo

This commit is contained in:
Khalim Conn-Kowlessar 2023-11-28 11:41:34 +00:00
parent f4e0528aa0
commit 1da9433ee2
10 changed files with 209 additions and 85 deletions

View file

@ -8,7 +8,9 @@ class Settings(BaseSettings):
SECRET_KEY: str
ENVIRONMENT: str
DATA_BUCKET: str
PREDICTIONS_BUCKET: str
SAP_PREDICTIONS_BUCKET: str
CARBON_PREDICTIONS_BUCKET: str
HEAT_PREDICTIONS_BUCKET: str
PLAN_TRIGGER_BUCKET: str
EPC_AUTH_TOKEN: str
DB_HOST: str

View file

@ -9,9 +9,9 @@ def aggregate_portfolio_recommendations(session, portfolio_id: int):
session.query(
func.sum(Recommendation.estimated_cost).label("cost"),
func.sum(Recommendation.total_work_hours).label("total_work_hours"),
# For future usage we will aggregate multiple fields in this step
# func.sum(Recommendation.heat_demand).label("total_heat_demand"),
# func.sum(Recommendation.energy_savings).label("total_energy_savings")
func.sum(Recommendation.heat_demand).label("total_heat_demand"),
func.sum(Recommendation.energy_savings).label("total_energy_savings"),
func.sum(Recommendation.energy_cost_savings).label("energy_cost_savings")
)
.join(PlanRecommendations, PlanRecommendations.recommendation_id == Recommendation.id)
.join(Plan, Plan.id == PlanRecommendations.plan_id)
@ -22,8 +22,9 @@ def aggregate_portfolio_recommendations(session, portfolio_id: int):
aggregates_dict = {
"cost": aggregates.cost or 0,
"total_work_hours": aggregates.total_work_hours or 0,
# "total_heat_demand": aggregates.total_heat_demand or 0,
# "total_energy_savings": aggregates.total_energy_savings or 0
"total_heat_demand": aggregates.total_heat_demand or 0,
"total_energy_savings": aggregates.total_energy_savings or 0,
"energy_cost_savings": aggregates.energy_cost_savings or 0,
}
# Get the portfolio and update the fields

View file

@ -83,6 +83,7 @@ def upload_recommendations(session: Session, recommendations_to_upload, property
"heat_demand": rec["heat_demand"],
"co2_equivalent_savings": rec["co2_equivalent_savings"],
"total_work_hours": rec["labour_hours"],
"energy_cost_savings": rec["energy_cost_savings"]
}
for rec in recommendations_to_upload
]

View file

@ -20,23 +20,16 @@ from backend.app.db.functions.recommendations_functions import (
from backend.app.db.models.portfolio import rating_lookup
from backend.app.dependencies import validate_token
from backend.app.plan.schemas import PlanTriggerRequest
from backend.app.plan.utils import (
create_recommendation_scoring_data, get_cleaned, insert_temp_recommendation_id
)
from backend.app.plan.utils import create_recommendation_scoring_data, get_cleaned
from backend.app.utils import epc_to_sap_lower_bound, read_csv_from_s3, read_parquet_from_s3
from backend.ml_models.api import ModelApi
from backend.Property import Property
from etl.epc.DataProcessor import DataProcessor
from etl.epc.settings import COLUMNS_TO_MERGE_ON
from recommendations.FloorRecommendations import FloorRecommendations
from recommendations.RoofRecommendations import RoofRecommendations
from recommendations.VentilationRecommendations import VentilationRecommendations
from recommendations.FireplaceRecommendations import FireplaceRecommendations
from recommendations.optimiser.CostOptimiser import CostOptimiser
from recommendations.optimiser.GainOptimiser import GainOptimiser
from recommendations.optimiser.optimiser_functions import prepare_input_measures
from recommendations.WallRecommendations import WallRecommendations
from recommendations.Recommendations import Recommendations
from utils.logger import setup_logger
from utils.s3 import read_dataframe_from_s3_parquet
@ -130,49 +123,8 @@ async def trigger_plan(body: PlanTriggerRequest):
# Property recommendations
p.get_components(cleaned)
property_recommendations = []
# Floor recommendations
floor_recommender = FloorRecommendations(property_instance=p, materials=materials)
floor_recommender.recommend()
if floor_recommender.recommendations:
property_recommendations.append(floor_recommender.recommendations)
# Wall recommendations
wall_recomender = WallRecommendations(property_instance=p, materials=materials)
wall_recomender.recommend()
if wall_recomender.recommendations:
property_recommendations.append(wall_recomender.recommendations)
# Roof recommendations
roof_recommender = RoofRecommendations(property_instance=p, materials=materials)
roof_recommender.recommend()
if roof_recommender.recommendations:
property_recommendations.append(roof_recommender.recommendations)
# Ventilation recommendations
ventilation_recomender = VentilationRecommendations(
property_instance=p,
materials=[part for part in materials if part["type"] == "mechanical_ventilation"]
)
ventilation_recomender.recommend()
if ventilation_recomender.recommendation:
property_recommendations.append(ventilation_recomender.recommendation)
# Fireplace sealing recommendations
fireplace_recommender = FireplaceRecommendations(property_instance=p)
fireplace_recommender.recommend()
if fireplace_recommender.recommendation:
property_recommendations.append(fireplace_recommender.recommendation)
# We insert temporary ids into the recommendations which is important for the optimiser later
property_recommendations = insert_temp_recommendation_id(property_recommendations)
recommender = Recommendations(property_instance=p, materials=materials)
property_recommendations = recommender.recommend()
if not property_recommendations:
continue
@ -239,7 +191,11 @@ async def trigger_plan(body: PlanTriggerRequest):
all_predictions = model_api.predict_all(
df=recommendations_scoring_data,
bucket=get_settings().DATA_BUCKET,
predictions_bucket=get_settings().PREDICTIONS_BUCKET
prediction_buckets={
"sap_change_predictions": get_settings().SAP_PREDICTIONS_BUCKET,
"heat_demand_predictions": get_settings().HEAT_PREDICTIONS_BUCKET,
"carbon_change_predictions": get_settings().CARBON_PREDICTIONS_BUCKET
}
)
# Insert the predictions into the recommendations and run the optimiser

View file

@ -8,25 +8,6 @@ from backend.app.config import get_settings
import msgpack
def insert_temp_recommendation_id(property_recommendations):
"""
Creates a temporary recommendation id which is needed for
filtering recommendations between default and no, after the optimiser has been
run
:param property_recommendations: nested list of recommendations, grouped by data_types
:return: Updated recommendations_to_upload, where where recommendation has a "recommendation_id"
integer inserted
"""
idx = 0
for recs in property_recommendations:
for rec in recs:
rec["recommendation_id"] = idx
idx += 1
return property_recommendations
def get_cleaned():
"""
This function will retrieve the cleaned dataset from s3 which has the cleaned
@ -154,7 +135,17 @@ def create_recommendation_scoring_data(
if len(parts) != 1:
raise ValueError("More than one part for roof insulation - investiage me")
scoring_dict["roof_insulation_thickness_ENDING"] = str(int(parts[0]["depth"]))
# This is based on the values we have in the training data
valid_numeric_values = [
12, 25, 50, 75, 100, 150, 200, 250, 270, 300, 350, 400
]
proposed_depth = int(parts[0]["depth"])
if proposed_depth not in valid_numeric_values:
# Take the nearest value for scoring
proposed_depth = min(valid_numeric_values, key=lambda x: abs(x - proposed_depth))
scoring_dict["roof_insulation_thickness_ENDING"] = str(proposed_depth)
scoring_dict["ROOF_ENERGY_EFF_ENDING"] = "Very Good"
else:
# Fill missing roof u-values - this fill is not based on recommended upgrades

View file

@ -0,0 +1,28 @@
class AnnualBillSavings:
"""
This is a simple class which will estimate the annual bill savings, based on the kwh savings.
This class uses data from Ofgem, including their price caps, to provide us with an estimate for
1KWH of energy.
"""
# These gas an electricity consumption figures are based off of figures presented by Ofgem
# https://www.ofgem.gov.uk/information-consumers/energy-advice-households/average-gas-and-electricity-use-explained
AVERAGE_ELECTRICITY_CONSUMPTION = 2700
AVERAGE_GAS_CONSUMPTION = 11500
# Latest price cap figures from Ofgem are for January 2024
# https://www.ofgem.gov.uk/publications/changes-energy-price-cap-1-january-2024
ELECTRICITY_PRICE_CAP = 0.29
GAS_PRICE_CAP = 0.07
# This is a weighted mean of the price caps, using the consumption figures above as weights
PRICE_FACTOR = 0.11183098591549295
@classmethod
def estimate(cls, kwh: float):
"""
Estimate the annual bill savings based on the kwh savings
:param kwh: The kwh savings
:return: An estimate for annual bill savings
"""
return cls.PRICE_FACTOR * kwh

View file

@ -0,0 +1,43 @@
class PropertyValuation:
"""
This is a placeholder class for the property valuation model
"""
UPRN_VALUE_LOOKUP = {
15038202: 202000,
37024763: 213000,
}
VALUE_INCREASE_MAPPING = [
{
"starting_epc": "D",
"ending_epc": "C",
"increase_percentage": 0.057,
},
{
"starting_epc": "D",
"ending_epc": "B",
"increase_percentage": 0.057,
},
]
@classmethod
def estimate(cls, property_instance, target_epc):
current_value = cls.UPRN_VALUE_LOOKUP.get(property_instance.uprn)
if not current_value:
raise ValueError("Have not implemented valuation for this property")
valuation_increases = [
v for v in cls.VALUE_INCREASE_MAPPING if
v["starting_epc"] == property_instance.epc_band and v["ending_epc"] == target_epc
]
if len(valuation_increases) != 1:
raise ValueError("Valuation increase mapping not found")
new_valuation = (1 + valuation_increases[0]["increase_percentage"]) * current_value
increase = round(new_valuation - current_value, 2)
return increase

View file

@ -77,7 +77,7 @@ class ModelApi:
Returns:
dict: The API response as a dictionary if the request was successful, None otherwise.
"""
logger.info("Making request to sap change api")
logger.info(f"Making request to {model_prefix} change api")
url = f"{self.base_url}/{self.MODEL_URLS[model_prefix]}/predict"
payload = {
"file_location": file_location,
@ -100,7 +100,7 @@ class ModelApi:
# depending on how you want to handle errors in your application
return None
def predict_all(self, df, bucket, predictions_bucket) -> dict:
def predict_all(self, df, bucket, prediction_buckets) -> dict:
"""
For each model prefix, this method will upload the scoring data to s3 and then make a request to the
@ -109,7 +109,7 @@ class ModelApi:
a dictionary of panaas dataframes
:param df: Pandas dataframe with scoring data to be uploaded to s3
:param bucket: Name of the bucket in s3 to upload to
:param predictions_bucket: Name of the bucket in s3 to store predictions
:param prediction_buckets: Dictionary containing the prediction buckets for each model prefix
:return:
"""
@ -117,7 +117,11 @@ class ModelApi:
for model_prefix in self.MODEL_PREFIXES:
logger.info(f"Scoring for model prefix: {model_prefix}")
file_location = self.upload_scoring_data(df, bucket, model_prefix)
response = self.predict(file_location, model_prefix)
response = self.predict(
"s3://{DATA_BUCKET}/".format(DATA_BUCKET=bucket) + file_location, model_prefix
)
predictions_bucket = prediction_buckets[model_prefix]
# Retrieve the predictions
predictions_df = pd.DataFrame(

View file

@ -58,7 +58,7 @@ class Costs:
EWI_SCAFFOLDING_PRELIMINARIES = 0.15
VAT_RATE = 0.2
PROFIT_MARGIN = 0.15
PROFIT_MARGIN = 0.2
def __init__(self, property_instance):
"""

View file

@ -1,8 +1,98 @@
from backend.Property import Property
from typing import List
from recommendations.FloorRecommendations import FloorRecommendations
from recommendations.WallRecommendations import WallRecommendations
from recommendations.RoofRecommendations import RoofRecommendations
from recommendations.VentilationRecommendations import VentilationRecommendations
from recommendations.FireplaceRecommendations import FireplaceRecommendations
from backend.ml_models.AnnualBillSavings import AnnualBillSavings
class Recommendations:
"""
High level recommendations class, which sits above the measure specific recommendation classes
"""
def __init__(
self,
property_instance: Property,
materials: List
):
"""
:param property_instance: Instance of the Property class, for the home associated to property_id
:param materials: List of materials to be used in the recommendations
"""
self.property_instance = property_instance
self.materials = materials
self.floor_recommender = FloorRecommendations(property_instance=property_instance, materials=materials)
self.wall_recomender = WallRecommendations(property_instance=property_instance, materials=materials)
self.roof_recommender = RoofRecommendations(property_instance=property_instance, materials=materials)
self.ventilation_recomender = VentilationRecommendations(
property_instance=property_instance,
materials=[part for part in materials if part["type"] == "mechanical_ventilation"]
)
self.fireplace_recommender = FireplaceRecommendations(property_instance=property_instance)
def recommend(self):
"""
This method runs the recommendations for the individual measures and then appends them to a list for output
:return:
"""
property_recommendations = []
# Floor recommendations
self.floor_recommender.recommend()
if self.floor_recommender.recommendations:
property_recommendations.append(self.floor_recommender.recommendations)
# Wall recommendations
self.wall_recomender.recommend()
if self.wall_recomender.recommendations:
property_recommendations.append(self.wall_recomender.recommendations)
# Roof recommendations
self.roof_recommender.recommend()
if self.roof_recommender.recommendations:
property_recommendations.append(self.roof_recommender.recommendations)
# Ventilation recommendations
self.ventilation_recomender.recommend()
if self.ventilation_recomender.recommendation:
property_recommendations.append(self.ventilation_recomender.recommendation)
# Fireplace sealing recommendations
self.fireplace_recommender.recommend()
if self.fireplace_recommender.recommendation:
property_recommendations.append(self.fireplace_recommender.recommendation)
# We insert temporary ids into the recommendations which is important for the optimiser later
property_recommendations = self.insert_temp_recommendation_id(property_recommendations)
return property_recommendations
@staticmethod
def insert_temp_recommendation_id(property_recommendations):
"""
Creates a temporary recommendation id which is needed for
filtering recommendations between default and no, after the optimiser has been
run
:param property_recommendations: nested list of recommendations, grouped by data_types
:return: Updated recommendations_to_upload, where where recommendation has a "recommendation_id"
integer inserted
"""
idx = 0
for recs in property_recommendations:
for rec in recs:
rec["recommendation_id"] = idx
idx += 1
return property_recommendations
@classmethod
def calculate_recommendation_impact(cls, property_instance, all_predictions, recommendations):
@ -30,6 +120,7 @@ class Recommendations:
for recommendations_by_type in property_recommendations:
for rec in recommendations_by_type:
new_sap = property_sap_predictions[property_sap_predictions["recommendation_id"] == str(
rec["recommendation_id"]
)]["predictions"].values[0]
@ -44,10 +135,17 @@ class Recommendations:
rec["sap_points"] = new_sap - float(property_instance.data["current-energy-efficiency"])
rec["co2_equivalent_savings"] = float(property_instance.data["co2-emissions-current"]) - new_carbon
rec["heat_demand"] = float(property_instance.data["co2-emissions-current"]) - new_heat_demand
# Energy consumption current is per meter squared, so we need to multiply by the floor area to get
# an absolute figure for the home
rec["heat_demand"] = (
(float(property_instance.data["energy-consumption-current"]) - new_heat_demand
) * property_instance.floor_area)
rec["energy_cost_savings"] = AnnualBillSavings.estimate(rec["heat_demand"])
if (rec["sap_points"] is None) and (rec["co2_equivalent_savings"] is None) or (
rec["heat_demand"] is None):
rec["heat_demand"] is None) or (rec["energy_cost_savings"] is None):
raise ValueError("sap points, co2 or heat demand is missing")
return property_recommendations