Model/model_data/analysis/SapModel.py
Khalim Conn-Kowlessar 3e5d6cc4b4 added SapModel to app
2023-07-21 17:02:23 +01:00

650 lines
24 KiB
Python

import numpy as np
import pandas as pd
import statsmodels.api as sm
import matplotlib.pyplot as plt
from typing import Dict, Optional, List
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, explained_variance_score, \
median_absolute_error, mean_absolute_percentage_error
from sklearn.ensemble import RandomForestRegressor
from sklearn.inspection import permutation_importance
from model_data.EpcClean import EpcClean
from statsmodels.stats.outliers_influence import variance_inflation_factor
from tqdm import tqdm
from utils.logger import setup_logger
logger = setup_logger()
class SapModel:
# We want to estimate for making improvements on different property components
RESPONSE = "current-energy-efficiency"
# We could potentially build models by constituency to avoid having too many
# features in the model
BASE_FEATURES = [
"property-type",
"built-form",
"construction-age-band",
"number-habitable-rooms",
"constituency",
"number-heated-rooms",
"transaction-type"
]
COMPONENT_FEATURES = [
"walls-description",
"floor-description",
"lighting-description",
"roof-description",
"mainheat-description",
"hotwater-description",
"main-fuel",
"mechanical-ventilation",
"secondheat-description",
"energy-tariff",
"solar-water-heating-flag",
"photo-supply",
"windows-description",
"glazed-type",
"glazed-area",
"multi-glaze-proportion",
# "lighting-description" # Might not need to use this
"low-energy-lighting",
"number-open-fireplaces",
"mainheatcont-description",
"fixed-lighting-outlets-count",
"floor-height",
"floor-level",
"total-floor-area",
"extension-count",
]
CATEGORICAL_COLS = [
"property-type",
"built-form",
"number-habitable-rooms",
"constituency",
"number-heated-rooms",
"mainheat-description",
"hotwater-description",
"main-fuel",
"mechanical-ventilation",
"secondheat-description",
"energy-tariff",
"solar-water-heating-flag",
"windows-description",
"glazed-type",
"glazed-area",
"construction-age-band",
"lighting-description",
"mainheatcont-description",
"floor-level",
]
NUMERICAL_COLUMNS = [
"photo-supply", "multi-glaze-proportion", "low-energy-lighting", "number-open-fireplaces",
"fixed-lighting-outlets-count",
"floor-height",
"total-floor-area",
"extension-count",
]
# For the moment, we store records of the best performing models as a benchmark for future imporvements
BEST_FIT = {
'MAPE': 0.04646530042225876, 'Mean Squared Error': 18.635209563729763,
'Mean Absolute Error': 2.856347408023325, 'R2 Score': 0.800701753826118,
'Explained Variance Score': 0.800701753826118, 'Median Absolute Error': 1.9026758012120197
}
BEST_PREDICT = {
'MAPE': 0.04346083528432316, 'Mean Squared Error': 21.16036509335514,
'Mean Absolute Error': 3.0440540802375833, 'R2 Score': 0.7219965012634312,
'Explained Variance Score': 0.7220620137390414, 'Median Absolute Error': 1.9031967986967828
}
BEST_FINAL = {
'MAPE': 0.04841470773386795, 'Mean Squared Error': 21.323052316630914, 'Mean Absolute Error': 2.988547998636157,
'R2 Score': 0.7633662459299112, 'Explained Variance Score': 0.7633785339028832,
'Median Absolute Error': 1.9487883489495985
}
BUCKET_VARIABLES = [
"number-open-fireplaces", "fixed-lighting-outlets-count", 'extension-count', 'multi-glaze-proportion'
]
def __init__(
self, data: List[Dict],
cleaner: EpcClean,
test_size: Optional[float] = 0.2,
random_state: Optional[int] = None
):
self.df = pd.DataFrame(data)
self.cleaner = cleaner
self.random_state = random_state if random_state is not None else 42
self.test_size = 0.2 if test_size is None else test_size
self.model_data = None
self.train_x = None
self.train_y = None
self.test_x = None
self.test_y = None
self.test_model = None
self.final_model = None
self.fit_error = None
self.predict_error = None
self.final_error = None
self.worst = {
"fit_errors": pd.DataFrame(),
"prediction_errors": pd.DataFrame(),
"fit_x": pd.DataFrame(),
"prediction_x": pd.DataFrame(),
"final_errors": pd.DataFrame(),
"final_x": pd.DataFrame(),
}
self.fit_df = None
self.predict_df = None
self.final_fit_df = None
self.diagnosis = {}
def run(self, plot: bool = False) -> None:
"""
A pipeline method to run all necessary methods in correct order.
:param plot: Boolean to indicate whether to plot the regression
"""
try:
self.create_dataset()
self.fit_model()
if plot:
self.plot_regression(self.fit_df)
except Exception as e:
logger.error("An error occurred during execution.")
logger.error(str(e))
def _merge_with_u_values(
self, model_data: pd.DataFrame, description: str, thermal_transmittance: str
) -> pd.DataFrame:
"""
Utility function to merge u value data with model data
:param model_data: Pandas dataframe which is the main modelling dataset
:param description: Name of the description column for which we're merging u-values onto
:param thermal_transmittance: Name of the thermal transmittance column
:return:
"""
u_values = pd.DataFrame(self.cleaner.cleaned[f"{description}-description"])[
["original_description", thermal_transmittance]].rename(
columns={thermal_transmittance: f"{description}_u_value"}
)
model_data = model_data.merge(
u_values,
how="left",
left_on=f"{description}-description",
right_on="original_description"
).drop(columns=["original_description"])
return model_data
def _append_cleaned_data(self, model_data: pd.DataFrame) -> pd.DataFrame:
"""
Appends cleaned data into the model data.
:param model_data: Original model data.
:return: Model data with cleaned data appended.
"""
for description in ["walls", "floor", "roof"]:
model_data = self._merge_with_u_values(model_data, description, "thermal_transmittance")
# lighting_proportions added separately as it doesn't use the _merge_with_u_values method
lighting_proportions = pd.DataFrame(self.cleaner.cleaned["lighting-description"])[
["original_description", "low_energy_proportion"]]
model_data = model_data.merge(
lighting_proportions,
how="left",
left_on="lighting-description",
right_on="original_description"
).drop(columns=["original_description"])
return model_data
@staticmethod
def _convert_transaction_type(model_data: pd.DataFrame) -> pd.DataFrame:
"""
Converts transaction type to boolean
:param model_data: Model data with transaction type.
:return: Model data with converted transaction type.
"""
model_data["is_rdsap"] = model_data["transaction-type"] != "new dwelling"
model_data = model_data.drop(columns=["transaction-type"])
return model_data
@staticmethod
def bucket_and_fill(df: pd.DataFrame, column_name: str, n_bins: int = 10) -> pd.DataFrame:
"""
Simple utility function to bucket up features into bins and then fill any missing values with "NO_RECORD"
:param df: Dataframe of features to be binned
:param column_name: Name of the column to be binned
:param n_bins: Number of bins to use
:return: Dataframe with binned column
"""
# Check if the column is numerical
if np.issubdtype(df[column_name].dtype, np.number):
# Create a new categorical column from numerical one by binning the data
df[column_name + "_bucket"] = pd.cut(df[column_name], bins=n_bins).astype(str)
# Replace missing data with "NO_RECORD"
df[column_name + "_bucket"] = df[column_name + "_bucket"].fillna("NO_RECORD")
df[column_name + "_bucket"] = np.where(
df[column_name + "_bucket"] == "nan",
"NO_RECORD",
df[column_name + "_bucket"]
)
return df
def _clean_numericals(self, model_data):
# Try binning numericals
remaining_numericals = [x for x in self.NUMERICAL_COLUMNS if x not in self.BUCKET_VARIABLES]
for col in self.BUCKET_VARIABLES:
model_data[col] = pd.to_numeric(model_data[col], errors='coerce')
# If all values are missing, set all values to 0 - this column will get dropped
if all(pd.isnull(model_data[col])):
model_data[col + "_bucket"] = "NO_RECORD"
continue
model_data = self.bucket_and_fill(model_data, col)
# Replace the data with the binned version
model_data = model_data.drop(columns=self.BUCKET_VARIABLES)
model_data = model_data.rename(
columns=dict(zip([c + "_bucket" for c in self.BUCKET_VARIABLES], self.BUCKET_VARIABLES))
)
# Basic fill the rest of the columns with 0 - currenrtly this provided the best performance
for col in remaining_numericals:
model_data[col] = np.where(
model_data[col] == "", "0", model_data[col]
).astype(float)
return model_data
@staticmethod
def clean_missings(model_data: pd.DataFrame) -> pd.DataFrame:
"""
Fills categorical missing data with sensible values
:param model_data: Original model data.
:return: Model data with cleaned categorical data.
"""
# Cleaning of energy-tariff and construction-age-band hurt prediction performance, indicating there is
# potentially
# a notable difference between a "" missing and a "NO DATA!" missing, worth differentiating
model_data["mechanical-ventilation"] = np.where(
model_data["mechanical-ventilation"] == "", "NO DATA!", model_data["mechanical-ventilation"]
)
model_data["solar-water-heating-flag"] = np.where(
model_data["solar-water-heating-flag"] == "", "N", model_data["solar-water-heating-flag"]
)
model_data["glazed-type"] = np.where(
model_data["glazed-type"] == "", "NO DATA!", model_data["glazed-type"]
)
model_data["glazed-area"] = np.where(
model_data["glazed-area"] == "", "NO DATA!", model_data["glazed-type"]
)
return model_data
def create_dataset(self):
logger.info("Creating modelling dataset")
model_data = self.df[[self.RESPONSE] + self.COMPONENT_FEATURES + self.BASE_FEATURES]
model_data = model_data.reset_index(drop=True)
model_data["idx"] = model_data.index.copy()
# Append on u-values
model_data = self._append_cleaned_data(model_data)
model_data = self.clean_missings(model_data)
# Convert transaction_type
model_data = self._convert_transaction_type(model_data)
# Clean numerical columns
model_data = self._clean_numericals(model_data)
# Take just entries with U-values
# TODO: Rather than doing this, do we want to include the estimated u-values?
# Since this ends up with just 2k entries
model_data = model_data[
~pd.isnull(model_data["walls_u_value"]) &
~pd.isnull(model_data["floor_u_value"]) &
~pd.isnull(model_data["roof_u_value"])
]
exclude_features = [
"walls-description", "floor-description", "roof-description", "transaction-type"
]
features = [
x for x in self.BASE_FEATURES + self.COMPONENT_FEATURES + [
"walls_u_value", "floor_u_value", "roof_u_value", self.RESPONSE, "idx", "is_rdsap"
] if x not in exclude_features
]
model_data = model_data[features]
for col in self.CATEGORICAL_COLS:
model_data[col] = model_data[col].astype('category')
# Convert response
model_data[self.RESPONSE] = model_data[self.RESPONSE].astype(float)
self.model_data = model_data
def make_training_test(self, x):
# Split into training and test
self.train_x, self.test_x, self.train_y, self.test_y = train_test_split(
x.drop(self.RESPONSE, axis=1),
x[self.RESPONSE],
test_size=self.test_size,
random_state=self.random_state
)
@staticmethod
def remove_zero_std_cols(train_x, test_x=None, threshold=1e-3):
"""
Utility function to remove columns that have zero standard deviation from both test and train sets
:param train_x: Training data to remove columns from
:param test_x: If provided, remove the same columns from the test data
:param threshold: float value, if the standard deviation is below this threshold, the column is considered
to have zero standard deviation
:return: Tuple of train_x and test_x (if provided). If test_x is not provided, a null placeholder is returned
"""
# Compute standard deviations
std_devs = train_x.std()
# Find columns with zero or near-zero standard deviation
zero_std_cols = std_devs[std_devs <= threshold].index
# Drop these columns from the training data
train_x = train_x.drop(zero_std_cols, axis=1)
if test_x is not None:
# Ensure the test data has the same columns
test_x = test_x[train_x.columns]
return train_x, test_x
return train_x, None
def fit_model(self):
"""
Main function to fit the model and produce accuracy metrics
"""
x = pd.get_dummies(self.model_data, columns=self.CATEGORICAL_COLS + self.BUCKET_VARIABLES, drop_first=True)
# Convert booleans to integer
for col in x.columns:
if x[col].dtype == bool:
x[col] = x[col].astype(int)
if x[col].dtype == object:
x[col] = x[col].astype(float)
# Create the training and test sets for each run
self.make_training_test(x)
self.train_x, self.test_x = self.remove_zero_std_cols(self.train_x, self.test_x)
logger.info("Detecting multi-collinearity in training dataset")
self.detect_multi_collinearity()
# Add a constant to the independent value
train_x = sm.add_constant(self.train_x)
test_x = sm.add_constant(self.test_x)
train_idx = train_x["idx"].copy()
test_idx = self.test_x["idx"].copy()
train_x = train_x.drop(columns=["idx"])
test_x = test_x.drop(columns=["idx"])
logger.info("Fitting testing model")
# make regression model
model = sm.OLS(self.train_y, train_x)
# fit model and print results
self.test_model = model.fit()
train_predictions = self.test_model.fittedvalues
test_predictions = self.test_model.predict(test_x)
self.fit_error, self.worst["fit_errors"] = self.calculate_regression_metrics(
y_true=self.train_y, y_pred=train_predictions
)
# Predict on new data
self.predict_error, self.worst["prediction_errors"] = self.calculate_regression_metrics(
y_true=self.test_y, y_pred=test_predictions
)
fit_success = self.check_successes(self.fit_error, self.BEST_FIT)
predict_success = self.check_successes(self.predict_error, self.BEST_PREDICT)
self.model_data['fit'] = self.test_model.fittedvalues
# The worst errors over index heavily for flats
self.worst["fit_x"] = self.model_data[self.model_data.index.isin(self.worst["fit_errors"].index)]
self.worst["prediction_x"] = self.model_data[self.model_data.index.isin(self.worst["prediction_errors"].index)]
self.fit_df = pd.DataFrame(
{
"fit": train_predictions,
"actual": self.train_y,
"idx": train_idx
}
).sort_values("actual", ascending=True)
self.predict_df = pd.DataFrame(
{
"predictions": test_predictions,
"actual": self.test_y,
"idx": test_idx
}
)
self.diagnosis = {
"fit_success": fit_success,
"predict_success": predict_success,
"summary": self.test_model.summary()
}
# We're now ready to fit the final model
# For the momeent, the pre-processing at the top of this function merely removes columns, so we
# just need to remove the columns that were removed from the training data from the final model
logger.info("Fitting final model")
x = sm.add_constant(x)
y = x[self.RESPONSE]
x = x[self.train_x.columns]
idx = x["idx"].copy()
x = x.drop(columns=["idx"])
final_model = sm.OLS(y, x)
# fit model and print results
self.final_model = final_model.fit()
final_predictions = self.final_model.fittedvalues
self.final_error, self.worst["final_errors"] = self.calculate_regression_metrics(
y_true=y, y_pred=final_predictions
)
self.final_fit_df = pd.DataFrame(
{
"fit": final_predictions,
"actual": y,
"idx": idx
}
).sort_values("actual", ascending=True)
@staticmethod
def check_successes(experiment_error, best_error):
"""
Simple function to check if the experiment error is better than the best error
:param experiment_error: output of calculate_regression_metrics() on the experiment
:param best_error: Current benchmark best error
:return:
"""
successes = []
for k in experiment_error:
if k in ["Explained Variance Score", "R2 Score"]:
# We want to maximise this so we want experiment error to be higher
successes.append(
{
"measure": k,
"success": experiment_error[k] >= best_error[k],
"difference": abs(experiment_error[k] - best_error[k])
}
)
continue
successes.append(
{
"measure": k,
"success": experiment_error[k] <= best_error[k],
"difference": abs(experiment_error[k] - best_error[k])
}
)
return pd.DataFrame(successes)
def rf_importance(self, train_x, train_y, test_x, test_y):
"""
Utility function to estimate feature importance using a random forest
This is useful to get a sense of some of the key features which are driving model
performance
:param train_x: Training data covariates to build the importance model on
:param train_y: Training data response to build the importance model on
:param test_x: Test data covariates to build the permutation importance model on
:param test_y: Test data response to build the permutation importance model on
:return: Pandas dataframe of feature importances, ranked by most important to least
"""
rf = RandomForestRegressor(random_state=self.random_state)
rf.fit(train_x, train_y)
# Print the name and importance of each feature
rf_importance_df = []
for feature, importance in zip(train_x.columns, rf.feature_importances_):
rf_importance_df.append(
{
"Feature": feature,
"rf_importance": importance
}
)
rf_importance_df = pd.DataFrame(rf_importance_df)
rf_importance_df = rf_importance_df.sort_values(by="rf_importance", ascending=False)
perm_importance = self.permuation_importance(rf, test_x, test_y)
return rf_importance_df, perm_importance
@staticmethod
def permuation_importance(rf, test_x, test_y):
"""
Simple utility function to produce permutation importance for a given model\
:param rf: Random forest model to calculate permutation importance for
:param test_x: Test covariates to be used for permutation importance
:param test_y: Test response to be used for permutation importance
:return:
"""
perm_importance = permutation_importance(rf, test_x, test_y, scoring='neg_mean_squared_error')
perm_importance_df = pd.DataFrame(
{
"Feature": test_x.columns,
"perm_importance": perm_importance.importances_mean
}
).sort_values(by="perm_importance", ascending=False)
return perm_importance_df
def detect_multi_collinearity(self):
# Get the VIFs for each variable
vifs = pd.DataFrame()
vifs["features"] = self.train_x.columns
vifs["vif"] = [variance_inflation_factor(self.train_x.values, i) for i in tqdm(range(self.train_x.shape[1]))]
# Get the features with the highest VIF
vifs = vifs.sort_values("vif", ascending=False)
# There are some features, we do not want to remove
required_features = [
"walls_u_value", "floor_u_value", "roof_u_value", "idx", "is_rdsap"
]
vifs = vifs[~vifs["features"].isin(required_features)]
drop_vifs = vifs[np.isinf(vifs["vif"])]
# Acceptable drop variables:
# main-fuel_Gas: mains gas
# glazed-type_NO DATA!
# glazed-area_NO DATA!
self.train_x = self.train_x.drop(columns=drop_vifs["features"].values)
self.test_x = self.test_x[self.train_x.columns]
@staticmethod
def plot_regression(df):
# Extract the "fit" and "actual" columns from the dataframe
fit = df['fit']
actual = df['actual']
# Create an array of x-values (assumed to be sequential integers)
x = np.arange(len(df))
# Plot the fit and actual data
plt.plot(x, fit, color='red', label='Fit')
plt.plot(x, actual, color='blue', label='Actual')
# Set labels and title
plt.xlabel('Index')
plt.ylabel('Value')
plt.title('Linear Regression - Fit vs Actual')
# Display legend
plt.legend()
# Show the plot
plt.show()
@staticmethod
def calculate_regression_metrics(y_true, y_pred, n=20):
"""
Calculate the 5 most important accuracy metrics for regression.
Args:
y_true (array-like): Array of true target values.
y_pred (array-like): Array of predicted target values.
Returns:
dict: Dictionary containing the calculated metrics.
"""
metrics = {
'MAPE': mean_absolute_percentage_error(y_true, y_pred),
'Mean Squared Error': mean_squared_error(y_true, y_pred),
'Mean Absolute Error': mean_absolute_error(y_true, y_pred),
'R2 Score': r2_score(y_true, y_pred),
'Explained Variance Score': explained_variance_score(y_true, y_pred),
'Median Absolute Error': median_absolute_error(y_true, y_pred)
}
errors = pd.DataFrame()
errors['Fit'] = y_true
errors['Actual'] = y_pred
errors['Residual'] = errors['Actual'] - errors['Fit']
errors['Absolute Residual'] = np.abs(errors['Residual'])
worst_errors = errors.nlargest(n, 'Absolute Residual')
return metrics, worst_errors