working on heating control recommendation for urban splash

This commit is contained in:
Khalim Conn-Kowlessar 2024-02-19 13:02:43 +00:00
parent 81e1ca65d0
commit 1c44b07662
12 changed files with 451 additions and 129 deletions

View file

@ -1,6 +1,6 @@
from datetime import datetime
import numpy as np
from tqdm import tqdm
import pandas as pd
from etl.epc.Record import EPCRecord
from backend.SearchEpc import SearchEpc
@ -37,7 +37,6 @@ from recommendations.Recommendations import Recommendations
from utils.logger import setup_logger
from utils.s3 import read_dataframe_from_s3_parquet
from backend.ml_models.Valuation import PropertyValuation
from backend.ml_models.AnnualBillSavings import AnnualBillSavings
logger = setup_logger()
@ -66,7 +65,7 @@ async def trigger_plan(body: PlanTriggerRequest):
)
input_properties = []
for config in plan_input:
for config in tqdm(plan_input):
# We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
epc_searcher = SearchEpc(
@ -142,8 +141,6 @@ async def trigger_plan(body: PlanTriggerRequest):
# Property recommendations
p.get_components(cleaned, photo_supply_lookup, floor_area_decile_thresholds)
# TODO: For the private customer, we should probably NOT allow floor insulation, because it often requires
# decanting the tenant
recommender = Recommendations(property_instance=p, materials=materials)
property_recommendations, property_representative_recommendations = recommender.recommend()
@ -201,6 +198,8 @@ async def trigger_plan(body: PlanTriggerRequest):
expected_adjusted_energy=expected_adjusted_energy
)
# TODO: For the private customer, we should probably NOT allow floor insulation, because it often requires
# decanting the tenant
input_measures = prepare_input_measures(recommendations_with_impact, body.goal)
if body.budget:

View file

@ -0,0 +1,153 @@
import os
import pandas as pd
from tqdm import tqdm
from dotenv import load_dotenv
from utils.s3 import read_excel_from_s3
from backend.SearchEpc import SearchEpc
from epc_api.client import EpcClient
from utils.s3 import save_csv_to_s3
# Read in the .env file in backend
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
USER_ID = 8
PORTFOLIO_ID = 65
def app():
"""
This application will read in the Urban Splash data, in the dev AWS account, and pre-process it. There are a
few issues with the file, including incorrect postcodes.
The customer is interested in the following:
- Getting properties to an EPC C
- Doing do within a budget of £5,000
:return:
"""
potential_postcodes = ["BD9 5BQ", "BD9 5BR", "BD9 5BN"]
raw_asset_list = read_excel_from_s3(
bucket_name="retrofit-datalake-dev",
file_key="customers/urban_splash/raw_asset_list/USRF - Velvet Mill EPC.xlsx",
header_row=2
)
# We have a series of apartment numbers that are "Apartment 001", "Apartment 002", etc. We need to convert these
# to "Apartment 1", "Apartment 2", etc.
raw_asset_list["address1"] = raw_asset_list["Unit Number"].str.replace(
"Apartment 00", "Apartment ", regex=True
)
raw_asset_list["address1"] = raw_asset_list["address1"].str.replace(
"Apartment 0", "Apartment ", regex=True
)
# For each entry in the asset list, we make an api call to the EPC database to get the EPC data. We'll retrieve the
# uprn for the property, as well as a nice address and postcode that we can use. We'll also try and deduce the
# likely wall construction, since many of the homes are new builds, based on their newest EPC
epc_data = []
processed_asset_list = []
for _, row in tqdm(raw_asset_list.iterrows(), total=len(raw_asset_list)):
newest_epc = None
idx = 0
while newest_epc is None:
postcode = potential_postcodes[idx]
searcher = SearchEpc(
address1=row.address1, postcode=postcode, auth_token=EPC_AUTH_TOKEN, os_api_key=""
)
searcher.find_property(skip_os=True)
if searcher.newest_epc is None:
if idx == len(potential_postcodes) - 1:
break
idx += 1
else:
newest_epc = searcher.newest_epc
if newest_epc is None:
raise Exception("FX ME")
to_append = {
**row.to_dict(),
"uprn": newest_epc["uprn"],
"address": newest_epc["address1"],
"postcode": newest_epc["postcode"],
"walls-description": newest_epc["walls-description"],
"roof-description": newest_epc["roof-description"],
"floor-description": newest_epc["floor-description"],
"total-floor-area": newest_epc["total-floor-area"],
"full-address": newest_epc["address"]
}
processed_asset_list.append(to_append)
epc_data.append(newest_epc)
processed_asset_list_df = pd.DataFrame(processed_asset_list)
epc_data_df = pd.DataFrame(epc_data)
# We store this data
# Store the data in s3
filename = f"{USER_ID}/{PORTFOLIO_ID}/test_inputs.csv"
save_csv_to_s3(
dataframe=processed_asset_list_df,
bucket_name="retrofit-plan-inputs-dev",
file_name=filename
)
body = {
"portfolio_id": str(PORTFOLIO_ID),
"housing_type": "Private",
"goal": "Increase EPC",
"goal_value": "C",
"trigger_file_path": filename,
"budget": 5000,
}
print(body)
# Some basic analysis on the heating, heating controls and hot water systems
# All of the heating systems are rated very poor, poor or average. When it's average, they are all also
# "Room heaters, electric", but the house has "Programmer and appliance thermostats" for the heating controls.
# which is more efficient
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)
# Heating
print(epc_data_df[["mainheat-description", "mainheatcont-description", "mainheat-energy-eff"]].drop_duplicates())
# mainheat-description mainheatcont-description mainheat-energy-eff
# 0 Room heaters, electric Programmer and room thermostat Very Poor
# 12 Room heaters, electric Programmer and appliance thermostats Average
# 20 Electric storage heaters, radiators Celect-type controls Poor
# Hot water
print(epc_data_df[["hotwater-description", "hot-water-energy-eff"]].drop_duplicates())
# hotwater-description hot-water-energy-eff
# 0 Electric immersion, standard tariff Very Poor
# 12 Electric immersion, off-peak Average
# We now retrieve EPCS for all of the properties that are in these postcodes very obviously for the velvet mill
# We'll use this information to get a sense of the likely wall/roof/floor construction for the properties
# client = EpcClient(auth_token=EPC_AUTH_TOKEN)
#
# neighbouring_epcs = []
# for pc in potential_postcodes:
# response = client.domestic.search(params={"postcode": pc}, size=1000)
# data = response["rows"]
#
# # keep just rows that are clearly for the velvet mill
# data = [x for x in data if "velvet" in x["address1"].lower()]
#
# neighbouring_epcs.extend(data)
#
# neighbouring_epcs_df = pd.DataFrame(neighbouring_epcs)
# neighbouring_epcs_df["walls-description"].value_counts()
# neighbouring_epcs_df["roof-description"].value_counts()
# neighbouring_epcs_df["floor-description"].value_counts()

View file

@ -18,43 +18,40 @@ from recommendations.recommendation_utils import calculate_cavity_age
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
ENV_FILE = Path(__file__).parent / "etl" / "eligibility" / "ha_15_32" / ".env"
DATA_FOLDER = Path(__file__).parent / "local_data" / "ha_data"
logger = setup_logger()
load_dotenv(ENV_FILE)
class DataLoader:
MIN_ROWS = {
"ha_1": 2,
"ha_6": 2,
"ha_14": 3, # The spreadsheet starts from the third row
"ha_39": 2,
"ha_107": 2,
}
COLUMN_CONFIG = {
"ha_1": {
"HA1": {
"address": "Address",
"postcode": "Address - Postcode"
},
"HA6": {
"address": "propertyaddress",
"postcode": "address" # The 'address' column actually contains postcode
}
}
def __init__(self, files, use_cache):
self.files = files
def __init__(self, directories, use_cache):
self.directories = directories
self.use_cache = use_cache
self.data = {}
def create_asset_list_matching_address(self, ha_name, asset_list):
if ha_name in ["ha_1", "ha_6"]:
if ha_name in ["HA1", "HA6"]:
asset_list["matching_address"] = asset_list[
self.COLUMN_CONFIG[ha_name]["address"]
].str.lower().str.strip()
asset_list["matching_postcode"] = asset_list[
self.COLUMN_CONFIG[ha_name]["postcode"]
].str.lower().str.strip()
elif ha_name == "ha_14":
elif ha_name == "HA14":
# Create matching_address by concatenating Address 1, Address 2, Address 3, Address 4, Postcode
asset_list["matching_address"] = asset_list["Address 1"].str.lower().str.strip() + ", " + \
asset_list["Address 2"].str.lower().str.strip() + ", " + \
@ -62,7 +59,7 @@ class DataLoader:
asset_list["Address 4"].str.lower().str.strip() + ", " + \
asset_list["Postcode"].str.lower().str.strip()
asset_list["matching_postcode"] = asset_list["Postcode"].str.lower().str.strip()
elif ha_name == "ha_39":
elif ha_name == "HA39":
# Create matching_address by concatenating add_1, add_2, add_3, add_4, add_5, post_code
asset_list["matching_address"] = asset_list["add_1"].astype(str).str.lower().str.strip() + ", " + \
asset_list["add_2"].astype(str).str.lower().str.strip() + ", " + \
@ -71,7 +68,7 @@ class DataLoader:
asset_list["add_5"].astype(str).str.lower().str.strip() + ", " + \
asset_list["post_code"].astype(str).str.lower().str.strip()
asset_list["matching_postcode"] = asset_list["post_code"].str.lower().str.strip()
elif ha_name == "ha_107":
elif ha_name == "HA107":
# Create matching_address by concatenating House No, Street, Town, District, Postcode
asset_list["matching_address"] = asset_list["House No"].astype(str).str.lower().str.strip() + ", " + \
asset_list["Street"].str.lower().str.strip() + ", " + \
@ -87,7 +84,7 @@ class DataLoader:
def append_asset_list_built_form(self, ha_name, asset_list):
# Finally, we process property_type or built form, where needed
if ha_name == "ha_6":
if ha_name == "HA6":
asset_list["built_form"] = asset_list["Property Type"].apply(self.identify_built_form_ha6)
return asset_list
@ -99,7 +96,7 @@ class DataLoader:
:return:
"""
if ha_name in ["ha_107"]:
if ha_name in ["HA107"]:
asset_list["HouseNo"] = asset_list["House No"].copy()
else:
split_addresses = asset_list['matching_address'].str.split(',', expand=True)
@ -113,32 +110,41 @@ class DataLoader:
return asset_list
def load_asset_list(self, file_path, ha_name, sheet_name=None):
workbook = openpyxl.load_workbook(file_path)
if sheet_name is not None:
sheet = workbook[sheet_name]
@staticmethod
def create_ciga_list_house_no(ha_name, ciga_list):
"""
This function will append the House number onto the asset list
:return:
"""
if ha_name in ["HA6"]:
split_addresses = ciga_list['Matched Address'].str.split(',', expand=True)
house_numbers = split_addresses[0].str.split(' ', expand=True)
# THe first column should be HouseNo - we aren't interested in the other columns, but we don't know how
# many columns there might be
house_numbers = house_numbers.iloc[:, 0:1]
house_numbers.columns = ['HouseNo']
ciga_list = pd.concat([ciga_list, house_numbers[["HouseNo"]]], axis=1)
else:
sheet = workbook.active
sheet_colnames = [cell.value for cell in sheet[self.MIN_ROWS[ha_name] - 1]]
raise NotImplementedError("Implement me")
return ciga_list
def load_asset_list(self, filepath, ha_name):
workbook = openpyxl.load_workbook(filepath)
asset_sheet = workbook["Assets"]
asset_sheet_colnames = [cell.value for cell in asset_sheet[1]]
rows_data = []
rows_colors = []
for row in tqdm(
sheet.iter_rows(min_row=self.MIN_ROWS[ha_name], values_only=False)
): # Assuming the first row is headers
for row in asset_sheet.iter_rows(min_row=2, values_only=False):
row_data = [cell.value for cell in row] # This will get you the cell values
row_color = row[0].fill.start_color.index if row[0].fill.start_color.index != '00000000' else None
# row_color = COLOR_INDEX[row_color]
rows_data.append(row_data)
rows_colors.append(row_color)
asset_list = pd.DataFrame(rows_data, columns=sheet_colnames)
asset_list = pd.DataFrame(rows_data, columns=asset_sheet_colnames)
asset_list = asset_list.loc[:, asset_list.columns.notnull()]
asset_list['row_color'] = rows_colors
# Remove entirely empty roww - consider all rows apart from row_color
# Remove entirely empty rows - consider all rows apart from row_color
asset_list = asset_list.loc[asset_list.loc[:, asset_list.columns != 'row_color'].notnull().any(axis=1)]
# Add in asset_list_row_id
@ -151,77 +157,43 @@ class DataLoader:
asset_list = self.append_asset_list_built_form(ha_name=ha_name, asset_list=asset_list)
return asset_list
# We check if there is a survey list
survey_list = pd.DataFrame()
if "ECO Surveys" in workbook.sheetnames:
survey_sheet = workbook["ECO Surveys"]
survey_rows = []
for row in survey_sheet.iter_rows(min_row=2, values_only=False): # Assuming the first row is headers
row_data = [cell.value for cell in row] # This will get you the cell values
survey_rows.append(row_data)
def load_survey_list(self, file_path, ha_name, asset_list, sheet_name=None):
survey_workbook = openpyxl.load_workbook(file_path)
if sheet_name is not None:
survey_sheet = survey_workbook[sheet_name]
else:
survey_sheet = survey_workbook.active
survey_list = pd.DataFrame(survey_rows, columns=[cell.value for cell in survey_sheet[1]])
# Remove columns that are None
survey_list = survey_list.loc[:, survey_list.columns.notnull()]
survey_list["survey_list_row_id"] = [ha_name + "_survey_" + str(i) for i in range(0, len(survey_list))]
# Perform survey list merge
survey_list = self.merge_surveys_to_assets(asset_list, survey_list, ha_name)
survey_rows = []
survey_colors = []
# We check if there are CIGA checks
ciga_list = pd.DataFrame()
if "CIGA Checks" in workbook.sheetnames:
ciga_sheet = workbook["CIGA Checks"]
ciga_rows = []
for row in ciga_sheet.iter_rows(min_row=2, values_only=False):
row_data = [cell.value for cell in row] # This will get you the cell values
ciga_rows.append(row_data)
for row in tqdm(survey_sheet.iter_rows(min_row=2, values_only=False)): # Assuming the first row is headers
row_data = [cell.value for cell in row] # This will get you the cell values
row_color = row[0].fill.start_color.index if row[0].fill.start_color.index != '00000000' else None
survey_rows.append(row_data)
survey_colors.append(row_color)
ciga_list = pd.DataFrame(ciga_rows, columns=[cell.value for cell in ciga_sheet[1]])
# Remove columns that are None
ciga_list = ciga_list.loc[:, ciga_list.columns.notnull()]
ciga_list = self.create_ciga_list_house_no(ha_name, ciga_list)
# Perform ciga list merge
ciga_list = self.merge_ciga_to_assets(asset_list, ciga_list, ha_name)
survey_list = pd.DataFrame(survey_rows, columns=[cell.value for cell in survey_sheet[1]])
# Remove columns that are None
survey_list = survey_list.loc[:, survey_list.columns.notnull()]
survey_list["row_colour"] = survey_colors
# The survey list has 4 possible colours:
# PURPLE - Installer advised install complete and a complimentary post works EPC has been completed.
# GREEN - Installer advised install complete.
# RED - Cancelled
# BLUE - Loft Only Installed
# NO FILL - No official update from installer (could be installed or cancelled)
survey_list["row_colour_name"] = np.where(
survey_list["row_colour"] == survey_list_colours["red"], "red",
np.where(survey_list["row_colour"] == survey_list_colours["green"], "green",
np.where(survey_list["row_colour"] == survey_list_colours["purple"], "purple",
np.where(survey_list["row_colour"] == survey_list_colours["blue"], "blue", "no fill")))
)
survey_list["row_meaning"] = np.where(
survey_list["row_colour_name"] == "red", "Cancelled",
np.where(
survey_list["row_colour_name"] == "green",
"Installer advised install complete",
np.where(
survey_list["row_colour_name"] == "purple",
"Installer advised install complete and a complimentary post works EPC has been completed",
np.where(
survey_list["row_colour_name"] == "blue",
"Loft Only Installed",
"No official update from installer (could be installed or cancelled)"
)
)
)
)
# Add in asset_list_row_id
survey_list["survey_list_row_id"] = [ha_name + "_surveys_" + str(i) for i in range(0, len(survey_list))]
# We now do the matching between the asset list and the survey list.
# What we'll get from this is a lookup table from the asset list to the survey list
if ha_name == "ha_6":
matched_lookup = self.merge_ha_6(asset_list, survey_list)
else:
raise NotImplementedError("Only HA 6 has surveys")
return survey_list, matched_lookup
return asset_list, survey_list, ciga_list
@staticmethod
def merge_ha_6(asset_list, survey_list):
def correct_ha6_asset_list(asset_list):
# Correct the asset list across propertyaddress and matching_address
asset_list["propertyaddress"] = asset_list["propertyaddress"].str.replace("Baggott Place", "Baggotts Place")
asset_list["matching_address"] = asset_list["matching_address"].str.replace("baggott place", "baggotts place")
@ -234,6 +206,11 @@ class DataLoader:
asset_list["propertyaddress"] = asset_list["propertyaddress"].str.replace("Moffat Way", "Moffatt Way")
asset_list["matching_address"] = asset_list["matching_address"].str.replace("moffat way", "moffatt way")
return asset_list
@staticmethod
def correct_ha6_survey_list(survey_list):
# Correct the survey list
survey_list["Street / Block Name"] = survey_list["Street / Block Name"].str.replace(
"Seabridge Road", "Seabridge Lane"
@ -358,10 +335,23 @@ class DataLoader:
"Post Code"
] = "ST5 7BY"
missed_postcodes = [
postcode.lower() for postcode in survey_list["Post Code"] if
postcode.lower() not in asset_list["matching_postcode"].values
]
return survey_list
def merge_surveys_to_assets(self, asset_list, survey_list, ha_name):
# Correct the asset list
asset_list_correction_function = getattr(self, f"correct_{ha_name.lower()}_asset_list")
asset_list = asset_list_correction_function(asset_list)
# Correct the survey list
survey_list_correction_function = getattr(self, f"correct_{ha_name.lower()}_survey_list")
survey_list = survey_list_correction_function(survey_list)
missed_postcodes = []
if ha_name == "HA6":
missed_postcodes = [
postcode.lower() for postcode in survey_list["Post Code"] if
postcode.lower() not in asset_list["matching_postcode"].values
]
matching_lookup = []
for _, row in tqdm(survey_list.iterrows(), total=len(survey_list)):
@ -405,7 +395,54 @@ class DataLoader:
matching_lookup = pd.DataFrame(matching_lookup)
return matching_lookup
# Merge onto the survey list
survey_list = survey_list.merge(matching_lookup, how='left', on="survey_list_row_id")
return survey_list
def merge_ciga_to_assets(self, asset_list, ciga_list, ha_name):
matching_lookup = []
for _, row in tqdm(ciga_list.iterrows(), total=len(ciga_list)):
house_number = row["HouseNo"]
if isinstance(house_number, str):
house_number = house_number.lower().strip()
# Filter on the postcode
df = asset_list[
asset_list["matching_address"].str.contains(row["Matched Postcode"].lower().strip())
].copy()
df = df[df["HouseNo"] == str(house_number)]
# TODO: Might need to consider street name at some point
if df.shape[0] != 1:
if df.shape[0] != 1:
df = df[df["matching_postcode"].str.lower().str.contains(row["Post Code"].lower())]
if df.shape[0] != 1:
postcode_lower = row["Post Code"].lower()
if postcode_lower in missed_postcodes:
matching_lookup.append(
{
"survey_list_row_id": row["survey_list_row_id"],
"asset_list_row_id": None,
}
)
continue
print(row["Street / Block Name"])
print(house_number)
print(row["Post Code"].lower())
raise ValueError("Investigate")
matching_lookup.append(
{
"survey_list_row_id": row["survey_list_row_id"],
"asset_list_row_id": df["asset_list_row_id"].values[0],
}
)
matching_lookup = pd.DataFrame(matching_lookup)
@staticmethod
def identify_built_form_ha6(property_string):
@ -445,16 +482,17 @@ class DataLoader:
return
data = {}
for ha_name, file_config in self.files.items():
for filepath in self.directories:
ha_name = filepath.split("/")[2]
# Load asset list
logger.info("Loading asset list for {}".format(ha_name))
asset_list = self.load_asset_list(
file_path=file_config["asset_list"]["filepath"],
asset_list, survey_list, ciga_list = self.load_asset_list(
filepath=filepath,
ha_name=ha_name,
sheet_name=file_config["asset_list"]["sheetname"]
)
if file_config.get("survey_list"):
# TODO: Delete this
logger.info("Loading survey list for {}".format(ha_name))
survey_list, matched_lookup = self.load_survey_list(
asset_list=asset_list,
@ -1240,13 +1278,16 @@ def analyse_ha_data(outputs, loader):
def app():
"""
This app contains the housign association analysis for HAs 1, 6, 14, 39 and 107.
This app contains the housin association analysis for HAs 1, 6, 14, 39 and 107.
Only HA 6 has surveys
:return:
"""
use_cache = False
# List all of the data in the folder
directories = [str(list(entry.iterdir())[0]) for entry in DATA_FOLDER.iterdir() if entry.is_dir()]
files = {
"ha_1": {
"asset_list": {
@ -1284,7 +1325,7 @@ def app():
}
}
loader = DataLoader(files, use_cache)
loader = DataLoader(directories, use_cache)
loader.load()
# TODO: We probably need to make sure that we have all of the columns that we need

View file

@ -38,6 +38,7 @@ def app():
for directory in tqdm(epc_directories):
data = pd.read_csv(directory / "certificates.csv", low_memory=False)
# Rename the columns to the same format as the api returns
data.columns = [c.replace("_", "-").lower() for c in data.columns]
# Take just date before the date threshold

View file

@ -40,6 +40,10 @@ MCS_SOLAR_PV_COST_DATA = {
# This is based on quotes from installers
BATTERY_COST = 3500
# This is based on https://www.checkatrade.com/blog/cost-guides/cost-smart-thermostat/
SMART_APPLIANCE_THERMOSTAT_COST = 400
PROGRAMMER_COST = 200
class Costs:
"""
@ -878,3 +882,29 @@ class Costs:
"labour_hours": 72,
"labour_days": 2,
}
def programmer_and_appliance_thermostat(self, has_programmer):
"""
Calculate the total cost of installing a programmer and appliance thermostat
If the property already has a programmer, then the only thing we need to calculate the cost for is the
appliance thermostat
"""
if has_programmer:
labour_hours = 2
total_cost = SMART_APPLIANCE_THERMOSTAT_COST
else:
labour_hours = 4
total_cost = SMART_APPLIANCE_THERMOSTAT_COST + PROGRAMMER_COST
subtotal_before_vat = total_cost / (1 + self.VAT_RATE)
vat = total_cost - subtotal_before_vat
# We estimate the cost of an appliance thermostat at £400, which is the upper end of the range
return {
"total": total_cost,
"subtotal": subtotal_before_vat,
"vat": vat,
"labour_hours": labour_hours,
"labour_days": 1,
}

View file

@ -71,9 +71,7 @@ class FloorRecommendations(Definitions):
def recommend(self, phase=0):
u_value = self.property.floor["thermal_transmittance"]
property_type = self.property.data["property-type"]
floor_area = self.property.insulation_floor_area
year_built = self.property.year_built
@ -90,6 +88,10 @@ class FloorRecommendations(Definitions):
):
return
# If the property is a new build flat, we won't recommend floor upgrades
if len(self.property.full_sap_epc) and (property_type == "Flat"):
return
if u_value:
# By being built more recently than this, it means that the property was likely build with soild
@ -101,16 +103,17 @@ class FloorRecommendations(Definitions):
# The floor is already compliant
return
u_value = get_floor_u_value(
floor_type=self.property.floor_type,
area=floor_area,
perimeter=self.property.perimeter,
age_band=self.property.age_band,
insulation_thickness=self.property.floor["insulation_thickness"],
wall_type=self.property.wall_type
)
if u_value is None:
u_value = get_floor_u_value(
floor_type=self.property.floor_type,
area=floor_area,
perimeter=self.property.perimeter,
age_band=self.property.age_band,
insulation_thickness=self.property.floor["insulation_thickness"],
wall_type=self.property.wall_type
)
self.estimated_u_value = u_value
self.estimated_u_value = u_value
if u_value < self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE:
return

View file

@ -0,0 +1,57 @@
from recommendations.Costs import Costs
from backend.Property import Property
class HeatingRecommender:
def __init__(self, property_instance: Property):
self.property = property_instance
self.costs = Costs(self.property)
self.recommendations = []
def recommend(self, phase=0):
# This first iteration of the recommender will provide very basic recommendation
if self.property.main_heating == "Room heaters, electric":
self.recommend_room_heaters_electric(phase=phase)
return
def recommend_room_heaters_electric(self, phase):
"""
If the home has Room heaters, electric, we start by identifying potential heating controls that could
be upgraded, that would provide a practical impact. This will be the least invasive improvement.
We can then consider the heating system itself
:return:
"""
if self.property.data["mainheat-energy-eff"] in ["Poor", "Very Poor"]:
# We recommend Programmer and appliance thermostats as the heating control. This has an average energy
# efficiency rating, and is likely to be more efficient than the current heating controls. if the
# rating is poor or very poor, the home may have a Programmer and room thermostat, which is less efficient
# than a Programmer and appliance thermostats, because it allows for much more granular control at not
# just a room level but individual heater/appliance level
# Note: A room thermostat is commonly placed in a hallway, and it measures the temperature of the air
# surrounding it. It then sends a signal to the heating system to turn on or off, depending on the
# temperature. An appliance thermostat, on the other hand, is placed on the heater/appliance itself, and
# measures the temperature of the heater/appliance. This allows for much more granular control, and
# prevents overheating.
# In order to cost, we check if the property already has a programmer, and therefor we will just need to
# add the cost of the appliance thermostats
has_programmer = self.property.main_heating_controls["switch_system"] == "programmer"
self.recommendations.append(
{
"phase": phase,
"parts": [
# TODO
],
"type": "heating_control",
"description": "Upgrade heating controls to Programmer and Appliance or Smart"
"Thermostats for more precise heating control, and prevention of overheating",
"starting_u_value": None,
"new_u_value": None,
"sap_points": None,
**self.costs.programmer_and_appliance_thermostat(has_programmer=has_programmer)
}
)

View file

@ -11,6 +11,7 @@ from recommendations.FireplaceRecommendations import FireplaceRecommendations
from recommendations.LightingRecommendations import LightingRecommendations
from recommendations.SolarPvRecommendations import SolarPvRecommendations
from recommendations.WindowsRecommendations import WindowsRecommendations
from recommendations.HeatingRecommender import HeatingRecommender
from backend.ml_models.AnnualBillSavings import AnnualBillSavings
@ -42,6 +43,7 @@ class Recommendations:
self.lighting_recommender = LightingRecommendations(property_instance=property_instance, materials=materials)
self.windows_recommender = WindowsRecommendations(property_instance=property_instance, materials=materials)
self.solar_recommender = SolarPvRecommendations(property_instance=property_instance)
self.heating_recommender = HeatingRecommender(property_instance=property_instance)
def recommend(self):
@ -89,6 +91,12 @@ class Recommendations:
property_recommendations.append(self.windows_recommender.recommendation)
phase += 1
# Heating controls recommendations
self.heating_recommender.recommend(phase=phase)
if self.heating_recommender.recommendation:
property_recommendations.append(self.heating_recommender.recommendation)
phase += 1
# Fireplace sealing recommendations
self.fireplace_recommender.recommend(phase=phase)
if self.fireplace_recommender.recommendation:

View file

@ -121,7 +121,7 @@ class WallRecommendations(Definitions):
u_value >= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE
):
# Recommend insulation
self.find_insulation(u_value)
self.find_insulation(u_value, phase)
return
# We can't detect it's a cavity wall, but it was built after 1990 so likely built with insulation already

View file

@ -511,6 +511,7 @@ FLOOR_LEVEL_MAP = {
"Ground": 0,
"ground floor": 0,
"mid floor": 1,
"top floor": 5,
"20+": 20,
"21st or above": 21,
**{str(i).zfill(2): i for i in range(0, 21)},

View file

@ -311,6 +311,7 @@ def get_roof_u_value(
return float(u_value)
def estimate_number_of_floors(property_type):
"""
Using the property type, we estimate the number of floors in the property
@ -324,7 +325,7 @@ def estimate_number_of_floors(property_type):
number_of_floors = 2
else:
raise NotImplementedError("Implement me")
return number_of_floors
@ -432,7 +433,6 @@ def get_floor_u_value(floor_type, area, perimeter, age_band, wall_type, insulati
Rsi = 0.17 # in m²K/W
Rse = 0.04 # in m²K/W
lambda_ins = 0.035 # thermal conductivity of floor insulation in W/m·K
wall_thickness = [x[age_band] for x in default_wall_thickness if x["type"] == wall_type][0]
if wall_thickness is None and wall_type == "park home":
# We don't know enough and likely won't make recommendations

View file

@ -195,3 +195,32 @@ def read_pickle_from_s3(bucket_name, s3_file_name):
return None
return data
def read_excel_from_s3(bucket_name, file_key, header_row):
"""
Read an Excel file from an S3 bucket and return it as a pandas DataFrame.
:param bucket_name: Name of the S3 bucket.
:param file_key: Key of the file (including directory path within the bucket).
:param header_row: The row number to use as the header (0-indexed).
:return: A pandas DataFrame containing the data from the Excel file.
"""
# Ensure the file_key is an Excel file
if not file_key.endswith((".xls", ".xlsx")):
raise ValueError("The specified file does not appear to be an Excel file.")
# Use the read_io_from_s3 function to get the data as a BytesIO object
excel_buffer = read_io_from_s3(bucket_name, file_key)
# Read the Excel file into a pandas DataFrame
df = pd.read_excel(excel_buffer, header=header_row)
# Drop columns where all values are NaN
df.dropna(axis=1, how='all', inplace=True)
# Reset index if the first column is just an index or entirely NaN
df.reset_index(drop=True, inplace=True)
return df