Added s3fs client to handle the connection to S3

This commit is contained in:
Michael Duong 2023-09-01 11:27:20 +01:00
parent 03f4b87e24
commit bf06636a58
9 changed files with 246 additions and 79 deletions

View file

@ -13,6 +13,7 @@ from numpy import ndarray
from pathlib import Path
from typing import Protocol, NamedTuple, Any
import pandas as pd
from training import S3FSClient
class MLModel(Protocol):
@ -25,7 +26,9 @@ class MLModel(Protocol):
Providing a path, this function will load the model to be used. Will load to internal variable
"""
def save_model(self, output_filepath: Path) -> None:
def save_model(
self, output_filepath: Path, s3_client: S3FSClient | None = None
) -> None:
"""
Providing a path, this function will save the model to be used.
"""

View file

@ -12,11 +12,10 @@ from typing import Any
from pathlib import Path
import pandas as pd
from autogluon.tabular import TabularDataset, TabularPredictor
from sklearn.metrics import mean_absolute_percentage_error
from core.Logger import logger
from core.Metrics import Metrics
from core.Settings import METRIC_FILENAME
from MLModel.BaseMLModel import MLModel
from core.CloudClient import S3FSClient
AUTOGLUON_HYPERPARAMETERS = [
"problem_type",
@ -52,19 +51,34 @@ class AutogluonModel:
self.output_filepath = output_filepath
self.predictions = None
def load_model(self, filepath: str | Path) -> None:
def load_model(
self, filepath: str | Path, s3_client: S3FSClient | None = None
) -> None:
"""
Providing a path, this function will load the model to be used. Will load to internal variable
"""
filepath = str(filepath)
if s3_client is None:
logger.info("In local development mode - no need for s3 client")
filepath = str(filepath)
self.model = TabularPredictor.load(path=filepath)
else:
pass
# logger.info(f"Loading model from s3")
# s3_client.download_model(filepath=filepath, local_filepath=)
# self.model =
self.model = TabularPredictor.load(path=filepath)
def save_model(self, output_filepath: Path | None = None) -> None:
def save_model(self, output_filepath: Path, s3fs_client: S3FSClient) -> None:
"""
Providing a path, this function will save the model to be used.
"""
logger.info("Using AutoGluon Model - Model saving already occured")
if s3fs_client.client is None:
logger.info("In local development mode - no need for s3 client")
logger.info("Using AutoGluon Model - Model saving already occured")
else:
logger.info(f"Saving model into s3")
s3_location = s3fs_client.model_bucket + "/" + str(output_filepath)
s3fs_client.client.put(str(output_filepath), s3_location, recursive=True)
logger.info("Save complete")
def train_model(
self, data: pd.DataFrame, target_column: str, hyperparameters: dict

View file

@ -0,0 +1,65 @@
"""
Set up the client to be used for downloading and uploading model files
"""
import os
import s3fs
from core.Logger import logger
class S3FSClient:
    """
    Set up the correct client to upload files to s3

    Attributes:
        client: ``None`` when the runtime environment is "local", otherwise an
            ``s3fs.S3FileSystem`` configured for that environment.
        model_bucket: ``None`` when the runtime environment is "local",
            otherwise the name of the bucket that holds models.
    """

    def __init__(self, runtime_environment: str = "local") -> None:
        # Both attributes get a concrete default so that "local" mode leaves the
        # object fully initialised (a bare annotation would not create the
        # attribute and any later access would raise AttributeError).
        self.client: s3fs.S3FileSystem | None = None
        self.model_bucket: str | None = None

        self.client_factory(runtime_environment)
        self.determine_model_bucket(runtime_environment)

    def client_factory(self, runtime_environment: str = "local"):
        """
        Select the correct s3 client to use and store it on ``self.client``

        Raises:
            NotImplementedError: if ``runtime_environment`` is not one of
                "local", "local-mock", "dev", "staging" or "prod".
        """
        if runtime_environment == "local":
            logger.info("No S3 client setup required")
        elif runtime_environment == "local-mock":
            logger.info(f"S3 settings for {runtime_environment}")
            # Credentials and endpoint come from the environment, with fallbacks
            # pointing at a locally hosted endpoint.
            self.client = s3fs.S3FileSystem(
                key=os.environ.get("AWS_ACCESS_KEY_ID", "admin"),
                secret=os.environ.get("AWS_SECRET_ACCESS_KEY", "password"),
                client_kwargs={
                    "endpoint_url": os.environ.get(
                        "ENDPOINT_URL", "http://localhost:9000"
                    )
                },
            )
        elif runtime_environment in ["dev", "staging", "prod"]:
            logger.info(f"S3 settings for {runtime_environment}")
            # Key/ token should be in session/lambda for this
            self.client = s3fs.S3FileSystem()
        else:
            raise NotImplementedError("No corresponding runtime environment")

    def determine_model_bucket(self, runtime_environment: str) -> None:
        """
        For the given environment, store the correct bucket for models on
        ``self.model_bucket``

        Raises:
            NotImplementedError: if ``runtime_environment`` is not one of
                "local", "local-mock", "dev", "staging" or "prod".
        """
        if runtime_environment == "local":
            logger.info("In local development - no need for s3")
        elif runtime_environment in ["local-mock", "dev"]:
            # The mock environment shares the dev bucket name.
            self.model_bucket = "retrofit-model-directory-dev"
        elif runtime_environment in ["staging", "prod"]:
            self.model_bucket = f"retrofit-model-directory-{runtime_environment}"
        else:
            raise NotImplementedError("No corresponding runtime environment")
# def download_model(self, filepath: str, local_filepath: str = "."):
# """
# For the file path, download the model locally so that we can load the model
# """
# if local_filepath is None:
# self.local_filepath = filepath

View file

@ -55,6 +55,9 @@ class S3MockDataLoader:
},
}
if not filepath.startswith("s3://"):
filepath = "s3://" + filepath
if filepath.endswith(".parquet"):
df = pd.read_parquet(filepath, storage_options=storage_options)
if index_col is not None:
@ -107,7 +110,7 @@ def dataloader_factory(runtime_environment: str | None = None) -> DataLoader:
dataloader_types = {
"local": LocalDataLoader(),
"local-mock": S3MockDataLoader(),
"dev": S3DataLoader(),
"dev": S3MockDataLoader(),
"staging": S3DataLoader(),
"prod": S3DataLoader(),
}

View file

@ -9,6 +9,8 @@ import pandas as pd
from pathlib import Path
import seaborn as sns
import matplotlib.pyplot as plt
from core.CloudClient import S3FSClient
from core.Logger import logger
from core.Settings import (
RESIDUAL_TRUE_LABEL,
RESIDUAL_PREDICTION_LABEL,
@ -62,6 +64,18 @@ class Metrics:
All metric functions used to generate a dictionary of metrics
"""
def upload_metrics(self, output_filepath: Path, s3fs_client: S3FSClient) -> None:
    """
    Upload the metrics folder/file found at ``output_filepath`` to the model bucket.

    Nothing is uploaded when ``s3fs_client.client`` is ``None``.
    """
    filesystem = s3fs_client.client
    if filesystem is None:
        logger.info("In local development mode - no need to upload")
        return

    logger.info(f"Saving metrics into s3")
    local_path = str(output_filepath)
    # The remote key mirrors the local path underneath the model bucket.
    remote_path = s3fs_client.model_bucket + "/" + local_path
    filesystem.put(local_path, remote_path, recursive=True)
    logger.info("Save complete")
@staticmethod
def list_metric_functions() -> list:
"""

View file

@ -0,0 +1,83 @@
"""
Load, create and upload the model registry for the current runtime environment
"""
import pandas as pd
from pathlib import Path
from core.Logger import logger
from core.CloudClient import S3FSClient
from core.Metrics import Metrics
from core.Settings import BEST_MODEL_COLUMN_NAME
class RegistryHandler:
    """
    Handles the loading of the registry depending on the environment
    """

    def load_registry(
        self, registry_path: Path, s3fs_client: S3FSClient, metrics: Metrics
    ) -> pd.DataFrame:
        """
        Depending on the environment, load the registry from local disk or s3 (mock/real)

        When ``s3fs_client.client`` is ``None`` the registry is read from the local
        ``registry_path``; otherwise it is read from
        ``s3://<model_bucket>/<registry_path>``. In both cases an empty registry
        is created when none exists yet.
        """
        if s3fs_client.client is None:
            logger.info("Using local development - no need for s3 load")
            return self.load_local_registry(
                registry_path=registry_path, metrics=metrics
            )

        s3_location = "s3://" + s3fs_client.model_bucket + "/" + str(registry_path)

        logger.info("Check if registry exists")
        if s3fs_client.client.exists(s3_location):
            # Use a context manager so the remote file handle is always closed,
            # even if parsing the CSV raises.
            with s3fs_client.client.open(s3_location) as registry_file:
                registry_df = pd.read_csv(registry_file, index_col=None)
        else:
            logger.info("No registry found - creating new one")
            registry_df = self.create_new_registry(metrics=metrics)

        return registry_df

    def load_local_registry(
        self, registry_path: Path, metrics: Metrics
    ) -> pd.DataFrame:
        """
        In local development mode, load the registry from ``registry_path``,
        or create an empty one when the file does not exist
        """
        if registry_path.exists():
            logger.info("Registry file found - Loading into Dataframe")
            registry_df = pd.read_csv(registry_path, index_col=None)
        else:
            logger.info("No registry found - creating new one")
            registry_df = self.create_new_registry(metrics=metrics)

        return registry_df

    def create_new_registry(self, metrics: Metrics) -> pd.DataFrame:
        """
        If no registry is found, create a new empty one

        The columns are the model detail columns followed by whatever
        ``metrics.list_metric_functions()`` returns.
        """
        # TODO: Move columns into settings: MODEL_DETAILS and Metrics class columns
        columns = [
            BEST_MODEL_COLUMN_NAME,
            "model_type",
            "model_name",
            "model_location",
        ] + metrics.list_metric_functions()

        return pd.DataFrame(columns=columns)

    def save_registry(self, output_filepath: Path, s3fs_client: S3FSClient) -> None:
        """
        Upload the registry file at the local ``output_filepath`` to the model bucket.

        This method does not write the registry itself; it copies the existing
        local path to ``<model_bucket>/<output_filepath>``. Nothing is uploaded
        when ``s3fs_client.client`` is ``None``.
        """
        if s3fs_client.client is None:
            logger.info("In local development mode - no need for s3 client")
        else:
            logger.info("Saving registry into s3")
            s3_location = s3fs_client.model_bucket + "/" + str(output_filepath)
            s3fs_client.client.put(str(output_filepath), s3_location, recursive=True)
            logger.info("Save complete")

View file

@ -26,7 +26,7 @@ MODEL_HYPERPARAMETERS = {
"autogluon": {
"problem_type": "regression",
"eval_metric": "mean_absolute_error",
"time_limit": 30,
"time_limit": 45,
"presets": "medium_quality",
"excluded_model_types": None,
}

View file

@ -18,19 +18,19 @@ services:
timeout: 20s
retries: 3
simulation_system_training:
build:
context: ./
dockerfile: ./Dockerfiles/Dockerfile.training
image: simulation_system_training
environment:
ENDPOINT_URL: http://minio:9000/
AWS_ACCESS_KEY_ID: *MINIO_USER
AWS_SECRET_ACCESS_KEY: *MINIO_PASS
tty: true
depends_on:
minio:
condition: service_healthy
# simulation_system_training:
# build:
# context: ./
# dockerfile: ./Dockerfiles/Dockerfile.training
# image: simulation_system_training
# environment:
# ENDPOINT_URL: http://minio:9000/
# AWS_ACCESS_KEY_ID: *MINIO_USER
# AWS_SECRET_ACCESS_KEY: *MINIO_PASS
# tty: true
# depends_on:
# minio:
# condition: service_healthy
# command:
# ["bash"]

View file

@ -1,10 +1,7 @@
import argparse
import os
# from s3pathlib import S3Path
# import boto3
import shutil
from pathlib import Path
from datetime import datetime
import pandas as pd
@ -13,9 +10,10 @@ from core.Logger import logger
from core.Metrics import Metrics, sort_by_metric
from core.DataLoader import dataloader_factory
from core.FeatureProcessor import FeatureProcessor
from core.CloudClient import S3FSClient
from core.RegistryHandler import RegistryHandler
from core.Settings import (
MODEL_DIRECTORY,
BASE_REGISTRY_PATH,
REGISTRY_FILE,
MODEL_FOLDER,
METRICS_FOLDER,
@ -30,15 +28,10 @@ from core.Settings import (
TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT)
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local")
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "dev")
CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT)
# STORAGE_OPTIONS = {
# "key": os.environ.get("AWS_ACCESS_KEY_ID", 'admin'),
# "secret": os.environ.get("AWS_SECRET_ACCESS_KEY", 'password'),
# "client_kwargs": {
# "endpoint_url": os.environ.get("ENDPOINT_URL", "http://localhost:9000")
# }
# }
# FOR TESTING
# train_filepath = "./model_build_data/change_data/rdsap_full/train_validation_data.parquet"
@ -48,22 +41,16 @@ RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local")
# hyperparameter = HYPERPARAMETERS
# SUBSAMPLE_FACTOR = 200
# SESSION = boto3.Session()
# Mock location
# train_filepath = "s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/train_validation_data.parquet"
# test_filepath = "s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet"
# S3_CLIENT = SESSION.client(
# service_name="s3",
# aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID", 'admin'),
# aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY", 'password'),
# endpoint_url=os.environ.get("ENDPOINT_URL", "http://localhost:9000")
# )
# S3_CLIENT.create_bucket
# S3_CLIENT.list_buckets()
# To run script in local mode:
# python3 training.py --train-filepath ./model_build_data/change_data/rdsap_full/train_validation_data.parquet --test-filepath ./model_build_data/change_data/rdsap_full/test_data.parquet
# df = pd.read_parquet(
# "s3://model_build_data/change_data/rdsap_full/train_validation_data.parquet",
# storage_options=STORAGE_OPTIONS
# )
# To run script in local-mock mode:
# python3 training.py --train-filepath s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/train_validation_data.parquet --test-filepath s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet
def ingest_arguments() -> argparse.Namespace:
@ -166,27 +153,34 @@ def training(
)
logger.info("--- Save Model ---")
model.save_model(output_filepath=model.output_filepath)
model.save_model(output_filepath=model.output_filepath, s3fs_client=CLIENT)
logger.info("--- Generate evaluation metrics ---")
metrics = Metrics()
metric_output_path = output_base / METRICS_FOLDER
metrics_df = model.model_evaluation(
validation_data=test_df,
target_column=target_column,
metrics_location=output_base / METRICS_FOLDER,
metrics_location=metric_output_path,
metrics=metrics,
)
metrics.upload_metrics(output_filepath=metric_output_path, s3fs_client=CLIENT)
logger.info("--- Generate metric outputs using predictions ---")
# metrics.generate_plot_suite()
plot_output_path = output_base / METRICS_FOLDER / RESIDUAL_FILE
metrics.generate_residual_plot(
actuals=test_df[target_column],
predictions=model.predictions,
target_column=target_column,
output_filepath=output_base / METRICS_FOLDER / RESIDUAL_FILE,
output_filepath=plot_output_path,
)
metrics.upload_metrics(output_filepath=plot_output_path, s3fs_client=CLIENT)
# TODO: for cml, we might want to have class that outputs all data and plots to add to the report
# If we want residual plot/ any plots, we will need to self host
@ -195,35 +189,27 @@ def training(
logger.info("--- Optimising model for deployment ---")
deployment_model_path = model.optimise_model_for_deployment(
deployment_path=output_base / DEPLOYMENT_FOLDER
)
deployment_model_path = output_base / DEPLOYMENT_FOLDER
model.optimise_model_for_deployment(deployment_path=deployment_model_path)
logger.info(
f"Optimised version of best model can be found at: {deployment_model_path}"
)
model.save_model(output_filepath=deployment_model_path, s3fs_client=CLIENT)
# TODO: Need a model registry - for now have this as a CSV
# Save this in the model directory
# Loading registry from s3
logger.info("--- Append registry with new model ---")
registry_path = BASE_REGISTRY_PATH / target_column / REGISTRY_FILE
# registry_path = S3Path(MODEL_DIRECTORY, target_column, REGISTRY_FILE).uri
# registry = RegistryHandler(location=registry_path)
registry_handler = RegistryHandler()
if registry_path.exists():
logger.info("Registry file found - Loading into Dataframe")
registry_df = pd.read_csv(registry_path, index_col=None)
else:
# TODO: Moved columns into settings: MODEL_DETAILS and Metrics class columns
columns = [
BEST_MODEL_COLUMN_NAME,
"model_type",
"model_name",
"model_location",
] + metrics.list_metric_functions()
registry_path = Path(MODEL_DIRECTORY) / target_column / REGISTRY_FILE
registry_df = pd.DataFrame(columns=columns)
registry_df = registry_handler.load_registry(
registry_path=registry_path, s3fs_client=CLIENT, metrics=metrics
)
model_details_df = pd.DataFrame(
[
@ -238,10 +224,6 @@ def training(
registry_row = pd.concat([model_details_df, metrics_df], axis=1)
registry_df = pd.concat([registry_df, registry_row], axis=0).reset_index(drop=True)
# TODO: will need a rebuild script metric script -i.e. if we add new metrics, we will want to load models and
# regenerate new metrics
# TODO: decide metric to optimise to
registry_df = sort_by_metric(
registry_df,
optimse_metric=OPTIMISE_METRIC,
@ -253,6 +235,13 @@ def training(
registry_path.parent.mkdir(parents=True, exist_ok=True)
registry_df.to_csv(registry_path, index=False)
registry_handler.save_registry(output_filepath=registry_path, s3fs_client=CLIENT)
logger.info("--- Clean up ---")
if RUNTIME_ENVIRONMENT != "local" and Path(MODEL_DIRECTORY).exists():
logger.info("Removing local development files not in s3")
shutil.rmtree(Path(MODEL_DIRECTORY))
logger.info("--- Training Pipeline Complete --- ")
@ -263,10 +252,6 @@ if __name__ == "__main__":
logger.info("---Ingest Arguments---")
args = ingest_arguments()
# To run script: python3 training.py --train-filepath
# ./model_build_data/change_data/rdsap_full/train_validation_data.parquet --test-filepath
# ./model_build_data/change_data/rdsap_full/test_data.parquet
# TODO: Ingest hyper parameters from somewhere - currently change at the top of script
training(
train_filepath=args.train_filepath,
test_filepath=args.test_filepath,