From bf06636a58e9927437efc1b34fdf5cfd075f899b Mon Sep 17 00:00:00 2001 From: Michael Duong Date: Fri, 1 Sep 2023 11:27:20 +0100 Subject: [PATCH 1/8] added s3fs client to handler the connection to s3 --- .../simulation_system/MLModel/BaseMLModel.py | 5 +- .../simulation_system/MLModel/Models.py | 30 ++++-- .../simulation_system/core/CloudClient.py | 65 +++++++++++++ .../simulation_system/core/DataLoader.py | 5 +- model_data/simulation_system/core/Metrics.py | 14 +++ .../simulation_system/core/RegistryHandler.py | 83 ++++++++++++++++ model_data/simulation_system/core/Settings.py | 2 +- .../simulation_system/docker-compose.yml | 26 ++--- model_data/simulation_system/training.py | 95 ++++++++----------- 9 files changed, 246 insertions(+), 79 deletions(-) create mode 100644 model_data/simulation_system/core/CloudClient.py create mode 100644 model_data/simulation_system/core/RegistryHandler.py diff --git a/model_data/simulation_system/MLModel/BaseMLModel.py b/model_data/simulation_system/MLModel/BaseMLModel.py index e631880d..aa10e26f 100644 --- a/model_data/simulation_system/MLModel/BaseMLModel.py +++ b/model_data/simulation_system/MLModel/BaseMLModel.py @@ -13,6 +13,7 @@ from numpy import ndarray from pathlib import Path from typing import Protocol, NamedTuple, Any import pandas as pd +from training import S3FSClient class MLModel(Protocol): @@ -25,7 +26,9 @@ class MLModel(Protocol): Providing a path, this function will load the model to be used. Will load to internal variable """ - def save_model(self, output_filepath: Path) -> None: + def save_model( + self, output_filepath: Path, s3_client: S3FSClient | None = None + ) -> None: """ Providing a path, this function will save the model to be used. """ diff --git a/model_data/simulation_system/MLModel/Models.py b/model_data/simulation_system/MLModel/Models.py index dcf6a387..8928919a 100644 --- a/model_data/simulation_system/MLModel/Models.py +++ b/model_data/simulation_system/MLModel/Models.py @@ -12,11 +12,10 @@ from typing import Any from pathlib import Path import pandas as pd from autogluon.tabular import TabularDataset, TabularPredictor -from sklearn.metrics import mean_absolute_percentage_error from core.Logger import logger from core.Metrics import Metrics from core.Settings import METRIC_FILENAME -from MLModel.BaseMLModel import MLModel +from core.CloudClient import S3FSClient AUTOGLUON_HYPERPARAMETERS = [ "problem_type", @@ -52,19 +51,34 @@ class AutogluonModel: self.output_filepath = output_filepath self.predictions = None - def load_model(self, filepath: str | Path) -> None: + def load_model( + self, filepath: str | Path, s3_client: S3FSClient | None = None + ) -> None: """ Providing a path, this function will load the model to be used. Will load to internal variable """ - filepath = str(filepath) + if s3_client is None: + logger.info("In local development mode - no need for s3 client") + filepath = str(filepath) + self.model = TabularPredictor.load(path=filepath) + else: + pass + # logger.info(f"Loading model from s3") + # s3_client.download_model(filepath=filepath, local_filepath=) + # self.model = - self.model = TabularPredictor.load(path=filepath) - - def save_model(self, output_filepath: Path | None = None) -> None: + def save_model(self, output_filepath: Path, s3fs_client: S3FSClient) -> None: """ Providing a path, this function will save the model to be used. """ - logger.info("Using AutoGluon Model - Model saving already occured") + if s3fs_client.client is None: + logger.info("In local development mode - no need for s3 client") + logger.info("Using AutoGluon Model - Model saving already occured") + else: + logger.info(f"Saving model into s3") + s3_location = s3fs_client.model_bucket + "/" + str(output_filepath) + s3fs_client.client.put(str(output_filepath), s3_location, recursive=True) + logger.info("Save complete") def train_model( self, data: pd.DataFrame, target_column: str, hyperparameters: dict diff --git a/model_data/simulation_system/core/CloudClient.py b/model_data/simulation_system/core/CloudClient.py new file mode 100644 index 00000000..eb4b2bc6 --- /dev/null +++ b/model_data/simulation_system/core/CloudClient.py @@ -0,0 +1,65 @@ +""" +Set up the client to be used for downloading and uploading model files +""" + +import os +import s3fs +from core.Logger import logger + + +class S3FSClient: + """ + Set up the correct client to upload files to s3 + """ + + def __init__(self, runtime_environment: str = "local") -> None: + self.client: s3fs.S3FileSystem | None = None + self.model_bucket: str + + self.client_factory(runtime_environment) + self.determine_model_bucket(runtime_environment) + + def client_factory(self, runtime_environment: str = "local"): + """ + Select the correct s3 client to use + """ + + if runtime_environment == "local": + logger.info("No S3 client setup required") + elif runtime_environment == "local-mock": + logger.info(f"S3 settings for {runtime_environment}") + self.client = s3fs.S3FileSystem( + key=os.environ.get("AWS_ACCESS_KEY_ID", "admin"), + secret=os.environ.get("AWS_SECRET_ACCESS_KEY", "password"), + client_kwargs={ + "endpoint_url": os.environ.get( + "ENDPOINT_URL", "http://localhost:9000" + ) + }, + ) + elif runtime_environment in ["dev", "staging", "prod"]: + logger.info(f"S3 settings for {runtime_environment}") + # Key/ token should be in session/lambda for this + self.client = s3fs.S3FileSystem() + else: + raise NotImplementedError("No correspnding runtime environment") + + def determine_model_bucket(self, runtime_environment: str) -> None: + """ + For the given environment, return the correct bucket for models + """ + if runtime_environment == "local": + logger.info("In local development - no need for s3") + elif runtime_environment in ["local-mock", "dev"]: + self.model_bucket = "retrofit-model-directory-dev" + elif runtime_environment in ["staging", "prod"]: + self.model_bucket = f"retrofit-model-directory-{runtime_environment}" + else: + raise NotImplementedError("No corresponding runtime environment") + + # def download_model(self, filepath: str, local_filepath: str = "."): + # """ + # For the file path, download the model locally so that we can load the model + # """ + # if local_filepath is None: + # self.local_filepath = filepath diff --git a/model_data/simulation_system/core/DataLoader.py b/model_data/simulation_system/core/DataLoader.py index 352815bf..9cf1c2f2 100644 --- a/model_data/simulation_system/core/DataLoader.py +++ b/model_data/simulation_system/core/DataLoader.py @@ -55,6 +55,9 @@ class S3MockDataLoader: }, } + if not filepath.startswith("s3://"): + filepath = "s3://" + filepath + if filepath.endswith(".parquet"): df = pd.read_parquet(filepath, storage_options=storage_options) if index_col is not None: @@ -107,7 +110,7 @@ def dataloader_factory(runtime_environment: str | None = None) -> DataLoader: dataloader_types = { "local": LocalDataLoader(), "local-mock": S3MockDataLoader(), - "dev": S3DataLoader(), + "dev": S3MockDataLoader(), "staging": S3DataLoader(), "prod": S3DataLoader(), } diff --git a/model_data/simulation_system/core/Metrics.py b/model_data/simulation_system/core/Metrics.py index 99edae92..d1382d94 100644 --- a/model_data/simulation_system/core/Metrics.py +++ b/model_data/simulation_system/core/Metrics.py @@ -9,6 +9,8 @@ import pandas as pd from pathlib import Path import seaborn as sns import matplotlib.pyplot as plt +from core.CloudClient import S3FSClient +from core.Logger import logger from core.Settings import ( RESIDUAL_TRUE_LABEL, RESIDUAL_PREDICTION_LABEL, @@ -62,6 +64,18 @@ class Metrics: All metric functions used to generate a dictionary of metrics """ + def upload_metrics(self, output_filepath: Path, s3fs_client: S3FSClient) -> None: + """ + Providing a path, this function will save the metrics folders/files. + """ + if s3fs_client.client is None: + logger.info("In local development mode - no need to upload") + else: + logger.info(f"Saving metrics into s3") + s3_location = s3fs_client.model_bucket + "/" + str(output_filepath) + s3fs_client.client.put(str(output_filepath), s3_location, recursive=True) + logger.info("Save complete") + @staticmethod def list_metric_functions() -> list: """ diff --git a/model_data/simulation_system/core/RegistryHandler.py b/model_data/simulation_system/core/RegistryHandler.py new file mode 100644 index 00000000..e492b2d2 --- /dev/null +++ b/model_data/simulation_system/core/RegistryHandler.py @@ -0,0 +1,83 @@ +""" + +""" + +import pandas as pd +from pathlib import Path +from core.Logger import logger +from core.CloudClient import S3FSClient +from core.Metrics import Metrics +from core.Settings import BEST_MODEL_COLUMN_NAME + + +class RegistryHandler: + """ + Handles the loading of the registry depending on the environment + """ + + def load_registry( + self, registry_path: Path, s3fs_client: S3FSClient, metrics: Metrics + ): + """ + Depening on the environment, we will have to load from locally or s3 (mock/real) + """ + + if s3fs_client.client is None: + logger.info("Using local development - no need for s3 load") + return self.load_local_registry( + registry_path=registry_path, metrics=metrics + ) + + s3_location = "s3://" + s3fs_client.model_bucket + "/" + str(registry_path) + + logger.info(f"Check if registry exists") + if s3fs_client.client.exists(s3_location): + registry_df = pd.read_csv( + s3fs_client.client.open(s3_location), index_col=None + ) + else: + logger.info("No registry found - creating new one") + registry_df = self.create_new_registry(metrics=metrics) + + return registry_df + + def load_local_registry(self, registry_path: Path, metrics: Metrics): + """ + In local development mode, load the registry + """ + if registry_path.exists(): + logger.info("Registry file found - Loading into Dataframe") + registry_df = pd.read_csv(registry_path, index_col=None) + else: + logger.info("No registry found - creating new one") + registry_df = self.create_new_registry(metrics=metrics) + + return registry_df + + def create_new_registry(self, metrics: Metrics): + """ + If no registry is found, create a new one + """ + # TODO: Moved columns into settings: MODEL_DETAILS and Metrics class columns + columns = [ + BEST_MODEL_COLUMN_NAME, + "model_type", + "model_name", + "model_location", + ] + metrics.list_metric_functions() + + registry_df = pd.DataFrame(columns=columns) + + return registry_df + + def save_registry(self, output_filepath: Path, s3fs_client: S3FSClient) -> None: + """ + Providing a path, this function will save the model to be used. + """ + if s3fs_client.client is None: + logger.info("In local development mode - no need for s3 client") + else: + logger.info(f"Saving registry into s3") + s3_location = s3fs_client.model_bucket + "/" + str(output_filepath) + s3fs_client.client.put(str(output_filepath), s3_location, recursive=True) + logger.info("Save complete") diff --git a/model_data/simulation_system/core/Settings.py b/model_data/simulation_system/core/Settings.py index f595a3d2..7765f64a 100644 --- a/model_data/simulation_system/core/Settings.py +++ b/model_data/simulation_system/core/Settings.py @@ -26,7 +26,7 @@ MODEL_HYPERPARAMETERS = { "autogluon": { "problem_type": "regression", "eval_metric": "mean_absolute_error", - "time_limit": 30, + "time_limit": 45, "presets": "medium_quality", "excluded_model_types": None, } diff --git a/model_data/simulation_system/docker-compose.yml b/model_data/simulation_system/docker-compose.yml index c801473b..058dc062 100644 --- a/model_data/simulation_system/docker-compose.yml +++ b/model_data/simulation_system/docker-compose.yml @@ -18,19 +18,19 @@ services: timeout: 20s retries: 3 - simulation_system_training: - build: - context: ./ - dockerfile: ./Dockerfiles/Dockerfile.training - image: simulation_system_training - environment: - ENDPOINT_URL: http://minio:9000/ - AWS_ACCESS_KEY_ID: *MINIO_USER - AWS_SECRET_ACCESS_KEY: *MINIO_PASS - tty: true - depends_on: - minio: - condition: service_healthy + # simulation_system_training: + # build: + # context: ./ + # dockerfile: ./Dockerfiles/Dockerfile.training + # image: simulation_system_training + # environment: + # ENDPOINT_URL: http://minio:9000/ + # AWS_ACCESS_KEY_ID: *MINIO_USER + # AWS_SECRET_ACCESS_KEY: *MINIO_PASS + # tty: true + # depends_on: + # minio: + # condition: service_healthy # command: # ["bash"] diff --git a/model_data/simulation_system/training.py b/model_data/simulation_system/training.py index 47f70772..71979b8b 100644 --- a/model_data/simulation_system/training.py +++ b/model_data/simulation_system/training.py @@ -1,10 +1,7 @@ import argparse import os - -# from s3pathlib import S3Path - -# import boto3 +import shutil from pathlib import Path from datetime import datetime import pandas as pd @@ -13,9 +10,10 @@ from core.Logger import logger from core.Metrics import Metrics, sort_by_metric from core.DataLoader import dataloader_factory from core.FeatureProcessor import FeatureProcessor +from core.CloudClient import S3FSClient +from core.RegistryHandler import RegistryHandler from core.Settings import ( MODEL_DIRECTORY, - BASE_REGISTRY_PATH, REGISTRY_FILE, MODEL_FOLDER, METRICS_FOLDER, @@ -30,15 +28,10 @@ from core.Settings import ( TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT) -RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local") +RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "dev") + +CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT) -# STORAGE_OPTIONS = { -# "key": os.environ.get("AWS_ACCESS_KEY_ID", 'admin'), -# "secret": os.environ.get("AWS_SECRET_ACCESS_KEY", 'password'), -# "client_kwargs": { -# "endpoint_url": os.environ.get("ENDPOINT_URL", "http://localhost:9000") -# } -# } # FOR TESTING # train_filepath = "./model_build_data/change_data/rdsap_full/train_validation_data.parquet" @@ -48,22 +41,16 @@ RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local") # hyperparameter = HYPERPARAMETERS # SUBSAMPLE_FACTOR = 200 -# SESSION = boto3.Session() +# Mock location +# train_filepath = "s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/train_validation_data.parquet" +# test_filepath = "s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet" -# S3_CLIENT = SESSION.client( -# service_name="s3", -# aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID", 'admin'), -# aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY", 'password'), -# endpoint_url=os.environ.get("ENDPOINT_URL", "http://localhost:9000") -# ) -# S3_CLIENT.create_bucket -# S3_CLIENT.list_buckets() +# To run script in local mode: +# python3 training.py --train-filepath ./model_build_data/change_data/rdsap_full/train_validation_data.parquet --test-filepath ./model_build_data/change_data/rdsap_full/test_data.parquet -# df = pd.read_parquet( -# "s3://model_build_data/change_data/rdsap_full/train_validation_data.parquet", -# storage_options=STORAGE_OPTIONS -# ) +# To run script in local-mock mode: +# python3 training.py --train-filepath s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/train_validation_data.parquet --test-filepath s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet def ingest_arguments() -> argparse.Namespace: @@ -166,27 +153,34 @@ def training( ) logger.info("--- Save Model ---") - model.save_model(output_filepath=model.output_filepath) + model.save_model(output_filepath=model.output_filepath, s3fs_client=CLIENT) logger.info("--- Generate evaluation metrics ---") metrics = Metrics() + metric_output_path = output_base / METRICS_FOLDER metrics_df = model.model_evaluation( validation_data=test_df, target_column=target_column, - metrics_location=output_base / METRICS_FOLDER, + metrics_location=metric_output_path, metrics=metrics, ) + metrics.upload_metrics(output_filepath=metric_output_path, s3fs_client=CLIENT) + logger.info("--- Generate metric outputs using predictions ---") # metrics.generate_plot_suite() + + plot_output_path = output_base / METRICS_FOLDER / RESIDUAL_FILE metrics.generate_residual_plot( actuals=test_df[target_column], predictions=model.predictions, target_column=target_column, - output_filepath=output_base / METRICS_FOLDER / RESIDUAL_FILE, + output_filepath=plot_output_path, ) + metrics.upload_metrics(output_filepath=plot_output_path, s3fs_client=CLIENT) + # TODO: for cml, we might want to have class that outputs all data and plots to add to the report # If we want residual plot/ any plots, we will need to self host @@ -195,35 +189,27 @@ def training( logger.info("--- Optimising model for deployment ---") - deployment_model_path = model.optimise_model_for_deployment( - deployment_path=output_base / DEPLOYMENT_FOLDER - ) + deployment_model_path = output_base / DEPLOYMENT_FOLDER + + model.optimise_model_for_deployment(deployment_path=deployment_model_path) logger.info( f"Optimised version of best model can be found at: {deployment_model_path}" ) + model.save_model(output_filepath=deployment_model_path, s3fs_client=CLIENT) + # TODO: Need a model registry - for now have this as a CSV # Save this in the model directory # Loading registry from s3 logger.info("--- Append registry with new model ---") - registry_path = BASE_REGISTRY_PATH / target_column / REGISTRY_FILE - # registry_path = S3Path(MODEL_DIRECTORY, target_column, REGISTRY_FILE).uri - # registry = RegistryHandler(location=registry_path) + registry_handler = RegistryHandler() - if registry_path.exists(): - logger.info("Registry file found - Loading into Dataframe") - registry_df = pd.read_csv(registry_path, index_col=None) - else: - # TODO: Moved columns into settings: MODEL_DETAILS and Metrics class columns - columns = [ - BEST_MODEL_COLUMN_NAME, - "model_type", - "model_name", - "model_location", - ] + metrics.list_metric_functions() + registry_path = Path(MODEL_DIRECTORY) / target_column / REGISTRY_FILE - registry_df = pd.DataFrame(columns=columns) + registry_df = registry_handler.load_registry( + registry_path=registry_path, s3fs_client=CLIENT, metrics=metrics + ) model_details_df = pd.DataFrame( [ @@ -238,10 +224,6 @@ def training( registry_row = pd.concat([model_details_df, metrics_df], axis=1) registry_df = pd.concat([registry_df, registry_row], axis=0).reset_index(drop=True) - # TODO: will need a rebuild script metric script -i.e. if we add new metrics, we will want to load models and - # regenerate new metrics - # TODO: decide metric to optimise to - registry_df = sort_by_metric( registry_df, optimse_metric=OPTIMISE_METRIC, @@ -253,6 +235,13 @@ def training( registry_path.parent.mkdir(parents=True, exist_ok=True) registry_df.to_csv(registry_path, index=False) + registry_handler.save_registry(output_filepath=registry_path, s3fs_client=CLIENT) + + logger.info("--- Clean up ---") + if RUNTIME_ENVIRONMENT != "local" and Path(MODEL_DIRECTORY).exists(): + logger.info("Removing local development files not in s3") + shutil.rmtree(Path(MODEL_DIRECTORY)) + logger.info("--- Training Pipeline Complete --- ") @@ -263,10 +252,6 @@ if __name__ == "__main__": logger.info("---Ingest Arguments---") args = ingest_arguments() - # To run script: python3 training.py --train-filepath - # ./model_build_data/change_data/rdsap_full/train_validation_data.parquet --test-filepath - # ./model_build_data/change_data/rdsap_full/test_data.parquet - # TODO: Ingest hyper parameters from somewhere - currently change at the top of script training( train_filepath=args.train_filepath, test_filepath=args.test_filepath, From f79e18a6a033118d579e2f919b4d094089b859ce Mon Sep 17 00:00:00 2001 From: Michael Duong Date: Fri, 1 Sep 2023 11:28:06 +0100 Subject: [PATCH 2/8] added s3fs client to handler the connection to s3 --- model_data/simulation_system/core/DataLoader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/model_data/simulation_system/core/DataLoader.py b/model_data/simulation_system/core/DataLoader.py index 9cf1c2f2..428e1d7f 100644 --- a/model_data/simulation_system/core/DataLoader.py +++ b/model_data/simulation_system/core/DataLoader.py @@ -110,7 +110,7 @@ def dataloader_factory(runtime_environment: str | None = None) -> DataLoader: dataloader_types = { "local": LocalDataLoader(), "local-mock": S3MockDataLoader(), - "dev": S3MockDataLoader(), + "dev": S3DataLoader(), "staging": S3DataLoader(), "prod": S3DataLoader(), } From 9c50c9dcfcb3635cb23dc945f09a83b107171bcb Mon Sep 17 00:00:00 2001 From: Michael Duong Date: Fri, 1 Sep 2023 11:35:00 +0100 Subject: [PATCH 3/8] added s3fs client to handler the connection to s3 --- model_data/simulation_system/requirements/training/training.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/model_data/simulation_system/requirements/training/training.txt b/model_data/simulation_system/requirements/training/training.txt index 17e4c8da..cb2e1f87 100644 --- a/model_data/simulation_system/requirements/training/training.txt +++ b/model_data/simulation_system/requirements/training/training.txt @@ -1,3 +1,4 @@ autogluon==0.8.2 pandas==1.5.3 seaborn==0.12.2 +s3fs==2023.6.0 From 18a32471e75efca06c3046bd955f6d0c453efd52 Mon Sep 17 00:00:00 2001 From: Michael Duong Date: Fri, 1 Sep 2023 11:35:36 +0100 Subject: [PATCH 4/8] added s3fs client to handler the connection to s3 - add more requirements --- .../requirements/predictions/predictions-dev.txt | 1 + .../simulation_system/requirements/training/training-dev.txt | 1 + 2 files changed, 2 insertions(+) diff --git a/model_data/simulation_system/requirements/predictions/predictions-dev.txt b/model_data/simulation_system/requirements/predictions/predictions-dev.txt index 0574e0f5..48bbb3ca 100644 --- a/model_data/simulation_system/requirements/predictions/predictions-dev.txt +++ b/model_data/simulation_system/requirements/predictions/predictions-dev.txt @@ -1,3 +1,4 @@ autogluon==0.8.2 pandas==1.5.3 +s3fs==2023.6.0 pre-commit==3.3.3 diff --git a/model_data/simulation_system/requirements/training/training-dev.txt b/model_data/simulation_system/requirements/training/training-dev.txt index ea205270..bcba4f18 100644 --- a/model_data/simulation_system/requirements/training/training-dev.txt +++ b/model_data/simulation_system/requirements/training/training-dev.txt @@ -1,4 +1,5 @@ autogluon==0.8.2 pandas==1.5.3 seaborn==0.12.2 +s3fs==2023.6.0 pre-commit==3.3.3 From 8f94628031d30dadd8408819e112ab7bf325c75f Mon Sep 17 00:00:00 2001 From: quandanrepo <45804868+quandanrepo@users.noreply.github.com> Date: Fri, 1 Sep 2023 11:36:32 +0100 Subject: [PATCH 5/8] Update cml.yml --- .github/workflows/cml.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/cml.yml b/.github/workflows/cml.yml index bf361dd3..be06486b 100644 --- a/.github/workflows/cml.yml +++ b/.github/workflows/cml.yml @@ -17,7 +17,7 @@ jobs: run: | ls cd model_data/simulation_system - pip install -r requirements.txt + pip install -r requirements/training/training.txt python3 training.py --train-filepath ./model_build_data/change_data/rdsap_full/train_validation_data.parquet --test-filepath ./model_build_data/change_data/rdsap_full/test_data.parquet cd model_directory/RDSAP_CHANGE From 31ce4a670e5ccd25607b1326341b1e04fd309b70 Mon Sep 17 00:00:00 2001 From: Michael Duong Date: Fri, 1 Sep 2023 11:42:38 +0100 Subject: [PATCH 6/8] added s3fs client to handler the connection to s3 - cahnge to local --- model_data/simulation_system/training.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/model_data/simulation_system/training.py b/model_data/simulation_system/training.py index 71979b8b..4cc19ed3 100644 --- a/model_data/simulation_system/training.py +++ b/model_data/simulation_system/training.py @@ -28,7 +28,7 @@ from core.Settings import ( TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT) -RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "dev") +RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local") CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT) From 7037625e7f4dee068834d22e8077845b8fc79340 Mon Sep 17 00:00:00 2001 From: Michael Duong Date: Fri, 1 Sep 2023 15:42:04 +0100 Subject: [PATCH 7/8] added download and load for predcitions --- .../simulation_system/MLModel/Models.py | 16 ++--- .../simulation_system/core/CloudClient.py | 42 +++++++++++-- .../simulation_system/core/DataLoader.py | 62 ++++++++++++++++--- .../handlers/predictions_app.py | 2 +- model_data/simulation_system/predictions.py | 16 +++-- model_data/simulation_system/training.py | 2 +- 6 files changed, 112 insertions(+), 28 deletions(-) diff --git a/model_data/simulation_system/MLModel/Models.py b/model_data/simulation_system/MLModel/Models.py index 8928919a..4a520e4b 100644 --- a/model_data/simulation_system/MLModel/Models.py +++ b/model_data/simulation_system/MLModel/Models.py @@ -52,20 +52,22 @@ class AutogluonModel: self.predictions = None def load_model( - self, filepath: str | Path, s3_client: S3FSClient | None = None + self, + filepath: str | Path, + s3_client: S3FSClient, + model_folder: str = "local_model", ) -> None: """ Providing a path, this function will load the model to be used. Will load to internal variable """ - if s3_client is None: + filepath = str(filepath) + if s3_client.client is None: logger.info("In local development mode - no need for s3 client") - filepath = str(filepath) self.model = TabularPredictor.load(path=filepath) else: - pass - # logger.info(f"Loading model from s3") - # s3_client.download_model(filepath=filepath, local_filepath=) - # self.model = + logger.info(f"Loading model from s3") + s3_client.download_model(filepath=filepath, model_folder=model_folder) + self.model = TabularPredictor.load(path=model_folder) def save_model(self, output_filepath: Path, s3fs_client: S3FSClient) -> None: """ diff --git a/model_data/simulation_system/core/CloudClient.py b/model_data/simulation_system/core/CloudClient.py index eb4b2bc6..5af65da5 100644 --- a/model_data/simulation_system/core/CloudClient.py +++ b/model_data/simulation_system/core/CloudClient.py @@ -51,15 +51,45 @@ class S3FSClient: if runtime_environment == "local": logger.info("In local development - no need for s3") elif runtime_environment in ["local-mock", "dev"]: + # TODO: get from enironment self.model_bucket = "retrofit-model-directory-dev" elif runtime_environment in ["staging", "prod"]: self.model_bucket = f"retrofit-model-directory-{runtime_environment}" else: raise NotImplementedError("No corresponding runtime environment") - # def download_model(self, filepath: str, local_filepath: str = "."): - # """ - # For the file path, download the model locally so that we can load the model - # """ - # if local_filepath is None: - # self.local_filepath = filepath + def download_model(self, filepath: str, model_folder: str): + """ + For the file path, download the model locally so that we can load the model + """ + + if self.client is None: + logger.info("No need to download model as local development") + else: + + def list_files_recursively(folder_path, client): + all_files = [] + for root, dirs, files in client.walk(folder_path): + for file in files: + s3_path = os.path.join(root, file) + all_files.append(s3_path) + return all_files + + # List all files in the specified S3 folder and its subfolders + files = list_files_recursively( + f"{self.model_bucket}/{filepath}", client=self.client + ) + + # Download each file + for file in files: + # Extract the filename from the S3 path + filename = file.split(filepath)[-1] + + # Define the local path where you want to save the file + local_path = os.path.join("local_model", filename) + + # Download the file from S3 to the local directory + self.client.get(file, local_path) + print(f"Downloaded {filename} to {local_path}") + + print("Download completed.") diff --git a/model_data/simulation_system/core/DataLoader.py b/model_data/simulation_system/core/DataLoader.py index 428e1d7f..a6c9cfc0 100644 --- a/model_data/simulation_system/core/DataLoader.py +++ b/model_data/simulation_system/core/DataLoader.py @@ -1,6 +1,54 @@ import pandas as pd import os from typing import Protocol +import boto3 +from io import BytesIO, StringIO + + +def read_parquet_from_s3(bucket_name, file_key): + """ + Read a CSV file from S3 using boto3 and pandas. + + :param bucket_name: Name of the S3 bucket. + :param file_key: Key of the file (including directory path within the bucket). + :param aws_access_key_id: AWS Access Key ID + :param aws_secret_access_key: AWS Secret Access Key + :return: DataFrame containing the CSV data. + """ + # Initialize the S3 client + s3_client = boto3.client("s3") + + # Get the object + s3_object = s3_client.get_object(Bucket=bucket_name, Key=file_key) + + # Read the CSV body into a DataFrame + csv_body = s3_object["Body"].read() + df = pd.read_parquet(BytesIO(csv_body)) + + return df + + +def read_csv_from_s3(bucket_name, file_key, index_col): + """ + Read a CSV file from S3 using boto3 and pandas. + + :param bucket_name: Name of the S3 bucket. + :param file_key: Key of the file (including directory path within the bucket). + :param aws_access_key_id: AWS Access Key ID + :param aws_secret_access_key: AWS Secret Access Key + :return: DataFrame containing the CSV data. + """ + # Initialize the S3 client + s3_client = boto3.client("s3") + + # Get the object + s3_object = s3_client.get_object(Bucket=bucket_name, Key=file_key) + + # Read the CSV body into a DataFrame + csv_body = s3_object["Body"].read().decode("utf-8") + df = pd.read_csv(StringIO(csv_body), index_col=index_col) + + return df class DataLoader(Protocol): @@ -80,19 +128,15 @@ class S3DataLoader: @staticmethod def load(filepath: str, index_col: str | None = None) -> pd.DataFrame: - storage_options = { - "key": os.environ.get("AWS_ACCESS_KEY_ID"), - "secret": os.environ.get("AWS_SECRET_ACCESS_KEY"), - } - + filepath_split = filepath.split("s3://")[-1].split("/", 1) + bucket = filepath_split[0] + key = filepath_split[1] if filepath.endswith(".parquet"): - df = pd.read_parquet(filepath, storage_options=storage_options) + df = read_parquet_from_s3(bucket, key) if index_col is not None: df = df.set_index(index_col) elif filepath.endswith(".csv"): - df = pd.read_csv( - filepath, index_col=index_col, storage_options=storage_options - ) + df = read_csv_from_s3(bucket, key, index_col) else: raise ValueError(f"File format not supported for file: {filepath}") diff --git a/model_data/simulation_system/handlers/predictions_app.py b/model_data/simulation_system/handlers/predictions_app.py index 02b1673f..2b47c0db 100644 --- a/model_data/simulation_system/handlers/predictions_app.py +++ b/model_data/simulation_system/handlers/predictions_app.py @@ -30,7 +30,7 @@ def handler(event, context): # model_path = os.environ.get("MODEL_PATH", "http://minio:9000/data/model_directory/") model_path = os.environ.get( "MODEL_PATH", - "s3://retrofit-model-directory-{RUNTIME_ENVIRONMENT}/RDSAP_CHANGE/autogluon/rdsap_change-medium_quality-30" + f"s3://retrofit-model-directory-{RUNTIME_ENVIRONMENT}/RDSAP_CHANGE/autogluon/rdsap_change-medium_quality-30" "-2023-08-30_11-43-41/deployment/", ) diff --git a/model_data/simulation_system/predictions.py b/model_data/simulation_system/predictions.py index 22104993..da987978 100644 --- a/model_data/simulation_system/predictions.py +++ b/model_data/simulation_system/predictions.py @@ -11,6 +11,7 @@ from datetime import datetime from MLModel.Models import AutogluonModel from core.Logger import logger from core.DataLoader import dataloader_factory +from core.CloudClient import S3FSClient from core.Settings import ( BASE_REGISTRY_PATH, REGISTRY_FILE, @@ -23,12 +24,19 @@ from core.Settings import ( TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT) RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "dev") +CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT) + # FOR TESTING # For now just loading data first and then passing into function (i.e. as if we receive json data and convert to # DataFrame) # TEST_DATA = DataLoader.load(filepath="../simulation_system/model_build_data/change_data/rdsap_full/test_data.parquet") # DATA = TEST_DATA.sample(1) +# For testing in dev s3 +# Data path can be passed as so: +# python3 predictions.py --data-path s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet +# data_path="s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet" + def ingest_arguments() -> argparse.Namespace: """ @@ -75,9 +83,7 @@ def prediction( logger.error("No registry path provided or registry doesn't exist") exit(1) elif RUNTIME_ENVIRONMENT == "dev": - registry_path = ( - "s3://retrofit-model-directory-dev/RDSAP_CHANGE/model_registry.csv" - ) + registry_path = "s3://retrofit-model-directory-dev/model_directory/RDSAP_CHANGE/model_registry.csv" else: raise NotImplemented("TO be implemented") @@ -130,7 +136,9 @@ def prediction( logger.error("No other model currently") exit(1) - model.load_model(filepath=model_location) + model.load_model( + filepath=model_location, s3_client=CLIENT, model_folder="local_model" + ) logger.info("--- Generating Predictions ---") prediction = model.generate_predictions(data=data) diff --git a/model_data/simulation_system/training.py b/model_data/simulation_system/training.py index 4cc19ed3..71979b8b 100644 --- a/model_data/simulation_system/training.py +++ b/model_data/simulation_system/training.py @@ -28,7 +28,7 @@ from core.Settings import ( TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT) -RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local") +RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "dev") CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT) From baec5e7cc0ffecc033cadf3ca2e1908e71f9fc47 Mon Sep 17 00:00:00 2001 From: Michael Duong Date: Fri, 1 Sep 2023 15:50:53 +0100 Subject: [PATCH 8/8] change defaultt for training - runtime --- model_data/simulation_system/training.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/model_data/simulation_system/training.py b/model_data/simulation_system/training.py index 71979b8b..4cc19ed3 100644 --- a/model_data/simulation_system/training.py +++ b/model_data/simulation_system/training.py @@ -28,7 +28,7 @@ from core.Settings import ( TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT) -RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "dev") +RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local") CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT)