integrating the non-intrusive recommendations for ashp and solarpv into backend engine

This commit is contained in:
Khalim Conn-Kowlessar 2024-08-05 15:35:16 +01:00
parent 19c471f614
commit 92fcd080a8
12 changed files with 613 additions and 146 deletions

View file

@ -2,6 +2,7 @@ import os
import ast
from itertools import groupby
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from etl.epc.Dataset import TrainingDataset
@ -211,9 +212,24 @@ class Property:
if n_bedrooms not in [None, ""]:
n_bedrooms = int(round(float(n_bedrooms) + 1e-5))
number_of_floors = kwargs.get("number_of_floors", None)
if number_of_floors not in [None, ""]:
number_of_floors = int(round(float(number_of_floors) + 1e-5))
insulation_floor_area = kwargs.get("insulation_floor_area", None)
if insulation_floor_area not in [None, ""]:
insulation_floor_area = float(insulation_floor_area)
insulation_wall_area = kwargs.get("insulation_wall_area", None)
if insulation_wall_area not in [None, ""]:
insulation_wall_area = float(insulation_wall_area)
return {
"n_bathrooms": n_bathrooms,
"n_bedrooms": n_bedrooms,
"number_of_floors": number_of_floors,
"insulation_floor_area": insulation_floor_area,
"insulation_wall_area": insulation_wall_area,
"building_id": kwargs.get("building_id", None),
}
@ -222,6 +238,9 @@ class Property:
self.n_bathrooms = kwargs.get("n_bathrooms", None)
self.n_bedrooms = kwargs.get("n_bedrooms", None)
self.building_id = kwargs.get("building_id", None)
self.number_of_floors = kwargs.get("number_of_floors", None)
self.insulation_floor_area = kwargs.get("insulation_floor_area", None)
self.insulation_wall_area = kwargs.get("insulation_wall_area", None)
def create_base_difference_epc_record(self, cleaned_lookup: dict):
"""
@ -1060,18 +1079,22 @@ class Property:
# We can update the number of floors if we have this information in the condition data
self.number_of_floors = int(self.energy_assessment_condition_data["number_of_floors"]) \
if condition_data.get("number_of_floors") is not None \
if (condition_data.get("number_of_floors") is not None) and (self.number_of_floors is not None) \
else self.number_of_floors
self.perimeter = float(self.energy_assessment_condition_data["perimeter"]) \
if condition_data.get("perimeter") is not None \
else estimate_perimeter(
floor_area=self.floor_area / self.number_of_floors,
num_rooms=self.number_of_rooms / self.number_of_floors
)
# If we already have this, we re-engineer the perimeter
if self.insulation_floor_area is not None:
self.perimeter = np.sqrt(self.insulation_floor_area) * 4
else:
self.perimeter = float(self.energy_assessment_condition_data["perimeter"]) \
if condition_data.get("perimeter") is not None \
else estimate_perimeter(
floor_area=self.floor_area / self.number_of_floors,
num_rooms=self.number_of_rooms / self.number_of_floors
)
self.insulation_wall_area = float(self.energy_assessment_condition_data["insulation_wall_area"]) \
if condition_data.get("insulation_wall_area") is not None \
if (condition_data.get("insulation_wall_area") is not None) and (self.insulation_wall_area is not None) \
else estimate_external_wall_area(
num_floors=self.number_of_floors,
floor_height=self.floor_height,
@ -1079,9 +1102,12 @@ class Property:
built_form=self.data["built-form"],
)
self.insulation_floor_area = float(self.energy_assessment_condition_data["main_dwelling_ground_floor_area"]) \
if condition_data.get("main_dwelling_ground_floor_area") is not None \
else self.floor_area / self.number_of_floors
if self.insulation_floor_area is not None:
self.insulation_floor_area = float(
self.energy_assessment_condition_data["main_dwelling_ground_floor_area"]
) if (condition_data.get("main_dwelling_ground_floor_area") is not None) else (
self.floor_area / self.number_of_floors
)
def set_floor_level(self):
self.floor_level = (

View file

@ -388,7 +388,7 @@ async def trigger_plan(body: PlanTriggerRequest):
property_non_invasive_recommendations = next((
x for x in non_invasive_recommendations if
(x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
(x["uprn"] == config["uprn"])
), {})
input_properties.append(
@ -432,6 +432,25 @@ async def trigger_plan(body: PlanTriggerRequest):
environment=get_settings().ENVIRONMENT
)
epcs_for_scoring = pd.DataFrame([energy_consumption_client.prepare_new_data(p) for p in input_properties])
# What do we need?
# We need an estimate of each properties energy consumption now, as well as the cost of heating and hot water
# The newest EPC may have been done quite some time ago, and so we should take this into consideration when
# producing the estimate for cost. With that said, we already have a methodology which will re-map the cost
# when the EPC was produced to a cost for today, however could we use the ML models.
# In theory, we could just score the kwh models via the API, pass the results into the get_components function
# and insert the kwh figures into the property and we're done
# TODO: Need to check if we need to re-map when scoring new data or not
# We need to prepare the EPC so it's in the same format as the training data
# TODO: DELETE ME
# from utils.s3 import read_dataframe_from_s3_parquet
# train = read_dataframe_from_s3_parquet(
# bucket_name="retrofit-data-dev",
# file_key="energy_consumption/2024-07-08/energy_consumption_dataset.parquet"
# )
# We need to prepare the EPC so it's in the same format as the training data
logger.info("Getting spatial data")
for p in input_properties:
p.get_components(cleaned=cleaned, energy_consumption_client=energy_consumption_client)
@ -444,6 +463,7 @@ async def trigger_plan(body: PlanTriggerRequest):
# extensions, since it doesn't seem to do a great job
# TODO: For simple properties, we should do a comparison/check between the solar API's roof area and the
# basic estimate of roof area
# TODO: TEMP SWITCHED OFF
building_ids = [
{
"building_id": p.building_id,
@ -481,109 +501,112 @@ async def trigger_plan(body: PlanTriggerRequest):
"uprn": p.uprn
} for p in input_properties if p.building_id is None
]
if building_ids:
# Find the unique longitude and latitude pairs for each building id
unique_coordinates = {}
building_uprns = {}
for entry in building_ids:
building_id = entry['building_id']
coordinate_pair = {'longitude': entry['longitude'], 'latitude': entry['latitude']}
if False:
if building_ids:
# Find the unique longitude and latitude pairs for each building id
unique_coordinates = {}
building_uprns = {}
for entry in building_ids:
building_id = entry['building_id']
coordinate_pair = {'longitude': entry['longitude'], 'latitude': entry['latitude']}
if building_id not in unique_coordinates:
unique_coordinates[building_id] = []
if building_id not in unique_coordinates:
unique_coordinates[building_id] = []
if coordinate_pair not in unique_coordinates[building_id]:
unique_coordinates[building_id].append(coordinate_pair)
if coordinate_pair not in unique_coordinates[building_id]:
unique_coordinates[building_id].append(coordinate_pair)
if building_id not in building_uprns:
building_uprns[building_id] = []
if building_id not in building_uprns:
building_uprns[building_id] = []
if entry['uprn'] not in building_uprns[building_id]:
building_uprns[building_id].append(
{
"uprn": entry['uprn'], "longitude": entry['longitude'], "latitude": entry['latitude']
}
)
solar_panel_configuration = {}
for building_id, coordinates in unique_coordinates.items():
if len(coordinates) > 1:
raise NotImplementedError("more than one coordinate for a building - handle me")
coordinates = coordinates[0]
energy_consumption = sum(
[entry['energy_consumption'] for entry in building_ids if entry['building_id'] == building_id]
)
solar_api_client.get(
longitude=coordinates["longitude"],
latitude=coordinates["latitude"],
energy_consumption=energy_consumption,
is_building=True,
session=session
)
solar_panel_configuration[building_id] = {
"insights_data": solar_api_client.insights_data,
"panel_performance": solar_api_client.panel_performance,
"n_units": len([entry for entry in building_ids if entry['building_id'] == building_id])
}
# Store the data in the database
# TODO: Rather than just doing a straight insert, we should overwrite what's already there if it exists
solar_api_client.save_to_db(
session=session, uprns_to_location=building_uprns[building_id], scenario_type="building"
)
# Insert this into the properties that have this building id
for p in input_properties:
if p.building_id == building_id:
unit_solar_panel_configuration = solar_panel_configuration[building_id].copy()
unit_solar_panel_configuration["unit_share_of_energy"] = (
[x for x in building_ids if x["property_id"] == p.id][0]["energy_consumption"] /
energy_consumption
if entry['uprn'] not in building_uprns[building_id]:
building_uprns[building_id].append(
{
"uprn": entry['uprn'], "longitude": entry['longitude'], "latitude": entry['latitude']
}
)
p.set_solar_panel_configuration(unit_solar_panel_configuration)
if individual_units:
# Model the solar potential at the property level
for unit in individual_units:
property_instance = [p for p in input_properties if p.id == unit["property_id"]][0]
# At this level, we check if the property is suitable for solar and if now, skip
if not property_instance.is_solar_pv_valid():
continue
solar_panel_configuration = {}
for building_id, coordinates in unique_coordinates.items():
if len(coordinates) > 1:
raise NotImplementedError("more than one coordinate for a building - handle me")
solar_api_client.get(
longitude=unit["longitude"],
latitude=unit["latitude"],
energy_consumption=unit["energy_consumption"],
is_building=False,
session=session,
uprn=unit["uprn"],
property_instance=property_instance
)
# Store the data in the database
# TODO: Rather than just doing a straight insert, we should overwrite what's already there if it exists
solar_api_client.save_to_db(
session=session,
uprns_to_location=[
{
"uprn": property_instance.uprn,
"longitude": property_instance.spatial["longitude"],
"latitude": property_instance.spatial["latitude"]
}
],
scenario_type="unit"
)
property_instance.set_solar_panel_configuration(
solar_panel_configuration={
coordinates = coordinates[0]
energy_consumption = sum(
[entry['energy_consumption'] for entry in building_ids if entry['building_id'] == building_id]
)
solar_api_client.get(
longitude=coordinates["longitude"],
latitude=coordinates["latitude"],
energy_consumption=energy_consumption,
is_building=True,
session=session
)
solar_panel_configuration[building_id] = {
"insights_data": solar_api_client.insights_data,
"panel_performance": solar_api_client.panel_performance,
"unit_share_of_energy": 1
},
roof_area=solar_api_client.roof_area
)
"n_units": len([entry for entry in building_ids if entry['building_id'] == building_id])
}
# Store the data in the database
# TODO: Rather than just doing a straight insert, we should overwrite what's already there if it
# exists
solar_api_client.save_to_db(
session=session, uprns_to_location=building_uprns[building_id], scenario_type="building"
)
# Insert this into the properties that have this building id
for p in input_properties:
if p.building_id == building_id:
unit_solar_panel_configuration = solar_panel_configuration[building_id].copy()
unit_solar_panel_configuration["unit_share_of_energy"] = (
[x for x in building_ids if x["property_id"] == p.id][0]["energy_consumption"] /
energy_consumption
)
p.set_solar_panel_configuration(unit_solar_panel_configuration)
if individual_units:
# Model the solar potential at the property level
for unit in individual_units:
property_instance = [p for p in input_properties if p.id == unit["property_id"]][0]
# At this level, we check if the property is suitable for solar and if now, skip
if not property_instance.is_solar_pv_valid():
continue
solar_api_client.get(
longitude=unit["longitude"],
latitude=unit["latitude"],
energy_consumption=unit["energy_consumption"],
is_building=False,
session=session,
uprn=unit["uprn"],
property_instance=property_instance
)
# Store the data in the database
# TODO: Rather than just doing a straight insert, we should overwrite what's already there if it
# exists
solar_api_client.save_to_db(
session=session,
uprns_to_location=[
{
"uprn": property_instance.uprn,
"longitude": property_instance.spatial["longitude"],
"latitude": property_instance.spatial["latitude"]
}
],
scenario_type="unit"
)
property_instance.set_solar_panel_configuration(
solar_panel_configuration={
"insights_data": solar_api_client.insights_data,
"panel_performance": solar_api_client.panel_performance,
"unit_share_of_energy": 1
},
roof_area=solar_api_client.roof_area
)
logger.info("Getting components and epc recommendations")
recommendations = {}

View file

@ -33,6 +33,8 @@ class PlanTriggerRequest(BaseModel):
"solar_pv",
# Specific measures
"air_source_heat_pump",
"internal_wall_insulation",
"external_wall_insulation"
}
_allowed_goals = {"Increasing EPC"}

View file

@ -6,6 +6,7 @@ from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_percenta
from sklearn.feature_selection import RFECV
from utils.s3 import save_pickle_to_s3, read_pickle_from_s3, read_dataframe_from_s3_parquet, read_csv_from_s3
from utils.logger import setup_logger
from backend.Property import Property
logger = setup_logger()
@ -506,6 +507,57 @@ class EnergyConsumptionModel:
return prediction
@staticmethod
def prepare_new_data(p: Property):
"""
Given an instance of the property class, this method will ensure that the EPC is ready for scoring with the
kwh models. In the backend, we perform some cleaning and transformation on an EPC so we just ensure that the
data is in the format required by the model
:return:
"""
epc = p.data.copy()
numeric_cols = [
'current-energy-efficiency',
'potential-energy-efficiency', 'environment-impact-current',
'environment-impact-potential', 'energy-consumption-current',
'energy-consumption-potential', 'co2-emissions-current',
'co2-emiss-curr-per-floor-area', 'co2-emissions-potential',
'lighting-cost-current', 'lighting-cost-potential',
'heating-cost-current', 'heating-cost-potential',
'hot-water-cost-current', 'hot-water-cost-potential',
'total-floor-area', 'multi-glaze-proportion',
'extension-count', 'number-habitable-rooms', 'number-heated-rooms',
'low-energy-lighting', 'number-open-fireplaces',
'wind-turbine-count', 'unheated-corridor-length',
'floor-height', 'photo-supply', 'fixed-lighting-outlets-count',
'low-energy-fixed-light-count',
]
for v in numeric_cols:
if epc[v] is not None:
epc[v] = float(epc[v])
bools_to_remap = ['mains-gas-flag', 'flat-top-storey']
bool_map = {
True: "Y",
False: "N",
None: "N",
"Y": "Y",
"N": "N"
}
for v in bools_to_remap:
epc[v] = bool_map[epc[v]]
no_data = {
"floor-level": "NODATA!",
"floor-energy-eff": "NO DATA!"
}
for v, fill_val in no_data.items():
if pd.isnull(epc[v]):
epc[v] = fill_val
return epc
@staticmethod
def calculate_percentage_decrease(start_efficiency, end_efficiency, consumption_averages):

View file

View file

@ -0,0 +1,249 @@
import inspect
import pandas as pd
from etl.epc.settings import EARLIEST_EPC_DATE
from pathlib import Path
import numpy as np
from utils.s3 import save_csv_to_s3
src_file_path = inspect.getfile(lambda: None)
EPC_DIRECTORY = Path(src_file_path).parent / "local_data" / "all-domestic-certificates"
CUSTOMER_DATA_DIRECTORY = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Newhaven/Data"
USER_ID = 8
PORTFOLIO_ID = 88
def make_asset_list():
"""
Set up a small asset list for the study
"""
# Read in EPC data for Lewes
lewes_directory = EPC_DIRECTORY / "domestic-E07000063-Lewes/certificates.csv"
epc_data = pd.read_csv(lewes_directory, low_memory=False)
# Rename the columns to the same format as the api returns
epc_data.columns = [c.replace("_", "-").lower() for c in epc_data.columns]
# Take just date before the date threshold
epc_data = epc_data[epc_data["lodgement-date"] >= EARLIEST_EPC_DATE]
epc_data = epc_data[~pd.isnull(epc_data["uprn"])]
epc_data["uprn"] = epc_data["uprn"].astype(int).astype(str)
# /Users/khalimconn-kowlessar/Documents/hestia/Customers/Newhaven/Data/
# We read in the multiple data sources
address_base = pd.read_csv(
f"{CUSTOMER_DATA_DIRECTORY}/OS AddressBase Premium/OS AddressBase Premium.csv",
low_memory=False,
)
# Filter on resi
address_base = address_base[address_base["Primary Code Description"] == "Residential"]
address_base["UPRN"] = address_base["UPRN"].astype(int).astype(str)
pv_potential = pd.read_csv(
f"{CUSTOMER_DATA_DIRECTORY}/Domestic Rooftop PV Potential/Domestic Rooftop PV Potential.csv",
low_memory=False,
)
pv_potential["UPRN"] = pv_potential["UPRN"].astype(int).astype(str)
ashp_potential = pd.read_csv(
f"{CUSTOMER_DATA_DIRECTORY}/Air Source Heat Pump Potential/Air Source Heat Pump Potential.csv",
low_memory=False,
)
ashp_potential["UPRN"] = ashp_potential["UPRN"].astype(int).astype(str)
insulation_potential = pd.read_csv(
f"{CUSTOMER_DATA_DIRECTORY}/Insulation Potential/Insulation Potential.csv",
low_memory=False,
)
insulation_potential["UPRN"] = insulation_potential["UPRN"].astype(int).astype(str)
renewables_cost = pd.read_csv(
f"{CUSTOMER_DATA_DIRECTORY}/Low Carbon Technology Costs/Low Carbon Technology Costs.csv",
low_memory=False,
)
renewables_cost["UPRN"] = renewables_cost["UPRN"].astype(int).astype(str)
# Merge the EPC data onto address base
asset_list = address_base[
[
"UPRN", "Class Description", "Relative Height - Eaves",
]
].merge(
epc_data[
["uprn", "current-energy-efficiency", "current-energy-rating", "address1", "postcode", "floor-height",
"property-type", "built-form"]],
how="left",
left_on="UPRN",
right_on="uprn"
).drop(
columns=["uprn"]
).merge(
insulation_potential[["UPRN", "EPC Rating", "Wall Area [m^2]", "Building Area [m^2]"]],
how="left",
on="UPRN"
).rename(
columns={"Wall Area [m^2]": "insulation_wall_area", "Building Area [m^2]": "floor_area"}
)
# Take properties below a B - there are 2844 units
asset_list = asset_list[asset_list["current-energy-efficiency"].astype(float) <= 80]
# Drop caravans
asset_list = asset_list[asset_list["Class Description"] != "Caravan"]
asset_list = asset_list[~pd.isnull(asset_list["current-energy-efficiency"])]
# Take a 10% sample, for properties that have an EPC, with a seed
asset_list = asset_list.sample(frac=0.1, random_state=42)
AVG_FLOOR_HEIGHT = asset_list["floor-height"].median()
def estimate_n_floors(
building_height, floor_height, address_base_property_description, epc_property_type,
):
if address_base_property_description == "Self Contained Flat (Includes Maisonette / Apartment)":
if epc_property_type == "Flat":
return 1
if epc_property_type == "House":
return 2
return NotImplementedError("Implement me")
if pd.isnull(floor_height):
return np.round(building_height / AVG_FLOOR_HEIGHT)
return np.round(building_height / floor_height)
# Estimate the number of floors
asset_list["number_of_floors"] = asset_list.apply(
lambda x: estimate_n_floors(
building_height=x["Relative Height - Eaves"],
floor_height=x["floor-height"],
address_base_property_description=x["Class Description"],
epc_property_type=x["property-type"],
),
axis=1
)
# D 0.419929
# C 0.391459
# E 0.160142
# F 0.017794
# G 0.010676
# Total asset list:
# D 0.450409
# C 0.412016
# E 0.110203
# F 0.020263
# G 0.007110
# We do the followings:
# 1) Create final asset list
# 2) Create Non-intrusive recommendations
# 3) Create a third party costing object
cost_testing = renewables_cost.merge(
insulation_potential, how="inner", on="UPRN"
)
cost_testing["cwi_cost_per_m2"] = cost_testing["Insulation - Cavity Wall - Total"] / cost_testing["Wall Area [m^2]"]
# Their cavity wall insulation is £8 per m^2
cost_testing["ewi_cost_per_m2"] = cost_testing["Insulation - External Wall - Total"] / cost_testing[
"Wall Area [m^2]"]
final_asset_list = asset_list.rename(
columns={"UPRN": "uprn", "address1": "address", "floor_area": "insulation_floor_area"}
)[["uprn", "address", "postcode", "insulation_wall_area", "insulation_floor_area", "number_of_floors"]]
# Create non-invasive recommendations, which come from the solar potential and ASHP potential data sources
non_invasive_recommendations = []
for _, row in final_asset_list.iterrows():
property_ashp_potential = ashp_potential[
(ashp_potential["UPRN"] == row["uprn"]) & ashp_potential["Overall Suitability Rating"]
]
property_pv_potential = pv_potential[
(pv_potential["UPRN"] == row["uprn"]) & pv_potential["Overall Suitability"]
]
property_costs = renewables_cost[renewables_cost["UPRN"] == row["uprn"]]
property_non_invasive_recs = []
if not property_ashp_potential.empty:
property_non_invasive_recs.append(
{
"type": "air_source_heat_pump",
"size": property_ashp_potential["Recommended Heat Pump Size [kW]"].values[0],
"cost": property_costs["Air Source Heat Pump - Total"].values[0],
"ashp_only_heating_recommendation": True
}
)
if not property_pv_potential.empty:
property_non_invasive_recs.append(
{
"type": "solar_pv",
"array_wattage": property_pv_potential["Recommended Array Size [kW]"].values[0] * 1000,
"initial_ac_kwh_per_year": property_pv_potential["Annual Generation [kWh]"].values[0],
"panneled_roof_area": property_pv_potential["Roof area suitable for PV [m^2]"].values[0],
"cost": property_costs["Rooftop PV - Total"].values[0],
}
)
non_invasive_recommendations.append(
{
"uprn": row["uprn"],
"recommendations": property_non_invasive_recs,
}
)
# Save the asset list
# Store the asset list in s3
filename = f"{USER_ID}/{PORTFOLIO_ID}/pilot.csv"
save_csv_to_s3(
dataframe=final_asset_list,
bucket_name="retrofit-plan-inputs-dev",
file_name=filename
)
# Store non-invasive recommendations in S3
non_invasive_recommendations_filename = f"{USER_ID}/{PORTFOLIO_ID}/non_invasive_recommendations.csv"
save_csv_to_s3(
dataframe=pd.DataFrame(non_invasive_recommendations),
bucket_name="retrofit-plan-inputs-dev",
file_name=non_invasive_recommendations_filename
)
# Create two scenarios
# Scenario A
body1 = {
"portfolio_id": str(PORTFOLIO_ID),
"housing_type": "Private",
"goal": "Increasing EPC",
"goal_value": "A",
"trigger_file_path": filename,
"already_installed_file_path": "",
"patches_file_path": "",
"non_invasive_recommendations_file_path": non_invasive_recommendations_filename,
"scenario_name": "Fabric - no solid wall",
"multi_plan": True,
"exclusions": ["internal_wall_insulation", "external_wall_insulation", "floor_insulation"],
"budget": None,
}
print(body1)
# Scenario B - deep fabric, no exclusions
body2 = {
"portfolio_id": str(PORTFOLIO_ID),
"housing_type": "Private",
"goal": "Increasing EPC",
"goal_value": "A",
"trigger_file_path": filename,
"already_installed_file_path": "",
"patches_file_path": "",
"non_invasive_recommendations_file_path": non_invasive_recommendations_filename,
"scenario_name": "Deep Fabric",
"multi_plan": True,
"budget": None,
}
print(body2)

View file

@ -0,0 +1,60 @@
# We use some sample properties from Newhaven to use as a testing dataset for implementing the model fixes
import inspect
import pandas as pd
from etl.epc.settings import EARLIEST_EPC_DATE
from pathlib import Path
from utils.s3 import save_csv_to_s3
src_file_path = inspect.getfile(lambda: None)
EPC_DIRECTORY = Path(src_file_path).parent / "local_data" / "all-domestic-certificates"
USER_ID = 8
PORTFOLIO_ID = -1
def app():
"""
This application is tasked with pulling a large quantity of data from the find my epc website, containing the
estimated energy consumption for properties
:return:
"""
lewes_directory = EPC_DIRECTORY / "domestic-E07000063-Lewes/certificates.csv"
data = pd.read_csv(lewes_directory, low_memory=False)
# Rename the columns to the same format as the api returns
data.columns = [c.replace("_", "-").lower() for c in data.columns]
# Take just date before the date threshold
data = data[data["lodgement-date"] >= EARLIEST_EPC_DATE]
data = data[~pd.isnull(data["uprn"])]
data = data[data["current-energy-efficiency"].astype(float) < 52]
data = data.sample(10)
# Create an asset list
asset_list = data[["uprn", "address1", "postcode"]].copy().rename(columns={"address1": "address"})
asset_list["uprn"] = asset_list["uprn"].astype(str)
filename = f"{USER_ID}/{PORTFOLIO_ID}/pilot.csv"
save_csv_to_s3(
dataframe=asset_list,
bucket_name="retrofit-plan-inputs-dev",
file_name=filename
)
body = {
"portfolio_id": str(PORTFOLIO_ID),
"housing_type": "Private",
"goal": "Increasing EPC",
"goal_value": "B",
"trigger_file_path": filename,
"already_installed_file_path": "",
"patches_file_path": "",
"non_invasive_recommendations_file_path": "",
"budget": None,
}
print(body)

View file

@ -1014,7 +1014,7 @@ class Costs:
"labour_days": labour_days
}
def solar_pv(self, wattage: float, has_battery: bool = False):
def solar_pv(self, wattage: float, has_battery: bool = False, array_cost=None):
"""
Calculates the total cost for solar PV based data provided by the MCS dashboard, which contains
@ -1028,13 +1028,17 @@ class Costs:
https://www.checkatrade.com/blog/cost-guides/cost-of-solar-panel-installation/
:param wattage: Peak wattage of the solar PV system]
:param has_battery: Bool, whether the system includes a battery
:param array_cost: float, containing the cost of the solar PV array
"""
# Get the cost data relevant to the region
regional_cost = MCS_SOLAR_PV_COST_DATA["-".join(["average_cost_per_kwh", self.region])]
kw = wattage / 1000
total_cost = kw * regional_cost
if array_cost is not None:
total_cost = array_cost
else:
kw = wattage / 1000
total_cost = kw * regional_cost
if has_battery:
# The battery cost is based on the £3500 quote, recieved from installers

View file

@ -57,13 +57,20 @@ class HeatingRecommender:
# in the Costs class, stored as SYSTEM_FLUSH_COST
exclusions = [] if exclusions is None else exclusions
non_invasive_ashp_recommendation = next(
(r for r in self.property.non_invasive_recommendations if r["type"] == "air_source_heat_pump"), {}
)
# This option will prevent other heating recommendations from being specified, other than an ASHP
ashp_only_heating_recommendation = non_invasive_ashp_recommendation.get(
"ashp_only_heating_recommendation", False
)
self.heating_recommendations = []
self.heating_control_recommendations = []
# This first iteration of the recommender will provide very basic recommendation
# We recommend heating controls based on the main heating system
if self.is_high_heat_retention_valid():
if self.is_high_heat_retention_valid() and not ashp_only_heating_recommendation:
# Recommend high heat retention storage heaters
# TODO: We need to allow for the possibility that the property aleady has storage heaters, but just
# needs the controls
@ -91,13 +98,13 @@ class HeatingRecommender:
self.property.data["mains-gas-flag"]
)
if (
if ((
has_boiler or
no_heating_has_mains or
electic_heating_has_mains or
has_gas_heaters or
portable_heaters_has_mains
):
) and not ashp_only_heating_recommendation):
# This indicates that the home previously did not have a boiler in place and so would require
# an overhaul to the system - right now, this is all reasons, apart from if there is an existing boiler
system_change = not has_boiler
@ -118,7 +125,9 @@ class HeatingRecommender:
if self.property.is_ashp_valid(exclusions=exclusions):
self.recommend_air_source_heat_pump(
phase=phase, has_cavity_or_loft_recommendations=has_cavity_or_loft_recommendations
phase=phase,
has_cavity_or_loft_recommendations=has_cavity_or_loft_recommendations,
)
return
@ -194,14 +203,21 @@ class HeatingRecommender:
:return:
"""
# Look for a non-intrusive recommendation
non_intrusive_recommendation = next((
r for r in self.property.non_invasive_recommendations if r["type"] == "air_source_heat_pump"
), {})
controls_recommender = HeatingControlRecommender(self.property)
controls_recommender.recommend(heating_description="Air source heat pump, radiators, electric")
ashp_costs = self.costs.air_source_heat_pump()
# We add the costs of the heating controls, onto each key in the costs dictionary
if controls_recommender.recommendation:
for key in ashp_costs:
ashp_costs[key] += controls_recommender.recommendation[0][key]
if non_intrusive_recommendation:
# Update with non-intrusive recommendation
if non_intrusive_recommendation.get("cost"):
ashp_costs.update(
{"total": non_intrusive_recommendation["cost"], "subtotal": None, "vat": None}
)
already_installed = "air_source_heat_pump" in self.property.already_installed
@ -213,6 +229,14 @@ class HeatingRecommender:
if already_installed:
ashp_costs = override_costs(ashp_costs)
if non_intrusive_recommendation and not all([x is None for x in controls_recommendations]):
# We just use the ttzc control
controls_recommendations = [
x for x in controls_recommendations if (
x["description_simulation"]["mainheatcont-description"] == "Time and temperature zone control"
)
]
# This is a map from the heating controls description to the description of the air source heat pump set up
ashp_descriptions = {
"Time and temperature zone control": (
@ -233,7 +257,8 @@ class HeatingRecommender:
if controls_rec:
for key in ashp_costs_with_controls:
ashp_costs_with_controls[key] += controls_rec[key]
if ashp_costs_with_controls[key] is not None:
ashp_costs_with_controls[key] += controls_rec[key]
if controls_rec is None:
description = "Install an air source heat pump."
@ -245,19 +270,19 @@ class HeatingRecommender:
# If the property does not have existing cavity and loft insulation, we include a note that the cost
# includes the boiler upgrade scheme and that the cavity and loft need to be treated, to ensure access
# to the funding
if has_cavity_or_loft_recommendations:
description = description + (
f" The cost includes the £"
f"{BOILER_UPGRADE_SCHEME_ASHP_VALUE} boiler upgrade scheme grant. "
f"You must ensure that the property has an insulated cavity and "
f"270mm+ loft insulation to qualify for the grant"
)
else:
description = description + (
f" The cost includes the £{BOILER_UPGRADE_SCHEME_ASHP_VALUE} boiler upgrade scheme grant"
)
if not non_intrusive_recommendation:
if has_cavity_or_loft_recommendations:
description = description + (
f" The cost includes the £"
f"{BOILER_UPGRADE_SCHEME_ASHP_VALUE} boiler upgrade scheme grant. "
f"You must ensure that the property has an insulated cavity and "
f"270mm+ loft insulation to qualify for the grant"
)
else:
description = description + (
f" The cost includes the £{BOILER_UPGRADE_SCHEME_ASHP_VALUE} boiler upgrade scheme grant"
)
print("TEMP UPDATED FOR 77 Perryn!!!!!")
simulation_config = {
"mainheat_energy_eff_ending": "Good",
"hot_water_energy_eff_ending": "Good"

View file

@ -66,7 +66,7 @@ class Recommendations:
# Building Fabric
if "wall_insulation" not in self.exclusions:
self.wall_recomender.recommend(phase=phase)
self.wall_recomender.recommend(phase=phase, exclusions=self.exclusions)
if self.wall_recomender.recommendations:
property_recommendations.append(self.wall_recomender.recommendations)
phase += 1

View file

@ -1,6 +1,8 @@
import numpy as np
import pandas as pd
from recommendations.Costs import Costs
from recommendations.recommendation_utils import override_costs
from recommendations.recommendation_utils import override_costs, esimtate_pitched_roof_area
class SolarPvRecommendations:
@ -150,17 +152,37 @@ class SolarPvRecommendations:
self.recommend_building_analysis(phase)
return
panel_performance = self.property.solar_panel_configuration["panel_performance"]
roof_area = self.property.roof_area
non_invasive_recommendation = next(
(r for r in self.property.non_invasive_recommendations if r["type"] == "solar_pv"), {}
)
solar_configurations = panel_performance.head(3).reset_index(drop=True)
if non_invasive_recommendation:
roof_area = esimtate_pitched_roof_area(
floor_area=self.property.insulation_floor_area, floor_height=self.property.data["floor-height"]
)
solar_configurations = pd.DataFrame(
[
{
"array_wattage": non_invasive_recommendation["array_wattage"],
"initial_ac_kwh_per_year": non_invasive_recommendation["initial_ac_kwh_per_year"],
"panneled_roof_area": non_invasive_recommendation["panneled_roof_area"]
}
]
)
else:
panel_performance = self.property.solar_panel_configuration["panel_performance"]
roof_area = self.property.roof_area
solar_configurations = panel_performance.head(3).reset_index(drop=True)
# We combine each of these configurations with estimates with and without a battery
for rank, recommendation_config in solar_configurations.iterrows():
roof_coverage_percent = round(recommendation_config["panneled_roof_area"] / roof_area * 100)
for has_battery in [False, True]:
cost_result = self.costs.solar_pv(
wattage=recommendation_config["array_wattage"], has_battery=has_battery
wattage=recommendation_config["array_wattage"],
has_battery=has_battery,
array_cost=non_invasive_recommendation["cost"] if non_invasive_recommendation else None
)
kw = np.floor(recommendation_config["array_wattage"] / 100) / 10
if has_battery:

View file

@ -184,7 +184,7 @@ class WallRecommendations(Definitions):
return ewi_recommendations
def recommend(self, phase=0):
def recommend(self, phase=0, exclusions=None):
# if building built after 1990 + we're able to identify U-value +
# U-value less than 0.18 and if in or close to a conversation area,
# recommend internal wall insulation as a possible measure
@ -262,7 +262,7 @@ class WallRecommendations(Definitions):
# Remaining wall types are treated with IWI or EWI
if (u_value >= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE) and self.is_suitable_for_solid_insulation():
self.find_insulation(u_value, phase)
self.find_insulation(u_value, phase, exclusions=exclusions)
return
# If the u-value is within regulations, we don't do anything
@ -552,7 +552,7 @@ class WallRecommendations(Definitions):
return recommendations
def find_insulation(self, u_value, phase):
def find_insulation(self, u_value, phase, exclusions=None):
"""
This function contains the logic for finding potential insulation measures for a property, depending
on the parts available and whether the property can have external wall insulation installed
@ -564,8 +564,10 @@ class WallRecommendations(Definitions):
# we separate the logic for for recommending them, therefore we don't
# consider diminishing returns between the two as they are considered to be separate measures
exclusions = [] if exclusions is None else exclusions
ewi_recommendations = []
if self.ewi_valid():
if self.ewi_valid() and "external_wall_insulation" not in exclusions:
ewi_recommendations = self._find_insulation(
u_value=u_value,
insulation_materials=pd.DataFrame(
@ -575,12 +577,14 @@ class WallRecommendations(Definitions):
phase=phase,
)
iwi_recommendations = self._find_insulation(
u_value=u_value,
insulation_materials=pd.DataFrame(self.internal_wall_insulation_materials),
non_insulation_materials=self.internal_wall_non_insulation_materials,
phase=phase,
)
iwi_recommendations = []
if "internal_wall_insulation" not in exclusions:
iwi_recommendations = self._find_insulation(
u_value=u_value,
insulation_materials=pd.DataFrame(self.internal_wall_insulation_materials),
non_insulation_materials=self.internal_wall_non_insulation_materials,
phase=phase,
)
self.recommendations += ewi_recommendations + iwi_recommendations