Merge pull request #284 from Hestia-Homes/urban-splash

Urban splash
This commit is contained in:
KhalimCK 2024-02-22 11:41:40 +00:00 committed by GitHub
commit a45cf2f319
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
26 changed files with 1760 additions and 210 deletions

View file

@ -238,12 +238,15 @@ class Property:
# Note: often when the wall is insulated, the internal/external insulation is not noted so we should
# test the impact of using these booleans
if recommendation["type"] == "external_wall_insulation":
output["external_insulation"] = True
output["internal_insulation"] = False
output["external_insulation_ending"] = True
output["internal_insulation_ending"] = False
if recommendation["type"] == "internal_wall_insulation":
output["external_insulation"] = False
output["internal_insulation"] = True
output["external_insulation_ending"] = False
output["internal_insulation_ending"] = True
if recommendation["type"] == "cavity_wall_insulation":
output["is_filled_cavity_ending"] = True
# TODO: perhaps detrimental
# When making a recommendation for the wall, we will also update the ventilation
@ -314,7 +317,7 @@ class Property:
if recommendation["type"] == "low_energy_lighting":
output["low_energy_lighting_ending"] = 100
output["lighting_energy_eff_starting"] = "Very Good"
output["lighting_energy_eff_ending"] = "Very Good"
if recommendation["type"] == "windows_glazing":
output["multi_glaze_proportion_ending"] = 100
@ -338,7 +341,19 @@ class Property:
if is_secondary_glazing:
output["glazed_type_ending"] = "secondary glazing"
else:
output["glazed_type_ending"] = "double glazing installed during or after 2002 "
output["glazed_type_ending"] = "double glazing installed during or after 2002"
if recommendation["type"] in ["heating", "hot_water_tank_insulation"]:
# We update the data, as defined in the recommendation
simulation_config = recommendation["simulation_config"]
# If any entries in simulation_config are None, we will set them to "Unknown" which is the cleaning
# value
for key, value in simulation_config.items():
if value is None:
simulation_config[key] = "Unknown"
output.update(simulation_config)
if recommendation["type"] == "solar_pv":
output["photo_supply_ending"] = recommendation["photo_supply"]
@ -348,9 +363,9 @@ class Property:
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation",
"loft_insulation", "room_roof_insulation", "flat_roof_insulation",
"solid_floor_insulation", "suspended_floor_insulation", "exposed_floor_insulation",
"windows_glazing", "solar_pv"
"windows_glazing", "solar_pv", "heating", "hot_water_tank_insulation"
]:
raise NotImplementedError("Implement me")
raise NotImplementedError("Implement me, given type %s" % recommendation["type"])
output['id'] = "+".join([str(property_id), str(primary_recommendation_id)])
@ -455,7 +470,7 @@ class Property:
to_update[k] = None
return to_update
def get_full_property_data(self):
def get_full_property_data(self, current_valuation=None):
"""
This method extracts the data which is pushed to the database, containing core information, from the EPC
about a property
@ -477,6 +492,7 @@ class Property:
"tenure": self.data["tenure"],
"current_epc_rating": self.data["current-energy-rating"],
"current_sap_points": self.data["current-energy-efficiency"],
"current_valuation": current_valuation
}
property_data = self._clean_upload_data(property_data)

View file

@ -86,6 +86,7 @@ class PropertyModel(Base):
tenure = Column(Text)
current_epc_rating = Column(Enum(Epc))
current_sap_points = Column(Float)
current_valuation = Column(Float)
class FeatureRating(enum.Enum):

View file

@ -53,6 +53,9 @@ class Plan(Base):
property_id = Column(BigInteger, ForeignKey(PropertyModel.id), nullable=False)
created_at = Column(TIMESTAMP, nullable=False, server_default=func.now())
is_default = Column(Boolean, nullable=False)
valuation_increase_lower_bound = Column(Float)
valuation_increase_upper_bound = Column(Float)
valuation_increase_average = Column(Float)
class PlanRecommendations(Base):

View file

@ -1,6 +1,6 @@
from datetime import datetime
import numpy as np
from tqdm import tqdm
import pandas as pd
from etl.epc.Record import EPCRecord
from backend.SearchEpc import SearchEpc
@ -37,12 +37,30 @@ from recommendations.Recommendations import Recommendations
from utils.logger import setup_logger
from utils.s3 import read_dataframe_from_s3_parquet
from backend.ml_models.Valuation import PropertyValuation
from backend.ml_models.AnnualBillSavings import AnnualBillSavings
logger = setup_logger()
BATCH_SIZE = 5
def patch_epc(config, epc_records):
    """
    Overlay customer-supplied room counts onto the original EPC record.

    For each of "number-habitable-rooms" and "number-heated-rooms", if ``config`` holds a value that is not
    None, it is cast to ``int`` and written into ``epc_records["original_epc"]`` under the same key. Fields
    that are missing or None in ``config`` are left as they are.

    :param config: mapping describing one input property; read with ``.get``
    :param epc_records: dictionary holding an "original_epc" mapping, which is updated in place
    :return: the same ``epc_records`` object that was passed in
    """
    for field in ("number-habitable-rooms", "number-heated-rooms"):
        supplied = config.get(field)
        if supplied is None:
            # Nothing supplied by the customer for this field, keep the EPC value
            continue
        epc_records["original_epc"][field] = int(supplied)

    return epc_records
router = APIRouter(
prefix="/plan",
tags=["plan"],
@ -57,6 +75,11 @@ async def trigger_plan(body: PlanTriggerRequest):
session = sessionmaker(bind=db_engine)()
created_at = datetime.now().isoformat()
# TODO: We should store the trigger file path in the database with the plan so we can track the file that
# triggered the plan
# TODO: Create the ability to configure/switch off certain measures
try:
session.begin()
logger.info("Getting the inputs")
@ -66,7 +89,7 @@ async def trigger_plan(body: PlanTriggerRequest):
)
input_properties = []
for config in plan_input:
for config in tqdm(plan_input):
# We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
epc_searcher = SearchEpc(
@ -97,6 +120,7 @@ async def trigger_plan(body: PlanTriggerRequest):
'full_sap_epc': epc_searcher.full_sap_epc.copy(),
'old_data': epc_searcher.older_epcs.copy(),
}
epc_records = patch_epc(config, epc_records)
prepared_epc = EPCRecord(
epc_records=epc_records,
@ -133,7 +157,6 @@ async def trigger_plan(body: PlanTriggerRequest):
p.get_spatial_data(uprn_filenames)
logger.info("Getting components and epc recommendations")
recommendations = {}
recommendations_scoring_data = []
representative_recommendations = {}
@ -142,10 +165,10 @@ async def trigger_plan(body: PlanTriggerRequest):
# Property recommendations
p.get_components(cleaned, photo_supply_lookup, floor_area_decile_thresholds)
# TODO: For the private customer, we should probably NOT allow floor insulation, because it often requires
# decanting the tenant
recommender = Recommendations(property_instance=p, materials=materials)
property_recommendations, property_representative_recommendations = recommender.recommend()
# TODO: portfolio id as an input is temp
print("DELETE PORTFOLIO ID AS AN INPUT!!")
property_recommendations, property_representative_recommendations = recommender.recommend(body.portfolio_id)
if not property_recommendations:
continue
@ -182,6 +205,10 @@ async def trigger_plan(body: PlanTriggerRequest):
)
# Insert the predictions into the recommendations and run the optimiser
# TODO: If a recommendation has a negative impact on SAP, we should remove it - this seems to have become a
# possibility with heating system
# TODO: After optimising, if there are any cheap, quick win measures (e.g. insulate water tank with hot water
# cylinder jacket), we should add these to the recommendations as default
logger.info("Optimising recommendations")
for property_id in recommendations.keys():
@ -201,19 +228,22 @@ async def trigger_plan(body: PlanTriggerRequest):
expected_adjusted_energy=expected_adjusted_energy
)
input_measures = prepare_input_measures(recommendations_with_impact, body.goal)
input_measures = prepare_input_measures(recommendations_with_impact, body.goal, body.housing_type)
current_sap_points = int(property_instance.data["current-energy-efficiency"])
target_sap_points = epc_to_sap_lower_bound(body.goal_value)
sap_gain = CostOptimiser.calculate_sap_gain_with_slack(target_sap_points - current_sap_points)
if body.budget:
optimiser = GainOptimiser(input_measures, max_cost=body.budget)
optimiser = GainOptimiser(
input_measures, max_cost=body.budget, max_gain=sap_gain if sap_gain > 0 else 0
)
else:
# The minimum gain is the minimum number of SAP points required to get to the target SAP band
current_sap_points = int(property_instance.data["current-energy-efficiency"])
target_sap_points = epc_to_sap_lower_bound(body.goal_value)
# If the gain is negative, the optimiser will return an empty solution
optimiser = CostOptimiser(
input_measures,
min_gain=CostOptimiser.calculate_sap_gain_with_slack(target_sap_points - current_sap_points)
min_gain=sap_gain
)
optimiser.setup()
@ -265,27 +295,43 @@ async def trigger_plan(body: PlanTriggerRequest):
batch_properties = input_properties[i:i + BATCH_SIZE]
for p in batch_properties:
recommendations_to_upload = recommendations.get(p.id, [])
default_recommendations = [r for r in recommendations_to_upload if r["default"]]
total_sap_points = sum([r["sap_points"] for r in default_recommendations])
new_sap_points = float(p.data["current-energy-efficiency"]) + total_sap_points
new_epc = sap_to_epc(new_sap_points)
valuations = PropertyValuation.estimate(property_instance=p, target_epc=new_epc)
# Your existing operations
property_details_epc = p.get_property_details_epc(
portfolio_id=body.portfolio_id, rating_lookup=rating_lookup
portfolio_id=body.portfolio_id, rating_lookup=rating_lookup,
)
create_property_details_epc(session, property_details_epc)
update_or_create_property_spatial_details(session, p.uprn, p.spatial)
property_data = p.get_full_property_data()
property_data = p.get_full_property_data(current_valuation=valuations["current_value"])
update_property_data(
session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data
)
recommendations_to_upload = recommendations.get(p.id, [])
if not recommendations_to_upload:
continue
new_plan_id = create_plan(session, {
"portfolio_id": body.portfolio_id,
"property_id": p.id,
"is_default": True
"is_default": True,
"valuation_increase_lower_bound": (
valuations["lower_bound_increased_value"] - valuations["current_value"]
),
"valuation_increase_upper_bound": (
valuations["upper_bound_increased_value"] - valuations["current_value"]
),
"valuation_increase_average": (
valuations["average_increased_value"] - valuations["current_value"]
),
})
uploaded_recommendation_ids = upload_recommendations(session, recommendations_to_upload, p.id)
@ -294,14 +340,6 @@ async def trigger_plan(body: PlanTriggerRequest):
session, plan_id=new_plan_id, recommendation_ids=uploaded_recommendation_ids
)
# Get defaults
default_recommendations = [r for r in recommendations_to_upload if r["default"]]
total_sap_points = sum([r["sap_points"] for r in default_recommendations])
new_sap_points = float(p.data["current-energy-efficiency"]) + total_sap_points
new_epc = sap_to_epc(new_sap_points)
valuations = PropertyValuation.estimate(property_instance=p, target_epc=new_epc)
property_valuation_increases.append(
valuations["average_increased_value"] - valuations["current_value"]
)

View file

@ -24,6 +24,34 @@ class PropertyValuation:
100120703802: 277000, # Based on Zoopla
10014469685: 286000, # Based on Zoopla
10001328782: 196000, # Based on Zoopla
# Urban Splash - valuations from The Move Market
10023345430: 74_000,
10023345435: 99_000,
10023345436: 62_000,
10023345441: 62_000,
10094183503: 2_988_000,
10094183499: 123_000,
10070056824: 70_000,
110070056242: 100_000,
10070056243: 130_000,
10070056817: 130_000,
10094183501: 185_000,
10070056250: 71_000,
10094183500: 185_000,
10070056843: 67_000,
10070056844: 67_000,
10070056241: 76_000,
10070056834: 63_000,
10023345439: 62_000,
10070056815: 101_000,
10070056816: 101_000,
10094183498: 101_000,
10070056840: 673_000,
10070056848: 76_000,
10070056849: 76_000,
10070056829: 76_000,
10070056920: 76_000,
10023345463: 76_000,
}
# We base our valuation uplifts on a number of sources

View file

@ -0,0 +1,277 @@
from pptx.enum.text import PP_ALIGN # NOQA
from pptx import Presentation
from pptx.util import Inches, Pt
import matplotlib.pyplot as plt
from sqlalchemy.orm import Session
from sqlalchemy.sql import true
from backend.app.db.utils import row2dict
from backend.app.db.models.portfolio import PropertyModel, PropertyDetailsEpcModel
from backend.app.db.models.recommendations import Recommendation
from backend.app.db.models.recommendations import Plan
from backend.app.utils import sap_to_epc
# Hex colour for each EPC rating letter (A-G). plot_epc_distribution maps the
# 'current_epc_rating' column through this table to colour its bars.
EPC_COLOURS = {
"A": "#028051",
"B": "#14b759",
"C": "#8ecd46",
"D": "#fdd401",
"E": "#fdab67",
"F": "#ee8023",
"G": "#e71437"
}
def get_properties_with_default_recommendations(session: Session, portfolio_id: int):
    """
    Load every property of a portfolio together with its default recommendations.

    Properties are outer-joined to recommendations flagged as default, so a property with no default
    recommendation is still returned, with an empty 'recommendations' list.

    :param session: The SQLAlchemy session used to execute the query.
    :param portfolio_id: The ID of the portfolio whose properties are retrieved.
    :return: A list of dictionaries, one per property (as produced by row2dict), each with an extra
        'recommendations' key holding a list of recommendation dictionaries.
    """
    # The "default only" condition lives in the join rather than in a filter, so properties
    # without a matching recommendation are not dropped from the result
    default_recommendation_join = (
        (Recommendation.property_id == PropertyModel.id) & (Recommendation.default == true())
    )
    rows = (
        session.query(PropertyModel, Recommendation)
        .outerjoin(Recommendation, default_recommendation_join)
        .filter(PropertyModel.portfolio_id == portfolio_id)
        .all()
    )

    properties_by_id = {}
    for property_row, recommendation_row in rows:
        entry = properties_by_id.get(property_row.id)
        if entry is None:
            # First time we see this property: register it with no recommendations yet
            entry = row2dict(property_row)
            entry['recommendations'] = []
            properties_by_id[property_row.id] = entry

        # The recommendation side of the outer join is empty for properties without defaults
        if recommendation_row and recommendation_row.default:
            entry['recommendations'].append(row2dict(recommendation_row))

    return list(properties_by_id.values())
def get_property_details_by_portfolio_id(session: Session, portfolio_id: int):
    """
    Fetch all PropertyDetailsEpcModel rows that belong to a portfolio.

    :param session: The SQLAlchemy session used to execute the query.
    :param portfolio_id: The ID of the portfolio whose property details are retrieved.
    :return: A list of dictionaries (one per row, converted with row2dict); an empty list when
        the portfolio has no property details.
    """
    rows = (
        session.query(PropertyDetailsEpcModel)
        .filter(PropertyDetailsEpcModel.portfolio_id == portfolio_id)
        .all()
    )
    if not rows:
        return []

    # Convert the SQLAlchemy objects to plain dictionaries
    return [row2dict(row) for row in rows]
def get_plan_by_portfolio_id(session: Session, portfolio_id: int):
    """
    Fetch all Plan rows that belong to a portfolio.

    :param session: The SQLAlchemy session used to execute the query.
    :param portfolio_id: The ID of the portfolio whose plans are retrieved.
    :return: A list of dictionaries (one per plan, converted with row2dict); an empty list when
        the portfolio has no plans.
    """
    matching_plans = session.query(Plan).filter(Plan.portfolio_id == portfolio_id).all()

    plan_dicts = []
    if matching_plans:
        # Convert the SQLAlchemy objects to plain dictionaries
        plan_dicts = [row2dict(plan_row) for plan_row in matching_plans]
    return plan_dicts
def plot_epc_distribution(df, customer_key, title='Your Units', background_color='white', bar_height=0.4, font_size=15):
    """
    Plots a horizontal bar chart of EPC rating distribution with adjustable bar thickness and text sizes.
    Allows setting the plot background color and dynamically adjusts text size and bar spacing.

    The chart is shown with plt.show(), saved to etl/customers/{customer_key}/epc_distribution_plot.png and the
    figure is then closed. The input DataFrame is not modified.

    :param df: DataFrame with columns ['current_epc_rating', 'count', 'percentage']
    :param customer_key: Name of the customer folder under etl/customers/ that the image is written to
    :param title: Title of the plot
    :param background_color: Background color of the plot
    :param bar_height: Thickness of the bars (default 0.4)
    :param font_size: Base font size for text annotations (default 15)
    :return: Tuple of (figure, figure_path). The figure has already been closed via plt.close when returned.
    """
    # Calculate dynamic figure size or adjust based on preferences
    square_size = max(6, len(df) * 0.6)  # Ensure minimum size and adjust based on number of entries

    fig, ax = plt.subplots(figsize=(square_size, square_size))
    fig.patch.set_facecolor(background_color)  # Set figure background color
    ax.set_facecolor(background_color)  # Set axes background color

    # Round the percentages to 1 decimal place on a new frame (assign returns a new object), so the
    # caller's DataFrame is left untouched
    df_sorted = df.assign(percentage=df['percentage'].round(1)).sort_values('percentage', ascending=True)

    # Plot bars with specified height for adjustable thickness
    bars = ax.barh(df_sorted['current_epc_rating'], df_sorted['percentage'],
                   color=df_sorted['current_epc_rating'].map(EPC_COLOURS), edgecolor='none', height=bar_height)

    epc_rating_font_size = font_size * 2  # EPC rating font size larger than base font size
    count_percentage_font_size = font_size  # Count (percentage) font size as base font size

    # Annotate bars with EPC ratings inside and count with percentage values outside
    for index, bar in enumerate(bars):
        width = bar.get_width()
        vertical_centre = bar.get_y() + bar.get_height() / 2
        row = df_sorted.iloc[index]
        epc_rating = row['current_epc_rating']
        count = row['count']
        percentage = row['percentage']

        # EPC rating inside the bar with increased font size
        ax.text(width - (width * 0.05), vertical_centre,
                f"{epc_rating}", va='center', ha='right', color='white', fontsize=epc_rating_font_size)

        # Count and percentage outside the bar, original font size
        ax.text(width + 1, vertical_centre,
                f"{count} ({percentage}%)", va='center', color='black', fontsize=count_percentage_font_size)

    ax.set_title(title, fontsize=font_size * 1.2)  # Adjust title font size proportionally

    ax.tick_params(axis='x', which='both', bottom=False, top=False,
                   labelbottom=False)  # Remove x-axis tick marks and values
    ax.tick_params(axis='y', which='both', left=False, right=False,
                   labelleft=False)  # Remove y-axis tick marks and labels

    # Remove all four spines so only the bars and their annotations remain
    for side in ('top', 'right', 'left', 'bottom'):
        ax.spines[side].set_visible(False)

    plt.tight_layout()  # Adjust layout
    plt.show()

    # Save the figure as an image
    figure_path = f'etl/customers/{customer_key}/epc_distribution_plot.png'
    fig.savefig(figure_path, bbox_inches='tight')
    plt.close(fig)  # Close the figure to free memory

    return fig, figure_path
def save_plot_to_image(figure, path='plot.png'):
    """
    Write a matplotlib figure to an image file (tight bounding box) and close the figure.

    :param figure: The figure to save; its ``savefig`` method is called.
    :param path: Destination file path for the image.
    """
    savefig_options = {"bbox_inches": 'tight'}
    figure.savefig(path, **savefig_options)
    # Close the figure once it has been written out
    plt.close(figure)
def save_figure_as_image(figure, filename='temp_plot.png'):
    """
    Write a matplotlib figure to an image file at 300 dpi and close the figure.

    :param figure: The figure to save; its ``savefig`` method is called.
    :param filename: Destination file path for the image.
    """
    savefig_options = {"dpi": 300}
    figure.savefig(filename, **savefig_options)
    # Close the figure to prevent it from displaying in notebooks or Python environments
    plt.close(figure)
def add_commentary_with_bullets(slide, commentary, top_inches, left_inches=Inches(1), width_inches=Inches(8),
                                height_inches=Inches(2)):
    """
    Adds a text box to a slide and fills it with the commentary, one paragraph per line.

    :param slide: The slide object to add the commentary to.
    :param commentary: The commentary text; each newline-separated section becomes its own paragraph.
    :param top_inches: The top position of the commentary text box.
    :param left_inches: The left position of the commentary text box.
    :param width_inches: The width of the commentary text box.
    :param height_inches: The height of the commentary text box.
    """
    text_box = slide.shapes.add_textbox(left_inches, top_inches, width_inches, height_inches)
    frame = text_box.text_frame

    # Configure text frame
    frame.word_wrap = True
    # NOTE(review): python-pptx documents auto_size as an MSO_AUTO_SIZE member or None - confirm that
    # True is accepted by the installed version
    frame.auto_size = True
    frame.paragraphs[0].alignment = PP_ALIGN.LEFT

    for position, section_text in enumerate(commentary.split("\n")):
        # The text frame already holds one paragraph; reuse it for the first section only
        paragraph = frame.paragraphs[0] if position == 0 else frame.add_paragraph()

        paragraph.text = section_text
        paragraph.space_after = Pt(14)  # Gap below each section
        paragraph.font.size = Pt(14)
        paragraph.level = 0  # Top indentation level; raise for nested points
        paragraph.space_before = Pt(0)
def add_slide_with_image(prs, title, img_path=None, commentary=None):
    """
    Appends a slide with a title, an optional image and optional commentary to a presentation.

    When an image is given, the commentary is placed below it; otherwise the commentary is placed
    higher up the slide.

    :param prs: The presentation the slide is added to.
    :param title: Text for the slide's title placeholder.
    :param img_path: Optional path of the image to place on the slide.
    :param commentary: Optional commentary text, passed to add_commentary_with_bullets.
    """
    # NOTE(review): in python-pptx's default template index 5 is the "Title Only" layout rather
    # than "Title and Content" - confirm against the template in use
    slide = prs.slides.add_slide(prs.slide_layouts[5])
    slide.shapes.title.text = title

    if img_path:
        # Picture placed at left=1in, top=1.5in, sized 8in x 4.5in
        slide.shapes.add_picture(img_path, Inches(1), Inches(1.5), Inches(8), Inches(4.5))

    # Commentary starts at 6in from the top when a picture is present, otherwise at 3in
    commentary_top = Inches(6) if img_path else Inches(3)

    if commentary:
        add_commentary_with_bullets(slide, commentary, commentary_top)
def create_powerpoint(data, save_location):
    """
    Builds a PowerPoint presentation with one slide per entry in ``data`` and saves it.

    :param data: A dictionary whose values are per-slide dictionaries. Each may provide 'image_path',
        'text' and 'title' (the title defaults to an empty string).
    :param save_location: The file path where the PowerPoint presentation will be saved.
    """
    presentation = Presentation()

    for slide_spec in data.values():
        image_path = slide_spec.get('image_path')
        commentary = slide_spec.get('text')
        heading = slide_spec.get('title', "")
        add_slide_with_image(presentation, heading, image_path, commentary)

    # Save the presentation
    presentation.save(save_location)
def create_recommendations_summary(recommendations_df, properties_df, sap_target):
    """
    Aggregates the impact of the recommendations per property and compares it with a SAP target.

    Per property_id the following are summed: SAP points, valuation impact, bill savings, cost and
    CO2 impact. The property's uprn and current SAP points are then merged on, and the expected SAP
    points, expected EPC rating and the remaining gap to ``sap_target`` are derived.

    :param recommendations_df: DataFrame with columns 'property_id', 'sap_points',
        'property_valuation_increase', 'energy_cost_savings', 'estimated_cost' and
        'co2_equivalent_savings'
    :param properties_df: DataFrame with columns 'id', 'uprn' and 'current_sap_points'
    :param sap_target: SAP score the expected SAP points are compared against
    :return: DataFrame with one row per property_id
    """
    per_property_totals = (
        recommendations_df.groupby(["property_id"])
        .agg(
            total_sap_points=("sap_points", "sum"),
            total_valuation_impact=("property_valuation_increase", "sum"),
            total_bill_savings=("energy_cost_savings", "sum"),
            total_cost=("estimated_cost", "sum"),
            total_carbon=("co2_equivalent_savings", "sum")
        )
        .reset_index()
    )

    # Bring in the current SAP points (and uprn), keyed by property_id
    current_sap = properties_df[["id", "uprn", "current_sap_points"]].rename(columns={"id": "property_id"})
    summary = per_property_totals.merge(current_sap, on="property_id", how="left")

    summary["expected_sap_points"] = summary["current_sap_points"] + summary["total_sap_points"]
    summary["expected_epc_rating"] = summary["expected_sap_points"].apply(
        lambda points: sap_to_epc(points)
    )

    # Positive values mean the property is still short of the target
    summary["sap_difference"] = sap_target - summary["expected_sap_points"]

    return summary

View file

@ -0,0 +1,195 @@
import os
import pandas as pd
from tqdm import tqdm
from dotenv import load_dotenv
from utils.s3 import read_excel_from_s3
from backend.SearchEpc import SearchEpc
from epc_api.client import EpcClient
from utils.s3 import save_csv_to_s3
# Read in the .env file in backend so that EPC_AUTH_TOKEN is available from the environment
load_dotenv(dotenv_path="backend/.env")
# Auth token passed to SearchEpc in app(); None if the variable is not set
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
# USER_ID and the portfolio ids are used in app() to build the S3 input file names
# ("{USER_ID}/{portfolio id}/test_inputs.csv") and the request bodies
USER_ID = 8
PORTFOLIO_ID = 66
SECOND_SCENARIO_PORTFOLIO_ID = 65
# We also create a second portfolio for a subset of properties that do not meet the install requirements
# We drop these uprns from the first plan (app() writes them to the second portfolio's input file instead)
second_portfolio_uprns = [
10070056840, 10070056846, 10070056847, 10070056843, 10070056848, 10070056844, 10070056849,
10070056829, 10070056920, 10023345463
]
def app():
    """
    Pre-process the Urban Splash asset list and write the plan inputs for both portfolios to S3.

    This application will read in the Urban Splash data, in the dev AWS account, and pre-process it. There are a
    few issues with the file, including incorrect postcodes.

    The customer is interested in the following:
    - Getting properties to an EPC C
    - Doing so within a budget of £5,000

    Steps:
    1. Read the raw asset list and normalise the apartment numbers.
    2. For each asset, look up the newest EPC, trying each candidate postcode in turn.
    3. Save the inputs for the main portfolio (excluding second_portfolio_uprns) and for the second
       scenario portfolio (only second_portfolio_uprns), printing the request body for each.
    4. Print a short summary of the heating and hot water systems found in the EPCs.

    :return: None
    :raises Exception: if no EPC can be found for an asset under any of the candidate postcodes
    """

    potential_postcodes = ["BD9 5BQ", "BD9 5BR", "BD9 5BN"]

    raw_asset_list = read_excel_from_s3(
        bucket_name="retrofit-datalake-dev",
        file_key="customers/urban_splash/raw_asset_list/USRF - Velvet Mill EPC.xlsx",
        header_row=2
    )
    # We have a series of apartment numbers that are "Apartment 001", "Apartment 002", etc. We need to convert these
    # to "Apartment 1", "Apartment 2", etc.
    raw_asset_list["address1"] = raw_asset_list["Unit Number"].str.replace(
        "Apartment 00", "Apartment ", regex=True
    )
    raw_asset_list["address1"] = raw_asset_list["address1"].str.replace(
        "Apartment 0", "Apartment ", regex=True
    )

    # For each entry in the asset list, we make an api call to the EPC database to get the EPC data. We'll retrieve the
    # uprn for the property, as well as a nice address and postcode that we can use. We'll also try and deduce the
    # likely wall construction, since many of the homes are new builds, based on their newest EPC
    epc_data = []
    processed_asset_list = []
    for _, row in tqdm(raw_asset_list.iterrows(), total=len(raw_asset_list)):

        # The postcodes in the file are unreliable, so try each candidate until one returns an EPC
        newest_epc = None
        for postcode in potential_postcodes:
            searcher = SearchEpc(
                address1=row.address1, postcode=postcode, auth_token=EPC_AUTH_TOKEN, os_api_key=""
            )
            searcher.find_property(skip_os=True)
            if searcher.newest_epc is not None:
                newest_epc = searcher.newest_epc
                break

        if newest_epc is None:
            raise Exception(
                f"No EPC found for '{row.address1}' in any of the postcodes {potential_postcodes}"
            )

        if row["Beds"] == "Studio":
            number_heated_rooms = 2
            number_habitable_rooms = 2
        else:
            # Assume one room for communal space, one room for bathroom
            number_heated_rooms = row["Beds"] + 2
            number_habitable_rooms = row["Beds"] + 2

        to_append = {
            **row.to_dict(),
            "uprn": newest_epc["uprn"],
            "address": newest_epc["address1"],
            "postcode": newest_epc["postcode"],
            # "walls-description": newest_epc["walls-description"],
            # "roof-description": newest_epc["roof-description"],
            # "floor-description": newest_epc["floor-description"],
            # "total-floor-area": newest_epc["total-floor-area"],
            "full-address": newest_epc["address"],
            "number-heated-rooms": number_heated_rooms,
            "number-habitable-rooms": number_habitable_rooms,
        }

        processed_asset_list.append(to_append)
        epc_data.append(newest_epc)

    processed_asset_list_df = pd.DataFrame(processed_asset_list)
    epc_data_df = pd.DataFrame(epc_data)

    # We store this data
    # Store the data in s3
    filename = f"{USER_ID}/{PORTFOLIO_ID}/test_inputs.csv"
    save_csv_to_s3(
        dataframe=processed_asset_list_df[
            ~processed_asset_list_df["uprn"].astype(int).isin(second_portfolio_uprns)
        ],
        bucket_name="retrofit-plan-inputs-dev",
        file_name=filename
    )

    body = {
        "portfolio_id": str(PORTFOLIO_ID),
        "housing_type": "Private",
        "goal": "Increase EPC",
        "goal_value": "C",
        "trigger_file_path": filename,
        "budget": None,
    }
    print(body)

    subset = processed_asset_list_df[
        processed_asset_list_df["uprn"].astype(int).isin(second_portfolio_uprns)
    ]

    filename2 = f"{USER_ID}/{SECOND_SCENARIO_PORTFOLIO_ID}/test_inputs.csv"
    save_csv_to_s3(
        dataframe=subset,
        bucket_name="retrofit-plan-inputs-dev",
        file_name=filename2
    )

    body = {
        "portfolio_id": str(SECOND_SCENARIO_PORTFOLIO_ID),
        "housing_type": "Private",
        "goal": "Increase EPC",
        "goal_value": "C",
        # The second scenario must point at its own input file, not the first portfolio's
        "trigger_file_path": filename2,
        "budget": None,
    }
    print(body)

    # Some basic analysis on the heating, heating controls and hot water systems
    # All of the heating systems are rated very poor, poor or average. When it's average, they are all also
    # "Room heaters, electric", but the house has "Programmer and appliance thermostats" for the heating controls.
    # which is more efficient
    pd.set_option('display.max_rows', 500)
    pd.set_option('display.max_columns', 500)
    pd.set_option('display.width', 1000)

    # Heating
    print(epc_data_df[["mainheat-description", "mainheatcont-description", "mainheat-energy-eff"]].drop_duplicates())
    # mainheat-description mainheatcont-description mainheat-energy-eff
    # 0 Room heaters, electric Programmer and room thermostat Very Poor
    # 12 Room heaters, electric Programmer and appliance thermostats Average
    # 20 Electric storage heaters, radiators Celect-type controls Poor

    # Hot water
    print(epc_data_df[["hotwater-description", "hot-water-energy-eff"]].drop_duplicates())
    # hotwater-description hot-water-energy-eff
    # 0 Electric immersion, standard tariff Very Poor
    # 12 Electric immersion, off-peak Average

    # We now retrieve EPCS for all of the properties that are in these postcodes very obviously for the velvet mill
    # We'll use this information to get a sense of the likely wall/roof/floor construction for the properties
    # client = EpcClient(auth_token=EPC_AUTH_TOKEN)
    #
    # neighbouring_epcs = []
    # for pc in potential_postcodes:
    # response = client.domestic.search(params={"postcode": pc}, size=1000)
    # data = response["rows"]
    #
    # # keep just rows that are clearly for the velvet mill
    # data = [x for x in data if "velvet" in x["address1"].lower()]
    #
    # neighbouring_epcs.extend(data)
    #
    # neighbouring_epcs_df = pd.DataFrame(neighbouring_epcs)
    # neighbouring_epcs_df["walls-description"].value_counts()
    # neighbouring_epcs_df["roof-description"].value_counts()
    # neighbouring_epcs_df["floor-description"].value_counts()

View file

@ -0,0 +1,352 @@
"""
This script contains the code to generate the data required to populate the slides
We connect to the database and extract the data for the portfolio needed, so it is recommended to use
an environment akin to the backend to run this script
"""
import pandas as pd
import numpy as np
from backend.app.db.connection import db_engine
from sqlalchemy.orm import sessionmaker
from etl.customers.slide_utils import (
plot_epc_distribution,
get_property_details_by_portfolio_id,
get_plan_by_portfolio_id,
get_properties_with_default_recommendations,
create_powerpoint,
create_recommendations_summary
)
# Portfolio ids whose properties, property details and plans are read from the database in app()
PORTFOLIO_ID = 66
SECOND_SCENARIO_PORTFOLIO_ID = 65
# NOTE(review): presumably the EPC band the customer wants to reach - its use is not visible here, confirm
EPC_TARGET = "C"
# SAP score passed to create_recommendations_summary as the target
# NOTE(review): presumably the lowest SAP score in the EPC_TARGET band - confirm
SAP_TARGET = 69
# NOTE(review): presumably the customer folder name under etl/customers/ - confirm against its use
CUSTOMER_KEY = "urban_splash"
def _load_portfolio_frames(session, portfolio_id):
    """
    Pull the raw data for one portfolio and return it as dataframes.

    :param session: open database session, passed through to the slide_utils query helpers
    :param portfolio_id: id of the portfolio to load
    :return: tuple of (properties_df, property_details_df, plans_df, recommendations_df)
    """
    properties_df = pd.DataFrame(get_properties_with_default_recommendations(session, portfolio_id))

    property_details_df = pd.DataFrame(get_property_details_by_portfolio_id(session, portfolio_id))
    # Attach the uprn to the property details so properties can be compared across portfolios
    property_details_df = property_details_df.merge(
        properties_df[["uprn", "id"]].rename(columns={"id": "property_id"}),
        on="property_id"
    )

    plans_df = pd.DataFrame(get_plan_by_portfolio_id(session, portfolio_id))

    # Unnest the recommendations. Each property holds a list of recommendation dictionaries; properties
    # without recommendations explode to a null entry, which we drop
    exploded = properties_df["recommendations"].explode().tolist()
    recommendations_df = pd.DataFrame([r for r in exploded if not pd.isnull(r)])

    return properties_df, property_details_df, plans_df, recommendations_df


def _min_max_mean(series):
    """Return the (min, max, mean) of a pandas Series as a tuple."""
    return series.min(), series.max(), series.mean()


def app():
    """
    Build the commentary and EPC distribution chart for the customer slides and write the PowerPoint file.

    Data is read from the database for PORTFOLIO_ID and SECOND_SCENARIO_PORTFOLIO_ID, summarised into three
    blocks of slide text and handed to create_powerpoint, which saves the deck under
    etl/customers/<CUSTOMER_KEY>/.

    :return: None
    """
    # Connect to database. The session is only needed while we pull the data, so we always close it afterwards
    session = sessionmaker(bind=db_engine)()
    try:
        ########################################################################
        # Get the data we need
        ########################################################################
        properties_df, property_details_df, plans_df, recommendations_df = _load_portfolio_frames(
            session, PORTFOLIO_ID
        )
        (
            properties_second_scenario_df,
            property_details_second_scenario_df,
            plans_second_scenario_df,
            recommendations_second_scenario_df,
        ) = _load_portfolio_frames(session, SECOND_SCENARIO_PORTFOLIO_ID)
    finally:
        session.close()

    recommendations_summary = create_recommendations_summary(recommendations_df, properties_df, SAP_TARGET)

    # Merge on uprn so we can compare properties across portfolios
    plans_second_scenario_df = plans_second_scenario_df.merge(
        properties_second_scenario_df[["uprn", "id"]].rename(columns={"id": "property_id"}), on="property_id"
    )
    recommendations_summary_second_scenario = create_recommendations_summary(
        recommendations_second_scenario_df, properties_second_scenario_df, SAP_TARGET
    )

    # Combine the data for both scenarios
    full_property_details = pd.concat([property_details_df, property_details_second_scenario_df])
    full_properties = pd.concat([properties_df, properties_second_scenario_df])

    epc_rating_summary = full_properties.groupby("current_epc_rating").size().reset_index(name="count")
    epc_rating_summary["percentage"] = epc_rating_summary["count"] / epc_rating_summary["count"].sum() * 100

    ########################################################################
    # We pull out the data for the slides
    ########################################################################

    ############
    # Slide 1:
    ############
    # visual - only the path of the saved figure is needed for the slide
    _, figure_path = plot_epc_distribution(
        epc_rating_summary, CUSTOMER_KEY, title="", background_color="white", bar_height=0.75, font_size=15
    )

    # Take just properties that are below the SAP target
    properties_needing_work = full_properties[full_properties["current_sap_points"] < SAP_TARGET]
    property_details_needing_work = full_property_details[
        full_property_details["uprn"].isin(properties_needing_work["uprn"])
    ]

    # floor area - upper and lower bounds (across all properties in both portfolios)
    min_area, max_area, average_area = _min_max_mean(full_property_details["total_floor_area"])

    # Annual energy consumption - upper and lower bounds
    min_energy_consumption, max_energy_consumption, average_consumption = _min_max_mean(
        property_details_needing_work["adjusted_energy_consumption"]
    )

    # Co2 emissions - upper and lower bounds
    min_co2, max_co2, average_co2 = _min_max_mean(property_details_needing_work["co2_emissions"])

    # Valuation: upper and lower bounds and average - take positive values in case we have just a sample
    # NOTE(review): this only looks at the first portfolio, unlike the other slide 1 figures - confirm intended
    valuation_df = properties_df[properties_df["current_valuation"] > 0]
    min_valuation, max_valuation, average_valuation = (
        valuation_df["current_valuation"].min(),
        valuation_df["current_valuation"].max(),
        valuation_df["current_valuation"].median()
    )

    slide_1_commentary = (
        f"Floor areas range from {min_area} to {max_area} square meters, with an average of {average_area} square "
        f"meters. \n"
        f"Annual energy consumption ranges from {min_energy_consumption} to {max_energy_consumption} kWh, with an "
        f"average of {average_consumption} kWh. \n"
        f"CO2 emissions range from {min_co2} to {max_co2} tonnes, with an average of {average_co2} tonnes. \n"
        f"Valuations range from £{min_valuation} to £{max_valuation}, with an average of £"
        f"{average_valuation}.\n"
    )

    ############
    # Slide 2:
    ############
    # What it would take to hit the EPC target
    # We calculate the number of units that will make it to the target rating
    units_hitting_target = recommendations_summary[
        recommendations_summary["expected_epc_rating"] == EPC_TARGET
    ]
    n_units_to_target = units_hitting_target.shape[0]

    measures = "Electrical heating system upgrades & heating controls and Hot water system improvements"

    # Costs
    expected_cost_per_unit_lower = units_hitting_target["total_cost"].min()
    expected_cost_per_unit_upper = units_hitting_target["total_cost"].max()
    expected_project_cost = units_hitting_target["total_cost"].sum()

    # Per property
    # Take positive entries just in case we have a sample
    valuation_impact_df = plans_df[plans_df["property_id"].isin(units_hitting_target["property_id"])]
    valuation_impact_df = valuation_impact_df[valuation_impact_df["valuation_increase_lower_bound"] > 0]
    min_valuation_impact, max_valuation_impact, average_valuation_impact = (
        valuation_impact_df["valuation_increase_lower_bound"].median(),
        valuation_impact_df["valuation_increase_upper_bound"].median(),
        valuation_impact_df["valuation_increase_average"].median()
    )

    # Bill savings per property
    min_bill_savings, max_bill_savings, average_bill_savings = _min_max_mean(
        units_hitting_target["total_bill_savings"]
    )

    # Total CO2 reduction of portfolio
    min_co2_reduction, max_co2_reduction, average_co2_reduction = _min_max_mean(
        units_hitting_target["total_carbon"]
    )
    total_co2_reduction = units_hitting_target["total_carbon"].sum()

    slide_2_commentary = (
        f"{n_units_to_target} units expected to achieve EPC {EPC_TARGET} \n"
        f"Expected cost: {expected_cost_per_unit_lower} - {expected_cost_per_unit_upper}, total project: £"
        f"{expected_project_cost}\n"
        f"Measures include: {measures}\n"
        f"Valuation increase per property: £{min_valuation_impact}-{max_valuation_impact}, average: £"
        f"{average_valuation_impact}\n"
        f"Bill savings per property: £{min_bill_savings}-{max_bill_savings}, average: £{average_bill_savings}\n"
        f"Total CO2 reduction: {min_co2_reduction}-{max_co2_reduction} tonnes, average: {average_co2_reduction}\n"
        f"tonnes, total for the {n_units_to_target} properties: {total_co2_reduction} tonnes\n"
    )

    ############
    # Slide 3:
    ############
    units_missed_target = recommendations_summary_second_scenario.copy()
    n_units_missed_target = units_missed_target.shape[0]

    # How close were the properties that missed the target
    # These two values are interpolated into the slide text below, so they must be defined here
    # NOTE(review): sap_difference comes from the second scenario summary - confirm this is the intended gap
    min_difference = np.ceil(units_missed_target["sap_difference"].min())
    max_difference = np.ceil(units_missed_target["sap_difference"].max())

    second_scenario_measures = ("Electrical heating system upgrades & heating controls, Hot water system improvements "
                                "and internal wall insulation")

    # Just take all of the units in the second scenario, since they're borderline
    units_hitting_target_second_scenario = recommendations_summary_second_scenario[
        recommendations_summary_second_scenario["uprn"].isin(units_missed_target["uprn"].values)
    ]
    # The number of those units that actually reach the target with the extended package
    n_units_hitting_second_scenario = units_hitting_target_second_scenario[
        units_hitting_target_second_scenario["expected_epc_rating"] == EPC_TARGET
    ].shape[0]

    # Impact on second scenario
    # Costs
    expected_cost_per_unit_lower_second_scenario = recommendations_summary_second_scenario["total_cost"].min()
    expected_cost_per_unit_upper_second_scenario = recommendations_summary_second_scenario["total_cost"].max()
    expected_project_cost_second_scenario = recommendations_summary_second_scenario["total_cost"].sum()

    valuation_impact_df_second_scenario = plans_second_scenario_df[
        plans_second_scenario_df["uprn"].isin(units_hitting_target_second_scenario["uprn"])
    ]
    valuation_impact_df_second_scenario = valuation_impact_df_second_scenario[
        valuation_impact_df_second_scenario["valuation_increase_lower_bound"] > 0
    ]
    (
        min_valuation_impact_second_scenario,
        max_valuation_impact_second_scenario,
        average_valuation_impact_second_scenario
    ) = (
        valuation_impact_df_second_scenario["valuation_increase_lower_bound"].median(),
        valuation_impact_df_second_scenario["valuation_increase_upper_bound"].median(),
        valuation_impact_df_second_scenario["valuation_increase_average"].median()
    )

    # Bill savings per property
    (
        min_bill_savings_second_scenario,
        max_bill_savings_second_scenario,
        average_bill_savings_second_scenario
    ) = _min_max_mean(units_hitting_target_second_scenario["total_bill_savings"])

    # Total CO2 reduction of portfolio
    (
        min_co2_reduction_second_scenario,
        max_co2_reduction_second_scenario,
        average_co2_reduction_second_scenario
    ) = _min_max_mean(units_hitting_target_second_scenario["total_carbon"])
    total_co2_reduction_second_scenario = units_hitting_target_second_scenario["total_carbon"].sum()

    # Values for the leftovers - units that still miss the target in the second scenario
    units_missing_second_scenario = recommendations_summary_second_scenario[
        (recommendations_summary_second_scenario["expected_epc_rating"] != EPC_TARGET) &
        (recommendations_summary_second_scenario["uprn"].isin(units_missed_target["uprn"].values))
    ]
    # The closing sentence of the slide quotes the average shortfall
    average_difference_second_scenario = np.ceil(units_missing_second_scenario["sap_difference"].mean())

    slide_3_text = (
        f"{n_units_missed_target} units look like they would miss the EPC {EPC_TARGET} by {min_difference}-"
        f"{max_difference} points \n"
        "When on site, an assessor may be able to identify further improvements to bring the properties up to an EPC "
        f"{EPC_TARGET}.\n"
        f"We have looked at a more extensive package for these properties, including: {second_scenario_measures}\n"
        f"Of the {n_units_missed_target} properties, a further {n_units_hitting_second_scenario} are "
        f"expected to achieve EPC {EPC_TARGET} with these measures.\n"
        f"Expected cost: {expected_cost_per_unit_lower_second_scenario} - "
        f"{expected_cost_per_unit_upper_second_scenario}, "
        f"total project: £"
        f"{expected_project_cost_second_scenario}\n"
        f"Valuation increase per property: £{min_valuation_impact_second_scenario}-"
        f"{max_valuation_impact_second_scenario}, average: £"
        f"{average_valuation_impact_second_scenario}\n"
        f"Bill savings per property: £{min_bill_savings_second_scenario}-{max_bill_savings_second_scenario}, "
        f"average: £{average_bill_savings_second_scenario}\n"
        f"Total CO2 reduction: {min_co2_reduction_second_scenario}-{max_co2_reduction_second_scenario} tonnes, "
        f"average: "
        f"{average_co2_reduction_second_scenario}\n"
        f"tonnes, total for the {n_units_hitting_second_scenario} properties: {total_co2_reduction_second_scenario} "
        f"tonnes\n"
        f"Even in the second scenario, the remaining {units_missing_second_scenario.shape[0]} properties are expected "
        f"to miss EPC {EPC_TARGET} by {average_difference_second_scenario} point on average - they should be visited "
        f"by an assessor"
    )

    slide_data = {
        "slide_1": {
            "title": "EPC Rating Distribution",
            "image_path": figure_path,  # Pass the path to the saved image
            "text": slide_1_commentary
        },
        "slide_2": {
            "title": f"Properties that achieve EPC {EPC_TARGET}",
            "text": slide_2_commentary,
        },
        # Key follows the same slide_<n> pattern as the other entries
        "slide_3": {
            "title": f"Properties that miss EPC {EPC_TARGET}",
            "text": slide_3_text
        }
    }

    save_location = f"etl/customers/{CUSTOMER_KEY}/{CUSTOMER_KEY}_tech_slides.pptx"
    create_powerpoint(slide_data, save_location)

View file

@ -18,43 +18,40 @@ from recommendations.recommendation_utils import calculate_cavity_age
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
ENV_FILE = Path(__file__).parent / "etl" / "eligibility" / "ha_15_32" / ".env"
DATA_FOLDER = Path(__file__).parent / "local_data" / "ha_data"
logger = setup_logger()
load_dotenv(ENV_FILE)
class DataLoader:
MIN_ROWS = {
"ha_1": 2,
"ha_6": 2,
"ha_14": 3, # The spreadsheet starts from the third row
"ha_39": 2,
"ha_107": 2,
}
COLUMN_CONFIG = {
"ha_1": {
"HA1": {
"address": "Address",
"postcode": "Address - Postcode"
},
"HA6": {
"address": "propertyaddress",
"postcode": "address" # The 'address' column actually contains postcode
}
}
def __init__(self, files, use_cache):
self.files = files
def __init__(self, directories, use_cache):
self.directories = directories
self.use_cache = use_cache
self.data = {}
def create_asset_list_matching_address(self, ha_name, asset_list):
if ha_name in ["ha_1", "ha_6"]:
if ha_name in ["HA1", "HA6"]:
asset_list["matching_address"] = asset_list[
self.COLUMN_CONFIG[ha_name]["address"]
].str.lower().str.strip()
asset_list["matching_postcode"] = asset_list[
self.COLUMN_CONFIG[ha_name]["postcode"]
].str.lower().str.strip()
elif ha_name == "ha_14":
elif ha_name == "HA14":
# Create matching_address by concatenating Address 1, Address 2, Address 3, Address 4, Postcode
asset_list["matching_address"] = asset_list["Address 1"].str.lower().str.strip() + ", " + \
asset_list["Address 2"].str.lower().str.strip() + ", " + \
@ -62,7 +59,7 @@ class DataLoader:
asset_list["Address 4"].str.lower().str.strip() + ", " + \
asset_list["Postcode"].str.lower().str.strip()
asset_list["matching_postcode"] = asset_list["Postcode"].str.lower().str.strip()
elif ha_name == "ha_39":
elif ha_name == "HA39":
# Create matching_address by concatenating add_1, add_2, add_3, add_4, add_5, post_code
asset_list["matching_address"] = asset_list["add_1"].astype(str).str.lower().str.strip() + ", " + \
asset_list["add_2"].astype(str).str.lower().str.strip() + ", " + \
@ -71,7 +68,7 @@ class DataLoader:
asset_list["add_5"].astype(str).str.lower().str.strip() + ", " + \
asset_list["post_code"].astype(str).str.lower().str.strip()
asset_list["matching_postcode"] = asset_list["post_code"].str.lower().str.strip()
elif ha_name == "ha_107":
elif ha_name == "HA107":
# Create matching_address by concatenating House No, Street, Town, District, Postcode
asset_list["matching_address"] = asset_list["House No"].astype(str).str.lower().str.strip() + ", " + \
asset_list["Street"].str.lower().str.strip() + ", " + \
@ -87,7 +84,7 @@ class DataLoader:
def append_asset_list_built_form(self, ha_name, asset_list):
# Finally, we process property_type or built form, where needed
if ha_name == "ha_6":
if ha_name == "HA6":
asset_list["built_form"] = asset_list["Property Type"].apply(self.identify_built_form_ha6)
return asset_list
@ -99,7 +96,7 @@ class DataLoader:
:return:
"""
if ha_name in ["ha_107"]:
if ha_name in ["HA107"]:
asset_list["HouseNo"] = asset_list["House No"].copy()
else:
split_addresses = asset_list['matching_address'].str.split(',', expand=True)
@ -113,32 +110,41 @@ class DataLoader:
return asset_list
def load_asset_list(self, file_path, ha_name, sheet_name=None):
workbook = openpyxl.load_workbook(file_path)
if sheet_name is not None:
sheet = workbook[sheet_name]
@staticmethod
def create_ciga_list_house_no(ha_name, ciga_list):
"""
This function will append the House number onto the CIGA list
:return:
"""
if ha_name in ["HA6"]:
split_addresses = ciga_list['Matched Address'].str.split(',', expand=True)
house_numbers = split_addresses[0].str.split(' ', expand=True)
# The first column should be HouseNo - we aren't interested in the other columns, but we don't know how
# many columns there might be
house_numbers = house_numbers.iloc[:, 0:1]
house_numbers.columns = ['HouseNo']
ciga_list = pd.concat([ciga_list, house_numbers[["HouseNo"]]], axis=1)
else:
sheet = workbook.active
sheet_colnames = [cell.value for cell in sheet[self.MIN_ROWS[ha_name] - 1]]
raise NotImplementedError("Implement me")
return ciga_list
def load_asset_list(self, filepath, ha_name):
workbook = openpyxl.load_workbook(filepath)
asset_sheet = workbook["Assets"]
asset_sheet_colnames = [cell.value for cell in asset_sheet[1]]
rows_data = []
rows_colors = []
for row in tqdm(
sheet.iter_rows(min_row=self.MIN_ROWS[ha_name], values_only=False)
): # Assuming the first row is headers
for row in asset_sheet.iter_rows(min_row=2, values_only=False):
row_data = [cell.value for cell in row] # This will get you the cell values
row_color = row[0].fill.start_color.index if row[0].fill.start_color.index != '00000000' else None
# row_color = COLOR_INDEX[row_color]
rows_data.append(row_data)
rows_colors.append(row_color)
asset_list = pd.DataFrame(rows_data, columns=sheet_colnames)
asset_list = pd.DataFrame(rows_data, columns=asset_sheet_colnames)
asset_list = asset_list.loc[:, asset_list.columns.notnull()]
asset_list['row_color'] = rows_colors
# Remove entirely empty roww - consider all rows apart from row_color
# Remove entirely empty rows - consider all rows apart from row_color
asset_list = asset_list.loc[asset_list.loc[:, asset_list.columns != 'row_color'].notnull().any(axis=1)]
# Add in asset_list_row_id
@ -151,77 +157,43 @@ class DataLoader:
asset_list = self.append_asset_list_built_form(ha_name=ha_name, asset_list=asset_list)
return asset_list
# We check if there is a survey list
survey_list = pd.DataFrame()
if "ECO Surveys" in workbook.sheetnames:
survey_sheet = workbook["ECO Surveys"]
survey_rows = []
for row in survey_sheet.iter_rows(min_row=2, values_only=False): # Assuming the first row is headers
row_data = [cell.value for cell in row] # This will get you the cell values
survey_rows.append(row_data)
def load_survey_list(self, file_path, ha_name, asset_list, sheet_name=None):
survey_workbook = openpyxl.load_workbook(file_path)
if sheet_name is not None:
survey_sheet = survey_workbook[sheet_name]
else:
survey_sheet = survey_workbook.active
survey_list = pd.DataFrame(survey_rows, columns=[cell.value for cell in survey_sheet[1]])
# Remove columns that are None
survey_list = survey_list.loc[:, survey_list.columns.notnull()]
survey_list["survey_list_row_id"] = [ha_name + "_survey_" + str(i) for i in range(0, len(survey_list))]
# Perform survey list merge
survey_list = self.merge_surveys_to_assets(asset_list, survey_list, ha_name)
survey_rows = []
survey_colors = []
# We check if there are CIGA checks
ciga_list = pd.DataFrame()
if "CIGA Checks" in workbook.sheetnames:
ciga_sheet = workbook["CIGA Checks"]
ciga_rows = []
for row in ciga_sheet.iter_rows(min_row=2, values_only=False):
row_data = [cell.value for cell in row] # This will get you the cell values
ciga_rows.append(row_data)
for row in tqdm(survey_sheet.iter_rows(min_row=2, values_only=False)): # Assuming the first row is headers
row_data = [cell.value for cell in row] # This will get you the cell values
row_color = row[0].fill.start_color.index if row[0].fill.start_color.index != '00000000' else None
survey_rows.append(row_data)
survey_colors.append(row_color)
ciga_list = pd.DataFrame(ciga_rows, columns=[cell.value for cell in ciga_sheet[1]])
# Remove columns that are None
ciga_list = ciga_list.loc[:, ciga_list.columns.notnull()]
ciga_list = self.create_ciga_list_house_no(ha_name, ciga_list)
# Perform ciga list merge
ciga_list = self.merge_ciga_to_assets(asset_list, ciga_list, ha_name)
survey_list = pd.DataFrame(survey_rows, columns=[cell.value for cell in survey_sheet[1]])
# Remove columns that are None
survey_list = survey_list.loc[:, survey_list.columns.notnull()]
survey_list["row_colour"] = survey_colors
# The survey list has 4 possible colours:
# PURPLE - Installer advised install complete and a complimentary post works EPC has been completed.
# GREEN - Installer advised install complete.
# RED - Cancelled
# BLUE - Loft Only Installed
# NO FILL - No official update from installer (could be installed or cancelled)
survey_list["row_colour_name"] = np.where(
survey_list["row_colour"] == survey_list_colours["red"], "red",
np.where(survey_list["row_colour"] == survey_list_colours["green"], "green",
np.where(survey_list["row_colour"] == survey_list_colours["purple"], "purple",
np.where(survey_list["row_colour"] == survey_list_colours["blue"], "blue", "no fill")))
)
survey_list["row_meaning"] = np.where(
survey_list["row_colour_name"] == "red", "Cancelled",
np.where(
survey_list["row_colour_name"] == "green",
"Installer advised install complete",
np.where(
survey_list["row_colour_name"] == "purple",
"Installer advised install complete and a complimentary post works EPC has been completed",
np.where(
survey_list["row_colour_name"] == "blue",
"Loft Only Installed",
"No official update from installer (could be installed or cancelled)"
)
)
)
)
# Add in asset_list_row_id
survey_list["survey_list_row_id"] = [ha_name + "_surveys_" + str(i) for i in range(0, len(survey_list))]
# We now do the matching between the asset list and the survey list.
# What we'll get from this is a lookup table from the asset list to the survey list
if ha_name == "ha_6":
matched_lookup = self.merge_ha_6(asset_list, survey_list)
else:
raise NotImplementedError("Only HA 6 has surveys")
return survey_list, matched_lookup
return asset_list, survey_list, ciga_list
@staticmethod
def merge_ha_6(asset_list, survey_list):
def correct_ha6_asset_list(asset_list):
# Correct the asset list across propertyaddress and matching_address
asset_list["propertyaddress"] = asset_list["propertyaddress"].str.replace("Baggott Place", "Baggotts Place")
asset_list["matching_address"] = asset_list["matching_address"].str.replace("baggott place", "baggotts place")
@ -234,6 +206,11 @@ class DataLoader:
asset_list["propertyaddress"] = asset_list["propertyaddress"].str.replace("Moffat Way", "Moffatt Way")
asset_list["matching_address"] = asset_list["matching_address"].str.replace("moffat way", "moffatt way")
return asset_list
@staticmethod
def correct_ha6_survey_list(survey_list):
# Correct the survey list
survey_list["Street / Block Name"] = survey_list["Street / Block Name"].str.replace(
"Seabridge Road", "Seabridge Lane"
@ -358,10 +335,23 @@ class DataLoader:
"Post Code"
] = "ST5 7BY"
missed_postcodes = [
postcode.lower() for postcode in survey_list["Post Code"] if
postcode.lower() not in asset_list["matching_postcode"].values
]
return survey_list
def merge_surveys_to_assets(self, asset_list, survey_list, ha_name):
# Correct the asset list
asset_list_correction_function = getattr(self, f"correct_{ha_name.lower()}_asset_list")
asset_list = asset_list_correction_function(asset_list)
# Correct the survey list
survey_list_correction_function = getattr(self, f"correct_{ha_name.lower()}_survey_list")
survey_list = survey_list_correction_function(survey_list)
missed_postcodes = []
if ha_name == "HA6":
missed_postcodes = [
postcode.lower() for postcode in survey_list["Post Code"] if
postcode.lower() not in asset_list["matching_postcode"].values
]
matching_lookup = []
for _, row in tqdm(survey_list.iterrows(), total=len(survey_list)):
@ -405,7 +395,54 @@ class DataLoader:
matching_lookup = pd.DataFrame(matching_lookup)
return matching_lookup
# Merge onto the survey list
survey_list = survey_list.merge(matching_lookup, how='left', on="survey_list_row_id")
return survey_list
def merge_ciga_to_assets(self, asset_list, ciga_list, ha_name):
    """
    Match every CIGA check row to a single asset list row.

    Rows are matched on the postcode and the house number. When that does not leave exactly one candidate,
    the candidates are narrowed further using the asset list's matching_postcode column.

    :param asset_list: asset list dataframe; reads matching_address, matching_postcode, HouseNo and
        asset_list_row_id
    :param ciga_list: CIGA checks dataframe; reads HouseNo and "Matched Postcode"
    :param ha_name: name of the housing association (currently unused, kept for parity with
        merge_surveys_to_assets)
    :return: copy of ciga_list with an added asset_list_row_id column (None where the postcode does not
        appear in the asset list at all)
    :raises ValueError: when a row's postcode exists in the asset list but no unique asset can be identified
    """
    # Postcodes that never appear in the asset list cannot be matched, so those rows are recorded as
    # unmatched rather than treated as an error
    known_postcodes = set(asset_list["matching_postcode"].dropna().values)

    # One entry per CIGA row, in row order
    matched_asset_ids = []
    for _, row in tqdm(ciga_list.iterrows(), total=len(ciga_list)):
        house_number = row["HouseNo"]
        if isinstance(house_number, str):
            house_number = house_number.lower().strip()

        # NOTE(review): the CIGA sheet is assumed to carry its postcode in "Matched Postcode" only - confirm
        postcode = row["Matched Postcode"].lower().strip()

        # Filter on the postcode. The postcode is matched literally, and missing addresses never match
        candidates = asset_list[
            asset_list["matching_address"].str.contains(postcode, regex=False, na=False)
        ].copy()
        candidates = candidates[candidates["HouseNo"] == str(house_number)]

        # TODO: Might need to consider street name at some point
        if candidates.shape[0] != 1:
            candidates = candidates[
                candidates["matching_postcode"].str.lower().str.contains(postcode, regex=False, na=False)
            ]

        if candidates.shape[0] != 1:
            if postcode not in known_postcodes:
                matched_asset_ids.append(None)
                continue
            raise ValueError(
                f"Investigate: could not uniquely match CIGA row with house number {house_number!r} "
                f"and postcode {postcode!r} ({candidates.shape[0]} candidates)"
            )

        matched_asset_ids.append(candidates["asset_list_row_id"].values[0])

    # Work on a copy so the caller's dataframe is not mutated
    ciga_list = ciga_list.copy()
    ciga_list["asset_list_row_id"] = matched_asset_ids
    return ciga_list
@staticmethod
def identify_built_form_ha6(property_string):
@ -445,16 +482,17 @@ class DataLoader:
return
data = {}
for ha_name, file_config in self.files.items():
for filepath in self.directories:
ha_name = filepath.split("/")[2]
# Load asset list
logger.info("Loading asset list for {}".format(ha_name))
asset_list = self.load_asset_list(
file_path=file_config["asset_list"]["filepath"],
asset_list, survey_list, ciga_list = self.load_asset_list(
filepath=filepath,
ha_name=ha_name,
sheet_name=file_config["asset_list"]["sheetname"]
)
if file_config.get("survey_list"):
# TODO: Delete this
logger.info("Loading survey list for {}".format(ha_name))
survey_list, matched_lookup = self.load_survey_list(
asset_list=asset_list,
@ -1240,13 +1278,16 @@ def analyse_ha_data(outputs, loader):
def app():
"""
This app contains the housign association analysis for HAs 1, 6, 14, 39 and 107.
This app contains the housing association analysis for HAs 1, 6, 14, 39 and 107.
Only HA 6 has surveys
:return:
"""
use_cache = False
# List all of the data in the folder
directories = [str(list(entry.iterdir())[0]) for entry in DATA_FOLDER.iterdir() if entry.is_dir()]
files = {
"ha_1": {
"asset_list": {
@ -1284,7 +1325,7 @@ def app():
}
}
loader = DataLoader(files, use_cache)
loader = DataLoader(directories, use_cache)
loader.load()
# TODO: We probably need to make sure that we have all of the columns that we need

View file

@ -467,8 +467,7 @@ class EPCRecord:
]
if (
self.construction_age_band is not None
and self.construction_age_band not in DATA_ANOMALY_MATCHES
self.construction_age_band not in DATA_ANOMALY_MATCHES
):
result = result[
(result["CONSTRUCTION_AGE_BAND"] == self.construction_age_band)
@ -481,7 +480,7 @@ class EPCRecord:
result = result[(result["BUILT_FORM"] == self.prepared_epc["built-form"])]
return result[
["NUMBER_HABITABLE_ROOMS", "TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]
["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS", "TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]
].mean()
def _clean_property_dimensions(self):
@ -490,12 +489,11 @@ class EPCRecord:
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
raise ValueError("EPC Record doesn not contain epc data")
if not self.prepared_epc["number-habitable-rooms"] or (
self.prepared_epc["floor-height"] == ""
or self.prepared_epc["floor-height"] in DATA_ANOMALY_MATCHES
):
if (self.prepared_epc["number-habitable-rooms"] in DATA_ANOMALY_MATCHES) or (
self.prepared_epc["floor-height"] in DATA_ANOMALY_MATCHES
) or (self.prepared_epc["number-heated-rooms"] in DATA_ANOMALY_MATCHES):
property_dimensions = read_dataframe_from_s3_parquet(
bucket_name=DATA_BUCKET,
file_key=f"property_dimensions/{self.prepared_epc['local-authority']}.parquet",
@ -504,14 +502,17 @@ class EPCRecord:
property_dimensions
)
if not self.prepared_epc["number-habitable-rooms"]:
if self.prepared_epc["number-habitable-rooms"] in DATA_ANOMALY_MATCHES:
self.prepared_epc["number-habitable-rooms"] = float(
self.property_dimensions["NUMBER_HABITABLE_ROOMS"].round()
)
else:
self.prepared_epc["number-habitable-rooms"] = float(
self.prepared_epc["number-habitable-rooms"]
)
self.prepared_epc["number-habitable-rooms"] = float(self.prepared_epc["number-habitable-rooms"])
if self.prepared_epc["number-heated-rooms"] in DATA_ANOMALY_MATCHES:
self.prepared_epc["number-heated-rooms"] = float(self.property_dimensions["NUMBER_HEATED_ROOMS"].round())
else:
self.prepared_epc["number-heated-rooms"] = float(self.prepared_epc["number-heated-rooms"])
self.number_of_floors = estimate_number_of_floors(
self.prepared_epc["property-type"]
@ -729,7 +730,7 @@ class EPCRecord:
old_record["lodgement-datetime"]
for old_record in self.old_data
if old_record["construction-age-band"]
not in DATA_ANOMALY_MATCHES
not in DATA_ANOMALY_MATCHES
]
)

View file

@ -35,8 +35,8 @@ def app():
cleaned_data = {}
epc_directories = [entry for entry in EPC_DIRECTORY.iterdir() if entry.is_dir()]
for directory in tqdm(epc_directories):
for directory in tqdm(epc_directories):
data = pd.read_csv(directory / "certificates.csv", low_memory=False)
# Rename the columns to the same format as the api returns
data.columns = [c.replace("_", "-").lower() for c in data.columns]

View file

@ -16,7 +16,6 @@ class MainHeatAttributes(Definitions):
"solar assisted heat pump",
"exhaust source heat pump",
"community heat pump",
"portable electric heating"
]
FUEL_TYPES = ["electric", "mains gas", "wood logs", "coal", "oil", "wood pellets", "anthracite",
"dual fuel mineral and wood", "smokeless fuel", "lpg", "b30k"]
@ -62,7 +61,8 @@ class MainHeatAttributes(Definitions):
REMAP = {
"electric ceiling": "electric ceiling heating",
"electric heat pumps": "electric heat pump",
"solar-assisted heat pump": "solar assisted heat pump"
"solar-assisted heat pump": "solar assisted heat pump",
"portable electric heating": "portable electric heaters",
}
edge_case_result = {}
@ -139,6 +139,8 @@ class MainHeatAttributes(Definitions):
result.update({f'has_{ft.replace(" ", "_")}': False for ft in self.FUEL_TYPES})
result.update({f'has_{ot.replace(" ", "_")}': False for ot in self.OTHERS})
result['has_underfloor_heating'] = False
# We re-map entries that are the same
# We just drop those keys
if self.nodata:
return result

View file

@ -7,7 +7,7 @@ from pathlib import Path
import pandas as pd
from tqdm import tqdm
from etl.epc.settings import EARLIEST_EPC_DATE
from etl.epc.DataProcessor import DataProcessor
from etl.epc.DataProcessor import EPCDataProcessor
from BaseUtility import Definitions
from utils.s3 import save_dataframe_to_s3_parquet
@ -21,24 +21,31 @@ BUCKET = os.environ.get("BUCKET", "retrofit-data-dev")
def app():
directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()]
sample = []
for directory in tqdm(directories):
data = pd.read_csv(directory / "certificates.csv", low_memory=False)
data = data[data["LODGEMENT_DATE"] >= EARLIEST_EPC_DATE]
data = data[~pd.isnull(data["UPRN"])]
data["TOTAL_FLOOR_AREA"] = data["TOTAL_FLOOR_AREA"].astype(float)
data["CONSTRUCTION_AGE_BAND"] = data["CONSTRUCTION_AGE_BAND"].apply(
lambda x: DataProcessor.clean_construction_age_band(x)
lambda x: EPCDataProcessor.clean_construction_age_band(x)
)
data = data[~pd.isnull(data["CONSTRUCTION_AGE_BAND"])]
data = data[~data["CONSTRUCTION_AGE_BAND"].isin(Definitions.DATA_ANOMALY_MATCHES)]
data = data[~pd.isnull(data["TOTAL_FLOOR_AREA"])]
data = data[~pd.isnull(data["NUMBER_HABITABLE_ROOMS"])]
data = data[~pd.isnull(data["FLOOR_HEIGHT"])]
data = data[~pd.isnull(data["NUMBER_HEATED_ROOMS"])]
df = (
data.groupby(GROUPBY)
.agg({"NUMBER_HABITABLE_ROOMS": "median", "TOTAL_FLOOR_AREA": "mean", "FLOOR_HEIGHT": "mean"})
.agg(
{"NUMBER_HEATED_ROOMS": "median", "NUMBER_HABITABLE_ROOMS": "median", "TOTAL_FLOOR_AREA": "mean",
"FLOOR_HEIGHT": "mean"}
)
.reset_index()
)

View file

@ -40,6 +40,10 @@ MCS_SOLAR_PV_COST_DATA = {
# This is based on quotes from installers
BATTERY_COST = 3500
# This is based on https://www.checkatrade.com/blog/cost-guides/cost-smart-thermostat/
SMART_APPLIANCE_THERMOSTAT_COST = 400
PROGRAMMER_COST = 200
class Costs:
"""
@ -878,3 +882,119 @@ class Costs:
"labour_hours": 72,
"labour_days": 2,
}
def programmer_and_appliance_thermostat(self, has_programmer):
    """
    Cost of fitting a programmer together with a (smart) appliance thermostat.

    The appliance thermostat is always costed. The programmer is only added to the bill when the
    property does not already have one, which also doubles the estimated labour hours.

    :param has_programmer: truthy when the property already has a programmer installed
    :return: dict with the keys "total", "subtotal", "vat", "labour_hours" and "labour_days"
    """
    # The appliance thermostat is priced at SMART_APPLIANCE_THERMOSTAT_COST (noted in the original as
    # £400, the upper end of the range)
    if has_programmer:
        gross, hours = SMART_APPLIANCE_THERMOSTAT_COST, 2
    else:
        gross, hours = SMART_APPLIANCE_THERMOSTAT_COST + PROGRAMMER_COST, 4

    # The headline figure is treated as VAT inclusive, so the net amount is backed out of it
    net = gross / (1 + self.VAT_RATE)

    return {
        "total": gross,
        "subtotal": net,
        "vat": gross - net,
        "labour_hours": hours,
        "labour_days": 1,
    }
def electric_room_heaters(self, number_heated_rooms):
    """
    Cost of installing electric room heaters, priced per heated room.

    The per-room figure follows the estimate in:
    https://www.bestelectricradiators.co.uk/blog/cost-to-install-a-new-heating-system-uk/

    :param number_heated_rooms: int, number of rooms to be heated
    :return: dict with the keys "total", "subtotal", "vat", "labour_hours" and "labour_days"
    """
    cost_per_room = 500
    # TODO: Rough estimate to be reviewed
    hours_per_room = 1

    gross = cost_per_room * number_heated_rooms
    # The headline figure is treated as VAT inclusive, so the net amount is backed out of it
    net = gross / (1 + self.VAT_RATE)
    hours = hours_per_room * number_heated_rooms

    return {
        "total": gross,
        "subtotal": net,
        "vat": gross - net,
        "labour_hours": hours,
        # Hours are rounded up to whole 8 hour days
        "labour_days": np.ceil(hours / 8),
    }
def high_heat_electric_storage_heaters(self, number_heated_rooms):
    """
    Cost of installing high heat retention electric storage heaters, priced per heated room.

    The per-room figure follows the Energy Saving Trust estimate:
    https://energysavingtrust.org.uk/advice/electric-heating/

    :param number_heated_rooms: int, number of rooms to be heated
    :return: dict with the keys "total", "subtotal", "vat", "labour_hours" and "labour_days"
    """
    cost_per_room = 1500
    # TODO: Rough estimate to be reviewed
    hours_per_room = 3

    gross = cost_per_room * number_heated_rooms
    # The headline figure is treated as VAT inclusive, so the net amount is backed out of it
    net = gross / (1 + self.VAT_RATE)
    hours = hours_per_room * number_heated_rooms

    return {
        "total": gross,
        "subtotal": net,
        "vat": gross - net,
        "labour_hours": hours,
        # Hours are rounded up to whole 8 hour days
        "labour_days": np.ceil(hours / 8),
    }
def celect_type_controls(self):
    """
    Cost of installing Celect type controls.

    :return: dict with the keys "total", "subtotal", "vat", "labour_hours" and "labour_days"
    """
    # The £50 cost is a rough estimate based on internet research
    gross = 50
    # The headline figure is treated as VAT inclusive, so the net amount is backed out of it
    net = gross / (1 + self.VAT_RATE)

    return {
        "total": gross,
        "subtotal": net,
        "vat": gross - net,
        # We estimate the labour at 4 hours, booked as a single day
        "labour_hours": 4,
        "labour_days": 1,
    }
def hot_water_tank_insulation(self):
    """
    Cost of insulating the hot water tank.

    No labour is attributed to this measure: both labour figures are returned as 0.

    :return: dict with the keys "total", "subtotal", "vat", "labour_hours" and "labour_days"
    """
    # The £50 cost is a rough estimate based on internet research
    gross = 50
    # The headline figure is treated as VAT inclusive, so the net amount is backed out of it
    net = gross / (1 + self.VAT_RATE)

    return {
        "total": gross,
        "subtotal": net,
        "vat": gross - net,
        "labour_hours": 0,
        "labour_days": 0,
    }

View file

@ -71,9 +71,7 @@ class FloorRecommendations(Definitions):
def recommend(self, phase=0):
u_value = self.property.floor["thermal_transmittance"]
property_type = self.property.data["property-type"]
floor_area = self.property.insulation_floor_area
year_built = self.property.year_built
@ -90,6 +88,10 @@ class FloorRecommendations(Definitions):
):
return
# If the property is a new build flat, we won't recommend floor upgrades
if len(self.property.full_sap_epc) and (property_type == "Flat"):
return
if u_value:
# By being built more recently than this, it means that the property was likely built with solid
@ -101,16 +103,17 @@ class FloorRecommendations(Definitions):
# The floor is already compliant
return
u_value = get_floor_u_value(
floor_type=self.property.floor_type,
area=floor_area,
perimeter=self.property.perimeter,
age_band=self.property.age_band,
insulation_thickness=self.property.floor["insulation_thickness"],
wall_type=self.property.wall_type
)
if u_value is None:
u_value = get_floor_u_value(
floor_type=self.property.floor_type,
area=floor_area,
perimeter=self.property.perimeter,
age_band=self.property.age_band,
insulation_thickness=self.property.floor["insulation_thickness"],
wall_type=self.property.wall_type
)
self.estimated_u_value = u_value
self.estimated_u_value = u_value
if u_value < self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE:
return

View file

@ -0,0 +1,107 @@
from recommendations.Costs import Costs
from recommendations.recommendation_utils import check_simulation_difference
from backend.Property import Property
from etl.epc_clean.epc_attributes.MainheatControlAttributes import MainheatControlAttributes
class HeatingControlRecommender:
    """
    Recommends heating control upgrades that suit a given main heating system.

    The result of the latest call to `recommend` is stored in `self.recommendation` as a list of
    dictionaries, each holding a description, the cost breakdown and a `simulation_config`.
    """

    def __init__(self, property_instance: Property):
        self.property = property_instance
        self.costs = Costs(self.property)
        self.recommendation = []

    def recommend(self, heating_description):
        """
        Populate `self.recommendation` for the given main heating description.

        This first iteration is deliberately basic: only electric room heaters and electric storage
        heaters are handled, any other description leaves the recommendation list empty.

        :param heating_description: description of the main heating system the controls are for
        :return: None
        """
        # Always start from a clean slate so that results from an earlier call cannot leak through
        self.recommendation = []

        if heating_description in ("Room heaters, electric",):
            self.recommend_room_heaters_electric_controls()
        elif heating_description in ("Electric storage heaters", "Electric storage heaters, radiators"):
            self.recommend_high_heat_retention_controls()

    def recommend_room_heaters_electric_controls(self):
        """
        Recommend "Programmer and appliance thermostats" for homes with electric room heaters.

        Upgrading the controls is the least invasive improvement; the heating system itself can be
        considered afterwards. A recommendation is only made when the current controls are rated
        Poor, Very Poor or Average, or when they are a "Programmer and room thermostat".
        :return: None
        """
        low_rated_controls = self.property.data["mainheatc-energy-eff"] in ["Poor", "Very Poor", "Average"]
        if not low_rated_controls and (
            self.property.main_heating_controls["clean_description"] not in ["Programmer and room thermostat"]
        ):
            # We don't implement any other recommendations right now
            return

        # Why appliance thermostats: a room thermostat is commonly placed in a hallway and measures the air
        # around it before switching the heating on or off. An appliance thermostat sits on the
        # heater/appliance itself, so control is per heater rather than per room, which is more granular and
        # prevents overheating.

        # For costing, a property that already has a programmer only needs the appliance thermostats added
        already_has_programmer = self.property.main_heating_controls["switch_system"] == "programmer"

        target_controls = MainheatControlAttributes("Programmer and appliance thermostats").process()
        # Only the attributes that differ from the current controls are passed on to the simulation
        changes = check_simulation_difference(
            old_config=self.property.main_heating_controls, new_config=target_controls
        )
        # NOTE(review): the original comment stated this upgrade only reaches "average" energy efficiency,
        # yet the value written here is "Good" - confirm which of the two is intended
        changes["mainheatc_energy_eff_ending"] = "Good"

        controls_recommendation = {
            "description": "upgrade heating controls to Programmer and Appliance or Smart Thermostats",
            **self.costs.programmer_and_appliance_thermostat(has_programmer=already_has_programmer),
            "simulation_config": changes
        }
        self.recommendation.append(controls_recommendation)

    def recommend_high_heat_retention_controls(self):
        """
        Recommend controls for high heat retention storage heaters (Celect type controls).

        This is a control system designed to work with electric storage heaters and is more efficient than
        the standard controls they come with. The heating system itself can be considered afterwards.
        :return: None
        """
        target_controls = MainheatControlAttributes("Controls for high heat retention storage heaters").process()
        # Only the attributes that differ from the current controls are passed on to the simulation
        changes = check_simulation_difference(
            old_config=self.property.main_heating_controls, new_config=target_controls
        )
        # NOTE(review): the original comment stated this upgrade only reaches "average" energy efficiency,
        # yet the value written here is "Good" - confirm which of the two is intended
        changes["mainheatc_energy_eff_ending"] = "Good"

        controls_recommendation = {
            "description": "upgrade heating controls to High Heat Retention Storage Heater Controls",
            **self.costs.celect_type_controls(),
            "simulation_config": changes
        }
        self.recommendation.append(controls_recommendation)
        # We don't implement any other recommendations right now

View file

@ -0,0 +1,184 @@
import pandas as pd
from recommendations.Costs import Costs
from recommendations.recommendation_utils import check_simulation_difference
from backend.Property import Property
from etl.epc_clean.epc_attributes.MainheatAttributes import MainHeatAttributes
from recommendations.HeatingControlRecommender import HeatingControlRecommender
class HeatingRecommender:
    """
    Recommends upgrades to the main heating system, optionally bundled with heating controls.

    The result of the latest call to `recommend` is stored in `self.recommendations`.
    """

    def __init__(self, property_instance: Property):
        self.property = property_instance
        self.costs = Costs(self.property)
        self.recommendations = []

    def recommend(self, phase=0):
        """
        Populate `self.recommendations` for the property.

        This first iteration is deliberately basic: only homes heated by electric room heaters or electric
        storage heaters receive a recommendation (high heat retention storage heaters).

        :param phase: The phase the produced recommendations are assigned to
        :return: None
        """
        # Reset so that results from an earlier call cannot leak through
        self.recommendations = []

        if self.property.main_heating["clean_description"] in [
            "Room heaters, electric", "Electric storage heaters", "Electric storage heaters, radiators"
        ]:
            # Recommend high heat retention storage heaters
            self.recommend_electric_storage_heaters(phase=phase, system_change=True, heating_controls_only=False)

    @staticmethod
    def check_simulation_difference(old_config, new_config):
        """
        Compare two configuration dictionaries and return the entries of `new_config` that differ.

        Kept for backwards compatibility; the logic lives in
        `recommendations.recommendation_utils.check_simulation_difference` so there is a single
        implementation to maintain.

        :param old_config: The current configuration
        :param new_config: The configuration after the upgrade
        :return: dict mapping "<key>_ending" to the new value, for every differing key
        """
        # Inside a function body the bare name resolves to the module level import and not to this static
        # method (class scope is not searched), so this delegates rather than recurses
        return check_simulation_difference(old_config=old_config, new_config=new_config)

    @staticmethod
    def combine_heating_and_controls(
            controls_recommendations, heating_simulation_config, costs, description, phase, heating_controls_only,
            system_change
    ):
        """
        Given a recommendation for heating controls, and a recommendation for the heating system, we combine the two
        into a list of recommendations

        :param controls_recommendations: The heating controls recommendations; only the first entry is used
        :param heating_simulation_config: The simulation configuration for the heating system
        :param costs: The costs of the heating system. The input dictionary is not modified
        :param description: The description of the recommendation
        :param phase: The phase of the recommendation
        :param heating_controls_only: If True, we will also add a recommendation for heating controls only
        :param system_change: Indicates if we are recommending a different type of heating system, compared to the
            current system. If we have a system change and we have a heat control recommendation, we only recommend
            both heating and controls together
        :return: list of recommendation dictionaries
        """
        has_controls = bool(controls_recommendations)
        controls_recommendation = controls_recommendations[0] if has_controls else None

        # Decide which variants of the heating recommendation to produce (True = bundled with the controls)
        if system_change and has_controls:
            # A new type of system is only ever offered together with its matching controls
            variants = [True]
        elif not heating_simulation_config:
            # Nothing changes on the heating system itself
            variants = []
        elif has_controls:
            # Offer the heating upgrade both with and without the controls
            variants = [True, False]
        else:
            variants = [False]

        output = []
        for with_controls in variants:
            if with_controls:
                # The controls costs are added onto each key in the costs dictionary
                total_costs = {key: value + controls_recommendation[key] for key, value in costs.items()}
                simulation_config = {**heating_simulation_config, **controls_recommendation["simulation_config"]}
                recommendation_description = f"{description} and {controls_recommendation['description']}"
            else:
                total_costs = costs.copy()
                simulation_config = heating_simulation_config.copy()
                recommendation_description = description

            output.append(
                {
                    "phase": phase,
                    "parts": [
                        # TODO
                    ],
                    "type": "heating",
                    "description": recommendation_description,
                    "starting_u_value": None,
                    "new_u_value": None,
                    "sap_points": None,
                    **total_costs,
                    "simulation_config": simulation_config
                }
            )

        if heating_controls_only and has_controls:
            # Also add on a recommendation for heating controls only. We work on a copy so the caller's
            # recommendation keeps its original description
            controls_only = controls_recommendation.copy()
            controls_description = controls_only["description"]
            # Capitalize the first letter of the description (slicing keeps an empty description valid)
            controls_only["description"] = controls_description[:1].upper() + controls_description[1:]
            output.append(
                {
                    "phase": phase,
                    "parts": [
                        # TODO
                    ],
                    "type": "heating",
                    "starting_u_value": None,
                    "new_u_value": None,
                    "sap_points": None,
                    **controls_only
                }
            )

        return output

    def recommend_electric_storage_heaters(self, phase, system_change, heating_controls_only):
        """
        We recommend electric storage heaters as an upgrade to the heating system.
        We will recommend upgrading to a high heat retention storage system, if the current system is not already
        high heat retention storage

        :param phase: The phase of the recommendation
        :param system_change: Indicates if we are recommending a different type of heating system, compared to the
            current system
        :param heating_controls_only: Indicates if we should include a recommendation for just heating controls
        :return: None, the recommendations are appended to `self.recommendations`
        """
        # The heating controls we're recommending for are based on the recommended heating system
        high_heat_retention_controls_desc = "Controls for high heat retention storage heaters"
        current_controls_desc = self.property.main_heating_controls["clean_description"]
        # A single, case-insensitive check is used both for the early exit and for deciding whether to
        # recommend controls, so the two decisions can never disagree
        has_high_heat_retention_controls = (
            isinstance(current_controls_desc, str) and
            current_controls_desc.lower() == high_heat_retention_controls_desc.lower()
        )

        # Storage heaters that are already driven by high heat retention controls need no recommendation.
        # We exit before doing any of the costing or simulation work
        if has_high_heat_retention_controls and (
            "Electric storage heaters" in self.property.main_heating["clean_description"]
        ):
            return

        # We only recommend Celect-type controls if the current controls are not already Celect-type controls
        controls_recommender = HeatingControlRecommender(self.property)
        if not has_high_heat_retention_controls:
            controls_recommender.recommend(heating_description="Electric storage heaters, radiators")

        # Set up artefacts, suitable for the simulation and regardless of controls
        heating_ending_config = MainHeatAttributes("Electric storage heaters, radiators").process()
        heating_simulation_config = check_simulation_difference(
            new_config=heating_ending_config, old_config=self.property.main_heating
        )
        # This upgrade will only take the heating system to average energy efficiency
        heating_simulation_config["mainheat_energy_eff_ending"] = "Average"

        # Upgrade to electric storage heaters
        costs = self.costs.high_heat_electric_storage_heaters(
            number_heated_rooms=self.property.data["number-heated-rooms"]
        )

        self.recommendations.extend(
            self.combine_heating_and_controls(
                controls_recommendations=controls_recommender.recommendation,
                heating_simulation_config=heating_simulation_config,
                costs=costs,
                description="Install high heat retention electric storage heaters",
                phase=phase,
                heating_controls_only=heating_controls_only,
                system_change=system_change
            )
        )

View file

@ -0,0 +1,53 @@
from backend.Property import Property
from recommendations.Costs import Costs
class HotwaterRecommendations:
    """
    Recommends improvements to the hot water system of a property.

    The result of the latest call to `recommend` is stored in `self.recommendations`.
    """

    def __init__(self, property_instance: Property):
        self.property = property_instance
        self.costs = Costs(self.property)
        self.recommendations = []

    def recommend(self, phase):
        """
        There may be a number of recommendations that are simultaneously applicable to the property.
        If this is true then the phase may need to be incremented from within this recommendation

        :param phase: The phase the produced recommendations are assigned to
        :return: None
        """
        # Reset the recommendations
        self.recommendations = []

        # This first iteration of the recommender will provide very basic recommendation
        # We only recommend tank insulation for electric immersion heaters with a very poor hot water rating.
        # `and` is used so that the efficiency rating is only looked up for immersion heaters
        if (self.property.hotwater["heater_type"] in ["electric immersion"]) and (
            self.property.data["hot-water-energy-eff"] == "Very Poor"
        ):
            self.recommend_tank_insulation(phase=phase)

    def recommend_tank_insulation(self, phase):
        """
        If the home has a very poor hot water system, this is often indicative of a lack of insulation on the hot water
        tank. This is a very simple and cost effective improvement that can be made to the home.

        :param phase: The phase the recommendation is assigned to
        :return: None, the recommendation is appended to `self.recommendations`
        """
        recommendation_cost = self.costs.hot_water_tank_insulation()

        self.recommendations.append(
            {
                "phase": phase,
                "parts": [
                    # TODO
                ],
                "type": "hot_water_tank_insulation",
                "description": "Insulate the hot water tank with an insulation jacket",
                "starting_u_value": None,
                "new_u_value": None,
                "sap_points": None,
                **recommendation_cost,
                # The simulation is told that the hot water rating ends up as "Average"
                "simulation_config": {"hot_water_energy_eff_ending": "Average"}
            }
        )

View file

@ -1,5 +1,3 @@
import numpy as np
from backend.Property import Property
from typing import List
from itertools import groupby
@ -11,6 +9,8 @@ from recommendations.FireplaceRecommendations import FireplaceRecommendations
from recommendations.LightingRecommendations import LightingRecommendations
from recommendations.SolarPvRecommendations import SolarPvRecommendations
from recommendations.WindowsRecommendations import WindowsRecommendations
from recommendations.HeatingRecommender import HeatingRecommender
from recommendations.HotwaterRecommendations import HotwaterRecommendations
from backend.ml_models.AnnualBillSavings import AnnualBillSavings
@ -42,8 +42,10 @@ class Recommendations:
self.lighting_recommender = LightingRecommendations(property_instance=property_instance, materials=materials)
self.windows_recommender = WindowsRecommendations(property_instance=property_instance, materials=materials)
self.solar_recommender = SolarPvRecommendations(property_instance=property_instance)
self.heating_recommender = HeatingRecommender(property_instance=property_instance)
self.hotwater_recommender = HotwaterRecommendations(property_instance=property_instance)
def recommend(self):
def recommend(self, portfolio_id):
"""
This method runs the recommendations for the individual measures and then appends them to a list for output
@ -55,53 +57,64 @@ class Recommendations:
property_recommendations = []
phase = 0
# Wall recommendations
self.wall_recomender.recommend(phase=phase)
if self.wall_recomender.recommendations:
property_recommendations.append(self.wall_recomender.recommendations)
phase += 1
# Ventilation recommendations
# We only produce a ventilation recommendation if the property is recommended to have wall or roof insulation
# We will not attribute a SAP impact to the ventilation recommendation, since we've seen that this has no
# real impact on the SAP score. Therefore, we don't need to include phasing for ventilation. If we have any
# wall or roof recommendations, we will ensure that ventilation is included in the simulation
if self.wall_recomender.recommendations or self.roof_recommender.recommendations:
self.ventilation_recomender.recommend()
if self.ventilation_recomender.recommendation:
property_recommendations.append(self.ventilation_recomender.recommendation)
print("WALL RECOMMENDATIONS HAVE BEEN COMMENTED OUT TEMPORARILY - ADD ME BACK IN")
if portfolio_id != 66:
# Building Fabric
self.wall_recomender.recommend(phase=phase)
if self.wall_recomender.recommendations:
property_recommendations.append(self.wall_recomender.recommendations)
phase += 1
# Ventilation recommendations
# We only produce a ventilation recommendation if the property is recommended to have wall or roof
# insulation
# We will not attribute a SAP impact to the ventilation recommendation, since we've seen that this has no
# real impact on the SAP score. Therefore, we don't need to include phasing for ventilation. If we have any
# wall or roof recommendations, we will ensure that ventilation is included in the simulation
if self.wall_recomender.recommendations or self.roof_recommender.recommendations:
self.ventilation_recomender.recommend()
if self.ventilation_recomender.recommendation:
property_recommendations.append(self.ventilation_recomender.recommendation)
# Roof recommendations
self.roof_recommender.recommend(phase=phase)
if self.roof_recommender.recommendations:
property_recommendations.append(self.roof_recommender.recommendations)
phase += 1
# Floor recommendations
self.floor_recommender.recommend(phase=phase)
if self.floor_recommender.recommendations:
property_recommendations.append(self.floor_recommender.recommendations)
phase += 1
# Windows recommendations
self.windows_recommender.recommend(phase=phase)
if self.windows_recommender.recommendation:
property_recommendations.append(self.windows_recommender.recommendation)
phase += 1
# Fireplace sealing recommendations
self.fireplace_recommender.recommend(phase=phase)
if self.fireplace_recommender.recommendation:
property_recommendations.append(self.fireplace_recommender.recommendation)
phase += 1
# Lighting recommendations
# Heating and Electrical systems
self.heating_recommender.recommend(phase=phase)
if self.heating_recommender.recommendations:
property_recommendations.append(self.heating_recommender.recommendations)
phase += 1
# Hot water
self.hotwater_recommender.recommend(phase=phase)
if self.hotwater_recommender.recommendations:
property_recommendations.append(self.hotwater_recommender.recommendations)
phase += 1
self.lighting_recommender.recommend(phase=phase)
if self.lighting_recommender.recommendation:
property_recommendations.append(self.lighting_recommender.recommendation)
phase += 1
# Solar recommendations
# Renewables
self.solar_recommender.recommend(phase=phase)
if self.solar_recommender.recommendation:
property_recommendations.append(self.solar_recommender.recommendation)

View file

@ -47,6 +47,12 @@ class WallRecommendations(Definitions):
# we still consider it as an option
U_VALUE_ERROR = 0.01
# Typically when the U-value is around 0.75 and below, and the home is a new build, this is a good indication
# that the home is already insulated with at least some partial insulation. We don't recommend insulation
# in this case. This estimate was verified with the Warmfront team and 0.75 has been used as a conservative
# threshold
NEW_BUILD_INSULATED = 0.75
def __init__(
self,
property_instance: Property,
@ -114,6 +120,13 @@ class WallRecommendations(Definitions):
if self.property.walls["thermal_transmittance_unit"] != self.U_VALUE_UNIT:
raise NotImplementedError("Haven't handled the case of other u value units yet")
# If the property is a new build and the U-value is below 0.75, we don't recommend insulation because it's
# not practical
if (self.property.data["transaction-type"] == "new dwelling") and (u_value <= self.NEW_BUILD_INSULATED):
# Recommend nothing
return
# We can't detect it's a cavity wall, but it was built after 1990 so likely built with insulation already
# + it already has a U-value WORSE than the building regulations, so we recommend either internal or
# external wall insulation
@ -121,7 +134,7 @@ class WallRecommendations(Definitions):
u_value >= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE
):
# Recommend insulation
self.find_insulation(u_value)
self.find_insulation(u_value, phase)
return
# We can't detect it's a cavity wall, but it was built after 1990 so likely built with insulation already

View file

@ -30,7 +30,9 @@ class CostOptimiser:
:param min_gain: Numerical value for the minimum gain
:return:
"""
if min_gain <= 5:
if min_gain == 0:
return min_gain
elif min_gain <= 5:
return min_gain + 0.5
elif min_gain <= 20:
return min_gain + 1.5

View file

@ -9,10 +9,24 @@ class GainOptimiser:
This class is used to maximise gain, given a constrained cost
"""
def __init__(self, components, max_cost):
def __init__(self, components, max_cost, max_gain):
"""
This function will try and maximise the gain, given a constrained cost. If we specify a max_gain, then the
optimisation routine is constrained to try not to exceed a maximum increase
If the maximum gain (`max_gain`) is explicitly set to 0, the optimization routine interprets this as an
instruction not to perform any optimization.
:param components: List of components, where each component is a dictionary with keys "id", "cost" and "gain"
:param max_cost: Maximum cost constraint
:param max_gain: Maximum gain constraint
"""
self.components = components
self.max_cost = max_cost
self.max_gain = max_gain
self.cost_constraint = None
self.max_gain_constraint = None
self.m = None
self.variables = []
self.solution = []
@ -50,6 +64,15 @@ class GainOptimiser:
self.cost_constraint = self.m.add_constr(cost_expression)
# Add an optional max gain constraint if max_gain is not None
if self.max_gain is not None:
max_gain_expression = xsum(
component['gain'] * var for group, group_vars in zip(self.components, self.variables) for component, var
in zip(group, group_vars)
) <= self.max_gain
self.max_gain_constraint = self.m.add_constr(max_gain_expression)
# This constraint ensures that at most one item from each group is selected
# This is expressed by summing up the decision variables for each group and ensuring that the sum is <= 1
for group_vars in self.variables:
@ -59,6 +82,10 @@ class GainOptimiser:
# Remove the original cost constraint
self.m.remove(self.cost_constraint)
if self.max_gain is not None:
# Remove the original max gain constraint
self.m.remove(self.max_gain_constraint)
# Add slack variable
s = self.m.add_var(lb=0)
@ -80,18 +107,34 @@ class GainOptimiser:
def solve(self):
# Solve the problem
if self.max_gain == 0:
logger.info("Max gain is set to 0, no optimisation will be performed")
# Nothing to do
return
self.m.optimize()
if self.m.status == OptimizationStatus.INFEASIBLE:
logger.info("We have an infeasible model, setting up slack model")
self.setup_slack()
self.m.optimize()
self.solution = [
solution = [
item for group, group_vars in zip(self.components, self.variables) for item, var in zip(group, group_vars)
if
var.x >= 0.99
]
if (self.m.status == OptimizationStatus.INFEASIBLE) or (
(self.m.status == OptimizationStatus.OPTIMAL) and not len(solution)
):
logger.info("We have an infeasible model, setting up slack model")
self.setup_slack()
self.m.optimize()
solution = [
item for group, group_vars in zip(self.components, self.variables) for item, var in
zip(group, group_vars)
if
var.x >= 0.99
]
self.solution = solution
self.solution_gain = self.m.objective.x
self.solution_cost = sum([component['cost'] for component in self.solution])

View file

@ -1,13 +1,17 @@
def prepare_input_measures(property_recommendations, goal):
def prepare_input_measures(property_recommendations, goal, housing_type):
"""
Basic function to convert recommendations_to_upload to a format that is
suitable for the optimiser - large
:param property_recommendations: object containing the recommendations, created in the plan trigger api
:param goal: goal to be optimised for, should be one of the keys in gain_map. E.g. if the gain is SAP points,
the goal should reflect that desired gain
:param housing_type: type of housing the recommendations are for - should be one of "Social" or "Private"
:return: Nested list of input measures
"""
if housing_type not in ["Social", "Private"]:
raise ValueError("Invalid housing type - investigate me")
goal_map = {
"Increase EPC": "sap_points"
}
@ -16,6 +20,10 @@ def prepare_input_measures(property_recommendations, goal):
if not goal_key:
raise NotImplementedError("Not implemented this gain type - investigate me")
# We don't include suspended and solid floor insulation as possible measures in private housing, because
# of the need to decant the tenant
ignored_measures = ["suspended_floor_insulation", "solid_floor_insulation"] if housing_type == "Private" else []
input_measures = []
for recs in property_recommendations:
input_measures.append(
@ -26,7 +34,7 @@ def prepare_input_measures(property_recommendations, goal):
"gain": rec[goal_key],
"type": rec["type"]
}
for rec in recs
for rec in recs if rec["type"] not in ignored_measures
]
)

View file

@ -511,6 +511,7 @@ FLOOR_LEVEL_MAP = {
"Ground": 0,
"ground floor": 0,
"mid floor": 1,
"top floor": 5,
"20+": 20,
"21st or above": 21,
**{str(i).zfill(2): i for i in range(0, 21)},

View file

@ -311,6 +311,7 @@ def get_roof_u_value(
return float(u_value)
def estimate_number_of_floors(property_type):
"""
Using the property type, we estimate the number of floors in the property
@ -324,7 +325,7 @@ def estimate_number_of_floors(property_type):
number_of_floors = 2
else:
raise NotImplementedError("Implement me")
return number_of_floors
@ -432,7 +433,6 @@ def get_floor_u_value(floor_type, area, perimeter, age_band, wall_type, insulati
Rsi = 0.17 # in m²K/W
Rse = 0.04 # in m²K/W
lambda_ins = 0.035 # thermal conductivity of floor insulation in W/m·K
wall_thickness = [x[age_band] for x in default_wall_thickness if x["type"] == wall_type][0]
if wall_thickness is None and wall_type == "park home":
# We don't know enough and likely won't make recommendations
@ -754,3 +754,16 @@ def calculate_cavity_age(newest_epc, older_epcs, cleaned):
cavity_age = (datetime.now() - pd.to_datetime(df["inspection-date"].max())).days
return cavity_age
def check_simulation_difference(old_config, new_config):
    """
    Given two dictionaries, that describe a configuration (e.g. heating or heating controls), this method will
    compare the two and pick out the entries of the new configuration that differ from the old one. This will be
    used to determine how we should be updating the configuration in the simulation

    Only the keys of `new_config` are inspected. A key that is present in `new_config` but absent from
    `old_config` counts as a difference; keys that only exist in `old_config` are not reported.

    :param old_config: dict describing the current configuration
    :param new_config: dict describing the configuration after the upgrade
    :return: dict mapping "<key>_ending" to the new value, for every key whose value differs
    """
    # Unique sentinel so that a missing key is never mistaken for a stored value such as None or False
    missing = object()
    differences = {
        key + "_ending": new_value
        for key, new_value in new_config.items()
        if old_config.get(key, missing) != new_value
    }
    return differences

View file

@ -195,3 +195,32 @@ def read_pickle_from_s3(bucket_name, s3_file_name):
return None
return data
def read_excel_from_s3(bucket_name, file_key, header_row):
    """
    Read an Excel file from an S3 bucket and return it as a pandas DataFrame.

    :param bucket_name: Name of the S3 bucket.
    :param file_key: Key of the file (including directory path within the bucket). The extension check is
        case-insensitive, so e.g. "report.XLSX" is accepted.
    :param header_row: The row number to use as the header (0-indexed).
    :return: A pandas DataFrame containing the data from the Excel file, without its all-NaN columns.
    :raises ValueError: If the key does not end in ".xls" or ".xlsx".
    """
    # Ensure the file_key is an Excel file, regardless of the casing of the extension
    if not file_key.lower().endswith((".xls", ".xlsx")):
        raise ValueError("The specified file does not appear to be an Excel file.")

    # Use the read_io_from_s3 function to get the data as a BytesIO object
    excel_buffer = read_io_from_s3(bucket_name, file_key)

    # Read the Excel file into a pandas DataFrame
    df = pd.read_excel(excel_buffer, header=header_row)

    # Drop columns where all values are NaN
    df.dropna(axis=1, how='all', inplace=True)

    # Renumber the rows from 0, discarding whatever index the frame had
    df.reset_index(drop=True, inplace=True)

    return df