Model/model_data/analysis/SapModel.py
2023-07-04 16:13:40 +01:00

622 lines
22 KiB
Python

import numpy as np
import pandas as pd
import statsmodels.api as sm
import matplotlib.pyplot as plt
import pickle
from typing import Any, Dict, Tuple
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, explained_variance_score, \
median_absolute_error, mean_absolute_percentage_error
from sklearn.linear_model import Lasso
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression
import xgboost as xgb
with open("all_data.pkl", "rb") as f:
all_data = pickle.load(f)
class SapModel:
# We want to estimate for making improvements on different property components
RESPONSE = "environment-impact-current"
# We could potentially build models by constituency to avoid having too many
# features in the model
BASE_FEATURES = [
"property-type",
"built-form",
"construction-age-band",
"number-habitable-rooms",
"constituency",
"number-heated-rooms",
"transaction-type"
]
COMPONENT_FEATURES = [
"walls-description",
"floor-description",
"lighting-description",
"roof-description",
"mainheat-description",
"hotwater-description",
"main-fuel",
"mechanical-ventilation",
"secondheat-description",
"energy-tariff",
"solar-water-heating-flag",
"photo-supply",
"windows-description",
"glazed-type",
"glazed-area",
"multi-glaze-proportion",
# "lighting-description" # Might not need to use this
"low-energy-lighting",
"number-open-fireplaces",
]
CATEGORICAL_COLS = [
"property-type",
"built-form",
"number-habitable-rooms",
"constituency",
"number-heated-rooms",
"mainheat-description",
"hotwater-description",
"main-fuel",
"mechanical-ventilation",
"secondheat-description",
"energy-tariff",
"solar-water-heating-flag",
"windows-description",
"glazed-type",
"glazed-area",
"construction-age-band",
# Testing
"lighting-description"
]
def __init__(self, data, cleaner, test_size=0.2, random_state=None):
self.df = pd.DataFrame(data)
self.cleaner = cleaner
self.random_state = random_state if random_state is not None else 42
self.test_size = 0.2 if test_size is None else test_size
self.model_data = None
self.train_x = None
self.train_y = None
self.test_x = None
self.test_y = None
self.results = None
self.model_data = None
self.fit_error = None
self.predict_error = None
self.worst = {"fit_errors": pd.DataFrame(), "x": pd.DataFrame(), "prediction_errors": pd.DataFrame()}
self.fit_df = None
def run(self, plot=False):
"""
A pipeline method to run all necessary methods in correct order.
"""
try:
self.create_dataset()
self.fit_model()
if plot:
self.plot_regression(self.fit_df)
except Exception as e:
print("An error occurred during execution.")
print(str(e))
def _merge_with_u_values(
self, model_data: pd.DataFrame, description: str, thermal_transmittance: str
) -> pd.DataFrame:
u_values = pd.DataFrame(self.cleaner.cleaned[f"{description}-description"])[
["original_description", thermal_transmittance]].rename(
columns={thermal_transmittance: f"{description}_u_value"}
)
model_data = model_data.merge(
u_values,
how="left",
left_on=f"{description}-description",
right_on="original_description"
).drop(columns=["original_description"])
return model_data
def _append_cleaned_data(self, model_data: pd.DataFrame) -> pd.DataFrame:
for description in ["walls", "floor", "roof"]:
model_data = self._merge_with_u_values(model_data, description, "thermal_transmittance")
# lighting_proportions added separately as it doesn't use the _merge_with_u_values method
lighting_proportions = pd.DataFrame(self.cleaner.cleaned["lighting-description"])[
["original_description", "low_energy_proportion"]]
model_data = model_data.merge(
lighting_proportions,
how="left",
left_on="lighting-description",
right_on="original_description"
).drop(columns=["original_description"])
return model_data
@staticmethod
def _convert_transaction_type(model_data):
model_data["is_rdsap"] = model_data["transaction-type"] != "new dwelling"
model_data = model_data.drop(columns=["transaction-type"])
return model_data
@staticmethod
def _clean_numericals(model_data):
for col in ["photo-supply", "multi-glaze-proportion", "low-energy-lighting", "number-open-fireplaces"]:
model_data[col] = np.where(
model_data[col] == "", "0", model_data["photo-supply"]
).astype(float)
return model_data
def create_dataset(self):
model_data = self.df[[self.RESPONSE] + self.COMPONENT_FEATURES + self.BASE_FEATURES]
model_data = model_data.reset_index(drop=True)
model_data["idx"] = model_data.index.copy()
# Append on u-values
model_data = self._append_cleaned_data(model_data)
def clean_missings(model_data):
CLEANING_COLS = ["mechanical-ventilation", "energy-tariff", "solar-water-heating-flag", "glazed-type", ""]
model_data["construction-age-band"].value_counts()
model_data["mechanical-ventilation"] = np.where(
model_data["mechanical-ventilation"] == "", "NO DATA!", model_data["mechanical-ventilation"]
)
# REVIEW THIS
# model_data["energy-tariff"] = np.where(
# model_data["energy-tariff"] == "", "Unknown", model_data["mechanical-ventilation"]
# )
#
model_data["solar-water-heating-flag"] = np.where(
model_data["solar-water-heating-flag"] == "", "N", model_data["solar-water-heating-flag"]
)
model_data["glazed-type"] = np.where(
model_data["glazed-type"] == "", "NO DATA!", model_data["glazed-type"]
)
model_data["glazed-area"] = np.where(
model_data["glazed-area"] == "", "NO DATA!", model_data["glazed-type"]
)
# model_data["construction-age-band"] = np.where(
# model_data["construction-age-band"] == "", "NO DATA!", model_data["construction-age-band"]
# )
return model_data
model_data = clean_missings(model_data)
# Convert transaction_type
model_data = self._convert_transaction_type(model_data)
# Clean numerical columns
model_data = self._clean_numericals(model_data)
# Take just entries with U-values
# TODO: Rather than doing this, do we want to include the estimated u-values?
# Since this ends up with just 2k entries
model_data = model_data[
~pd.isnull(model_data["walls_u_value"]) &
~pd.isnull(model_data["floor_u_value"]) &
~pd.isnull(model_data["roof_u_value"])
]
exclude_features = [
"walls-description", "floor-description", "roof-description", "transaction-type"
]
features = [
x for x in self.BASE_FEATURES + self.COMPONENT_FEATURES + [
"walls_u_value", "floor_u_value", "roof_u_value", self.RESPONSE, "idx"
] if x not in exclude_features
]
model_data = model_data[features]
for col in self.CATEGORICAL_COLS:
model_data[col] = model_data[col].astype('category')
# Convert response
model_data[self.RESPONSE] = model_data[self.RESPONSE].astype(float)
self.model_data = model_data
def make_training_test(self, x):
# Split into training and test
self.train_x, self.test_x, self.train_y, self.test_y = train_test_split(
x.drop(self.RESPONSE, axis=1),
x[self.RESPONSE],
test_size=self.test_size,
random_state=self.random_state
)
def remove_zero_std_cols(self, threshold=1e-3):
# Compute standard deviations
std_devs = self.train_x.std()
# Find columns with zero or near-zero standard deviation
zero_std_cols = std_devs[std_devs <= threshold].index
# Drop these columns from the training data
self.train_x = self.train_x.drop(zero_std_cols, axis=1)
# Ensure the test data has the same columns
self.test_x = self.test_x[self.train_x.columns]
def fit_model(self):
# Dummy out the categorical variables
x = pd.get_dummies(self.model_data, columns=self.CATEGORICAL_COLS, drop_first=True)
# Convert booleans to integer
for col in x.columns:
if x[col].dtype == bool:
x[col] = x[col].astype(int)
if x[col].dtype == object:
x[col] = x[col].astype(float)
# Create the training and test sets for each run
self.make_training_test(x)
self.remove_zero_std_cols()
self.detect_multi_collinearity()
# Add a constant to the independent value
train_x = sm.add_constant(self.train_x)
test_x = sm.add_constant(self.test_x)
train_idx = train_x["idx"].copy()
test_ids = self.test_x["idx"].copy()
train_x = train_x.drop(columns=["idx"])
test_x = test_x.drop(columns=["idx"])
# importance_df = self.make_importance(train_x)
# Test dropping the least important features
# to_drop = importance_df.tail(2)["Feature"].values
# Dropping this is a good idea
to_drop = [
"hotwater-description_Electric immersion, off-peak",
]
train_x = train_x.drop(columns=to_drop)
test_x = test_x[train_x.columns]
# make regression model
model = sm.OLS(self.train_y, train_x)
# fit model and print results
self.results = model.fit()
train_predictions = self.results.fittedvalues
test_predictions = self.results.predict(test_x)
diagnose = self.test_x.copy()
diagnose["predictions"] = test_predictions
diagnose["actual"] = self.test_y.values
self.fit_error, self.worst["fit_errors"] = self.calculate_regression_metrics(
y_true=self.train_y, y_pred=train_predictions
)
# Predict on new data
self.predict_error, self.worst["prediction_errors"] = self.calculate_regression_metrics(
y_true=self.test_y, y_pred=test_predictions
)
fit_df = pd.DataFrame(
{
"fit": self.results.fittedvalues,
"actual": self.train_y,
"idx": train_idx
}
).sort_values("actual", ascending=True).merge(self.model_data[["idx", "property-type"]], on="idx")
# temp hardcoded values
best_fit = {'MAPE': 0.042768242654695386, 'Mean Squared Error': 21.606875710236896,
'Mean Absolute Error': 3.293776606279645, 'R2 Score': 0.7930242722318233,
'Explained Variance Score': 0.7930242722318233, 'Median Absolute Error': 2.47686604239054}
best_predict = {'MAPE': 0.04397538047202114, 'Mean Squared Error': 22.582856696398935,
'Mean Absolute Error': 3.384549163877968, 'R2 Score': 0.7515887251149801,
'Explained Variance Score': 0.7516508219403573, 'Median Absolute Error': 2.4624472128668344}
def check_successes(experiment_error, best_error):
successes = []
for k in experiment_error:
if k in ["Explained Variance Score", "R2 Score"]:
# We want to maximise this so we want experiment error to be higher
successes.append(
{
"measure": k,
"success": experiment_error[k] >= best_error[k],
"difference": abs(experiment_error[k] - best_error[k])
}
)
continue
successes.append(
{
"measure": k,
"success": experiment_error[k] <= best_error[k],
"difference": abs(experiment_error[k] - best_error[k])
}
)
return pd.DataFrame(successes)
fit_success = check_successes(self.fit_error, best_fit)
predict_success = check_successes(self.predict_error, best_predict)
print(self.results.summary())
self.model_data['fit'] = self.results.fittedvalues
# The worst errors over index heavily for flats
self.worst["x"] = self.model_data[self.model_data.index.isin(self.worst["errors"].index)]
self.fit_df = pd.DataFrame(
{
"fit": self.results.fittedvalues,
"actual": self.train_y
}
).sort_values("actual", ascending=True)
# TODO: Testing
# Create a StandardScaler instance
scaler = StandardScaler()
# Fit the scaler to the training data and transform it
train_x_scaled = scaler.fit_transform(train_x)
# Transform the test data
test_x_scaled = scaler.transform(test_x)
# Define the model
lasso_reg = Lasso(
alpha=0.1) # you can change the alpha parameter to adjust the strength of the regularization.
# Fit the model
lasso_reg.fit(train_x_scaled, self.train_y)
# Make predictions on the training set
train_predictions = lasso_reg.predict(train_x_scaled)
# Make predictions on the test set
test_predictions = lasso_reg.predict(test_x_scaled)
# Calculate metrics based on these predictions.
lasso_fit_error, _ = self.calculate_regression_metrics(
y_true=self.train_y, y_pred=train_predictions
)
# Predict on new data
lasso_predict_error, _ = self.calculate_regression_metrics(
y_true=self.test_y, y_pred=test_predictions
)
lasso_fit_success = check_successes(lasso_fit_error, best_fit)
lasso_predict_success = check_successes(lasso_predict_error, best_predict)
# TODO: TESTING 2
# Create a StandardScaler instance
scaler = StandardScaler()
# Fit the scaler to the training data and transform it
train_x_scaled = scaler.fit_transform(train_x)
# Transform the test data
test_x_scaled = scaler.transform(test_x)
# Define the model
alphas = np.logspace(-4, 2, 100) # Range of alpha values to search
lasso_reg = LassoCV(cv=10, alphas=alphas)
# Fit the model
lasso_reg.fit(train_x_scaled, self.train_y)
# Make predictions on the training set
train_predictions = lasso_reg.predict(train_x_scaled)
# Make predictions on the test set
test_predictions = lasso_reg.predict(test_x_scaled)
# Calculate metrics based on these predictions.
lasso_fit_error, lasso_worst_fit_errors = self.calculate_regression_metrics(
y_true=self.train_y, y_pred=train_predictions
)
# Predict on new data
lasso_predict_error, lasso_worst_predict_errors = self.calculate_regression_metrics(
y_true=self.test_y, y_pred=test_predictions
)
lasso_fit_success = check_successes(lasso_fit_error, best_fit)
lasso_predict_success = check_successes(lasso_predict_error, best_predict)
fit_df = pd.DataFrame(
{
"fit": train_predictions,
"actual": self.train_y,
"residual": abs(self.train_y - train_predictions),
"idx": train_idx.values
}
)
fit_df = fit_df.sort_values("residual", ascending=False)
fit_df = fit_df.merge(self.model_data, on="idx")
zz = fit_df[fit_df["lighting-description"] == "Low energy lighting in all fixed outlets"]
z = fit_df.head(100).groupby("lighting-description", observed=True)["residual"].agg(
['mean', 'count']).reset_index()
z = z.sort_values("mean", ascending=False)
worst_x = self.model_data[self.model_data.index.isin(lasso_worst_fit_errors.index)]
worst_x = worst_x.merge(lasso_worst_fit_errors, left_index=True, right_index=True)
worst_x = worst_x.sort_values("Absolute Residual", ascending=False)
def make_importance(self, train_x):
# Create a DMatrix from your training data
dtrain = xgb.DMatrix(train_x, label=self.train_y)
# Set the parameters for the XGBoost model
params = {
'objective': 'reg:squarederror',
'eval_metric': 'rmse'
}
# Train the XGBoost model
model = xgb.train(params, dtrain)
# Get feature importance scores
importance_scores = model.get_score(importance_type='gain')
# Create a dataframe with feature names and importance scores
importance_df = pd.DataFrame({
'Feature': importance_scores.keys(),
'Importance': importance_scores.values()
})
# Sort the dataframe by importance score in descending order
importance_df = importance_df.sort_values(by='Importance', ascending=False)
# Print the feature importances
return importance_df
def detect_multi_collinearity(self):
from statsmodels.stats.outliers_influence import variance_inflation_factor
from tqdm import tqdm
# Get the VIFs for each variable
vifs = pd.DataFrame()
vifs["features"] = self.train_x.columns
vifs["vif"] = [variance_inflation_factor(self.train_x.values, i) for i in tqdm(range(self.train_x.shape[1]))]
# Get the features with the highest VIF
vifs = vifs.sort_values("vif", ascending=False)
# There are some features, we do not want to remove
required_features = [
"walls_u_value", "floor_u_value", "roof_u_value"
]
vifs = vifs[~vifs["features"].isin(required_features)]
drop_vifs = vifs[np.isinf(vifs["vif"])]
# Acceptable drop variables:
# main-fuel_Gas: mains gas
# glazed-type_NO DATA!
# glazed-area_NO DATA!
self.train_x = self.train_x.drop(columns=drop_vifs["features"].values)
self.test_x = self.test_x[self.train_x.columns]
def test_multi_collinearity(self, test_variable):
from statsmodels.regression.linear_model import OLS
# drop target variable
x_temp = self.train_x.drop(columns=[test_variable])
# define target variable
y_temp = self.train_x[test_variable]
# add a constant to the predictors
x_temp = sm.add_constant(x_temp)
# fit the model
model_temp = OLS(y_temp, x_temp).fit()
print(model_temp.summary())
smry = model_temp.summary()
smry_coefs = pd.DataFrame(smry.tables[1].data[1:], columns=smry.tables[1].data[0])
smry_coefs = smry_coefs.sort_values("P>|t|", ascending=True)
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)
print(smry_coefs[smry_coefs["P>|t|"].astype(float) < 0.0001])
import seaborn as sns
# Select columns
selected_columns = ["main-fuel_Gas: mains gas", "mainheat-description_Community scheme"]
# Subset dataframe
subset_df = self.train_x[selected_columns]
# Plot pairplot
sns.pairplot(subset_df)
crosstab1 = pd.crosstab(self.train_x[selected_columns[0]], self.train_x[selected_columns[1]])
crosstab2 = pd.crosstab(self.train_x[selected_columns[0]], self.train_x[selected_columns[2]])
@staticmethod
def plot_regression(df):
# Extract the "fit" and "actual" columns from the dataframe
fit = df['fit']
actual = df['actual']
# Create an array of x-values (assumed to be sequential integers)
x = np.arange(len(df))
# Plot the fit and actual data
plt.plot(x, fit, color='red', label='Fit')
plt.plot(x, actual, color='blue', label='Actual')
# Set labels and title
plt.xlabel('Index')
plt.ylabel('Value')
plt.title('Linear Regression - Fit vs Actual')
# Display legend
plt.legend()
# Show the plot
plt.show()
@staticmethod
def calculate_regression_metrics(y_true, y_pred, n=20):
"""
Calculate the 5 most important accuracy metrics for regression.
Args:
y_true (array-like): Array of true target values.
y_pred (array-like): Array of predicted target values.
Returns:
dict: Dictionary containing the calculated metrics.
"""
metrics = {}
metrics['MAPE'] = mean_absolute_percentage_error(y_true, y_pred)
metrics['Mean Squared Error'] = mean_squared_error(y_true, y_pred)
metrics['Mean Absolute Error'] = mean_absolute_error(y_true, y_pred)
metrics['R2 Score'] = r2_score(y_true, y_pred)
metrics['Explained Variance Score'] = explained_variance_score(y_true, y_pred)
metrics['Median Absolute Error'] = median_absolute_error(y_true, y_pred)
errors = pd.DataFrame()
errors['Fit'] = y_true
errors['Actual'] = y_pred
errors['Residual'] = errors['Actual'] - errors['Fit']
errors['Absolute Residual'] = np.abs(errors['Residual'])
worst_errors = errors.nlargest(n, 'Absolute Residual')
return metrics, worst_errors
self = SapModel(
data=all_data["data"],
cleaner=all_data["cleaner"]
)