import numpy as np import pandas as pd import statsmodels.api as sm import matplotlib.pyplot as plt import pickle from typing import Any, Dict, Tuple from sklearn.model_selection import train_test_split from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, explained_variance_score, \ median_absolute_error, mean_absolute_percentage_error from sklearn.linear_model import Lasso from sklearn.preprocessing import StandardScaler import xgboost as xgb from statsmodels.stats.outliers_influence import variance_inflation_factor from tqdm import tqdm with open("all_data.pkl", "rb") as f: all_data = pickle.load(f) class SapModel: # We want to estimate for making improvements on different property components RESPONSE = "current-energy-efficiency" # We could potentially build models by constituency to avoid having too many # features in the model BASE_FEATURES = [ "property-type", "built-form", "construction-age-band", "number-habitable-rooms", "constituency", "number-heated-rooms", "transaction-type" ] COMPONENT_FEATURES = [ "walls-description", "floor-description", "lighting-description", "roof-description", "mainheat-description", "hotwater-description", "main-fuel", "mechanical-ventilation", "secondheat-description", "energy-tariff", "solar-water-heating-flag", "photo-supply", "windows-description", "glazed-type", "glazed-area", "multi-glaze-proportion", # "lighting-description" # Might not need to use this "low-energy-lighting", "number-open-fireplaces", "mainheatcont-description", "fixed-lighting-outlets-count", "floor-height", "floor-level", "total-floor-area", "extension-count", ] CATEGORICAL_COLS = [ "property-type", "built-form", "number-habitable-rooms", "constituency", "number-heated-rooms", "mainheat-description", "hotwater-description", "main-fuel", "mechanical-ventilation", "secondheat-description", "energy-tariff", "solar-water-heating-flag", "windows-description", "glazed-type", "glazed-area", "construction-age-band", "lighting-description", "mainheatcont-description", "floor-level", ] NUMERICAL_COLUMNS = [ "photo-supply", "multi-glaze-proportion", "low-energy-lighting", "number-open-fireplaces", "fixed-lighting-outlets-count", "floor-height", "total-floor-area", "extension-count", ] def __init__(self, data, cleaner, test_size=0.2, random_state=None): self.df = pd.DataFrame(data) self.cleaner = cleaner self.random_state = random_state if random_state is not None else 42 self.test_size = 0.2 if test_size is None else test_size self.model_data = None self.train_x = None self.train_y = None self.test_x = None self.test_y = None self.results = None self.model_data = None self.fit_error = None self.predict_error = None self.worst = {"fit_errors": pd.DataFrame(), "x": pd.DataFrame(), "prediction_errors": pd.DataFrame()} self.fit_df = None def run(self, plot=False): """ A pipeline method to run all necessary methods in correct order. """ try: self.create_dataset() self.fit_model() if plot: self.plot_regression(self.fit_df) except Exception as e: print("An error occurred during execution.") print(str(e)) def _merge_with_u_values( self, model_data: pd.DataFrame, description: str, thermal_transmittance: str ) -> pd.DataFrame: u_values = pd.DataFrame(self.cleaner.cleaned[f"{description}-description"])[ ["original_description", thermal_transmittance]].rename( columns={thermal_transmittance: f"{description}_u_value"} ) model_data = model_data.merge( u_values, how="left", left_on=f"{description}-description", right_on="original_description" ).drop(columns=["original_description"]) return model_data def _append_cleaned_data(self, model_data: pd.DataFrame) -> pd.DataFrame: for description in ["walls", "floor", "roof"]: model_data = self._merge_with_u_values(model_data, description, "thermal_transmittance") # lighting_proportions added separately as it doesn't use the _merge_with_u_values method lighting_proportions = pd.DataFrame(self.cleaner.cleaned["lighting-description"])[ ["original_description", "low_energy_proportion"]] model_data = model_data.merge( lighting_proportions, how="left", left_on="lighting-description", right_on="original_description" ).drop(columns=["original_description"]) return model_data @staticmethod def _convert_transaction_type(model_data): model_data["is_rdsap"] = model_data["transaction-type"] != "new dwelling" model_data = model_data.drop(columns=["transaction-type"]) return model_data def _clean_numericals(self, model_data): # Try binning numericals def bucket_and_fill(df, column_name, n_bins=10): # Check if the column is numerical if np.issubdtype(df[column_name].dtype, np.number): # Create a new categorical column from numerical one by binning the data df[column_name + "_bucket"] = pd.cut(df[column_name], bins=n_bins).astype(str) # Replace missing data with "NO_RECORD" df[column_name + "_bucket"] = df[column_name + "_bucket"].fillna("NO_RECORD") df[column_name + "_bucket"] = np.where( df[column_name + "_bucket"] == "nan", "NO_RECORD", df[column_name + "_bucket"] ) return df bucket_variables = [] remaining_numericals = [x for x in self.NUMERICAL_COLUMNS if x not in bucket_variables] for col in bucket_variables: model_data[col] = pd.to_numeric(model_data[col], errors='coerce') model_data = bucket_and_fill(model_data, col) # Replace the data with the binned version model_data = model_data.drop(columns=bucket_variables) model_data = model_data.rename( columns=dict(zip([c + "_bucket" for c in bucket_variables], bucket_variables)) ) for col in remaining_numericals: model_data[col] = np.where( model_data[col] == "", "0", model_data[col] ).astype(float) # Basic fill the rest of the columns with 0 # grouping_variables = ["constituency", "built-form", "property-type"] # secondary_grouping_variables = ["constituency", "property-type"] # # def append_missings(model_data, col, grouping_variables, secondary=False): # if not secondary: # numerical_data = model_data[(model_data[col] != "")].copy() # else: # numerical_data = model_data[~pd.isnull(model_data[col])].copy() # # numerical_data[col] = numerical_data[col].astype(float) # numerical_data = numerical_data.groupby(grouping_variables, observed=True)[col].mean().reset_index() # numerical_data = numerical_data.rename(columns={col: f"{col}_avg"}) # model_data = model_data.merge(numerical_data, how="left", on=grouping_variables) # # if not secondary: # model_data[col] = np.where(model_data[col] == "", model_data[f"{col}_avg"], model_data[col]) # else: # model_data[col] = np.where(pd.isnull(model_data[col]), model_data[f"{col}_avg"], model_data[col]) # # model_data = model_data.drop(columns=[f"{col}_avg"]) # model_data[col] = model_data[col].astype(float) # # return model_data # # for col in numerical_cols: # model_data[col] = pd.to_numeric(model_data[col], errors='coerce') # model_data[col] = model_data[col].fillna(model_data[col].mean()) # model_data = append_missings(model_data, col, grouping_variables) # # if pd.isnull(model_data[col]).sum(): # model_data = append_missings(model_data, col, secondary_grouping_variables, True) # # if pd.isnull(model_data[col]).sum(): # raise Exception("Cleaning failed, missing values remain") # # model_data[col] = np.where( # model_data[col] == "", "0", model_data[col] # ).astype(float) return model_data @staticmethod def clean_missings(model_data): # Cleaning of energy-tariff and construction-age-band hurt prediction performance, indicating there is # potentially # a notable difference between a "" missing and a "NO DATA!" missing, worth differentiating model_data["construction-age-band"].value_counts() model_data["mechanical-ventilation"] = np.where( model_data["mechanical-ventilation"] == "", "NO DATA!", model_data["mechanical-ventilation"] ) model_data["solar-water-heating-flag"] = np.where( model_data["solar-water-heating-flag"] == "", "N", model_data["solar-water-heating-flag"] ) model_data["glazed-type"] = np.where( model_data["glazed-type"] == "", "NO DATA!", model_data["glazed-type"] ) model_data["glazed-area"] = np.where( model_data["glazed-area"] == "", "NO DATA!", model_data["glazed-type"] ) return model_data def create_dataset(self): model_data = self.df[[self.RESPONSE] + self.COMPONENT_FEATURES + self.BASE_FEATURES] model_data = model_data.reset_index(drop=True) model_data["idx"] = model_data.index.copy() # Append on u-values model_data = self._append_cleaned_data(model_data) model_data = self.clean_missings(model_data) # Convert transaction_type model_data = self._convert_transaction_type(model_data) # Clean numerical columns model_data = self._clean_numericals(model_data) # Take just entries with U-values # TODO: Rather than doing this, do we want to include the estimated u-values? # Since this ends up with just 2k entries model_data = model_data[ ~pd.isnull(model_data["walls_u_value"]) & ~pd.isnull(model_data["floor_u_value"]) & ~pd.isnull(model_data["roof_u_value"]) ] exclude_features = [ "walls-description", "floor-description", "roof-description", "transaction-type" ] features = [ x for x in self.BASE_FEATURES + self.COMPONENT_FEATURES + [ "walls_u_value", "floor_u_value", "roof_u_value", self.RESPONSE, "idx", "is_rdsap" ] if x not in exclude_features ] model_data = model_data[features] for col in self.CATEGORICAL_COLS: model_data[col] = model_data[col].astype('category') # Convert response model_data[self.RESPONSE] = model_data[self.RESPONSE].astype(float) self.model_data = model_data def make_training_test(self, x): # Split into training and test self.train_x, self.test_x, self.train_y, self.test_y = train_test_split( x.drop(self.RESPONSE, axis=1), x[self.RESPONSE], test_size=self.test_size, random_state=self.random_state ) def remove_zero_std_cols(self, threshold=1e-3): # Compute standard deviations std_devs = self.train_x.std() # Find columns with zero or near-zero standard deviation zero_std_cols = std_devs[std_devs <= threshold].index # Drop these columns from the training data self.train_x = self.train_x.drop(zero_std_cols, axis=1) # Ensure the test data has the same columns self.test_x = self.test_x[self.train_x.columns] def fit_model(self): # Dummy out the categorical variables binned = [] x = pd.get_dummies(self.model_data, columns=self.CATEGORICAL_COLS + binned, drop_first=True) # Convert booleans to integer for col in x.columns: if x[col].dtype == bool: x[col] = x[col].astype(int) if x[col].dtype == object: x[col] = x[col].astype(float) # Create the training and test sets for each run self.make_training_test(x) self.remove_zero_std_cols() self.detect_multi_collinearity() # Add a constant to the independent value train_x = sm.add_constant(self.train_x) test_x = sm.add_constant(self.test_x) train_idx = train_x["idx"].copy() test_ids = self.test_x["idx"].copy() train_x = train_x.drop(columns=["idx"]) test_x = test_x.drop(columns=["idx"]) from sklearn.ensemble import RandomForestRegressor from sklearn.inspection import permutation_importance rf = RandomForestRegressor(random_state=self.random_state) rf.fit(train_x, self.train_y) # Print the name and importance of each feature importance_df = [] for feature, importance in zip(train_x.columns, rf.feature_importances_): importance_df.append( { "Feature": feature, "rf_importance": importance } ) importance_df = pd.DataFrame(importance_df) importance_df = importance_df.sort_values(by="rf_importance", ascending=False) perm_importance = permutation_importance(rf, test_x, self.test_y, scoring='neg_mean_squared_error') perm_importance_df = pd.DataFrame( { "Feature": test_x.columns, "perm_importance": perm_importance.importances_mean } ).sort_values(by="perm_importance", ascending=False) # make regression model model = sm.OLS(self.train_y, train_x) # fit model and print results self.results = model.fit() train_predictions = self.results.fittedvalues test_predictions = self.results.predict(test_x) diagnose = self.test_x.copy() diagnose["predictions"] = test_predictions diagnose["actual"] = self.test_y.values self.fit_error, self.worst["fit_errors"] = self.calculate_regression_metrics( y_true=self.train_y, y_pred=train_predictions ) # Predict on new data self.predict_error, self.worst["prediction_errors"] = self.calculate_regression_metrics( y_true=self.test_y, y_pred=test_predictions ) fit_df = pd.DataFrame( { "fit": self.results.fittedvalues, "actual": self.train_y, "idx": train_idx } ).sort_values("actual", ascending=True).merge(self.model_data[["idx", "property-type"]], on="idx") # temp hardcoded values best_fit = {'MAPE': 0.04617542805587113, 'Mean Squared Error': 18.62306128026334, 'Mean Absolute Error': 2.865262003625814, 'R2 Score': 0.8008316762496143, 'Explained Variance Score': 0.8008316762496143, 'Median Absolute Error': 1.911197425417548} best_predict = {'MAPE': 0.04358926901734807, 'Mean Squared Error': 21.197491698961528, 'Mean Absolute Error': 3.046853690257838, 'R2 Score': 0.7215087343364782, 'Explained Variance Score': 0.7215726927575035, 'Median Absolute Error': 1.921094388694634} def check_successes(experiment_error, best_error): successes = [] for k in experiment_error: if k in ["Explained Variance Score", "R2 Score"]: # We want to maximise this so we want experiment error to be higher successes.append( { "measure": k, "success": experiment_error[k] >= best_error[k], "difference": abs(experiment_error[k] - best_error[k]) } ) continue successes.append( { "measure": k, "success": experiment_error[k] <= best_error[k], "difference": abs(experiment_error[k] - best_error[k]) } ) return pd.DataFrame(successes) fit_success = check_successes(self.fit_error, best_fit) predict_success = check_successes(self.predict_error, best_predict) print(self.results.summary()) self.model_data['fit'] = self.results.fittedvalues # The worst errors over index heavily for flats self.worst["x"] = self.model_data[self.model_data.index.isin(self.worst["errors"].index)] self.fit_df = pd.DataFrame( { "fit": self.results.fittedvalues, "actual": self.train_y } ).sort_values("actual", ascending=True) # TODO: Testing # Create a StandardScaler instance scaler = StandardScaler() # Fit the scaler to the training data and transform it train_x_scaled = scaler.fit_transform(train_x) # Transform the test data test_x_scaled = scaler.transform(test_x) # Define the model lasso_reg = Lasso( alpha=0.1) # you can change the alpha parameter to adjust the strength of the regularization. # Fit the model lasso_reg.fit(train_x_scaled, self.train_y) # Make predictions on the training set train_predictions = lasso_reg.predict(train_x_scaled) # Make predictions on the test set test_predictions = lasso_reg.predict(test_x_scaled) # Calculate metrics based on these predictions. lasso_fit_error, _ = self.calculate_regression_metrics( y_true=self.train_y, y_pred=train_predictions ) # Predict on new data lasso_predict_error, _ = self.calculate_regression_metrics( y_true=self.test_y, y_pred=test_predictions ) lasso_fit_success = check_successes(lasso_fit_error, best_fit) lasso_predict_success = check_successes(lasso_predict_error, best_predict) # TODO: TESTING 2 # Create a StandardScaler instance scaler = StandardScaler() # Fit the scaler to the training data and transform it train_x_scaled = scaler.fit_transform(train_x) # Transform the test data test_x_scaled = scaler.transform(test_x) # Define the model alphas = np.logspace(-4, 2, 100) # Range of alpha values to search lasso_reg = LassoCV(cv=10, alphas=alphas) # Fit the model lasso_reg.fit(train_x_scaled, self.train_y) # Make predictions on the training set train_predictions = lasso_reg.predict(train_x_scaled) # Make predictions on the test set test_predictions = lasso_reg.predict(test_x_scaled) # Calculate metrics based on these predictions. lasso_fit_error, lasso_worst_fit_errors = self.calculate_regression_metrics( y_true=self.train_y, y_pred=train_predictions ) # Predict on new data lasso_predict_error, lasso_worst_predict_errors = self.calculate_regression_metrics( y_true=self.test_y, y_pred=test_predictions ) lasso_fit_success = check_successes(lasso_fit_error, best_fit) lasso_predict_success = check_successes(lasso_predict_error, best_predict) fit_df = pd.DataFrame( { "fit": train_predictions, "actual": self.train_y, "residual": abs(self.train_y - train_predictions), "idx": train_idx.values } ) fit_df = fit_df.sort_values("residual", ascending=False) fit_df = fit_df.merge(self.model_data, on="idx") zz = fit_df[fit_df["lighting-description"] == "Low energy lighting in all fixed outlets"] z = fit_df.head(100).groupby("lighting-description", observed=True)["residual"].agg( ['mean', 'count']).reset_index() z = z.sort_values("mean", ascending=False) worst_x = self.model_data[self.model_data.index.isin(lasso_worst_fit_errors.index)] worst_x = worst_x.merge(lasso_worst_fit_errors, left_index=True, right_index=True) worst_x = worst_x.sort_values("Absolute Residual", ascending=False) def detect_multi_collinearity(self): # Get the VIFs for each variable vifs = pd.DataFrame() vifs["features"] = self.train_x.columns vifs["vif"] = [variance_inflation_factor(self.train_x.values, i) for i in tqdm(range(self.train_x.shape[1]))] # Get the features with the highest VIF vifs = vifs.sort_values("vif", ascending=False) # There are some features, we do not want to remove required_features = [ "walls_u_value", "floor_u_value", "roof_u_value" ] vifs = vifs[~vifs["features"].isin(required_features)] drop_vifs = vifs[np.isinf(vifs["vif"])] # Acceptable drop variables: # main-fuel_Gas: mains gas # glazed-type_NO DATA! # glazed-area_NO DATA! self.train_x = self.train_x.drop(columns=drop_vifs["features"].values) self.test_x = self.test_x[self.train_x.columns] def test_multi_collinearity(self, test_variable): from statsmodels.regression.linear_model import OLS # drop target variable x_temp = self.train_x.drop(columns=[test_variable]) # define target variable y_temp = self.train_x[test_variable] # add a constant to the predictors x_temp = sm.add_constant(x_temp) # fit the model model_temp = OLS(y_temp, x_temp).fit() print(model_temp.summary()) smry = model_temp.summary() smry_coefs = pd.DataFrame(smry.tables[1].data[1:], columns=smry.tables[1].data[0]) smry_coefs = smry_coefs.sort_values("P>|t|", ascending=True) pd.set_option('display.max_rows', 500) pd.set_option('display.max_columns', 500) pd.set_option('display.width', 1000) print(smry_coefs[smry_coefs["P>|t|"].astype(float) < 0.0001]) import seaborn as sns # Select columns selected_columns = ["main-fuel_Gas: mains gas", "mainheat-description_Community scheme"] # Subset dataframe subset_df = self.train_x[selected_columns] # Plot pairplot sns.pairplot(subset_df) crosstab1 = pd.crosstab(self.train_x[selected_columns[0]], self.train_x[selected_columns[1]]) crosstab2 = pd.crosstab(self.train_x[selected_columns[0]], self.train_x[selected_columns[2]]) @staticmethod def plot_regression(df): # Extract the "fit" and "actual" columns from the dataframe fit = df['fit'] actual = df['actual'] # Create an array of x-values (assumed to be sequential integers) x = np.arange(len(df)) # Plot the fit and actual data plt.plot(x, fit, color='red', label='Fit') plt.plot(x, actual, color='blue', label='Actual') # Set labels and title plt.xlabel('Index') plt.ylabel('Value') plt.title('Linear Regression - Fit vs Actual') # Display legend plt.legend() # Show the plot plt.show() @staticmethod def calculate_regression_metrics(y_true, y_pred, n=20): """ Calculate the 5 most important accuracy metrics for regression. Args: y_true (array-like): Array of true target values. y_pred (array-like): Array of predicted target values. Returns: dict: Dictionary containing the calculated metrics. """ metrics = { 'MAPE': mean_absolute_percentage_error(y_true, y_pred), 'Mean Squared Error': mean_squared_error(y_true, y_pred), 'Mean Absolute Error': mean_absolute_error(y_true, y_pred), 'R2 Score': r2_score(y_true, y_pred), 'Explained Variance Score': explained_variance_score(y_true, y_pred), 'Median Absolute Error': median_absolute_error(y_true, y_pred) } errors = pd.DataFrame() errors['Fit'] = y_true errors['Actual'] = y_pred errors['Residual'] = errors['Actual'] - errors['Fit'] errors['Absolute Residual'] = np.abs(errors['Residual']) worst_errors = errors.nlargest(n, 'Absolute Residual') return metrics, worst_errors self = SapModel( data=all_data["data"], cleaner=all_data["cleaner"] ) from sklearn.ensemble import RandomForestRegressor rf = RandomForestRegressor(random_state=self.random_state) X = self.df.drop(columns=self.RESPONSE) X = X.drop( columns=["address", "uprn-source", "heating-cost-potential", "hot-water-cost-potential", "potential-energy-rating", "environment-impact-potential", "heating-cost-current", "address3", "local-authority-label", "hot-water-cost-current", "county", "postcode", "constituency", "co2-emissions-potential", "address2", "posttown", "lighting-cost-potential", "lodgement-datetime", "current-energy-rating", "uprn", "energy-consumption-current", "lighting-cost-current", "lodgement-date", "lmk-key", "potential-energy-efficiency", "energy-consumption-potential", "inspection-date", "co2-emiss-curr-per-floor-area", "address1", "constituency-label", "building-reference-number", "environment-impact-current", "co2-emissions-current", ]) # Note: this probably not the best way to treat low-energy-fixed-light-count, unheated-corridor-length, # fixed-lighting-outlets-count for col in [ "photo-supply", "multi-glaze-proportion", "low-energy-lighting", "number-open-fireplaces", "floor-height", "low-energy-fixed-light-count", "unheated-corridor-length", "fixed-lighting-outlets-count", "total-floor-area" ]: X[col] = np.where( X[col] == "", "0", X[col] ).astype(float) Y = self.df[self.RESPONSE] X = pd.get_dummies(X) rf.fit(X, Y) # Print the name and importance of each feature importance_df = [] for feature, importance in zip(X.columns, rf.feature_importances_): importance_df.append( { "Feature": feature, "rf_importance": importance } ) importance_df = pd.DataFrame(importance_df) importance_df = importance_df.sort_values("rf_importance", ascending=False)