From 3002a2c740d6cebc59fd337ef5d4a48032c6433c Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Fri, 9 Aug 2024 10:38:40 +0100 Subject: [PATCH] created KwhData class --- etl/bill_savings/KwhData.py | 118 +++++++++++++++++++++++++++++ etl/bill_savings/data_combining.py | 98 +----------------------- 2 files changed, 121 insertions(+), 95 deletions(-) diff --git a/etl/bill_savings/KwhData.py b/etl/bill_savings/KwhData.py index e69de29b..ad7a375a 100644 --- a/etl/bill_savings/KwhData.py +++ b/etl/bill_savings/KwhData.py @@ -0,0 +1,118 @@ +import re +import pandas as pd +from datetime import datetime +from tqdm import tqdm +from utils.logger import setup_logger +from utils.s3 import list_files_in_s3_folder, read_pickle_from_s3, save_dataframe_to_s3_parquet + +logger = setup_logger() + + +class KwhData: + COLS_TO_STRINGIFY = ["main-heating-controls", "floor-level"] + + def __init__(self, bucket): + self.run_date = datetime.now().strftime("%Y-%m-%d") + self.bucket = bucket + self.data = None + + self.consumption_data_filepath = None + self.consumption_averages_filepath = None + + @staticmethod + def extract_kwh_value(text: str): + """ + Extract the numerical kWh value from a given string. + + :param text: The input string containing the kWh value. + :return: The extracted numerical kWh value as an integer. + """ + # Use regular expression to find the numerical value followed by "kWh per year" + match = re.search(r'([\d,]+) kWh per year', text) + + if match: + # Remove commas from the extracted value and convert to integer + kwh_value = int(match.group(1).replace(',', '')) + return kwh_value + else: + # If no match is found, return None or raise an exception + return None + + def combine(self): + """ + Given the data that is collected containing the kwh values for heating and hot water, this method will combine + and save the data + :return: + """ + + # Firstly, list all of the saved files in s3 + data_files = list_files_in_s3_folder(bucket_name="retrofit-datalake-dev", folder_name="energy_consumption_data") + + complete_data = [] + for files in tqdm(data_files): + dataset_run_date = files.split("/")[-1].split(".")[0] + # Extract the date from the file name + dataset_run_date = pd.Timestamp(dataset_run_date) + + # Load the data from the file + data = read_pickle_from_s3(bucket_name="retrofit-datalake-dev", s3_file_name=files) + + # We check that the retrieved energy consumption sufficiently matches the EPC data + internal_dataset = [] + for x in data: + epc_data = x["epc"] + epc_sap = epc_data["current-energy-efficiency"] + epc_potential_sap = epc_data["potential-energy-efficiency"] + # Make sure this matches the extracted sap + if int(epc_sap) != int(x["current_epc_efficiency"]) or int(epc_potential_sap) != int( + x["potential_epc_efficiency"] + ): + continue + + heating_kwh = self.extract_kwh_value(x["heating_text"]) + hot_water_kwh = self.extract_kwh_value(x["hot_water_text"]) + internal_dataset.append( + { + **epc_data, + "heating_kwh": heating_kwh, + "hot_water_kwh": hot_water_kwh, + "dataset_run_date": dataset_run_date + } + ) + + complete_data.extend(internal_dataset) + + df = pd.DataFrame(complete_data) + # Because we collate multiple runs into a single data source, it's possible that we have duplicated data at + # the uprn level, so we dedupe based on the newest dataset_run_date + + df = df.sort_values("dataset_run_date", ascending=False).drop_duplicates(subset="uprn", keep="first") + df = df.drop(columns=["dataset_run_date"]) + + for col in self.COLS_TO_STRINGIFY: + df[col] = df[col].astype(str) + + # Save the data back to s3, but this time as a parquet file + self.consumption_data_filepath = f"energy_consumption/{self.run_date}/energy_consumption_dataset.parquet" + logger.info(f"Storing energy consumption dataset in s3 at {self.consumption_data_filepath}") + save_dataframe_to_s3_parquet( + bucket_name=self.bucket, + file_key=self.consumption_data_filepath, + df=df + ) + + # We also estimate the energy consumption reduction from this data, by band + df["total_consumption"] = df["heating_kwh"] + df["hot_water_kwh"] + consumption_averages = df.groupby("current-energy-efficiency")["total_consumption"].mean().reset_index() + df = df.drop(columns=["total_consumption"]) + + self.consumption_averages_filepath = f"energy_consumption/{self.run_date}/consumption_averages.parquet" + logger.info(f"Storing consumption averages in s3 at {self.consumption_averages_filepath}") + # Save the consumption averages back to s3 + save_dataframe_to_s3_parquet( + bucket_name="retrofit-data-dev", + file_key=self.consumption_averages_filepath, + df=consumption_averages + ) + + self.data = df diff --git a/etl/bill_savings/data_combining.py b/etl/bill_savings/data_combining.py index dece3834..970c92bf 100644 --- a/etl/bill_savings/data_combining.py +++ b/etl/bill_savings/data_combining.py @@ -1,32 +1,4 @@ -import re -from datetime import datetime -from tqdm import tqdm - -import pandas as pd - -from utils.s3 import list_files_in_s3_folder, read_pickle_from_s3, save_dataframe_to_s3_parquet - -# These columns we co-erce to strings before saving -PROBLEMATIC_COLUMNS = ["main-heating-controls", "floor-level"] - - -def extract_kwh_value(text): - """ - Extract the numerical kWh value from a given string. - - :param text: The input string containing the kWh value. - :return: The extracted numerical kWh value as an integer. - """ - # Use regular expression to find the numerical value followed by "kWh per year" - match = re.search(r'([\d,]+) kWh per year', text) - - if match: - # Remove commas from the extracted value and convert to integer - kwh_value = int(match.group(1).replace(',', '')) - return kwh_value - else: - # If no match is found, return None or raise an exception - return None +from etl.bill_savings.KwhData import KwhData def app(): @@ -36,69 +8,5 @@ def app(): :return: """ - # Firstly, list all of the saved files in s3 - data_files = list_files_in_s3_folder(bucket_name="retrofit-datalake-dev", folder_name="energy_consumption_data") - - run_date = datetime.now().strftime("%Y-%m-%d") - - complete_data = [] - for files in tqdm(data_files): - dataset_run_date = files.split("/")[-1].split(".")[0] - # Extract the date from the file name - dataset_run_date = pd.Timestamp(dataset_run_date) - - # Load the data from the file - data = read_pickle_from_s3(bucket_name="retrofit-datalake-dev", s3_file_name=files) - - # We check that the retrieved energy consumption sufficiently matches the EPC data - internal_dataset = [] - for x in data: - epc_data = x["epc"] - epc_sap = epc_data["current-energy-efficiency"] - epc_potential_sap = epc_data["potential-energy-efficiency"] - # Make sure this matches the extracted sap - if int(epc_sap) != int(x["current_epc_efficiency"]) or int(epc_potential_sap) != int( - x["potential_epc_efficiency"] - ): - continue - - heating_kwh = extract_kwh_value(x["heating_text"]) - hot_water_kwh = extract_kwh_value(x["hot_water_text"]) - internal_dataset.append( - { - **epc_data, - "heating_kwh": heating_kwh, - "hot_water_kwh": hot_water_kwh, - "dataset_run_date": dataset_run_date - } - ) - - complete_data.extend(internal_dataset) - - df = pd.DataFrame(complete_data) - # Because we collate multiple runs into a single data source, it's possible that we have duplicated data at - # the uprn level, so we dedupe based on the newest dataset_run_date - - df = df.sort_values("dataset_run_date", ascending=False).drop_duplicates(subset="uprn", keep="first") - df = df.drop(columns=["dataset_run_date"]) - - for col in PROBLEMATIC_COLUMNS: - df[col] = df[col].astype(str) - - # Save the data back to s3, but this time as a parquet file - save_dataframe_to_s3_parquet( - bucket_name="retrofit-data-dev", - file_key=f"energy_consumption/{run_date}/energy_consumption_dataset.parquet", - df=df - ) - - # We also estimate the energy consumption reduction from this data, by band - df["total_consumption"] = df["heating_kwh"] + df["hot_water_kwh"] - consumption_averages = df.groupby("current-energy-efficiency")["total_consumption"].mean().reset_index() - - # Save the consumption averages back to s3 - save_dataframe_to_s3_parquet( - bucket_name="retrofit-data-dev", - file_key=f"energy_consumption/{run_date}/consumption_averages.parquet", - df=consumption_averages - ) + kwh_data_client = KwhData(bucket="retrofit-datalake-dev") + kwh_data_client.combine()