diff --git a/model_data/simulation_system/MLModel/Models.py b/model_data/simulation_system/MLModel/Models.py index 4a520e4b..4a4f195e 100644 --- a/model_data/simulation_system/MLModel/Models.py +++ b/model_data/simulation_system/MLModel/Models.py @@ -8,6 +8,7 @@ Key tasks: - Generate Inference """ +import os from typing import Any from pathlib import Path import pandas as pd @@ -15,7 +16,7 @@ from autogluon.tabular import TabularDataset, TabularPredictor from core.Logger import logger from core.Metrics import Metrics from core.Settings import METRIC_FILENAME -from core.CloudClient import S3FSClient +from core.CloudClient import BotoClient AUTOGLUON_HYPERPARAMETERS = [ "problem_type", @@ -54,34 +55,55 @@ class AutogluonModel: def load_model( self, filepath: str | Path, - s3_client: S3FSClient, + client: BotoClient, model_folder: str = "local_model", ) -> None: """ Providing a path, this function will load the model to be used. Will load to internal variable """ filepath = str(filepath) - if s3_client.client is None: + if client.client is None: logger.info("In local development mode - no need for s3 client") self.model = TabularPredictor.load(path=filepath) else: logger.info(f"Loading model from s3") - s3_client.download_model(filepath=filepath, model_folder=model_folder) + client.download_model(filepath=filepath, model_folder=model_folder) self.model = TabularPredictor.load(path=model_folder) - def save_model(self, output_filepath: Path, s3fs_client: S3FSClient) -> None: + def save_model(self, output_filepath: Path, client: BotoClient) -> None: """ Providing a path, this function will save the model to be used. """ - if s3fs_client.client is None: + if client.client is None: logger.info("In local development mode - no need for s3 client") logger.info("Using AutoGluon Model - Model saving already occured") else: logger.info(f"Saving model into s3") - s3_location = s3fs_client.model_bucket + "/" + str(output_filepath) - s3fs_client.client.put(str(output_filepath), s3_location, recursive=True) + + self.directory_upload( + client=client, + local_directory=str(output_filepath), + bucket_name=client.model_bucket, + ) + logger.info("Save complete") + def directory_upload(self, client, local_directory, bucket_name): + + # Iterate through the local directory and upload each file + for root, dirs, files in os.walk(local_directory): + for file in files: + # Determine the local file path and S3 object key + local_file_path = os.path.join(root, file) + s3_object_key = os.path.relpath(local_file_path, local_directory) + + # Upload the file to S3 + client.client.upload_file(local_file_path, bucket_name, local_file_path) + + logger.info( + f"Uploaded {local_file_path} to {bucket_name}/{local_file_path}" + ) + def train_model( self, data: pd.DataFrame, target_column: str, hyperparameters: dict ) -> None: diff --git a/model_data/simulation_system/core/CloudClient.py b/model_data/simulation_system/core/CloudClient.py index b3e0cac1..b22d73bb 100644 --- a/model_data/simulation_system/core/CloudClient.py +++ b/model_data/simulation_system/core/CloudClient.py @@ -151,4 +151,34 @@ class BotoClient: """ For the file path, download the model locally so that we can load the model """ - pass + # List all objects with the specified prefix in the bucket + + if self.client is None: + raise ValueError("SHould not be in here!") + + objects = self.client.list_objects_v2(Bucket=self.model_bucket, Prefix=filepath) + + # Ensure the local directory for downloads exists + if not os.path.exists(model_folder): + os.makedirs(model_folder) + + # Download each object with the specified prefix + for obj in objects.get("Contents", []): + # Get the object key (file path) + object_key = obj["Key"] + + # Determine the local file path to save the object + local_file_path = os.path.join( + model_folder, object_key.split(f"{filepath}/")[-1] + ) + + # Create the local directory if it doesn't exist + local_directory = os.path.dirname(local_file_path) + if not os.path.exists(local_directory): + os.makedirs(local_directory) + + # Download the object from S3 to the local directory + self.client.download_file(self.model_bucket, object_key, local_file_path) + print(f"Downloaded {object_key} to {local_file_path}") + + print("Download completed.") diff --git a/model_data/simulation_system/core/DataLoader.py b/model_data/simulation_system/core/DataLoader.py index 6bc13c2f..c5c10b0a 100644 --- a/model_data/simulation_system/core/DataLoader.py +++ b/model_data/simulation_system/core/DataLoader.py @@ -6,7 +6,7 @@ from io import BytesIO, StringIO from core.CloudClient import BotoClient -def read_parquet_from_s3(bucket_name, file_key): +def read_parquet_from_s3(client, bucket_name, file_key): """ Read a CSV file from S3 using boto3 and pandas. @@ -16,11 +16,9 @@ def read_parquet_from_s3(bucket_name, file_key): :param aws_secret_access_key: AWS Secret Access Key :return: DataFrame containing the CSV data. """ - # Initialize the S3 client - s3_client = boto3.client("s3") # Get the object - s3_object = s3_client.get_object(Bucket=bucket_name, Key=file_key) + s3_object = client.get_object(Bucket=bucket_name, Key=file_key) # Read the CSV body into a DataFrame csv_body = s3_object["Body"].read() @@ -29,7 +27,7 @@ def read_parquet_from_s3(bucket_name, file_key): return df -def read_csv_from_s3(bucket_name, file_key, index_col): +def read_csv_from_s3(client, bucket_name, file_key, index_col): """ Read a CSV file from S3 using boto3 and pandas. @@ -39,11 +37,9 @@ def read_csv_from_s3(bucket_name, file_key, index_col): :param aws_secret_access_key: AWS Secret Access Key :return: DataFrame containing the CSV data. """ - # Initialize the S3 client - s3_client = boto3.client("s3") # Get the object - s3_object = s3_client.get_object(Bucket=bucket_name, Key=file_key) + s3_object = client.get_object(Bucket=bucket_name, Key=file_key) # Read the CSV body into a DataFrame csv_body = s3_object["Body"].read().decode("utf-8") @@ -91,7 +87,7 @@ class LocalDataLoader: return df -class S3MockDataLoader: +class S3DataLoader: """ Implements the DataLoader Protocol for s3 files, hosting locally in a mocked service """ @@ -101,51 +97,26 @@ class S3MockDataLoader: client: BotoClient, filepath: str, index_col: str | None = None ) -> pd.DataFrame: - # TODO: Ingest these as environment variables in the docker compose file - storage_options = { - "key": os.environ.get("AWS_ACCESS_KEY_ID", "admin"), - "secret": os.environ.get("AWS_SECRET_ACCESS_KEY", "password"), - "client_kwargs": { - "endpoint_url": os.environ.get("ENDPOINT_URL", "http://localhost:9000") - }, - } - if not filepath.startswith("s3://"): filepath = "s3://" + filepath - if filepath.endswith(".parquet"): - df = pd.read_parquet(filepath, storage_options=storage_options) - if index_col is not None: - df = df.set_index(index_col) - elif filepath.endswith(".csv"): - df = pd.read_csv( - filepath, index_col=index_col, storage_options=storage_options - ) - else: - raise ValueError(f"File format not supported for file: {filepath}") - - return df - - -class S3DataLoader: - """ - Implements the DataLoader Protocol for s3 files - """ - - @staticmethod - def load( - client: BotoClient, filepath: str, index_col: str | None = None - ) -> pd.DataFrame: - filepath_split = filepath.split("s3://")[-1].split("/", 1) bucket = filepath_split[0] key = filepath_split[1] + if filepath.endswith(".parquet"): - df = read_parquet_from_s3(bucket, key) + df = read_parquet_from_s3( + client=client.client, bucket_name=bucket, file_key=key + ) if index_col is not None: df = df.set_index(index_col) elif filepath.endswith(".csv"): - df = read_csv_from_s3(bucket, key, index_col) + df = read_csv_from_s3( + client=client.client, + bucket_name=bucket, + file_key=key, + index_col=index_col, + ) else: raise ValueError(f"File format not supported for file: {filepath}") @@ -162,7 +133,7 @@ def dataloader_factory(runtime_environment: str | None = None) -> DataLoader: dataloader_types = { "local": LocalDataLoader(), - "local-mock": S3MockDataLoader(), + "local-mock": S3DataLoader(), "dev": S3DataLoader(), "staging": S3DataLoader(), "prod": S3DataLoader(), diff --git a/model_data/simulation_system/core/Metrics.py b/model_data/simulation_system/core/Metrics.py index d1382d94..9369587d 100644 --- a/model_data/simulation_system/core/Metrics.py +++ b/model_data/simulation_system/core/Metrics.py @@ -5,11 +5,12 @@ Key tasks: - Given a model and test data, produce a suite of all metrics """ +import os import pandas as pd from pathlib import Path import seaborn as sns import matplotlib.pyplot as plt -from core.CloudClient import S3FSClient +from core.CloudClient import BotoClient from core.Logger import logger from core.Settings import ( RESIDUAL_TRUE_LABEL, @@ -64,18 +65,40 @@ class Metrics: All metric functions used to generate a dictionary of metrics """ - def upload_metrics(self, output_filepath: Path, s3fs_client: S3FSClient) -> None: + def upload_metrics(self, output_filepath: Path, client: BotoClient) -> None: """ Providing a path, this function will save the metrics folders/files. """ - if s3fs_client.client is None: + if client.client is None: logger.info("In local development mode - no need to upload") else: logger.info(f"Saving metrics into s3") - s3_location = s3fs_client.model_bucket + "/" + str(output_filepath) - s3fs_client.client.put(str(output_filepath), s3_location, recursive=True) + s3_location = client.model_bucket + "/" + str(output_filepath) + + self.directory_upload( + client=client, + local_directory=str(output_filepath), + bucket_name=client.model_bucket, + ) + logger.info("Save complete") + def directory_upload(self, client, local_directory, bucket_name): + + # Iterate through the local directory and upload each file + for root, dirs, files in os.walk(local_directory): + for file in files: + # Determine the local file path and S3 object key + local_file_path = os.path.join(root, file) + s3_object_key = os.path.relpath(local_file_path, local_directory) + + # Upload the file to S3 + client.client.upload_file(local_file_path, bucket_name, local_file_path) + + logger.info( + f"Uploaded {local_file_path} to {bucket_name}/{local_file_path}" + ) + @staticmethod def list_metric_functions() -> list: """ diff --git a/model_data/simulation_system/core/RegistryHandler.py b/model_data/simulation_system/core/RegistryHandler.py index e492b2d2..e7ec641c 100644 --- a/model_data/simulation_system/core/RegistryHandler.py +++ b/model_data/simulation_system/core/RegistryHandler.py @@ -2,38 +2,65 @@ """ +from io import StringIO import pandas as pd from pathlib import Path from core.Logger import logger -from core.CloudClient import S3FSClient +from core.CloudClient import BotoClient from core.Metrics import Metrics from core.Settings import BEST_MODEL_COLUMN_NAME +def read_csv_from_s3(client, bucket_name, file_key, index_col): + """ + Read a CSV file from S3 using boto3 and pandas. + + :param bucket_name: Name of the S3 bucket. + :param file_key: Key of the file (including directory path within the bucket). + :param aws_access_key_id: AWS Access Key ID + :param aws_secret_access_key: AWS Secret Access Key + :return: DataFrame containing the CSV data. + """ + + # Get the object + s3_object = client.get_object(Bucket=bucket_name, Key=file_key) + + # Read the CSV body into a DataFrame + csv_body = s3_object["Body"].read().decode("utf-8") + df = pd.read_csv(StringIO(csv_body), index_col=index_col) + + return df + + class RegistryHandler: """ Handles the loading of the registry depending on the environment """ - def load_registry( - self, registry_path: Path, s3fs_client: S3FSClient, metrics: Metrics - ): + def load_registry(self, registry_path: Path, client: BotoClient, metrics: Metrics): """ Depening on the environment, we will have to load from locally or s3 (mock/real) """ - if s3fs_client.client is None: + if client.client is None: logger.info("Using local development - no need for s3 load") return self.load_local_registry( registry_path=registry_path, metrics=metrics ) - s3_location = "s3://" + s3fs_client.model_bucket + "/" + str(registry_path) - logger.info(f"Check if registry exists") - if s3fs_client.client.exists(s3_location): - registry_df = pd.read_csv( - s3fs_client.client.open(s3_location), index_col=None + + check_exists = client.client.list_objects_v2( + Bucket=client.model_bucket, Prefix=str(registry_path) + ) + + if "Contents" in check_exists: + logger.info("Loading existing registry") + registry_df = read_csv_from_s3( + client=client.client, + bucket_name=client.model_bucket, + file_key=str(registry_path), + index_col=None, ) else: logger.info("No registry found - creating new one") @@ -70,14 +97,18 @@ class RegistryHandler: return registry_df - def save_registry(self, output_filepath: Path, s3fs_client: S3FSClient) -> None: + def save_registry(self, output_filepath: Path, client: BotoClient) -> None: """ Providing a path, this function will save the model to be used. """ - if s3fs_client.client is None: + if client.client is None: logger.info("In local development mode - no need for s3 client") else: logger.info(f"Saving registry into s3") - s3_location = s3fs_client.model_bucket + "/" + str(output_filepath) - s3fs_client.client.put(str(output_filepath), s3_location, recursive=True) + s3_location = client.model_bucket + "/" + str(output_filepath) + + client.client.upload_file( + str(output_filepath), client.model_bucket, str(output_filepath) + ) + logger.info("Save complete") diff --git a/model_data/simulation_system/predictions.py b/model_data/simulation_system/predictions.py index 0ca4f585..370e960a 100644 --- a/model_data/simulation_system/predictions.py +++ b/model_data/simulation_system/predictions.py @@ -12,7 +12,7 @@ from datetime import datetime from MLModel.Models import AutogluonModel from core.Logger import logger from core.DataLoader import dataloader_factory -from core.CloudClient import S3FSClient +from core.CloudClient import BotoClient from core.Metrics import Metrics from core.RegistryHandler import RegistryHandler from core.Settings import ( @@ -28,8 +28,7 @@ from core.Settings import ( TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT) RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local-mock") -CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT) - +CLIENT = BotoClient(runtime_environment=RUNTIME_ENVIRONMENT) # FOR TESTING # For now just loading data first and then passing into function (i.e. as if we receive json data and convert to @@ -40,7 +39,7 @@ CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT) # For testing in dev s3 # Data path can be passed as so: # python3 predictions.py --data-path s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet -# data_path="s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet" +# data_path="s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data_with_id.parquet" def ingest_arguments() -> argparse.Namespace: @@ -107,7 +106,7 @@ def prediction( registry_path = Path(MODEL_DIRECTORY) / target_column / REGISTRY_FILE registry_df = registry_handler.load_registry( - registry_path=registry_path, s3fs_client=CLIENT, metrics=metrics + registry_path=registry_path, client=CLIENT, metrics=metrics ) # registry_df = pd.read_csv(registry_path) @@ -129,13 +128,13 @@ def prediction( if data_path and data is None: logger.info("Loading data from provided path") dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT) - data = dataloader.load(filepath=data_path, index_col="id") + data = dataloader.load(client=CLIENT, filepath=data_path, index_col=None) if data is None: raise ValueError("No data loaded") # # TODO: DOWNSAMPLING DOWN TO JUST USE ONE FOR PREDICTION - data = data.sample(1) + # data = data.sample(1) else: logger.info("Using data provided") data = json.loads(str(data)) @@ -151,14 +150,12 @@ def prediction( logger.error("No other model currently") exit(1) - model.load_model( - filepath=model_location, s3_client=CLIENT, model_folder="local_model" - ) + model.load_model(filepath=model_location, client=CLIENT, model_folder="local_model") logger.info("--- Generating Predictions ---") prediction = model.generate_predictions(data=data) - return pd.concat([pd.Series(data.index, name="id"), prediction], axis=1) + return pd.concat([data["id"], prediction], axis=1) # Save prediction some where? # prediction.to_csv("s3?") diff --git a/model_data/simulation_system/training.py b/model_data/simulation_system/training.py index b277e441..d3212209 100644 --- a/model_data/simulation_system/training.py +++ b/model_data/simulation_system/training.py @@ -106,8 +106,8 @@ def training( logger.info("--- Loading data ---") dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT) - train_df = dataloader.load(filepath=train_filepath) - test_df = dataloader.load(filepath=test_filepath) + train_df = dataloader.load(client=CLIENT, filepath=train_filepath) + test_df = dataloader.load(client=CLIENT, filepath=test_filepath) if train_df is None or test_df is None: raise ValueError("No data Loaded - cancelling pipeline") @@ -154,7 +154,7 @@ def training( ) logger.info("--- Save Model ---") - model.save_model(output_filepath=model.output_filepath, s3fs_client=CLIENT) + model.save_model(output_filepath=model.output_filepath, client=CLIENT) logger.info("--- Generate evaluation metrics ---") metrics = Metrics() @@ -167,8 +167,6 @@ def training( metrics=metrics, ) - metrics.upload_metrics(output_filepath=metric_output_path, s3fs_client=CLIENT) - logger.info("--- Generate metric outputs using predictions ---") # metrics.generate_plot_suite() @@ -180,7 +178,7 @@ def training( output_filepath=plot_output_path, ) - metrics.upload_metrics(output_filepath=plot_output_path, s3fs_client=CLIENT) + metrics.upload_metrics(output_filepath=metric_output_path, client=CLIENT) # TODO: for cml, we might want to have class that outputs all data and plots to add to the report # If we want residual plot/ any plots, we will need to self host @@ -197,7 +195,7 @@ def training( f"Optimised version of best model can be found at: {deployment_model_path}" ) - model.save_model(output_filepath=deployment_model_path, s3fs_client=CLIENT) + model.save_model(output_filepath=deployment_model_path, client=CLIENT) # TODO: Need a model registry - for now have this as a CSV # Save this in the model directory @@ -209,7 +207,7 @@ def training( registry_path = Path(MODEL_DIRECTORY) / target_column / REGISTRY_FILE registry_df = registry_handler.load_registry( - registry_path=registry_path, s3fs_client=CLIENT, metrics=metrics + registry_path=registry_path, client=CLIENT, metrics=metrics ) model_details_df = pd.DataFrame( @@ -236,7 +234,7 @@ def training( registry_path.parent.mkdir(parents=True, exist_ok=True) registry_df.to_csv(registry_path, index=False) - registry_handler.save_registry(output_filepath=registry_path, s3fs_client=CLIENT) + registry_handler.save_registry(output_filepath=registry_path, client=CLIENT) logger.info("--- Clean up ---") if RUNTIME_ENVIRONMENT != "local" and Path(MODEL_DIRECTORY).exists():