Model/model_data/analysis/SapModel.py
2023-07-20 12:24:34 +01:00

655 lines
24 KiB
Python

import numpy as np
import pandas as pd
import statsmodels.api as sm
import matplotlib.pyplot as plt
import pickle
from typing import Any, Dict, Tuple, Optional, List
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, explained_variance_score, \
median_absolute_error, mean_absolute_percentage_error
from sklearn.ensemble import RandomForestRegressor
from sklearn.inspection import permutation_importance
from model_data.EpcClean import EpcClean
from statsmodels.stats.outliers_influence import variance_inflation_factor
from tqdm import tqdm
from utils.logger import setup_logger
logger = setup_logger()
# with open("all_data.pkl", "rb") as f:
# all_data = pickle.load(f)
class SapModel:
# We want to estimate for making improvements on different property components
RESPONSE = "current-energy-efficiency"
# We could potentially build models by constituency to avoid having too many
# features in the model
BASE_FEATURES = [
"property-type",
"built-form",
"construction-age-band",
"number-habitable-rooms",
"constituency",
"number-heated-rooms",
"transaction-type"
]
COMPONENT_FEATURES = [
"walls-description",
"floor-description",
"lighting-description",
"roof-description",
"mainheat-description",
"hotwater-description",
"main-fuel",
"mechanical-ventilation",
"secondheat-description",
"energy-tariff",
"solar-water-heating-flag",
"photo-supply",
"windows-description",
"glazed-type",
"glazed-area",
"multi-glaze-proportion",
# "lighting-description" # Might not need to use this
"low-energy-lighting",
"number-open-fireplaces",
"mainheatcont-description",
"fixed-lighting-outlets-count",
"floor-height",
"floor-level",
"total-floor-area",
"extension-count",
]
CATEGORICAL_COLS = [
"property-type",
"built-form",
"number-habitable-rooms",
"constituency",
"number-heated-rooms",
"mainheat-description",
"hotwater-description",
"main-fuel",
"mechanical-ventilation",
"secondheat-description",
"energy-tariff",
"solar-water-heating-flag",
"windows-description",
"glazed-type",
"glazed-area",
"construction-age-band",
"lighting-description",
"mainheatcont-description",
"floor-level",
]
NUMERICAL_COLUMNS = [
"photo-supply", "multi-glaze-proportion", "low-energy-lighting", "number-open-fireplaces",
"fixed-lighting-outlets-count",
"floor-height",
"total-floor-area",
"extension-count",
]
# For the moment, we store records of the best performing models as a benchmark for future imporvements
BEST_FIT = {
'MAPE': 0.04646530042225876, 'Mean Squared Error': 18.635209563729763,
'Mean Absolute Error': 2.856347408023325, 'R2 Score': 0.800701753826118,
'Explained Variance Score': 0.800701753826118, 'Median Absolute Error': 1.9026758012120197
}
BEST_PREDICT = {
'MAPE': 0.04346083528432316, 'Mean Squared Error': 21.16036509335514,
'Mean Absolute Error': 3.0440540802375833, 'R2 Score': 0.7219965012634312,
'Explained Variance Score': 0.7220620137390414, 'Median Absolute Error': 1.9031967986967828
}
BEST_FINAL = {
'MAPE': 0.04841470773386795, 'Mean Squared Error': 21.323052316630914, 'Mean Absolute Error': 2.988547998636157,
'R2 Score': 0.7633662459299112, 'Explained Variance Score': 0.7633785339028832,
'Median Absolute Error': 1.9487883489495985
}
BUCKET_VARIABLES = [
"number-open-fireplaces", "fixed-lighting-outlets-count", 'extension-count', 'multi-glaze-proportion'
]
def __init__(
self, data: List[Dict],
cleaner: EpcClean,
test_size: Optional[float] = 0.2,
random_state: Optional[int] = None
):
self.df = pd.DataFrame(data)
self.cleaner = cleaner
self.random_state = random_state if random_state is not None else 42
self.test_size = 0.2 if test_size is None else test_size
self.model_data = None
self.train_x = None
self.train_y = None
self.test_x = None
self.test_y = None
self.test_model = None
self.final_model = None
self.fit_error = None
self.predict_error = None
self.final_error = None
self.worst = {
"fit_errors": pd.DataFrame(),
"prediction_errors": pd.DataFrame(),
"fit_x": pd.DataFrame(),
"prediction_x": pd.DataFrame(),
"final_errors": pd.DataFrame(),
"final_x": pd.DataFrame(),
}
self.fit_df = None
self.predict_df = None
self.final_fit_df = None
self.diagnosis = {}
def run(self, plot: bool = False) -> None:
"""
A pipeline method to run all necessary methods in correct order.
:param plot: Boolean to indicate whether to plot the regression
"""
try:
self.create_dataset()
self.fit_model()
if plot:
self.plot_regression(self.fit_df)
except Exception as e:
logger.error("An error occurred during execution.")
logger.error(str(e))
def _merge_with_u_values(
self, model_data: pd.DataFrame, description: str, thermal_transmittance: str
) -> pd.DataFrame:
"""
Utility function to merge u value data with model data
:param model_data: Pandas dataframe which is the main modelling dataset
:param description: Name of the description column for which we're merging u-values onto
:param thermal_transmittance: Name of the thermal transmittance column
:return:
"""
u_values = pd.DataFrame(self.cleaner.cleaned[f"{description}-description"])[
["original_description", thermal_transmittance]].rename(
columns={thermal_transmittance: f"{description}_u_value"}
)
model_data = model_data.merge(
u_values,
how="left",
left_on=f"{description}-description",
right_on="original_description"
).drop(columns=["original_description"])
return model_data
def _append_cleaned_data(self, model_data: pd.DataFrame) -> pd.DataFrame:
"""
Appends cleaned data into the model data.
:param model_data: Original model data.
:return: Model data with cleaned data appended.
"""
for description in ["walls", "floor", "roof"]:
model_data = self._merge_with_u_values(model_data, description, "thermal_transmittance")
# lighting_proportions added separately as it doesn't use the _merge_with_u_values method
lighting_proportions = pd.DataFrame(self.cleaner.cleaned["lighting-description"])[
["original_description", "low_energy_proportion"]]
model_data = model_data.merge(
lighting_proportions,
how="left",
left_on="lighting-description",
right_on="original_description"
).drop(columns=["original_description"])
return model_data
@staticmethod
def _convert_transaction_type(model_data: pd.DataFrame) -> pd.DataFrame:
"""
Converts transaction type to boolean
:param model_data: Model data with transaction type.
:return: Model data with converted transaction type.
"""
model_data["is_rdsap"] = model_data["transaction-type"] != "new dwelling"
model_data = model_data.drop(columns=["transaction-type"])
return model_data
@staticmethod
def bucket_and_fill(df: pd.DataFrame, column_name: str, n_bins: int = 10) -> pd.DataFrame:
"""
Simple utility function to bucket up features into bins and then fill any missing values with "NO_RECORD"
:param df: Dataframe of features to be binned
:param column_name: Name of the column to be binned
:param n_bins: Number of bins to use
:return: Dataframe with binned column
"""
# Check if the column is numerical
if np.issubdtype(df[column_name].dtype, np.number):
# Create a new categorical column from numerical one by binning the data
df[column_name + "_bucket"] = pd.cut(df[column_name], bins=n_bins).astype(str)
# Replace missing data with "NO_RECORD"
df[column_name + "_bucket"] = df[column_name + "_bucket"].fillna("NO_RECORD")
df[column_name + "_bucket"] = np.where(
df[column_name + "_bucket"] == "nan",
"NO_RECORD",
df[column_name + "_bucket"]
)
return df
def _clean_numericals(self, model_data):
# Try binning numericals
remaining_numericals = [x for x in self.NUMERICAL_COLUMNS if x not in self.BUCKET_VARIABLES]
for col in self.BUCKET_VARIABLES:
model_data[col] = pd.to_numeric(model_data[col], errors='coerce')
# If all values are missing, set all values to 0 - this column will get dropped
if all(pd.isnull(model_data[col])):
model_data[col + "_bucket"] = "NO_RECORD"
continue
model_data = self.bucket_and_fill(model_data, col)
# Replace the data with the binned version
model_data = model_data.drop(columns=self.BUCKET_VARIABLES)
model_data = model_data.rename(
columns=dict(zip([c + "_bucket" for c in self.BUCKET_VARIABLES], self.BUCKET_VARIABLES))
)
# Basic fill the rest of the columns with 0 - currenrtly this provided the best performance
for col in remaining_numericals:
model_data[col] = np.where(
model_data[col] == "", "0", model_data[col]
).astype(float)
return model_data
@staticmethod
def clean_missings(model_data: pd.DataFrame) -> pd.DataFrame:
"""
Fills categorical missing data with sensible values
:param model_data: Original model data.
:return: Model data with cleaned categorical data.
"""
# Cleaning of energy-tariff and construction-age-band hurt prediction performance, indicating there is
# potentially
# a notable difference between a "" missing and a "NO DATA!" missing, worth differentiating
model_data["mechanical-ventilation"] = np.where(
model_data["mechanical-ventilation"] == "", "NO DATA!", model_data["mechanical-ventilation"]
)
model_data["solar-water-heating-flag"] = np.where(
model_data["solar-water-heating-flag"] == "", "N", model_data["solar-water-heating-flag"]
)
model_data["glazed-type"] = np.where(
model_data["glazed-type"] == "", "NO DATA!", model_data["glazed-type"]
)
model_data["glazed-area"] = np.where(
model_data["glazed-area"] == "", "NO DATA!", model_data["glazed-type"]
)
return model_data
def create_dataset(self):
logger.info("Creating modelling dataset")
model_data = self.df[[self.RESPONSE] + self.COMPONENT_FEATURES + self.BASE_FEATURES]
model_data = model_data.reset_index(drop=True)
model_data["idx"] = model_data.index.copy()
# Append on u-values
model_data = self._append_cleaned_data(model_data)
model_data = self.clean_missings(model_data)
# Convert transaction_type
model_data = self._convert_transaction_type(model_data)
# Clean numerical columns
model_data = self._clean_numericals(model_data)
# Take just entries with U-values
# TODO: Rather than doing this, do we want to include the estimated u-values?
# Since this ends up with just 2k entries
model_data = model_data[
~pd.isnull(model_data["walls_u_value"]) &
~pd.isnull(model_data["floor_u_value"]) &
~pd.isnull(model_data["roof_u_value"])
]
exclude_features = [
"walls-description", "floor-description", "roof-description", "transaction-type"
]
features = [
x for x in self.BASE_FEATURES + self.COMPONENT_FEATURES + [
"walls_u_value", "floor_u_value", "roof_u_value", self.RESPONSE, "idx", "is_rdsap"
] if x not in exclude_features
]
model_data = model_data[features]
for col in self.CATEGORICAL_COLS:
model_data[col] = model_data[col].astype('category')
# Convert response
model_data[self.RESPONSE] = model_data[self.RESPONSE].astype(float)
self.model_data = model_data
def make_training_test(self, x):
# Split into training and test
self.train_x, self.test_x, self.train_y, self.test_y = train_test_split(
x.drop(self.RESPONSE, axis=1),
x[self.RESPONSE],
test_size=self.test_size,
random_state=self.random_state
)
@staticmethod
def remove_zero_std_cols(train_x, test_x=None, threshold=1e-3):
"""
Utility function to remove columns that have zero standard deviation from both test and train sets
:param train_x: Training data to remove columns from
:param test_x: If provided, remove the same columns from the test data
:param threshold: float value, if the standard deviation is below this threshold, the column is considered
to have zero standard deviation
:return: Tuple of train_x and test_x (if provided). If test_x is not provided, a null placeholder is returned
"""
# Compute standard deviations
std_devs = train_x.std()
# Find columns with zero or near-zero standard deviation
zero_std_cols = std_devs[std_devs <= threshold].index
# Drop these columns from the training data
train_x = train_x.drop(zero_std_cols, axis=1)
if test_x is not None:
# Ensure the test data has the same columns
test_x = test_x[train_x.columns]
return train_x, test_x
return train_x, None
def fit_model(self):
"""
Main function to fit the model and produce accuracy metrics
"""
x = pd.get_dummies(self.model_data, columns=self.CATEGORICAL_COLS + self.BUCKET_VARIABLES, drop_first=True)
# Convert booleans to integer
for col in x.columns:
if x[col].dtype == bool:
x[col] = x[col].astype(int)
if x[col].dtype == object:
x[col] = x[col].astype(float)
# Create the training and test sets for each run
self.make_training_test(x)
self.train_x, self.test_x = self.remove_zero_std_cols(self.train_x, self.test_x)
logger.info("Detecting multi-collinearity in training dataset")
self.detect_multi_collinearity()
# Add a constant to the independent value
train_x = sm.add_constant(self.train_x)
test_x = sm.add_constant(self.test_x)
train_idx = train_x["idx"].copy()
test_idx = self.test_x["idx"].copy()
train_x = train_x.drop(columns=["idx"])
test_x = test_x.drop(columns=["idx"])
logger.info("Fitting testing model")
# make regression model
model = sm.OLS(self.train_y, train_x)
# fit model and print results
self.test_model = model.fit()
train_predictions = self.test_model.fittedvalues
test_predictions = self.test_model.predict(test_x)
self.fit_error, self.worst["fit_errors"] = self.calculate_regression_metrics(
y_true=self.train_y, y_pred=train_predictions
)
# Predict on new data
self.predict_error, self.worst["prediction_errors"] = self.calculate_regression_metrics(
y_true=self.test_y, y_pred=test_predictions
)
fit_success = self.check_successes(self.fit_error, self.BEST_FIT)
predict_success = self.check_successes(self.predict_error, self.BEST_PREDICT)
self.model_data['fit'] = self.test_model.fittedvalues
# The worst errors over index heavily for flats
self.worst["fit_x"] = self.model_data[self.model_data.index.isin(self.worst["fit_errors"].index)]
self.worst["prediction_x"] = self.model_data[self.model_data.index.isin(self.worst["prediction_errors"].index)]
self.fit_df = pd.DataFrame(
{
"fit": train_predictions,
"actual": self.train_y,
"idx": train_idx
}
).sort_values("actual", ascending=True)
self.predict_df = pd.DataFrame(
{
"predictions": test_predictions,
"actual": self.test_y,
"idx": test_idx
}
)
self.diagnosis = {
"fit_success": fit_success,
"predict_success": predict_success,
"summary": self.test_model.summary()
}
# We're now ready to fit the final model
# For the momeent, the pre-processing at the top of this function merely removes columns, so we
# just need to remove the columns that were removed from the training data from the final model
logger.info("Fitting final model")
x = sm.add_constant(x)
y = x[self.RESPONSE]
x = x[self.train_x.columns]
idx = x["idx"].copy()
x = x.drop(columns=["idx"])
final_model = sm.OLS(y, x)
# fit model and print results
self.final_model = final_model.fit()
final_predictions = self.final_model.fittedvalues
self.final_error, self.worst["final_errors"] = self.calculate_regression_metrics(
y_true=y, y_pred=final_predictions
)
self.final_fit_df = pd.DataFrame(
{
"fit": final_predictions,
"actual": y,
"idx": idx
}
).sort_values("actual", ascending=True)
@staticmethod
def check_successes(experiment_error, best_error):
"""
Simple function to check if the experiment error is better than the best error
:param experiment_error: output of calculate_regression_metrics() on the experiment
:param best_error: Current benchmark best error
:return:
"""
successes = []
for k in experiment_error:
if k in ["Explained Variance Score", "R2 Score"]:
# We want to maximise this so we want experiment error to be higher
successes.append(
{
"measure": k,
"success": experiment_error[k] >= best_error[k],
"difference": abs(experiment_error[k] - best_error[k])
}
)
continue
successes.append(
{
"measure": k,
"success": experiment_error[k] <= best_error[k],
"difference": abs(experiment_error[k] - best_error[k])
}
)
return pd.DataFrame(successes)
def rf_importance(self, train_x, train_y, test_x, test_y):
"""
Utility function to estimate feature importance using a random forest
This is useful to get a sense of some of the key features which are driving model
performance
:param train_x: Training data covariates to build the importance model on
:param train_y: Training data response to build the importance model on
:param test_x: Test data covariates to build the permutation importance model on
:param test_y: Test data response to build the permutation importance model on
:return: Pandas dataframe of feature importances, ranked by most important to least
"""
rf = RandomForestRegressor(random_state=self.random_state)
rf.fit(train_x, train_y)
# Print the name and importance of each feature
rf_importance_df = []
for feature, importance in zip(train_x.columns, rf.feature_importances_):
rf_importance_df.append(
{
"Feature": feature,
"rf_importance": importance
}
)
rf_importance_df = pd.DataFrame(rf_importance_df)
rf_importance_df = rf_importance_df.sort_values(by="rf_importance", ascending=False)
perm_importance = self.permuation_importance(rf, test_x, test_y)
return rf_importance_df, perm_importance
@staticmethod
def permuation_importance(rf, test_x, test_y):
"""
Simple utility function to produce permutation importance for a given model\
:param rf: Random forest model to calculate permutation importance for
:param test_x: Test covariates to be used for permutation importance
:param test_y: Test response to be used for permutation importance
:return:
"""
perm_importance = permutation_importance(rf, test_x, test_y, scoring='neg_mean_squared_error')
perm_importance_df = pd.DataFrame(
{
"Feature": test_x.columns,
"perm_importance": perm_importance.importances_mean
}
).sort_values(by="perm_importance", ascending=False)
return perm_importance_df
def detect_multi_collinearity(self):
# Get the VIFs for each variable
vifs = pd.DataFrame()
vifs["features"] = self.train_x.columns
vifs["vif"] = [variance_inflation_factor(self.train_x.values, i) for i in tqdm(range(self.train_x.shape[1]))]
# Get the features with the highest VIF
vifs = vifs.sort_values("vif", ascending=False)
# There are some features, we do not want to remove
required_features = [
"walls_u_value", "floor_u_value", "roof_u_value", "idx", "is_rdsap"
]
vifs = vifs[~vifs["features"].isin(required_features)]
drop_vifs = vifs[np.isinf(vifs["vif"])]
# Acceptable drop variables:
# main-fuel_Gas: mains gas
# glazed-type_NO DATA!
# glazed-area_NO DATA!
self.train_x = self.train_x.drop(columns=drop_vifs["features"].values)
self.test_x = self.test_x[self.train_x.columns]
@staticmethod
def plot_regression(df):
# Extract the "fit" and "actual" columns from the dataframe
fit = df['fit']
actual = df['actual']
# Create an array of x-values (assumed to be sequential integers)
x = np.arange(len(df))
# Plot the fit and actual data
plt.plot(x, fit, color='red', label='Fit')
plt.plot(x, actual, color='blue', label='Actual')
# Set labels and title
plt.xlabel('Index')
plt.ylabel('Value')
plt.title('Linear Regression - Fit vs Actual')
# Display legend
plt.legend()
# Show the plot
plt.show()
@staticmethod
def calculate_regression_metrics(y_true, y_pred, n=20):
"""
Calculate the 5 most important accuracy metrics for regression.
Args:
y_true (array-like): Array of true target values.
y_pred (array-like): Array of predicted target values.
Returns:
dict: Dictionary containing the calculated metrics.
"""
metrics = {
'MAPE': mean_absolute_percentage_error(y_true, y_pred),
'Mean Squared Error': mean_squared_error(y_true, y_pred),
'Mean Absolute Error': mean_absolute_error(y_true, y_pred),
'R2 Score': r2_score(y_true, y_pred),
'Explained Variance Score': explained_variance_score(y_true, y_pred),
'Median Absolute Error': median_absolute_error(y_true, y_pred)
}
errors = pd.DataFrame()
errors['Fit'] = y_true
errors['Actual'] = y_pred
errors['Residual'] = errors['Actual'] - errors['Fit']
errors['Absolute Residual'] = np.abs(errors['Residual'])
worst_errors = errors.nlargest(n, 'Absolute Residual')
return metrics, worst_errors