diff --git a/model_data/simulation_system/MLModel/BaseMLModel.py b/model_data/simulation_system/MLModel/BaseMLModel.py index 1b5c525a..e631880d 100644 --- a/model_data/simulation_system/MLModel/BaseMLModel.py +++ b/model_data/simulation_system/MLModel/BaseMLModel.py @@ -9,8 +9,9 @@ Key tasks: - Generate Inference """ +from numpy import ndarray from pathlib import Path -from typing import Protocol, NamedTuple +from typing import Protocol, NamedTuple, Any import pandas as pd @@ -30,13 +31,13 @@ class MLModel(Protocol): """ def train_model( - self, data: pd.DataFrame, target_column: str, hyperparameter: dict + self, data: pd.DataFrame, target_column: str, hyperparameters: dict ) -> None: """ For the given data and hyperparameters (specified to the model), a model is trained """ - def generate_predictions(self, data: pd.DataFrame) -> pd.DataFrame: + def generate_predictions(self, data: pd.DataFrame) -> ndarray[Any, Any] | None: """ For the given dataframe, model is loaded and predictions are generated """ @@ -45,8 +46,8 @@ class MLModel(Protocol): self, validation_data: pd.DataFrame, target_column: str, - metrics_location: Path = None, - ) -> NamedTuple: + metrics_location: Path | None = None, + ) -> pd.DataFrame | None: """ For any validation data, a set of predictions and metrics are return """ @@ -56,7 +57,7 @@ class MLModel(Protocol): Perfomance post processing on Model to ensure ready for deployment """ - def model_metadata(self) -> dict: + def model_metadata(self) -> dict | None: """ Extract out model metadata as dictionary """ diff --git a/model_data/simulation_system/MLModel/Models.py b/model_data/simulation_system/MLModel/Models.py index dc4874b0..dcf6a387 100644 --- a/model_data/simulation_system/MLModel/Models.py +++ b/model_data/simulation_system/MLModel/Models.py @@ -8,12 +8,14 @@ Key tasks: - Generate Inference """ +from typing import Any from pathlib import Path import pandas as pd from autogluon.tabular import TabularDataset, TabularPredictor from sklearn.metrics import mean_absolute_percentage_error from core.Logger import logger from core.Metrics import Metrics +from core.Settings import METRIC_FILENAME from MLModel.BaseMLModel import MLModel AUTOGLUON_HYPERPARAMETERS = [ @@ -23,13 +25,13 @@ AUTOGLUON_HYPERPARAMETERS = [ "presets", "excluded_model_types", ] -METRIC_FILENAME = "metrics.csv" -def model_factory(model_type: str, hyperparameters: dict = None) -> MLModel: +def model_factory(model_type: str, hyperparameters: dict) -> dict: """ Use factory pattern to register the different ML implementations """ + model_types = { "autogluon": { "model": AutogluonModel, @@ -45,25 +47,27 @@ class AutogluonModel: Autogluon model that implements the MLModel Protocol """ - def __init__(self, output_filepath: Path = None) -> None: + def __init__(self, output_filepath: Path | None = None) -> None: self.model = None self.output_filepath = output_filepath self.predictions = None - def load_model(self, filepath: Path) -> None: + def load_model(self, filepath: str | Path) -> None: """ Providing a path, this function will load the model to be used. Will load to internal variable """ + filepath = str(filepath) + self.model = TabularPredictor.load(path=filepath) - def save_model(self, output_filepath: Path = None) -> None: + def save_model(self, output_filepath: Path | None = None) -> None: """ Providing a path, this function will save the model to be used. """ logger.info("Using AutoGluon Model - Model saving already occured") def train_model( - self, data: pd.DataFrame, target_column: str, hyperparameters: dict = None + self, data: pd.DataFrame, target_column: str, hyperparameters: dict ) -> None: """ For the given data and hyperparameters, a model is trained @@ -92,7 +96,7 @@ class AutogluonModel: excluded_model_types=hyperparameters["excluded_model_types"], ) - def generate_predictions(self, data: pd.DataFrame) -> pd.DataFrame: + def generate_predictions(self, data: pd.DataFrame) -> pd.Series: """ For the given dataframe, model is loaded and predictions are generated """ @@ -101,7 +105,7 @@ class AutogluonModel: print("No model loaded/ trained") exit(1) - predictions = self.model.predict(data) + predictions = pd.Series(self.model.predict(data)) return predictions @@ -110,7 +114,7 @@ class AutogluonModel: validation_data: pd.DataFrame, target_column: str, metrics: Metrics, - metrics_location: Path = None, + metrics_location: Path | None = None, metric_filename: str = METRIC_FILENAME, ) -> pd.DataFrame: """ @@ -118,6 +122,7 @@ class AutogluonModel: """ if metrics_location is None: logger.warning("Metrics will be outputted to current folder") + metrics_location = Path() if self.model is None: logger.error("No model loaded/ trained - Unable to generate evaluation") @@ -126,18 +131,13 @@ class AutogluonModel: # Generate prediction, load metrics suite, generate metrics betweeen the two predictions = self.generate_predictions(validation_data) - performance = self.model.evaluate(validation_data) + performance = metrics.generate_metric_suite( + actuals=validation_data[target_column], predictions=predictions + ) logger.info("Prediction used for evaluations are saved in self.prediction") self.predictions = predictions - # TODO: Can have a custom metric class that defines all different metrics we want - metric_mape = mean_absolute_percentage_error( - validation_data[target_column], predictions - ) - - performance["mape"] = metric_mape - logger.info("Saving metric file as metric.csv") metrics_location.mkdir(exist_ok=True) @@ -148,7 +148,9 @@ class AutogluonModel: return metrics_df - def optimise_model_for_deployment(self, deployment_path: Path = None) -> str: + def optimise_model_for_deployment( + self, deployment_path: Path | str | None = None + ) -> Any: """ We can optimise the deployment for a autogluon model """ @@ -158,11 +160,18 @@ class AutogluonModel: if deployment_path is None: raise ValueError("Deployment path required") + deployment_path = str(deployment_path) + # This will return a string path of the location return self.model.clone_for_deployment(deployment_path) - def model_metadata(self) -> dict: + def model_metadata(self) -> dict[str, Any]: """ For Autogluon model, use the inbuilt model info method """ + + if self.model is None: + logger.error("No Model loaded/ trained") + exit(1) + return self.model.info() diff --git a/model_data/simulation_system/core/DataLoader.py b/model_data/simulation_system/core/DataLoader.py index 2b402ed3..352815bf 100644 --- a/model_data/simulation_system/core/DataLoader.py +++ b/model_data/simulation_system/core/DataLoader.py @@ -9,7 +9,7 @@ class DataLoader(Protocol): """ @staticmethod - def load(filepath: str, index_col: str = None) -> pd.DataFrame: + def load(filepath: str, index_col: str | None = None) -> pd.DataFrame | None: """ Loading data from the relevant source """ @@ -21,7 +21,7 @@ class LocalDataLoader: """ @staticmethod - def load(filepath: str, index_col: str = None) -> pd.DataFrame: + def load(filepath: str, index_col: str | None = None) -> pd.DataFrame: if not os.path.exists(filepath): raise FileNotFoundError(f"File not found: {filepath}") @@ -44,7 +44,7 @@ class S3MockDataLoader: """ @staticmethod - def load(filepath: str, index_col: str = None) -> pd.DataFrame: + def load(filepath: str, index_col: str | None = None) -> pd.DataFrame: # TODO: Ingest these as environment variables in the docker compose file storage_options = { @@ -75,7 +75,7 @@ class S3DataLoader: """ @staticmethod - def load(filepath: str, index_col: str = None) -> pd.DataFrame: + def load(filepath: str, index_col: str | None = None) -> pd.DataFrame: storage_options = { "key": os.environ.get("AWS_ACCESS_KEY_ID"), @@ -96,7 +96,7 @@ class S3DataLoader: return df -def dataloader_factory(runtime_environment: str = None) -> DataLoader: +def dataloader_factory(runtime_environment: str | None = None) -> DataLoader: """ Use factory pattern to determine which loading method we use """ diff --git a/model_data/simulation_system/core/DataProcessor.py b/model_data/simulation_system/core/DataProcessor.py index 7b50f486..4c37176d 100644 --- a/model_data/simulation_system/core/DataProcessor.py +++ b/model_data/simulation_system/core/DataProcessor.py @@ -1,7 +1,7 @@ from pathlib import Path import numpy as np import pandas as pd -from model_data.BaseUtility import Definitions +from BaseUtility import Definitions from simulation_system.core.Settings import ( DATA_PROCESSOR_SETTINGS, EARLIEST_EPC_DATE, @@ -11,7 +11,7 @@ from simulation_system.core.Settings import ( TOTAL_FLOOR_AREA_NATIONAL_AVERAGE, FLOOR_LEVEL_MAP, BUILT_FORM_REMAP, - COLUMNS_TO_MERGE_ON + COLUMNS_TO_MERGE_ON, ) from typing import List @@ -23,7 +23,6 @@ class DataProcessor: def __init__(self, filepath: Path) -> None: self.filepath = filepath - self.data = None def load_data(self, low_memory=False) -> None: self.data = pd.read_csv(self.filepath, low_memory=low_memory) @@ -32,16 +31,20 @@ class DataProcessor: """ Load data and begin initial cleaning """ - self.load_data(low_memory=DATA_PROCESSOR_SETTINGS['low_memory']) + self.load_data(low_memory=DATA_PROCESSOR_SETTINGS["low_memory"]) self.confine_data() - # TODO: CLean number of heated rooms and habitable rooms - self.recast_df_columns(column_mappings=DATA_PROCESSOR_SETTINGS['column_mappings']) + # TODO: CLean number of heated rooms and habitable rooms + self.recast_df_columns( + column_mappings=DATA_PROCESSOR_SETTINGS["column_mappings"] + ) self.clean_multi_glaze_proportion() - self.retain_multiple_epc_properties(epc_minimum_count=DATA_PROCESSOR_SETTINGS['epc_minimum_count']) + self.retain_multiple_epc_properties( + epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"] + ) self.remap_columns() - if DATA_PROCESSOR_SETTINGS['epc_minimum_count'] >= 1: + if DATA_PROCESSOR_SETTINGS["epc_minimum_count"] >= 1: # If we have multiple EPC records, we can try and do filling self.fill_na_fields() @@ -53,11 +56,15 @@ class DataProcessor: """ If we have a minimum of 2 epcs, we can do back fill and forward fill on certain data fields """ - # Each uprn can fille backward from recent and forward fill from oldest + # Each uprn can fille backward from recent and forward fill from oldest # The groupby changes the order and we use the index to make the original data - filled_data = self.data.groupby("UPRN", group_keys=True)[columns_to_fill].apply( - lambda group: group.fillna(method='bfill').fillna(method='ffill') - ).reset_index().set_index('level_1').sort_index() + filled_data = ( + self.data.groupby("UPRN", group_keys=True)[columns_to_fill] + .apply(lambda group: group.fillna(method="bfill").fillna(method="ffill")) + .reset_index() + .set_index("level_1") + .sort_index() + ) self.data[columns_to_fill] = filled_data[columns_to_fill] @@ -67,15 +74,20 @@ class DataProcessor: """ # Map all anomaly values to None - data_anomaly_map = dict(zip(Definitions.DATA_ANOMALY_MATCHES, [None] * len(Definitions.DATA_ANOMALY_MATCHES))) + data_anomaly_map = dict( + zip( + Definitions.DATA_ANOMALY_MATCHES, + [None] * len(Definitions.DATA_ANOMALY_MATCHES), + ) + ) # Use replace function to map data (if exists in key), to corresponding value - i.e. Remove invalid values data = self.data.replace(data_anomaly_map) data = data.replace(np.NAN, None) # Remap certain columns - data['FLOOR_LEVEL'] = data['FLOOR_LEVEL'].replace(FLOOR_LEVEL_MAP) - data['BUILT_FROM'] = data['BUILT_FORM'].replace(BUILT_FORM_REMAP) + data["FLOOR_LEVEL"] = data["FLOOR_LEVEL"].replace(FLOOR_LEVEL_MAP) + data["BUILT_FROM"] = data["BUILT_FORM"].replace(BUILT_FORM_REMAP) self.data = data @@ -84,80 +96,130 @@ class DataProcessor: def median_without_missing(group): return group[AVERAGE_FIXED_FEATURES].median(skipna=True) - cleaning_averages = self.data.groupby( - ["PROPERTY_TYPE", "BUILT_FORM", "CONSTRUCTION_AGE_BAND", "NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"], - observed=True, - dropna=False - ).apply(median_without_missing).reset_index() + cleaning_averages = ( + self.data.groupby( + [ + "PROPERTY_TYPE", + "BUILT_FORM", + "CONSTRUCTION_AGE_BAND", + "NUMBER_HABITABLE_ROOMS", + "NUMBER_HEATED_ROOMS", + ], + observed=True, + dropna=False, + ) + .apply(median_without_missing) + .reset_index() + ) - general_averages = self.data.groupby(["PROPERTY_TYPE", "BUILT_FORM"], observed=True).apply( - median_without_missing).reset_index() + general_averages = ( + self.data.groupby(["PROPERTY_TYPE", "BUILT_FORM"], observed=True) + .apply(median_without_missing) + .reset_index() + ) - property_averages = self.data.groupby(["PROPERTY_TYPE"], observed=True).apply( - median_without_missing).reset_index() + property_averages = ( + self.data.groupby(["PROPERTY_TYPE"], observed=True) + .apply(median_without_missing) + .reset_index() + ) - built_form_averages = self.data.groupby(["BUILT_FORM"], observed=True).apply( - median_without_missing).reset_index() + built_form_averages = ( + self.data.groupby(["BUILT_FORM"], observed=True) + .apply(median_without_missing) + .reset_index() + ) # We can clean up any NA's in the cleaning averages with the general averages here - cleaning_averages_filled = pd.merge(cleaning_averages, general_averages, on=['PROPERTY_TYPE', 'BUILT_FORM'], - suffixes=['', '_AVERAGE']) - cleaning_averages_filled = pd.merge(cleaning_averages_filled, property_averages, on=['PROPERTY_TYPE'], - suffixes=['', '_PROPERTY_AVERAGE']) - cleaning_averages_filled = pd.merge(cleaning_averages_filled, built_form_averages, on=['BUILT_FORM'], - suffixes=['', '_BUILT_FORM_AVERAGE']) + cleaning_averages_filled = pd.merge( + cleaning_averages, + general_averages, + on=["PROPERTY_TYPE", "BUILT_FORM"], + suffixes=["", "_AVERAGE"], + ) + cleaning_averages_filled = pd.merge( + cleaning_averages_filled, + property_averages, + on=["PROPERTY_TYPE"], + suffixes=["", "_PROPERTY_AVERAGE"], + ) + cleaning_averages_filled = pd.merge( + cleaning_averages_filled, + built_form_averages, + on=["BUILT_FORM"], + suffixes=["", "_BUILT_FORM_AVERAGE"], + ) # Replace any missing NAN values with averages for the same Property type and built form - cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna( - cleaning_averages_filled['TOTAL_FLOOR_AREA_AVERAGE']) - cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna( - cleaning_averages_filled['FLOOR_HEIGHT_AVERAGE']) + cleaning_averages_filled["TOTAL_FLOOR_AREA"] = cleaning_averages_filled[ + "TOTAL_FLOOR_AREA" + ].fillna(cleaning_averages_filled["TOTAL_FLOOR_AREA_AVERAGE"]) + cleaning_averages_filled["FLOOR_HEIGHT"] = cleaning_averages_filled[ + "FLOOR_HEIGHT" + ].fillna(cleaning_averages_filled["FLOOR_HEIGHT_AVERAGE"]) cleaning_averages_filled = cleaning_averages_filled.drop( - columns=['TOTAL_FLOOR_AREA_AVERAGE', 'FLOOR_HEIGHT_AVERAGE']) + columns=["TOTAL_FLOOR_AREA_AVERAGE", "FLOOR_HEIGHT_AVERAGE"] + ) # If there are still NA values i.e. the averages do not have values for a speicifc group of property tyope # and built form # We can use just the property type average and replace - cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna( - cleaning_averages_filled['TOTAL_FLOOR_AREA_PROPERTY_AVERAGE']) - cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna( - cleaning_averages_filled['FLOOR_HEIGHT_PROPERTY_AVERAGE']) + cleaning_averages_filled["TOTAL_FLOOR_AREA"] = cleaning_averages_filled[ + "TOTAL_FLOOR_AREA" + ].fillna(cleaning_averages_filled["TOTAL_FLOOR_AREA_PROPERTY_AVERAGE"]) + cleaning_averages_filled["FLOOR_HEIGHT"] = cleaning_averages_filled[ + "FLOOR_HEIGHT" + ].fillna(cleaning_averages_filled["FLOOR_HEIGHT_PROPERTY_AVERAGE"]) cleaning_averages_filled = cleaning_averages_filled.drop( - columns=['TOTAL_FLOOR_AREA_PROPERTY_AVERAGE', 'FLOOR_HEIGHT_PROPERTY_AVERAGE']) + columns=[ + "TOTAL_FLOOR_AREA_PROPERTY_AVERAGE", + "FLOOR_HEIGHT_PROPERTY_AVERAGE", + ] + ) # If there are still NA values, use BUILT FORM averages - cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna( - cleaning_averages_filled['TOTAL_FLOOR_AREA_BUILT_FORM_AVERAGE']) - cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna( - cleaning_averages_filled['FLOOR_HEIGHT_BUILT_FORM_AVERAGE']) + cleaning_averages_filled["TOTAL_FLOOR_AREA"] = cleaning_averages_filled[ + "TOTAL_FLOOR_AREA" + ].fillna(cleaning_averages_filled["TOTAL_FLOOR_AREA_BUILT_FORM_AVERAGE"]) + cleaning_averages_filled["FLOOR_HEIGHT"] = cleaning_averages_filled[ + "FLOOR_HEIGHT" + ].fillna(cleaning_averages_filled["FLOOR_HEIGHT_BUILT_FORM_AVERAGE"]) cleaning_averages_filled = cleaning_averages_filled.drop( - columns=['TOTAL_FLOOR_AREA_BUILT_FORM_AVERAGE', 'FLOOR_HEIGHT_BUILT_FORM_AVERAGE']) + columns=[ + "TOTAL_FLOOR_AREA_BUILT_FORM_AVERAGE", + "FLOOR_HEIGHT_BUILT_FORM_AVERAGE", + ] + ) # If there still is na values, use average across all properties in consituecy - cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna( - cleaning_averages_filled['TOTAL_FLOOR_AREA'].mean()) - cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna( - cleaning_averages_filled['FLOOR_HEIGHT'].mean()) + cleaning_averages_filled["TOTAL_FLOOR_AREA"] = cleaning_averages_filled[ + "TOTAL_FLOOR_AREA" + ].fillna(cleaning_averages_filled["TOTAL_FLOOR_AREA"].mean()) + cleaning_averages_filled["FLOOR_HEIGHT"] = cleaning_averages_filled[ + "FLOOR_HEIGHT" + ].fillna(cleaning_averages_filled["FLOOR_HEIGHT"].mean()) # If the consituency is all NA values, then take UK AVERAGE VALUES - cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna( - TOTAL_FLOOR_AREA_NATIONAL_AVERAGE) - cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna( - FLOOR_HEIGHT_NATIONAL_AVERAGE) + cleaning_averages_filled["TOTAL_FLOOR_AREA"] = cleaning_averages_filled[ + "TOTAL_FLOOR_AREA" + ].fillna(TOTAL_FLOOR_AREA_NATIONAL_AVERAGE) + cleaning_averages_filled["FLOOR_HEIGHT"] = cleaning_averages_filled[ + "FLOOR_HEIGHT" + ].fillna(FLOOR_HEIGHT_NATIONAL_AVERAGE) return cleaning_averages_filled def retain_multiple_epc_properties(self, epc_minimum_count: int = 1) -> None: - ''' + """ Reduce the data futher by keeping only datasets with multiple epcs - ''' + """ counts = self.data.groupby("UPRN").size().reset_index() counts.columns = ["UPRN", "count"] # take UPRNS with multiple EPCs counts = counts[counts["count"] > epc_minimum_count] - self.data = pd.merge(self.data, counts, on='UPRN') + self.data = pd.merge(self.data, counts, on="UPRN") def recast_df_columns(self, column_mappings: dict) -> None: """ @@ -166,7 +228,7 @@ class DataProcessor: for key, values in column_mappings.items(): if key not in self.data.columns: - print('Column mapping incorrectly specified') + print("Column mapping incorrectly specified") exit(1) for value in values: self.data[key] = self.data[key].astype(value) @@ -189,13 +251,16 @@ class DataProcessor: self.data = self.data[~pd.isnull(self.data["UPRN"])] self.data = self.data[self.data["LODGEMENT_DATE"] >= EARLIEST_EPC_DATE] self.data = self.data[self.data["TRANSACTION_TYPE"] != "new dwelling"] - self.data = self.data[~self.data["FLOOR_LEVEL"].isin(["top floor", "mid floor"])] + self.data = self.data[ + ~self.data["FLOOR_LEVEL"].isin(["top floor", "mid floor"]) + ] def clean_multi_glaze_proportion(self) -> None: """ If there is no multi-glaze proportion but the windows are fully glazed, then we should assume a score of 100 """ - no_multi_glaze_proportion_index = pd.isnull(self.data["MULTI_GLAZE_PROPORTION"]) & ( - self.data["WINDOWS_DESCRIPTION"].isin(FULLY_GLAZED_DESCRIPTIONS)) - self.data.loc[no_multi_glaze_proportion_index, 'MULTI_GLAZE_PROPORTION'] = 100 + no_multi_glaze_proportion_index = pd.isnull( + self.data["MULTI_GLAZE_PROPORTION"] + ) & (self.data["WINDOWS_DESCRIPTION"].isin(FULLY_GLAZED_DESCRIPTIONS)) + self.data.loc[no_multi_glaze_proportion_index, "MULTI_GLAZE_PROPORTION"] = 100 diff --git a/model_data/simulation_system/core/FeatureProcessor.py b/model_data/simulation_system/core/FeatureProcessor.py index 8b53cb14..a425e972 100644 --- a/model_data/simulation_system/core/FeatureProcessor.py +++ b/model_data/simulation_system/core/FeatureProcessor.py @@ -31,12 +31,12 @@ class FeatureProcessor: return df @staticmethod - def retain_features(df: pd.DataFrame, features: List[str] = None): + def retain_features(df: pd.DataFrame, features: List[str] | None = None): """ Determine which columns to keep for modelling """ if features is None: - features = df.columns + features = df.columns.to_list() else: if not set(features).issubset(df.columns): logger.error("Features defined is not contained in data") @@ -47,7 +47,9 @@ class FeatureProcessor: return df @staticmethod - def subsample_data(df: pd.DataFrame, subsample_amount: int = None) -> pd.DataFrame: + def subsample_data( + df: pd.DataFrame, subsample_amount: int | None = None + ) -> pd.DataFrame: """ Sample data to reduce number of rows for model building if needed """ @@ -60,8 +62,8 @@ class FeatureProcessor: self, df: pd.DataFrame, target_column: str = "RDSAP_CHANGE", - features: List[str] = None, - subsample_amount: int = None, + features: List[str] | None = None, + subsample_amount: int | None = None, ) -> pd.DataFrame: """ Pipeline to get data ready for building a model diff --git a/model_data/simulation_system/core/Metrics.py b/model_data/simulation_system/core/Metrics.py index 15fb93f9..99edae92 100644 --- a/model_data/simulation_system/core/Metrics.py +++ b/model_data/simulation_system/core/Metrics.py @@ -6,8 +6,41 @@ Key tasks: """ import pandas as pd -from core.Settings import OPTIMISE_METRIC -from MLModel.BaseMLModel import MLModel +from pathlib import Path +import seaborn as sns +import matplotlib.pyplot as plt +from core.Settings import ( + RESIDUAL_TRUE_LABEL, + RESIDUAL_PREDICTION_LABEL, + SEABORN_RESIDUAL_AXIS_FONTSIZE, + SEABORN_RESIDUAL_TITLE_FONTSIZE, + SEABORN_RESIDUAL_STYLE, + SEABORN_RESIDUAL_ASPECT_RATIO, + SEABORN_RESIDUAL_PLOT_DPI, + SEABORN_RESIDUAL_RANGE, + SEABORN_RESIDUAL_LINE_COLOUR, + SEABORN_RESIDUAL_LINE_WIDTH, +) +from sklearn.metrics import ( + mean_absolute_error, + median_absolute_error, + mean_squared_error, + mean_absolute_percentage_error, +) + + +# Dummy example of new metric that can be added - must be true and prediction as arguments +def max_error(y_true: pd.Series, y_pred: pd.Series): + return max(y_true - y_pred) + + +METRIC_TO_APPLY = [ + mean_absolute_error, + median_absolute_error, + mean_squared_error, + mean_absolute_percentage_error, + # max_error +] def sort_by_metric( @@ -16,7 +49,8 @@ def sort_by_metric( """ Helper function to sort data frame by metric and append a best model flag """ - data = data.sort_values(optimse_metric, ascending=False).reset_index(drop=True) + # Ascending as we want lowest error values + data = data.sort_values(optimse_metric, ascending=True).reset_index(drop=True) data[best_model_column_name] = [False] * len(data) data.loc[0, best_model_column_name] = True @@ -29,38 +63,68 @@ class Metrics: """ @staticmethod - def metric_1(predictions: pd.Series, actuals: pd.Series) -> float: - """ - Can leverage ML packages like sklearn for individual metrics like MAPE etc - """ - pass - - @staticmethod - def metric_2(predictions: pd.Series, actuals: pd.Series) -> float: - """ - Can leverage ML packages like sklearn for individual metrics like MAPE etc - """ - pass - - def list_metric_functions(self) -> list: + def list_metric_functions() -> list: """ Gather all metric functions to run """ - pass + return [metric_to_apply.__name__ for metric_to_apply in METRIC_TO_APPLY] - def generate_metric_suite( - self, model: MLModel, data: pd.DataFrame, target_column: str - ) -> pd.Series: + @staticmethod + def generate_metric_suite(actuals: pd.Series, predictions: pd.Series) -> pd.Series: """ For the model, test data and target, generate predictions and then iterative over all metrics to generate a Series of metric values """ - predictions = model.generate_predictions(data=data) - actuals = data[target_column] metric_dict = {} - for key, metric_function in asd: # TODO: - metric_dict[key] = metric_function(predictions, actuals) + for metric_function in METRIC_TO_APPLY: + metric_dict[metric_function.__name__] = metric_function( + actuals, predictions + ) - metrics = pd.Series([metric_dict]) + metrics = pd.Series(metric_dict) return metrics + + @staticmethod + def generate_plot_suite(): + """ + Can do all metric ploting + """ + + @staticmethod + def generate_residual_plot( + actuals: pd.Series, + predictions: pd.Series, + target_column: str, + output_filepath: Path | str, + ): + + # TODO: can have a model.metric_outputs method + # FOr not just do it here + residual_df = pd.DataFrame( + list(zip(actuals, predictions)), + columns=[RESIDUAL_TRUE_LABEL, RESIDUAL_PREDICTION_LABEL], + ) + + # image formatting + sns.set(style=SEABORN_RESIDUAL_STYLE) + ax = sns.scatterplot( + x=RESIDUAL_TRUE_LABEL, y=RESIDUAL_PREDICTION_LABEL, data=residual_df + ) + ax.set_aspect(SEABORN_RESIDUAL_ASPECT_RATIO) + ax.set_xlabel(f"True {target_column}", fontsize=SEABORN_RESIDUAL_AXIS_FONTSIZE) + ax.set_ylabel( + f"Predicted {target_column}", fontsize=SEABORN_RESIDUAL_AXIS_FONTSIZE + ) # ylabel + ax.set_title("Residuals", fontsize=SEABORN_RESIDUAL_TITLE_FONTSIZE) + + # Square aspect ratio + ax.plot( + SEABORN_RESIDUAL_RANGE, + SEABORN_RESIDUAL_RANGE, + SEABORN_RESIDUAL_LINE_COLOUR, + linewidth=SEABORN_RESIDUAL_LINE_WIDTH, + ) + + plt.tight_layout() + plt.savefig(output_filepath, dpi=SEABORN_RESIDUAL_PLOT_DPI) diff --git a/model_data/simulation_system/core/Settings.py b/model_data/simulation_system/core/Settings.py index 85d9f210..f595a3d2 100644 --- a/model_data/simulation_system/core/Settings.py +++ b/model_data/simulation_system/core/Settings.py @@ -2,6 +2,8 @@ # TODO: migrate to dynaconf from pathlib import Path +METRIC_FILENAME = "metrics.csv" + OPTIMISE_METRIC = "mean_absolute_error" BEST_MODEL_COLUMN_NAME = "best_model" diff --git a/model_data/simulation_system/docker-compose.yml b/model_data/simulation_system/docker-compose.yml index f79b6c48..c801473b 100644 --- a/model_data/simulation_system/docker-compose.yml +++ b/model_data/simulation_system/docker-compose.yml @@ -18,21 +18,21 @@ services: timeout: 20s retries: 3 - # simulation_system_training: - # build: - # context: ./ - # dockerfile: ./Dockerfiles/Dockerfile.training - # image: simulation_system_training - # environment: - # ENDPOINT_URL: http://minio:9000/ - # AWS_ACCESS_KEY_ID: *MINIO_USER - # AWS_SECRET_ACCESS_KEY: *MINIO_PASS - # tty: true - # depends_on: - # minio: - # condition: service_healthy - # command: - # ["bash"] + simulation_system_training: + build: + context: ./ + dockerfile: ./Dockerfiles/Dockerfile.training + image: simulation_system_training + environment: + ENDPOINT_URL: http://minio:9000/ + AWS_ACCESS_KEY_ID: *MINIO_USER + AWS_SECRET_ACCESS_KEY: *MINIO_PASS + tty: true + depends_on: + minio: + condition: service_healthy + # command: + # ["bash"] # simulation_system_prediction: # build: diff --git a/model_data/simulation_system/generate_rdsap_change.py b/model_data/simulation_system/generate_rdsap_change.py index 2400e7c7..c1ca56a6 100644 --- a/model_data/simulation_system/generate_rdsap_change.py +++ b/model_data/simulation_system/generate_rdsap_change.py @@ -10,15 +10,16 @@ from core.Settings import ( COMPONENT_FEATURES, RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE, - COLUMNS_TO_MERGE_ON + COLUMNS_TO_MERGE_ON, ) from core.DataProcessor import DataProcessor -DATA_DIRECTORY = Path(__file__).parent / 'data' / 'all-domestic-certificates' +DATA_DIRECTORY = Path(__file__).parent / "data" / "all-domestic-certificates" # TODO: Have a look at temporal features + def app(): # Get all the files in the directory @@ -29,7 +30,7 @@ def app(): directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()] dataset = [] - # 116 + # 116 # 128048706 # PosixPath('/home/ubuntu/Documents/python/hestia/Model/model_data/simulation_system/data/all-domestic # -certificates/domestic-E09000021-Kingston-upon-Thames') @@ -48,12 +49,14 @@ def app(): fixed_data = {} # If a property has changed building type, we can ignore the epc rating i.e. this should be 1 unique row - if max(property_data[MANDATORY_FIXED_FEATURES].nunique()) > 1: + if any(property_data[MANDATORY_FIXED_FEATURES].nunique() > 1): continue - # Take the latest row for both the LATEST_FEILDS and MANDATORY FIELDS + # Take the latest row for both the LATEST_FEILDS and MANDATORY FIELDS latest_field_data = property_data[LATEST_FIELD].iloc[-1].to_dict() - mandatory_field_data = property_data[MANDATORY_FIXED_FEATURES].iloc[-1].to_dict() + mandatory_field_data = ( + property_data[MANDATORY_FIXED_FEATURES].iloc[-1].to_dict() + ) # Taking just the last row, which is the percentage change from the latest to previous one only # property_data[AVERAGE_FIXED_FEATURES].fillna(value=0).pct_change().iloc[-1] > 0.1 @@ -63,17 +66,25 @@ def app(): cleaned_columns_to_merge_on = na_columns.index[~na_columns].to_list() # Get the corresponding groupby and merge, and fill in NA values - cleaning_averages_to_merge = cleaning_averages.groupby(cleaned_columns_to_merge_on)[ - ['TOTAL_FLOOR_AREA', 'FLOOR_HEIGHT']].mean() + cleaning_averages_to_merge = cleaning_averages.groupby( + cleaned_columns_to_merge_on + )[["TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]].mean() - modified_property_data = pd.merge(property_data, cleaning_averages_to_merge, on=cleaned_columns_to_merge_on, - suffixes=['', '_AVERAGE']) - modified_property_data['TOTAL_FLOOR_AREA'] = modified_property_data['TOTAL_FLOOR_AREA'].fillna( - modified_property_data['TOTAL_FLOOR_AREA_AVERAGE']) - modified_property_data['FLOOR_HEIGHT'] = modified_property_data['FLOOR_HEIGHT'].fillna( - modified_property_data['FLOOR_HEIGHT_AVERAGE']) + modified_property_data = pd.merge( + property_data, + cleaning_averages_to_merge, + on=cleaned_columns_to_merge_on, + suffixes=["", "_AVERAGE"], + ) + modified_property_data["TOTAL_FLOOR_AREA"] = modified_property_data[ + "TOTAL_FLOOR_AREA" + ].fillna(modified_property_data["TOTAL_FLOOR_AREA_AVERAGE"]) + modified_property_data["FLOOR_HEIGHT"] = modified_property_data[ + "FLOOR_HEIGHT" + ].fillna(modified_property_data["FLOOR_HEIGHT_AVERAGE"]) modified_property_data = modified_property_data.drop( - columns=['TOTAL_FLOOR_AREA_AVERAGE', 'FLOOR_HEIGHT_AVERAGE']) + columns=["TOTAL_FLOOR_AREA_AVERAGE", "FLOOR_HEIGHT_AVERAGE"] + ) for field in AVERAGE_FIXED_FEATURES: @@ -94,8 +105,9 @@ def app(): # We include the lodgement date here as we probably need to factor time into the # model, since EPC standards and rigour have changed over time variable_data = modified_property_data[ - COMPONENT_FEATURES + ["LODGEMENT_DATE", RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE] - ] + COMPONENT_FEATURES + + ["LODGEMENT_DATE", RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE] + ] # Note: we look at changes between subsequent EPCS, however we could look at other permutations # e.g. first vs second, second vs third and also first vs third @@ -107,15 +119,24 @@ def app(): starting_record = variable_data.iloc[idx] ending_record = variable_data.iloc[idx + 1] - rdsap_change = ending_record[RDSAP_RESPONSE] - starting_record[RDSAP_RESPONSE] - heat_demand_change = ending_record[HEAT_DEMAND_RESPONSE] - starting_record[HEAT_DEMAND_RESPONSE] + rdsap_change = ( + ending_record[RDSAP_RESPONSE] - starting_record[RDSAP_RESPONSE] + ) + heat_demand_change = ( + ending_record[HEAT_DEMAND_RESPONSE] + - starting_record[HEAT_DEMAND_RESPONSE] + ) # TODO: We need to pre-process the data. For instance, rather than using static for roofs, walls and # floors, we may want to use the U-value. We may also want to handle the (assumed) tags # within descriptions - starting_record = starting_record[COMPONENT_FEATURES + ["LODGEMENT_DATE"]].add_suffix("_STARTING") - ending_record = ending_record[COMPONENT_FEATURES + ["LODGEMENT_DATE"]].add_suffix("_ENDING") + starting_record = starting_record[ + COMPONENT_FEATURES + ["LODGEMENT_DATE"] + ].add_suffix("_STARTING") + ending_record = ending_record[ + COMPONENT_FEATURES + ["LODGEMENT_DATE"] + ].add_suffix("_ENDING") features = pd.concat([starting_record, ending_record]) @@ -125,14 +146,14 @@ def app(): "RDSAP_CHANGE": rdsap_change, "HEAT_DEMAND_CHANGE": heat_demand_change, **fixed_data, - **features.to_dict() + **features.to_dict(), } ) dataset.extend(property_model_data) output = pd.DataFrame(dataset) - output.to_parquet('./dataset.parquet') + output.to_parquet("./dataset.parquet") if __name__ == "__main__": diff --git a/model_data/simulation_system/handlers/predictions_app.py b/model_data/simulation_system/handlers/predictions_app.py index 10d5b81e..1f3a5c8b 100644 --- a/model_data/simulation_system/handlers/predictions_app.py +++ b/model_data/simulation_system/handlers/predictions_app.py @@ -2,6 +2,8 @@ import os import urllib.parse from predictions import prediction +RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "dev") + def handler(event, context): """ @@ -11,17 +13,32 @@ def handler(event, context): # Assuming a file in a bucket landing for now? # Assuming we have a model to use - bucket = event["Records"][0]["s3"]["bucket"]["name"] - key = urllib.parse.unquote_plus( - event["Records"][0]["s3"]["bucket"]["key"], encoding="utf-8" - ) + # bucket = event["Records"][0]["s3"]["bucket"]["name"] + # key = urllib.parse.unquote_plus( + # event["Records"][0]["s3"]["bucket"]["key"], encoding="utf-8" + # ) - prediction_file = bucket + "/" + key + payload = event["body"] + data_path = payload["file_location"] + property_id = payload["property_id"] + portfolio_id = payload["portfolio_id"] + created_at = payload["created_at"] + + # prediction_file = bucket + "/" + key # TODO: put a model into s3, both locally and in aws - model_path = os.environ.get("MODEL_PATH", "http://minio:9000/data/model_directory/") + # model_path = os.environ.get("MODEL_PATH", "http://minio:9000/data/model_directory/") + model_path = os.environ.get( + "MODEL_PATH", + "s3://retrofit-model-directory-{RUNTIME_ENVIRONMENT}/RDSAP_CHANGE/autogluon/rdsap_change-medium_quality-30-2023-08-30_11-43-41/deployment/", + ) try: - prediction(model_path=model_path, data_path=prediction_file) + outputs = prediction(model_path=model_path, data_path=data_path) + # Store into s3, with key of {portfolio_id}-{property_id} + outputs.to_csv( + f"s3://retrofit-sap-prediction-{RUNTIME_ENVIRONMENT}/{portfolio_id}/{property_id}/{created_at}.csv" + ) + except (Exception, KeyError, ValueError): print("Prediction failed") diff --git a/model_data/simulation_system/predictions.py b/model_data/simulation_system/predictions.py index 680193be..22104993 100644 --- a/model_data/simulation_system/predictions.py +++ b/model_data/simulation_system/predictions.py @@ -2,6 +2,7 @@ Script to load MLModel class and generate predictions """ +import os import json import argparse import pandas as pd @@ -9,7 +10,7 @@ from typing import Optional from datetime import datetime from MLModel.Models import AutogluonModel from core.Logger import logger -from core.DataLoader import DataLoader +from core.DataLoader import dataloader_factory from core.Settings import ( BASE_REGISTRY_PATH, REGISTRY_FILE, @@ -20,6 +21,7 @@ from core.Settings import ( ) TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT) +RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "dev") # FOR TESTING # For now just loading data first and then passing into function (i.e. as if we receive json data and convert to @@ -58,19 +60,26 @@ def ingest_arguments() -> argparse.Namespace: def prediction( target_column: str = "RDSAP_CHANGE", - model_path: str = None, - data: pd.DataFrame = None, + model_path: str | None = None, + data: Optional[pd.DataFrame | str] = None, data_path: Optional[str] = None, ): """ Main pipeline function """ - registry_path = BASE_REGISTRY_PATH / target_column / REGISTRY_FILE + if RUNTIME_ENVIRONMENT == "local": + registry_path = BASE_REGISTRY_PATH / target_column / REGISTRY_FILE - if registry_path is None or not registry_path.exists(): - logger.error("No registry path provided or registry doesn't exist") - exit(1) + if registry_path is None or not registry_path.exists(): + logger.error("No registry path provided or registry doesn't exist") + exit(1) + elif RUNTIME_ENVIRONMENT == "dev": + registry_path = ( + "s3://retrofit-model-directory-dev/RDSAP_CHANGE/model_registry.csv" + ) + else: + raise NotImplemented("TO be implemented") if model_path is not None: logger.info("User specified a model to load - ignoring registry") @@ -98,13 +107,17 @@ def prediction( exit(1) if data_path and data is None: logger.info("Loading data from provided path") - data = DataLoader().load(filepath=data_path, index_col="UPRN") + dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT) + data = dataloader.load(filepath=data_path, index_col="UPRN") - # TODO: DOWNSAMPLING DOWN TO JUST USE ONE FOR PREDICTION - data = data.sample(1) + if data is None: + raise ValueError("No data loaded") + + # # TODO: DOWNSAMPLING DOWN TO JUST USE ONE FOR PREDICTION + # data = data.sample(1) else: logger.info("Using data provided") - data = json.loads(data) + data = json.loads(str(data)) data = pd.DataFrame([data]) print(data) @@ -121,6 +134,7 @@ def prediction( logger.info("--- Generating Predictions ---") prediction = model.generate_predictions(data=data) + return pd.concat([data["recommendation_id"], prediction], axis=1) # Save prediction some where? # prediction.to_csv("s3?") @@ -128,23 +142,23 @@ def prediction( # TODO: Check how we want to structure outputs # For now, just categorise by uprn and timestamp # Assume one uprn coming in for now - uprn = data.index.values[0] + # uprn = data.index.values[0] - # Saving prediction local for now - # TODO: change uprn to TARGET_ID, put in setting - logger.info("--- Outputting prediction and metadata --- ") - output_base = PREDICTION_LOCATION / target_column / uprn / TIMESTAMP - output_base.mkdir(parents=True, exist_ok=True) + # # Saving prediction local for now + # # TODO: change uprn to TARGET_ID, put in setting + # logger.info("--- Outputting prediction and metadata --- ") + # output_base = PREDICTION_LOCATION / target_column / uprn / TIMESTAMP + # output_base.mkdir(parents=True, exist_ok=True) - json_prediction = prediction.to_json(output_base / PREDICTION_FILE) - prediction_metadata = { - "model_type": model_type, - "model_name": model_name, - "model_location": model_location, - "model_settings": model.model_metadata(), - } + # json_prediction = prediction.to_json(output_base / PREDICTION_FILE) + # prediction_metadata = { + # "model_type": model_type, + # "model_name": model_name, + # "model_location": model_location, + # "model_settings": model.model_metadata(), + # } - pd.DataFrame([prediction_metadata]).to_json(output_base / METADATA_FILE) + # pd.DataFrame([prediction_metadata]).to_json(output_base / METADATA_FILE) return json_prediction diff --git a/model_data/simulation_system/regenerate_metrics.py b/model_data/simulation_system/regenerate_metrics.py index 5d2a8c9f..025892f5 100644 --- a/model_data/simulation_system/regenerate_metrics.py +++ b/model_data/simulation_system/regenerate_metrics.py @@ -6,20 +6,23 @@ Key task: - Save the new metrics out to s3 bucket """ +import os import argparse from s3pathlib import S3Path import pandas as pd from tqdm import tqdm from core.Logger import logger from core.Metrics import Metrics, sort_by_metric -from core.DataLoader import DataLoader +from core.DataLoader import dataloader_factory from core.Settings import ( OPTIMISE_METRIC, MODEL_DIRECTORY, REGISTRY_FILE, BEST_MODEL_COLUMN_NAME, ) -from MLModel.Models import AutogluonModel, model_factory +from MLModel.Models import model_factory + +RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local") def ingest_arguments() -> argparse.Namespace: @@ -54,7 +57,7 @@ def regenerate_metrics(test_filepath: str, target_column: str) -> None: """ logger.info("--- Loading test data ---") - dataloader = DataLoader() + dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT) test_df = dataloader.load(filepath=test_filepath) logger.info("--- Loading model registry ---") diff --git a/model_data/simulation_system/test_data_generation.py b/model_data/simulation_system/test_data_generation.py index d57c90f8..8d1dbf2b 100644 --- a/model_data/simulation_system/test_data_generation.py +++ b/model_data/simulation_system/test_data_generation.py @@ -2,79 +2,109 @@ from core.Logger import logger import argparse import pandas as pd from pathlib import Path -from core.Settings import ( - RANDOM_SEED, - TRAIN_AND_VALIDATION_DATA_NAME, - TEST_DATA_NAME -) +from core.Settings import RANDOM_SEED, TRAIN_AND_VALIDATION_DATA_NAME, TEST_DATA_NAME + def ingest_arguments() -> argparse.Namespace: """ Helper function to take in arguments from script start """ - parser = argparse.ArgumentParser(description='Inputs for training script') + parser = argparse.ArgumentParser(description="Inputs for training script") - parser.add_argument('--filepath', type=str, help='Location of Parquet dataset to load', required=True) - parser.add_argument('--output-folder', type=str, help='Location of Parquet dataset to save', required=True) - parser.add_argument('--percentage', type=float, help='Percentage of data to use as test data', default=None) - parser.add_argument('--volume', type=int, help='Volume of data to use as test data', default=None) - parser.add_argument('--sampling', type=str, help='Type of sampling to do for test data', choices=['random', 'stratified'], default='random') + parser.add_argument( + "--filepath", + type=str, + help="Location of Parquet dataset to load", + required=True, + ) + parser.add_argument( + "--output-folder", + type=str, + help="Location of Parquet dataset to save", + required=True, + ) + parser.add_argument( + "--percentage", + type=float, + help="Percentage of data to use as test data", + default=None, + ) + parser.add_argument( + "--volume", type=int, help="Volume of data to use as test data", default=None + ) + parser.add_argument( + "--sampling", + type=str, + help="Type of sampling to do for test data", + choices=["random", "stratified"], + default="random", + ) args = parser.parse_args() return args -def main(filepath: str, output_folder: str, percentage: float, volume: int, sampling: str): + +def main( + filepath: str, output_folder: str, percentage: float, volume: int, sampling: str +): """ Load a dataset in and split out the training+validation data and the test data. """ - logger.info('---Loading Data---') + logger.info("---Loading Data---") data = pd.read_parquet(filepath).reset_index(drop=True) if percentage and volume is None: - test_amount = round(len(data)*percentage) + test_amount = round(len(data) * percentage) elif percentage is None and volume: test_amount = volume elif percentage is None and volume is None: - logger.error('No amount specified - please specify either a percentage or volume') + logger.error( + "No amount specified - please specify either a percentage or volume" + ) exit(1) else: - logger.info('Both percentage and volume specified - taking largest of the two') - test_amount = max(round(len(data)*percentage), volume) + logger.info("Both percentage and volume specified - taking largest of the two") + test_amount = max(round(len(data) * percentage), volume) - logger.info(f'---Extracting {test_amount} from dataset to be test data') + logger.info(f"---Extracting {test_amount} from dataset to be test data") - if sampling == 'random': - logger.info('--- Using random sample method ---') + train_validation_data = pd.DataFrame() + test_data = pd.DataFrame() + + if sampling == "random": + logger.info("--- Using random sample method ---") sample_index = data.sample(n=test_amount, random_state=RANDOM_SEED).index train_validation_data = data.drop(sample_index) test_data = data.iloc[sample_index] - elif sampling =='stratified': - # Not yet implemented + elif sampling == "stratified": + # Not yet implemented pass - logger.info('--- Saving data ---') + logger.info("--- Saving data ---") - train_validation_data.to_parquet(Path(output_folder)/ TRAIN_AND_VALIDATION_DATA_NAME) - test_data.to_parquet(Path(output_folder)/ TEST_DATA_NAME) + train_validation_data.to_parquet( + Path(output_folder) / TRAIN_AND_VALIDATION_DATA_NAME + ) + test_data.to_parquet(Path(output_folder) / TEST_DATA_NAME) + + logger.info(" ---Pipeline complete---") - logger.info(' ---Pipeline complete---') if __name__ == "__main__": - logger.info('--- Generate test data pipeline ---') + logger.info("--- Generate test data pipeline ---") args = ingest_arguments() main( - filepath=args.filepath, + filepath=args.filepath, output_folder=args.output_folder, - percentage=args.percentage, - volume=args.volume, - sampling=args.sampling - ) - + percentage=args.percentage, + volume=args.volume, + sampling=args.sampling, + ) diff --git a/model_data/simulation_system/training.py b/model_data/simulation_system/training.py index 7f6c8bb4..47f70772 100644 --- a/model_data/simulation_system/training.py +++ b/model_data/simulation_system/training.py @@ -8,11 +8,9 @@ import os from pathlib import Path from datetime import datetime import pandas as pd -import seaborn as sns -import matplotlib.pyplot as plt -from MLModel.Models import AutogluonModel, model_factory +from MLModel.Models import model_factory from core.Logger import logger -from core.Metrics import Metrics +from core.Metrics import Metrics, sort_by_metric from core.DataLoader import dataloader_factory from core.FeatureProcessor import FeatureProcessor from core.Settings import ( @@ -25,17 +23,9 @@ from core.Settings import ( SUBSAMPLE_FACTOR, MODEL_HYPERPARAMETERS, TIMESTAMP_FORMAT, - RESIDUAL_TRUE_LABEL, - RESIDUAL_PREDICTION_LABEL, RESIDUAL_FILE, - SEABORN_RESIDUAL_AXIS_FONTSIZE, - SEABORN_RESIDUAL_TITLE_FONTSIZE, - SEABORN_RESIDUAL_STYLE, - SEABORN_RESIDUAL_ASPECT_RATIO, - SEABORN_RESIDUAL_PLOT_DPI, - SEABORN_RESIDUAL_RANGE, - SEABORN_RESIDUAL_LINE_COLOUR, - SEABORN_RESIDUAL_LINE_WIDTH, + BEST_MODEL_COLUMN_NAME, + OPTIMISE_METRIC, ) TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT) @@ -120,7 +110,7 @@ def training( test_filepath: str, target_column: str = "RDSAP_CHANGE", model_type: str = "autogluon", - hyperparameters: dict = None, + hyperparameters: dict | None = None, ) -> None: """ Pipeline to run training on the dataset @@ -131,6 +121,9 @@ def training( train_df = dataloader.load(filepath=train_filepath) test_df = dataloader.load(filepath=test_filepath) + if train_df is None or test_df is None: + raise ValueError("No data Loaded - cancelling pipeline") + logger.info("--- Feature processing ---") feature_processor = FeatureProcessor() @@ -165,6 +158,7 @@ def training( ) output_base = Path(MODEL_DIRECTORY) / target_column / model_type / model_root + # Will need to pass output path to model (for saving purposes) model = model_toolkit["model"](output_filepath=output_base / MODEL_FOLDER) model.train_model( @@ -175,56 +169,26 @@ def training( model.save_model(output_filepath=model.output_filepath) logger.info("--- Generate evaluation metrics ---") - # TODO: replace this with metrics class - # metrics_df = model.model_evaluation( - # validation_data=test_df, - # target_column=target_column, - # metrics_location=output_base / METRICS_FOLDER, - # metrics = Metrics - # ) + metrics = Metrics() + metrics_df = model.model_evaluation( validation_data=test_df, target_column=target_column, metrics_location=output_base / METRICS_FOLDER, + metrics=metrics, ) logger.info("--- Generate metric outputs using predictions ---") - # TODO: can have a model.metric_outputs method - # FOr not just do it here - residual_df = pd.DataFrame( - list(zip(test_df[target_column], model.predictions)), - columns=[RESIDUAL_TRUE_LABEL, RESIDUAL_PREDICTION_LABEL], - ) - - # image formatting - # TODO: move to settings file , AXIS_FONT, TITLE_FONT - sns.set(style=SEABORN_RESIDUAL_STYLE) - ax = sns.scatterplot( - x=RESIDUAL_TRUE_LABEL, y=RESIDUAL_PREDICTION_LABEL, data=residual_df - ) - ax.set_aspect(SEABORN_RESIDUAL_ASPECT_RATIO) - ax.set_xlabel(f"True {target_column}", fontsize=SEABORN_RESIDUAL_AXIS_FONTSIZE) - ax.set_ylabel( - f"Predicted {target_column}", fontsize=SEABORN_RESIDUAL_AXIS_FONTSIZE - ) # ylabel - ax.set_title("Residuals", fontsize=SEABORN_RESIDUAL_TITLE_FONTSIZE) - - # Square aspect ratio - ax.plot( - SEABORN_RESIDUAL_RANGE, - SEABORN_RESIDUAL_RANGE, - SEABORN_RESIDUAL_LINE_COLOUR, - linewidth=SEABORN_RESIDUAL_LINE_WIDTH, - ) - - plt.tight_layout() - plt.savefig( - output_base / METRICS_FOLDER / RESIDUAL_FILE, dpi=SEABORN_RESIDUAL_PLOT_DPI + # metrics.generate_plot_suite() + metrics.generate_residual_plot( + actuals=test_df[target_column], + predictions=model.predictions, + target_column=target_column, + output_filepath=output_base / METRICS_FOLDER / RESIDUAL_FILE, ) # TODO: for cml, we might want to have class that outputs all data and plots to add to the report # If we want residual plot/ any plots, we will need to self host - # plt.savefig(RESIDUAL_FILE, dpi=120) # TODO: introduce a seperate script for model optimisation, and from there, optimise for deployment # Imagining for now that the model trained here is the best model amongst all models built @@ -252,21 +216,14 @@ def training( registry_df = pd.read_csv(registry_path, index_col=None) else: # TODO: Moved columns into settings: MODEL_DETAILS and Metrics class columns - registry_df = pd.DataFrame( - columns=[ - "model_type", - "model_name", - "model_location", - "mean_absolute_error", - "root_mean_squared_error", - "mean_squared_error", - "r2", - "pearsonr", - "median_absolute_error", - "mape", - "best_model", - ] - ) + columns = [ + BEST_MODEL_COLUMN_NAME, + "model_type", + "model_name", + "model_location", + ] + metrics.list_metric_functions() + + registry_df = pd.DataFrame(columns=columns) model_details_df = pd.DataFrame( [ @@ -284,11 +241,12 @@ def training( # TODO: will need a rebuild script metric script -i.e. if we add new metrics, we will want to load models and # regenerate new metrics # TODO: decide metric to optimise to - registry_df = registry_df.sort_values( - "mean_absolute_error", ascending=False - ).reset_index(drop=True) - registry_df["best_model"] = [False] * len(registry_df) - registry_df.loc[0, "best_model"] = True + + registry_df = sort_by_metric( + registry_df, + optimse_metric=OPTIMISE_METRIC, + best_model_column_name=BEST_MODEL_COLUMN_NAME, + ) logger.info("--- Saving new model to registry ---") # Ensure the directory exists