From eac20467657fdc181659402de0d92ea4e473e64c Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 2 Jul 2024 10:31:52 +0100 Subject: [PATCH 01/19] removed rubbish code from epc clean --- backend/apis/GoogleSolarApi.py | 4 ++ backend/app/plan/router.py | 22 +++++----- backend/ml_models/AnnualBillSavings.py | 11 +++++ etl/bill_savings/data_collection.py | 56 ++++++++++++++++++++++++++ etl/epc_clean/app.py | 3 -- 5 files changed, 82 insertions(+), 14 deletions(-) create mode 100644 etl/bill_savings/data_collection.py diff --git a/backend/apis/GoogleSolarApi.py b/backend/apis/GoogleSolarApi.py index 6d2ddf6c..d29e3da5 100644 --- a/backend/apis/GoogleSolarApi.py +++ b/backend/apis/GoogleSolarApi.py @@ -213,6 +213,10 @@ class GoogleSolarApi: # 1) Convert Solar Energy AD production from the DC production panel_performance["initial_ac_kwh_per_year"] = panel_performance["yearly_dc_energy"] * self.dc_to_ac_rate + # This is just a benchmark figure, based on the national figure. This doesn't not respect the fact that a + # property could be 100% electric + average_electricity_consumption + # Remove anything where the total ac energy is less than half of the array wattage panel_performance = panel_performance[ (panel_performance["initial_ac_kwh_per_year"] / panel_performance["array_warrage"]) >= 0.5 diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index 80392c88..258449c2 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -284,16 +284,16 @@ async def trigger_plan(body: PlanTriggerRequest): property_id, is_new = create_property( session, body.portfolio_id, epc_searcher.address_clean, epc_searcher.postcode_clean, epc_searcher.uprn ) - # if not is_new: - # continue - # - # create_property_targets( - # session, - # property_id=property_id, - # portfolio_id=body.portfolio_id, - # epc_target=body.goal_value, - # heat_demand_target=None - # ) + if not is_new: + continue + + create_property_targets( + session, + property_id=property_id, + portfolio_id=body.portfolio_id, + epc_target=body.goal_value, + heat_demand_target=None + ) epc_records = { 'original_epc': epc_searcher.newest_epc.copy(), @@ -356,7 +356,7 @@ async def trigger_plan(body: PlanTriggerRequest): p.get_spatial_data(uprn_filenames) # Call Google Solar API # TODO: Complete me - # solar_performance = solar_api_client.get(longitude=p.spatial["longitude"], latitude=p.spatial["latitude"]) + solar_performance = solar_api_client.get(longitude=p.spatial["longitude"], latitude=p.spatial["latitude"]) logger.info("Getting components and epc recommendations") recommendations = {} diff --git a/backend/ml_models/AnnualBillSavings.py b/backend/ml_models/AnnualBillSavings.py index 7395ab6b..e6494bcd 100644 --- a/backend/ml_models/AnnualBillSavings.py +++ b/backend/ml_models/AnnualBillSavings.py @@ -1,5 +1,16 @@ import numpy as np +QUARTERLY_ENERGY_PRICES = [ + # 2024 Q1 + {"start": "2024-01-01", "end": "2024-03-31", "electricity": 0.2, "gas": 0.042}, + # 2023 Q4 + {"start": "2023-10-01", "end": "2023-12-31", "electricity": 0.202, "gas": 0.51}, + # 2023 Q3 + {"start": "2023-07-01", "end": "2023-09-30", "electricity": 0.188, "gas": 0.46}, + # 2023 Q2 + {"start": "2023-04-01", "end": "2023-06-30", "electricity": 0.177, "gas": 0.456}, +] + class AnnualBillSavings: """ diff --git a/etl/bill_savings/data_collection.py b/etl/bill_savings/data_collection.py new file mode 100644 index 00000000..25023894 --- /dev/null +++ b/etl/bill_savings/data_collection.py @@ -0,0 +1,56 @@ +import inspect +import pandas as pd +from tqdm import tqdm +from etl.epc_clean.EpcClean import EpcClean +from etl.epc.settings import EARLIEST_EPC_DATE +from pathlib import Path + +src_file_path = inspect.getfile(lambda: None) + +EPC_DIRECTORY = Path(src_file_path).parent / "local_data" / "all-domestic-certificates" + + +def app(): + """ + This application is tasked with pulling a large quantity of data from the find my epc website, containing the + estimated energy consumption for properties + :return: + """ + + cleaned_data = {} + epc_directories = [entry for entry in EPC_DIRECTORY.iterdir() if entry.is_dir()] + + data = [] + for directory in tqdm(epc_directories): + data = pd.read_csv(directory / "certificates.csv", low_memory=False) + # Rename the columns to the same format as the api returns + data.columns = [c.replace("_", "-").lower() for c in data.columns] + # Take just date before the date threshold + data = data[data["lodgement-date"] >= EARLIEST_EPC_DATE] + + data = data[~pd.isnull(data["uprn"])] + data = data[data["mains-gas-flag"] == "N"] + data = data[data["main-fuel"] == "electricity (not community)"] + data[data["current-energy-efficiency"].astype(float) > 80]["uprn"].astype(int) + + # Convert to list of dictioaries as returned by the api + data = data.to_dict("records") + + # Incorporate input data into cleaning + cleaner = EpcClean(data) + + cleaner.clean() + # Extended cleaned_data + for k, data in cleaner.cleaned.items(): + if k not in cleaned_data: + cleaned_data[k] = data + else: + existing_descriptions = [x["original_description"] for x in cleaned_data[k]] + new_data = [x for x in data if x["original_description"] not in existing_descriptions] + cleaned_data[k].extend(new_data) + + # Basic check to make sure all descriptions are unique + for _, cleaned in cleaned_data.items(): + descriptions = [x["original_description"] for x in cleaned] + if len(descriptions) != len(set(descriptions)): + raise ValueError("Duplicated descriptions found, check me") diff --git a/etl/epc_clean/app.py b/etl/epc_clean/app.py index 59561b3c..1d833b72 100644 --- a/etl/epc_clean/app.py +++ b/etl/epc_clean/app.py @@ -39,11 +39,8 @@ def app(): cleaned_data = {} epc_directories = [entry for entry in EPC_DIRECTORY.iterdir() if entry.is_dir()] - WALLS = [] for directory in tqdm(epc_directories): data = pd.read_csv(directory / "certificates.csv", low_memory=False) - z = data["WALLS_DESCRIPTION"].unique().tolist() - WALLS.extend(z) # Rename the columns to the same format as the api returns data.columns = [c.replace("_", "-").lower() for c in data.columns] # Take just date before the date threshold From 1db6dfebdfa29854315ea2896e33bf779a0a9ddb Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 2 Jul 2024 11:06:11 +0100 Subject: [PATCH 02/19] created basic data collection process --- etl/bill_savings/data_collection.py | 110 ++++++++++++++++++++++------ 1 file changed, 86 insertions(+), 24 deletions(-) diff --git a/etl/bill_savings/data_collection.py b/etl/bill_savings/data_collection.py index 25023894..22b12c6e 100644 --- a/etl/bill_savings/data_collection.py +++ b/etl/bill_savings/data_collection.py @@ -1,13 +1,79 @@ +import time + +import requests import inspect import pandas as pd from tqdm import tqdm -from etl.epc_clean.EpcClean import EpcClean +from bs4 import BeautifulSoup from etl.epc.settings import EARLIEST_EPC_DATE from pathlib import Path +import numpy as np src_file_path = inspect.getfile(lambda: None) EPC_DIRECTORY = Path(src_file_path).parent / "local_data" / "all-domestic-certificates" +SEARCH_POSTCODE_URL = ( + "https://find-energy-certificate.service.gov.uk/find-a-certificate/search-by-postcode?postcode={postcode_input}" +) +BASE_ENERGY_URL = "https://find-energy-certificate.service.gov.uk" + + +def retrieve_find_my_epc_data(uprn: int, postcode: str, address: str): + """ + For a post code and address, we pull out all the required data from the find my epc website + """ + + headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) ' + 'Chrome/111.0.0.0 Safari/537.36' + } + postcode_input = postcode.replace(" ", "+") + postcode_search = SEARCH_POSTCODE_URL.format(postcode_input=postcode_input) + postcode_response = requests.get(postcode_search, headers=headers) + + postcode_res = BeautifulSoup(postcode_response.text, features="html.parser") + address_links_full = postcode_res.findAll('a', {'class': 'govuk-link', 'rel': 'nofollow'}) + address_links = {element.text.lstrip().rstrip(): BASE_ENERGY_URL + element['href'] for element in + address_links_full} + + address_cleaned = address.replace(",", "").replace(" ", "").lower() + address_links_cleaned = [ + x.replace(",", "").replace(" ", "").lower() for x in list(address_links.keys()) + ] + + index_of_address = [key.startswith(address_cleaned) for key in address_links_cleaned] + if sum(index_of_address) > 1: + # If we have two or more addresses, we can't be sure which one is the correct one so we exit for simplicity + return None + chosen_epc = address_links[list(address_links.keys())[np.where(index_of_address)[0][0]]] + + epc_certificate = chosen_epc.split('/')[-1] + + address_response = requests.get(chosen_epc, headers=headers) + address_res = BeautifulSoup(address_response.text, features="html.parser") + + ratings = address_res.find('desc', {'id': 'svg-desc'}).text + current_rating = ratings.split(".")[0] + potential_rating = ratings.split(".")[1] + + # Retrieve the energy consumption + bills = address_res.find('div', {'id': 'bills-affected'}) + heating_text = bills.find_all('li')[0].text + hot_water_text = bills.find_all('li')[1].text + + resulting_data = { + 'uprn': uprn, + 'address': address, + 'epc_certificate': epc_certificate, + 'current_epc_rating': current_rating.split(' ')[-6], + 'current_epc_efficiency': int(current_rating.split(' ')[-1]), + 'potential_epc_rating': potential_rating.split(' ')[-6], + "potential_epc_efficiency": int(potential_rating.split(' ')[-1]), + "heating_text": heating_text, + "hot_water_text": hot_water_text, + } + + return resulting_data def app(): @@ -20,7 +86,9 @@ def app(): cleaned_data = {} epc_directories = [entry for entry in EPC_DIRECTORY.iterdir() if entry.is_dir()] - data = [] + sample_size = 100 + + energy_consumption_data = [] for directory in tqdm(epc_directories): data = pd.read_csv(directory / "certificates.csv", low_memory=False) # Rename the columns to the same format as the api returns @@ -28,29 +96,23 @@ def app(): # Take just date before the date threshold data = data[data["lodgement-date"] >= EARLIEST_EPC_DATE] - data = data[~pd.isnull(data["uprn"])] - data = data[data["mains-gas-flag"] == "N"] - data = data[data["main-fuel"] == "electricity (not community)"] - data[data["current-energy-efficiency"].astype(float) > 80]["uprn"].astype(int) + data = data.sample(sample_size) + # We use the addreess data to find the related information - # Convert to list of dictioaries as returned by the api - data = data.to_dict("records") + collected_data = [] + for _, property_data in data.iterrows(): + # Sleep for a random time between 0.1 and 1.5 seconds + time.sleep(np.random.uniform(0.1, 1.5)) - # Incorporate input data into cleaning - cleaner = EpcClean(data) + uprn = int(property_data["uprn"]) + address = property_data["address1"] + postcode = property_data["postcode"] - cleaner.clean() - # Extended cleaned_data - for k, data in cleaner.cleaned.items(): - if k not in cleaned_data: - cleaned_data[k] = data - else: - existing_descriptions = [x["original_description"] for x in cleaned_data[k]] - new_data = [x for x in data if x["original_description"] not in existing_descriptions] - cleaned_data[k].extend(new_data) + response = retrieve_find_my_epc_data( + uprn=uprn, + postcode=postcode, + address=address + ) + collected_data.append(response) - # Basic check to make sure all descriptions are unique - for _, cleaned in cleaned_data.items(): - descriptions = [x["original_description"] for x in cleaned] - if len(descriptions) != len(set(descriptions)): - raise ValueError("Duplicated descriptions found, check me") + energy_consumption_data.extend(energy_consumption_data) From 3b3c6c3cc4bd8e028efef268ac1ef797e72134ff Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 2 Jul 2024 11:55:23 +0100 Subject: [PATCH 03/19] Added more robust address selection --- etl/bill_savings/data_collection.py | 85 +++++++++++++++++++++++------ 1 file changed, 69 insertions(+), 16 deletions(-) diff --git a/etl/bill_savings/data_collection.py b/etl/bill_savings/data_collection.py index 22b12c6e..793c13c4 100644 --- a/etl/bill_savings/data_collection.py +++ b/etl/bill_savings/data_collection.py @@ -1,4 +1,6 @@ import time +from datetime import datetime, timedelta +from dateutil.relativedelta import relativedelta import requests import inspect @@ -8,6 +10,7 @@ from bs4 import BeautifulSoup from etl.epc.settings import EARLIEST_EPC_DATE from pathlib import Path import numpy as np +from utils.s3 import save_pickle_to_s3 src_file_path = inspect.getfile(lambda: None) @@ -18,7 +21,13 @@ SEARCH_POSTCODE_URL = ( BASE_ENERGY_URL = "https://find-energy-certificate.service.gov.uk" -def retrieve_find_my_epc_data(uprn: int, postcode: str, address: str): +def calculate_expiry_date(lodgement_date): + lodgement_date_dt = datetime.strptime(lodgement_date, '%Y-%m-%d') + expiry_date_dt = lodgement_date_dt + relativedelta(years=10) - timedelta(days=1) + return expiry_date_dt.strftime('%d %B %Y') + + +def retrieve_find_my_epc_data(uprn: int, postcode: str, address: str, expected_expiry_date: str): """ For a post code and address, we pull out all the required data from the find my epc website """ @@ -31,22 +40,52 @@ def retrieve_find_my_epc_data(uprn: int, postcode: str, address: str): postcode_search = SEARCH_POSTCODE_URL.format(postcode_input=postcode_input) postcode_response = requests.get(postcode_search, headers=headers) - postcode_res = BeautifulSoup(postcode_response.text, features="html.parser") - address_links_full = postcode_res.findAll('a', {'class': 'govuk-link', 'rel': 'nofollow'}) - address_links = {element.text.lstrip().rstrip(): BASE_ENERGY_URL + element['href'] for element in - address_links_full} - address_cleaned = address.replace(",", "").replace(" ", "").lower() - address_links_cleaned = [ - x.replace(",", "").replace(" ", "").lower() for x in list(address_links.keys()) - ] + postcode_res = BeautifulSoup(postcode_response.text, features="html.parser") + rows = postcode_res.find_all('tr', class_='govuk-table__row') - index_of_address = [key.startswith(address_cleaned) for key in address_links_cleaned] - if sum(index_of_address) > 1: - # If we have two or more addresses, we can't be sure which one is the correct one so we exit for simplicity + extracted_table = [] + for row in rows: + # Extract the address and URL + address_tag = row.find('a', class_='govuk-link') + if address_tag is None: + continue + extracted_address = None + extracted_address_url = None + if address_tag: + extracted_address = address_tag.text.strip() + extracted_address_url = address_tag['href'] + + extracted_address_cleaned = extracted_address.replace(",", "").replace(" ", "").lower() + if not extracted_address_cleaned.startswith(address_cleaned): + continue + + # If the address is a match, we can extract the data + + # Extract the expiry date + expiry_date_tag = row.find('td', class_='govuk-table__cell date') + expiry_date = None + if expiry_date_tag is not None: + expiry_date = expiry_date_tag.parent.find('span').text.strip() + + extracted_table.append( + { + "extracted_address": extracted_address, + "extracted_address_url": extracted_address_url, + "expiry_date": expiry_date + } + ) + + extracted_table = [entry for entry in extracted_table if entry['expiry_date'] == expected_expiry_date] + + if len(extracted_table) > 1: + print("Multiple candidates found, skipping for now") return None - chosen_epc = address_links[list(address_links.keys())[np.where(index_of_address)[0][0]]] + if not extracted_table: + raise Exception("Fix me") + + chosen_epc = BASE_ENERGY_URL + extracted_table[0]['extracted_address_url'] epc_certificate = chosen_epc.split('/')[-1] address_response = requests.get(chosen_epc, headers=headers) @@ -83,7 +122,6 @@ def app(): :return: """ - cleaned_data = {} epc_directories = [entry for entry in EPC_DIRECTORY.iterdir() if entry.is_dir()] sample_size = 100 @@ -96,6 +134,10 @@ def app(): # Take just date before the date threshold data = data[data["lodgement-date"] >= EARLIEST_EPC_DATE] + data = data[~pd.isnull(data["uprn"])] + # Take just the newest EPC per uprn, based on lodgement-date + data = data.sort_values("lodgement-date", ascending=False).drop_duplicates("uprn") + data = data.sample(sample_size) # We use the addreess data to find the related information @@ -107,12 +149,23 @@ def app(): uprn = int(property_data["uprn"]) address = property_data["address1"] postcode = property_data["postcode"] + expected_expiry_date = calculate_expiry_date(property_data["lodgement-date"]) response = retrieve_find_my_epc_data( uprn=uprn, postcode=postcode, - address=address + address=address, + expected_expiry_date=expected_expiry_date ) + if response is None: + continue collected_data.append(response) - energy_consumption_data.extend(energy_consumption_data) + energy_consumption_data.extend(collected_data) + + # Store the pickle in s3 + save_time = datetime.now() + save_pickle_to_s3( + energy_consumption_data, bucket_name="retrofit-datalake-dev", + s3_file_name=f"energy_consumption_data/{save_time}.pkl" + ) From 298bb5a148db4a43ef752d65ed5fba99671c0e6c Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 2 Jul 2024 12:00:24 +0100 Subject: [PATCH 04/19] extract leading zero from date --- etl/bill_savings/data_collection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etl/bill_savings/data_collection.py b/etl/bill_savings/data_collection.py index 793c13c4..873bf957 100644 --- a/etl/bill_savings/data_collection.py +++ b/etl/bill_savings/data_collection.py @@ -24,7 +24,7 @@ BASE_ENERGY_URL = "https://find-energy-certificate.service.gov.uk" def calculate_expiry_date(lodgement_date): lodgement_date_dt = datetime.strptime(lodgement_date, '%Y-%m-%d') expiry_date_dt = lodgement_date_dt + relativedelta(years=10) - timedelta(days=1) - return expiry_date_dt.strftime('%d %B %Y') + return expiry_date_dt.strftime('%-d %B %Y') def retrieve_find_my_epc_data(uprn: int, postcode: str, address: str, expected_expiry_date: str): From b8e769347936fd5df8c299484d5e61f942f45dfc Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 2 Jul 2024 12:06:18 +0100 Subject: [PATCH 05/19] Adding epc directory to output --- etl/bill_savings/data_collection.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/etl/bill_savings/data_collection.py b/etl/bill_savings/data_collection.py index 873bf957..26ed156e 100644 --- a/etl/bill_savings/data_collection.py +++ b/etl/bill_savings/data_collection.py @@ -159,7 +159,12 @@ def app(): ) if response is None: continue - collected_data.append(response) + collected_data.append( + { + **response, + "epc_directory": directory + } + ) energy_consumption_data.extend(collected_data) From d562324dd906c8c8ab49ac27845c3c4faac4da12 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 2 Jul 2024 12:22:35 +0100 Subject: [PATCH 06/19] skip cases with no candidates --- etl/bill_savings/data_collection.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/etl/bill_savings/data_collection.py b/etl/bill_savings/data_collection.py index 26ed156e..521a3783 100644 --- a/etl/bill_savings/data_collection.py +++ b/etl/bill_savings/data_collection.py @@ -83,7 +83,8 @@ def retrieve_find_my_epc_data(uprn: int, postcode: str, address: str, expected_e return None if not extracted_table: - raise Exception("Fix me") + print("No candidates found, skipping for now") + return None chosen_epc = BASE_ENERGY_URL + extracted_table[0]['extracted_address_url'] epc_certificate = chosen_epc.split('/')[-1] From 654251c084b0c8f2729834235725187636c3c433 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 2 Jul 2024 12:25:16 +0100 Subject: [PATCH 07/19] handle case of no respone --- etl/bill_savings/data_collection.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/etl/bill_savings/data_collection.py b/etl/bill_savings/data_collection.py index 521a3783..2632c296 100644 --- a/etl/bill_savings/data_collection.py +++ b/etl/bill_savings/data_collection.py @@ -102,8 +102,8 @@ def retrieve_find_my_epc_data(uprn: int, postcode: str, address: str, expected_e hot_water_text = bills.find_all('li')[1].text resulting_data = { - 'uprn': uprn, - 'address': address, + 'extracted_uprn': uprn, + 'extracted_address': address, 'epc_certificate': epc_certificate, 'current_epc_rating': current_rating.split(' ')[-6], 'current_epc_efficiency': int(current_rating.split(' ')[-1]), @@ -163,6 +163,7 @@ def app(): collected_data.append( { **response, + "epc": property_data.to_dict(), "epc_directory": directory } ) From 4bcd17596e9931508c1df3cfc13b16d440b36980 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 2 Jul 2024 13:11:23 +0100 Subject: [PATCH 08/19] handle missing bills data --- etl/bill_savings/data_collection.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/etl/bill_savings/data_collection.py b/etl/bill_savings/data_collection.py index 2632c296..1f787d48 100644 --- a/etl/bill_savings/data_collection.py +++ b/etl/bill_savings/data_collection.py @@ -98,8 +98,11 @@ def retrieve_find_my_epc_data(uprn: int, postcode: str, address: str, expected_e # Retrieve the energy consumption bills = address_res.find('div', {'id': 'bills-affected'}) - heating_text = bills.find_all('li')[0].text - hot_water_text = bills.find_all('li')[1].text + bills_list = bills.find_all('li') + if not bills_list: + return None + heating_text = bills_list[0].text + hot_water_text = bills_list[1].text resulting_data = { 'extracted_uprn': uprn, From dd0deab0ee3274d8503093483e242dbfee10c4ff Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 2 Jul 2024 14:59:16 +0100 Subject: [PATCH 09/19] setting up energy consumption model class --- etl/bill_savings/EnergyConsumptionModel.py | 89 ++++++++++++++++++++ etl/bill_savings/data_collation.py | 94 ++++++++++++++++++++++ etl/bill_savings/data_collection.py | 9 ++- utils/s3.py | 30 +++++++ 4 files changed, 218 insertions(+), 4 deletions(-) create mode 100644 etl/bill_savings/EnergyConsumptionModel.py create mode 100644 etl/bill_savings/data_collation.py diff --git a/etl/bill_savings/EnergyConsumptionModel.py b/etl/bill_savings/EnergyConsumptionModel.py new file mode 100644 index 00000000..2ca88da5 --- /dev/null +++ b/etl/bill_savings/EnergyConsumptionModel.py @@ -0,0 +1,89 @@ +import pandas as pd +from datetime import datetime +from sklearn.model_selection import train_test_split +from sklearn.linear_model import LinearRegression +from sklearn.metrics import mean_squared_error, r2_score +from utils.s3 import save_pickle_to_s3, read_pickle_from_s3 + + +class EnergyConsumptionModel: + FEATURES = ['feature_1', 'feature_2'] + TARGETS = ['heating_kwh', 'hot_water_kwh'] + + def __init__(self, model_paths=None): + self.models = {} + self.model_paths = model_paths or {} + self.data = None + + self.X_train = None + self.X_test = None + self.y_train = None + self.y_test = None + + if model_paths: + for target, path in model_paths.items(): + self.models[target] = read_pickle_from_s3(bucket_name="retrofit-model-directory-dev", s3_file_name=path) + + def read_dataset(self, file_path): + self.data = pd.read_csv(file_path) + + def feature_engineering(self): + # Example feature engineering steps + self.data['feature_1'] = self.data['original_feature_1'] ** 2 + self.data['feature_2'] = self.data['original_feature_2'] ** 0.5 + # Add more feature engineering steps as required + + def split_dataset(self, target, test_size=0.2, random_state=42): + X = self.data[self.FEATURES] + y = self.data[target] + self.X_train, self.X_test, self.y_train, self.y_test = train_test_split( + X, y, test_size=test_size, random_state=random_state + ) + + def fit_model(self, target): + self.models[target] = LinearRegression() + self.models[target].fit(self.X_train, self.y_train) + + def evaluate_model(self, target): + y_pred = self.models[target].predict(self.X_test) + mse = mean_squared_error(self.y_test, y_pred) + r2 = r2_score(self.y_test, y_pred) + return {'MSE': mse, 'R2': r2} + + def save_model(self, target): + run_date = datetime.now().strftime("%Y-%m-%d") + save_pickle_to_s3( + self.models[target], + bucket_name="retrofit-model-directory-dev", + s3_file_name=f"model_directory/energy_consumption_model/{target}_{run_date}.pkl" + ) + + def score_new_data(self, new_data, target): + if target not in self.models: + raise ValueError(f"Model for target {target} not loaded or trained") + + new_data_transformed = self.transform_new_data(new_data) + return self.models[target].predict(new_data_transformed) + + def transform_new_data(self, new_data): + # Apply the same transformations as in feature_engineering + new_data['feature_1'] = new_data['original_feature_1'] ** 2 + new_data['feature_2'] = new_data['original_feature_2'] ** 0.5 + return new_data[self.FEATURES] + +# Example usage: +# model = EnergyConsumptionModel() +# model.read_dataset('/mnt/data/energy_consumption_dataset.csv') +# model.feature_engineering() + +# For heating_kwh +# model.split_dataset(target='heating_kwh') +# model.fit_model(target='heating_kwh') +# print(model.evaluate_model(target='heating_kwh')) +# model.save_model(target='heating_kwh') + +# For hot_water_kwh +# model.split_dataset(target='hot_water_kwh') +# model.fit_model(target='hot_water_kwh') +# print(model.evaluate_model(target='hot_water_kwh')) +# model.save_model(target='hot_water_kwh') diff --git a/etl/bill_savings/data_collation.py b/etl/bill_savings/data_collation.py new file mode 100644 index 00000000..ef2b286b --- /dev/null +++ b/etl/bill_savings/data_collation.py @@ -0,0 +1,94 @@ +import re +from datetime import datetime + +import pandas as pd + +from utils.s3 import list_files_in_s3_folder, read_pickle_from_s3, save_dataframe_to_s3_parquet + +# These columns we co-erce to strings before saving +PROBLEMATIC_COLUMNS = ["main-heating-controls"] + + +def extract_kwh_value(text): + """ + Extract the numerical kWh value from a given string. + + :param text: The input string containing the kWh value. + :return: The extracted numerical kWh value as an integer. + """ + # Use regular expression to find the numerical value followed by "kWh per year" + match = re.search(r'([\d,]+) kWh per year', text) + + if match: + # Remove commas from the extracted value and convert to integer + kwh_value = int(match.group(1).replace(',', '')) + return kwh_value + else: + # If no match is found, return None or raise an exception + return None + + +def app(): + """ + Given the files written in our datalake in s3, this application will collate the data into a single file + and store it back in s3 for analysis + :return: + """ + + # Firstly, list all of the saved files in s3 + data_files = list_files_in_s3_folder(bucket_name="retrofit-datalake-dev", folder_name="energy_consumption_data") + + run_date = datetime.now().strftime("%Y-%m-%d") + + complete_data = [] + for files in data_files: + dataset_run_date = files.split("/")[-1].split(".")[0] + # Extract the date from the file name + dataset_run_date = pd.Timestamp(dataset_run_date) + + # Load the data from the file + data = read_pickle_from_s3(bucket_name="retrofit-datalake-dev", s3_file_name=files) + + # We check that the retrieved energy consumption sufficiently matches the EPC data + internal_dataset = [] + for x in data: + epc_data = x["epc"] + epc_sap = epc_data["current-energy-efficiency"] + epc_potential_sap = epc_data["potential-energy-efficiency"] + # Make sure this matches the extracted sap + if int(epc_sap) != int(x["current_epc_efficiency"]) or int(epc_potential_sap) != int( + x["potential_epc_efficiency"] + ): + continue + + heating_kwh = extract_kwh_value(x["heating_text"]) + hot_water_kwh = extract_kwh_value(x["hot_water_text"]) + internal_dataset.append( + { + **epc_data, + "heating_kwh": heating_kwh, + "hot_water_kwh": hot_water_kwh, + "dataset_run_date": dataset_run_date + } + ) + + complete_data.extend(internal_dataset) + + df = pd.DataFrame(complete_data) + # Because we collate multiple runs into a single data source, it's possible that we have duplicated data at + # the uprn level, so we dedupe based on the newest dataset_run_date + + df = df.sort_values("dataset_run_date", ascending=False).drop_duplicates(subset="uprn", keep="first") + df = df.drop(columns=["dataset_run_date"]) + + for col in PROBLEMATIC_COLUMNS: + df[col] = df[col].astype(str) + + # Save the data back to s3, but this time as a parquet file + save_dataframe_to_s3_parquet( + bucket_name="retrofit-data-dev", + file_key=f"energy_consumption/{run_date}/energy_consumption_dataset.parquet", + df=df + ) + + df.to_csv("energy_consumption_dataset.csv", index=False) diff --git a/etl/bill_savings/data_collection.py b/etl/bill_savings/data_collection.py index 1f787d48..3b503122 100644 --- a/etl/bill_savings/data_collection.py +++ b/etl/bill_savings/data_collection.py @@ -131,7 +131,8 @@ def app(): sample_size = 100 energy_consumption_data = [] - for directory in tqdm(epc_directories): + for i, directory in tqdm(enumerate(epc_directories), total=len(epc_directories)): + data = pd.read_csv(directory / "certificates.csv", low_memory=False) # Rename the columns to the same format as the api returns data.columns = [c.replace("_", "-").lower() for c in data.columns] @@ -147,8 +148,8 @@ def app(): collected_data = [] for _, property_data in data.iterrows(): - # Sleep for a random time between 0.1 and 1.5 seconds - time.sleep(np.random.uniform(0.1, 1.5)) + # Sleep for a random time between 0.1 and 1.4 seconds + time.sleep(np.random.uniform(0.1, 1.4)) uprn = int(property_data["uprn"]) address = property_data["address1"] @@ -167,7 +168,7 @@ def app(): { **response, "epc": property_data.to_dict(), - "epc_directory": directory + "epc_directory": str(directory) } ) diff --git a/utils/s3.py b/utils/s3.py index 05482271..1b14ca97 100644 --- a/utils/s3.py +++ b/utils/s3.py @@ -246,3 +246,33 @@ def read_csv_from_s3(bucket_name, filepath): data = list(reader) return data + + +def list_files_in_s3_folder(bucket_name, folder_name): + """ + List all files in a given folder in an S3 bucket. + + :param bucket_name: The name of the S3 bucket. + :param folder_name: The folder name within the S3 bucket. + :return: A list of file keys in the specified S3 folder. + """ + try: + s3 = boto3.client('s3') + response = s3.list_objects_v2(Bucket=bucket_name, Prefix=folder_name) + + if 'Contents' not in response: + logger.info(f"No files found in folder {folder_name} in bucket {bucket_name}.") + return [] + + file_keys = [content['Key'] for content in response['Contents']] + return file_keys + + except NoCredentialsError: + logger.error("Credentials not available.") + return [] + except PartialCredentialsError: + logger.error("Incomplete credentials provided.") + return [] + except Exception as e: + logger.error(f'Failed to list files in folder {folder_name} in bucket {bucket_name}: {str(e)}') + return [] From 7790822e76cd968e0af10b76ff451e73d2362b56 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 2 Jul 2024 16:06:01 +0100 Subject: [PATCH 10/19] making the data objects dictionaries for different targets --- etl/bill_savings/EnergyConsumptionModel.py | 109 +++++++++++++++------ etl/bill_savings/data_collection.py | 6 +- 2 files changed, 81 insertions(+), 34 deletions(-) diff --git a/etl/bill_savings/EnergyConsumptionModel.py b/etl/bill_savings/EnergyConsumptionModel.py index 2ca88da5..d2c77e48 100644 --- a/etl/bill_savings/EnergyConsumptionModel.py +++ b/etl/bill_savings/EnergyConsumptionModel.py @@ -3,51 +3,87 @@ from datetime import datetime from sklearn.model_selection import train_test_split from sklearn.linear_model import LinearRegression from sklearn.metrics import mean_squared_error, r2_score -from utils.s3 import save_pickle_to_s3, read_pickle_from_s3 +from utils.s3 import save_pickle_to_s3, read_pickle_from_s3, read_dataframe_from_s3_parquet class EnergyConsumptionModel: - FEATURES = ['feature_1', 'feature_2'] + FEATURES = { + "heating_kwh": [ + "lodgement-year", "lodgement-month", "current-energy-efficiency", "energy-consumption-current", + "heating-cost-current", + ], + "hot_water_kwh": [ + "lodgement-year", "lodgement-month", "current-energy-efficiency", "energy-consumption-current", + "hot-water-cost-current" + ] + } TARGETS = ['heating_kwh', 'hot_water_kwh'] + CATEGORICAL_COLUMNS = ["lodgement-year", "lodgement-month"] + NUMERICAL_COLUMNS = ["current-energy-efficiency", "energy-consumption-current", "heating-cost-current", + "hot-water-cost-current"] def __init__(self, model_paths=None): self.models = {} self.model_paths = model_paths or {} self.data = None + self.dummy_columns = None - self.X_train = None - self.X_test = None - self.y_train = None - self.y_test = None + self.x_train = {} + self.x_test = {} + self.y_train = {} + self.y_test = {} if model_paths: for target, path in model_paths.items(): self.models[target] = read_pickle_from_s3(bucket_name="retrofit-model-directory-dev", s3_file_name=path) def read_dataset(self, file_path): - self.data = pd.read_csv(file_path) + self.data = read_dataframe_from_s3_parquet(bucket_name="retrofit-data-dev", file_key=file_path) def feature_engineering(self): - # Example feature engineering steps - self.data['feature_1'] = self.data['original_feature_1'] ** 2 - self.data['feature_2'] = self.data['original_feature_2'] ** 0.5 - # Add more feature engineering steps as required + # Extract date features + self.data["lodgement-date"] = pd.to_datetime(self.data["lodgement-date"]) + self.data["lodgement-year"] = self.data["lodgement-date"].dt.year + self.data["lodgement-month"] = self.data["lodgement-date"].dt.month + + # Convert data types + self.data[self.NUMERICAL_COLUMNS] = self.data[self.NUMERICAL_COLUMNS].apply(pd.to_numeric) + self.data[self.CATEGORICAL_COLUMNS] = self.data[self.CATEGORICAL_COLUMNS].astype(str) + + # Convert categorical columns to dummies + self.data = pd.get_dummies(self.data, columns=self.CATEGORICAL_COLUMNS, drop_first=True) + + # Store the dummy columns + self.dummy_columns = {} + for target in self.TARGETS: + target_features = self.FEATURES[target] + dummy_feature_columns = [] + for feature in target_features: + if feature in self.CATEGORICAL_COLUMNS: + dummy_feature_columns.extend([col for col in self.data.columns if col.startswith(feature + '_')]) + else: + dummy_feature_columns.append(feature) + self.dummy_columns[target] = dummy_feature_columns def split_dataset(self, target, test_size=0.2, random_state=42): - X = self.data[self.FEATURES] + + if target not in self.TARGETS: + raise ValueError(f"Target {target} not in {self.TARGETS}") + + x = self.data[self.dummy_columns[target]] y = self.data[target] - self.X_train, self.X_test, self.y_train, self.y_test = train_test_split( - X, y, test_size=test_size, random_state=random_state + self.x_train[target], self.x_test[target], self.y_train[target], self.y_test[target] = train_test_split( + x, y, test_size=test_size, random_state=random_state ) def fit_model(self, target): self.models[target] = LinearRegression() - self.models[target].fit(self.X_train, self.y_train) + self.models[target].fit(self.x_train[target], self.y_train[target]) def evaluate_model(self, target): - y_pred = self.models[target].predict(self.X_test) - mse = mean_squared_error(self.y_test, y_pred) - r2 = r2_score(self.y_test, y_pred) + y_pred = self.models[target].predict(self.x_test[target]) + mse = mean_squared_error(self.y_test[target], y_pred) + r2 = r2_score(self.y_test[target], y_pred) return {'MSE': mse, 'R2': r2} def save_model(self, target): @@ -67,23 +103,32 @@ class EnergyConsumptionModel: def transform_new_data(self, new_data): # Apply the same transformations as in feature_engineering - new_data['feature_1'] = new_data['original_feature_1'] ** 2 - new_data['feature_2'] = new_data['original_feature_2'] ** 0.5 - return new_data[self.FEATURES] + new_data["lodgement-date"] = pd.to_datetime(new_data["lodgement-date"]) + new_data["lodgement-year"] = new_data["lodgement-date"].dt.year + new_data["lodgement-month"] = new_data["lodgement-date"].dt.month + + # Convert categorical columns to dummies + new_data = pd.get_dummies(new_data, columns=self.CATEGORICAL_COLUMNS, drop_first=True) + + # Align new data with the dummy columns from training data + new_data = new_data.reindex(columns=self.dummy_columns, fill_value=0) + + return new_data.drop(columns=[target for target in self.TARGETS if target in new_data.columns]) + # Example usage: -# model = EnergyConsumptionModel() -# model.read_dataset('/mnt/data/energy_consumption_dataset.csv') -# model.feature_engineering() +model = EnergyConsumptionModel() +model.read_dataset('energy_consumption/2024-07-02/energy_consumption_dataset.parquet') +model.feature_engineering() # For heating_kwh -# model.split_dataset(target='heating_kwh') -# model.fit_model(target='heating_kwh') -# print(model.evaluate_model(target='heating_kwh')) -# model.save_model(target='heating_kwh') +model.split_dataset(target='heating_kwh') +model.fit_model(target='heating_kwh') +print(model.evaluate_model(target='heating_kwh')) +model.save_model(target='heating_kwh') # For hot_water_kwh -# model.split_dataset(target='hot_water_kwh') -# model.fit_model(target='hot_water_kwh') -# print(model.evaluate_model(target='hot_water_kwh')) -# model.save_model(target='hot_water_kwh') +model.split_dataset(target='hot_water_kwh') +model.fit_model(target='hot_water_kwh') +print(model.evaluate_model(target='hot_water_kwh')) +model.save_model(target='hot_water_kwh') diff --git a/etl/bill_savings/data_collection.py b/etl/bill_savings/data_collection.py index 3b503122..79afa936 100644 --- a/etl/bill_savings/data_collection.py +++ b/etl/bill_savings/data_collection.py @@ -132,6 +132,9 @@ def app(): energy_consumption_data = [] for i, directory in tqdm(enumerate(epc_directories), total=len(epc_directories)): + # Skip the first 50 + if i < 50: + continue data = pd.read_csv(directory / "certificates.csv", low_memory=False) # Rename the columns to the same format as the api returns @@ -148,8 +151,7 @@ def app(): collected_data = [] for _, property_data in data.iterrows(): - # Sleep for a random time between 0.1 and 1.4 seconds - time.sleep(np.random.uniform(0.1, 1.4)) + time.sleep(np.random.uniform(0.3, 2)) uprn = int(property_data["uprn"]) address = property_data["address1"] From 39a4c2e975d1cb72ab09da36b101ba8e23f9c777 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 2 Jul 2024 17:29:34 +0100 Subject: [PATCH 11/19] updated to use xgboost - much better performance --- etl/bill_savings/EnergyConsumptionModel.py | 146 +++++++++++++++++---- 1 file changed, 123 insertions(+), 23 deletions(-) diff --git a/etl/bill_savings/EnergyConsumptionModel.py b/etl/bill_savings/EnergyConsumptionModel.py index d2c77e48..ca221175 100644 --- a/etl/bill_savings/EnergyConsumptionModel.py +++ b/etl/bill_savings/EnergyConsumptionModel.py @@ -1,16 +1,23 @@ import pandas as pd +from xgboost import XGBRegressor from datetime import datetime from sklearn.model_selection import train_test_split from sklearn.linear_model import LinearRegression -from sklearn.metrics import mean_squared_error, r2_score +from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_percentage_error +from sklearn.feature_selection import RFECV from utils.s3 import save_pickle_to_s3, read_pickle_from_s3, read_dataframe_from_s3_parquet +import logging + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') class EnergyConsumptionModel: FEATURES = { "heating_kwh": [ "lodgement-year", "lodgement-month", "current-energy-efficiency", "energy-consumption-current", - "heating-cost-current", + "heating-cost-current", "main-fuel", "total-floor-area", "number-heated-rooms", "number-habitable-rooms", + "mainheat-energy-eff" ], "hot_water_kwh": [ "lodgement-year", "lodgement-month", "current-energy-efficiency", "energy-consumption-current", @@ -18,34 +25,52 @@ class EnergyConsumptionModel: ] } TARGETS = ['heating_kwh', 'hot_water_kwh'] - CATEGORICAL_COLUMNS = ["lodgement-year", "lodgement-month"] + CATEGORICAL_COLUMNS = [ + "lodgement-year", "lodgement-month", "main-fuel", "mainheat-description", "number-heated-rooms", + "number-habitable-rooms", "mainheat-energy-eff" + ] NUMERICAL_COLUMNS = ["current-energy-efficiency", "energy-consumption-current", "heating-cost-current", - "hot-water-cost-current"] + "hot-water-cost-current", "total-floor-area"] def __init__(self, model_paths=None): self.models = {} self.model_paths = model_paths or {} self.data = None + self.input_data = None self.dummy_columns = None + self.training_predictions = {} + self.testing_predictions = {} self.x_train = {} self.x_test = {} self.y_train = {} self.y_test = {} + self.selected_features = {} if model_paths: for target, path in model_paths.items(): self.models[target] = read_pickle_from_s3(bucket_name="retrofit-model-directory-dev", s3_file_name=path) def read_dataset(self, file_path): + """Reads the dataset from the specified file path.""" + logging.info(f"Reading dataset from {file_path}") self.data = read_dataframe_from_s3_parquet(bucket_name="retrofit-data-dev", file_key=file_path) + self.input_data = self.data.copy() def feature_engineering(self): - # Extract date features + """Performs feature engineering on the dataset.""" + logging.info("Starting feature engineering") self.data["lodgement-date"] = pd.to_datetime(self.data["lodgement-date"]) self.data["lodgement-year"] = self.data["lodgement-date"].dt.year self.data["lodgement-month"] = self.data["lodgement-date"].dt.month + # Modify number of heated rooms and number of habitable rooms + # self.data["number-heated-rooms"] = self.data["number-heated-rooms"].apply(lambda x: "10+" if x > 10 else + # str(x)) + # self.data["number-habitable-rooms"] = self.data["number-habitable-rooms"].apply( + # lambda x: "10+" if x > 10 else str(x) + # ) + # Convert data types self.data[self.NUMERICAL_COLUMNS] = self.data[self.NUMERICAL_COLUMNS].apply(pd.to_numeric) self.data[self.CATEGORICAL_COLUMNS] = self.data[self.CATEGORICAL_COLUMNS].astype(str) @@ -65,28 +90,97 @@ class EnergyConsumptionModel: dummy_feature_columns.append(feature) self.dummy_columns[target] = dummy_feature_columns - def split_dataset(self, target, test_size=0.2, random_state=42): + logging.info("Feature engineering completed") + def split_dataset(self, target, test_size=0.2, random_state=42): + """Splits the dataset into training and testing sets.""" if target not in self.TARGETS: raise ValueError(f"Target {target} not in {self.TARGETS}") + logging.info(f"Splitting dataset for target {target}") x = self.data[self.dummy_columns[target]] y = self.data[target] self.x_train[target], self.x_test[target], self.y_train[target], self.y_test[target] = train_test_split( x, y, test_size=test_size, random_state=random_state ) + def feature_selection(self, target): + """Performs feature selection using RFECV.""" + if target not in self.TARGETS: + raise ValueError(f"Target {target} not in {self.TARGETS}") + + logging.info(f"Starting feature selection for target {target}") + x = self.x_train[target] + y = self.y_train[target] + + # Initialize the XGBoost model and RFECV + model = XGBRegressor(objective='reg:squarederror') + selector = RFECV(model, step=1, cv=5, scoring='neg_mean_absolute_percentage_error') + selector = selector.fit(x, y) + + # Get the selected features + self.selected_features[target] = x.columns[selector.support_] + + # Update x_train and x_test with selected features + self.x_train[target] = x[self.selected_features[target]] + self.x_test[target] = self.x_test[target][self.selected_features[target]] + + logging.info(f"Feature selection completed for target {target}") + def fit_model(self, target): - self.models[target] = LinearRegression() + """Fits the linear regression model to the training data.""" + logging.info(f"Fitting model for target {target}") + self.models[target] = XGBRegressor(objective='reg:squarederror') self.models[target].fit(self.x_train[target], self.y_train[target]) + logging.info(f"Model fitting completed for target {target}") def evaluate_model(self, target): - y_pred = self.models[target].predict(self.x_test[target]) - mse = mean_squared_error(self.y_test[target], y_pred) - r2 = r2_score(self.y_test[target], y_pred) - return {'MSE': mse, 'R2': r2} + """Evaluates the model on training and testing data.""" + logging.info(f"Evaluating model for target {target}") + y_train_pred = self.models[target].predict(self.x_train[target]) + train_mse = mean_squared_error(self.y_train[target], y_train_pred) + train_r2 = r2_score(self.y_train[target], y_train_pred) + train_mape = mean_absolute_percentage_error(self.y_train[target], y_train_pred) + + self.training_predictions[target] = pd.DataFrame({ + 'Actual': self.y_train[target], + 'Predicted': y_train_pred + }) + + y_test_pred = self.models[target].predict(self.x_test[target]) + test_mse = mean_squared_error(self.y_test[target], y_test_pred) + test_r2 = r2_score(self.y_test[target], y_test_pred) + test_mape = mean_absolute_percentage_error(self.y_test[target], y_test_pred) + + self.testing_predictions[target] = pd.DataFrame({ + 'Actual': self.y_test[target], + 'Predicted': y_test_pred + }) + + feature_importance = pd.DataFrame({ + 'Feature': self.selected_features[target], + 'Importance': self.models[target].feature_importances_ + }).sort_values(by='Importance', ascending=False) + + logging.info(f"Evaluation completed for target {target}") + + return { + 'train': { + 'MSE': train_mse, + 'R2': train_r2, + 'MAPE': train_mape, + 'Feature Importance': feature_importance + }, + 'test': { + 'MSE': test_mse, + 'R2': test_r2, + 'MAPE': test_mape + } + } def save_model(self, target): + """Saves the model to S3.""" + logging.info(f"Saving model for target {target}") run_date = datetime.now().strftime("%Y-%m-%d") save_pickle_to_s3( self.models[target], @@ -95,14 +189,17 @@ class EnergyConsumptionModel: ) def score_new_data(self, new_data, target): + """Scores new data using the trained model.""" if target not in self.models: raise ValueError(f"Model for target {target} not loaded or trained") - new_data_transformed = self.transform_new_data(new_data) + new_data_transformed = self.transform_new_data(new_data, target) return self.models[target].predict(new_data_transformed) - def transform_new_data(self, new_data): - # Apply the same transformations as in feature_engineering + def transform_new_data(self, new_data, target): + """Applies the same transformations to new data as were applied to the training data.""" + + # TODO THis should jsut use our other transformation function new_data["lodgement-date"] = pd.to_datetime(new_data["lodgement-date"]) new_data["lodgement-year"] = new_data["lodgement-date"].dt.year new_data["lodgement-month"] = new_data["lodgement-date"].dt.month @@ -111,9 +208,12 @@ class EnergyConsumptionModel: new_data = pd.get_dummies(new_data, columns=self.CATEGORICAL_COLUMNS, drop_first=True) # Align new data with the dummy columns from training data - new_data = new_data.reindex(columns=self.dummy_columns, fill_value=0) + new_data = new_data.reindex(columns=self.dummy_columns[target], fill_value=0) - return new_data.drop(columns=[target for target in self.TARGETS if target in new_data.columns]) + # Select the features used by the model + new_data = new_data[self.selected_features[target]] + + return new_data # Example usage: @@ -123,12 +223,12 @@ model.feature_engineering() # For heating_kwh model.split_dataset(target='heating_kwh') +model.feature_selection(target='heating_kwh') model.fit_model(target='heating_kwh') -print(model.evaluate_model(target='heating_kwh')) -model.save_model(target='heating_kwh') +evaluation_results = model.evaluate_model(target='heating_kwh') +from pprint import pprint -# For hot_water_kwh -model.split_dataset(target='hot_water_kwh') -model.fit_model(target='hot_water_kwh') -print(model.evaluate_model(target='hot_water_kwh')) -model.save_model(target='hot_water_kwh') +pprint(evaluation_results["train"]) +pprint(evaluation_results["test"]) + +importance_df = evaluation_results["train"]["Feature Importance"] From 0a1f728f37705a396f4d18879ae7d89881544ea9 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 2 Jul 2024 17:48:06 +0100 Subject: [PATCH 12/19] implemented xgboost which performs really well --- etl/bill_savings/EnergyConsumptionModel.py | 5 ++--- etl/bill_savings/data_collection.py | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/etl/bill_savings/EnergyConsumptionModel.py b/etl/bill_savings/EnergyConsumptionModel.py index ca221175..51972a36 100644 --- a/etl/bill_savings/EnergyConsumptionModel.py +++ b/etl/bill_savings/EnergyConsumptionModel.py @@ -2,7 +2,6 @@ import pandas as pd from xgboost import XGBRegressor from datetime import datetime from sklearn.model_selection import train_test_split -from sklearn.linear_model import LinearRegression from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_percentage_error from sklearn.feature_selection import RFECV from utils.s3 import save_pickle_to_s3, read_pickle_from_s3, read_dataframe_from_s3_parquet @@ -16,8 +15,8 @@ class EnergyConsumptionModel: FEATURES = { "heating_kwh": [ "lodgement-year", "lodgement-month", "current-energy-efficiency", "energy-consumption-current", - "heating-cost-current", "main-fuel", "total-floor-area", "number-heated-rooms", "number-habitable-rooms", - "mainheat-energy-eff" + "heating-cost-current", "total-floor-area", "number-heated-rooms", "number-habitable-rooms", + # "mainheat-energy-eff", "mainheat-description", "main-fuel", ], "hot_water_kwh": [ "lodgement-year", "lodgement-month", "current-energy-efficiency", "energy-consumption-current", diff --git a/etl/bill_savings/data_collection.py b/etl/bill_savings/data_collection.py index 79afa936..24b10d7f 100644 --- a/etl/bill_savings/data_collection.py +++ b/etl/bill_savings/data_collection.py @@ -133,7 +133,7 @@ def app(): energy_consumption_data = [] for i, directory in tqdm(enumerate(epc_directories), total=len(epc_directories)): # Skip the first 50 - if i < 50: + if i < 90: continue data = pd.read_csv(directory / "certificates.csv", low_memory=False) From 14417c37dfe9dcbe5ba717d84e83199c2d58181f Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 2 Jul 2024 18:13:23 +0100 Subject: [PATCH 13/19] error analysis - not working though --- etl/bill_savings/EnergyConsumptionModel.py | 60 +++++++++++++++++++++- 1 file changed, 59 insertions(+), 1 deletion(-) diff --git a/etl/bill_savings/EnergyConsumptionModel.py b/etl/bill_savings/EnergyConsumptionModel.py index 51972a36..27fcc518 100644 --- a/etl/bill_savings/EnergyConsumptionModel.py +++ b/etl/bill_savings/EnergyConsumptionModel.py @@ -15,7 +15,8 @@ class EnergyConsumptionModel: FEATURES = { "heating_kwh": [ "lodgement-year", "lodgement-month", "current-energy-efficiency", "energy-consumption-current", - "heating-cost-current", "total-floor-area", "number-heated-rooms", "number-habitable-rooms", + "heating-cost-current", "total-floor-area", "number-heated-rooms", + # "number-habitable-rooms", # "mainheat-energy-eff", "mainheat-description", "main-fuel", ], "hot_water_kwh": [ @@ -214,6 +215,63 @@ class EnergyConsumptionModel: return new_data + def error_analysis(self, target, top_n=10, unique_threshold=0.8): + """ + Perform error analysis on the provided model and dataset. + """ + + # Calculate predictions and residuals + y_train_pred = self.models[target].predict(self.x_train[target]) + y_test_pred = self.models[target].predict(self.x_test[target]) + + train_residuals = self.y_train[target] - y_train_pred + test_residuals = self.y_test[target] - y_test_pred + + # Identify top N poorly performing rows by absolute residuals + top_train_indices = train_residuals.abs().nlargest(top_n).index + top_test_indices = test_residuals.abs().nlargest(top_n).index + + top_train_data = self.input_data.loc[top_train_indices] + top_test_data = self.input_data.loc[top_test_indices] + + def exclude_columns(data, threshold): + exclude_cols = [] + num_rows = data.shape[0] + for col in data.columns: + if data[col].dtype == 'object' and data[col].nunique() / num_rows >= threshold: + exclude_cols.append(col) + return exclude_cols + + exclude_cols = exclude_columns(top_train_data, unique_threshold) + + top_train_data = top_train_data.drop(columns=exclude_cols) + top_test_data = top_test_data.drop(columns=exclude_cols) + + # TODO: Not working + + # One-hot encode categorical variables + categorical_columns = top_train_data.select_dtypes(include=['object']).columns.tolist() + top_train_data_encoded = pd.get_dummies(top_train_data, columns=categorical_columns, drop_first=True) + top_test_data_encoded = pd.get_dummies(top_test_data, columns=categorical_columns, drop_first=True) + + # Align the encoded data with the training data + top_train_data_encoded = top_train_data_encoded.reindex(columns=self.x_train[target].columns, fill_value=0) + top_test_data_encoded = top_test_data_encoded.reindex(columns=self.x_test[target].columns, fill_value=0) + + # Correlation analysis with residuals + train_corr = top_train_data_encoded.corrwith(train_residuals.loc[top_train_indices]) + test_corr = top_test_data_encoded.corrwith(test_residuals.loc[top_test_indices]) + + # Return summaries + summary = { + "train_corr": train_corr, + "test_corr": test_corr, + "top_train_data": top_train_data, + "top_test_data": top_test_data + } + + return summary + # Example usage: model = EnergyConsumptionModel() From b0449b9e90560505d4d45a63c41cd5cb5213e345 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 2 Jul 2024 18:28:27 +0100 Subject: [PATCH 14/19] decent performing model --- etl/bill_savings/EnergyConsumptionModel.py | 58 +++++++++++++++------- 1 file changed, 41 insertions(+), 17 deletions(-) diff --git a/etl/bill_savings/EnergyConsumptionModel.py b/etl/bill_savings/EnergyConsumptionModel.py index 27fcc518..6492c7a6 100644 --- a/etl/bill_savings/EnergyConsumptionModel.py +++ b/etl/bill_savings/EnergyConsumptionModel.py @@ -16,8 +16,7 @@ class EnergyConsumptionModel: "heating_kwh": [ "lodgement-year", "lodgement-month", "current-energy-efficiency", "energy-consumption-current", "heating-cost-current", "total-floor-area", "number-heated-rooms", - # "number-habitable-rooms", - # "mainheat-energy-eff", "mainheat-description", "main-fuel", + "mainheat-description", "main-fuel", "mainheat-energy-eff", "number-habitable-rooms", ], "hot_water_kwh": [ "lodgement-year", "lodgement-month", "current-energy-efficiency", "energy-consumption-current", @@ -104,25 +103,41 @@ class EnergyConsumptionModel: x, y, test_size=test_size, random_state=random_state ) - def feature_selection(self, target): - """Performs feature selection using RFECV.""" + def feature_selection(self, target, cv_folds=3, sample_fraction=0.1, random_state=42): + """ + Performs feature selection using RFECV with XGBoost. + + Parameters: + - target: The target variable for feature selection. + - cv_folds: Number of cross-validation folds. + - sample_fraction: Fraction of the data to use for feature selection. + - random_state: Random state for reproducibility. + """ if target not in self.TARGETS: raise ValueError(f"Target {target} not in {self.TARGETS}") logging.info(f"Starting feature selection for target {target}") - x = self.x_train[target] - y = self.y_train[target] + + # Sample the data if specified + if sample_fraction < 1.0: + x_sample, _, y_sample, _ = train_test_split( + self.x_train[target], self.y_train[target], + train_size=sample_fraction, random_state=random_state + ) + else: + x_sample = self.x_train[target] + y_sample = self.y_train[target] # Initialize the XGBoost model and RFECV - model = XGBRegressor(objective='reg:squarederror') - selector = RFECV(model, step=1, cv=5, scoring='neg_mean_absolute_percentage_error') - selector = selector.fit(x, y) + model = XGBRegressor(objective='reg:squarederror', n_jobs=-1) + selector = RFECV(model, step=1, cv=cv_folds, scoring='neg_mean_absolute_percentage_error') + selector = selector.fit(x_sample, y_sample) # Get the selected features - self.selected_features[target] = x.columns[selector.support_] + self.selected_features[target] = x_sample.columns[selector.support_] # Update x_train and x_test with selected features - self.x_train[target] = x[self.selected_features[target]] + self.x_train[target] = self.x_train[target][self.selected_features[target]] self.x_test[target] = self.x_test[target][self.selected_features[target]] logging.info(f"Feature selection completed for target {target}") @@ -218,6 +233,14 @@ class EnergyConsumptionModel: def error_analysis(self, target, top_n=10, unique_threshold=0.8): """ Perform error analysis on the provided model and dataset. + + Parameters: + - target: The target variable to analyze. + - top_n: Number of top residuals to consider for analysis. + - unique_threshold: Threshold to exclude columns with high unique values. + + Returns: + - summary: Dictionary summarizing common features among poorly performing rows. """ # Calculate predictions and residuals @@ -234,6 +257,7 @@ class EnergyConsumptionModel: top_train_data = self.input_data.loc[top_train_indices] top_test_data = self.input_data.loc[top_test_indices] + # Automatically detect and exclude columns def exclude_columns(data, threshold): exclude_cols = [] num_rows = data.shape[0] @@ -247,16 +271,14 @@ class EnergyConsumptionModel: top_train_data = top_train_data.drop(columns=exclude_cols) top_test_data = top_test_data.drop(columns=exclude_cols) - # TODO: Not working - # One-hot encode categorical variables categorical_columns = top_train_data.select_dtypes(include=['object']).columns.tolist() top_train_data_encoded = pd.get_dummies(top_train_data, columns=categorical_columns, drop_first=True) top_test_data_encoded = pd.get_dummies(top_test_data, columns=categorical_columns, drop_first=True) - # Align the encoded data with the training data - top_train_data_encoded = top_train_data_encoded.reindex(columns=self.x_train[target].columns, fill_value=0) - top_test_data_encoded = top_test_data_encoded.reindex(columns=self.x_test[target].columns, fill_value=0) + # Ensure all original columns are included in the encoded data + top_train_data_encoded = top_train_data_encoded.reindex(columns=self.input_data.columns, fill_value=0) + top_test_data_encoded = top_test_data_encoded.reindex(columns=self.input_data.columns, fill_value=0) # Correlation analysis with residuals train_corr = top_train_data_encoded.corrwith(train_residuals.loc[top_train_indices]) @@ -264,6 +286,8 @@ class EnergyConsumptionModel: # Return summaries summary = { + "train_summary": top_train_data.describe(include='all').T, + "test_summary": top_test_data.describe(include='all').T, "train_corr": train_corr, "test_corr": test_corr, "top_train_data": top_train_data, @@ -280,7 +304,7 @@ model.feature_engineering() # For heating_kwh model.split_dataset(target='heating_kwh') -model.feature_selection(target='heating_kwh') +model.feature_selection(target='heating_kwh', cv_folds=3, sample_fraction=0.1) model.fit_model(target='heating_kwh') evaluation_results = model.evaluate_model(target='heating_kwh') from pprint import pprint From 77aaecf04fa45ad285ce5cb3262bf7f549996545 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 2 Jul 2024 18:38:55 +0100 Subject: [PATCH 15/19] Added some additional features --- etl/bill_savings/EnergyConsumptionModel.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/etl/bill_savings/EnergyConsumptionModel.py b/etl/bill_savings/EnergyConsumptionModel.py index 6492c7a6..e0a52e19 100644 --- a/etl/bill_savings/EnergyConsumptionModel.py +++ b/etl/bill_savings/EnergyConsumptionModel.py @@ -17,6 +17,10 @@ class EnergyConsumptionModel: "lodgement-year", "lodgement-month", "current-energy-efficiency", "energy-consumption-current", "heating-cost-current", "total-floor-area", "number-heated-rooms", "mainheat-description", "main-fuel", "mainheat-energy-eff", "number-habitable-rooms", + "mainheatcont-description", "property-type", "built-form", + # To test + # "hotwater-description" - make a days since lodgment variable? + # A geographic variable ], "hot_water_kwh": [ "lodgement-year", "lodgement-month", "current-energy-efficiency", "energy-consumption-current", @@ -26,7 +30,7 @@ class EnergyConsumptionModel: TARGETS = ['heating_kwh', 'hot_water_kwh'] CATEGORICAL_COLUMNS = [ "lodgement-year", "lodgement-month", "main-fuel", "mainheat-description", "number-heated-rooms", - "number-habitable-rooms", "mainheat-energy-eff" + "number-habitable-rooms", "mainheat-energy-eff", "mainheatcont-description", "property-type", "built-form", ] NUMERICAL_COLUMNS = ["current-energy-efficiency", "energy-consumption-current", "heating-cost-current", "hot-water-cost-current", "total-floor-area"] From 58e60ae3765844580a80b4651a870f0e6d8bea85 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 2 Jul 2024 18:41:10 +0100 Subject: [PATCH 16/19] Added age band --- etl/bill_savings/EnergyConsumptionModel.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/etl/bill_savings/EnergyConsumptionModel.py b/etl/bill_savings/EnergyConsumptionModel.py index e0a52e19..c02d4c8c 100644 --- a/etl/bill_savings/EnergyConsumptionModel.py +++ b/etl/bill_savings/EnergyConsumptionModel.py @@ -17,7 +17,7 @@ class EnergyConsumptionModel: "lodgement-year", "lodgement-month", "current-energy-efficiency", "energy-consumption-current", "heating-cost-current", "total-floor-area", "number-heated-rooms", "mainheat-description", "main-fuel", "mainheat-energy-eff", "number-habitable-rooms", - "mainheatcont-description", "property-type", "built-form", + "mainheatcont-description", "property-type", "built-form", "construction-age-band" # To test # "hotwater-description" - make a days since lodgment variable? # A geographic variable @@ -31,6 +31,7 @@ class EnergyConsumptionModel: CATEGORICAL_COLUMNS = [ "lodgement-year", "lodgement-month", "main-fuel", "mainheat-description", "number-heated-rooms", "number-habitable-rooms", "mainheat-energy-eff", "mainheatcont-description", "property-type", "built-form", + "construction-age-band" ] NUMERICAL_COLUMNS = ["current-energy-efficiency", "energy-consumption-current", "heating-cost-current", "hot-water-cost-current", "total-floor-area"] From fa6e61f0b9628ab45c7f1d45c4934b46b67dad8c Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Thu, 4 Jul 2024 19:52:19 +0100 Subject: [PATCH 17/19] hot water model working nicely --- etl/bill_savings/EnergyConsumptionModel.py | 164 ++++++++++++++---- etl/bill_savings/data_collection.py | 2 +- .../{data_collation.py => data_combining.py} | 7 +- 3 files changed, 137 insertions(+), 36 deletions(-) rename etl/bill_savings/{data_collation.py => data_combining.py} (95%) diff --git a/etl/bill_savings/EnergyConsumptionModel.py b/etl/bill_savings/EnergyConsumptionModel.py index c02d4c8c..89847ca1 100644 --- a/etl/bill_savings/EnergyConsumptionModel.py +++ b/etl/bill_savings/EnergyConsumptionModel.py @@ -6,6 +6,7 @@ from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_percenta from sklearn.feature_selection import RFECV from utils.s3 import save_pickle_to_s3, read_pickle_from_s3, read_dataframe_from_s3_parquet import logging +from pprint import pprint # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') @@ -15,42 +16,58 @@ class EnergyConsumptionModel: FEATURES = { "heating_kwh": [ "lodgement-year", "lodgement-month", "current-energy-efficiency", "energy-consumption-current", - "heating-cost-current", "total-floor-area", "number-heated-rooms", - "mainheat-description", "main-fuel", "mainheat-energy-eff", "number-habitable-rooms", - "mainheatcont-description", "property-type", "built-form", "construction-age-band" - # To test - # "hotwater-description" - make a days since lodgment variable? - # A geographic variable + "heating-cost-current", + "total-floor-area", "number-heated-rooms", + "mainheat-description", "mainheat-energy-eff", "main-fuel", + # TESTING + "secondheat-description", + # , , "number-habitable-rooms", + # "mainheatcont-description", + # "co2-emissions-current", + # "property-type", "built-form", ], "hot_water_kwh": [ - "lodgement-year", "lodgement-month", "current-energy-efficiency", "energy-consumption-current", - "hot-water-cost-current" + "lodgement-year", "lodgement-month", + "current-energy-efficiency", + "energy-consumption-current", + "hot-water-cost-current", + "total-floor-area", "number-heated-rooms", + "hotwater-description", "hot-water-energy-eff", "main-fuel", "property-type", "built-form", + "co2-emissions-current", ] } TARGETS = ['heating_kwh', 'hot_water_kwh'] CATEGORICAL_COLUMNS = [ "lodgement-year", "lodgement-month", "main-fuel", "mainheat-description", "number-heated-rooms", "number-habitable-rooms", "mainheat-energy-eff", "mainheatcont-description", "property-type", "built-form", - "construction-age-band" + "construction-age-band", "secondheat-description", "hotwater-description", "hot-water-energy-eff", ] - NUMERICAL_COLUMNS = ["current-energy-efficiency", "energy-consumption-current", "heating-cost-current", - "hot-water-cost-current", "total-floor-area"] - def __init__(self, model_paths=None): + def __init__(self, model_paths=None, n_jobs=1): self.models = {} self.model_paths = model_paths or {} + self.n_jobs = n_jobs + self.data = None self.input_data = None self.dummy_columns = None self.training_predictions = {} self.testing_predictions = {} + self.best_iteration = {} self.x_train = {} self.x_test = {} + self.x_val = {} + self.y_val = {} self.y_train = {} self.y_test = {} self.selected_features = {} + self.NUMERICAL_COLUMNS = list({ + x for x in self.FEATURES["heating_kwh"] + self.FEATURES["hot_water_kwh"] + if x not in self.CATEGORICAL_COLUMNS + }) + if model_paths: for target, path in model_paths.items(): self.models[target] = read_pickle_from_s3(bucket_name="retrofit-model-directory-dev", s3_file_name=path) @@ -96,18 +113,32 @@ class EnergyConsumptionModel: logging.info("Feature engineering completed") - def split_dataset(self, target, test_size=0.2, random_state=42): - """Splits the dataset into training and testing sets.""" + def split_dataset(self, target, test_size=0.2, validation_size=0.2, random_state=42): + """Splits the dataset into training, validation, and testing sets.""" if target not in self.TARGETS: raise ValueError(f"Target {target} not in {self.TARGETS}") logging.info(f"Splitting dataset for target {target}") - x = self.data[self.dummy_columns[target]] - y = self.data[target] - self.x_train[target], self.x_test[target], self.y_train[target], self.y_test[target] = train_test_split( - x, y, test_size=test_size, random_state=random_state + + # Split into train + validation and test sets + x_train_val, x_test, y_train_val, y_test = train_test_split( + self.data[self.dummy_columns[target]], + self.data[target], + test_size=test_size, + random_state=random_state ) + # Split train + validation into train and validation sets + x_train, x_val, y_train, y_val = train_test_split( + x_train_val, + y_train_val, + test_size=validation_size / (1 - test_size), + random_state=random_state + ) + + self.x_train[target], self.x_val[target], self.x_test[target] = x_train, x_val, x_test + self.y_train[target], self.y_val[target], self.y_test[target] = y_train, y_val, y_test + def feature_selection(self, target, cv_folds=3, sample_fraction=0.1, random_state=42): """ Performs feature selection using RFECV with XGBoost. @@ -134,26 +165,72 @@ class EnergyConsumptionModel: y_sample = self.y_train[target] # Initialize the XGBoost model and RFECV - model = XGBRegressor(objective='reg:squarederror', n_jobs=-1) - selector = RFECV(model, step=1, cv=cv_folds, scoring='neg_mean_absolute_percentage_error') + model = self.init_model(feature_selection=True) + selector = RFECV( + model, step=1, cv=cv_folds, scoring='neg_mean_absolute_percentage_error', verbose=1, n_jobs=self.n_jobs + ) selector = selector.fit(x_sample, y_sample) # Get the selected features self.selected_features[target] = x_sample.columns[selector.support_] - # Update x_train and x_test with selected features + # Update x_train, x_test and x_val with selected features self.x_train[target] = self.x_train[target][self.selected_features[target]] self.x_test[target] = self.x_test[target][self.selected_features[target]] + self.x_val[target] = self.x_val[target][self.selected_features[target]] logging.info(f"Feature selection completed for target {target}") + def init_model(self, feature_selection=False): + + if feature_selection: + # Set up a smaller model to work it + return XGBRegressor( + objective='reg:squarederror', + n_estimators=50, + learning_rate=0.05, + max_depth=6, + subsample=0.8, + colsample_bytree=0.8, + # n_jobs=self.n_jobs + ) + + return XGBRegressor( + objective='reg:squarederror', + n_estimators=1000, + learning_rate=0.05, + max_depth=6, + subsample=0.8, + colsample_bytree=0.8, + # n_jobs=self.n_jobs + ) + def fit_model(self, target): """Fits the linear regression model to the training data.""" logging.info(f"Fitting model for target {target}") - self.models[target] = XGBRegressor(objective='reg:squarederror') - self.models[target].fit(self.x_train[target], self.y_train[target]) + self.models[target] = self.init_model() + self.models[target].fit( + self.x_train[target], + self.y_train[target], + eval_set=[(self.x_val[target], self.y_val[target])], + early_stopping_rounds=50 + ) logging.info(f"Model fitting completed for target {target}") + # Store the best iteration + self.best_iteration[target] = self.models[target].best_iteration + + def re_train_final_model(self, target): + """Re-trains the final model on the combined training and validation set.""" + logging.info(f"Re-training final model for target {target}") + x_train_val = pd.concat([self.x_train[target], self.x_val[target]]) + y_train_val = pd.concat([self.y_train[target], self.y_val[target]]) + + self.models[target] = self.init_model() + + self.models[target].fit(x_train_val, y_train_val, verbose=False) + logging.info(f"Re-training final model completed for target {target}") + def evaluate_model(self, target): """Evaluates the model on training and testing data.""" logging.info(f"Evaluating model for target {target}") @@ -166,6 +243,9 @@ class EnergyConsumptionModel: 'Actual': self.y_train[target], 'Predicted': y_train_pred }) + self.training_predictions[target]["residual"] = abs( + self.training_predictions[target]["Actual"] - self.training_predictions[target]["Predicted"] + ) y_test_pred = self.models[target].predict(self.x_test[target]) test_mse = mean_squared_error(self.y_test[target], y_test_pred) @@ -176,11 +256,20 @@ class EnergyConsumptionModel: 'Actual': self.y_test[target], 'Predicted': y_test_pred }) + self.testing_predictions[target]["residual"] = abs( + self.testing_predictions[target]["Actual"] - self.testing_predictions[target]["Predicted"] + ) - feature_importance = pd.DataFrame({ - 'Feature': self.selected_features[target], - 'Importance': self.models[target].feature_importances_ - }).sort_values(by='Importance', ascending=False) + if target in self.selected_features: + feature_importance = pd.DataFrame({ + 'Feature': self.selected_features[target], + 'Importance': self.models[target].feature_importances_ + }).sort_values(by='Importance', ascending=False) + else: + feature_importance = pd.DataFrame({ + 'Feature': self.x_train[target].columns, + 'Importance': self.models[target].feature_importances_ + }).sort_values(by='Importance', ascending=False) logging.info(f"Evaluation completed for target {target}") @@ -303,18 +392,31 @@ class EnergyConsumptionModel: # Example usage: -model = EnergyConsumptionModel() -model.read_dataset('energy_consumption/2024-07-02/energy_consumption_dataset.parquet') +model = EnergyConsumptionModel(n_jobs=2) +model.read_dataset('energy_consumption/2024-07-04/energy_consumption_dataset.parquet') model.feature_engineering() # For heating_kwh model.split_dataset(target='heating_kwh') -model.feature_selection(target='heating_kwh', cv_folds=3, sample_fraction=0.1) +# model.feature_selection(target='heating_kwh', cv_folds=3, sample_fraction=0.1) model.fit_model(target='heating_kwh') + +model.re_train_final_model(target='heating_kwh') evaluation_results = model.evaluate_model(target='heating_kwh') -from pprint import pprint pprint(evaluation_results["train"]) pprint(evaluation_results["test"]) importance_df = evaluation_results["train"]["Feature Importance"] +testing_predictions = model.testing_predictions["heating_kwh"] +testing_predictions = testing_predictions.sort_values("residual", ascending=False) +# Merge on model.input_data, by the index +merged_data = testing_predictions.merge(model.input_data, left_index=True, right_index=True) + +# For hot_water_kwh +model.split_dataset(target='hot_water_kwh') +model.fit_model(target='hot_water_kwh') +model.re_train_final_model(target='hot_water_kwh') +evaluation_results = model.evaluate_model(target='hot_water_kwh') +pprint(evaluation_results["train"]) +pprint(evaluation_results["test"]) diff --git a/etl/bill_savings/data_collection.py b/etl/bill_savings/data_collection.py index 24b10d7f..ecc62015 100644 --- a/etl/bill_savings/data_collection.py +++ b/etl/bill_savings/data_collection.py @@ -133,7 +133,7 @@ def app(): energy_consumption_data = [] for i, directory in tqdm(enumerate(epc_directories), total=len(epc_directories)): # Skip the first 50 - if i < 90: + if i < 305: continue data = pd.read_csv(directory / "certificates.csv", low_memory=False) diff --git a/etl/bill_savings/data_collation.py b/etl/bill_savings/data_combining.py similarity index 95% rename from etl/bill_savings/data_collation.py rename to etl/bill_savings/data_combining.py index ef2b286b..a111ecf2 100644 --- a/etl/bill_savings/data_collation.py +++ b/etl/bill_savings/data_combining.py @@ -1,12 +1,13 @@ import re from datetime import datetime +from tqdm import tqdm import pandas as pd from utils.s3 import list_files_in_s3_folder, read_pickle_from_s3, save_dataframe_to_s3_parquet # These columns we co-erce to strings before saving -PROBLEMATIC_COLUMNS = ["main-heating-controls"] +PROBLEMATIC_COLUMNS = ["main-heating-controls", "floor-level"] def extract_kwh_value(text): @@ -41,7 +42,7 @@ def app(): run_date = datetime.now().strftime("%Y-%m-%d") complete_data = [] - for files in data_files: + for files in tqdm(data_files): dataset_run_date = files.split("/")[-1].split(".")[0] # Extract the date from the file name dataset_run_date = pd.Timestamp(dataset_run_date) @@ -90,5 +91,3 @@ def app(): file_key=f"energy_consumption/{run_date}/energy_consumption_dataset.parquet", df=df ) - - df.to_csv("energy_consumption_dataset.csv", index=False) From 1320416dc355af0170306bc921064744d436f54b Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Fri, 5 Jul 2024 12:15:01 +0100 Subject: [PATCH 18/19] Added new ecr instances --- etl/bill_savings/EnergyConsumptionModel.py | 163 ++++++++++++++++++--- etl/bill_savings/data_collection.py | 2 +- infrastructure/terraform/main.tf | 46 ++++-- 3 files changed, 174 insertions(+), 37 deletions(-) diff --git a/etl/bill_savings/EnergyConsumptionModel.py b/etl/bill_savings/EnergyConsumptionModel.py index 89847ca1..534b8d60 100644 --- a/etl/bill_savings/EnergyConsumptionModel.py +++ b/etl/bill_savings/EnergyConsumptionModel.py @@ -1,10 +1,12 @@ import pandas as pd +import numpy as np +import msgpack from xgboost import XGBRegressor from datetime import datetime from sklearn.model_selection import train_test_split from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_percentage_error from sklearn.feature_selection import RFECV -from utils.s3 import save_pickle_to_s3, read_pickle_from_s3, read_dataframe_from_s3_parquet +from utils.s3 import save_pickle_to_s3, read_pickle_from_s3, read_dataframe_from_s3_parquet, read_from_s3 import logging from pprint import pprint @@ -14,17 +16,36 @@ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %( class EnergyConsumptionModel: FEATURES = { + # "heating_kwh": [ + # "lodgement-year", "lodgement-month", "current-energy-efficiency", "energy-consumption-current", + # "heating-cost-current", + # "total-floor-area", "number-heated-rooms", + # "mainheat-description", "mainheat-energy-eff", "main-fuel", "secondheat-description", + # "property-type", "built-form", "mainheatcont-description", 'hotwater-description', 'hot-water-energy-eff', + # # TESTING + # # "walls-description", + # "walls-energy-eff", + # # "roof-description", + # "roof-energy-eff", + # # "floor-description", + # # "county" + # # "co2-emissions-current", - Made it worse + # # TODO: Should hot water features go in here? + # # , , "number-habitable-rooms", + # # + # # + # # + # ], "heating_kwh": [ "lodgement-year", "lodgement-month", "current-energy-efficiency", "energy-consumption-current", - "heating-cost-current", - "total-floor-area", "number-heated-rooms", - "mainheat-description", "mainheat-energy-eff", "main-fuel", - # TESTING - "secondheat-description", - # , , "number-habitable-rooms", - # "mainheatcont-description", - # "co2-emissions-current", - # "property-type", "built-form", + "heating-cost-current", "heating-cost-potential", "total-floor-area", "number-heated-rooms", + "mainheat-description", "mainheat-energy-eff", "main-fuel", "secondheat-description", "property-type", + "built-form", "mainheatcont-description", "hotwater-description", "hot-water-energy-eff", + "walls-energy-eff", + "roof-energy-eff", "windows-description", "windows-energy-eff", "floor-description", "flat-top-storey", + "flat-storey-count", "unheated-corridor-length", "solar-water-heating-flag", "mechanical-ventilation", + "low-energy-lighting", "environment-impact-current", "energy-tariff", + "county", "construction-age-band", "co2-emissions-current" ], "hot_water_kwh": [ "lodgement-year", "lodgement-month", @@ -41,9 +62,15 @@ class EnergyConsumptionModel: "lodgement-year", "lodgement-month", "main-fuel", "mainheat-description", "number-heated-rooms", "number-habitable-rooms", "mainheat-energy-eff", "mainheatcont-description", "property-type", "built-form", "construction-age-band", "secondheat-description", "hotwater-description", "hot-water-energy-eff", + "walls-description", "walls-energy-eff", "roof-description", "roof-energy-eff", "floor-description", + "county", + "windows-description", "windows-energy-eff", "flat-top-storey", + "flat-storey-count", "unheated-corridor-length", "solar-water-heating-flag", "mechanical-ventilation", + "low-energy-lighting", "environment-impact-current", "energy-tariff" ] - def __init__(self, model_paths=None, n_jobs=1): + def __init__(self, cleaned, model_paths=None, n_jobs=1): + self.cleaned = cleaned self.models = {} self.model_paths = model_paths or {} self.n_jobs = n_jobs @@ -85,6 +112,55 @@ class EnergyConsumptionModel: self.data["lodgement-year"] = self.data["lodgement-date"].dt.year self.data["lodgement-month"] = self.data["lodgement-date"].dt.month + # For walls, roof, floor description where we have average thermal transmittance, to avoid too many categories + # we group them + ranges = { + "lessthan 0.1": (0, 0.1), + "0.1 - 0.3": (0.1, 0.3), + "0.3 - 0.5": (0.3, 0.5), + "morethan 0.5": (0.5, 2.5), + } + + # Generate the lookup table + thermal_transmittance_lookup_table = [] + for i in range(1, 251): + value = i / 100 + for label, (low, high) in ranges.items(): + if low < value <= high: + thermal_transmittance_lookup_table.append({"from": value, "to": label}) + break + + # Convert to DataFrame for display + thermal_transmittance_lookup_table = pd.DataFrame(thermal_transmittance_lookup_table) + thermal_transmittance_lookup_table["from"] = thermal_transmittance_lookup_table["from"].astype(str) + + # Apply the lookup table to the data + for feature in ["walls-description", "roof-description", "floor-description"]: + cleaned_df = pd.DataFrame(self.cleaned[feature])[["original_description", "thermal_transmittance"]] + # Round to 2 decimal places and convert to string + cleaned_df["thermal_transmittance"] = cleaned_df["thermal_transmittance"].round(2).astype(str) + + self.data = self.data.merge( + cleaned_df, + how="left", + left_on=feature, + right_on="original_description", + ) + # We now have the thermal transmittance in the data, which we can use to group with the lookup table + self.data = self.data.merge( + thermal_transmittance_lookup_table, + how="left", + left_on="thermal_transmittance", + right_on="from", + ) + # Where "to" is populated, replace feature with to + self.data[feature] = np.where( + ~pd.isnull(self.data["to"]), + self.data["to"], + self.data[feature] + ) + self.data = self.data.drop(columns=["original_description", "thermal_transmittance", "from", "to"]) + # Modify number of heated rooms and number of habitable rooms # self.data["number-heated-rooms"] = self.data["number-heated-rooms"].apply(lambda x: "10+" if x > 10 else # str(x)) @@ -192,7 +268,8 @@ class EnergyConsumptionModel: max_depth=6, subsample=0.8, colsample_bytree=0.8, - # n_jobs=self.n_jobs + reg_alpha=0.1, + reg_lambda=0.1 ) return XGBRegressor( @@ -200,26 +277,62 @@ class EnergyConsumptionModel: n_estimators=1000, learning_rate=0.05, max_depth=6, + min_child_weight=3, subsample=0.8, colsample_bytree=0.8, + reg_alpha=0.1, + reg_lambda=0.1 # n_jobs=self.n_jobs ) def fit_model(self, target): - """Fits the linear regression model to the training data.""" + """Fits the model to the training data and removes zero-importance features.""" + logging.info(f"Fitting model for target {target}") - self.models[target] = self.init_model() - self.models[target].fit( + + # Initialize and fit the model + model = self.init_model() + model.fit( self.x_train[target], self.y_train[target], eval_set=[(self.x_val[target], self.y_val[target])], early_stopping_rounds=50 ) - logging.info(f"Model fitting completed for target {target}") + + # Store the model + self.models[target] = model + + # Identify and remove zero-importance features + feature_importance = pd.DataFrame({ + 'Feature': self.x_train[target].columns, + 'Importance': model.feature_importances_ + }) + zero_importance_features = feature_importance[feature_importance['Importance'] == 0]['Feature'].tolist() + + if zero_importance_features: + logging.info(f"Removing zero-importance features for target {target}: {zero_importance_features}") + + self.x_train[target] = self.x_train[target].drop(columns=zero_importance_features) + self.x_val[target] = self.x_val[target].drop(columns=zero_importance_features) + self.x_test[target] = self.x_test[target].drop(columns=zero_importance_features) + + # Re-fit the model with the reduced feature set + model = self.init_model() + model.fit( + self.x_train[target], + self.y_train[target], + eval_set=[(self.x_val[target], self.y_val[target])], + early_stopping_rounds=50 + ) + + # Update the model + self.models[target] = model # Store the best iteration self.best_iteration[target] = self.models[target].best_iteration + logging.info(f"Model fitting completed for target {target}") + def re_train_final_model(self, target): """Re-trains the final model on the combined training and validation set.""" logging.info(f"Re-training final model for target {target}") @@ -391,16 +504,21 @@ class EnergyConsumptionModel: return summary -# Example usage: -model = EnergyConsumptionModel(n_jobs=2) -model.read_dataset('energy_consumption/2024-07-04/energy_consumption_dataset.parquet') +# Usage: +cleaned = read_from_s3( + s3_file_name="cleaned_epc_data/cleaned.bson", + bucket_name="retrofit-data-dev" +) + +cleaned = msgpack.unpackb(cleaned, raw=False) + +model = EnergyConsumptionModel(cleaned=cleaned, n_jobs=2) +model.read_dataset('energy_consumption/2024-07-05/energy_consumption_dataset.parquet') model.feature_engineering() # For heating_kwh model.split_dataset(target='heating_kwh') -# model.feature_selection(target='heating_kwh', cv_folds=3, sample_fraction=0.1) model.fit_model(target='heating_kwh') - model.re_train_final_model(target='heating_kwh') evaluation_results = model.evaluate_model(target='heating_kwh') @@ -410,8 +528,11 @@ pprint(evaluation_results["test"]) importance_df = evaluation_results["train"]["Feature Importance"] testing_predictions = model.testing_predictions["heating_kwh"] testing_predictions = testing_predictions.sort_values("residual", ascending=False) +training_predictions = model.training_predictions["heating_kwh"] +training_predictions = training_predictions.sort_values("residual", ascending=False) # Merge on model.input_data, by the index merged_data = testing_predictions.merge(model.input_data, left_index=True, right_index=True) +merged_data_train = training_predictions.merge(model.input_data, left_index=True, right_index=True) # For hot_water_kwh model.split_dataset(target='hot_water_kwh') diff --git a/etl/bill_savings/data_collection.py b/etl/bill_savings/data_collection.py index ecc62015..4d913e8f 100644 --- a/etl/bill_savings/data_collection.py +++ b/etl/bill_savings/data_collection.py @@ -133,7 +133,7 @@ def app(): energy_consumption_data = [] for i, directory in tqdm(enumerate(epc_directories), total=len(epc_directories)): # Skip the first 50 - if i < 305: + if i < 36: continue data = pd.read_csv(directory / "certificates.csv", low_memory=False) diff --git a/infrastructure/terraform/main.tf b/infrastructure/terraform/main.tf index 0da850c5..f968aba8 100644 --- a/infrastructure/terraform/main.tf +++ b/infrastructure/terraform/main.tf @@ -49,30 +49,30 @@ resource "aws_security_group" "allow_db" { ingress { # TLS (change to whatever ports you need) - from_port = 5432 - to_port = 5432 - protocol = "tcp" + from_port = 5432 + to_port = 5432 + protocol = "tcp" cidr_blocks = ["0.0.0.0/0"] } egress { - from_port = 0 - to_port = 0 - protocol = "-1" + from_port = 0 + to_port = 0 + protocol = "-1" cidr_blocks = ["0.0.0.0/0"] } } resource "aws_db_instance" "default" { - allocated_storage = var.allocated_storage - engine = "postgres" - engine_version = "14.10" - instance_class = var.instance_class - db_name = var.database_name - username = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string)["db_assessment_model_username"] - password = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string)["db_assessment_model_password"] - parameter_group_name = "default.postgres14" - skip_final_snapshot = true + allocated_storage = var.allocated_storage + engine = "postgres" + engine_version = "14.10" + instance_class = var.instance_class + db_name = var.database_name + username = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string)["db_assessment_model_username"] + password = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string)["db_assessment_model_password"] + parameter_group_name = "default.postgres14" + skip_final_snapshot = true vpc_security_group_ids = [aws_security_group.allow_db.id] lifecycle { prevent_destroy = true @@ -187,6 +187,22 @@ module "lambda_heat_prediction_ecr" { source = "./modules/ecr" } +# ECR repos for lighting cost, heating cost and hot water cost models +module "lambda_lighting_cost_prediction_ecr" { + ecr_name = "lighting-cost-prediction-${var.stage}" + source = "./modules/ecr" +} + +module "lambda_heating_cost_prediction_ecr" { + ecr_name = "heating-cost-prediction-${var.stage}" + source = "./modules/ecr" +} + +module "lambda_hot_water_cost_prediction_ecr" { + ecr_name = "hot-water-cost-prediction-${var.stage}" + source = "./modules/ecr" +} + ############################################## # CDN - Cloudfront ############################################## From a665b2897a6ff4281fd6bc1b9f0c55d7ac666d4b Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Fri, 5 Jul 2024 12:27:01 +0100 Subject: [PATCH 19/19] added required buckets for ecr --- etl/bill_savings/EnergyConsumptionModel.py | 31 +++++----------------- infrastructure/terraform/main.tf | 17 ++++++++++++ 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/etl/bill_savings/EnergyConsumptionModel.py b/etl/bill_savings/EnergyConsumptionModel.py index 534b8d60..b616be08 100644 --- a/etl/bill_savings/EnergyConsumptionModel.py +++ b/etl/bill_savings/EnergyConsumptionModel.py @@ -16,26 +16,6 @@ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %( class EnergyConsumptionModel: FEATURES = { - # "heating_kwh": [ - # "lodgement-year", "lodgement-month", "current-energy-efficiency", "energy-consumption-current", - # "heating-cost-current", - # "total-floor-area", "number-heated-rooms", - # "mainheat-description", "mainheat-energy-eff", "main-fuel", "secondheat-description", - # "property-type", "built-form", "mainheatcont-description", 'hotwater-description', 'hot-water-energy-eff', - # # TESTING - # # "walls-description", - # "walls-energy-eff", - # # "roof-description", - # "roof-energy-eff", - # # "floor-description", - # # "county" - # # "co2-emissions-current", - Made it worse - # # TODO: Should hot water features go in here? - # # , , "number-habitable-rooms", - # # - # # - # # - # ], "heating_kwh": [ "lodgement-year", "lodgement-month", "current-energy-efficiency", "energy-consumption-current", "heating-cost-current", "heating-cost-potential", "total-floor-area", "number-heated-rooms", @@ -45,7 +25,9 @@ class EnergyConsumptionModel: "roof-energy-eff", "windows-description", "windows-energy-eff", "floor-description", "flat-top-storey", "flat-storey-count", "unheated-corridor-length", "solar-water-heating-flag", "mechanical-ventilation", "low-energy-lighting", "environment-impact-current", "energy-tariff", - "county", "construction-age-band", "co2-emissions-current" + "county", "construction-age-band", "co2-emissions-current", + # TODO: Testing + "lighting-cost-current", "hot-water-cost-current", "current-energy-rating" ], "hot_water_kwh": [ "lodgement-year", "lodgement-month", @@ -66,7 +48,7 @@ class EnergyConsumptionModel: "county", "windows-description", "windows-energy-eff", "flat-top-storey", "flat-storey-count", "unheated-corridor-length", "solar-water-heating-flag", "mechanical-ventilation", - "low-energy-lighting", "environment-impact-current", "energy-tariff" + "low-energy-lighting", "environment-impact-current", "energy-tariff", "current-energy-rating" ] def __init__(self, cleaned, model_paths=None, n_jobs=1): @@ -162,8 +144,9 @@ class EnergyConsumptionModel: self.data = self.data.drop(columns=["original_description", "thermal_transmittance", "from", "to"]) # Modify number of heated rooms and number of habitable rooms - # self.data["number-heated-rooms"] = self.data["number-heated-rooms"].apply(lambda x: "10+" if x > 10 else - # str(x)) + self.data["number-heated-rooms"] = self.data["number-heated-rooms"].apply( + lambda x: "16_or_more" if x > 15 else str(x) + ) # self.data["number-habitable-rooms"] = self.data["number-habitable-rooms"].apply( # lambda x: "10+" if x > 10 else str(x) # ) diff --git a/infrastructure/terraform/main.tf b/infrastructure/terraform/main.tf index f968aba8..b90c73a8 100644 --- a/infrastructure/terraform/main.tf +++ b/infrastructure/terraform/main.tf @@ -145,6 +145,23 @@ module "retrofit_heat_predictions" { allowed_origins = var.allowed_origins } +module "retrofit_lighting_cost_predictions" { + source = "./modules/s3" + bucketname = "retrofit-lighting-cost-predictions-${var.stage}" + allowed_origins = var.allowed_origins +} + +module "retrofit_heating_cost_predictions" { + source = "./modules/s3" + bucketname = "retrofit-heating-cost-predictions-${var.stage}" + allowed_origins = var.allowed_origins +} + +module "retrofit_hot_water_cost_predictions" { + source = "./modules/s3" + bucketname = "retrofit-hot-water-cost-predictions-${var.stage}" + allowed_origins = var.allowed_origins +} # Set up the route53 record for the API module "route53" {