Merge pull request #289 from Hestia-Homes/gla-demo-croydon

Gla demo croydon
This commit is contained in:
KhalimCK 2024-04-10 11:15:21 +01:00 committed by GitHub
commit 50cb58670d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 2146 additions and 163 deletions

View file

@ -282,7 +282,7 @@ class Property:
]
# Setting the insulation thickness here to above average should be tested further because we
# don't see a high volume of instances for this
output["walls_insulation_thickness_ending"] = "above average"
output["walls_insulation_thickness_ending"] = "average"
output["walls_energy_eff_ending"] = "Good"
# Note: often when the wall is insulatied, the internal/external insulation is not noted so we should
@ -298,11 +298,6 @@ class Property:
if recommendation["type"] == "cavity_wall_insulation":
output["is_filled_cavity_ending"] = True
# TODO: perhaps detrimental
# When making a recommendation for the wall, we will also update the ventilation
# if output["mechanical_ventilation_ending"] == 'natural':
# output["mechanical_ventilation_ending"] = 'mechanical, extract only'
else:
if output["walls_thermal_transmittance_ending"] is None:
raise ValueError("We should not have a None value for the u value")
@ -426,7 +421,7 @@ class Property:
"double glazing installed during or after 2002"
)
if recommendation["type"] in ["heating", "hot_water_tank_insulation"]:
if recommendation["type"] in ["heating", "hot_water_tank_insulation", "heating_control"]:
# We update the data, as defined in the recommendaton
simulation_config = recommendation["simulation_config"]
@ -442,21 +437,12 @@ class Property:
output["photo_supply_ending"] = recommendation["photo_supply"]
if recommendation["type"] not in [
"sealing_open_fireplace",
"low_energy_lighting",
"internal_wall_insulation",
"external_wall_insulation",
"cavity_wall_insulation",
"loft_insulation",
"room_roof_insulation",
"flat_roof_insulation",
"solid_floor_insulation",
"suspended_floor_insulation",
"exposed_floor_insulation",
"windows_glazing",
"solar_pv",
"heating",
"hot_water_tank_insulation",
"sealing_open_fireplace", "low_energy_lighting",
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation",
"loft_insulation", "room_roof_insulation", "flat_roof_insulation",
"solid_floor_insulation", "suspended_floor_insulation", "exposed_floor_insulation",
"windows_glazing", "solar_pv", "heating", "hot_water_tank_insulation",
"heating_control",
]:
raise NotImplementedError(
"Implement me, given type %s" % recommendation["type"]
@ -704,7 +690,7 @@ class Property:
filtered_df = uprn_filenames[
(uprn_filenames["lower"] <= self.uprn)
& (uprn_filenames["upper"] >= self.uprn)
]
]
if filtered_df.empty:
logger.warning("Could not find file containing UPRNS")
return None
@ -787,7 +773,7 @@ class Property:
self.floor_level = (
FLOOR_LEVEL_MAP[self.data["floor-level"]]
if self.data["floor-level"] not in self.DATA_ANOMALY_MATCHES
and self.data["floor-level"] is not None
and self.data["floor-level"] is not None
else None
)

View file

@ -11,7 +11,7 @@ def aggregate_portfolio_recommendations(
session.query(
func.sum(Recommendation.estimated_cost).label("cost"),
func.sum(Recommendation.total_work_hours).label("total_work_hours"),
func.sum(Recommendation.heat_demand).label("energy_savings"),
func.sum(Recommendation.adjusted_heat_demand).label("energy_savings"),
func.sum(Recommendation.co2_equivalent_savings).label("co2_equivalent_savings"),
func.sum(Recommendation.energy_cost_savings).label("energy_cost_savings"),
)

View file

@ -24,7 +24,7 @@ from backend.app.db.models.portfolio import rating_lookup
from backend.app.dependencies import validate_token
from backend.app.plan.schemas import PlanTriggerRequest
from backend.app.plan.utils import get_cleaned
from backend.app.utils import epc_to_sap_lower_bound, read_csv_from_s3, sap_to_epc
from backend.app.utils import epc_to_sap_lower_bound, sap_to_epc
from backend.ml_models.api import ModelApi
from backend.Property import Property
@ -35,12 +35,13 @@ from recommendations.optimiser.GainOptimiser import GainOptimiser
from recommendations.optimiser.optimiser_functions import prepare_input_measures
from recommendations.Recommendations import Recommendations
from utils.logger import setup_logger
from utils.s3 import read_dataframe_from_s3_parquet
from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3
from backend.ml_models.Valuation import PropertyValuation
logger = setup_logger()
BATCH_SIZE = 5
SCORING_BATCH_SIZE = 400
def patch_epc(config, epc_records):
@ -91,10 +92,14 @@ async def trigger_plan(body: PlanTriggerRequest):
input_properties = []
for config in tqdm(plan_input):
# We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
uprn = config.get("uprn", None)
if uprn:
uprn = int(float(uprn))
epc_searcher = SearchEpc(
address1=config["address"],
postcode=config["postcode"],
uprn=uprn,
auth_token=get_settings().EPC_AUTH_TOKEN,
os_api_key=get_settings().ORDNANCE_SURVEY_API_KEY
)
@ -160,15 +165,13 @@ async def trigger_plan(body: PlanTriggerRequest):
recommendations = {}
recommendations_scoring_data = []
representative_recommendations = {}
for p in input_properties:
for p in tqdm(input_properties):
# Property recommendations
p.get_components(cleaned, photo_supply_lookup, floor_area_decile_thresholds)
recommender = Recommendations(property_instance=p, materials=materials)
# TODO: portfolio id as an input is temp
print("DELETE PORTFOLIO ID AS AN INPUT!!")
property_recommendations, property_representative_recommendations = recommender.recommend(body.portfolio_id)
recommender = Recommendations(property_instance=p, materials=materials, exclusions=body.exclusions)
property_recommendations, property_representative_recommendations = recommender.recommend()
if not property_recommendations:
continue
@ -194,15 +197,26 @@ async def trigger_plan(body: PlanTriggerRequest):
model_api = ModelApi(portfolio_id=body.portfolio_id, timestamp=created_at)
all_predictions = model_api.predict_all(
df=recommendations_scoring_data,
bucket=get_settings().DATA_BUCKET,
prediction_buckets={
"sap_change_predictions": get_settings().SAP_PREDICTIONS_BUCKET,
"heat_demand_predictions": get_settings().HEAT_PREDICTIONS_BUCKET,
"carbon_change_predictions": get_settings().CARBON_PREDICTIONS_BUCKET
}
)
all_predictions = {
"sap_change_predictions": pd.DataFrame(),
"heat_demand_predictions": pd.DataFrame(),
"carbon_change_predictions": pd.DataFrame()
}
to_loop_over = range(0, recommendations_scoring_data.shape[0], SCORING_BATCH_SIZE)
for chunk in tqdm(to_loop_over, total=len(to_loop_over)):
predictions_dict = model_api.predict_all(
df=recommendations_scoring_data.iloc[chunk:chunk + SCORING_BATCH_SIZE],
bucket=get_settings().DATA_BUCKET,
prediction_buckets={
"sap_change_predictions": get_settings().SAP_PREDICTIONS_BUCKET,
"heat_demand_predictions": get_settings().HEAT_PREDICTIONS_BUCKET,
"carbon_change_predictions": get_settings().CARBON_PREDICTIONS_BUCKET
}
)
# Append the predictions to the predictions dictionary
for key, scored in predictions_dict.items():
all_predictions[key] = pd.concat([all_predictions[key], scored])
# Insert the predictions into the recommendations and run the optimiser
# TODO: If a recommendation has a negative impact on SAP, we should remove it - this seems to have become a
@ -256,14 +270,15 @@ async def trigger_plan(body: PlanTriggerRequest):
if any(x in [r["type"] for r in solution] for x in [
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation"
]):
ventilation_rec = [
r for r in recommendations_with_impact if r[0]["type"] == "mechanical_ventilation"
][0]
selected_recommendations = set(
list(selected_recommendations) + [ventilation_rec[0]["recommendation_id"]]
ventilation_rec = next(
(r[0] for r in recommendations_with_impact if r[0]["type"] == "mechanical_ventilation"),
None
)
# If a matching recommendation was found, add its ID to the selected recommendations
if ventilation_rec:
selected_recommendations.add(ventilation_rec["recommendation_id"])
# We check if the selected recommendation is wall ventilation and if so, we make sure
# mechanical ventilation is selected

View file

@ -1,10 +1,51 @@
from pydantic import BaseModel
from pydantic import BaseModel, conlist, validator
from typing import Optional
class PlanTriggerRequest(BaseModel):
budget: float | None = None
budget: Optional[float] = None
goal: str
housing_type: str
goal_value: str
portfolio_id: int
trigger_file_path: str
exclusions: Optional[conlist(str, min_items=1)] = None
# Pre-defined list of possibilities for exclusions
_allowed_exclusions = {
"wall_insulation",
"ventilation",
"roof_insulation",
"floor_insulation",
"windows",
"fireplace",
"heating",
"hot_water",
"lighting",
"solar_pv"
}
_allowed_goals = {"Increase EPC"}
_allowed_housing_types = {"Social", "Private"}
# Validator to ensure exclusions are within the pre-defined possibilities
@validator('exclusions', each_item=True)
def check_exclusions(cls, v):
if v not in cls._allowed_exclusions:
raise ValueError(f"{v} is not an allowed exclusion")
return v
# Validator to ensure that the goal is within the pre-defined possibilities
@validator('goal')
def check_goal(cls, v):
if v not in cls._allowed_goals:
raise ValueError(f"{v} is not a valid goal")
return v
# Validator to ensure that the housing type is within the pre-defined possibilities
@validator('housing_type')
def check_housing_type(cls, v):
if v not in cls._allowed_housing_types:
raise ValueError(f"{v} is not a valid housing type")
return v

View file

@ -1,6 +1,4 @@
import boto3
import csv
from io import StringIO
import string
import secrets
import logging
@ -41,25 +39,6 @@ def setup_logger(log_file=None, level=logging.INFO, overwrite_handler=False):
return logger
def read_csv_from_s3(bucket_name, filepath):
s3 = boto3.client('s3')
# Get the object from s3
s3_object = s3.get_object(Bucket=bucket_name, Key=filepath)
# Read the CSV body from the s3 object
body = s3_object['Body'].read()
# Use StringIO to create a file-like object from the string
csv_data = StringIO(body.decode('utf-8'))
# Use csv library to read it into a list of dictionaries
reader = csv.DictReader(csv_data)
data = list(reader)
return data
def generate_api_key():
# Define the characters that will be used to generate the api key
characters = string.ascii_letters + string.digits

View file

@ -10,13 +10,17 @@ class AnnualBillSavings:
AVERAGE_ELECTRICITY_CONSUMPTION = 2700
AVERAGE_GAS_CONSUMPTION = 11500
# Latest price cap figures from Ofgem are for January 2024
# https://www.ofgem.gov.uk/publications/changes-energy-price-cap-1-january-2024
ELECTRICITY_PRICE_CAP = 0.29
GAS_PRICE_CAP = 0.07
# Latest price cap figures from Ofgem are for April 2024
# https://www.ofgem.gov.uk/publications/new-energy-price-cap-level-april-june-2024-starts-today
ELECTRICITY_PRICE_CAP = 0.245
GAS_PRICE_CAP = 0.0604
# This is a weighted mean of the price caps, using the consumption figures above as weights
PRICE_FACTOR = 0.11183098591549295
PRICE_FACTOR = 0.09549999999999999
# Daily standard charge, based on average across England, Scotland and Wales, and includes VAT
DAILY_STANDARD_CHARGE_GAS = 0.3143
DAILY_STANDARD_CHARGE_ELECTRICITY = 0.601
EPC_BANDS = ["G", "F", "E", "D", "C", "B", "A"]
@ -38,6 +42,17 @@ class AnnualBillSavings:
"""
return cls.ELECTRICITY_PRICE_CAP * kwh
@classmethod
def calculate_annual_bill(cls, kwh):
"""
This method will estimate the total annual bill for a property
It assumed gas & electricity are used
:param kwh: The total kwh consumption
:return: An estimate for annual bill
"""
return cls.PRICE_FACTOR * kwh + (cls.DAILY_STANDARD_CHARGE_GAS + cls.DAILY_STANDARD_CHARGE_ELECTRICITY * 365)
@classmethod
def adjust_energy_to_metered(cls, epc_energy_consumption, current_epc_rating):
"""

View file

@ -0,0 +1,211 @@
import pandas as pd
from utils.s3 import save_csv_to_s3
USER_ID = 8
PORTFOLIO_ID = 67
archetype_1_uprns = [100020604138, 200001188299, 100020578756, 200001187196, 200001192253, 100020581792, 200001188304,
100020625813, 100020618060, 100020585305, 100020617489, 100020615039, 100020618076, 100020588913,
200001187197, 100020671205, 100020576940, 100020619814, 100020576472, 100020618083]
archetype_2_uprns = [100020698027, 10001007455, 100020653785, 10090383198, 100020665632, 100020620659, 100020615603,
100020609610, 100020625597, 100020665656, 100020665640, 100020587905, 100020665630, 100020624351,
100020625451, 100020624348, 100020666735, 100020653786, 100020576458, 100020657902, 100020624350,
100020637405, 100020666734, 100020616325, 100020666716, 100020653783, 100020665645, 100020642337,
100020665638, 100022904981, 100020688226, 100020630285, 100020626800, 100020665634, 100022907528,
100020665652, 100020624347, 100020666721, 100020585002, 10014055968, 10001008257, 100020621438,
100020576459, 100020665643, 100020665654, 100022917303]
archetype_3_uprns = [100020577523, 100020616446, 100020605342, 100020594652, 100020585394, 100020601138, 100020597485,
100020614883, 100020633162, 100020697787, 200001185785, 100020646842, 100020581449, 100020595611,
100020641814, 100020575611, 100020652986, 100020654671, 100020647336, 100020610518, 100020607980,
100020692380, 100020581690]
archetype_4_uprns = [100020650603, 100020582907, 100020605116, 100020650607, 100020589325, 100020655500, 100020642537,
200001187539, 100020631683, 100020610165, 100020596436, 100020598277, 100020660228]
def app():
"""
We shall define a small portfolio of properties, based in Croydon
:return:
"""
# Firstly, read in the EPC data for Croydon
epc_data = pd.read_csv(
"local_data/all-domestic-certificates/domestic-E09000008-Croydon/certificates.csv",
low_memory=False
)
z = epc_data.groupby(["WALLS_DESCRIPTION", "WALLS_ENERGY_EFF"]).size().reset_index(name="count")
z = z[z["MAINHEAT_DESCRIPTION"] == "Boiler and radiators, mains gas"]
# Filter on entries where we have a UPRN
epc_data = epc_data[~pd.isnull(epc_data["UPRN"])]
# Get the newest EPC for each UPRN. We use LODGEMENT_DATE as a proxy for this
epc_data["LODGEMENT_DATE"] = pd.to_datetime(epc_data["LODGEMENT_DATE"])
epc_data = epc_data.sort_values("LODGEMENT_DATE", ascending=False).drop_duplicates("UPRN")
# Now filter on social properties
epc_data = epc_data[epc_data["TENURE"].isin(["rental (social)", "Rented (social)"])]
# There are 17337 properties with a registered EPC in Croydon
# Take below EPC C properties
epc_data = epc_data[epc_data["CURRENT_ENERGY_EFFICIENCY"].astype(int) < 69]
# 7994 properties are below EPC C (46%)
# 79% D, 19% E, 1% F, 0.2% G - it probably makes the most sense to focus on E and D properties
epc_data["CURRENT_ENERGY_RATING"].value_counts(normalize=True)
# For the purpose of the sample, take the properties have surveys done in the last 3 years
# This gives us 1351 remaining properties
three_years_ago = pd.Timestamp.now() - pd.DateOffset(days=int(3 * 365))
epc_data = epc_data[epc_data["LODGEMENT_DATE"] >= three_years_ago]
# Archetype 1: defined below:
# 1) House
# 2) Unfilled cavity
# 3) A roof that could be insulated (flat or pitched with no more than 50mm insulation)
# 4) EPC E or D
# 24 properties
archetype_1_sample = epc_data[
epc_data["PROPERTY_TYPE"].isin(["House"]) &
(epc_data["CURRENT_ENERGY_RATING"].isin(["D", "E"])) &
epc_data["WALLS_DESCRIPTION"].isin(["Cavity wall, as built, no insulation (assumed)"]) &
epc_data["ROOF_DESCRIPTION"].isin(
[
"Pitched, 12 mm loft insulation",
"Pitched, 0 mm loft insulation",
"Pitched, no insulation",
"Pitched, 50 mm loft insulation",
"Flat, no insulation (assumed)",
"Pitched, no insulation (assumed)"
]
)
]
archetype_1_sample_asset_list = archetype_1_sample[["UPRN", "ADDRESS1", "POSTCODE"]].copy()
archetype_1_sample_asset_list["ARCHETYPE"] = "Archetype 1"
# Archetype 2: defined below:
# 1) Flat
# 2) Unfilled cavity
# 3) Another property above
# 4) EPC E
# 57 properties here
archetype_2_sample = epc_data[
epc_data["PROPERTY_TYPE"].isin(["Flat"]) &
(epc_data["CURRENT_ENERGY_RATING"].isin(["E", "D"])) &
epc_data["WALLS_DESCRIPTION"].isin(["Cavity wall, as built, no insulation (assumed)"]) &
epc_data["ROOF_DESCRIPTION"].isin(
[
"(another dwelling above)"
]
)
]
archetype_2_sample_asset_list = archetype_2_sample[["UPRN", "ADDRESS1", "POSTCODE"]].copy()
archetype_2_sample_asset_list["ARCHETYPE"] = "Archetype 2"
# Archetype 3: defined below:
# 1) EPC E or below
# 2) Solid brick wall
# 3) House
# 4) Pitched roof with no insulation
# Just 7 properties (more expensive to retrofit)
archetype_3_sample = epc_data[
epc_data["PROPERTY_TYPE"].isin(["House"]) &
(epc_data["CURRENT_ENERGY_RATING"].isin(["E", "F", "G"])) &
epc_data["WALLS_DESCRIPTION"].isin(["Solid brick, as built, no insulation (assumed)"]) &
epc_data["ROOF_DESCRIPTION"].isin(
[
"Pitched, no insulation",
"Pitched, limited insulation (assumed)",
"Pitched, 100 mm loft insulation",
"Pitched, no insulation (assumed)",
]
)
]
archetype_3_sample_asset_list = archetype_3_sample[["UPRN", "ADDRESS1", "POSTCODE"]].copy()
archetype_3_sample_asset_list["ARCHETYPE"] = "Archetype 3"
# Archetype 4: defined below:
# 1) Maisonette
# 2) Empty cavity
# 3) EPC E
# 16 properties here
archetype_4_sample = epc_data[
epc_data["PROPERTY_TYPE"].isin(["Maisonette"]) &
epc_data["WALLS_DESCRIPTION"].isin(
["Cavity wall, as built, no insulation (assumed)"]
)
]
archetype_4_sample_asset_list = archetype_4_sample[["UPRN", "ADDRESS1", "POSTCODE"]].copy()
archetype_4_sample_asset_list["ARCHETYPE"] = "Archetype 4"
asset_list = pd.concat(
[
archetype_1_sample_asset_list,
archetype_2_sample_asset_list,
archetype_3_sample_asset_list,
archetype_4_sample_asset_list
]
)
asset_list = asset_list.rename(
columns={
"UPRN": "uprn",
"ADDRESS1": "address",
"POSTCODE": "postcode",
"ARCHETYPE": "archetype"
}
)
asset_list["uprn"] = asset_list["uprn"].astype(int)
# We end up with some properties that are currently an EPC C, but we do not have this data in the download, so we
# manually remove
# 1) 3 Reid Close, CR5 3BL
# 2) Flat 6, Collier Court 2A, St. Peters Road CR0 1HD
asset_list = asset_list[
~asset_list["uprn"].isin(
[
100020576460,
100020624352,
]
)
]
# We have slightly too many properties, so we take a random sample of each archetype
# achetype_1_size = 20
# achetype_2_size = 46
# achetype_3_size = 23
# achetype_4_size = 13
# archetype_1_uprns = asset_list[asset_list["archetype"] == "Archetype 1"]["uprn"].sample(
# int(achetype_1_size)
# ).tolist()
# archetype_2_uprns = asset_list[asset_list["archetype"] == "Archetype 2"]["uprn"].sample(
# int(achetype_2_size)
# ).tolist()
# archetype_3_uprns = asset_list[asset_list["archetype"] == "Archetype 3"]["uprn"].sample(
# int(achetype_3_size)
# ).tolist()
# archetype_4_uprns = asset_list[asset_list["archetype"] == "Archetype 4"]["uprn"].sample(
# int(achetype_4_size)
# ).tolist()
uprns_to_keep = archetype_1_uprns + archetype_2_uprns + archetype_3_uprns + archetype_4_uprns
asset_list = asset_list[asset_list["uprn"].isin(uprns_to_keep)]
filename = f"{USER_ID}/{PORTFOLIO_ID}/inputs.csv"
save_csv_to_s3(
dataframe=asset_list,
bucket_name="retrofit-plan-inputs-dev",
file_name=filename
)
body = {
"portfolio_id": str(PORTFOLIO_ID),
"housing_type": "Social",
"goal": "Increase EPC",
"goal_value": "C",
"trigger_file_path": filename,
"budget": None,
"exclusions": ["floor_insulation"]
}
print(body)

View file

@ -0,0 +1,760 @@
"""
This script contains the code to generate the data required to populate the slides
We connect to the database amd extract the data for the portfolio needed so it is recommended to use
a environment akin to the backend to run this script
"""
import pandas as pd
import numpy as np
from backend.app.db.connection import db_engine
from sqlalchemy.orm import sessionmaker
from utils.s3 import read_csv_from_s3
from etl.customers.slide_utils import (
plot_epc_distribution,
get_property_details_by_portfolio_id,
get_plan_by_portfolio_id,
get_properties_with_default_recommendations,
create_powerpoint,
create_recommendations_summary
)
from backend.ml_models.AnnualBillSavings import AnnualBillSavings
USER_ID = 8
PORTFOLIO_ID_1 = 67
PORTFOLIO_ID_2 = 68
EPC_TARGET_1 = "C"
EPC_TARGET_2 = "A"
SAP_TARGET_1 = 69
SAP_TARGET_2 = 100
CUSTOMER_KEY = "gla-demo"
# Sample UPRNS
archetype_1_sample = ['100020604138', '200001192253', '100020581792', '100020576940', '200001187196', '100020618060',
'100020625813', '100020578756', '100020618076', '200001187197', '100020619814', '100020617489',
'100020588913']
archetype_2_sample = ['100020585002', '100020615603', '100020665652', '100020626800', '100020624347', '100020624348',
'100020576459', '10001007455', '100020666716', '100020609610', '100020625451', '100020625597',
'100020624351', '100020665634', '100020624350', '100020665640', '100020665632', '100022917303',
'100020665656', '10014055968', '100020630285', '100020665638', '100020616325', '100020637405',
'100020698027', '100020657902', '100020688226', '100020653786', '100020642337', '100020665643']
archetype_3_sample = ['100020594652', '100020697787', '100020577523', '100020633162', '100020601138', '100020595611',
'100020597485', '100020614883', '100020605342', '100020654671', '100020575611', '100020607980',
'200001185785', '100020616446', '100020692380']
archetype_4_sample = ['100020596436', '100020610165', '200001187539', '100020655500', '100020582907', '100020598277',
'100020650607', '100020605116', '100020650603']
def scenario_1():
# Connect to database
session = sessionmaker(bind=db_engine)()
########################################################################
# Get the data we need
########################################################################
portfolio_id = PORTFOLIO_ID_1
# Get the asset list
asset_list = read_csv_from_s3(
"retrofit-plan-inputs-dev", f"{USER_ID}/67/inputs.csv"
)
asset_list = pd.DataFrame(asset_list)
# Get the properties for the portfolio
properties = get_properties_with_default_recommendations(session, portfolio_id)
properties_df = pd.DataFrame(properties)
# We now pull the data for the property details
property_details = get_property_details_by_portfolio_id(session, portfolio_id)
property_details_df = pd.DataFrame(property_details)
# We estimate bills based on the adjusted_energy_consumption
property_details_df["energy_bill"] = property_details_df["adjusted_energy_consumption"].apply(
lambda x: AnnualBillSavings.calculate_annual_bill(x)
)
# Merge on uprn
property_details_df = property_details_df.merge(
properties_df[["uprn", "id"]].rename(columns={"id": "property_id"}),
on="property_id"
)
plans = get_plan_by_portfolio_id(session, portfolio_id)
plans_df = pd.DataFrame(plans)
# Unnest the recommendations. Each recommendation is a list of dictionaries
recommendations_exploded = properties_df["recommendations"].explode().tolist()
recommendations_df = pd.DataFrame([r for r in recommendations_exploded if not pd.isnull(r)])
# Add uprn on
recommendations_df = recommendations_df.merge(
properties_df[["uprn", "id"]].rename(columns={"id": "property_id"}),
how="left",
on="property_id"
)
recommendations_summary = create_recommendations_summary(
recommendations_df,
properties_df,
property_details_df,
SAP_TARGET_1
)
# Calculate % changes of energ, co2 and abs
recommendations_summary["carbon_percent_change"] = (
recommendations_summary["total_carbon"] / recommendations_summary["current_co2"]
)
recommendations_summary["energy_percent_change"] = (
recommendations_summary["adjusted_heat_demand"] / recommendations_summary["current_energy"]
)
recommendations_summary["bills_percent_change"] = (
recommendations_summary["total_bill_savings"] / recommendations_summary["current_energy_bill"]
)
########################
# Overview
########################
overview_totals = recommendations_summary.sum()
overview_means = recommendations_summary.mean()
########################
# Measures
########################
measures_count = recommendations_df.groupby("type")["id"].count().reset_index()
wall_insulation_measures = measures_count[
measures_count["type"].isin(["cavity_wall_insulation", "external_wall_insulation", "internal_wall_insulation"])
]["id"].sum()
ventilation_measures = measures_count[
measures_count["type"].isin(["mechanical_ventilation"])
]["id"].sum()
roof_insulation_measures = measures_count[
measures_count["type"].isin(["loft_insulation", "flat_roof_insulation"])
]["id"].sum()
floor_insulation_measures = measures_count[
measures_count["type"].isin(["solid_floor_insulation", "suspended_floor_insulation"])
]["id"].sum()
windows = measures_count[
measures_count["type"].isin(["windows_glazing"])
]["id"].sum()
heating = measures_count[
measures_count["type"].isin(["heating"])
]["id"].sum()
heating_controls = measures_count[
measures_count["type"].isin(["heating_control"])
]["id"].sum()
solar = measures_count[
measures_count["type"].isin(["solar_pv"])
]["id"].sum()
other = measures_count[
~measures_count["type"].isin([
"cavity_wall_insulation", "external_wall_insulation", "internal_wall_insulation",
"loft_insulation", "flat_roof_insulation", "solid_floor_insulation",
"suspended_floor_insulation", "windows_glazing", "heating", "heating_control", "solar_pv",
"mechanical_ventilation"
])
]["id"].sum()
# Summary information by each archetype
########################
# Archetype 1
########################
archetype_1 = asset_list[asset_list["archetype"] == "Archetype 1"]
recommendations_arch_1_summary = recommendations_summary[
recommendations_summary["uprn"].astype(str).isin(archetype_1["uprn"].values)
]
arch_1_property_details = property_details_df[
property_details_df["uprn"].astype(str).isin(archetype_1["uprn"].values)
]
arch_1_property_details["co2_emissions"].sum() / property_details_df["co2_emissions"].sum()
# Take the mean, median and maximum of each value
cols_to_keep = ["total_cost", "total_carbon", "total_bill_savings", "total_sap_points", "adjusted_heat_demand",
"energy_percent_change", "carbon_percent_change", "bills_percent_change"]
arch_1_recommendation_min = recommendations_arch_1_summary.min()[cols_to_keep]
arch_1_recommendation_max = recommendations_arch_1_summary.max()[cols_to_keep]
arch_1_recommendation_means = recommendations_arch_1_summary.mean()[cols_to_keep]
arch_1_totals = recommendations_arch_1_summary.sum()[cols_to_keep]
annual_total_co2 = recommendations_arch_1_summary["total_carbon"].sum()
annual_total_bills = recommendations_arch_1_summary["total_bill_savings"].sum()
annual_total_energy_savings = recommendations_arch_1_summary["adjusted_heat_demand"].sum()
archetype_measures = \
recommendations_df[recommendations_df["uprn"].astype(str).isin(archetype_1["uprn"].values)].groupby("type")[
"id"].count().reset_index()
cost_text = (f"{round(arch_1_recommendation_means['total_cost'], 2)}: "
f"{arch_1_recommendation_min['total_cost']} - {arch_1_recommendation_max['total_cost']}")
sap_text = (f"{round(arch_1_recommendation_means['total_sap_points'], 2)}: "
f"{arch_1_recommendation_min['total_sap_points']} - {arch_1_recommendation_max['total_sap_points']}")
energy_text = (f"{round(arch_1_recommendation_means['adjusted_heat_demand'], 2)}: "
f"{arch_1_recommendation_min['adjusted_heat_demand']} - "
f"{arch_1_recommendation_max['adjusted_heat_demand']}")
energy_percent_text = (f"{round(arch_1_recommendation_means['energy_percent_change'], 2)}: "
f"{arch_1_recommendation_min['energy_percent_change']} - "
f"{arch_1_recommendation_max['energy_percent_change']}")
carbon_text = (f"{round(arch_1_recommendation_means['total_carbon'], 2)}: "
f"{arch_1_recommendation_min['total_carbon']} - {arch_1_recommendation_max['total_carbon']}")
carbon_percent_text = (f"{round(arch_1_recommendation_means['carbon_percent_change'], 2)}: "
f"{arch_1_recommendation_min['carbon_percent_change']} - "
f"{arch_1_recommendation_max['carbon_percent_change']}")
bill_text = (f"{round(arch_1_recommendation_means['total_bill_savings'], 2)}: "
f"{arch_1_recommendation_min['total_bill_savings']} - "
f"{arch_1_recommendation_max['total_bill_savings']}")
bill_percent_text = (f"{round(arch_1_recommendation_means['bills_percent_change'], 2)}: "
f"{arch_1_recommendation_min['bills_percent_change']} - "
f"{arch_1_recommendation_max['bills_percent_change']}")
########################
# Archetype 2
########################
archetype_2 = asset_list[asset_list["archetype"] == "Archetype 2"]
recommendations_arch_2_summary = recommendations_summary[
recommendations_summary["uprn"].astype(str).isin(archetype_2["uprn"].values)
]
arch_2_property_details = property_details_df[
property_details_df["uprn"].astype(str).isin(archetype_2["uprn"].values)
]
arch_2_property_details["co2_emissions"].sum() / property_details_df["co2_emissions"].sum()
# Take the mean, median and maximum of each value
arch_2_recommendation_min = recommendations_arch_2_summary.min()
arch_2_recommendation_max = recommendations_arch_2_summary.max()
arch_2_recommendation_means = recommendations_arch_2_summary.mean().round(2)
total_cost = recommendations_arch_2_summary["total_cost"].sum()
annual_total_co2 = recommendations_arch_2_summary["total_carbon"].sum()
annual_total_bills = recommendations_arch_2_summary["total_bill_savings"].sum()
annual_total_energy_savings = recommendations_arch_2_summary["adjusted_heat_demand"].sum()
archetype_measures = \
recommendations_df[recommendations_df["uprn"].astype(str).isin(archetype_2["uprn"].values)].groupby("type")[
"id"].count().reset_index()
cost_text = (f"{round(arch_2_recommendation_means['total_cost'], 2)}: "
f"{arch_2_recommendation_min['total_cost']} - {arch_2_recommendation_max['total_cost']}")
sap_text = (f"{round(arch_2_recommendation_means['total_sap_points'], 2)}: "
f"{arch_2_recommendation_min['total_sap_points']} - {arch_2_recommendation_max['total_sap_points']}")
energy_text = (f"{round(arch_2_recommendation_means['adjusted_heat_demand'], 2)}: "
f"{arch_2_recommendation_min['adjusted_heat_demand']} - "
f"{arch_2_recommendation_max['adjusted_heat_demand']}")
energy_percent_text = (f"{round(arch_2_recommendation_means['energy_percent_change'], 2)}: "
f"{arch_2_recommendation_min['energy_percent_change']} - "
f"{arch_2_recommendation_max['energy_percent_change']}")
carbon_text = (f"{round(arch_2_recommendation_means['total_carbon'], 2)}: "
f"{arch_2_recommendation_min['total_carbon']} - {arch_2_recommendation_max['total_carbon']}")
carbon_percent_text = (f"{round(arch_2_recommendation_means['carbon_percent_change'], 2)}: "
f"{arch_2_recommendation_min['carbon_percent_change']} - "
f"{arch_2_recommendation_max['carbon_percent_change']}")
bill_text = (f"{round(arch_2_recommendation_means['total_bill_savings'], 2)}: "
f"{arch_2_recommendation_min['total_bill_savings']} - "
f"{arch_2_recommendation_max['total_bill_savings']}")
bill_percent_text = (f"{round(arch_2_recommendation_means['bills_percent_change'], 2)}: "
f"{arch_2_recommendation_min['bills_percent_change']} - "
f"{arch_2_recommendation_max['bills_percent_change']}")
########################
# Archetype 3
########################
archetype_3 = asset_list[asset_list["archetype"] == "Archetype 3"]
recommendations_arch_3_summary = recommendations_summary[
recommendations_summary["uprn"].astype(str).isin(archetype_3["uprn"].values)
]
arch_3_property_details = property_details_df[
property_details_df["uprn"].astype(str).isin(archetype_3["uprn"].values)
]
arch_3_property_details["co2_emissions"].sum() / property_details_df["co2_emissions"].sum()
# Take the mean, median and maximum of each value
arch_3_recommendation_min = recommendations_arch_3_summary.min()
arch_3_recommendation_max = recommendations_arch_3_summary.max()
arch_3_recommendation_means = recommendations_arch_3_summary.mean()
total_cost = recommendations_arch_3_summary["total_cost"].sum()
annual_total_co2 = recommendations_arch_3_summary["total_carbon"].sum()
annual_total_bills = recommendations_arch_3_summary["total_bill_savings"].sum()
annual_total_energy_savings = recommendations_arch_3_summary["adjusted_heat_demand"].sum()
archetype_measures = \
recommendations_df[recommendations_df["uprn"].astype(str).isin(archetype_3["uprn"].values)].groupby("type")[
"id"].count().reset_index()
cost_text = (f"{round(arch_3_recommendation_means['total_cost'], 2)}: "
f"{arch_3_recommendation_min['total_cost']} - {arch_3_recommendation_max['total_cost']}")
sap_text = (f"{round(arch_3_recommendation_means['total_sap_points'], 2)}: "
f"{arch_3_recommendation_min['total_sap_points']} - {arch_3_recommendation_max['total_sap_points']}")
energy_text = (f"{round(arch_3_recommendation_means['adjusted_heat_demand'], 2)}: "
f"{arch_3_recommendation_min['adjusted_heat_demand']} - "
f"{arch_3_recommendation_max['adjusted_heat_demand']}")
energy_percent_text = (f"{round(arch_3_recommendation_means['energy_percent_change'], 2)}: "
f"{arch_3_recommendation_min['energy_percent_change']} - "
f"{arch_3_recommendation_max['energy_percent_change']}")
carbon_text = (f"{round(arch_3_recommendation_means['total_carbon'], 2)}: "
f"{arch_3_recommendation_min['total_carbon']} - {arch_3_recommendation_max['total_carbon']}")
carbon_percent_text = (f"{round(arch_3_recommendation_means['carbon_percent_change'], 2)}: "
f"{arch_3_recommendation_min['carbon_percent_change']} - "
f"{arch_3_recommendation_max['carbon_percent_change']}")
bill_text = (f"{round(arch_3_recommendation_means['total_bill_savings'], 2)}: "
f"{arch_3_recommendation_min['total_bill_savings']} - "
f"{arch_3_recommendation_max['total_bill_savings']}")
bill_percent_text = (f"{round(arch_3_recommendation_means['bills_percent_change'], 2)}: "
f"{arch_3_recommendation_min['bills_percent_change']} - "
f"{arch_3_recommendation_max['bills_percent_change']}")
########################
# Archetype 4
########################
archetype_4 = asset_list[asset_list["archetype"] == "Archetype 4"]
recommendations_arch_4_summary = recommendations_summary[
recommendations_summary["uprn"].astype(str).isin(archetype_4["uprn"].values)
]
arch_4_property_details = property_details_df[
property_details_df["uprn"].astype(str).isin(archetype_4["uprn"].values)
]
arch_4_property_details["co2_emissions"].sum() / property_details_df["co2_emissions"].sum()
# Take the mean, median and maximum of each value
arch_4_recommendation_min = recommendations_arch_4_summary.min()
arch_4_recommendation_max = recommendations_arch_4_summary.max()
arch_4_recommendation_means = recommendations_arch_4_summary.mean()
total_cost = recommendations_arch_4_summary["total_cost"].sum()
annual_total_co2 = recommendations_arch_4_summary["total_carbon"].sum()
annual_total_bills = recommendations_arch_4_summary["total_bill_savings"].sum()
annual_total_energy_savings = recommendations_arch_4_summary["adjusted_heat_demand"].sum()
archetype_measures = \
recommendations_df[recommendations_df["uprn"].astype(str).isin(archetype_4["uprn"].values)].groupby("type")[
"id"].count().reset_index()
cost_text = (f"{round(arch_4_recommendation_means['total_cost'], 2)}: "
f"{arch_4_recommendation_min['total_cost']} - {arch_4_recommendation_max['total_cost']}")
sap_text = (f"{round(arch_4_recommendation_means['total_sap_points'], 2)}: "
f"{arch_4_recommendation_min['total_sap_points']} - {arch_4_recommendation_max['total_sap_points']}")
energy_text = (f"{round(arch_4_recommendation_means['adjusted_heat_demand'], 2)}: "
f"{arch_4_recommendation_min['adjusted_heat_demand']} - "
f"{arch_4_recommendation_max['adjusted_heat_demand']}")
energy_percent_text = (f"{round(arch_4_recommendation_means['energy_percent_change'], 2)}: "
f"{arch_4_recommendation_min['energy_percent_change']} - "
f"{arch_4_recommendation_max['energy_percent_change']}")
carbon_text = (f"{round(arch_4_recommendation_means['total_carbon'], 2)}: "
f"{arch_4_recommendation_min['total_carbon']} - {arch_4_recommendation_max['total_carbon']}")
carbon_percent_text = (f"{round(arch_4_recommendation_means['carbon_percent_change'], 2)}: "
f"{arch_4_recommendation_min['carbon_percent_change']} - "
f"{arch_4_recommendation_max['carbon_percent_change']}")
bill_text = (f"{round(arch_4_recommendation_means['total_bill_savings'], 2)}: "
f"{arch_4_recommendation_min['total_bill_savings']} - "
f"{arch_4_recommendation_max['total_bill_savings']}")
bill_percent_text = (f"{round(arch_4_recommendation_means['bills_percent_change'], 2)}: "
f"{arch_4_recommendation_min['bills_percent_change']} - "
f"{arch_4_recommendation_max['bills_percent_change']}")
########################
# Overview
########################
overview_totals = recommendations_summary.sum()
def make_sample():
# sample_proportion = 67 / 102
# Get the asset list
asset_list = read_csv_from_s3(
"retrofit-plan-inputs-dev", f"{USER_ID}/67/inputs.csv"
)
asset_list = pd.DataFrame(asset_list)
# From the asset list, we deduce how many properties we need
# Need to figure out the sizes
archetype_1_sample_size = 13
archetype_2_sample_size = 30
archetype_3_sample_size = 15
archetype_4_sample_size = 9
# We take the sample and we'll keep the uprns static
archetype_1_sample = asset_list[
asset_list["archetype"] == "Archetype 1"
].sample(archetype_1_sample_size)["uprn"].to_list()
archetype_2_sample = asset_list[
asset_list["archetype"] == "Archetype 2"
].sample(archetype_2_sample_size)["uprn"].to_list()
archetype_3_sample = asset_list[
asset_list["archetype"] == "Archetype 3"
].sample(archetype_3_sample_size)["uprn"].to_list()
archetype_4_sample = asset_list[
asset_list["archetype"] == "Archetype 4"
].sample(archetype_4_sample_size)["uprn"].to_list()
def scenario_2():
# Connect to database
session = sessionmaker(bind=db_engine)()
########################################################################
# Get the data we need
########################################################################
portfolio_id = PORTFOLIO_ID_2
# Get the asset list
asset_list = read_csv_from_s3(
"retrofit-plan-inputs-dev", f"{USER_ID}/67/inputs.csv"
)
asset_list = pd.DataFrame(asset_list)
sample_uprns = archetype_1_sample + archetype_2_sample + archetype_3_sample + archetype_4_sample
# Filter on sample uprns
asset_list = asset_list[asset_list["uprn"].astype(str).isin(sample_uprns)]
# Get the properties for the portfolio
properties = get_properties_with_default_recommendations(session, portfolio_id)
properties_df = pd.DataFrame(properties)
properties_df = properties_df[properties_df["uprn"].astype(str).isin(sample_uprns)]
# We now pull the data for the property details
property_details = get_property_details_by_portfolio_id(session, portfolio_id)
property_details_df = pd.DataFrame(property_details)
property_details_df = property_details_df[property_details_df["property_id"].isin(properties_df["id"].values)]
# We estimate bills based on the adjusted_energy_consumption
property_details_df["energy_bill"] = property_details_df["adjusted_energy_consumption"].apply(
lambda x: AnnualBillSavings.calculate_annual_bill(x)
)
# Merge on uprn
property_details_df = property_details_df.merge(
properties_df[["uprn", "id"]].rename(columns={"id": "property_id"}),
on="property_id"
)
plans = get_plan_by_portfolio_id(session, portfolio_id)
plans_df = pd.DataFrame(plans)
# Unnest the recommendations. Each recommendation is a list of dictionaries
recommendations_exploded = properties_df["recommendations"].explode().tolist()
recommendations_df = pd.DataFrame([r for r in recommendations_exploded if not pd.isnull(r)])
# Add uprn on
recommendations_df = recommendations_df.merge(
properties_df[["uprn", "id"]].rename(columns={"id": "property_id"}),
how="left",
on="property_id"
)
recommendations_summary = create_recommendations_summary(
recommendations_df,
properties_df,
property_details_df,
SAP_TARGET_1
)
# Calculate % changes of energ, co2 and abs
recommendations_summary["carbon_percent_change"] = (
recommendations_summary["total_carbon"] / recommendations_summary["current_co2"]
)
recommendations_summary["energy_percent_change"] = (
recommendations_summary["adjusted_heat_demand"] / recommendations_summary["current_energy"]
)
recommendations_summary["bills_percent_change"] = (
recommendations_summary["total_bill_savings"] / recommendations_summary["current_energy_bill"]
)
########################
# Overview
########################
overview_totals = recommendations_summary.sum()
overview_means = recommendations_summary.mean()
########################
# Measures
########################
measures_count = recommendations_df.groupby("type")["id"].count().reset_index()
wall_insulation_measures = measures_count[
measures_count["type"].isin(["cavity_wall_insulation", "external_wall_insulation", "internal_wall_insulation"])
]["id"].sum()
ventilation_measures = measures_count[
measures_count["type"].isin(["mechanical_ventilation"])
]["id"].sum()
roof_insulation_measures = measures_count[
measures_count["type"].isin(["loft_insulation", "flat_roof_insulation"])
]["id"].sum()
floor_insulation_measures = measures_count[
measures_count["type"].isin(["solid_floor_insulation", "suspended_floor_insulation"])
]["id"].sum()
windows = measures_count[
measures_count["type"].isin(["windows_glazing"])
]["id"].sum()
heating = measures_count[
measures_count["type"].isin(["heating"])
]["id"].sum()
heating_controls = measures_count[
measures_count["type"].isin(["heating_control"])
]["id"].sum()
solar = measures_count[
measures_count["type"].isin(["solar_pv"])
]["id"].sum()
other = measures_count[
~measures_count["type"].isin([
"cavity_wall_insulation", "external_wall_insulation", "internal_wall_insulation",
"loft_insulation", "flat_roof_insulation", "solid_floor_insulation",
"suspended_floor_insulation", "windows_glazing", "heating", "heating_control", "solar_pv",
"mechanical_ventilation"
])
]["id"].sum()
z = recommendations_df[recommendations_df["uprn"].astype(str).isin(archetype_3_sample)]
recommendations_df[recommendations_df["uprn"].astype(str).isin(archetype_3_sample)]["type"].value_counts()
# Summary information by each archetype
########################
# Archetype 1
########################
archetype_1 = asset_list[asset_list["archetype"] == "Archetype 1"]
recommendations_arch_1_summary = recommendations_summary[
recommendations_summary["uprn"].astype(str).isin(archetype_1["uprn"].values)
]
arch_1_property_details = property_details_df[
property_details_df["uprn"].astype(str).isin(archetype_1["uprn"].values)
]
arch_1_property_details["co2_emissions"].sum() / property_details_df["co2_emissions"].sum()
# Take the mean, median and maximum of each value
arch_1_recommendation_min = recommendations_arch_1_summary.min()
arch_1_recommendation_max = recommendations_arch_1_summary.max()
arch_1_recommendation_means = recommendations_arch_1_summary.mean()
arch_1_totals = recommendations_arch_1_summary.sum()
annual_total_co2 = recommendations_arch_1_summary["total_carbon"].sum()
annual_total_bills = recommendations_arch_1_summary["total_bill_savings"].sum()
annual_total_energy_savings = recommendations_arch_1_summary["adjusted_heat_demand"].sum()
archetype_measures = \
recommendations_df[recommendations_df["uprn"].astype(str).isin(archetype_1["uprn"].values)].groupby("type")[
"id"].count().reset_index()
cost_text = (f"{round(arch_1_recommendation_means['total_cost'], 2)}: "
f"{arch_1_recommendation_min['total_cost']} - {arch_1_recommendation_max['total_cost']}")
sap_text = (f"{round(arch_1_recommendation_means['total_sap_points'], 2)}: "
f"{arch_1_recommendation_min['total_sap_points']} - {arch_1_recommendation_max['total_sap_points']}")
energy_text = (f"{round(arch_1_recommendation_means['adjusted_heat_demand'], 2)}: "
f"{arch_1_recommendation_min['adjusted_heat_demand']} - "
f"{arch_1_recommendation_max['adjusted_heat_demand']}")
energy_percent_text = (f"{round(arch_1_recommendation_means['energy_percent_change'], 2)}: "
f"{arch_1_recommendation_min['energy_percent_change']} - "
f"{arch_1_recommendation_max['energy_percent_change']}")
carbon_text = (f"{round(arch_1_recommendation_means['total_carbon'], 2)}: "
f"{arch_1_recommendation_min['total_carbon']} - {arch_1_recommendation_max['total_carbon']}")
carbon_percent_text = (f"{round(arch_1_recommendation_means['carbon_percent_change'], 2)}: "
f"{arch_1_recommendation_min['carbon_percent_change']} - "
f"{arch_1_recommendation_max['carbon_percent_change']}")
bill_text = (f"{round(arch_1_recommendation_means['total_bill_savings'], 2)}: "
f"{arch_1_recommendation_min['total_bill_savings']} - "
f"{arch_1_recommendation_max['total_bill_savings']}")
bill_percent_text = (f"{round(arch_1_recommendation_means['bills_percent_change'], 2)}: "
f"{arch_1_recommendation_min['bills_percent_change']} - "
f"{arch_1_recommendation_max['bills_percent_change']}")
########################
# Archetype 2
########################
archetype_2 = asset_list[asset_list["archetype"] == "Archetype 2"]
recommendations_arch_2_summary = recommendations_summary[
recommendations_summary["uprn"].astype(str).isin(archetype_2["uprn"].values)
]
arch_2_property_details = property_details_df[
property_details_df["uprn"].astype(str).isin(archetype_2["uprn"].values)
]
arch_2_property_details["co2_emissions"].sum() / property_details_df["co2_emissions"].sum()
# Take the mean, median and maximum of each value
arch_2_recommendation_min = recommendations_arch_2_summary.min()
arch_2_recommendation_max = recommendations_arch_2_summary.max()
arch_2_recommendation_means = recommendations_arch_2_summary.mean().round(2)
total_cost = recommendations_arch_2_summary["total_cost"].sum()
annual_total_co2 = recommendations_arch_2_summary["total_carbon"].sum()
annual_total_bills = recommendations_arch_2_summary["total_bill_savings"].sum()
annual_total_energy_savings = recommendations_arch_2_summary["adjusted_heat_demand"].sum()
archetype_measures = \
recommendations_df[recommendations_df["uprn"].astype(str).isin(archetype_2["uprn"].values)].groupby("type")[
"id"].count().reset_index()
cost_text = (f"{round(arch_2_recommendation_means['total_cost'], 2)}: "
f"{arch_2_recommendation_min['total_cost']} - {arch_2_recommendation_max['total_cost']}")
sap_text = (f"{round(arch_2_recommendation_means['total_sap_points'], 2)}: "
f"{arch_2_recommendation_min['total_sap_points']} - {arch_2_recommendation_max['total_sap_points']}")
energy_text = (f"{round(arch_2_recommendation_means['adjusted_heat_demand'], 2)}: "
f"{arch_2_recommendation_min['adjusted_heat_demand']} - "
f"{arch_2_recommendation_max['adjusted_heat_demand']}")
energy_percent_text = (f"{round(arch_2_recommendation_means['energy_percent_change'], 2)}: "
f"{arch_2_recommendation_min['energy_percent_change']} - "
f"{arch_2_recommendation_max['energy_percent_change']}")
carbon_text = (f"{round(arch_2_recommendation_means['total_carbon'], 2)}: "
f"{arch_2_recommendation_min['total_carbon']} - {arch_2_recommendation_max['total_carbon']}")
carbon_percent_text = (f"{round(arch_2_recommendation_means['carbon_percent_change'], 2)}: "
f"{arch_2_recommendation_min['carbon_percent_change']} - "
f"{arch_2_recommendation_max['carbon_percent_change']}")
bill_text = (f"{round(arch_2_recommendation_means['total_bill_savings'], 2)}: "
f"{arch_2_recommendation_min['total_bill_savings']} - "
f"{arch_2_recommendation_max['total_bill_savings']}")
bill_percent_text = (f"{round(arch_2_recommendation_means['bills_percent_change'], 2)}: "
f"{arch_2_recommendation_min['bills_percent_change']} - "
f"{arch_2_recommendation_max['bills_percent_change']}")
########################
# Archetype 3
########################
archetype_3 = asset_list[asset_list["archetype"] == "Archetype 3"]
recommendations_arch_3_summary = recommendations_summary[
recommendations_summary["uprn"].astype(str).isin(archetype_3["uprn"].values)
]
arch_3_property_details = property_details_df[
property_details_df["uprn"].astype(str).isin(archetype_3["uprn"].values)
]
arch_3_property_details["co2_emissions"].sum() / property_details_df["co2_emissions"].sum()
# Take the mean, median and maximum of each value
arch_3_recommendation_min = recommendations_arch_3_summary.min()
arch_3_recommendation_max = recommendations_arch_3_summary.max()
arch_3_recommendation_means = recommendations_arch_3_summary.mean()
total_cost = recommendations_arch_3_summary["total_cost"].sum()
annual_total_co2 = recommendations_arch_3_summary["total_carbon"].sum()
annual_total_bills = recommendations_arch_3_summary["total_bill_savings"].sum()
annual_total_energy_savings = recommendations_arch_3_summary["adjusted_heat_demand"].sum()
archetype_measures = \
recommendations_df[recommendations_df["uprn"].astype(str).isin(archetype_3["uprn"].values)].groupby("type")[
"id"].count().reset_index()
cost_text = (f"{round(arch_3_recommendation_means['total_cost'], 2)}: "
f"{arch_3_recommendation_min['total_cost']} - {arch_3_recommendation_max['total_cost']}")
sap_text = (f"{round(arch_3_recommendation_means['total_sap_points'], 2)}: "
f"{arch_3_recommendation_min['total_sap_points']} - {arch_3_recommendation_max['total_sap_points']}")
energy_text = (f"{round(arch_3_recommendation_means['adjusted_heat_demand'], 2)}: "
f"{arch_3_recommendation_min['adjusted_heat_demand']} - "
f"{arch_3_recommendation_max['adjusted_heat_demand']}")
energy_percent_text = (f"{round(arch_3_recommendation_means['energy_percent_change'], 2)}: "
f"{arch_3_recommendation_min['energy_percent_change']} - "
f"{arch_3_recommendation_max['energy_percent_change']}")
carbon_text = (f"{round(arch_3_recommendation_means['total_carbon'], 2)}: "
f"{arch_3_recommendation_min['total_carbon']} - {arch_3_recommendation_max['total_carbon']}")
carbon_percent_text = (f"{round(arch_3_recommendation_means['carbon_percent_change'], 2)}: "
f"{arch_3_recommendation_min['carbon_percent_change']} - "
f"{arch_3_recommendation_max['carbon_percent_change']}")
bill_text = (f"{round(arch_3_recommendation_means['total_bill_savings'], 2)}: "
f"{arch_3_recommendation_min['total_bill_savings']} - "
f"{arch_3_recommendation_max['total_bill_savings']}")
bill_percent_text = (f"{round(arch_3_recommendation_means['bills_percent_change'], 2)}: "
f"{arch_3_recommendation_min['bills_percent_change']} - "
f"{arch_3_recommendation_max['bills_percent_change']}")
########################
# Archetype 4
########################
archetype_4 = asset_list[asset_list["archetype"] == "Archetype 4"]
recommendations_arch_4_summary = recommendations_summary[
recommendations_summary["uprn"].astype(str).isin(archetype_4["uprn"].values)
]
arch_4_property_details = property_details_df[
property_details_df["uprn"].astype(str).isin(archetype_4["uprn"].values)
]
arch_4_property_details["co2_emissions"].sum() / property_details_df["co2_emissions"].sum()
# Take the mean, median and maximum of each value
arch_4_recommendation_min = recommendations_arch_4_summary.min()
arch_4_recommendation_max = recommendations_arch_4_summary.max()
arch_4_recommendation_means = recommendations_arch_4_summary.mean()
total_cost = recommendations_arch_4_summary["total_cost"].sum()
annual_total_co2 = recommendations_arch_4_summary["total_carbon"].sum()
annual_total_bills = recommendations_arch_4_summary["total_bill_savings"].sum()
annual_total_energy_savings = recommendations_arch_4_summary["adjusted_heat_demand"].sum()
archetype_measures = \
recommendations_df[recommendations_df["uprn"].astype(str).isin(archetype_4["uprn"].values)].groupby("type")[
"id"].count().reset_index()
cost_text = (f"{round(arch_4_recommendation_means['total_cost'], 2)}: "
f"{arch_4_recommendation_min['total_cost']} - {arch_4_recommendation_max['total_cost']}")
sap_text = (f"{round(arch_4_recommendation_means['total_sap_points'], 2)}: "
f"{arch_4_recommendation_min['total_sap_points']} - {arch_4_recommendation_max['total_sap_points']}")
energy_text = (f"{round(arch_4_recommendation_means['adjusted_heat_demand'], 2)}: "
f"{arch_4_recommendation_min['adjusted_heat_demand']} - "
f"{arch_4_recommendation_max['adjusted_heat_demand']}")
energy_percent_text = (f"{round(arch_4_recommendation_means['energy_percent_change'], 2)}: "
f"{arch_4_recommendation_min['energy_percent_change']} - "
f"{arch_4_recommendation_max['energy_percent_change']}")
carbon_text = (f"{round(arch_4_recommendation_means['total_carbon'], 2)}: "
f"{arch_4_recommendation_min['total_carbon']} - {arch_4_recommendation_max['total_carbon']}")
carbon_percent_text = (f"{round(arch_4_recommendation_means['carbon_percent_change'], 2)}: "
f"{arch_4_recommendation_min['carbon_percent_change']} - "
f"{arch_4_recommendation_max['carbon_percent_change']}")
bill_text = (f"{round(arch_4_recommendation_means['total_bill_savings'], 2)}: "
f"{arch_4_recommendation_min['total_bill_savings']} - "
f"{arch_4_recommendation_max['total_bill_savings']}")
bill_percent_text = (f"{round(arch_4_recommendation_means['bills_percent_change'], 2)}: "
f"{arch_4_recommendation_min['bills_percent_change']} - "
f"{arch_4_recommendation_max['bills_percent_change']}")

View file

@ -246,7 +246,7 @@ def create_powerpoint(data, save_location):
prs.save(save_location)
def create_recommendations_summary(recommendations_df, properties_df, sap_target):
def create_recommendations_summary(recommendations_df, properties_df, property_details_df, sap_target):
# Aggregate the impact of the recommendations
# We want:
# Total number of sap points
@ -259,13 +259,15 @@ def create_recommendations_summary(recommendations_df, properties_df, sap_target
total_valuation_impact=("property_valuation_increase", "sum"),
total_bill_savings=("energy_cost_savings", "sum"),
total_cost=("estimated_cost", "sum"),
total_carbon=("co2_equivalent_savings", "sum")
total_carbon=("co2_equivalent_savings", "sum"),
adjusted_heat_demand=("adjusted_heat_demand", "sum")
).reset_index()
# Merge on current sap points
# Merge on current sap points, current CO2, current adjusted_heat_demand, current annual bill
recommendations_summary = recommendations_summary.merge(
properties_df[["id", "uprn", "current_sap_points"]].rename(columns={"id": "property_id"}), on="property_id",
how="left"
)
recommendations_summary["expected_sap_points"] = (
recommendations_summary["current_sap_points"] + recommendations_summary["total_sap_points"]
)
@ -274,4 +276,18 @@ def create_recommendations_summary(recommendations_df, properties_df, sap_target
)
recommendations_summary["sap_difference"] = sap_target - recommendations_summary["expected_sap_points"]
if property_details_df is not None:
recommendations_summary = recommendations_summary.merge(
property_details_df[["uprn", "co2_emissions", "adjusted_energy_consumption", "energy_bill"]].rename(
columns={
"id": "property_id",
"co2_emissions": "current_co2",
"adjusted_energy_consumption": "current_energy",
"energy_bill": "current_energy_bill"
}
),
on="uprn",
how="left"
)
return recommendations_summary

View file

@ -3459,7 +3459,7 @@ class DataLoader:
"not eligible",
asset_list["ECO Eligibility"]
)
asset_list = asset_list.drop(columns=["has_eco3"])
# asset_list = asset_list.drop(columns=["has_eco3"])
# Report on sales
sales_report = {}
@ -6692,6 +6692,469 @@ def create_final_report():
revenue.to_csv("HA Analysis Final - revenue.csv")
def identify_eco_works(loader):
# ha_names = [
# "HA16", # For Housing
# "HA39", # Rooftop
# "HA41", # Settle
# "HA23", # Lambeth
# "HA14", # EMH
# "HA7", # Believe
# "HA102", # Thrive
# ]
# Unitas, fairhive, acis, LHP
ha_names = [
"HA50", # Unitas
"HA15", # Fairhive
"HA107", # ACIS
"HA24", # LHP
]
names = {
"HA50": "Unitas",
"HA15": "Fairhive",
"HA107": "ACIS",
"HA24": "LHP"
}
# gbis rate
breakdowns = []
# lists = {}
for ha, data_assets in loader.data.items():
if ha not in ha_names:
continue
asset_list = data_assets["asset_list"].copy()
survey_list = data_assets["survey_list"].copy()
# Remove things that have sold
if not survey_list.empty:
asset_list = asset_list.merge(
survey_list[["asset_list_row_id", "installation_status"]],
how="left",
on="asset_list_row_id"
)
# Anything that has an installation has gone to installation, and therefore is not remaining
asset_list = asset_list[pd.isnull(asset_list["installation_status"])]
asset_list = asset_list.drop(columns=["installation_status"])
# Needing a CIGA check
needs_cga = asset_list[
asset_list["ECO Eligibility"] == "eco4 (subject to ciga)"
].copy()
eco4 = asset_list[
asset_list["ECO Eligibility"] == "eco4"
].copy()
eco4_passed_ciga = asset_list[
asset_list["ECO Eligibility"] == "eco4 - passed ciga"
].copy()
# lists[ha] = {
# "needs_cga": needs_cga,
# "eco4": eco4,
# "eco4_passed_ciga": eco4_passed_ciga
# }
# Store the data
if not needs_cga.empty:
needs_cga.to_csv(f"local_data/{names[ha]} - needs ciga.csv")
if not eco4.empty:
eco4.to_csv(f"local_data/{names[ha]} - eco4.csv")
if not eco4_passed_ciga.empty:
eco4_passed_ciga.to_csv(f"local_data/{names[ha]} - eco4 passed ciga.csv")
summary = {
"HA Name": ha,
"n_needing_ciga": needs_cga.shape[0],
"eco4": eco4.shape[0],
"eco4_passed_ciga": eco4_passed_ciga.shape[0]
}
breakdowns.append(summary)
breakdowns = pd.DataFrame(breakdowns)
breakdowns = breakdowns.fillna(0)
def unitas_data_prep(loader):
#####
# Adhoc - for UNITAS, stripping out additional surveys that have been completed
unitas_data = loader.data["HA50"].copy()
unitas_asset_list = unitas_data["asset_list"].copy()
unitas_survey_sheet = unitas_data["survey_list"].copy()
# We remove the surveyed properties from the asset sheet
unitas_survey_sheet = unitas_survey_sheet[~pd.isnull(unitas_survey_sheet["asset_list_row_id"])]
unitas_asset_list = unitas_asset_list.merge(
unitas_survey_sheet[["asset_list_row_id", "installation_status"]],
how="left",
on="asset_list_row_id"
)
unitas_asset_list = unitas_asset_list[pd.isnull(unitas_asset_list["installation_status"])]
unitas_asset_list = unitas_asset_list.drop(columns=["installation_status"])
# We read in the data for the further completed surveys
unitas_phase_1_workbook = openpyxl.load_workbook(
"local_data/ha_data/UNITAS ( STOKE) MASTER ROLLING SHEET UPDATED 8.4.24 K - no password.xlsx"
)
phase_1_worksheet = unitas_phase_1_workbook["ECO 4 - PHASE 1"]
phase_2_worksheet = unitas_phase_1_workbook["ECO4 - PHASE 2"]
phase1_colnames = [cell.value for cell in phase_1_worksheet[1]]
phase_1_rows_data = []
for row in phase_1_worksheet.iter_rows(min_row=2, values_only=False):
row_data = [cell.value for cell in row] # This will get you the cell values
phase_1_rows_data.append(row_data)
phase_1_surveys = pd.DataFrame(phase_1_rows_data, columns=phase1_colnames)
# Correct phase 1 surveys in the same fashion as the previous approach
phase_1_surveys = DataLoader.correct_ha50_survey_list(phase_1_surveys.copy())
# We check all phase 1 surveys are contained in the data we had before
additional = []
for _, row in tqdm(phase_1_surveys.iterrows(), total=len(phase_1_surveys)):
# We look for the entry in the old survey sheet:
# matched_uprn = unitas_survey_sheet[unitas_survey_sheet["EPR UPRN NUMBER"] == row["UPRN"]]
# if matched_uprn.shape[0] == 1:
# continue
matched_1 = unitas_survey_sheet[
(unitas_survey_sheet["Post Code"] == row["Post Code"]) &
(unitas_survey_sheet["NO."] == row["NO."])
]
if matched_1.shape[0] == 1:
continue
matched_2 = unitas_survey_sheet[
(unitas_survey_sheet["Street / Block Name"] == row["Street / Block Name"]) &
(unitas_survey_sheet["NO."] == row["NO."])
]
if matched_2.shape[0] == 1:
continue
additional.append(row.to_dict())
additional = pd.DataFrame(additional)
phase_2_rows_data = []
for row in phase_2_worksheet.iter_rows(min_row=2, values_only=False):
row_data = [cell.value for cell in row] # This will get you the cell values
phase_2_rows_data.append(row_data)
phase2_colnames = [cell.value for cell in phase_2_worksheet[1]]
phase_2_surveys = pd.DataFrame(phase_2_rows_data, columns=phase2_colnames)
# Drop all of the occurances of "OFFICE USE ONLY" columns
phase_2_surveys = phase_2_surveys.drop(columns=[c for c in phase_2_surveys.columns if "OFFICE USE ONLY" in c])
common_columns = list({c for c in phase_2_surveys.columns if c in additional.columns})
additional_filtered = additional[common_columns]
further_unitas_completed_surveys = pd.concat(
[phase_2_surveys, additional_filtered],
axis=0,
ignore_index=True
)
# Add a phase 2 key
further_unitas_completed_surveys["survey_list_row_id"] = [
"unitas_phase_2" + str(i) for i in further_unitas_completed_surveys.index
]
not_in_asset_list = [
"unitas_phase_20", "unitas_phase_234", "unitas_phase_2163", "unitas_phase_2173", "unitas_phase_2374"
]
additional_postcodes = ["st28bg"]
full_asset_list = unitas_data["asset_list"].copy()
full_asset_list["matching_postcode"] = full_asset_list["matching_postcode"].str.lower().str.replace(" ", "")
further_unitas_completed_surveys["Post Code"] = further_unitas_completed_surveys["Post Code"].str.replace(
"ST 5DT", "ST3 5DT"
)
# We match these back to the asset list
matching_lookup = []
for _, row in tqdm(further_unitas_completed_surveys.iterrows(), total=len(further_unitas_completed_surveys)):
if row["survey_list_row_id"] in not_in_asset_list:
continue
postcode_lower = row["Post Code"].lower().strip().replace(" ", "")
if postcode_lower in additional_postcodes:
continue
# Confirmed not in asset lsit
# Filter asset list on postcode
df = full_asset_list[
full_asset_list["matching_postcode"].str.contains(postcode_lower)
]
df = df[df["HouseNo"] == str(row["NO."])]
if df.shape[0] != 1:
raise Exception("NOT FOUND")
matching_lookup.append(
{
"survey_list_row_id": row["survey_list_row_id"],
"asset_list_row_id": df["asset_list_row_id"].values[0],
}
)
matching_lookup = pd.DataFrame(matching_lookup)
matching_lookup["phase_2_surveyed"] = True
# We merge this onto the asset list and remove the rows
unitas_asset_list = unitas_asset_list.merge(
matching_lookup, how="left", on="asset_list_row_id"
)
# Drop rows where phase_2_surveyed is populated
unitas_asset_list = unitas_asset_list[
pd.isnull(unitas_asset_list["phase_2_surveyed"])
]
# We add in the new CIGA submissions
unitas_round_2_ciga_workbook = openpyxl.load_workbook("local_data/ha_data/Unitas second round CIGA checks.xlsx")
ciga_round_2_worksheet = unitas_round_2_ciga_workbook["Worksheet"]
ciga_round_2_colnames = [cell.value for cell in ciga_round_2_worksheet[1]]
round_2_rows_data = []
for row in ciga_round_2_worksheet.iter_rows(min_row=2, values_only=False):
row_data = [cell.value for cell in row] # This will get you the cell values
round_2_rows_data.append(row_data)
ciga_round_2 = pd.DataFrame(round_2_rows_data, columns=ciga_round_2_colnames)
# We merge the ciga sheet to the asset list
ciga_dependent_asset_list = unitas_asset_list[
unitas_asset_list["ECO Eligibility"].str.contains("subject to ciga")
].copy()
# We merge the ciga sheet to the asset list
ciga_round_2_matched = ciga_dependent_asset_list.merge(
ciga_round_2, how="inner", on=["Address Line 1", "Post Code"]
)
# Filter on just the properties that had no guarantee
ciga_round_2_matched = ciga_round_2_matched[ciga_round_2_matched["Guarantee"] == "No"]
# ECO Eligibility
# not eligible 9227
# failed ciga 2711
# eco4 (subject to ciga) 2238
# eco4 - passed ciga 901
# gbis 114
# eco4 91
# We filter on the properties we're looking to re-survey
unitas_properties_to_survey = unitas_asset_list[
unitas_asset_list["ECO Eligibility"].isin(
[
"eco4 - passed ciga",
"eco4"
]
)
].copy()
unitas_properties_to_survey = pd.concat(
[
unitas_properties_to_survey,
ciga_round_2_matched[unitas_properties_to_survey.columns]
]
)
epc_api_key = "a2Nvbm5rb3dsZXNzYXJAZ21haWwuY29tOjY5MGJiMWM0NmIyOGI5ZDUxYzAxMzQzYzNiZGNlZGJjZDNmODQwMzA="
# We now retrieve the lastest EPC data
epc_data = []
for _, unitas_property in tqdm(unitas_properties_to_survey.iterrows(), total=len(unitas_properties_to_survey)):
property_type, _ = get_property_type_and_built_form(property_meta=unitas_property, ha_name="HA50")
full_address = unitas_property["matching_address"]
searcher = SearchEpc(
address1=str(unitas_property["HouseNo"]),
postcode=unitas_property["matching_postcode"],
auth_token=epc_api_key,
os_api_key="",
property_type=property_type,
full_address=full_address,
fast=True
)
# Force the skipping of estimating the EPC
searcher.ordnance_survey_client.property_type = None
searcher.ordnance_survey_client.built_form = None
searcher.find_property(skip_os=True)
if searcher.newest_epc is None:
continue
epc = {
"asset_list_row_id": unitas_property["asset_list_row_id"],
**searcher.newest_epc.copy()
}
epc_data.append(epc)
epc_df = pd.DataFrame(epc_data)
# Pull out just the columns we need
epc_df = epc_df[
[
"asset_list_row_id",
"address1", "postcode",
"current-energy-efficiency",
"current-energy-rating",
"inspection-date",
"transaction-type",
"built-form"
]
]
epc_df["EPC Rating"] = (
epc_df["current-energy-efficiency"].astype(str) +
epc_df["current-energy-rating"].astype(str)
)
# Merge onto the Unitas data:
unitas_properties_to_survey_full = unitas_properties_to_survey.merge(
epc_df[
[
"asset_list_row_id",
"EPC Rating",
"inspection-date",
"transaction-type",
"built-form"
]
],
how="left",
on="asset_list_row_id"
)
unitas_properties_to_survey_full["ECO Eligibility"] = unitas_properties_to_survey_full["ECO Eligibility"].replace(
"eco4 (subject to ciga)", "eco4 - passed ciga, phase 2 check"
)
for col in ["EPC Rating", "inspection-date", "transaction-type", "built-form"]:
unitas_properties_to_survey_full[col] = np.where(
pd.isnull(unitas_properties_to_survey_full[col]),
"No EPC found",
unitas_properties_to_survey_full[col]
)
unitas_properties_to_survey_full[col] = unitas_properties_to_survey_full[col].fillna(
"No EPC found"
)
unitas_properties_to_survey_full[col] = unitas_properties_to_survey_full[col].astype(str)
unitas_properties_to_survey_full = unitas_properties_to_survey_full.rename(
columns={
"inspection-date": "Last EPC Inspection Date",
"transaction-type": "Last EPC Reason",
"built-form": "Last EPC Built Form",
}
)
# We now match to the survey outcomes
unitas_survey_outcomes_workbook = openpyxl.load_workbook(
"local_data/ha_data/UNITAS - survey outcomes 26.03.2024.xlsx"
)
unitas_survey_outcomes_worksheet = unitas_survey_outcomes_workbook["OUTCOMES"]
unitas_outcomes_colnames = [cell.value for cell in unitas_survey_outcomes_worksheet[2]]
outcomes_rows_data = []
for row in unitas_survey_outcomes_worksheet.iter_rows(min_row=3, values_only=False):
row_data = [cell.value for cell in row] # This will get you the cell values
outcomes_rows_data.append(row_data)
unitas_outcomes = pd.DataFrame(outcomes_rows_data, columns=unitas_outcomes_colnames)
unitas_outcomes = unitas_outcomes.rename(
columns={
"Notes (If 'no answer' under outcomes, have you checked around the property for access "
"issues where possible?)": "Notes"
}
)
unitas_outcomes["Postcode"].unique()
eg1 = unitas_properties_to_survey_full[
(unitas_properties_to_survey_full["Post Code"] == "ST6 6RF")
]
eg1_outcomes = unitas_outcomes[
(unitas_outcomes["Postcode"] == "ST6 6RF")
]
# Merge outcomes onto properties to survey. Will probably have to do algorithmically
full_asset_list["matching_postcode_nospace"] = full_asset_list["matching_postcode"].str.lower().str.replace(" ", "")
outcome_matching = []
for _, outcome in tqdm(unitas_outcomes.iterrows(), total=len(unitas_outcomes)):
# We search for the corresponding entry in the asset list
postcode_lower = outcome["Postcode"].lower().strip().replace(" ", "")
# Confirmed not in asset lsit
# Filter asset list on postcode
df = unitas_properties_to_survey_full[
unitas_properties_to_survey_full["matching_postcode_nospace"].str.contains(postcode_lower)
]
df = df[df["HouseNo"] == str(outcome["No."])]
if df.empty:
continue
if df.shape[0] == 1:
outcome_matching.append(
{
"asset_list_row_id": df["asset_list_row_id"].values[0],
**outcome.to_dict()
}
)
continue
raise Exception("something went wrong")
outcome_matching = pd.DataFrame(outcome_matching)
# We can have duplicate matches, so we format the Date letter sent column and retrieve the newest outcome
outcome_matching["Date letters sent"] = outcome_matching["Date letters sent"].str.lower()
outcome_matching["Extracted Date"] = outcome_matching["Date letters sent"].str.extract(
r'(?:w[./]c )(\d{2}\.\d{2}\.\d{4})')
outcome_matching["Extracted Date"] = pd.to_datetime(outcome_matching["Extracted Date"], format='%d.%m.%Y')
# We sort by asset_list_row_id and extracted date, and retrieve the newest
outcome_matching = outcome_matching.sort_values(["asset_list_row_id", "Extracted Date"], ascending=[True, False])
# Some properties will have multiple outcomes - for these, we re-format
outcome_matching_grouped = []
for asset_list_row_id, grouped_data in outcome_matching.groupby("asset_list_row_id"):
if grouped_data.shape[0] == 1:
outcome_matching_grouped.append(
{
"Number of previous visits": 1,
**grouped_data.to_dict("records")[0]
}
)
continue
if grouped_data.shape[0] == 2:
newest_visit = grouped_data.head(1)
oldest_visit = grouped_data.tail(1)[['Outcomes', 'Surveyor', 'Notes', 'Date letters sent']].add_suffix(
" second visit")
to_append = {
"Number of previous visits": 2,
**newest_visit.to_dict("records")[0],
**oldest_visit.to_dict("records")[0]
}
outcome_matching_grouped.append(to_append)
else:
raise Exception("something went wrong")
outcome_matching_grouped = pd.DataFrame(outcome_matching_grouped)
unitas_properties_to_survey_with_outcomes = unitas_properties_to_survey_full.merge(
outcome_matching_grouped, how="left", on="asset_list_row_id"
)
unitas_properties_to_survey_with_outcomes["Number of previous visits"] = (
unitas_properties_to_survey_with_outcomes["Number of previous visits"].fillna(0)
)
# Store as an excel
unitas_properties_to_survey_with_outcomes.to_excel("Unitas - phase 2 properties to Survey.xlsx")
unitas_properties_to_survey_with_outcomes["Last EPC Built Form"].value_counts()
def app():
"""
This app contains the housin association analysis for HAs 1, 6, 14, 39 and 107.
@ -6739,29 +7202,8 @@ def app():
loader = DataLoader(directories, december_figures_filepath, use_cache, rebuild_inputs)
loader.load()
loader.ha_facts_and_figures()
forecast_remaining_sales(loader)
# gbis rate
# breakdowns = []
# for ha, data_assets in loader.data.items():
# asset_list = data_assets["asset_list"].copy()
# breakdown = asset_list["ECO Eligibility"].value_counts().to_dict()
# breakdowns.append(breakdown)
# breakdowns = pd.DataFrame(breakdowns)
#
# installer = []
# for ha, data_assets in loader.data.items():
# survey_list = data_assets["survey_list"]
# if survey_list.empty:
# continue
# if "INSTALLER" not in survey_list.columns:
# continue
#
# installers = survey_list["INSTALLER"].value_counts().to_dict()
# installers["ha_name"] = ha
# installer.append(installers)
# installer = pd.DataFrame(installer)
# installer.drop(columns=["ha_name"]).sum().sum()
forecast_remaining_sales(loader)
# Adhoc - for HA16, get the properties that still need a CIGA check
asset_list_ha16 = loader.data["HA16"]["asset_list"].copy()

View file

@ -42,7 +42,42 @@ BATTERY_COST = 3500
# This is based on https://www.checkatrade.com/blog/cost-guides/cost-smart-thermostat/
SMART_APPLIANCE_THERMOSTAT_COST = 400
PROGRAMMER_COST = 200
PROGRAMMER_COST = 120
ROOM_THERMOSTAT_COST = 150
TRVS_COST = 35
# Cost for TTZC
# Smart thermostat based on checkatrade https://www.checkatrade.com/blog/cost-guides/cost-smart-thermostat/
# Based on the Nest system
TTZC_SMART_THERMOSTAT_COST = 205
TTZC_SMART_THERMOSTAT_LABOUR_HOURS = 2
TTZC_ELECTRICIAN_HOURLY_RATE = 45
# Based on cost of a Nest temperature sensor
TTZC_ROOM_TEMPERATURE_SENSOR_COST = 50
TTZC_ROOM_TEMPERATURE_SENSOR_LABOUR_HOURS = 0.17 # (Assume ~ 10 mins install per sensor)
# Basedon an average cost of smart radiator values
TTZC_SMART_RADIATOR_VALUES = 50
TTZC_SMART_RADIATOR_VALUES_LABOUR_HOURS = 0.37 # (Assume ~ 15-30 mins install per valve)
# Low carbon combi boiler - median value based on £2200 - £3000 range
LOW_CARBON_COMBI_BOILER = 2200
# boiler prices based on
# https://www.greenmatch.co.uk/boilers/30kw-boiler
# https://www.greenmatch.co.uk/boilers/35kw-boiler
# https://www.greenmatch.co.uk/boilers/40kw-boiler
# These are exclusive of installation costs
COMBI_BOILER_COSTS = {
"30kw": 1550,
"35kw": 1610,
"40kw": 1625
}
CONVENTIONAL_BOILER_COSTS = {
"30kw": 1117,
"35kw": 1546,
"40kw": 1776
}
class Costs:
@ -998,3 +1033,100 @@ class Costs:
"labour_hours": 0,
"labour_days": 0,
}
def roomstat_programmer_trvs(
self, number_heated_rooms, has_programmer, has_trvs, has_room_thermostat
):
"""
:return:
"""
total_cost = 0
labour_hours = 0
if not has_programmer:
total_cost += PROGRAMMER_COST
labour_hours += 1
if not has_trvs:
total_cost += TRVS_COST * number_heated_rooms
labour_hours += 0.25 * number_heated_rooms
if not has_room_thermostat:
total_cost += ROOM_THERMOSTAT_COST
labour_hours += 0.5
subtotal_before_vat = total_cost / (1 + self.VAT_RATE)
vat = total_cost - subtotal_before_vat
return {
"total": total_cost,
"subtotal": subtotal_before_vat,
"vat": vat,
"labour_hours": labour_hours,
"labour_days": 1,
}
def time_and_temperature_zone_control(self, number_heated_rooms):
# The product costs are inclusive of VAT
product_costs = (
TTZC_SMART_THERMOSTAT_COST +
TTZC_ROOM_TEMPERATURE_SENSOR_COST * number_heated_rooms +
TTZC_SMART_RADIATOR_VALUES * number_heated_rooms
)
labour_hours = (
TTZC_SMART_THERMOSTAT_LABOUR_HOURS +
TTZC_ROOM_TEMPERATURE_SENSOR_LABOUR_HOURS * number_heated_rooms +
TTZC_SMART_RADIATOR_VALUES_LABOUR_HOURS * number_heated_rooms
)
labour_costs = TTZC_ELECTRICIAN_HOURLY_RATE * labour_hours
# Add continency and preliminaries to the labour to account for the complexity of the job
labour_costs = labour_costs * (1 + self.CONTINGENCY + self.PRELIMINARIES)
vat = labour_costs * self.VAT_RATE
subtotal_before_vat = product_costs + labour_costs
total_cost = subtotal_before_vat + vat
labour_days = np.ceil(labour_hours / 8)
return {
"total": total_cost,
"subtotal": subtotal_before_vat,
"vat": vat,
"labour_hours": labour_hours,
"labour_days": labour_days,
}
def low_carbon_boiler(self, is_combi, size):
"""
Based on a basic estimate of median value £2600 to install a low carbon combi boiler
:return:
"""
unit_cost = COMBI_BOILER_COSTS[size] if is_combi else CONVENTIONAL_BOILER_COSTS[size]
# The unit cost is the cost without VAT
# We now need to estimate the cost of the works
labour_days = 2
labour_rate = 500
# Average cost of installation is 1 (maybe 2days) at £300 per day
# https://www.checkatrade.com/blog/cost-guides/new-boiler-cost/
# To be pessimistic, assume 2 days work and £500 day rate
labour_cost = labour_rate * self.labour_adjustment_factor * labour_days
# Add contingency and preliminaries
labour_cost = labour_cost * (1 + self.CONTINGENCY + self.PRELIMINARIES)
vat = labour_cost * self.VAT_RATE
subtotal_before_vat = unit_cost + labour_cost
total_cost = subtotal_before_vat + vat
return {
"total": total_cost,
"subtotal": subtotal_before_vat,
"vat": vat,
"labour_hours": labour_days * 8,
"labour_days": labour_days,
}

View file

@ -27,6 +27,14 @@ class HeatingControlRecommender:
self.recommend_high_heat_retention_controls()
return
if heating_description in ["Boiler and radiators, mains gas"]:
# We can recommend roomstat programmer trvs
self.recommend_roomstat_programmer_trvs()
# We can also recommend time and temperature zone controls
self.recommend_time_temperature_zone_controls()
return
def recommend_room_heaters_electric_controls(self):
"""
If the home has Room heaters, electric, we start by identifying potential heating controls that could
@ -105,3 +113,115 @@ class HeatingControlRecommender:
# We don't implement any other recommendations right now
return
def recommend_roomstat_programmer_trvs(self):
"""
If the home has a boiler and radiators, mains gas, we start by identifying potential heating controls that could
be upgraded, that would provide a practical impact.
The criteria for recommending an upgrade to heating controls are (one of these must be true)
1) There are no controls
2) No programmer
3) No room thermostat
4) No TRVs
:return:
"""
# We check if we have the conditions to recommend this upgrade
needs_programmer = self.property.main_heating_controls["switch_system"] is None
needs_room_thermostat = self.property.main_heating_controls["thermostatic_control"] is None
needs_trvs = self.property.main_heating_controls["trvs"] is None
can_recommend = (
(self.property.main_heating_controls["no_control"] is not None) or
needs_programmer or
needs_room_thermostat or
needs_trvs
)
if not can_recommend:
return
ending_config = MainheatControlAttributes("Programmer, room thermostat and TRVS").process()
# We use this to determine how we should be updating the config
simulation_config = check_simulation_difference(
new_config=ending_config, old_config=self.property.main_heating_controls
)
# This upgrade will only take the heating system to average energy efficiency
# If the current system is below good, we make it good
if self.property.data["mainheatc-energy-eff"] in ["Poor", "Very Poor", "Average"]:
simulation_config["mainheatc_energy_eff_ending"] = "Good"
has_programmer = not needs_programmer
has_room_thermostat = not needs_room_thermostat
has_trvs = not needs_trvs
self.recommendation.append(
{
"type": "heating_control",
"parts": [],
"description": "upgrade heating controls to Room thermostat, programmer and TRVs",
**self.costs.roomstat_programmer_trvs(
number_heated_rooms=int(self.property.data["number-heated-rooms"]),
has_programmer=has_programmer,
has_room_thermostat=has_room_thermostat,
has_trvs=has_trvs
),
"starting_u_value": None,
"new_u_value": None,
"sap_points": None,
"simulation_config": simulation_config
}
)
return
def recommend_time_temperature_zone_controls(self):
"""
If the home has a boiler, we can recommend time and temperature zone controls. This is a more advanced
and more efficient control system than the standard controls that come with a boiler. However, it may come
with a higher cost and more involved usage
:return:
"""
# We check if the efficiency of the current heating controls is good or below, and
# Conditions for installation are as follows:
# 1) The current heating controls are not time and temperature zone controls
# 2) The current heating controls are not already at 'Very Good' or above
if (
(self.property.main_heating_controls["thermostatic_control"] == "time and temperature zone control") or
(self.property.data["mainheatc-energy-eff"] in ["Very Good"])
):
# No recommendation needed
return
ending_config = MainheatControlAttributes("Time and temperature zone control").process()
# We use this to determine how we should be updating the config
simulation_config = check_simulation_difference(
new_config=ending_config, old_config=self.property.main_heating_controls
)
# If the current system is below very good, we make it very good
if self.property.data["mainheatc-energy-eff"] in ["Poor", "Very Poor", "Average", "Good"]:
simulation_config["mainheatc_energy_eff_ending"] = "Very Good"
self.recommendation.append(
{
"type": "heating_control",
"parts": [],
"description": "Upgrade heating controls to Smart Thermostats, room sensors and smart radiator valves",
**self.costs.time_and_temperature_zone_control(
number_heated_rooms=int(self.property.data["number-heated-rooms"])
),
"starting_u_value": None,
"new_u_value": None,
"sap_points": None,
"simulation_config": simulation_config
}
)

View file

@ -4,6 +4,8 @@ from recommendations.Costs import Costs
from recommendations.recommendation_utils import check_simulation_difference
from backend.Property import Property
from etl.epc_clean.epc_attributes.MainheatAttributes import MainHeatAttributes
from etl.epc_clean.epc_attributes.HotWaterAttributes import HotWaterAttributes
from etl.epc_clean.epc_attributes.MainFuelAttributes import MainFuelAttributes
from recommendations.HeatingControlRecommender import HeatingControlRecommender
@ -19,13 +21,33 @@ class HeatingRecommender:
self.recommendations = []
# This first iteration of the recommender will provide very basic recommendation
# We recommend heating controls based on the main heating system
if self.property.main_heating["clean_description"] in [
has_electric_heating_description = self.property.main_heating["clean_description"] in [
"Room heaters, electric", "Electric storage heaters", "Electric storage heaters, radiators"
]:
]
no_heating_no_mains = (
self.property.main_heating["clean_description"] in ["No system present, electric heaters assumed"] and
not self.property.data["mains-gas-flag"]
)
if has_electric_heating_description or no_heating_no_mains:
# Recommend high heat retention storage heaters
self.recommend_electric_storage_heaters(phase=phase, system_change=True, heating_controls_only=False)
return
# if the property has mains heating with boiler and radiators, we recommend optimal heating controls
has_boiler = self.property.main_heating["clean_description"] in ["Boiler and radiators, mains gas"]
# We also check that the property doesn't have a heating system, but it has access to the mains gas
no_heating_has_mains = self.property.main_heating["clean_description"] in [
'No system present, electric heaters assumed'
] and self.property.data["mains-gas-flag"]
if has_boiler or no_heating_has_mains:
self.recommend_boiler_upgrades(phase=phase, no_heating_has_mains=no_heating_has_mains)
return
@staticmethod
def check_simulation_difference(old_config, new_config):
"""
@ -82,8 +104,13 @@ class HeatingRecommender:
**recommendation_simulation_config,
**controls_recommendations[0]["simulation_config"]
}
controls_description = controls_recommendations[0]['description']
# Make the first letter of the description lowercase
controls_description = (
controls_description[0].lower() + controls_description[1:]
)
recommendation_description = f"{description} and {controls_recommendations[0]['description']}"
recommendation_description = f"{description} and {controls_description}"
recommendation = {
"phase": phase,
@ -165,9 +192,18 @@ class HeatingRecommender:
# This upgrade will only take the heating system to average energy efficiency
heating_simulation_config["mainheat_energy_eff_ending"] = "Average"
# If the property is off-gas and has no heating system in place, the number of heated rooms will actually
# be 0, so we use the number of rooms as the figure
number_heated_rooms = (
self.property.data["number-heated-rooms"] if self.property.data["number-heated-rooms"] > 0
else (
self.property.number_of_rooms - 1 if self.property.number_of_rooms > 1 else
self.property.number_of_rooms
)
)
# Upgrade to electric storage heaters
costs = self.costs.high_heat_electric_storage_heaters(
number_heated_rooms=self.property.data["number-heated-rooms"]
number_heated_rooms=number_heated_rooms
)
description = "Install high heat retention electric storage heaters"
@ -182,3 +218,155 @@ class HeatingRecommender:
)
self.recommendations.extend(recommendations)
@staticmethod
def estimate_boiler_size(property_type, built_form, floor_area, floor_height, num_heated_rooms):
# Step 1: Base size estimation based on property type (as a starting point)
base_size = {
'Flat': 25,
'House': 30,
'Maisonette': 28,
'Bungalow': 27
}
# Step 2: Calculate the volume of the property
volume = floor_area * floor_height
# Step 3: Adjust base size for built form (to account for heat retention)
form_adjustment = {
'Mid-Terrace': 0,
'End-Terrace': 2,
'Semi-Detached': 4,
'Detached': 6
}
# Step 4: Further adjust for the total volume and number of heated rooms
volume_adjustment = (volume / 100) # Simplified adjustment factor for volume
rooms_adjustment = (num_heated_rooms - 5) * 0.5 # Assuming base case of 5 rooms
# Calculate the estimated boiler size
estimated_size = base_size[property_type] + form_adjustment[built_form] + volume_adjustment + rooms_adjustment
# Step 5: Align with available boiler sizes and ensure it does not exceed 35kW, as it's rare to need more
available_sizes = [30, 35, 40, 45, 50]
estimated_size = min(max(estimated_size, 30), 40) # Ensure within 30kW to 35kW range
# Find the closest available size (in this case, either rounding up or down to align with 30 or 35)
closest_size = min(available_sizes, key=lambda x: abs(x - estimated_size))
return closest_size
def recommend_boiler_upgrades(self, phase, no_heating_has_mains):
"""
This boiler recommendation will only recommend a like-for-like upgrade, since changing the system
is generally more expensive
:param phase:
:param no_heating_has_mains: indicaes if the property has no heating system, but has access to the mains gas
:return:
"""
recommendation_phase = phase
# We now recommend boiler upgrades, if applicable
simulation_config = {}
boiler_costs = {}
if self.property.data["mainheat-energy-eff"] in ["Very Poor", "Poor", "Average"]:
boiler_size = self.estimate_boiler_size(
property_type=self.property.data["property-type"],
built_form=self.property.data["built-form"],
floor_area=self.property.floor_area,
floor_height=self.property.floor_height,
num_heated_rooms=self.property.data["number-heated-rooms"],
)
# If heating and hot water come from the mains, we need a combi boiler, otherwise we need a regular boiler
hotwater_from_mains = self.property.hotwater["clean_description"] in ["From main system"]
is_combi = hotwater_from_mains or no_heating_has_mains
if is_combi:
description = "Upgrade to a new combi boiler"
else:
description = "Upgrade to a new boiler"
simulation_config = {"mainheat_energy_eff_ending": "Good"}
if no_heating_has_mains:
# Installation of a boiler improves the hot water system so we need to reflect this in
# the outcome of the recommendation
heating_ending_config = MainHeatAttributes("Boiler and radiators, mains gas").process()
hotwater_ending_config = HotWaterAttributes("From main system").process()
fuel_ending_config = MainFuelAttributes("mains gas (not community)").process()
heating_simulation_config = check_simulation_difference(
new_config=heating_ending_config, old_config=self.property.main_heating
)
hotwater_simulation_config = check_simulation_difference(
new_config=hotwater_ending_config, old_config=self.property.hotwater
)
fuel_simulation_config = check_simulation_difference(
new_config=fuel_ending_config, old_config=self.property.main_fuel
)
simulation_config = {
**simulation_config,
**heating_simulation_config,
**hotwater_simulation_config,
**fuel_simulation_config,
"hot_water_energy_eff_ending": "Good"
}
boiler_costs = self.costs.low_carbon_boiler(is_combi=is_combi, size=f"{boiler_size}kw")
self.recommendations.append(
{
"phase": recommendation_phase,
"parts": [
# TODO
],
"type": "heating",
"description": description,
"starting_u_value": None,
"new_u_value": None,
"sap_points": None,
"simulation_config": simulation_config,
**boiler_costs
}
)
# We recommend the heating controls
# If the property did not previously have a boiler, we combine
controls_recommender = HeatingControlRecommender(self.property)
controls_recommender.recommend(heating_description="Boiler and radiators, mains gas")
# We may have 2 recommendations from the heating controls
if not controls_recommender.recommendation:
return
if no_heating_has_mains:
# We combine the heating and controls recommendations
boiler_recommendation = self.recommendations[0].copy()
combined_recommendations = []
for controls_recommendation in controls_recommender.recommendation:
combined_recommendation = self.combine_heating_and_controls(
controls_recommendations=[controls_recommendation],
heating_simulation_config=simulation_config,
costs=boiler_costs,
description=boiler_recommendation["description"],
phase=recommendation_phase,
heating_controls_only=False,
system_change=True
)
combined_recommendations.extend(combined_recommendation)
# Overwrite the existing boiler recommendation
self.recommendations = combined_recommendations
else:
# We increment the recommendation phase, since the heating controls are separate from the boiler upgrade
recommendation_phase += 1
# The heating controls recommendation is distrinct from the boiler upgrade recommendation
# We insert phase into the recommendations for heating controls
for recommendation in controls_recommender.recommendation:
recommendation["phase"] = recommendation_phase
self.recommendations.extend(controls_recommender.recommendation)
return

View file

@ -22,8 +22,14 @@ class HotwaterRecommendations:
# This first iteration of the recommender will provide very basic recommendation
# We recommend heating controls based on the main heating system
if (self.property.hotwater["heater_type"] in ["electric immersion"]) & \
(self.property.data["hot-water-energy-eff"] == "Very Poor"):
# If there is no system present, but access to the mains, we
if (
(self.property.hotwater["heater_type"] in ["electric immersion"]) &
(self.property.data["hot-water-energy-eff"] == "Very Poor") &
(self.property.hotwater["no_system_present"] is None)
):
self.recommend_tank_insulation(phase=phase)
return

View file

@ -22,7 +22,8 @@ class Recommendations:
def __init__(
self,
property_instance: Property,
materials: List
materials: List,
exclusions: List[str] = None,
):
"""
:param property_instance: Instance of the Property class, for the home associated to property_id
@ -31,6 +32,7 @@ class Recommendations:
self.property_instance = property_instance
self.materials = materials
self.exclusions = exclusions if exclusions else []
self.floor_recommender = FloorRecommendations(property_instance=property_instance, materials=materials)
self.wall_recomender = WallRecommendations(property_instance=property_instance, materials=materials)
@ -45,7 +47,7 @@ class Recommendations:
self.heating_recommender = HeatingRecommender(property_instance=property_instance)
self.hotwater_recommender = HotwaterRecommendations(property_instance=property_instance)
def recommend(self, portfolio_id):
def recommend(self):
"""
This method runs the recommendations for the individual measures and then appends them to a list for output
@ -58,67 +60,82 @@ class Recommendations:
property_recommendations = []
phase = 0
print("WALL RECOMMENDATIONS HAVE BEEN COMMENTED OUT TEMPORARILY - ADD ME BACK IN")
if portfolio_id != 66:
# Building Fabric
# Building Fabric
if "wall_insulation" not in self.exclusions:
self.wall_recomender.recommend(phase=phase)
if self.wall_recomender.recommendations:
property_recommendations.append(self.wall_recomender.recommendations)
phase += 1
# Ventilation recommendations
# We only produce a ventilation recommendation if the property is recommended to have wall or roof
# insulation
# We will not attribute a SAP impact to the ventilation recommendation, since we've seen that this has no
# real impact on the SAP score. Therefore, we don't need to include phasing for ventilation. If we have any
# wall or roof recommendations, we will ensure that ventilation is included in the simulation
if "roof_insulation" not in self.exclusions:
self.roof_recommender.recommend(phase=phase)
if self.roof_recommender.recommendations:
property_recommendations.append(self.roof_recommender.recommendations)
phase += 1
# Ventilation recommendations
# We only produce a ventilation recommendation if the property is recommended to have wall or roof
# insulation
# We will not attribute a SAP impact to the ventilation recommendation, since we've seen that this
# has no
# real impact on the SAP score. Therefore, we don't need to include phasing for ventilation. If we
# have any
# wall or roof recommendations, we will ensure that ventilation is included in the simulation
if "ventilation" not in self.exclusions:
if self.wall_recomender.recommendations or self.roof_recommender.recommendations:
self.ventilation_recomender.recommend()
if self.ventilation_recomender.recommendation:
property_recommendations.append(self.ventilation_recomender.recommendation)
self.roof_recommender.recommend(phase=phase)
if self.roof_recommender.recommendations:
property_recommendations.append(self.roof_recommender.recommendations)
phase += 1
if "floor_insulation" not in self.exclusions:
self.floor_recommender.recommend(phase=phase)
if self.floor_recommender.recommendations:
property_recommendations.append(self.floor_recommender.recommendations)
phase += 1
self.floor_recommender.recommend(phase=phase)
if self.floor_recommender.recommendations:
property_recommendations.append(self.floor_recommender.recommendations)
phase += 1
if "windows" not in self.exclusions:
self.windows_recommender.recommend(phase=phase)
if self.windows_recommender.recommendation:
property_recommendations.append(self.windows_recommender.recommendation)
phase += 1
self.windows_recommender.recommend(phase=phase)
if self.windows_recommender.recommendation:
property_recommendations.append(self.windows_recommender.recommendation)
phase += 1
self.fireplace_recommender.recommend(phase=phase)
if self.fireplace_recommender.recommendation:
property_recommendations.append(self.fireplace_recommender.recommendation)
phase += 1
if "fireplace" not in self.exclusions:
self.fireplace_recommender.recommend(phase=phase)
if self.fireplace_recommender.recommendation:
property_recommendations.append(self.fireplace_recommender.recommendation)
phase += 1
# Heating and Electical systems
self.heating_recommender.recommend(phase=phase)
if self.heating_recommender.recommendations:
property_recommendations.append(self.heating_recommender.recommendations)
phase += 1
if "heating" not in self.exclusions:
self.heating_recommender.recommend(phase=phase)
if self.heating_recommender.recommendations:
property_recommendations.append(self.heating_recommender.recommendations)
# We check if we have distinct heating and heating controls recommendations
# If so, we increment by 2 (one of the heating system, one for the heating controls)
# otherwise we incremenet by 1
max_used_phase = max([rec["phase"] for rec in self.heating_recommender.recommendations])
amount_to_increment = max_used_phase - phase + 1
phase += amount_to_increment
# Hot water
self.hotwater_recommender.recommend(phase=phase)
if self.hotwater_recommender.recommendations:
property_recommendations.append(self.hotwater_recommender.recommendations)
phase += 1
if "hot_water" not in self.exclusions:
self.hotwater_recommender.recommend(phase=phase)
if self.hotwater_recommender.recommendations:
property_recommendations.append(self.hotwater_recommender.recommendations)
phase += 1
self.lighting_recommender.recommend(phase=phase)
if self.lighting_recommender.recommendation:
property_recommendations.append(self.lighting_recommender.recommendation)
phase += 1
if "lighting" not in self.exclusions:
self.lighting_recommender.recommend(phase=phase)
if self.lighting_recommender.recommendation:
property_recommendations.append(self.lighting_recommender.recommendation)
phase += 1
# Renewables
self.solar_recommender.recommend(phase=phase)
if self.solar_recommender.recommendation:
property_recommendations.append(self.solar_recommender.recommendation)
phase += 1
if "solar_pv" not in self.exclusions:
self.solar_recommender.recommend(phase=phase)
if self.solar_recommender.recommendation:
property_recommendations.append(self.solar_recommender.recommendation)
phase += 1
# We insert temporary ids into the recommendations which is important for the optimiser later
property_recommendations = self.insert_temp_recommendation_id(property_recommendations)

View file

@ -8,6 +8,9 @@ class SolarPvRecommendations:
# Wattage per panel - this is based on the average wattage of a solar panel being between 250w and 420w
SOLAR_PANEL_WATTAGE = 250
MAX_SYSTEM_WATTAGE = 6000
MIN_SYSTEM_WATTAGE = 1000
def __init__(self, property_instance):
"""
:param property_instance: Instance of the Property class, for the home associated to property_id
@ -18,6 +21,19 @@ class SolarPvRecommendations:
self.recommendation = []
@staticmethod
def trim_solar_wattage_options(scenarios_with_wattage):
# Initialize the list with the first element, assuming the list is not empty
trimmed_list = [scenarios_with_wattage[0]]
# Iterate over the list starting from the second element
for scenario in scenarios_with_wattage[1:]:
# Compare the second element (index 1) of the current tuple with the last tuple in the trimmed list
if scenario[1] > trimmed_list[-1][1]:
trimmed_list.append(scenario)
return trimmed_list
def recommend(self, phase):
"""
We check if a property is potentially suitable for solar PV based on the following criteria:
@ -44,28 +60,47 @@ class SolarPvRecommendations:
# 2) With and without battery
roof_coverage_scenarios = [
self.property.solar_pv_percentage - 0.1, self.property.solar_pv_percentage,
self.property.solar_pv_percentage + 0.1
]
# We make sure we haven't gone too low or high
roof_coverage_scenarios = [v for v in roof_coverage_scenarios if 0 <= v <= 1]
if self.property.solar_pv_percentage <= 0.4:
roof_coverage_scenarios.append(self.property.solar_pv_percentage + 0.1)
# We make sure we haven't gone too low or high - we allow no more than 60% coverage
roof_coverage_scenarios = [v for v in roof_coverage_scenarios if 0 <= v <= 0.6]
# If we only have two scenarios, we add a coverage scenario 10% less than the smallest
if len(roof_coverage_scenarios) == 2:
roof_coverage_scenarios.insert(0, roof_coverage_scenarios[0] - 0.1)
battery_scenarios = [False, True]
# I now produce the cross product of the scenarios
scenarios = [(roof, battery) for roof in roof_coverage_scenarios for battery in battery_scenarios]
for roof_coverage, has_battery in scenarios:
scenarios_with_wattage = []
for roof_coverage in roof_coverage_scenarios:
# We now have a property which is potentially suitable for solar PV
solar_pv_roof_area = self.property.get_solar_pv_roof_area(roof_coverage)
number_solar_panels = np.floor(solar_pv_roof_area / self.SOLAR_PANEL_AREA)
solar_panel_wattage = number_solar_panels * self.SOLAR_PANEL_WATTAGE
roof_coverage_percent = round(roof_coverage * 100)
if solar_panel_wattage < self.MIN_SYSTEM_WATTAGE:
continue
solar_panel_wattage = np.clip(
a=solar_panel_wattage, a_min=self.MIN_SYSTEM_WATTAGE, a_max=self.MAX_SYSTEM_WATTAGE
)
scenarios_with_wattage.append((roof_coverage, solar_panel_wattage))
# We trim the scenarios, so that we don't have duplicate wattages
scenarios_with_wattage = self.trim_solar_wattage_options(scenarios_with_wattage)
# Produce the cross product of the scenarios
scenarios = [
(roof, wattage, battery) for roof, wattage in scenarios_with_wattage for battery in battery_scenarios
]
# We deduce the wattage of the solar panels based on the roof coverage
for roof_coverage, solar_panel_wattage, has_battery in scenarios:
# We now have a property which is potentially suitable for solar PV
roof_coverage_percent = round(roof_coverage * 100)
# Given the wattage, we estimate the cost of the solar PV system. This is based on the MCS database
# of solar PV installations
cost_result = self.costs.solar_pv(wattage=solar_panel_wattage, has_battery=has_battery)
kw = np.floor(solar_panel_wattage / 100) / 10
if has_battery:

View file

@ -1,9 +1,10 @@
import pickle
import boto3
from io import BytesIO, StringIO
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
import csv
import pandas as pd
from io import BytesIO, StringIO
from utils.logger import setup_logger
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
logger = setup_logger()
@ -224,3 +225,22 @@ def read_excel_from_s3(bucket_name, file_key, header_row):
df.reset_index(drop=True, inplace=True)
return df
def read_csv_from_s3(bucket_name, filepath):
s3 = boto3.client('s3')
# Get the object from s3
s3_object = s3.get_object(Bucket=bucket_name, Key=filepath)
# Read the CSV body from the s3 object
body = s3_object['Body'].read()
# Use StringIO to create a file-like object from the string
csv_data = StringIO(body.decode('utf-8'))
# Use csv library to read it into a list of dictionaries
reader = csv.DictReader(csv_data)
data = list(reader)
return data