Model/model_data/analysis/SapModel.py
2023-07-04 10:46:26 +01:00

374 lines
13 KiB
Python

import numpy as np
import pandas as pd
import statsmodels.api as sm
import matplotlib.pyplot as plt
import pickle
from typing import Any, Dict, Tuple
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, explained_variance_score, \
median_absolute_error, mean_absolute_percentage_error
# Load the pre-built payload used to construct the model at the bottom of this
# module; it is read there through the "data" and "cleaner" keys.
# The path is relative, so it resolves against the current working directory.
# SECURITY NOTE(review): pickle.load can execute arbitrary code while
# deserialising. Only point this at a trusted, locally produced file.
with open("all_data.pkl", "rb") as f:
    all_data = pickle.load(f)
class SapModel:
    """OLS regression of ``environment-impact-current`` on property features.

    The pipeline (see :meth:`run`) builds a modelling table from the raw
    records plus look-up values supplied by ``cleaner``, one-hot encodes the
    categorical columns, splits into train/test sets, fits a statsmodels OLS
    model and records error metrics for both splits.
    """

    # We want to estimate for making improvements on different property components
    RESPONSE = "environment-impact-current"

    # We could potentially build models by constituency to avoid having too many
    # features in the model
    BASE_FEATURES = [
        "property-type",
        "built-form",
        "construction-age-band",
        "number-habitable-rooms",
        "constituency",
        "number-heated-rooms",
        "transaction-type",
    ]

    COMPONENT_FEATURES = [
        "walls-description",
        "floor-description",
        "lighting-description",
        "roof-description",
        "mainheat-description",
        "hotwater-description",
        "main-fuel",
        "mechanical-ventilation",
        "secondheat-description",
        "energy-tariff",
        "solar-water-heating-flag",
        "photo-supply",
        "windows-description",
        "glazed-type",
        "glazed-area",
        "multi-glaze-proportion",
        # "lighting-description"  # Might not need to use this
        "low-energy-lighting",
        "number-open-fireplaces",
    ]

    CATEGORICAL_COLS = [
        "property-type",
        "built-form",
        "number-habitable-rooms",
        "constituency",
        "number-heated-rooms",
        "mainheat-description",
        "hotwater-description",
        "main-fuel",
        "mechanical-ventilation",
        "secondheat-description",
        "energy-tariff",
        "solar-water-heating-flag",
        "windows-description",
        "glazed-type",
        "glazed-area",
        "construction-age-band",
        # Testing
        "lighting-description",
    ]

    # Columns converted to float by _clean_numericals; "" is read as zero.
    _NUMERIC_STRING_COLS = (
        "photo-supply",
        "multi-glaze-proportion",
        "low-energy-lighting",
        "number-open-fireplaces",
    )

    # Metrics where a larger value is the better one; every other metric
    # produced by calculate_regression_metrics is an error to be minimised.
    _HIGHER_IS_BETTER = frozenset({"R2 Score", "Explained Variance Score"})

    # temp hardcoded values: "best so far" reference metrics that each run is
    # compared against (presumably taken from an earlier run - TODO confirm).
    _BENCHMARK_FIT = {
        'MAPE': 0.04138090547359925,
        'Mean Squared Error': 20.14558392249143,
        'Mean Absolute Error': 3.2071693100226386,
        'R2 Score': 0.8070222206305815,
        'Explained Variance Score': 0.8070222206305815,
        'Median Absolute Error': 2.418797962633903,
    }
    _BENCHMARK_PREDICT = {
        'MAPE': 0.04477710915141379,
        'Mean Squared Error': 24.121330207821273,
        'Mean Absolute Error': 3.443075571126256,
        'R2 Score': 0.7346655266247644,
        'Explained Variance Score': 0.7346701958813864,
        'Median Absolute Error': 2.5234727208706076,
    }

    def __init__(self, data, cleaner, test_size=0.2, random_state=None):
        """Store the raw inputs and initialise all result slots to empty.

        Args:
            data: Anything ``pd.DataFrame`` accepts. It must provide the
                RESPONSE, COMPONENT_FEATURES and BASE_FEATURES columns.
            cleaner: Object exposing a ``cleaned`` mapping keyed by feature
                name (e.g. ``"walls-description"``) whose values can be turned
                into a DataFrame with an ``original_description`` column.
            test_size: Fraction passed to ``train_test_split``; ``None`` falls
                back to 0.2.
            random_state: Seed for the split; ``None`` falls back to 42.
        """
        self.df = pd.DataFrame(data)
        self.cleaner = cleaner
        self.random_state = random_state if random_state is not None else 42
        self.test_size = 0.2 if test_size is None else test_size

        self.model_data = None
        self.train_x = None
        self.train_y = None
        self.test_x = None
        self.test_y = None
        self.results = None
        self.fit_error = None
        self.predict_error = None
        self.worst = {
            "fit_errors": pd.DataFrame(),
            "x": pd.DataFrame(),
            "prediction_errors": pd.DataFrame(),
        }
        self.fit_df = None
        # Filled by fit_model: {"fit": DataFrame, "predict": DataFrame}.
        self.benchmark_checks = None

    def run(self, plot=False):
        """
        A pipeline method to run all necessary methods in correct order.

        Any exception is caught and printed rather than propagated, so callers
        should check ``self.results`` / ``self.fit_df`` to confirm success.
        """
        try:
            self.create_dataset()
            self.fit_model()
            if plot:
                self.plot_regression(self.fit_df)
        except Exception as e:
            print("An error occurred during execution.")
            print(str(e))

    def _merge_cleaned_column(
        self,
        model_data: pd.DataFrame,
        feature: str,
        value_column: str,
        output_column: str = "",
    ) -> pd.DataFrame:
        """Left-join one cleaned look-up column onto ``model_data``.

        Rows are matched on ``model_data[feature]`` against the look-up's
        ``original_description``; the join key from the look-up is dropped
        afterwards. ``output_column`` optionally renames the joined value.
        """
        lookup = pd.DataFrame(self.cleaner.cleaned[feature])[
            ["original_description", value_column]
        ]
        if output_column:
            lookup = lookup.rename(columns={value_column: output_column})
        merged = model_data.merge(
            lookup,
            how="left",
            left_on=feature,
            right_on="original_description",
        )
        return merged.drop(columns=["original_description"])

    def _merge_with_u_values(
        self, model_data: pd.DataFrame, description: str, thermal_transmittance: str
    ) -> pd.DataFrame:
        """Attach ``<description>_u_value`` looked up from the cleaner.

        Args:
            model_data: Table containing a ``<description>-description`` column.
            description: Component name, e.g. ``"walls"``.
            thermal_transmittance: Name of the look-up column holding the value.

        Returns:
            ``model_data`` with the extra ``<description>_u_value`` column
            (missing where no description matched).
        """
        return self._merge_cleaned_column(
            model_data,
            feature=f"{description}-description",
            value_column=thermal_transmittance,
            output_column=f"{description}_u_value",
        )

    def _append_cleaned_data(self, model_data: pd.DataFrame) -> pd.DataFrame:
        """Attach wall/floor/roof u-values and ``low_energy_proportion``."""
        for description in ["walls", "floor", "roof"]:
            model_data = self._merge_with_u_values(
                model_data, description, "thermal_transmittance"
            )
        # Lighting keeps the look-up's own column name, so no rename is needed.
        return self._merge_cleaned_column(
            model_data,
            feature="lighting-description",
            value_column="low_energy_proportion",
        )

    @staticmethod
    def _convert_transaction_type(model_data):
        """Replace ``transaction-type`` with a boolean ``is_rdsap`` column.

        ``is_rdsap`` is True for every transaction type other than
        ``"new dwelling"``. The input frame is not modified.
        """
        is_rdsap = model_data["transaction-type"] != "new dwelling"
        return model_data.assign(is_rdsap=is_rdsap).drop(columns=["transaction-type"])

    @staticmethod
    def _clean_numericals(model_data):
        """Convert the numeric-as-string columns to float, in place.

        Empty strings are read as zero. Each column is converted from its own
        values (previously every column was overwritten with ``photo-supply``).

        Returns:
            The same ``model_data`` object, for chaining.
        """
        for col in SapModel._NUMERIC_STRING_COLS:
            values = model_data[col]
            model_data[col] = np.where(values == "", "0", values).astype(float)
        return model_data

    def create_dataset(self):
        """Build ``self.model_data``, the table that ``fit_model`` consumes.

        Keeps only rows where wall, floor and roof u-values were all found,
        casts CATEGORICAL_COLS to ``category`` and the response to float.
        """
        columns = [self.RESPONSE] + self.COMPONENT_FEATURES + self.BASE_FEATURES
        model_data = self.df[columns].reset_index(drop=True)
        model_data["idx"] = model_data.index.copy()

        # Append on u-values
        model_data = self._append_cleaned_data(model_data)
        # Convert transaction_type
        model_data = self._convert_transaction_type(model_data)
        # Clean numerical columns
        model_data = self._clean_numericals(model_data)

        # Take just entries with U-values
        # TODO: Rather than doing this, do we want to include the estimated u-values?
        #  Since this ends up with just 2k entries
        u_value_cols = ["walls_u_value", "floor_u_value", "roof_u_value"]
        model_data = model_data.dropna(subset=u_value_cols)

        exclude_features = [
            "walls-description", "floor-description", "roof-description", "transaction-type"
        ]
        candidates = (
            self.BASE_FEATURES + self.COMPONENT_FEATURES + u_value_cols + [self.RESPONSE]
        )
        features = [name for name in candidates if name not in exclude_features]
        # NOTE(review): "is_rdsap", "low_energy_proportion" and "idx" are built
        # above but are not in `features`, so they are dropped here - confirm
        # whether they were meant to be model inputs.
        # .copy() so the column assignments below act on an independent frame
        # rather than on a slice of the filtered table.
        model_data = model_data[features].copy()

        for col in self.CATEGORICAL_COLS:
            model_data[col] = model_data[col].astype('category')

        # Convert response
        model_data[self.RESPONSE] = model_data[self.RESPONSE].astype(float)
        self.model_data = model_data

    def make_training_test(self, x):
        """Split ``x`` into train/test features and response.

        Args:
            x: Fully numeric design table that still contains RESPONSE.
        """
        self.train_x, self.test_x, self.train_y, self.test_y = train_test_split(
            x.drop(self.RESPONSE, axis=1),
            x[self.RESPONSE],
            test_size=self.test_size,
            random_state=self.random_state,
        )

    def remove_zero_std_cols(self, threshold=1e-3):
        """Drop training columns whose standard deviation is <= ``threshold``.

        The test set is then restricted to the surviving training columns so
        both splits keep identical columns in identical order.
        """
        std_devs = self.train_x.std()
        zero_std_cols = std_devs[std_devs <= threshold].index
        self.train_x = self.train_x.drop(zero_std_cols, axis=1)
        self.test_x = self.test_x[self.train_x.columns]

    @staticmethod
    def _check_successes(experiment_error, best_error):
        """Compare a run's metrics against reference metrics.

        Args:
            experiment_error: Mapping of metric name to this run's value.
            best_error: Mapping holding the reference value for each of the
                same metric names.

        Returns:
            DataFrame with one row per metric: ``measure``, ``success`` (the
            run is at least as good as the reference) and ``difference``
            (absolute gap). Score-type metrics (R2, explained variance) succeed
            when higher; all others succeed when lower.
        """
        rows = []
        for measure, value in experiment_error.items():
            reference = best_error[measure]
            if measure in SapModel._HIGHER_IS_BETTER:
                success = value >= reference
            else:
                success = value <= reference
            rows.append(
                {
                    "measure": measure,
                    "success": success,
                    "difference": abs(value - reference),
                }
            )
        return pd.DataFrame(rows)

    def fit_model(self):
        """Fit OLS on the training split and evaluate on both splits.

        Populates ``train_*``/``test_*``, ``results``, ``fit_error``,
        ``predict_error``, ``worst``, ``benchmark_checks`` and ``fit_df``, and
        writes the fitted values into ``self.model_data['fit']``.
        """
        # A previous call leaves a 'fit' column on model_data; exclude it so a
        # re-fit cannot use the old fitted values as a predictor.
        design = self.model_data.drop(columns=["fit"], errors="ignore")

        # Dummy out the categorical variables
        x = pd.get_dummies(design, columns=self.CATEGORICAL_COLS, drop_first=True)

        # Convert booleans to integer (and any leftover object columns to float)
        for col in x.columns:
            if x[col].dtype == bool:
                x[col] = x[col].astype(int)
            if x[col].dtype == object:
                x[col] = x[col].astype(float)

        # Create the training and test sets for each run
        self.make_training_test(x)
        self.remove_zero_std_cols()

        # Add the intercept. has_constant="add" forces the column even when a
        # split happens to contain another constant column; the default
        # ("skip") would silently omit it and break predict() on the test set.
        train_x = sm.add_constant(self.train_x, has_constant="add")
        self.results = sm.OLS(self.train_y, train_x).fit()

        self.fit_error, self.worst["fit_errors"] = self.calculate_regression_metrics(
            y_true=self.train_y, y_pred=self.results.fittedvalues
        )

        # Predict on new data
        test_x = sm.add_constant(self.test_x, has_constant="add")
        predictions = self.results.predict(test_x)
        self.predict_error, self.worst["prediction_errors"] = self.calculate_regression_metrics(
            y_true=self.test_y, y_pred=predictions
        )

        # Keep the benchmark comparison instead of discarding it.
        self.benchmark_checks = {
            "fit": self._check_successes(self.fit_error, self._BENCHMARK_FIT),
            "predict": self._check_successes(self.predict_error, self._BENCHMARK_PREDICT),
        }

        # Index-aligned: rows outside the training split get NaN.
        self.model_data['fit'] = self.results.fittedvalues

        # The worst errors over index heavily for flats
        # (rows are selected via the training-set errors, which is what the
        # 'fit' column above describes).
        worst_index = self.worst["fit_errors"].index
        self.worst["x"] = self.model_data[self.model_data.index.isin(worst_index)]

        self.fit_df = pd.DataFrame(
            {
                "fit": self.results.fittedvalues,
                "actual": self.train_y,
            }
        ).sort_values("actual", ascending=True)

    def detect_multi_collinearity(self):
        """Compute the variance inflation factor of every training column.

        Returns:
            DataFrame with columns ``features`` and ``vif``, sorted with the
            highest VIF first.
        """
        from statsmodels.stats.outliers_influence import variance_inflation_factor
        from tqdm import tqdm

        # Materialise the array once rather than on every iteration.
        design = self.train_x.values
        vifs = pd.DataFrame()
        vifs["features"] = self.train_x.columns
        vifs["vif"] = [
            variance_inflation_factor(design, i) for i in tqdm(range(design.shape[1]))
        ]
        # Get the features with the highest VIF
        return vifs.sort_values("vif", ascending=False)

    @staticmethod
    def plot_regression(df):
        """Plot fitted against actual values over the row position.

        Args:
            df: DataFrame with ``fit`` and ``actual`` columns; rows are drawn
                in their existing order.
        """
        fit = df['fit']
        actual = df['actual']

        # x-values are simply the sequential row positions
        x = np.arange(len(df))

        plt.plot(x, fit, color='red', label='Fit')
        plt.plot(x, actual, color='blue', label='Actual')

        plt.xlabel('Index')
        plt.ylabel('Value')
        plt.title('Linear Regression - Fit vs Actual')
        plt.legend()
        plt.show()

    @staticmethod
    def _rank_worst_errors(y_true, y_pred, n=20):
        """Return the ``n`` observations with the largest absolute residual.

        Columns: ``Fit`` (prediction), ``Actual`` (true value), ``Residual``
        (``Actual - Fit``) and ``Absolute Residual``.
        """
        errors = pd.DataFrame()
        # Assign the true values first so their index (if any) defines the
        # rows; array-like predictions are then placed positionally.
        errors['Actual'] = y_true
        errors['Fit'] = y_pred
        errors = errors[['Fit', 'Actual']].copy()
        errors['Residual'] = errors['Actual'] - errors['Fit']
        errors['Absolute Residual'] = errors['Residual'].abs()
        return errors.nlargest(n, 'Absolute Residual')

    @staticmethod
    def calculate_regression_metrics(y_true, y_pred, n=20):
        """
        Calculate six accuracy metrics for regression and the worst errors.

        Args:
            y_true (array-like): Array of true target values.
            y_pred (array-like): Array of predicted target values.
            n (int): Number of largest-absolute-residual rows to return.

        Returns:
            tuple: ``(metrics, worst_errors)`` where ``metrics`` is a dict
            keyed by metric name and ``worst_errors`` is a DataFrame of the
            ``n`` rows with the largest absolute residual.
        """
        metrics = {
            'MAPE': mean_absolute_percentage_error(y_true, y_pred),
            'Mean Squared Error': mean_squared_error(y_true, y_pred),
            'Mean Absolute Error': mean_absolute_error(y_true, y_pred),
            'R2 Score': r2_score(y_true, y_pred),
            'Explained Variance Score': explained_variance_score(y_true, y_pred),
            'Median Absolute Error': median_absolute_error(y_true, y_pred),
        }
        worst_errors = SapModel._rank_worst_errors(y_true, y_pred, n)
        return metrics, worst_errors
# Module-level model instance built from the unpickled payload. The name
# `self` is kept as-is (it appears to be a convenience for interactive use).
self = SapModel(data=all_data["data"], cleaner=all_data["cleaner"])