From bf06636a58e9927437efc1b34fdf5cfd075f899b Mon Sep 17 00:00:00 2001 From: Michael Duong Date: Fri, 1 Sep 2023 11:27:20 +0100 Subject: [PATCH] added s3fs client to handler the connection to s3 --- .../simulation_system/MLModel/BaseMLModel.py | 5 +- .../simulation_system/MLModel/Models.py | 30 ++++-- .../simulation_system/core/CloudClient.py | 65 +++++++++++++ .../simulation_system/core/DataLoader.py | 5 +- model_data/simulation_system/core/Metrics.py | 14 +++ .../simulation_system/core/RegistryHandler.py | 83 ++++++++++++++++ model_data/simulation_system/core/Settings.py | 2 +- .../simulation_system/docker-compose.yml | 26 ++--- model_data/simulation_system/training.py | 95 ++++++++----------- 9 files changed, 246 insertions(+), 79 deletions(-) create mode 100644 model_data/simulation_system/core/CloudClient.py create mode 100644 model_data/simulation_system/core/RegistryHandler.py diff --git a/model_data/simulation_system/MLModel/BaseMLModel.py b/model_data/simulation_system/MLModel/BaseMLModel.py index e631880d..aa10e26f 100644 --- a/model_data/simulation_system/MLModel/BaseMLModel.py +++ b/model_data/simulation_system/MLModel/BaseMLModel.py @@ -13,6 +13,7 @@ from numpy import ndarray from pathlib import Path from typing import Protocol, NamedTuple, Any import pandas as pd +from training import S3FSClient class MLModel(Protocol): @@ -25,7 +26,9 @@ class MLModel(Protocol): Providing a path, this function will load the model to be used. Will load to internal variable """ - def save_model(self, output_filepath: Path) -> None: + def save_model( + self, output_filepath: Path, s3_client: S3FSClient | None = None + ) -> None: """ Providing a path, this function will save the model to be used. """ diff --git a/model_data/simulation_system/MLModel/Models.py b/model_data/simulation_system/MLModel/Models.py index dcf6a387..8928919a 100644 --- a/model_data/simulation_system/MLModel/Models.py +++ b/model_data/simulation_system/MLModel/Models.py @@ -12,11 +12,10 @@ from typing import Any from pathlib import Path import pandas as pd from autogluon.tabular import TabularDataset, TabularPredictor -from sklearn.metrics import mean_absolute_percentage_error from core.Logger import logger from core.Metrics import Metrics from core.Settings import METRIC_FILENAME -from MLModel.BaseMLModel import MLModel +from core.CloudClient import S3FSClient AUTOGLUON_HYPERPARAMETERS = [ "problem_type", @@ -52,19 +51,34 @@ class AutogluonModel: self.output_filepath = output_filepath self.predictions = None - def load_model(self, filepath: str | Path) -> None: + def load_model( + self, filepath: str | Path, s3_client: S3FSClient | None = None + ) -> None: """ Providing a path, this function will load the model to be used. Will load to internal variable """ - filepath = str(filepath) + if s3_client is None: + logger.info("In local development mode - no need for s3 client") + filepath = str(filepath) + self.model = TabularPredictor.load(path=filepath) + else: + pass + # logger.info(f"Loading model from s3") + # s3_client.download_model(filepath=filepath, local_filepath=) + # self.model = - self.model = TabularPredictor.load(path=filepath) - - def save_model(self, output_filepath: Path | None = None) -> None: + def save_model(self, output_filepath: Path, s3fs_client: S3FSClient) -> None: """ Providing a path, this function will save the model to be used. """ - logger.info("Using AutoGluon Model - Model saving already occured") + if s3fs_client.client is None: + logger.info("In local development mode - no need for s3 client") + logger.info("Using AutoGluon Model - Model saving already occured") + else: + logger.info(f"Saving model into s3") + s3_location = s3fs_client.model_bucket + "/" + str(output_filepath) + s3fs_client.client.put(str(output_filepath), s3_location, recursive=True) + logger.info("Save complete") def train_model( self, data: pd.DataFrame, target_column: str, hyperparameters: dict diff --git a/model_data/simulation_system/core/CloudClient.py b/model_data/simulation_system/core/CloudClient.py new file mode 100644 index 00000000..eb4b2bc6 --- /dev/null +++ b/model_data/simulation_system/core/CloudClient.py @@ -0,0 +1,65 @@ +""" +Set up the client to be used for downloading and uploading model files +""" + +import os +import s3fs +from core.Logger import logger + + +class S3FSClient: + """ + Set up the correct client to upload files to s3 + """ + + def __init__(self, runtime_environment: str = "local") -> None: + self.client: s3fs.S3FileSystem | None = None + self.model_bucket: str + + self.client_factory(runtime_environment) + self.determine_model_bucket(runtime_environment) + + def client_factory(self, runtime_environment: str = "local"): + """ + Select the correct s3 client to use + """ + + if runtime_environment == "local": + logger.info("No S3 client setup required") + elif runtime_environment == "local-mock": + logger.info(f"S3 settings for {runtime_environment}") + self.client = s3fs.S3FileSystem( + key=os.environ.get("AWS_ACCESS_KEY_ID", "admin"), + secret=os.environ.get("AWS_SECRET_ACCESS_KEY", "password"), + client_kwargs={ + "endpoint_url": os.environ.get( + "ENDPOINT_URL", "http://localhost:9000" + ) + }, + ) + elif runtime_environment in ["dev", "staging", "prod"]: + logger.info(f"S3 settings for {runtime_environment}") + # Key/ token should be in session/lambda for this + self.client = s3fs.S3FileSystem() + else: + raise NotImplementedError("No correspnding runtime environment") + + def determine_model_bucket(self, runtime_environment: str) -> None: + """ + For the given environment, return the correct bucket for models + """ + if runtime_environment == "local": + logger.info("In local development - no need for s3") + elif runtime_environment in ["local-mock", "dev"]: + self.model_bucket = "retrofit-model-directory-dev" + elif runtime_environment in ["staging", "prod"]: + self.model_bucket = f"retrofit-model-directory-{runtime_environment}" + else: + raise NotImplementedError("No corresponding runtime environment") + + # def download_model(self, filepath: str, local_filepath: str = "."): + # """ + # For the file path, download the model locally so that we can load the model + # """ + # if local_filepath is None: + # self.local_filepath = filepath diff --git a/model_data/simulation_system/core/DataLoader.py b/model_data/simulation_system/core/DataLoader.py index 352815bf..9cf1c2f2 100644 --- a/model_data/simulation_system/core/DataLoader.py +++ b/model_data/simulation_system/core/DataLoader.py @@ -55,6 +55,9 @@ class S3MockDataLoader: }, } + if not filepath.startswith("s3://"): + filepath = "s3://" + filepath + if filepath.endswith(".parquet"): df = pd.read_parquet(filepath, storage_options=storage_options) if index_col is not None: @@ -107,7 +110,7 @@ def dataloader_factory(runtime_environment: str | None = None) -> DataLoader: dataloader_types = { "local": LocalDataLoader(), "local-mock": S3MockDataLoader(), - "dev": S3DataLoader(), + "dev": S3MockDataLoader(), "staging": S3DataLoader(), "prod": S3DataLoader(), } diff --git a/model_data/simulation_system/core/Metrics.py b/model_data/simulation_system/core/Metrics.py index 99edae92..d1382d94 100644 --- a/model_data/simulation_system/core/Metrics.py +++ b/model_data/simulation_system/core/Metrics.py @@ -9,6 +9,8 @@ import pandas as pd from pathlib import Path import seaborn as sns import matplotlib.pyplot as plt +from core.CloudClient import S3FSClient +from core.Logger import logger from core.Settings import ( RESIDUAL_TRUE_LABEL, RESIDUAL_PREDICTION_LABEL, @@ -62,6 +64,18 @@ class Metrics: All metric functions used to generate a dictionary of metrics """ + def upload_metrics(self, output_filepath: Path, s3fs_client: S3FSClient) -> None: + """ + Providing a path, this function will save the metrics folders/files. + """ + if s3fs_client.client is None: + logger.info("In local development mode - no need to upload") + else: + logger.info(f"Saving metrics into s3") + s3_location = s3fs_client.model_bucket + "/" + str(output_filepath) + s3fs_client.client.put(str(output_filepath), s3_location, recursive=True) + logger.info("Save complete") + @staticmethod def list_metric_functions() -> list: """ diff --git a/model_data/simulation_system/core/RegistryHandler.py b/model_data/simulation_system/core/RegistryHandler.py new file mode 100644 index 00000000..e492b2d2 --- /dev/null +++ b/model_data/simulation_system/core/RegistryHandler.py @@ -0,0 +1,83 @@ +""" + +""" + +import pandas as pd +from pathlib import Path +from core.Logger import logger +from core.CloudClient import S3FSClient +from core.Metrics import Metrics +from core.Settings import BEST_MODEL_COLUMN_NAME + + +class RegistryHandler: + """ + Handles the loading of the registry depending on the environment + """ + + def load_registry( + self, registry_path: Path, s3fs_client: S3FSClient, metrics: Metrics + ): + """ + Depening on the environment, we will have to load from locally or s3 (mock/real) + """ + + if s3fs_client.client is None: + logger.info("Using local development - no need for s3 load") + return self.load_local_registry( + registry_path=registry_path, metrics=metrics + ) + + s3_location = "s3://" + s3fs_client.model_bucket + "/" + str(registry_path) + + logger.info(f"Check if registry exists") + if s3fs_client.client.exists(s3_location): + registry_df = pd.read_csv( + s3fs_client.client.open(s3_location), index_col=None + ) + else: + logger.info("No registry found - creating new one") + registry_df = self.create_new_registry(metrics=metrics) + + return registry_df + + def load_local_registry(self, registry_path: Path, metrics: Metrics): + """ + In local development mode, load the registry + """ + if registry_path.exists(): + logger.info("Registry file found - Loading into Dataframe") + registry_df = pd.read_csv(registry_path, index_col=None) + else: + logger.info("No registry found - creating new one") + registry_df = self.create_new_registry(metrics=metrics) + + return registry_df + + def create_new_registry(self, metrics: Metrics): + """ + If no registry is found, create a new one + """ + # TODO: Moved columns into settings: MODEL_DETAILS and Metrics class columns + columns = [ + BEST_MODEL_COLUMN_NAME, + "model_type", + "model_name", + "model_location", + ] + metrics.list_metric_functions() + + registry_df = pd.DataFrame(columns=columns) + + return registry_df + + def save_registry(self, output_filepath: Path, s3fs_client: S3FSClient) -> None: + """ + Providing a path, this function will save the model to be used. + """ + if s3fs_client.client is None: + logger.info("In local development mode - no need for s3 client") + else: + logger.info(f"Saving registry into s3") + s3_location = s3fs_client.model_bucket + "/" + str(output_filepath) + s3fs_client.client.put(str(output_filepath), s3_location, recursive=True) + logger.info("Save complete") diff --git a/model_data/simulation_system/core/Settings.py b/model_data/simulation_system/core/Settings.py index f595a3d2..7765f64a 100644 --- a/model_data/simulation_system/core/Settings.py +++ b/model_data/simulation_system/core/Settings.py @@ -26,7 +26,7 @@ MODEL_HYPERPARAMETERS = { "autogluon": { "problem_type": "regression", "eval_metric": "mean_absolute_error", - "time_limit": 30, + "time_limit": 45, "presets": "medium_quality", "excluded_model_types": None, } diff --git a/model_data/simulation_system/docker-compose.yml b/model_data/simulation_system/docker-compose.yml index c801473b..058dc062 100644 --- a/model_data/simulation_system/docker-compose.yml +++ b/model_data/simulation_system/docker-compose.yml @@ -18,19 +18,19 @@ services: timeout: 20s retries: 3 - simulation_system_training: - build: - context: ./ - dockerfile: ./Dockerfiles/Dockerfile.training - image: simulation_system_training - environment: - ENDPOINT_URL: http://minio:9000/ - AWS_ACCESS_KEY_ID: *MINIO_USER - AWS_SECRET_ACCESS_KEY: *MINIO_PASS - tty: true - depends_on: - minio: - condition: service_healthy + # simulation_system_training: + # build: + # context: ./ + # dockerfile: ./Dockerfiles/Dockerfile.training + # image: simulation_system_training + # environment: + # ENDPOINT_URL: http://minio:9000/ + # AWS_ACCESS_KEY_ID: *MINIO_USER + # AWS_SECRET_ACCESS_KEY: *MINIO_PASS + # tty: true + # depends_on: + # minio: + # condition: service_healthy # command: # ["bash"] diff --git a/model_data/simulation_system/training.py b/model_data/simulation_system/training.py index 47f70772..71979b8b 100644 --- a/model_data/simulation_system/training.py +++ b/model_data/simulation_system/training.py @@ -1,10 +1,7 @@ import argparse import os - -# from s3pathlib import S3Path - -# import boto3 +import shutil from pathlib import Path from datetime import datetime import pandas as pd @@ -13,9 +10,10 @@ from core.Logger import logger from core.Metrics import Metrics, sort_by_metric from core.DataLoader import dataloader_factory from core.FeatureProcessor import FeatureProcessor +from core.CloudClient import S3FSClient +from core.RegistryHandler import RegistryHandler from core.Settings import ( MODEL_DIRECTORY, - BASE_REGISTRY_PATH, REGISTRY_FILE, MODEL_FOLDER, METRICS_FOLDER, @@ -30,15 +28,10 @@ from core.Settings import ( TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT) -RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local") +RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "dev") + +CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT) -# STORAGE_OPTIONS = { -# "key": os.environ.get("AWS_ACCESS_KEY_ID", 'admin'), -# "secret": os.environ.get("AWS_SECRET_ACCESS_KEY", 'password'), -# "client_kwargs": { -# "endpoint_url": os.environ.get("ENDPOINT_URL", "http://localhost:9000") -# } -# } # FOR TESTING # train_filepath = "./model_build_data/change_data/rdsap_full/train_validation_data.parquet" @@ -48,22 +41,16 @@ RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local") # hyperparameter = HYPERPARAMETERS # SUBSAMPLE_FACTOR = 200 -# SESSION = boto3.Session() +# Mock location +# train_filepath = "s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/train_validation_data.parquet" +# test_filepath = "s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet" -# S3_CLIENT = SESSION.client( -# service_name="s3", -# aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID", 'admin'), -# aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY", 'password'), -# endpoint_url=os.environ.get("ENDPOINT_URL", "http://localhost:9000") -# ) -# S3_CLIENT.create_bucket -# S3_CLIENT.list_buckets() +# To run script in local mode: +# python3 training.py --train-filepath ./model_build_data/change_data/rdsap_full/train_validation_data.parquet --test-filepath ./model_build_data/change_data/rdsap_full/test_data.parquet -# df = pd.read_parquet( -# "s3://model_build_data/change_data/rdsap_full/train_validation_data.parquet", -# storage_options=STORAGE_OPTIONS -# ) +# To run script in local-mock mode: +# python3 training.py --train-filepath s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/train_validation_data.parquet --test-filepath s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet def ingest_arguments() -> argparse.Namespace: @@ -166,27 +153,34 @@ def training( ) logger.info("--- Save Model ---") - model.save_model(output_filepath=model.output_filepath) + model.save_model(output_filepath=model.output_filepath, s3fs_client=CLIENT) logger.info("--- Generate evaluation metrics ---") metrics = Metrics() + metric_output_path = output_base / METRICS_FOLDER metrics_df = model.model_evaluation( validation_data=test_df, target_column=target_column, - metrics_location=output_base / METRICS_FOLDER, + metrics_location=metric_output_path, metrics=metrics, ) + metrics.upload_metrics(output_filepath=metric_output_path, s3fs_client=CLIENT) + logger.info("--- Generate metric outputs using predictions ---") # metrics.generate_plot_suite() + + plot_output_path = output_base / METRICS_FOLDER / RESIDUAL_FILE metrics.generate_residual_plot( actuals=test_df[target_column], predictions=model.predictions, target_column=target_column, - output_filepath=output_base / METRICS_FOLDER / RESIDUAL_FILE, + output_filepath=plot_output_path, ) + metrics.upload_metrics(output_filepath=plot_output_path, s3fs_client=CLIENT) + # TODO: for cml, we might want to have class that outputs all data and plots to add to the report # If we want residual plot/ any plots, we will need to self host @@ -195,35 +189,27 @@ def training( logger.info("--- Optimising model for deployment ---") - deployment_model_path = model.optimise_model_for_deployment( - deployment_path=output_base / DEPLOYMENT_FOLDER - ) + deployment_model_path = output_base / DEPLOYMENT_FOLDER + + model.optimise_model_for_deployment(deployment_path=deployment_model_path) logger.info( f"Optimised version of best model can be found at: {deployment_model_path}" ) + model.save_model(output_filepath=deployment_model_path, s3fs_client=CLIENT) + # TODO: Need a model registry - for now have this as a CSV # Save this in the model directory # Loading registry from s3 logger.info("--- Append registry with new model ---") - registry_path = BASE_REGISTRY_PATH / target_column / REGISTRY_FILE - # registry_path = S3Path(MODEL_DIRECTORY, target_column, REGISTRY_FILE).uri - # registry = RegistryHandler(location=registry_path) + registry_handler = RegistryHandler() - if registry_path.exists(): - logger.info("Registry file found - Loading into Dataframe") - registry_df = pd.read_csv(registry_path, index_col=None) - else: - # TODO: Moved columns into settings: MODEL_DETAILS and Metrics class columns - columns = [ - BEST_MODEL_COLUMN_NAME, - "model_type", - "model_name", - "model_location", - ] + metrics.list_metric_functions() + registry_path = Path(MODEL_DIRECTORY) / target_column / REGISTRY_FILE - registry_df = pd.DataFrame(columns=columns) + registry_df = registry_handler.load_registry( + registry_path=registry_path, s3fs_client=CLIENT, metrics=metrics + ) model_details_df = pd.DataFrame( [ @@ -238,10 +224,6 @@ def training( registry_row = pd.concat([model_details_df, metrics_df], axis=1) registry_df = pd.concat([registry_df, registry_row], axis=0).reset_index(drop=True) - # TODO: will need a rebuild script metric script -i.e. if we add new metrics, we will want to load models and - # regenerate new metrics - # TODO: decide metric to optimise to - registry_df = sort_by_metric( registry_df, optimse_metric=OPTIMISE_METRIC, @@ -253,6 +235,13 @@ def training( registry_path.parent.mkdir(parents=True, exist_ok=True) registry_df.to_csv(registry_path, index=False) + registry_handler.save_registry(output_filepath=registry_path, s3fs_client=CLIENT) + + logger.info("--- Clean up ---") + if RUNTIME_ENVIRONMENT != "local" and Path(MODEL_DIRECTORY).exists(): + logger.info("Removing local development files not in s3") + shutil.rmtree(Path(MODEL_DIRECTORY)) + logger.info("--- Training Pipeline Complete --- ") @@ -263,10 +252,6 @@ if __name__ == "__main__": logger.info("---Ingest Arguments---") args = ingest_arguments() - # To run script: python3 training.py --train-filepath - # ./model_build_data/change_data/rdsap_full/train_validation_data.parquet --test-filepath - # ./model_build_data/change_data/rdsap_full/test_data.parquet - # TODO: Ingest hyper parameters from somewhere - currently change at the top of script training( train_filepath=args.train_filepath, test_filepath=args.test_filepath,