setting up energy consumption model class

This commit is contained in:
Khalim Conn-Kowlessar 2024-07-02 14:59:16 +01:00
parent 4bcd17596e
commit dd0deab0ee
4 changed files with 218 additions and 4 deletions

View file

@ -0,0 +1,89 @@
import pandas as pd
from datetime import datetime
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score
from utils.s3 import save_pickle_to_s3, read_pickle_from_s3
class EnergyConsumptionModel:
    """
    Train, evaluate, persist and apply linear regression models of energy consumption.

    One ``LinearRegression`` is kept per target in ``self.models``. The train/test split is
    stored on the instance and is overwritten by every call to ``split_dataset``, so
    ``split_dataset(target)`` must be called right before ``fit_model(target)`` and
    ``evaluate_model(target)`` for that same target.
    """

    FEATURES = ['feature_1', 'feature_2']
    TARGETS = ['heating_kwh', 'hot_water_kwh']
    # S3 location shared by model loading (__init__) and model saving (save_model)
    MODEL_BUCKET = "retrofit-model-directory-dev"
    MODEL_PREFIX = "model_directory/energy_consumption_model"

    def __init__(self, model_paths=None):
        """
        Create the model container and optionally load previously saved models.

        :param model_paths: Optional mapping of target name -> S3 key of a saved model. Each entry
            is loaded from ``MODEL_BUCKET`` into ``self.models``.
        """
        self.models = {}
        self.model_paths = model_paths or {}
        self.data = None
        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None

        # NOTE(review): read_pickle_from_s3 presumably unpickles the object; only point
        # model_paths at keys in a bucket you trust.
        for target, path in self.model_paths.items():
            self.models[target] = read_pickle_from_s3(bucket_name=self.MODEL_BUCKET, s3_file_name=path)

    @staticmethod
    def _add_engineered_features(frame):
        """
        Add the engineered feature columns to ``frame`` in place and return it.

        Single source of truth for the transformation, so training and scoring cannot drift apart.

        :param frame: DataFrame holding ``original_feature_1`` and ``original_feature_2``.
        :return: The same DataFrame, with ``feature_1`` and ``feature_2`` added.
        """
        # Example feature engineering steps
        frame['feature_1'] = frame['original_feature_1'] ** 2
        frame['feature_2'] = frame['original_feature_2'] ** 0.5
        # Add more feature engineering steps as required
        return frame

    def read_dataset(self, file_path):
        """
        Load the training dataset from a CSV file into ``self.data``.

        :param file_path: Path of the CSV file to read.
        """
        self.data = pd.read_csv(file_path)

    def feature_engineering(self):
        """
        Add the engineered feature columns to ``self.data`` (modifies it in place).
        """
        self._add_engineered_features(self.data)

    def split_dataset(self, target, test_size=0.2, random_state=42):
        """
        Split ``self.data`` into train and test sets for one target.

        The result replaces any previous split held on the instance.

        :param target: Name of the target column to predict.
        :param test_size: Proportion of rows held out for testing.
        :param random_state: Seed passed to ``train_test_split`` for a reproducible split.
        """
        X = self.data[self.FEATURES]
        y = self.data[target]
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            X, y, test_size=test_size, random_state=random_state
        )

    def fit_model(self, target):
        """
        Fit a fresh ``LinearRegression`` on the current training split and store it under ``target``.

        :param target: Key under which the fitted model is stored in ``self.models``.
        """
        self.models[target] = LinearRegression()
        self.models[target].fit(self.X_train, self.y_train)

    def evaluate_model(self, target):
        """
        Score the model stored for ``target`` against the current test split.

        :param target: Key of the model in ``self.models``.
        :return: Dictionary with the mean squared error (``MSE``) and R-squared (``R2``).
        """
        y_pred = self.models[target].predict(self.X_test)
        mse = mean_squared_error(self.y_test, y_pred)
        r2 = r2_score(self.y_test, y_pred)
        return {'MSE': mse, 'R2': r2}

    def save_model(self, target):
        """
        Save the model stored for ``target`` to S3, with today's date in the file name.

        :param target: Key of the model in ``self.models``.
        """
        run_date = datetime.now().strftime("%Y-%m-%d")
        save_pickle_to_s3(
            self.models[target],
            bucket_name=self.MODEL_BUCKET,
            s3_file_name=f"{self.MODEL_PREFIX}/{target}_{run_date}.pkl"
        )

    def score_new_data(self, new_data, target):
        """
        Predict ``target`` for unseen data.

        :param new_data: DataFrame holding the raw (un-engineered) feature columns.
        :param target: Key of the model in ``self.models``.
        :return: The predictions returned by the model's ``predict``.
        :raises ValueError: If no model has been loaded or trained for ``target``.
        """
        if target not in self.models:
            raise ValueError(f"Model for target {target} not loaded or trained")
        new_data_transformed = self.transform_new_data(new_data)
        return self.models[target].predict(new_data_transformed)

    def transform_new_data(self, new_data):
        """
        Build the model's feature matrix for unseen data without modifying the input.

        :param new_data: DataFrame holding the raw (un-engineered) feature columns.
        :return: A new DataFrame containing only the ``FEATURES`` columns.
        """
        # Work on a copy so the caller's DataFrame does not silently gain extra columns
        engineered = self._add_engineered_features(new_data.copy())
        return engineered[self.FEATURES]
# Example usage:
# model = EnergyConsumptionModel()
# model.read_dataset('/mnt/data/energy_consumption_dataset.csv')
# model.feature_engineering()
# For heating_kwh
# model.split_dataset(target='heating_kwh')
# model.fit_model(target='heating_kwh')
# print(model.evaluate_model(target='heating_kwh'))
# model.save_model(target='heating_kwh')
# For hot_water_kwh
# model.split_dataset(target='hot_water_kwh')
# model.fit_model(target='hot_water_kwh')
# print(model.evaluate_model(target='hot_water_kwh'))
# model.save_model(target='hot_water_kwh')

View file

@ -0,0 +1,94 @@
import re
from datetime import datetime
import pandas as pd
from utils.s3 import list_files_in_s3_folder, read_pickle_from_s3, save_dataframe_to_s3_parquet
# Columns that app() coerces to strings before the dataset is written to parquet.
# NOTE(review): presumably these hold mixed value types that break the parquet write - confirm.
PROBLEMATIC_COLUMNS = ["main-heating-controls"]
def extract_kwh_value(text):
    """
    Extract the numerical kWh value from a given string.

    The value is the first run of digits (optionally with thousands separators) that is
    directly followed by " kWh per year", e.g. "12,345 kWh per year" -> 12345.

    :param text: The input string containing the kWh value. Non-string input (e.g. None for a
        missing field) is treated as "no value".
    :return: The extracted kWh value as an integer, or None when no value can be found.
    """
    if not isinstance(text, str):
        return None

    # Require a leading digit so that a stray comma on its own cannot match and break int()
    match = re.search(r'(\d[\d,]*) kWh per year', text)
    if match is None:
        return None

    # Remove the thousands separators before converting to an integer
    return int(match.group(1).replace(',', ''))
def app():
    """
    Given the files written in our datalake in s3, this application will collate the data into a single file
    and store it back in s3 for analysis.

    For every stored run, records whose scraped SAP scores do not match the EPC data are dropped. The
    remaining records are flattened (EPC fields plus the extracted heating and hot water kWh values),
    deduplicated per uprn keeping the newest run, and written to s3 as parquet and to a local CSV file.

    :return: None
    :raises ValueError: If no record could be collated, so there is nothing to save.
    """
    # Firstly, list all of the saved files in s3
    data_files = list_files_in_s3_folder(bucket_name="retrofit-datalake-dev", folder_name="energy_consumption_data")
    run_date = datetime.now().strftime("%Y-%m-%d")

    complete_data = []
    for file_key in data_files:
        # A prefix listing can include a zero-byte "folder" placeholder key; it holds no data
        if file_key.endswith("/"):
            continue

        # The file name (without extension) is the date the dataset was collected
        dataset_run_date = pd.Timestamp(file_key.split("/")[-1].split(".")[0])

        # Load the data from the file
        data = read_pickle_from_s3(bucket_name="retrofit-datalake-dev", s3_file_name=file_key)

        # We check that the retrieved energy consumption sufficiently matches the EPC data
        for record in data:
            epc_data = record["epc"]
            epc_sap = epc_data["current-energy-efficiency"]
            epc_potential_sap = epc_data["potential-energy-efficiency"]

            # Make sure this matches the extracted sap, otherwise the record is for another certificate
            if int(epc_sap) != int(record["current_epc_efficiency"]) or int(epc_potential_sap) != int(
                record["potential_epc_efficiency"]
            ):
                continue

            complete_data.append(
                {
                    **epc_data,
                    "heating_kwh": extract_kwh_value(record["heating_text"]),
                    "hot_water_kwh": extract_kwh_value(record["hot_water_text"]),
                    "dataset_run_date": dataset_run_date
                }
            )

    # Without this guard an empty collation fails further down with an opaque KeyError
    if not complete_data:
        raise ValueError("No energy consumption records matched their EPC data; nothing to collate")

    df = pd.DataFrame(complete_data)

    # Because we collate multiple runs into a single data source, it's possible that we have duplicated data at
    # the uprn level, so we dedupe based on the newest dataset_run_date
    df = df.sort_values("dataset_run_date", ascending=False).drop_duplicates(subset="uprn", keep="first")
    df = df.drop(columns=["dataset_run_date"])

    for col in PROBLEMATIC_COLUMNS:
        df[col] = df[col].astype(str)

    # Save the data back to s3, but this time as a parquet file
    save_dataframe_to_s3_parquet(
        bucket_name="retrofit-data-dev",
        file_key=f"energy_consumption/{run_date}/energy_consumption_dataset.parquet",
        df=df
    )

    # Also keep a local CSV copy in the current working directory
    df.to_csv("energy_consumption_dataset.csv", index=False)

View file

@ -131,7 +131,8 @@ def app():
sample_size = 100
energy_consumption_data = []
for directory in tqdm(epc_directories):
for i, directory in tqdm(enumerate(epc_directories), total=len(epc_directories)):
data = pd.read_csv(directory / "certificates.csv", low_memory=False)
# Rename the columns to the same format as the api returns
data.columns = [c.replace("_", "-").lower() for c in data.columns]
@ -147,8 +148,8 @@ def app():
collected_data = []
for _, property_data in data.iterrows():
# Sleep for a random time between 0.1 and 1.5 seconds
time.sleep(np.random.uniform(0.1, 1.5))
# Sleep for a random time between 0.1 and 1.4 seconds
time.sleep(np.random.uniform(0.1, 1.4))
uprn = int(property_data["uprn"])
address = property_data["address1"]
@ -167,7 +168,7 @@ def app():
{
**response,
"epc": property_data.to_dict(),
"epc_directory": directory
"epc_directory": str(directory)
}
)

View file

@ -246,3 +246,33 @@ def read_csv_from_s3(bucket_name, filepath):
data = list(reader)
return data
def list_files_in_s3_folder(bucket_name, folder_name):
    """
    List all files in a given folder in an S3 bucket.

    The listing is paginated, so folders holding more keys than a single ``list_objects_v2``
    response can carry are returned in full.

    :param bucket_name: The name of the S3 bucket.
    :param folder_name: The folder name (key prefix) within the S3 bucket.
    :return: A list of file keys in the specified S3 folder. An empty list is returned when the
        folder holds no files or when the listing fails; failures are logged, not raised.
    """
    try:
        s3 = boto3.client('s3')
        paginator = s3.get_paginator('list_objects_v2')

        file_keys = []
        for page in paginator.paginate(Bucket=bucket_name, Prefix=folder_name):
            # A page has no 'Contents' entry when nothing matches the prefix
            file_keys.extend(content['Key'] for content in page.get('Contents', []))

        if not file_keys:
            logger.info(f"No files found in folder {folder_name} in bucket {bucket_name}.")
            return []

        return file_keys
    except NoCredentialsError:
        logger.error("Credentials not available.")
        return []
    except PartialCredentialsError:
        logger.error("Incomplete credentials provided.")
        return []
    except Exception as e:
        # Best-effort helper: callers receive an empty list rather than an exception
        logger.error(f'Failed to list files in folder {folder_name} in bucket {bucket_name}: {str(e)}')
        return []