diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index 07fdbe94..20e72b8d 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -1,6 +1,6 @@ from datetime import datetime -import numpy as np +from tqdm import tqdm import pandas as pd from etl.epc.Record import EPCRecord from backend.SearchEpc import SearchEpc @@ -37,7 +37,6 @@ from recommendations.Recommendations import Recommendations from utils.logger import setup_logger from utils.s3 import read_dataframe_from_s3_parquet from backend.ml_models.Valuation import PropertyValuation -from backend.ml_models.AnnualBillSavings import AnnualBillSavings logger = setup_logger() @@ -66,7 +65,7 @@ async def trigger_plan(body: PlanTriggerRequest): ) input_properties = [] - for config in plan_input: + for config in tqdm(plan_input): # We validate each record in the file. If the record is NOT valid, we need to handle this accordingly epc_searcher = SearchEpc( @@ -142,8 +141,6 @@ async def trigger_plan(body: PlanTriggerRequest): # Property recommendations p.get_components(cleaned, photo_supply_lookup, floor_area_decile_thresholds) - # TODO: For the private customer, we should probably NOT allow floor insulation, because it often requires - # decanting the tenant recommender = Recommendations(property_instance=p, materials=materials) property_recommendations, property_representative_recommendations = recommender.recommend() @@ -201,6 +198,8 @@ async def trigger_plan(body: PlanTriggerRequest): expected_adjusted_energy=expected_adjusted_energy ) + # TODO: For the private customer, we should probably NOT allow floor insulation, because it often requires + # decanting the tenant input_measures = prepare_input_measures(recommendations_with_impact, body.goal) if body.budget: diff --git a/etl/customers/urban_splash.py b/etl/customers/urban_splash.py new file mode 100644 index 00000000..6c371879 --- /dev/null +++ b/etl/customers/urban_splash.py @@ -0,0 +1,153 @@ +import os + +import pandas as pd 
def _find_newest_epc(address1, postcodes):
    """
    Search for an address under each candidate postcode in turn.

    :param address1: First address line to search for.
    :param postcodes: Candidate postcodes, tried in the order given.
    :return: The ``newest_epc`` of the first search that produced one, or ``None`` if none did.
    """
    for postcode in postcodes:
        searcher = SearchEpc(
            address1=address1, postcode=postcode, auth_token=EPC_AUTH_TOKEN, os_api_key=""
        )
        searcher.find_property(skip_os=True)
        if searcher.newest_epc is not None:
            return searcher.newest_epc
    return None


def app():
    """
    Read in the Urban Splash data, in the dev AWS account, and pre-process it.

    There are a few issues with the file, including incorrect postcodes, so every unit is searched
    for under each of a small set of candidate postcodes.

    The customer is interested in the following:
    - Getting properties to an EPC C
    - Doing so within a budget of £5,000

    :raises ValueError: if no EPC can be found for a unit under any candidate postcode.
    """

    potential_postcodes = ["BD9 5BQ", "BD9 5BR", "BD9 5BN"]

    raw_asset_list = read_excel_from_s3(
        bucket_name="retrofit-datalake-dev",
        file_key="customers/urban_splash/raw_asset_list/USRF - Velvet Mill EPC.xlsx",
        header_row=2
    )

    # We have a series of apartment numbers that are "Apartment 001", "Apartment 002", etc. We need to convert
    # these to "Apartment 1", "Apartment 2", etc. The lookahead keeps the final digit, so a unit that is
    # numbered entirely with zeros does not lose its number.
    raw_asset_list["address1"] = raw_asset_list["Unit Number"].str.replace(
        r"Apartment 0+(?=\d)", "Apartment ", regex=True
    )

    # For each entry in the asset list, we make an api call to the EPC database to get the EPC data. We'll
    # retrieve the uprn for the property, as well as a nice address and postcode that we can use. We'll also try
    # and deduce the likely wall construction, since many of the homes are new builds, based on their newest EPC
    epc_data = []
    processed_asset_list = []
    for _, row in tqdm(raw_asset_list.iterrows(), total=len(raw_asset_list)):

        newest_epc = _find_newest_epc(row["address1"], potential_postcodes)
        if newest_epc is None:
            raise ValueError(
                f"No EPC found for {row['address1']!r} in any of the postcodes {potential_postcodes}"
            )

        processed_asset_list.append(
            {
                **row.to_dict(),
                "uprn": newest_epc["uprn"],
                "address": newest_epc["address1"],
                "postcode": newest_epc["postcode"],
                "walls-description": newest_epc["walls-description"],
                "roof-description": newest_epc["roof-description"],
                "floor-description": newest_epc["floor-description"],
                "total-floor-area": newest_epc["total-floor-area"],
                "full-address": newest_epc["address"]
            }
        )
        epc_data.append(newest_epc)

    processed_asset_list_df = pd.DataFrame(processed_asset_list)
    epc_data_df = pd.DataFrame(epc_data)

    # Store the processed asset list in s3, where the plan trigger can pick it up
    filename = f"{USER_ID}/{PORTFOLIO_ID}/test_inputs.csv"
    save_csv_to_s3(
        dataframe=processed_asset_list_df,
        bucket_name="retrofit-plan-inputs-dev",
        file_name=filename
    )

    # Printed so that it can be copied into a plan trigger request
    body = {
        "portfolio_id": str(PORTFOLIO_ID),
        "housing_type": "Private",
        "goal": "Increase EPC",
        "goal_value": "C",
        "trigger_file_path": filename,
        "budget": 5000,
    }
    print(body)

    # Some basic analysis on the heating, heating controls and hot water systems

    # All of the heating systems are rated very poor, poor or average. When it's average, they are all also
    # "Room heaters, electric", but the house has "Programmer and appliance thermostats" for the heating controls.
    # which is more efficient
    pd.set_option('display.max_rows', 500)
    pd.set_option('display.max_columns', 500)
    pd.set_option('display.width', 1000)

    # Heating
    print(epc_data_df[["mainheat-description", "mainheatcont-description", "mainheat-energy-eff"]].drop_duplicates())
    #                     mainheat-description              mainheatcont-description mainheat-energy-eff
    # 0                 Room heaters, electric        Programmer and room thermostat           Very Poor
    # 12                Room heaters, electric  Programmer and appliance thermostats             Average
    # 20   Electric storage heaters, radiators                  Celect-type controls                Poor

    # Hot water
    print(epc_data_df[["hotwater-description", "hot-water-energy-eff"]].drop_duplicates())
    #                    hotwater-description hot-water-energy-eff
    # 0   Electric immersion, standard tariff            Very Poor
    # 12         Electric immersion, off-peak              Average

    # We now retrieve EPCS for all of the properties that are in these postcodes very obviously for the velvet mill
    # We'll use this information to get a sense of the likely wall/roof/floor construction for the properties

    # client = EpcClient(auth_token=EPC_AUTH_TOKEN)
    #
    # neighbouring_epcs = []
    # for pc in potential_postcodes:
    #     response = client.domestic.search(params={"postcode": pc}, size=1000)
    #     data = response["rows"]
    #
    #     # keep just rows that are clearly for the velvet mill
    #     data = [x for x in data if "velvet" in x["address1"].lower()]
    #
    #     neighbouring_epcs.extend(data)
    #
    # neighbouring_epcs_df = pd.DataFrame(neighbouring_epcs)
    # neighbouring_epcs_df["walls-description"].value_counts()
    # neighbouring_epcs_df["roof-description"].value_counts()
    # neighbouring_epcs_df["floor-description"].value_counts()
a/etl/eligibility/ha_15_32/ha_analysis_batch_3.py +++ b/etl/eligibility/ha_15_32/ha_analysis_batch_3.py @@ -18,43 +18,40 @@ from recommendations.recommendation_utils import calculate_cavity_age EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN") ENV_FILE = Path(__file__).parent / "etl" / "eligibility" / "ha_15_32" / ".env" +DATA_FOLDER = Path(__file__).parent / "local_data" / "ha_data" logger = setup_logger() load_dotenv(ENV_FILE) class DataLoader: - MIN_ROWS = { - "ha_1": 2, - "ha_6": 2, - "ha_14": 3, # The spreadsheet starts from the third row - "ha_39": 2, - "ha_107": 2, - } - COLUMN_CONFIG = { - "ha_1": { + "HA1": { "address": "Address", "postcode": "Address - Postcode" + }, + "HA6": { + "address": "propertyaddress", + "postcode": "address" # The 'address' column actually contains postcode } } - def __init__(self, files, use_cache): - self.files = files + def __init__(self, directories, use_cache): + self.directories = directories self.use_cache = use_cache self.data = {} def create_asset_list_matching_address(self, ha_name, asset_list): - if ha_name in ["ha_1", "ha_6"]: + if ha_name in ["HA1", "HA6"]: asset_list["matching_address"] = asset_list[ self.COLUMN_CONFIG[ha_name]["address"] ].str.lower().str.strip() asset_list["matching_postcode"] = asset_list[ self.COLUMN_CONFIG[ha_name]["postcode"] ].str.lower().str.strip() - elif ha_name == "ha_14": + elif ha_name == "HA14": # Create matching_address by concatenating Address 1, Address 2, Address 3, Address 4, Postcode asset_list["matching_address"] = asset_list["Address 1"].str.lower().str.strip() + ", " + \ asset_list["Address 2"].str.lower().str.strip() + ", " + \ @@ -62,7 +59,7 @@ class DataLoader: asset_list["Address 4"].str.lower().str.strip() + ", " + \ asset_list["Postcode"].str.lower().str.strip() asset_list["matching_postcode"] = asset_list["Postcode"].str.lower().str.strip() - elif ha_name == "ha_39": + elif ha_name == "HA39": # Create matching_address by concatenating add_1, add_2, add_3, add_4, add_5, 
post_code asset_list["matching_address"] = asset_list["add_1"].astype(str).str.lower().str.strip() + ", " + \ asset_list["add_2"].astype(str).str.lower().str.strip() + ", " + \ @@ -71,7 +68,7 @@ class DataLoader: asset_list["add_5"].astype(str).str.lower().str.strip() + ", " + \ asset_list["post_code"].astype(str).str.lower().str.strip() asset_list["matching_postcode"] = asset_list["post_code"].str.lower().str.strip() - elif ha_name == "ha_107": + elif ha_name == "HA107": # Create matching_address by concatenating House No, Street, Town, District, Postcode asset_list["matching_address"] = asset_list["House No"].astype(str).str.lower().str.strip() + ", " + \ asset_list["Street"].str.lower().str.strip() + ", " + \ @@ -87,7 +84,7 @@ class DataLoader: def append_asset_list_built_form(self, ha_name, asset_list): # Finally, we process property_type or built form, where needed - if ha_name == "ha_6": + if ha_name == "HA6": asset_list["built_form"] = asset_list["Property Type"].apply(self.identify_built_form_ha6) return asset_list @@ -99,7 +96,7 @@ class DataLoader: :return: """ - if ha_name in ["ha_107"]: + if ha_name in ["HA107"]: asset_list["HouseNo"] = asset_list["House No"].copy() else: split_addresses = asset_list['matching_address'].str.split(',', expand=True) @@ -113,32 +110,41 @@ class DataLoader: return asset_list - def load_asset_list(self, file_path, ha_name, sheet_name=None): - workbook = openpyxl.load_workbook(file_path) - if sheet_name is not None: - sheet = workbook[sheet_name] + @staticmethod + def create_ciga_list_house_no(ha_name, ciga_list): + """ + This function will append the House number onto the asset list + :return: + """ + + if ha_name in ["HA6"]: + split_addresses = ciga_list['Matched Address'].str.split(',', expand=True) + house_numbers = split_addresses[0].str.split(' ', expand=True) + # THe first column should be HouseNo - we aren't interested in the other columns, but we don't know how + # many columns there might be + house_numbers = 
house_numbers.iloc[:, 0:1] + house_numbers.columns = ['HouseNo'] + + ciga_list = pd.concat([ciga_list, house_numbers[["HouseNo"]]], axis=1) else: - sheet = workbook.active - sheet_colnames = [cell.value for cell in sheet[self.MIN_ROWS[ha_name] - 1]] + raise NotImplementedError("Implement me") + + return ciga_list + + def load_asset_list(self, filepath, ha_name): + workbook = openpyxl.load_workbook(filepath) + asset_sheet = workbook["Assets"] + asset_sheet_colnames = [cell.value for cell in asset_sheet[1]] rows_data = [] - rows_colors = [] - for row in tqdm( - sheet.iter_rows(min_row=self.MIN_ROWS[ha_name], values_only=False) - ): # Assuming the first row is headers - + for row in asset_sheet.iter_rows(min_row=2, values_only=False): row_data = [cell.value for cell in row] # This will get you the cell values - row_color = row[0].fill.start_color.index if row[0].fill.start_color.index != '00000000' else None - # row_color = COLOR_INDEX[row_color] rows_data.append(row_data) - rows_colors.append(row_color) - asset_list = pd.DataFrame(rows_data, columns=sheet_colnames) + asset_list = pd.DataFrame(rows_data, columns=asset_sheet_colnames) asset_list = asset_list.loc[:, asset_list.columns.notnull()] - asset_list['row_color'] = rows_colors - - # Remove entirely empty roww - consider all rows apart from row_color + # Remove entirely empty rows - consider all rows apart from row_color asset_list = asset_list.loc[asset_list.loc[:, asset_list.columns != 'row_color'].notnull().any(axis=1)] # Add in asset_list_row_id @@ -151,77 +157,43 @@ class DataLoader: asset_list = self.append_asset_list_built_form(ha_name=ha_name, asset_list=asset_list) - return asset_list + # We check if there is a survey list + survey_list = pd.DataFrame() + if "ECO Surveys" in workbook.sheetnames: + survey_sheet = workbook["ECO Surveys"] + survey_rows = [] + for row in survey_sheet.iter_rows(min_row=2, values_only=False): # Assuming the first row is headers + row_data = [cell.value for cell in row] # This 
will get you the cell values + survey_rows.append(row_data) - def load_survey_list(self, file_path, ha_name, asset_list, sheet_name=None): - survey_workbook = openpyxl.load_workbook(file_path) - if sheet_name is not None: - survey_sheet = survey_workbook[sheet_name] - else: - survey_sheet = survey_workbook.active + survey_list = pd.DataFrame(survey_rows, columns=[cell.value for cell in survey_sheet[1]]) + # Remove columns that are None + survey_list = survey_list.loc[:, survey_list.columns.notnull()] + survey_list["survey_list_row_id"] = [ha_name + "_survey_" + str(i) for i in range(0, len(survey_list))] + # Perform survey list merge + survey_list = self.merge_surveys_to_assets(asset_list, survey_list, ha_name) - survey_rows = [] - survey_colors = [] + # We check if there are CIGA checks + ciga_list = pd.DataFrame() + if "CIGA Checks" in workbook.sheetnames: + ciga_sheet = workbook["CIGA Checks"] + ciga_rows = [] + for row in ciga_sheet.iter_rows(min_row=2, values_only=False): + row_data = [cell.value for cell in row] # This will get you the cell values + ciga_rows.append(row_data) - for row in tqdm(survey_sheet.iter_rows(min_row=2, values_only=False)): # Assuming the first row is headers - row_data = [cell.value for cell in row] # This will get you the cell values - row_color = row[0].fill.start_color.index if row[0].fill.start_color.index != '00000000' else None - survey_rows.append(row_data) - survey_colors.append(row_color) + ciga_list = pd.DataFrame(ciga_rows, columns=[cell.value for cell in ciga_sheet[1]]) + # Remove columns that are None + ciga_list = ciga_list.loc[:, ciga_list.columns.notnull()] + ciga_list = self.create_ciga_list_house_no(ha_name, ciga_list) + # Perform ciga list merge + ciga_list = self.merge_ciga_to_assets(asset_list, ciga_list, ha_name) - survey_list = pd.DataFrame(survey_rows, columns=[cell.value for cell in survey_sheet[1]]) - # Remove columns that are None - survey_list = survey_list.loc[:, survey_list.columns.notnull()] - 
survey_list["row_colour"] = survey_colors - - # The survey list has 4 possible colours: - # PURPLE - Installer advised install complete and a complimentary post works EPC has been completed. - # GREEN - Installer advised install complete. - # RED - Cancelled - # BLUE - Loft Only Installed - # NO FILL - No official update from installer (could be installed or cancelled) - - survey_list["row_colour_name"] = np.where( - survey_list["row_colour"] == survey_list_colours["red"], "red", - np.where(survey_list["row_colour"] == survey_list_colours["green"], "green", - np.where(survey_list["row_colour"] == survey_list_colours["purple"], "purple", - np.where(survey_list["row_colour"] == survey_list_colours["blue"], "blue", "no fill"))) - ) - - survey_list["row_meaning"] = np.where( - survey_list["row_colour_name"] == "red", "Cancelled", - np.where( - survey_list["row_colour_name"] == "green", - "Installer advised install complete", - np.where( - survey_list["row_colour_name"] == "purple", - "Installer advised install complete and a complimentary post works EPC has been completed", - np.where( - survey_list["row_colour_name"] == "blue", - "Loft Only Installed", - "No official update from installer (could be installed or cancelled)" - ) - ) - ) - ) - - # Add in asset_list_row_id - survey_list["survey_list_row_id"] = [ha_name + "_surveys_" + str(i) for i in range(0, len(survey_list))] - - # We now do the matching between the asset list and the survey list. 
- # What we'll get from this is a lookup table from the asset list to the survey list - - if ha_name == "ha_6": - matched_lookup = self.merge_ha_6(asset_list, survey_list) - else: - raise NotImplementedError("Only HA 6 has surveys") - - return survey_list, matched_lookup + return asset_list, survey_list, ciga_list @staticmethod - def merge_ha_6(asset_list, survey_list): + def correct_ha6_asset_list(asset_list): - # Correct the asset list across propertyaddress and matching_address asset_list["propertyaddress"] = asset_list["propertyaddress"].str.replace("Baggott Place", "Baggotts Place") asset_list["matching_address"] = asset_list["matching_address"].str.replace("baggott place", "baggotts place") @@ -234,6 +206,11 @@ class DataLoader: asset_list["propertyaddress"] = asset_list["propertyaddress"].str.replace("Moffat Way", "Moffatt Way") asset_list["matching_address"] = asset_list["matching_address"].str.replace("moffat way", "moffatt way") + return asset_list + + @staticmethod + def correct_ha6_survey_list(survey_list): + # Correct the survey list survey_list["Street / Block Name"] = survey_list["Street / Block Name"].str.replace( "Seabridge Road", "Seabridge Lane" @@ -358,10 +335,23 @@ class DataLoader: "Post Code" ] = "ST5 7BY" - missed_postcodes = [ - postcode.lower() for postcode in survey_list["Post Code"] if - postcode.lower() not in asset_list["matching_postcode"].values - ] + return survey_list + + def merge_surveys_to_assets(self, asset_list, survey_list, ha_name): + + # Correct the asset list + asset_list_correction_function = getattr(self, f"correct_{ha_name.lower()}_asset_list") + asset_list = asset_list_correction_function(asset_list) + # Correct the survey list + survey_list_correction_function = getattr(self, f"correct_{ha_name.lower()}_survey_list") + survey_list = survey_list_correction_function(survey_list) + + missed_postcodes = [] + if ha_name == "HA6": + missed_postcodes = [ + postcode.lower() for postcode in survey_list["Post Code"] if + 
def merge_ciga_to_assets(self, asset_list, ciga_list, ha_name):
    """
    Match every CIGA check to a single asset and attach the matched asset id to the CIGA list.

    A CIGA row is matched on its ``Matched Postcode`` (which must appear in the asset's
    ``matching_address``) and its ``HouseNo``. If that does not leave exactly one asset, the
    candidates are narrowed further on ``matching_postcode``.

    :param asset_list: DataFrame with ``matching_address``, ``matching_postcode``, ``HouseNo`` and
        ``asset_list_row_id`` columns.
    :param ciga_list: DataFrame with ``Matched Postcode`` and ``HouseNo`` columns.
    :param ha_name: Housing association identifier (currently unused, kept for a signature that is
        parallel to ``merge_surveys_to_assets``).
    :return: A copy of ``ciga_list`` with an ``asset_list_row_id`` column. The value is ``None`` for
        rows whose postcode does not occur in the asset list at all.
    :raises ValueError: if a row's postcode is present in the asset list but the row still cannot
        be matched to exactly one asset.
    """
    # Postcodes we hold assets for. A CIGA row outside of this set can never be matched, so it is
    # recorded as unmatched rather than treated as an error.
    asset_postcodes = set(asset_list["matching_postcode"].dropna())

    matched_asset_ids = []
    for _, row in tqdm(ciga_list.iterrows(), total=len(ciga_list)):

        house_number = row["HouseNo"]
        if isinstance(house_number, str):
            house_number = house_number.lower().strip()
        postcode = row["Matched Postcode"].lower().strip()

        # Filter on the postcode. regex=False because a postcode is a literal, not a pattern
        candidates = asset_list[
            asset_list["matching_address"].str.contains(postcode, regex=False, na=False)
        ]
        candidates = candidates[candidates["HouseNo"] == str(house_number)]

        # TODO: Might need to consider street name at some point
        if candidates.shape[0] != 1:
            candidates = candidates[
                candidates["matching_postcode"].str.lower().str.contains(postcode, regex=False, na=False)
            ]

        if candidates.shape[0] == 1:
            matched_asset_ids.append(candidates["asset_list_row_id"].values[0])
        elif postcode not in asset_postcodes:
            matched_asset_ids.append(None)
        else:
            raise ValueError(
                f"Investigate: CIGA row with house number {house_number!r} and postcode {postcode!r} "
                f"matched {candidates.shape[0]} assets"
            )

    return ciga_list.assign(asset_list_row_id=matched_asset_ids)
file_path=file_config["asset_list"]["filepath"], + asset_list, survey_list, ciga_list = self.load_asset_list( + filepath=filepath, ha_name=ha_name, - sheet_name=file_config["asset_list"]["sheetname"] ) if file_config.get("survey_list"): + # TODO: Delete this logger.info("Loading survey list for {}".format(ha_name)) survey_list, matched_lookup = self.load_survey_list( asset_list=asset_list, @@ -1240,13 +1278,16 @@ def analyse_ha_data(outputs, loader): def app(): """ - This app contains the housign association analysis for HAs 1, 6, 14, 39 and 107. + This app contains the housin association analysis for HAs 1, 6, 14, 39 and 107. Only HA 6 has surveys :return: """ use_cache = False + # List all of the data in the folder + directories = [str(list(entry.iterdir())[0]) for entry in DATA_FOLDER.iterdir() if entry.is_dir()] + files = { "ha_1": { "asset_list": { @@ -1284,7 +1325,7 @@ def app(): } } - loader = DataLoader(files, use_cache) + loader = DataLoader(directories, use_cache) loader.load() # TODO: We probably need to make sure that we have all of the columns that we need diff --git a/etl/epc_clean/app.py b/etl/epc_clean/app.py index 593559e0..bbb12d31 100644 --- a/etl/epc_clean/app.py +++ b/etl/epc_clean/app.py @@ -38,6 +38,7 @@ def app(): for directory in tqdm(epc_directories): data = pd.read_csv(directory / "certificates.csv", low_memory=False) + # Rename the columns to the same format as the api returns data.columns = [c.replace("_", "-").lower() for c in data.columns] # Take just date before the date threshold diff --git a/recommendations/Costs.py b/recommendations/Costs.py index 6ea17dce..da34d087 100644 --- a/recommendations/Costs.py +++ b/recommendations/Costs.py @@ -40,6 +40,10 @@ MCS_SOLAR_PV_COST_DATA = { # This is based on quotes from installers BATTERY_COST = 3500 +# This is based on https://www.checkatrade.com/blog/cost-guides/cost-smart-thermostat/ +SMART_APPLIANCE_THERMOSTAT_COST = 400 +PROGRAMMER_COST = 200 + class Costs: """ @@ -878,3 +882,29 @@ 
def programmer_and_appliance_thermostat(self, has_programmer):
    """
    Cost the installation of programmer and appliance thermostat heating controls.

    A property that already has a programmer only needs the appliance thermostat, so only that
    part is costed; otherwise the programmer is costed as well.

    :param has_programmer: Whether the property already has a programmer.
    :return: dict with "total", "subtotal" (total with VAT taken out), "vat", "labour_hours" and
        "labour_days".
    """
    # The appliance thermostat is always needed. We estimate it at £400, which is the upper end of
    # the range
    gross = SMART_APPLIANCE_THERMOSTAT_COST
    hours = 2
    if not has_programmer:
        gross = SMART_APPLIANCE_THERMOSTAT_COST + PROGRAMMER_COST
        hours = 4

    # The prices are treated as VAT inclusive, so the VAT is backed out of the total
    net = gross / (1 + self.VAT_RATE)

    return {
        "total": gross,
        "subtotal": net,
        "vat": gross - net,
        "labour_hours": hours,
        "labour_days": 1,
    }
class HeatingRecommender:
    """
    Recommends heating improvements for a property.

    This first iteration only handles homes whose main heating is "Room heaters, electric", for
    which it recommends upgrading the heating controls.
    """

    def __init__(self, property_instance: "Property"):
        self.property = property_instance
        self.costs = Costs(self.property)

        self.recommendations = []

    @property
    def recommendation(self):
        """
        Alias of ``recommendations``.

        recommendations/Recommendations.py reads ``heating_recommender.recommendation``, so the
        results are exposed under that name as well.
        """
        return self.recommendations

    def recommend(self, phase=0):
        """
        Add the recommendations that apply to the property to ``self.recommendations``.

        :param phase: Phase number recorded on any recommendation that is produced.
        """
        # This first iteration of the recommender will provide very basic recommendation
        if self.property.main_heating == "Room heaters, electric":
            self.recommend_room_heaters_electric(phase=phase)

    def recommend_room_heaters_electric(self, phase):
        """
        If the home has Room heaters, electric, we start by identifying potential heating controls that could
        be upgraded, that would provide a practical impact. This will be the least invasive improvement.

        We can then consider the heating system itself

        :param phase: Phase number recorded on the recommendation.
        """
        if self.property.data["mainheat-energy-eff"] not in ["Poor", "Very Poor"]:
            return

        # We recommend Programmer and appliance thermostats as the heating control. This has an average energy
        # efficiency rating, and is likely to be more efficient than the current heating controls. If the
        # rating is poor or very poor, the home may have a Programmer and room thermostat, which is less efficient
        # than a Programmer and appliance thermostats, because it allows for much more granular control at not
        # just a room level but individual heater/appliance level

        # Note: A room thermostat is commonly placed in a hallway, and it measures the temperature of the air
        # surrounding it. It then sends a signal to the heating system to turn on or off, depending on the
        # temperature. An appliance thermostat, on the other hand, is placed on the heater/appliance itself, and
        # measures the temperature of the heater/appliance. This allows for much more granular control, and
        # prevents overheating.

        # In order to cost, we check if the property already has a programmer, and therefore we will just need to
        # add the cost of the appliance thermostats
        has_programmer = self.property.main_heating_controls["switch_system"] == "programmer"
        self.recommendations.append(
            {
                "phase": phase,
                "parts": [
                    # TODO
                ],
                "type": "heating_control",
                # The trailing space matters: the two literals are concatenated
                "description": "Upgrade heating controls to Programmer and Appliance or Smart "
                               "Thermostats for more precise heating control, and prevention of overheating",
                "starting_u_value": None,
                "new_u_value": None,
                "sap_points": None,
                **self.costs.programmer_and_appliance_thermostat(has_programmer=has_programmer)
            }
        )
backend.ml_models.AnnualBillSavings import AnnualBillSavings @@ -42,6 +43,7 @@ class Recommendations: self.lighting_recommender = LightingRecommendations(property_instance=property_instance, materials=materials) self.windows_recommender = WindowsRecommendations(property_instance=property_instance, materials=materials) self.solar_recommender = SolarPvRecommendations(property_instance=property_instance) + self.heating_recommender = HeatingRecommender(property_instance=property_instance) def recommend(self): @@ -89,6 +91,12 @@ class Recommendations: property_recommendations.append(self.windows_recommender.recommendation) phase += 1 + # Heating controls recommendations + self.heating_recommender.recommend(phase=phase) + if self.heating_recommender.recommendation: + property_recommendations.append(self.heating_recommender.recommendation) + phase += 1 + # Fireplace sealing recommendations self.fireplace_recommender.recommend(phase=phase) if self.fireplace_recommender.recommendation: diff --git a/recommendations/WallRecommendations.py b/recommendations/WallRecommendations.py index 467c6ad3..9b731af4 100644 --- a/recommendations/WallRecommendations.py +++ b/recommendations/WallRecommendations.py @@ -121,7 +121,7 @@ class WallRecommendations(Definitions): u_value >= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE ): # Recommend insulation - self.find_insulation(u_value) + self.find_insulation(u_value, phase) return # We can't detect it's a cavity wall, but it was built after 1990 so likely built with insulation already diff --git a/recommendations/rdsap_tables.py b/recommendations/rdsap_tables.py index e396f727..98cda9ab 100644 --- a/recommendations/rdsap_tables.py +++ b/recommendations/rdsap_tables.py @@ -511,6 +511,7 @@ FLOOR_LEVEL_MAP = { "Ground": 0, "ground floor": 0, "mid floor": 1, + "top floor": 5, "20+": 20, "21st or above": 21, **{str(i).zfill(2): i for i in range(0, 21)}, diff --git a/recommendations/recommendation_utils.py b/recommendations/recommendation_utils.py 
def read_excel_from_s3(bucket_name, file_key, header_row=0):
    """
    Read an Excel file from an S3 bucket and return it as a pandas DataFrame.

    Columns that are entirely empty are dropped.

    :param bucket_name: Name of the S3 bucket.
    :param file_key: Key of the file (including directory path within the bucket). The extension
        check is case-insensitive, so ".XLSX" is accepted as well as ".xlsx".
    :param header_row: The row number to use as the header (0-indexed). Defaults to the first row.
    :return: A pandas DataFrame containing the data from the Excel file.
    :raises ValueError: if ``file_key`` does not end in ".xls" or ".xlsx".
    """

    # Ensure the file_key is an Excel file
    if not file_key.lower().endswith((".xls", ".xlsx")):
        raise ValueError("The specified file does not appear to be an Excel file.")

    # Use the read_io_from_s3 function to get the data as a BytesIO object
    excel_buffer = read_io_from_s3(bucket_name, file_key)

    # Read the Excel file into a pandas DataFrame
    df = pd.read_excel(excel_buffer, header=header_row)

    # Drop columns where all values are NaN
    df = df.dropna(axis=1, how="all")

    # Make sure the rows are numbered 0..n-1
    return df.reset_index(drop=True)