diff --git a/etl/bill_savings/EnergyConsumptionModel.py b/etl/bill_savings/EnergyConsumptionModel.py index c02d4c8c..89847ca1 100644 --- a/etl/bill_savings/EnergyConsumptionModel.py +++ b/etl/bill_savings/EnergyConsumptionModel.py @@ -6,6 +6,7 @@ from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_percenta from sklearn.feature_selection import RFECV from utils.s3 import save_pickle_to_s3, read_pickle_from_s3, read_dataframe_from_s3_parquet import logging +from pprint import pprint # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') @@ -15,42 +16,58 @@ class EnergyConsumptionModel: FEATURES = { "heating_kwh": [ "lodgement-year", "lodgement-month", "current-energy-efficiency", "energy-consumption-current", - "heating-cost-current", "total-floor-area", "number-heated-rooms", - "mainheat-description", "main-fuel", "mainheat-energy-eff", "number-habitable-rooms", - "mainheatcont-description", "property-type", "built-form", "construction-age-band" - # To test - # "hotwater-description" - make a days since lodgment variable? - # A geographic variable + "heating-cost-current", + "total-floor-area", "number-heated-rooms", + "mainheat-description", "mainheat-energy-eff", "main-fuel", + # TESTING + "secondheat-description", + # , , "number-habitable-rooms", + # "mainheatcont-description", + # "co2-emissions-current", + # "property-type", "built-form", ], "hot_water_kwh": [ - "lodgement-year", "lodgement-month", "current-energy-efficiency", "energy-consumption-current", - "hot-water-cost-current" + "lodgement-year", "lodgement-month", + "current-energy-efficiency", + "energy-consumption-current", + "hot-water-cost-current", + "total-floor-area", "number-heated-rooms", + "hotwater-description", "hot-water-energy-eff", "main-fuel", "property-type", "built-form", + "co2-emissions-current", ] } TARGETS = ['heating_kwh', 'hot_water_kwh'] CATEGORICAL_COLUMNS = [ "lodgement-year", "lodgement-month", "main-fuel", "mainheat-description", "number-heated-rooms", "number-habitable-rooms", "mainheat-energy-eff", "mainheatcont-description", "property-type", "built-form", - "construction-age-band" + "construction-age-band", "secondheat-description", "hotwater-description", "hot-water-energy-eff", ] - NUMERICAL_COLUMNS = ["current-energy-efficiency", "energy-consumption-current", "heating-cost-current", - "hot-water-cost-current", "total-floor-area"] - def __init__(self, model_paths=None): + def __init__(self, model_paths=None, n_jobs=1): self.models = {} self.model_paths = model_paths or {} + self.n_jobs = n_jobs + self.data = None self.input_data = None self.dummy_columns = None self.training_predictions = {} self.testing_predictions = {} + self.best_iteration = {} self.x_train = {} self.x_test = {} + self.x_val = {} + self.y_val = {} self.y_train = {} self.y_test = {} self.selected_features = {} + self.NUMERICAL_COLUMNS = list({ + x for x in self.FEATURES["heating_kwh"] + self.FEATURES["hot_water_kwh"] + if x not in self.CATEGORICAL_COLUMNS + }) + if model_paths: for target, path in model_paths.items(): self.models[target] = read_pickle_from_s3(bucket_name="retrofit-model-directory-dev", s3_file_name=path) @@ -96,18 +113,32 @@ class EnergyConsumptionModel: logging.info("Feature engineering completed") - def split_dataset(self, target, test_size=0.2, random_state=42): - """Splits the dataset into training and testing sets.""" + def split_dataset(self, target, test_size=0.2, validation_size=0.2, random_state=42): + """Splits the dataset into training, validation, and testing sets.""" if target not in self.TARGETS: raise ValueError(f"Target {target} not in {self.TARGETS}") logging.info(f"Splitting dataset for target {target}") - x = self.data[self.dummy_columns[target]] - y = self.data[target] - self.x_train[target], self.x_test[target], self.y_train[target], self.y_test[target] = train_test_split( - x, y, test_size=test_size, random_state=random_state + + # Split into train + validation and test sets + x_train_val, x_test, y_train_val, y_test = train_test_split( + self.data[self.dummy_columns[target]], + self.data[target], + test_size=test_size, + random_state=random_state ) + # Split train + validation into train and validation sets + x_train, x_val, y_train, y_val = train_test_split( + x_train_val, + y_train_val, + test_size=validation_size / (1 - test_size), + random_state=random_state + ) + + self.x_train[target], self.x_val[target], self.x_test[target] = x_train, x_val, x_test + self.y_train[target], self.y_val[target], self.y_test[target] = y_train, y_val, y_test + def feature_selection(self, target, cv_folds=3, sample_fraction=0.1, random_state=42): """ Performs feature selection using RFECV with XGBoost. @@ -134,26 +165,72 @@ class EnergyConsumptionModel: y_sample = self.y_train[target] # Initialize the XGBoost model and RFECV - model = XGBRegressor(objective='reg:squarederror', n_jobs=-1) - selector = RFECV(model, step=1, cv=cv_folds, scoring='neg_mean_absolute_percentage_error') + model = self.init_model(feature_selection=True) + selector = RFECV( + model, step=1, cv=cv_folds, scoring='neg_mean_absolute_percentage_error', verbose=1, n_jobs=self.n_jobs + ) selector = selector.fit(x_sample, y_sample) # Get the selected features self.selected_features[target] = x_sample.columns[selector.support_] - # Update x_train and x_test with selected features + # Update x_train, x_test and x_val with selected features self.x_train[target] = self.x_train[target][self.selected_features[target]] self.x_test[target] = self.x_test[target][self.selected_features[target]] + self.x_val[target] = self.x_val[target][self.selected_features[target]] logging.info(f"Feature selection completed for target {target}") + def init_model(self, feature_selection=False): + + if feature_selection: + # Set up a smaller model to work it + return XGBRegressor( + objective='reg:squarederror', + n_estimators=50, + learning_rate=0.05, + max_depth=6, + subsample=0.8, + colsample_bytree=0.8, + # n_jobs=self.n_jobs + ) + + return XGBRegressor( + objective='reg:squarederror', + n_estimators=1000, + learning_rate=0.05, + max_depth=6, + subsample=0.8, + colsample_bytree=0.8, + # n_jobs=self.n_jobs + ) + def fit_model(self, target): """Fits the linear regression model to the training data.""" logging.info(f"Fitting model for target {target}") - self.models[target] = XGBRegressor(objective='reg:squarederror') - self.models[target].fit(self.x_train[target], self.y_train[target]) + self.models[target] = self.init_model() + self.models[target].fit( + self.x_train[target], + self.y_train[target], + eval_set=[(self.x_val[target], self.y_val[target])], + early_stopping_rounds=50 + ) logging.info(f"Model fitting completed for target {target}") + # Store the best iteration + self.best_iteration[target] = self.models[target].best_iteration + + def re_train_final_model(self, target): + """Re-trains the final model on the combined training and validation set.""" + logging.info(f"Re-training final model for target {target}") + x_train_val = pd.concat([self.x_train[target], self.x_val[target]]) + y_train_val = pd.concat([self.y_train[target], self.y_val[target]]) + + self.models[target] = self.init_model() + + self.models[target].fit(x_train_val, y_train_val, verbose=False) + logging.info(f"Re-training final model completed for target {target}") + def evaluate_model(self, target): """Evaluates the model on training and testing data.""" logging.info(f"Evaluating model for target {target}") @@ -166,6 +243,9 @@ class EnergyConsumptionModel: 'Actual': self.y_train[target], 'Predicted': y_train_pred }) + self.training_predictions[target]["residual"] = abs( + self.training_predictions[target]["Actual"] - self.training_predictions[target]["Predicted"] + ) y_test_pred = self.models[target].predict(self.x_test[target]) test_mse = mean_squared_error(self.y_test[target], y_test_pred) @@ -176,11 +256,20 @@ class EnergyConsumptionModel: 'Actual': self.y_test[target], 'Predicted': y_test_pred }) + self.testing_predictions[target]["residual"] = abs( + self.testing_predictions[target]["Actual"] - self.testing_predictions[target]["Predicted"] + ) - feature_importance = pd.DataFrame({ - 'Feature': self.selected_features[target], - 'Importance': self.models[target].feature_importances_ - }).sort_values(by='Importance', ascending=False) + if target in self.selected_features: + feature_importance = pd.DataFrame({ + 'Feature': self.selected_features[target], + 'Importance': self.models[target].feature_importances_ + }).sort_values(by='Importance', ascending=False) + else: + feature_importance = pd.DataFrame({ + 'Feature': self.x_train[target].columns, + 'Importance': self.models[target].feature_importances_ + }).sort_values(by='Importance', ascending=False) logging.info(f"Evaluation completed for target {target}") @@ -303,18 +392,31 @@ class EnergyConsumptionModel: # Example usage: -model = EnergyConsumptionModel() -model.read_dataset('energy_consumption/2024-07-02/energy_consumption_dataset.parquet') +model = EnergyConsumptionModel(n_jobs=2) +model.read_dataset('energy_consumption/2024-07-04/energy_consumption_dataset.parquet') model.feature_engineering() # For heating_kwh model.split_dataset(target='heating_kwh') -model.feature_selection(target='heating_kwh', cv_folds=3, sample_fraction=0.1) +# model.feature_selection(target='heating_kwh', cv_folds=3, sample_fraction=0.1) model.fit_model(target='heating_kwh') + +model.re_train_final_model(target='heating_kwh') evaluation_results = model.evaluate_model(target='heating_kwh') -from pprint import pprint pprint(evaluation_results["train"]) pprint(evaluation_results["test"]) importance_df = evaluation_results["train"]["Feature Importance"] +testing_predictions = model.testing_predictions["heating_kwh"] +testing_predictions = testing_predictions.sort_values("residual", ascending=False) +# Merge on model.input_data, by the index +merged_data = testing_predictions.merge(model.input_data, left_index=True, right_index=True) + +# For hot_water_kwh +model.split_dataset(target='hot_water_kwh') +model.fit_model(target='hot_water_kwh') +model.re_train_final_model(target='hot_water_kwh') +evaluation_results = model.evaluate_model(target='hot_water_kwh') +pprint(evaluation_results["train"]) +pprint(evaluation_results["test"]) diff --git a/etl/bill_savings/data_collection.py b/etl/bill_savings/data_collection.py index 24b10d7f..ecc62015 100644 --- a/etl/bill_savings/data_collection.py +++ b/etl/bill_savings/data_collection.py @@ -133,7 +133,7 @@ def app(): energy_consumption_data = [] for i, directory in tqdm(enumerate(epc_directories), total=len(epc_directories)): # Skip the first 50 - if i < 90: + if i < 305: continue data = pd.read_csv(directory / "certificates.csv", low_memory=False) diff --git a/etl/bill_savings/data_collation.py b/etl/bill_savings/data_combining.py similarity index 95% rename from etl/bill_savings/data_collation.py rename to etl/bill_savings/data_combining.py index ef2b286b..a111ecf2 100644 --- a/etl/bill_savings/data_collation.py +++ b/etl/bill_savings/data_combining.py @@ -1,12 +1,13 @@ import re from datetime import datetime +from tqdm import tqdm import pandas as pd from utils.s3 import list_files_in_s3_folder, read_pickle_from_s3, save_dataframe_to_s3_parquet # These columns we co-erce to strings before saving -PROBLEMATIC_COLUMNS = ["main-heating-controls"] +PROBLEMATIC_COLUMNS = ["main-heating-controls", "floor-level"] def extract_kwh_value(text): @@ -41,7 +42,7 @@ def app(): run_date = datetime.now().strftime("%Y-%m-%d") complete_data = [] - for files in data_files: + for files in tqdm(data_files): dataset_run_date = files.split("/")[-1].split(".")[0] # Extract the date from the file name dataset_run_date = pd.Timestamp(dataset_run_date) @@ -90,5 +91,3 @@ def app(): file_key=f"energy_consumption/{run_date}/energy_consumption_dataset.parquet", df=df ) - - df.to_csv("energy_consumption_dataset.csv", index=False)