From a137cccc05406e6f5f11614a04cb156758e75ab4 Mon Sep 17 00:00:00 2001 From: Michael Duong Date: Thu, 31 Aug 2023 00:06:03 +0100 Subject: [PATCH] added factory pattern --- .../simulation_system/MLModel/Models.py | 24 +++-- .../simulation_system/core/DataLoader.py | 92 +++++++++++++++---- model_data/simulation_system/core/Settings.py | 2 +- .../simulation_system/regenerate_metrics.py | 4 +- model_data/simulation_system/training.py | 43 ++++++--- 5 files changed, 121 insertions(+), 44 deletions(-) diff --git a/model_data/simulation_system/MLModel/Models.py b/model_data/simulation_system/MLModel/Models.py index 25668bc2..dc4874b0 100644 --- a/model_data/simulation_system/MLModel/Models.py +++ b/model_data/simulation_system/MLModel/Models.py @@ -13,6 +13,7 @@ import pandas as pd from autogluon.tabular import TabularDataset, TabularPredictor from sklearn.metrics import mean_absolute_percentage_error from core.Logger import logger +from core.Metrics import Metrics from MLModel.BaseMLModel import MLModel AUTOGLUON_HYPERPARAMETERS = [ @@ -25,18 +26,18 @@ AUTOGLUON_HYPERPARAMETERS = [ METRIC_FILENAME = "metrics.csv" -def select_model(model_type: str) -> MLModel: +def model_factory(model_type: str, hyperparameters: dict = None) -> MLModel: """ - Helper function to select the model to use + Use factory pattern to register the different ML implementations """ + model_types = { + "autogluon": { + "model": AutogluonModel, + "naming_attributes": f"{hyperparameters['presets']}-{hyperparameters['time_limit']}", + }, + } - if model_type == "autogluon": - model = AutogluonModel() - else: - logger.error("No other model currently implemented") - exit(1) - - return model + return model_types[model_type] class AutogluonModel: @@ -108,6 +109,7 @@ class AutogluonModel: self, validation_data: pd.DataFrame, target_column: str, + metrics: Metrics, metrics_location: Path = None, metric_filename: str = METRIC_FILENAME, ) -> pd.DataFrame: @@ -121,9 +123,11 @@ class AutogluonModel: logger.error("No model loaded/ trained - Unable to generate evaluation") exit(1) - performance = self.model.evaluate(validation_data) + # Generate prediction, load metrics suite, generate metrics betweeen the two predictions = self.generate_predictions(validation_data) + performance = self.model.evaluate(validation_data) + logger.info("Prediction used for evaluations are saved in self.prediction") self.predictions = predictions diff --git a/model_data/simulation_system/core/DataLoader.py b/model_data/simulation_system/core/DataLoader.py index 17fd36b9..2b402ed3 100644 --- a/model_data/simulation_system/core/DataLoader.py +++ b/model_data/simulation_system/core/DataLoader.py @@ -1,14 +1,28 @@ import pandas as pd import os +from typing import Protocol -class DataLoader: +class DataLoader(Protocol): + """ + Interface for all DataLoader classes + """ + @staticmethod def load(filepath: str, index_col: str = None) -> pd.DataFrame: """ - Load different datasets + Loading data from the relevant source """ + +class LocalDataLoader: + """ + Implements the DataLoader Protocol for local files + """ + + @staticmethod + def load(filepath: str, index_col: str = None) -> pd.DataFrame: + if not os.path.exists(filepath): raise FileNotFoundError(f"File not found: {filepath}") @@ -23,42 +37,82 @@ class DataLoader: return df - @staticmethod - def s3_load(filepath: str, index_col: str = None) -> pd.DataFrame: - """ - Load different datasets from s3 - """ - STORAGE_OPTIONS = { +class S3MockDataLoader: + """ + Implements the DataLoader Protocol for s3 files, hosting locally in a mocked service + """ + + @staticmethod + def load(filepath: str, index_col: str = None) -> pd.DataFrame: + + # TODO: Ingest these as environment variables in the docker compose file + storage_options = { "key": os.environ.get("AWS_ACCESS_KEY_ID", "admin"), "secret": os.environ.get("AWS_SECRET_ACCESS_KEY", "password"), "client_kwargs": { - # "endpoint_url": os.environ.get("ENDPOINT_URL", "http://localhost:9000") - "endpoint_url": os.environ.get("ENDPOINT_URL") + "endpoint_url": os.environ.get("ENDPOINT_URL", "http://localhost:9000") }, } if filepath.endswith(".parquet"): - df = pd.read_parquet(filepath, storage_options=STORAGE_OPTIONS) + df = pd.read_parquet(filepath, storage_options=storage_options) if index_col is not None: df = df.set_index(index_col) elif filepath.endswith(".csv"): df = pd.read_csv( - filepath, index_col=index_col, storage_options=STORAGE_OPTIONS + filepath, index_col=index_col, storage_options=storage_options ) else: raise ValueError(f"File format not supported for file: {filepath}") return df - def process(self, filepath: str, index_col: str = None) -> pd.DataFrame: - """ - Based off the filepath, we choose a loader style - """ - if filepath.startswith("s3://"): - df = self.s3_load(filepath=filepath, index_col=index_col) +class S3DataLoader: + """ + Implements the DataLoader Protocol for s3 files + """ + + @staticmethod + def load(filepath: str, index_col: str = None) -> pd.DataFrame: + + storage_options = { + "key": os.environ.get("AWS_ACCESS_KEY_ID"), + "secret": os.environ.get("AWS_SECRET_ACCESS_KEY"), + } + + if filepath.endswith(".parquet"): + df = pd.read_parquet(filepath, storage_options=storage_options) + if index_col is not None: + df = df.set_index(index_col) + elif filepath.endswith(".csv"): + df = pd.read_csv( + filepath, index_col=index_col, storage_options=storage_options + ) else: - df = self.load(filepath=filepath, index_col=index_col) + raise ValueError(f"File format not supported for file: {filepath}") return df + + +def dataloader_factory(runtime_environment: str = None) -> DataLoader: + """ + Use factory pattern to determine which loading method we use + """ + + if runtime_environment is None: + runtime_environment = "local" + + dataloader_types = { + "local": LocalDataLoader(), + "local-mock": S3MockDataLoader(), + "dev": S3DataLoader(), + "staging": S3DataLoader(), + "prod": S3DataLoader(), + } + + if runtime_environment not in dataloader_types: + raise ValueError("Incorrect runtime environment specified") + + return dataloader_types[runtime_environment] diff --git a/model_data/simulation_system/core/Settings.py b/model_data/simulation_system/core/Settings.py index 5c1c37a2..85d9f210 100644 --- a/model_data/simulation_system/core/Settings.py +++ b/model_data/simulation_system/core/Settings.py @@ -30,7 +30,7 @@ MODEL_HYPERPARAMETERS = { } } -TIMESTAMP_FORMAT = "%Y-%m-%d_%H-%M-%S" +TIMESTAMP_FORMAT = "%Y_%m_%d_%H_%M_%S" RANDOM_SEED = 0 SUBSAMPLE_FACTOR = 200 diff --git a/model_data/simulation_system/regenerate_metrics.py b/model_data/simulation_system/regenerate_metrics.py index 1386f7d9..5d2a8c9f 100644 --- a/model_data/simulation_system/regenerate_metrics.py +++ b/model_data/simulation_system/regenerate_metrics.py @@ -19,7 +19,7 @@ from core.Settings import ( REGISTRY_FILE, BEST_MODEL_COLUMN_NAME, ) -from MLModel.Models import AutogluonModel, select_model +from MLModel.Models import AutogluonModel, model_factory def ingest_arguments() -> argparse.Namespace: @@ -76,7 +76,7 @@ def regenerate_metrics(test_filepath: str, target_column: str) -> None: logger.info(f"--- Loading Model ({row['model_name']}) ---") - model = select_model(model_type=row["model_type"]) + model = model_factory(model_type=row["model_type"])() model.load_model(filepath=row["model_location"]) diff --git a/model_data/simulation_system/training.py b/model_data/simulation_system/training.py index ef1e9c1d..7f6c8bb4 100644 --- a/model_data/simulation_system/training.py +++ b/model_data/simulation_system/training.py @@ -1,6 +1,7 @@ import argparse -# import os +import os + # from s3pathlib import S3Path # import boto3 @@ -9,9 +10,10 @@ from datetime import datetime import pandas as pd import seaborn as sns import matplotlib.pyplot as plt -from MLModel.Models import AutogluonModel +from MLModel.Models import AutogluonModel, model_factory from core.Logger import logger -from core.DataLoader import DataLoader +from core.Metrics import Metrics +from core.DataLoader import dataloader_factory from core.FeatureProcessor import FeatureProcessor from core.Settings import ( MODEL_DIRECTORY, @@ -38,6 +40,8 @@ from core.Settings import ( TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT) +RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local") + # STORAGE_OPTIONS = { # "key": os.environ.get("AWS_ACCESS_KEY_ID", 'admin'), # "secret": os.environ.get("AWS_SECRET_ACCESS_KEY", 'password'), @@ -123,14 +127,15 @@ def training( """ logger.info("--- Loading data ---") - dataloader = DataLoader() - train_df = dataloader.process(filepath=train_filepath) - test_df = dataloader.process(filepath=test_filepath) + dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT) + train_df = dataloader.load(filepath=train_filepath) + test_df = dataloader.load(filepath=test_filepath) logger.info("--- Feature processing ---") feature_processor = FeatureProcessor() + # This is for convenience for now subsample_amount = round(len(train_df) / SUBSAMPLE_FACTOR) train_df = feature_processor.process( @@ -147,13 +152,20 @@ def training( hyperparameters = MODEL_HYPERPARAMETERS[model_type] logger.info(f"Hyperparameters are: {hyperparameters}") - if model_type == "autogluon": - model_root = f"{target_column}-{hyperparameters['presets']}-{hyperparameters['time_limit']}-{TIMESTAMP}".lower() - output_base = Path(MODEL_DIRECTORY) / target_column / model_type / model_root + logger.info( + "--- Loading model configuration (Model type and Naming convention) ---" + ) + # We might want to have hyperparameters in the names to make models more recognisable + model_toolkit = model_factory( + model_type=model_type, hyperparameters=hyperparameters + ) - model = AutogluonModel(output_filepath=output_base / MODEL_FOLDER) - else: - raise ValueError("No alternative model implemented yet") + model_root = ( + f"{target_column}-{model_toolkit['naming_attributes']}-{TIMESTAMP}".lower() + ) + output_base = Path(MODEL_DIRECTORY) / target_column / model_type / model_root + + model = model_toolkit["model"](output_filepath=output_base / MODEL_FOLDER) model.train_model( data=train_df, target_column=target_column, hyperparameters=hyperparameters @@ -163,6 +175,13 @@ def training( model.save_model(output_filepath=model.output_filepath) logger.info("--- Generate evaluation metrics ---") + # TODO: replace this with metrics class + # metrics_df = model.model_evaluation( + # validation_data=test_df, + # target_column=target_column, + # metrics_location=output_base / METRICS_FOLDER, + # metrics = Metrics + # ) metrics_df = model.model_evaluation( validation_data=test_df, target_column=target_column,