created KwhData class

This commit is contained in:
Khalim Conn-Kowlessar 2024-08-09 10:38:40 +01:00
parent 66d2a401e8
commit 3002a2c740
2 changed files with 121 additions and 95 deletions

View file

@ -0,0 +1,118 @@
import re
import pandas as pd
from datetime import datetime
from tqdm import tqdm
from utils.logger import setup_logger
from utils.s3 import list_files_in_s3_folder, read_pickle_from_s3, save_dataframe_to_s3_parquet
logger = setup_logger()
class KwhData:
    """
    Collates the pickled energy consumption runs stored in s3 into a single de-duplicated dataset, and derives the
    average total consumption per current energy efficiency score.

    After ``combine`` has run, ``data`` holds the combined dataframe and the two ``*_filepath`` attributes hold the
    s3 keys that were written.
    """

    # Columns cast to str before the parquet write
    # NOTE(review): presumably these hold mixed types that parquet cannot serialise - confirm
    COLS_TO_STRINGIFY = ["main-heating-controls", "floor-level"]

    # Location of the raw pickled runs that combine() reads
    SOURCE_BUCKET = "retrofit-datalake-dev"
    SOURCE_FOLDER = "energy_consumption_data"

    def __init__(self, bucket):
        """
        :param bucket: Name of the s3 bucket that the combined dataset and the consumption averages are written to.
        """
        # The run date is used as a folder in the output keys
        self.run_date = datetime.now().strftime("%Y-%m-%d")
        self.bucket = bucket
        # Populated by combine()
        self.data = None
        self.consumption_data_filepath = None
        self.consumption_averages_filepath = None

    @staticmethod
    def extract_kwh_value(text: str):
        """
        Extract the numerical kWh value from a given string.
        :param text: The input string containing the kWh value, e.g. "12,345 kWh per year".
        :return: The extracted numerical kWh value as an integer, or None when text is not a string or contains no
            "<number> kWh per year" fragment.
        """
        # Missing or non-text input cannot contain a value; report "no value" rather than raising a TypeError
        if not isinstance(text, str):
            return None

        # The number must start with a digit so that a stray comma on its own is never captured (which would make
        # the int conversion below fail on an empty string)
        match = re.search(r'(\d[\d,]*) kWh per year', text)
        if match is None:
            return None

        # Remove the thousands separators before converting to an integer
        return int(match.group(1).replace(',', ''))

    @classmethod
    def _matching_records(cls, data, dataset_run_date):
        """
        Build one flat record per entry whose extracted efficiency scores agree with its EPC data.
        :param data: Iterable of entries, each with the keys "epc", "current_epc_efficiency",
            "potential_epc_efficiency", "heating_text" and "hot_water_text".
        :param dataset_run_date: Value stored under "dataset_run_date" on every record that is returned.
        :return: List of dicts containing the EPC fields plus "heating_kwh", "hot_water_kwh" and "dataset_run_date".
        """
        records = []
        for entry in data:
            epc_data = entry["epc"]
            epc_sap = epc_data["current-energy-efficiency"]
            epc_potential_sap = epc_data["potential-energy-efficiency"]

            # Entries whose extracted scores disagree with the EPC record are discarded
            if int(epc_sap) != int(entry["current_epc_efficiency"]) or int(epc_potential_sap) != int(
                entry["potential_epc_efficiency"]
            ):
                continue

            records.append(
                {
                    **epc_data,
                    "heating_kwh": cls.extract_kwh_value(entry["heating_text"]),
                    "hot_water_kwh": cls.extract_kwh_value(entry["hot_water_text"]),
                    "dataset_run_date": dataset_run_date,
                }
            )
        return records

    def combine(self):
        """
        Given the data that is collected containing the kwh values for heating and hot water, this method will combine
        and save the data.

        Two parquet files are written to ``self.bucket`` under ``energy_consumption/<run_date>/``: the combined
        dataset and the mean total consumption grouped by "current-energy-efficiency". The combined dataframe is also
        stored on ``self.data``.
        :return: None
        :raises ValueError: If no record passes validation, so there is nothing to combine.
        """
        # Firstly, list all of the saved files in s3
        data_files = list_files_in_s3_folder(bucket_name=self.SOURCE_BUCKET, folder_name=self.SOURCE_FOLDER)

        complete_data = []
        for file_key in tqdm(data_files):
            # The file name (without its extension) is parsed as the date of the run
            dataset_run_date = pd.Timestamp(file_key.split("/")[-1].split(".")[0])
            # Load the data from the file
            data = read_pickle_from_s3(bucket_name=self.SOURCE_BUCKET, s3_file_name=file_key)
            # Only keep the entries where the retrieved energy consumption sufficiently matches the EPC data
            complete_data.extend(self._matching_records(data, dataset_run_date))

        # Without any records the dataframe has no columns and the sort below would fail with an opaque KeyError
        if not complete_data:
            raise ValueError(
                f"No valid energy consumption records found in s3://{self.SOURCE_BUCKET}/{self.SOURCE_FOLDER}"
            )

        df = pd.DataFrame(complete_data)

        # Because we collate multiple runs into a single data source, it's possible that we have duplicated data at
        # the uprn level, so we dedupe based on the newest dataset_run_date
        df = df.sort_values("dataset_run_date", ascending=False).drop_duplicates(subset="uprn", keep="first")
        df = df.drop(columns=["dataset_run_date"])

        for col in self.COLS_TO_STRINGIFY:
            df[col] = df[col].astype(str)

        # Save the data back to s3, but this time as a parquet file
        self.consumption_data_filepath = f"energy_consumption/{self.run_date}/energy_consumption_dataset.parquet"
        logger.info(f"Storing energy consumption dataset in s3 at {self.consumption_data_filepath}")
        save_dataframe_to_s3_parquet(
            bucket_name=self.bucket,
            file_key=self.consumption_data_filepath,
            df=df
        )

        # We also estimate the energy consumption reduction from this data, by band. The total is added to a copy
        # so that the combined dataset itself is left untouched
        consumption_averages = (
            df.assign(total_consumption=df["heating_kwh"] + df["hot_water_kwh"])
            .groupby("current-energy-efficiency")["total_consumption"]
            .mean()
            .reset_index()
        )

        self.consumption_averages_filepath = f"energy_consumption/{self.run_date}/consumption_averages.parquet"
        logger.info(f"Storing consumption averages in s3 at {self.consumption_averages_filepath}")
        # Save the consumption averages back to s3, in the same bucket as the dataset so that both stored
        # filepaths resolve against self.bucket
        save_dataframe_to_s3_parquet(
            bucket_name=self.bucket,
            file_key=self.consumption_averages_filepath,
            df=consumption_averages
        )

        self.data = df

View file

@ -1,32 +1,4 @@
import re
from datetime import datetime
from tqdm import tqdm
import pandas as pd
from utils.s3 import list_files_in_s3_folder, read_pickle_from_s3, save_dataframe_to_s3_parquet
# These columns are coerced to strings before saving
PROBLEMATIC_COLUMNS = ["main-heating-controls", "floor-level"]
def extract_kwh_value(text):
    """
    Extract the numerical kWh value from a given string.
    :param text: The input string containing the kWh value, e.g. "12,345 kWh per year".
    :return: The extracted numerical kWh value as an integer, or None when text is not a string or contains no
        "<number> kWh per year" fragment.
    """
    # Missing or non-text input cannot contain a value; report "no value" rather than raising a TypeError
    if not isinstance(text, str):
        return None

    # The number must start with a digit so that a stray comma on its own is never captured (which would make the
    # int conversion below fail on an empty string)
    match = re.search(r'(\d[\d,]*) kWh per year', text)
    if match is None:
        return None

    # Remove the thousands separators before converting to an integer
    return int(match.group(1).replace(',', ''))
from etl.bill_savings.KwhData import KwhData
def app():
@ -36,69 +8,5 @@ def app():
:return:
"""
# Firstly, list all of the saved files in s3
data_files = list_files_in_s3_folder(bucket_name="retrofit-datalake-dev", folder_name="energy_consumption_data")
run_date = datetime.now().strftime("%Y-%m-%d")
complete_data = []
for files in tqdm(data_files):
dataset_run_date = files.split("/")[-1].split(".")[0]
# Extract the date from the file name
dataset_run_date = pd.Timestamp(dataset_run_date)
# Load the data from the file
data = read_pickle_from_s3(bucket_name="retrofit-datalake-dev", s3_file_name=files)
# We check that the retrieved energy consumption sufficiently matches the EPC data
internal_dataset = []
for x in data:
epc_data = x["epc"]
epc_sap = epc_data["current-energy-efficiency"]
epc_potential_sap = epc_data["potential-energy-efficiency"]
# Make sure this matches the extracted sap
if int(epc_sap) != int(x["current_epc_efficiency"]) or int(epc_potential_sap) != int(
x["potential_epc_efficiency"]
):
continue
heating_kwh = extract_kwh_value(x["heating_text"])
hot_water_kwh = extract_kwh_value(x["hot_water_text"])
internal_dataset.append(
{
**epc_data,
"heating_kwh": heating_kwh,
"hot_water_kwh": hot_water_kwh,
"dataset_run_date": dataset_run_date
}
)
complete_data.extend(internal_dataset)
df = pd.DataFrame(complete_data)
# Because we collate multiple runs into a single data source, it's possible that we have duplicated data at
# the uprn level, so we dedupe based on the newest dataset_run_date
df = df.sort_values("dataset_run_date", ascending=False).drop_duplicates(subset="uprn", keep="first")
df = df.drop(columns=["dataset_run_date"])
for col in PROBLEMATIC_COLUMNS:
df[col] = df[col].astype(str)
# Save the data back to s3, but this time as a parquet file
save_dataframe_to_s3_parquet(
bucket_name="retrofit-data-dev",
file_key=f"energy_consumption/{run_date}/energy_consumption_dataset.parquet",
df=df
)
# We also estimate the energy consumption reduction from this data, by band
df["total_consumption"] = df["heating_kwh"] + df["hot_water_kwh"]
consumption_averages = df.groupby("current-energy-efficiency")["total_consumption"].mean().reset_index()
# Save the consumption averages back to s3
save_dataframe_to_s3_parquet(
bucket_name="retrofit-data-dev",
file_key=f"energy_consumption/{run_date}/consumption_averages.parquet",
df=consumption_averages
)
kwh_data_client = KwhData(bucket="retrofit-datalake-dev")
kwh_data_client.combine()