# Model/model_data/analysis/SapModel.py
# 2023-07-04 10:00:15 +01:00
#
# 304 lines
# 10 KiB
# Python

import numpy as np
import pandas as pd
import statsmodels.api as sm
import matplotlib.pyplot as plt
import pickle
from typing import Any, Dict, Tuple
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, explained_variance_score, \
median_absolute_error, mean_absolute_percentage_error
# Load the pickled payload this module works on; the code below reads the
# "data" and "cleaner" entries from it.
# NOTE(security): unpickling can execute arbitrary code embedded in the file,
# so "all_data.pkl" must only ever come from a trusted source.
with open("all_data.pkl", "rb") as f:
    all_data = pickle.load(f)
class SapModel:
    """OLS regression of the ``environment-impact-current`` column on property features.

    Typical use is ``SapModel(data, cleaner).run()``, which builds the modelling
    table (``create_dataset``) and then fits the regression (``fit_model``).

    Attributes populated by the pipeline:
        model_data: Modelling table produced by ``create_dataset``; ``fit_model``
            adds a ``fit`` column holding the in-sample fitted values.
        train_x, train_y, test_x, test_y: Train/test split of the dummy-encoded table.
        results: Fitted statsmodels OLS results object.
        fit_error: Dictionary of in-sample accuracy metrics.
        worst: ``{"errors": ..., "x": ...}`` - the largest absolute residuals and
            the matching rows of ``model_data``.
        fit_df: Fitted vs. actual training values, sorted by actual value.
    """

    # We want to estimate the effect of making improvements on different
    # property components.
    RESPONSE = "environment-impact-current"

    # We could potentially build models by constituency to avoid having too many
    # features in the model
    BASE_FEATURES = [
        "property-type",
        "built-form",
        "construction-age-band",
        "number-habitable-rooms",
        "constituency",
        "number-heated-rooms",
        "transaction-type",
    ]

    COMPONENT_FEATURES = [
        "walls-description",
        "floor-description",
        "lighting-description",
        "roof-description",
        "mainheat-description",
        "hotwater-description",
        "main-fuel",
        "mechanical-ventilation",
        "secondheat-description",
        "energy-tariff",
        "solar-water-heating-flag",
        "photo-supply",
        "windows-description",
        "glazed-type",
        "glazed-area",
        "multi-glaze-proportion",
        # "lighting-description" is already listed above; it might not be needed.
        "low-energy-lighting",
        "number-open-fireplaces",
    ]

    CATEGORICAL_COLS = [
        "property-type",
        "built-form",
        "number-habitable-rooms",
        "constituency",
        "number-heated-rooms",
        "lighting-description",
        "mainheat-description",
        "hotwater-description",
        "main-fuel",
        "mechanical-ventilation",
        "secondheat-description",
        "energy-tariff",
        "solar-water-heating-flag",
        "windows-description",
        "glazed-type",
        "glazed-area",
        "construction-age-band",
    ]

    # Columns that arrive as text (with "" for missing) and are coerced to float.
    NUMERICAL_COLS = [
        "photo-supply",
        "multi-glaze-proportion",
        "low-energy-lighting",
        "number-open-fireplaces",
    ]

    def __init__(self, data, cleaner, test_size=0.2, random_state=None):
        """Store the raw inputs and initialise every pipeline output to empty.

        Args:
            data: Anything ``pd.DataFrame`` accepts; must contain ``RESPONSE``,
                ``BASE_FEATURES`` and ``COMPONENT_FEATURES`` columns.
            cleaner: Object exposing a ``cleaned`` mapping keyed by
                ``"<component>-description"``. Each value must convert to a
                DataFrame with an ``original_description`` column plus the
                looked-up value column.
            test_size: Share of rows held out for testing; ``None`` means 0.2.
            random_state: Seed for the train/test split; ``None`` means 42.
        """
        self.df = pd.DataFrame(data)
        self.cleaner = cleaner
        self.random_state = 42 if random_state is None else random_state
        self.test_size = 0.2 if test_size is None else test_size
        self.model_data = None
        self.train_x = None
        self.train_y = None
        self.test_x = None
        self.test_y = None
        self.results = None
        self.fit_error = None
        self.worst = {"errors": pd.DataFrame(), "x": pd.DataFrame()}
        self.fit_df = None

    def run(self, plot=False):
        """Run the whole pipeline: build the dataset, fit, and optionally plot.

        This is a best-effort entry point: any exception is caught and printed
        rather than propagated, so callers must inspect ``self.results`` to see
        whether the fit actually succeeded.

        Args:
            plot: When true, plot fitted vs. actual values after fitting.
        """
        try:
            self.create_dataset()
            self.fit_model()
            if plot:
                self.plot_regression(self.fit_df)
        except Exception as e:
            print("An error occurred during execution.")
            print(str(e))

    def _merge_with_u_values(
        self, model_data: pd.DataFrame, description: str, thermal_transmittance: str
    ) -> pd.DataFrame:
        """Left-join the cleaned value for one building component onto ``model_data``.

        Args:
            model_data: Table containing a ``"<description>-description"`` column.
            description: Component name, e.g. ``"walls"``.
            thermal_transmittance: Name of the column in the cleaned lookup that
                holds the value to attach.

        Returns:
            A new frame with an extra ``"<description>_u_value"`` column; rows
            without a match in the lookup get NaN.
        """
        source_col = f"{description}-description"
        lookup = pd.DataFrame(self.cleaner.cleaned[source_col])
        lookup = lookup[["original_description", thermal_transmittance]].rename(
            columns={thermal_transmittance: f"{description}_u_value"}
        )
        merged = model_data.merge(
            lookup,
            how="left",
            left_on=source_col,
            right_on="original_description",
        )
        # The join key from the lookup side duplicates the description column.
        return merged.drop(columns=["original_description"])

    def _append_cleaned_data(self, model_data: pd.DataFrame) -> pd.DataFrame:
        """Attach wall/floor/roof U-values and the lighting low-energy proportion."""
        for description in ["walls", "floor", "roof"]:
            model_data = self._merge_with_u_values(
                model_data, description, "thermal_transmittance"
            )

        # Lighting is merged separately: it contributes `low_energy_proportion`
        # under its own name rather than a "<component>_u_value" column.
        lighting = pd.DataFrame(self.cleaner.cleaned["lighting-description"])[
            ["original_description", "low_energy_proportion"]
        ]
        merged = model_data.merge(
            lighting,
            how="left",
            left_on="lighting-description",
            right_on="original_description",
        )
        return merged.drop(columns=["original_description"])

    @staticmethod
    def _convert_transaction_type(model_data: pd.DataFrame) -> pd.DataFrame:
        """Replace ``transaction-type`` with a boolean ``is_rdsap`` column.

        ``is_rdsap`` is True for every transaction type other than
        ``"new dwelling"``. The input frame is left untouched.
        """
        is_rdsap = model_data["transaction-type"] != "new dwelling"
        return model_data.assign(is_rdsap=is_rdsap).drop(columns=["transaction-type"])

    @staticmethod
    def _clean_numericals(model_data: pd.DataFrame) -> pd.DataFrame:
        """Coerce the text-encoded numeric columns to float, mapping ``""`` to 0.

        Each column is cleaned from its *own* values. The input frame is left
        untouched; a cleaned copy is returned.
        """
        cleaned = model_data.copy()
        for col in SapModel.NUMERICAL_COLS:
            values = cleaned[col]
            cleaned[col] = np.where(values == "", "0", values).astype(float)
        return cleaned

    def create_dataset(self) -> None:
        """Build the modelling table and store it on ``self.model_data``.

        Steps: select the response and feature columns, attach cleaned U-values,
        convert the transaction type, coerce numeric columns, keep only rows
        with wall, floor and roof U-values, then cast dtypes.
        """
        columns = [self.RESPONSE] + self.COMPONENT_FEATURES + self.BASE_FEATURES
        model_data = self.df[columns].reset_index(drop=True)

        # Append on u-values
        model_data = self._append_cleaned_data(model_data)
        # Convert transaction_type
        model_data = self._convert_transaction_type(model_data)
        # Clean numerical columns
        model_data = self._clean_numericals(model_data)

        # Take just entries with U-values
        # TODO: Rather than doing this, do we want to include the estimated u-values?
        # Since this ends up with just 2k entries
        u_value_cols = ["walls_u_value", "floor_u_value", "roof_u_value"]
        has_u_values = model_data[u_value_cols].notna().all(axis=1)

        exclude_features = [
            "walls-description",
            "floor-description",
            "roof-description",
            "transaction-type",
        ]
        # NOTE(review): `is_rdsap` and `low_energy_proportion` are computed above
        # but are not in this list, so they are discarded here - confirm whether
        # they were meant to be model features.
        features = [
            x
            for x in self.BASE_FEATURES
            + self.COMPONENT_FEATURES
            + u_value_cols
            + [self.RESPONSE]
            if x not in exclude_features
        ]

        # Copy so the dtype casts below act on an independent frame, not a slice.
        model_data = model_data.loc[has_u_values, features].copy()

        dtypes = {col: "category" for col in self.CATEGORICAL_COLS}
        dtypes[self.RESPONSE] = float
        self.model_data = model_data.astype(dtypes)

    def make_training_test(self, x: pd.DataFrame) -> None:
        """Split ``x`` into train/test features and response, stored on ``self``."""
        self.train_x, self.test_x, self.train_y, self.test_y = train_test_split(
            x.drop(self.RESPONSE, axis=1),
            x[self.RESPONSE],
            test_size=self.test_size,
            random_state=self.random_state,
        )

    def fit_model(self) -> None:
        """Fit an OLS model on the training split and record in-sample diagnostics.

        Raises:
            ValueError: If ``create_dataset`` has not been run yet.
        """
        if self.model_data is None:
            raise ValueError("create_dataset() must be called before fit_model()")

        # A previous call leaves a "fit" column on model_data; drop it so earlier
        # fitted values never leak in as a regressor.
        features = self.model_data.drop(columns=["fit"], errors="ignore")

        # Dummy out the categorical variables
        x = pd.get_dummies(features, columns=self.CATEGORICAL_COLS, drop_first=True)

        # statsmodels needs numeric input: booleans -> int, object columns -> float
        casts = {}
        for col, dtype in x.dtypes.items():
            if dtype == bool:
                casts[col] = int
            elif dtype == object:
                casts[col] = float
        x = x.astype(casts)

        # Create the training and test sets for each run
        self.make_training_test(x)

        # Add a constant (intercept) column and fit the regression
        train_x = sm.add_constant(self.train_x)
        self.results = sm.OLS(self.train_y, train_x).fit()
        fitted = self.results.fittedvalues

        self.fit_error, self.worst["errors"] = self.calculate_regression_metrics(
            y_true=self.train_y, y_pred=fitted
        )

        # Index-aligned assignment: rows outside the training split get NaN.
        self.model_data["fit"] = fitted

        # The worst errors over index heavily for flats
        worst_index = self.worst["errors"].index
        self.worst["x"] = self.model_data[self.model_data.index.isin(worst_index)]

        self.fit_df = pd.DataFrame(
            {"fit": fitted, "actual": self.train_y}
        ).sort_values("actual", ascending=True)

    @staticmethod
    def plot_regression(df: pd.DataFrame) -> None:
        """Plot the ``fit`` and ``actual`` columns of ``df`` against row position."""
        # Rows are plotted in the order given, against sequential integers.
        positions = np.arange(len(df))

        plt.plot(positions, df["fit"], color='red', label='Fit')
        plt.plot(positions, df["actual"], color='blue', label='Actual')

        plt.xlabel('Index')
        plt.ylabel('Value')
        plt.title('Linear Regression - Fit vs Actual')
        plt.legend()
        plt.show()

    @staticmethod
    def calculate_regression_metrics(
        y_true, y_pred, n=20
    ) -> Tuple[Dict[str, Any], pd.DataFrame]:
        """Compute accuracy metrics and the ``n`` largest absolute residuals.

        Args:
            y_true (array-like): True target values.
            y_pred (array-like): Predicted target values.
            n (int): Number of worst-fitting observations to return.

        Returns:
            tuple: ``(metrics, worst_errors)`` where ``metrics`` maps metric name
            to value and ``worst_errors`` is a DataFrame with columns ``Fit``
            (prediction), ``Actual`` (truth), ``Residual`` (Actual - Fit) and
            ``Absolute Residual``, limited to the ``n`` largest absolute residuals.
        """
        metrics = {
            'MAPE': mean_absolute_percentage_error(y_true, y_pred),
            'Mean Squared Error': mean_squared_error(y_true, y_pred),
            'Mean Absolute Error': mean_absolute_error(y_true, y_pred),
            'R2 Score': r2_score(y_true, y_pred),
            'Explained Variance Score': explained_variance_score(y_true, y_pred),
            'Median Absolute Error': median_absolute_error(y_true, y_pred),
            'Mean True Value': np.mean(y_true),
            'Mean Predicted Value': np.mean(y_pred),
        }

        # "Fit" is the prediction and "Actual" the observed value.
        errors = pd.DataFrame({'Fit': y_pred, 'Actual': y_true})
        errors['Residual'] = errors['Actual'] - errors['Fit']
        errors['Absolute Residual'] = errors['Residual'].abs()

        worst_errors = errors.nlargest(n, 'Absolute Residual')
        return metrics, worst_errors
# Build a module-level model instance from the unpickled payload.
# NOTE(review): it is bound to the name `self`, presumably so method bodies can
# be run line-by-line in an interactive session - confirm before renaming.
self = SapModel(data=all_data["data"], cleaner=all_data["cleaner"])