Model/etl/bill_savings/EnergyConsumptionModel.py
2024-10-21 18:34:18 +01:00

512 lines
22 KiB
Python

import pandas as pd
import numpy as np
# from xgboost import XGBRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_percentage_error
from sklearn.feature_selection import RFECV
from utils.s3 import save_pickle_to_s3, read_pickle_from_s3, read_dataframe_from_s3_parquet, read_csv_from_s3
from utils.logger import setup_logger
from backend.Property import Property
logger = setup_logger()
class EnergyConsumptionModel:
FEATURES = {
"heating_kwh": [
"lodgement-year", "lodgement-month", "current-energy-efficiency", "energy-consumption-current",
"heating-cost-current", "heating-cost-potential", "total-floor-area", "number-heated-rooms",
"mainheat-description", "mainheat-energy-eff", "main-fuel", "secondheat-description", "property-type",
"built-form", "mainheatcont-description", "hotwater-description", "hot-water-energy-eff",
"walls-energy-eff",
"roof-energy-eff", "windows-description", "windows-energy-eff", "floor-description", "flat-top-storey",
"flat-storey-count", "unheated-corridor-length", "solar-water-heating-flag", "mechanical-ventilation",
"low-energy-lighting", "environment-impact-current", "energy-tariff",
"county", "construction-age-band", "co2-emissions-current",
],
"hot_water_kwh": [
"lodgement-year", "lodgement-month",
"current-energy-efficiency",
"energy-consumption-current",
"hot-water-cost-current",
"total-floor-area", "number-heated-rooms",
"hotwater-description", "hot-water-energy-eff", "main-fuel", "property-type", "built-form",
"co2-emissions-current",
]
}
TARGETS = ['heating_kwh', 'hot_water_kwh']
CATEGORICAL_COLUMNS = [
"lodgement-year", "lodgement-month", "main-fuel", "mainheat-description", "number-heated-rooms",
"number-habitable-rooms", "mainheat-energy-eff", "mainheatcont-description", "property-type", "built-form",
"construction-age-band", "secondheat-description", "hotwater-description", "hot-water-energy-eff",
"walls-description", "walls-energy-eff", "roof-description", "roof-energy-eff", "floor-description",
"county",
"windows-description", "windows-energy-eff", "flat-top-storey",
"flat-storey-count", "unheated-corridor-length", "solar-water-heating-flag", "mechanical-ventilation",
"low-energy-lighting", "environment-impact-current", "energy-tariff", "current-energy-rating"
]
retail_price_comparison = None
def __init__(
self, cleaned, model_paths=None, dummy_schema_path=None, consumption_average_path=None, n_jobs=1,
environment="dev"
):
self.cleaned = cleaned
self.models = {}
self.model_paths = model_paths or {}
self.n_jobs = n_jobs
self.environment = environment
self.data = None
self.input_data = None
self.dummy_columns = None
self.training_predictions = {}
self.testing_predictions = {}
self.best_iteration = {}
self.dummy_schema = None
self.x_train = {}
self.x_test = {}
self.x_val = {}
self.y_val = {}
self.y_train = {}
self.y_test = {}
self.selected_features = {}
self.NUMERICAL_COLUMNS = list({
x for x in self.FEATURES["heating_kwh"] + self.FEATURES["hot_water_kwh"]
if x not in self.CATEGORICAL_COLUMNS
})
# if model_paths:
# for target, path in model_paths.items():
# # Read model
# self.models[target] = read_pickle_from_s3(
# bucket_name=f"retrofit-model-directory-{environment}", s3_file_name=path
# )
# Read dummy schema
if dummy_schema_path:
self.dummy_schema = read_pickle_from_s3(
bucket_name=f"retrofit-model-directory-{environment}",
s3_file_name=dummy_schema_path
)
self.consumption_averages = None
if consumption_average_path:
self.consumption_averages = read_dataframe_from_s3_parquet(
bucket_name=f"retrofit-data-{environment}",
file_key=consumption_average_path
)
# We also retrieve the newest retail price comparison data which comes from Ofgem:
# https://www.ofgem.gov.uk/energy-data-and-research/data-portal/retail-market-indicators
# We use the detail price comparison by company and tariff type data
print("Reading retail price comparison - make sure this is up-to-date")
self.read_retail_price_comparison()
def read_retail_price_comparison(self):
data = read_csv_from_s3(
bucket_name=f"retrofit-data-{self.environment}",
filepath="energy_consumption/retail-price-comparison.csv"
)
header = ['Date', 'Average standard variable tariff (Large legacy suppliers)',
'Average standard variable tariff (Other suppliers)', 'Average fixed tariff',
'Cheapest tariff (Large legacy suppliers)', 'Cheapest tariff (All suppliers)',
'Cheapest tariff (Basket)', 'Default tariff cap level']
# Extract data rows
data_rows = []
for row in data[1:]:
date = row['\ufeff"']
values = row[None]
data_rows.append([date] + values)
self.retail_price_comparison = pd.DataFrame(data_rows, columns=header)
self.retail_price_comparison['Date'] = pd.to_datetime(self.retail_price_comparison['Date'], errors='coerce')
def read_dataset(self, file_path):
"""Reads the dataset from the specified file path."""
logger.info(f"Reading dataset from {file_path}")
self.data = read_dataframe_from_s3_parquet(bucket_name=f"retrofit-data-{self.environment}", file_key=file_path)
self.input_data = self.data.copy()
def feature_engineering(self, drop_first=False):
"""Performs feature engineering on the dataset."""
logger.info("Starting feature engineering")
self.data["lodgement-date"] = pd.to_datetime(self.data["lodgement-date"])
self.data["lodgement-year"] = self.data["lodgement-date"].dt.year
self.data["lodgement-month"] = self.data["lodgement-date"].dt.month
# For walls, roof, floor description where we have average thermal transmittance, to avoid too many categories
# we group them
ranges = {
"lessthan 0.1": (0, 0.1),
"0.1 - 0.3": (0.1, 0.3),
"0.3 - 0.5": (0.3, 0.5),
"morethan 0.5": (0.5, 2.5),
}
# Generate the lookup table
thermal_transmittance_lookup_table = []
for i in range(1, 251):
value = i / 100
for label, (low, high) in ranges.items():
if low < value <= high:
thermal_transmittance_lookup_table.append({"from": value, "to": label})
break
# Convert to DataFrame for display
thermal_transmittance_lookup_table = pd.DataFrame(thermal_transmittance_lookup_table)
thermal_transmittance_lookup_table["from"] = thermal_transmittance_lookup_table["from"].astype(str)
# Apply the lookup table to the data
for feature in ["walls-description", "roof-description", "floor-description"]:
cleaned_df = pd.DataFrame(self.cleaned[feature])[["original_description", "thermal_transmittance"]]
# Round to 2 decimal places and convert to string
cleaned_df["thermal_transmittance"] = cleaned_df["thermal_transmittance"].round(2).astype(str)
self.data = self.data.merge(
cleaned_df,
how="left",
left_on=feature,
right_on="original_description",
)
# We now have the thermal transmittance in the data, which we can use to group with the lookup table
self.data = self.data.merge(
thermal_transmittance_lookup_table,
how="left",
left_on="thermal_transmittance",
right_on="from",
)
# Where "to" is populated, replace feature with to
self.data[feature] = np.where(
~pd.isnull(self.data["to"]),
self.data["to"],
self.data[feature]
)
self.data = self.data.drop(columns=["original_description", "thermal_transmittance", "from", "to"])
# Convert data types
self.data[self.NUMERICAL_COLUMNS] = self.data[self.NUMERICAL_COLUMNS].apply(pd.to_numeric)
self.data[self.CATEGORICAL_COLUMNS] = self.data[self.CATEGORICAL_COLUMNS].astype(str)
# Convert categorical columns to dummies
self.data = pd.get_dummies(self.data, columns=self.CATEGORICAL_COLUMNS, drop_first=drop_first)
self.dummy_schema = self.data.columns.tolist()
# Store the dummy columns
self.dummy_columns = {}
for target in self.TARGETS:
target_features = self.FEATURES[target]
dummy_feature_columns = []
for feature in target_features:
if feature in self.CATEGORICAL_COLUMNS:
dummy_feature_columns.extend([col for col in self.data.columns if col.startswith(feature + '_')])
else:
dummy_feature_columns.append(feature)
self.dummy_columns[target] = dummy_feature_columns
logger.info("Feature engineering completed")
def split_dataset(self, target, test_size=0.2, validation_size=0.2, random_state=42):
"""Splits the dataset into training, validation, and testing sets."""
if target not in self.TARGETS:
raise ValueError(f"Target {target} not in {self.TARGETS}")
logger.info(f"Splitting dataset for target {target}")
# Split into train + validation and test sets
x_train_val, x_test, y_train_val, y_test = train_test_split(
self.data[self.dummy_columns[target]],
self.data[target],
test_size=test_size,
random_state=random_state
)
# Split train + validation into train and validation sets
x_train, x_val, y_train, y_val = train_test_split(
x_train_val,
y_train_val,
test_size=validation_size / (1 - test_size),
random_state=random_state
)
self.x_train[target], self.x_val[target], self.x_test[target] = x_train, x_val, x_test
self.y_train[target], self.y_val[target], self.y_test[target] = y_train, y_val, y_test
def feature_selection(self, target, cv_folds=3, sample_fraction=0.1, random_state=42):
"""
Performs feature selection using RFECV with XGBoost.
Parameters:
- target: The target variable for feature selection.
- cv_folds: Number of cross-validation folds.
- sample_fraction: Fraction of the data to use for feature selection.
- random_state: Random state for reproducibility.
"""
if target not in self.TARGETS:
raise ValueError(f"Target {target} not in {self.TARGETS}")
logger.info(f"Starting feature selection for target {target}")
# Sample the data if specified
if sample_fraction < 1.0:
x_sample, _, y_sample, _ = train_test_split(
self.x_train[target], self.y_train[target],
train_size=sample_fraction, random_state=random_state
)
else:
x_sample = self.x_train[target]
y_sample = self.y_train[target]
# Initialize the XGBoost model and RFECV
model = self.init_model(feature_selection=True)
selector = RFECV(
model, step=1, cv=cv_folds, scoring='neg_mean_absolute_percentage_error', verbose=1, n_jobs=self.n_jobs
)
selector = selector.fit(x_sample, y_sample)
# Get the selected features
self.selected_features[target] = x_sample.columns[selector.support_]
# Update x_train, x_test and x_val with selected features
self.x_train[target] = self.x_train[target][self.selected_features[target]]
self.x_test[target] = self.x_test[target][self.selected_features[target]]
self.x_val[target] = self.x_val[target][self.selected_features[target]]
logger.info(f"Feature selection completed for target {target}")
# def init_model(self, feature_selection=False):
#
# if feature_selection:
# # Set up a smaller model to work it
# return XGBRegressor(
# objective='reg:squarederror',
# n_estimators=50,
# learning_rate=0.05,
# max_depth=6,
# subsample=0.8,
# colsample_bytree=0.8,
# reg_alpha=0.1,
# reg_lambda=0.1
# )
#
# return XGBRegressor(
# objective='reg:squarederror',
# n_estimators=1000,
# learning_rate=0.05,
# max_depth=6,
# min_child_weight=3,
# subsample=0.8,
# colsample_bytree=0.8,
# reg_alpha=0.1,
# reg_lambda=0.1
# # n_jobs=self.n_jobs
# )
def fit_model(self, target):
"""Fits the model to the training data and removes zero-importance features."""
logger.info(f"Fitting model for target {target}")
# Initialize and fit the model
model = self.init_model()
model.fit(
self.x_train[target],
self.y_train[target],
eval_set=[(self.x_val[target], self.y_val[target])],
early_stopping_rounds=50
)
# Store the model
self.models[target] = model
# Identify and remove zero-importance features
feature_importance = pd.DataFrame({
'Feature': self.x_train[target].columns,
'Importance': model.feature_importances_
})
zero_importance_features = feature_importance[feature_importance['Importance'] == 0]['Feature'].tolist()
if zero_importance_features:
logger.info(f"Removing zero-importance features for target {target}: {zero_importance_features}")
self.x_train[target] = self.x_train[target].drop(columns=zero_importance_features)
self.x_val[target] = self.x_val[target].drop(columns=zero_importance_features)
self.x_test[target] = self.x_test[target].drop(columns=zero_importance_features)
# Re-fit the model with the reduced feature set
model = self.init_model()
model.fit(
self.x_train[target],
self.y_train[target],
eval_set=[(self.x_val[target], self.y_val[target])],
early_stopping_rounds=50
)
# Update the model
self.models[target] = model
# Store the best iteration
self.best_iteration[target] = self.models[target].best_iteration
logger.info(f"Model fitting completed for target {target}")
def re_train_final_model(self, target):
"""Re-trains the final model on the combined training and validation set."""
logger.info(f"Re-training final model for target {target}")
x_train_val = pd.concat([self.x_train[target], self.x_val[target]])
y_train_val = pd.concat([self.y_train[target], self.y_val[target]])
self.models[target] = self.init_model()
self.models[target].fit(x_train_val, y_train_val, verbose=False)
logger.info(f"Re-training final model completed for target {target}")
def evaluate_model(self, target):
"""Evaluates the model on training and testing data."""
logger.info(f"Evaluating model for target {target}")
y_train_pred = self.models[target].predict(self.x_train[target])
train_mse = mean_squared_error(self.y_train[target], y_train_pred)
train_r2 = r2_score(self.y_train[target], y_train_pred)
train_mape = mean_absolute_percentage_error(self.y_train[target], y_train_pred)
self.training_predictions[target] = pd.DataFrame({
'Actual': self.y_train[target],
'Predicted': y_train_pred
})
self.training_predictions[target]["residual"] = abs(
self.training_predictions[target]["Actual"] - self.training_predictions[target]["Predicted"]
)
y_test_pred = self.models[target].predict(self.x_test[target])
test_mse = mean_squared_error(self.y_test[target], y_test_pred)
test_r2 = r2_score(self.y_test[target], y_test_pred)
test_mape = mean_absolute_percentage_error(self.y_test[target], y_test_pred)
self.testing_predictions[target] = pd.DataFrame({
'Actual': self.y_test[target],
'Predicted': y_test_pred
})
self.testing_predictions[target]["residual"] = abs(
self.testing_predictions[target]["Actual"] - self.testing_predictions[target]["Predicted"]
)
if target in self.selected_features:
feature_importance = pd.DataFrame({
'Feature': self.selected_features[target],
'Importance': self.models[target].feature_importances_
}).sort_values(by='Importance', ascending=False)
else:
feature_importance = pd.DataFrame({
'Feature': self.x_train[target].columns,
'Importance': self.models[target].feature_importances_
}).sort_values(by='Importance', ascending=False)
logger.info(f"Evaluation completed for target {target}")
return {
'train': {
'MSE': train_mse,
'R2': train_r2,
'MAPE': train_mape,
'Feature Importance': feature_importance
},
'test': {
'MSE': test_mse,
'R2': test_r2,
'MAPE': test_mape
}
}
def save_model(self, target, dataset_version):
"""Saves the model to S3."""
logger.info(f"Saving model for target {target}")
save_pickle_to_s3(
self.models[target],
bucket_name=f"retrofit-model-directory-{self.environment}",
s3_file_name=f"model_directory/energy_consumption_model/{target}_{dataset_version}.pkl"
)
def save_dummy_schema(self, dataset_version):
logger.info("Saving dummy schema for target {target}")
save_pickle_to_s3(
self.dummy_schema,
bucket_name=f"retrofit-model-directory-{self.environment}",
s3_file_name=f"model_directory/energy_consumption_model/{dataset_version}_dummy_schema.pkl"
)
def score_new_data(self, new_data, target):
"""Scores new data using the trained model."""
if target not in self.models:
raise ValueError(f"Model for target {target} not loaded or trained")
# Verify that self.data is None
if self.data is not None:
raise ValueError("self.data is not None. Ensure that self.data is reset before scoring new data.")
# Temporarily set self.data to new data
self.data = new_data.copy()
# Run feature engineering
self.feature_engineering(drop_first=False)
new_data_transformed = self.data.copy()
for col in self.dummy_schema:
if col not in new_data_transformed.columns:
new_data_transformed[col] = 0
new_data_transformed = new_data_transformed[self.dummy_schema]
missed_dummies = [c for c in self.models[target].feature_names_in_ if c not in new_data_transformed.columns]
zero_df = pd.DataFrame([dict(zip(missed_dummies, [0, ] * len(missed_dummies)))])
new_data_transformed = pd.concat([new_data_transformed, zero_df], axis=1)
# When we dummy in this case, we run with drop_first = False so we may end up with some of those
# first columns, we we'll need to dorp them
new_data_transformed = new_data_transformed[self.models[target].feature_names_in_]
# Generate predictions
prediction = self.models[target].predict(new_data_transformed)
# Reset self.data to None
self.data = None
return prediction
@staticmethod
def calculate_percentage_decrease(start_efficiency, end_efficiency, consumption_averages):
start_consumption = consumption_averages.loc[
consumption_averages["current-energy-efficiency"].astype(str) == str(start_efficiency), "total_consumption"
].values[0]
end_consumption = consumption_averages.loc[
consumption_averages["current-energy-efficiency"].astype(str) == str(end_efficiency), "total_consumption"
].values[0]
percentage_decrease = ((start_consumption - end_consumption) / start_consumption) * 100
# percentage_decrease cannot be nehative
if percentage_decrease < 0:
percentage_decrease = 0
return percentage_decrease
def estimate_new_consumption(self, current_energy_efficiency, target_efficiency, current_consumption):
"""
Given then consumption_averages dataset, which is produced as a result of the training_data.py script,
for the energy kwh models, this function will estimate the new consumption based on the current consumption,
based on the expected reduction in consumption from the current rating to the target rating.
:param current_energy_efficiency:
:param target_efficiency:
:param current_consumption:
:return:
"""
percentage_decrease = self.calculate_percentage_decrease(
start_efficiency=current_energy_efficiency,
end_efficiency=target_efficiency,
consumption_averages=self.consumption_averages
)
new_consumption = current_consumption * (1 - percentage_decrease / 100)
return new_consumption