diff --git a/.github/workflows/cml.yml b/.github/workflows/cml.yml index be06486b..3a0f4cae 100644 --- a/.github/workflows/cml.yml +++ b/.github/workflows/cml.yml @@ -9,6 +9,11 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 + # TODO: use dvc to pull data, mkdir s3-mock, load data, then use docker compose + # - name: Build docker compose stack + # run: | + # cd model_data/simulation_system + # docker-compose up -d - uses: actions/setup-python@v4 - uses: iterative/setup-cml@v1 - name: Train model diff --git a/model_data/simulation_system/Dockerfiles/Dockerfile.prediction b/model_data/simulation_system/Dockerfiles/Dockerfile.prediction index c0efdb5f..88cea6c4 100644 --- a/model_data/simulation_system/Dockerfiles/Dockerfile.prediction +++ b/model_data/simulation_system/Dockerfiles/Dockerfile.prediction @@ -6,6 +6,7 @@ ARG GID=100 # Install patches RUN apt-get update && apt-get upgrade -y \ + && apt-get install libgomp1 -y \ && apt-get clean \ && rm -rf /var/lib/apt/lists @@ -34,4 +35,4 @@ USER ${USER} WORKDIR /home/simulation_system # Run the python command -CMD ["python3", "predictions.py", "--data-path", "./model_build_data/change_data/rdsap_full/test_data.parquet"] +CMD ["python3", "predictions.py", "--data-path", "s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data_with_id.parquet"] diff --git a/model_data/simulation_system/Dockerfiles/Dockerfile.training b/model_data/simulation_system/Dockerfiles/Dockerfile.training index dcba0499..1eadee7b 100644 --- a/model_data/simulation_system/Dockerfiles/Dockerfile.training +++ b/model_data/simulation_system/Dockerfiles/Dockerfile.training @@ -6,6 +6,7 @@ ARG GID=100 # Install patches RUN apt-get update && apt-get upgrade -y \ + && apt-get install libgomp1 -y \ && apt-get clean \ && rm -rf /var/lib/apt/lists @@ -34,4 +35,4 @@ USER ${USER} WORKDIR /home/simulation_system # Run the python command -CMD ["python3", "training.py", "--train-filepath", "./model_build_data/change_data/rdsap_full/train_validation_data.parquet", "--test-filepath", "./model_build_data/change_data/rdsap_full/test_data.parquet"] +CMD ["python3", "training.py", "--train-filepath", "s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/train_validation_data.parquet", "--test-filepath", "s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet"] diff --git a/model_data/simulation_system/MLModel/Models.py b/model_data/simulation_system/MLModel/Models.py index 4a520e4b..4a4f195e 100644 --- a/model_data/simulation_system/MLModel/Models.py +++ b/model_data/simulation_system/MLModel/Models.py @@ -8,6 +8,7 @@ Key tasks: - Generate Inference """ +import os from typing import Any from pathlib import Path import pandas as pd @@ -15,7 +16,7 @@ from autogluon.tabular import TabularDataset, TabularPredictor from core.Logger import logger from core.Metrics import Metrics from core.Settings import METRIC_FILENAME -from core.CloudClient import S3FSClient +from core.CloudClient import BotoClient AUTOGLUON_HYPERPARAMETERS = [ "problem_type", @@ -54,34 +55,55 @@ class AutogluonModel: def load_model( self, filepath: str | Path, - s3_client: S3FSClient, + client: BotoClient, model_folder: str = "local_model", ) -> None: """ Providing a path, this function will load the model to be used. Will load to internal variable """ filepath = str(filepath) - if s3_client.client is None: + if client.client is None: logger.info("In local development mode - no need for s3 client") self.model = TabularPredictor.load(path=filepath) else: logger.info(f"Loading model from s3") - s3_client.download_model(filepath=filepath, model_folder=model_folder) + client.download_model(filepath=filepath, model_folder=model_folder) self.model = TabularPredictor.load(path=model_folder) - def save_model(self, output_filepath: Path, s3fs_client: S3FSClient) -> None: + def save_model(self, output_filepath: Path, client: BotoClient) -> None: """ Providing a path, this function will save the model to be used. """ - if s3fs_client.client is None: + if client.client is None: logger.info("In local development mode - no need for s3 client") logger.info("Using AutoGluon Model - Model saving already occured") else: logger.info(f"Saving model into s3") - s3_location = s3fs_client.model_bucket + "/" + str(output_filepath) - s3fs_client.client.put(str(output_filepath), s3_location, recursive=True) + + self.directory_upload( + client=client, + local_directory=str(output_filepath), + bucket_name=client.model_bucket, + ) + logger.info("Save complete") + def directory_upload(self, client, local_directory, bucket_name): + + # Iterate through the local directory and upload each file + for root, dirs, files in os.walk(local_directory): + for file in files: + # Determine the local file path and S3 object key + local_file_path = os.path.join(root, file) + s3_object_key = os.path.relpath(local_file_path, local_directory) + + # Upload the file to S3 + client.client.upload_file(local_file_path, bucket_name, local_file_path) + + logger.info( + f"Uploaded {local_file_path} to {bucket_name}/{local_file_path}" + ) + def train_model( self, data: pd.DataFrame, target_column: str, hyperparameters: dict ) -> None: diff --git a/model_data/simulation_system/core/CloudClient.py b/model_data/simulation_system/core/CloudClient.py index 5af65da5..8656a22e 100644 --- a/model_data/simulation_system/core/CloudClient.py +++ b/model_data/simulation_system/core/CloudClient.py @@ -3,17 +3,105 @@ Set up the client to be used for downloading and uploading model files """ import os -import s3fs +import boto3 from core.Logger import logger -class S3FSClient: +# class S3FSClient: +# """ +# Set up the correct client to upload files to s3 +# """ + +# def __init__(self, runtime_environment: str = "local") -> None: +# self.client: s3fs.S3FileSystem | None = None +# self.model_bucket: str + +# self.client_factory(runtime_environment) +# self.determine_model_bucket(runtime_environment) + +# def client_factory(self, runtime_environment: str = "local"): +# """ +# Select the correct s3 client to use +# """ + +# if runtime_environment == "local": +# logger.info("No S3 client setup required") +# elif runtime_environment == "local-mock": +# logger.info(f"S3 settings for {runtime_environment}") +# self.client = s3fs.S3FileSystem( +# key=os.environ.get("AWS_ACCESS_KEY_ID", "admin"), +# secret=os.environ.get("AWS_SECRET_ACCESS_KEY", "password"), +# client_kwargs={ +# "endpoint_url": os.environ.get( +# "ENDPOINT_URL", "http://localhost:9000" +# ) +# }, +# ) +# elif runtime_environment in ["dev", "staging", "prod"]: +# logger.info(f"S3 settings for {runtime_environment}") +# # Key/ token should be in session/lambda for this +# self.client = s3fs.S3FileSystem() +# else: +# raise NotImplementedError("No correspnding runtime environment") + +# def determine_model_bucket(self, runtime_environment: str) -> None: +# """ +# For the given environment, return the correct bucket for models +# """ +# if runtime_environment == "local": +# logger.info("In local development - no need for s3") +# elif runtime_environment in ["local-mock", "dev"]: +# # TODO: get from enironment +# self.model_bucket = "retrofit-model-directory-dev" +# elif runtime_environment in ["staging", "prod"]: +# self.model_bucket = f"retrofit-model-directory-{runtime_environment}" +# else: +# raise NotImplementedError("No corresponding runtime environment") + +# def download_model(self, filepath: str, model_folder: str): +# """ +# For the file path, download the model locally so that we can load the model +# """ + +# if self.client is None: +# logger.info("No need to download model as local development") +# else: + +# def list_files_recursively(folder_path, client): +# all_files = [] +# for root, dirs, files in client.walk(folder_path): +# for file in files: +# s3_path = os.path.join(root, file) +# all_files.append(s3_path) +# return all_files + +# # List all files in the specified S3 folder and its subfolders +# files = list_files_recursively( +# f"{self.model_bucket}/{filepath}", client=self.client +# ) + +# # Download each file +# for file in files: +# # Extract the filename from the S3 path +# filename = file.split(filepath)[-1] + +# # Define the local path where you want to save the file +# local_path = os.path.join(model_folder, filename) + +# # Download the file from S3 to the local directory +# self.client.get(file, local_path) +# print(f"Downloaded {filename} to {local_path}") + +# print("Download completed.") + + +class BotoClient: """ - Set up the correct client to upload files to s3 + Using boto3 to access the different aws storage configurations """ def __init__(self, runtime_environment: str = "local") -> None: - self.client: s3fs.S3FileSystem | None = None + self.client = None self.model_bucket: str self.client_factory(runtime_environment) @@ -28,19 +116,19 @@ class S3FSClient: logger.info("No S3 client setup required") elif runtime_environment == "local-mock": logger.info(f"S3 settings for {runtime_environment}") - self.client = s3fs.S3FileSystem( - key=os.environ.get("AWS_ACCESS_KEY_ID", "admin"), - secret=os.environ.get("AWS_SECRET_ACCESS_KEY", "password"), - client_kwargs={ - "endpoint_url": os.environ.get( - "ENDPOINT_URL", "http://localhost:9000" - ) - }, + session = boto3.Session() + self.client = session.client( + service_name="s3", + aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID", "admin"), + aws_secret_access_key=os.environ.get( + "AWS_SECRET_ACCESS_KEY", "password" + ), + endpoint_url=os.environ.get("ENDPOINT_URL", "http://localhost:9000"), ) elif runtime_environment in ["dev", "staging", "prod"]: logger.info(f"S3 settings for {runtime_environment}") # Key/ token should be in session/lambda for this - self.client = s3fs.S3FileSystem() + self.client = boto3.client("s3") else: raise NotImplementedError("No correspnding runtime environment") @@ -62,34 +150,34 @@ class S3FSClient: """ For the file path, download the model locally so that we can load the model """ + # List all objects with the specified prefix in the bucket if self.client is None: - logger.info("No need to download model as local development") - else: + raise ValueError("SHould not be in here!") - def list_files_recursively(folder_path, client): - all_files = [] - for root, dirs, files in client.walk(folder_path): - for file in files: - s3_path = os.path.join(root, file) - all_files.append(s3_path) - return all_files + objects = self.client.list_objects_v2(Bucket=self.model_bucket, Prefix=filepath) - # List all files in the specified S3 folder and its subfolders - files = list_files_recursively( - f"{self.model_bucket}/{filepath}", client=self.client + # Ensure the local directory for downloads exists + if not os.path.exists(model_folder): + os.makedirs(model_folder) + + # Download each object with the specified prefix + for obj in objects.get("Contents", []): + # Get the object key (file path) + object_key = obj["Key"] + + # Determine the local file path to save the object + local_file_path = os.path.join( + model_folder, object_key.split(f"{filepath}/")[-1] ) - # Download each file - for file in files: - # Extract the filename from the S3 path - filename = file.split(filepath)[-1] + # Create the local directory if it doesn't exist + local_directory = os.path.dirname(local_file_path) + if not os.path.exists(local_directory): + os.makedirs(local_directory) - # Define the local path where you want to save the file - local_path = os.path.join("local_model", filename) + # Download the object from S3 to the local directory + self.client.download_file(self.model_bucket, object_key, local_file_path) + print(f"Downloaded {object_key} to {local_file_path}") - # Download the file from S3 to the local directory - self.client.get(file, local_path) - print(f"Downloaded {filename} to {local_path}") - - print("Download completed.") + print("Download completed.") diff --git a/model_data/simulation_system/core/DataLoader.py b/model_data/simulation_system/core/DataLoader.py index 9cb82a8a..c5c10b0a 100644 --- a/model_data/simulation_system/core/DataLoader.py +++ b/model_data/simulation_system/core/DataLoader.py @@ -3,9 +3,10 @@ import os from typing import Protocol import boto3 from io import BytesIO, StringIO +from core.CloudClient import BotoClient -def read_parquet_from_s3(bucket_name, file_key): +def read_parquet_from_s3(client, bucket_name, file_key): """ Read a CSV file from S3 using boto3 and pandas. @@ -15,11 +16,9 @@ def read_parquet_from_s3(bucket_name, file_key): :param aws_secret_access_key: AWS Secret Access Key :return: DataFrame containing the CSV data. """ - # Initialize the S3 client - s3_client = boto3.client("s3") # Get the object - s3_object = s3_client.get_object(Bucket=bucket_name, Key=file_key) + s3_object = client.get_object(Bucket=bucket_name, Key=file_key) # Read the CSV body into a DataFrame csv_body = s3_object["Body"].read() @@ -28,7 +27,7 @@ def read_parquet_from_s3(bucket_name, file_key): return df -def read_csv_from_s3(bucket_name, file_key, index_col): +def read_csv_from_s3(client, bucket_name, file_key, index_col): """ Read a CSV file from S3 using boto3 and pandas. @@ -38,11 +37,9 @@ def read_csv_from_s3(bucket_name, file_key, index_col): :param aws_secret_access_key: AWS Secret Access Key :return: DataFrame containing the CSV data. """ - # Initialize the S3 client - s3_client = boto3.client("s3") # Get the object - s3_object = s3_client.get_object(Bucket=bucket_name, Key=file_key) + s3_object = client.get_object(Bucket=bucket_name, Key=file_key) # Read the CSV body into a DataFrame csv_body = s3_object["Body"].read().decode("utf-8") @@ -57,7 +54,9 @@ class DataLoader(Protocol): """ @staticmethod - def load(filepath: str, index_col: str | None = None) -> pd.DataFrame | None: + def load( + client: BotoClient, filepath: str, index_col: str | None = None + ) -> pd.DataFrame | None: """ Loading data from the relevant source """ @@ -69,7 +68,9 @@ class LocalDataLoader: """ @staticmethod - def load(filepath: str, index_col: str | None = None) -> pd.DataFrame: + def load( + client: BotoClient, filepath: str, index_col: str | None = None + ) -> pd.DataFrame: if not os.path.exists(filepath): raise FileNotFoundError(f"File not found: {filepath}") @@ -86,56 +87,36 @@ class LocalDataLoader: return df -class S3MockDataLoader: +class S3DataLoader: """ Implements the DataLoader Protocol for s3 files, hosting locally in a mocked service """ @staticmethod - def load(filepath: str, index_col: str | None = None) -> pd.DataFrame: - - # TODO: Ingest these as environment variables in the docker compose file - storage_options = { - "key": os.environ.get("AWS_ACCESS_KEY_ID", "admin"), - "secret": os.environ.get("AWS_SECRET_ACCESS_KEY", "password"), - "client_kwargs": { - "endpoint_url": os.environ.get("ENDPOINT_URL", "http://localhost:9000") - }, - } + def load( + client: BotoClient, filepath: str, index_col: str | None = None + ) -> pd.DataFrame: if not filepath.startswith("s3://"): filepath = "s3://" + filepath - if filepath.endswith(".parquet"): - df = pd.read_parquet(filepath, storage_options=storage_options) - if index_col is not None: - df = df.set_index(index_col) - elif filepath.endswith(".csv"): - df = pd.read_csv( - filepath, index_col=index_col, storage_options=storage_options - ) - else: - raise ValueError(f"File format not supported for file: {filepath}") - - return df - - -class S3DataLoader: - """ - Implements the DataLoader Protocol for s3 files - """ - - @staticmethod - def load(filepath: str, index_col: str | None = None) -> pd.DataFrame: filepath_split = filepath.split("s3://")[-1].split("/", 1) bucket = filepath_split[0] key = filepath_split[1] + if filepath.endswith(".parquet"): - df = read_parquet_from_s3(bucket, key) + df = read_parquet_from_s3( + client=client.client, bucket_name=bucket, file_key=key + ) if index_col is not None: df = df.set_index(index_col) elif filepath.endswith(".csv"): - df = read_csv_from_s3(bucket, key, index_col) + df = read_csv_from_s3( + client=client.client, + bucket_name=bucket, + file_key=key, + index_col=index_col, + ) else: raise ValueError(f"File format not supported for file: {filepath}") @@ -152,7 +133,7 @@ def dataloader_factory(runtime_environment: str | None = None) -> DataLoader: dataloader_types = { "local": LocalDataLoader(), - "local-mock": S3MockDataLoader(), + "local-mock": S3DataLoader(), "dev": S3DataLoader(), "staging": S3DataLoader(), "prod": S3DataLoader(), diff --git a/model_data/simulation_system/core/Metrics.py b/model_data/simulation_system/core/Metrics.py index d1382d94..9369587d 100644 --- a/model_data/simulation_system/core/Metrics.py +++ b/model_data/simulation_system/core/Metrics.py @@ -5,11 +5,12 @@ Key tasks: - Given a model and test data, produce a suite of all metrics """ +import os import pandas as pd from pathlib import Path import seaborn as sns import matplotlib.pyplot as plt -from core.CloudClient import S3FSClient +from core.CloudClient import BotoClient from core.Logger import logger from core.Settings import ( RESIDUAL_TRUE_LABEL, @@ -64,18 +65,40 @@ class Metrics: All metric functions used to generate a dictionary of metrics """ - def upload_metrics(self, output_filepath: Path, s3fs_client: S3FSClient) -> None: + def upload_metrics(self, output_filepath: Path, client: BotoClient) -> None: """ Providing a path, this function will save the metrics folders/files. """ - if s3fs_client.client is None: + if client.client is None: logger.info("In local development mode - no need to upload") else: logger.info(f"Saving metrics into s3") - s3_location = s3fs_client.model_bucket + "/" + str(output_filepath) - s3fs_client.client.put(str(output_filepath), s3_location, recursive=True) + s3_location = client.model_bucket + "/" + str(output_filepath) + + self.directory_upload( + client=client, + local_directory=str(output_filepath), + bucket_name=client.model_bucket, + ) + logger.info("Save complete") + def directory_upload(self, client, local_directory, bucket_name): + + # Iterate through the local directory and upload each file + for root, dirs, files in os.walk(local_directory): + for file in files: + # Determine the local file path and S3 object key + local_file_path = os.path.join(root, file) + s3_object_key = os.path.relpath(local_file_path, local_directory) + + # Upload the file to S3 + client.client.upload_file(local_file_path, bucket_name, local_file_path) + + logger.info( + f"Uploaded {local_file_path} to {bucket_name}/{local_file_path}" + ) + @staticmethod def list_metric_functions() -> list: """ diff --git a/model_data/simulation_system/core/RegistryHandler.py b/model_data/simulation_system/core/RegistryHandler.py index e492b2d2..e7ec641c 100644 --- a/model_data/simulation_system/core/RegistryHandler.py +++ b/model_data/simulation_system/core/RegistryHandler.py @@ -2,38 +2,65 @@ """ +from io import StringIO import pandas as pd from pathlib import Path from core.Logger import logger -from core.CloudClient import S3FSClient +from core.CloudClient import BotoClient from core.Metrics import Metrics from core.Settings import BEST_MODEL_COLUMN_NAME +def read_csv_from_s3(client, bucket_name, file_key, index_col): + """ + Read a CSV file from S3 using boto3 and pandas. + + :param bucket_name: Name of the S3 bucket. + :param file_key: Key of the file (including directory path within the bucket). + :param aws_access_key_id: AWS Access Key ID + :param aws_secret_access_key: AWS Secret Access Key + :return: DataFrame containing the CSV data. + """ + + # Get the object + s3_object = client.get_object(Bucket=bucket_name, Key=file_key) + + # Read the CSV body into a DataFrame + csv_body = s3_object["Body"].read().decode("utf-8") + df = pd.read_csv(StringIO(csv_body), index_col=index_col) + + return df + + class RegistryHandler: """ Handles the loading of the registry depending on the environment """ - def load_registry( - self, registry_path: Path, s3fs_client: S3FSClient, metrics: Metrics - ): + def load_registry(self, registry_path: Path, client: BotoClient, metrics: Metrics): """ Depening on the environment, we will have to load from locally or s3 (mock/real) """ - if s3fs_client.client is None: + if client.client is None: logger.info("Using local development - no need for s3 load") return self.load_local_registry( registry_path=registry_path, metrics=metrics ) - s3_location = "s3://" + s3fs_client.model_bucket + "/" + str(registry_path) - logger.info(f"Check if registry exists") - if s3fs_client.client.exists(s3_location): - registry_df = pd.read_csv( - s3fs_client.client.open(s3_location), index_col=None + + check_exists = client.client.list_objects_v2( + Bucket=client.model_bucket, Prefix=str(registry_path) + ) + + if "Contents" in check_exists: + logger.info("Loading existing registry") + registry_df = read_csv_from_s3( + client=client.client, + bucket_name=client.model_bucket, + file_key=str(registry_path), + index_col=None, ) else: logger.info("No registry found - creating new one") @@ -70,14 +97,18 @@ class RegistryHandler: return registry_df - def save_registry(self, output_filepath: Path, s3fs_client: S3FSClient) -> None: + def save_registry(self, output_filepath: Path, client: BotoClient) -> None: """ Providing a path, this function will save the model to be used. """ - if s3fs_client.client is None: + if client.client is None: logger.info("In local development mode - no need for s3 client") else: logger.info(f"Saving registry into s3") - s3_location = s3fs_client.model_bucket + "/" + str(output_filepath) - s3fs_client.client.put(str(output_filepath), s3_location, recursive=True) + s3_location = client.model_bucket + "/" + str(output_filepath) + + client.client.upload_file( + str(output_filepath), client.model_bucket, str(output_filepath) + ) + logger.info("Save complete") diff --git a/model_data/simulation_system/docker-compose.yml b/model_data/simulation_system/docker-compose.yml index 058dc062..f875e7ef 100644 --- a/model_data/simulation_system/docker-compose.yml +++ b/model_data/simulation_system/docker-compose.yml @@ -18,19 +18,20 @@ services: timeout: 20s retries: 3 - # simulation_system_training: - # build: - # context: ./ - # dockerfile: ./Dockerfiles/Dockerfile.training - # image: simulation_system_training - # environment: - # ENDPOINT_URL: http://minio:9000/ - # AWS_ACCESS_KEY_ID: *MINIO_USER - # AWS_SECRET_ACCESS_KEY: *MINIO_PASS - # tty: true - # depends_on: - # minio: - # condition: service_healthy + simulation_system_training: + build: + context: ./ + dockerfile: ./Dockerfiles/Dockerfile.training + image: simulation_system_training + environment: + RUNTIME_ENVIRONMENT: local-mock + ENDPOINT_URL: http://minio:9000/ + AWS_ACCESS_KEY_ID: *MINIO_USER + AWS_SECRET_ACCESS_KEY: *MINIO_PASS + tty: true + depends_on: + minio: + condition: service_healthy # command: # ["bash"] @@ -40,6 +41,7 @@ services: # dockerfile: ./Dockerfiles/Dockerfile.prediction # image: simulation_system_prediction # environment: + # RUNTIME_ENVIRONMENT: local-mock # ENDPOINT_URL: http://minio:9000/ # AWS_ACCESS_KEY_ID: *MINIO_USER # AWS_SECRET_ACCESS_KEY: *MINIO_PASS @@ -47,7 +49,7 @@ services: # depends_on: # simulation_system_training: # condition: service_completed_successfully - # command: + # command: # ["bash"] diff --git a/model_data/simulation_system/model_build_data/change_data/rdsap_full/test_data_with_id.parquet b/model_data/simulation_system/model_build_data/change_data/rdsap_full/test_data_with_id.parquet new file mode 100644 index 00000000..d41b23da Binary files /dev/null and b/model_data/simulation_system/model_build_data/change_data/rdsap_full/test_data_with_id.parquet differ diff --git a/model_data/simulation_system/predictions.py b/model_data/simulation_system/predictions.py index 1aa3defa..9f1ff344 100644 --- a/model_data/simulation_system/predictions.py +++ b/model_data/simulation_system/predictions.py @@ -5,13 +5,16 @@ Script to load MLModel class and generate predictions import os import json import argparse +from pathlib import Path import pandas as pd from typing import Optional from datetime import datetime from MLModel.Models import AutogluonModel from core.Logger import logger from core.DataLoader import dataloader_factory -from core.CloudClient import S3FSClient +from core.CloudClient import BotoClient +from core.Metrics import Metrics +from core.RegistryHandler import RegistryHandler from core.Settings import ( BASE_REGISTRY_PATH, REGISTRY_FILE, @@ -19,13 +22,13 @@ from core.Settings import ( PREDICTION_FILE, METADATA_FILE, TIMESTAMP_FORMAT, + MODEL_DIRECTORY, ) TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT) RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "dev") -CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT) - +CLIENT = BotoClient(runtime_environment=RUNTIME_ENVIRONMENT) # FOR TESTING # For now just loading data first and then passing into function (i.e. as if we receive json data and convert to @@ -36,7 +39,7 @@ CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT) # For testing in dev s3 # Data path can be passed as so: # python3 predictions.py --data-path s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet -# data_path="s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet" +# data_path="s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data_with_id.parquet" def ingest_arguments() -> argparse.Namespace: @@ -83,7 +86,7 @@ def prediction( if registry_path is None or not registry_path.exists(): logger.error("No registry path provided or registry doesn't exist") exit(1) - elif RUNTIME_ENVIRONMENT == "dev": + elif RUNTIME_ENVIRONMENT in ["local-mock", "dev"]: registry_path = "s3://retrofit-model-directory-dev/model_directory/RDSAP_CHANGE/model_registry.csv" else: raise NotImplemented("TO be implemented") @@ -96,7 +99,17 @@ def prediction( else: # TODO: Think about where registry will sit/ type logger.info("Loading best model from registry") - registry_df = pd.read_csv(registry_path) + + metrics = Metrics() + registry_handler = RegistryHandler() + + registry_path = Path(MODEL_DIRECTORY) / target_column / REGISTRY_FILE + + registry_df = registry_handler.load_registry( + registry_path=registry_path, client=CLIENT, metrics=metrics + ) + + # registry_df = pd.read_csv(registry_path) best_model_df = registry_df[registry_df["best_model"]] model_location = best_model_df["model_location"].values[0] @@ -115,7 +128,7 @@ def prediction( if data_path and data is None: logger.info("Loading data from provided path") dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT) - data = dataloader.load(filepath=data_path, index_col="id") + data = dataloader.load(client=CLIENT, filepath=data_path, index_col=None) if data is None: raise ValueError("No data loaded") @@ -137,14 +150,14 @@ def prediction( logger.error("No other model currently") exit(1) - model.load_model( - filepath=model_location, s3_client=CLIENT, model_folder="local_model" - ) + model.load_model(filepath=model_location, client=CLIENT, model_folder="local_model") logger.info("--- Generating Predictions ---") prediction = model.generate_predictions(data=data) - return pd.concat([pd.Series(data.index, name='id'), prediction], axis=1) + # logger.info(pd.concat([data["id"], prediction], axis=1)) + + return pd.concat([data["id"], prediction], axis=1) # Save prediction some where? # prediction.to_csv("s3?") diff --git a/model_data/simulation_system/requirements/predictions/predictions-dev.txt b/model_data/simulation_system/requirements/predictions/predictions-dev.txt index 48bbb3ca..a9e65dc2 100644 --- a/model_data/simulation_system/requirements/predictions/predictions-dev.txt +++ b/model_data/simulation_system/requirements/predictions/predictions-dev.txt @@ -1,4 +1,6 @@ +boto3 autogluon==0.8.2 pandas==1.5.3 -s3fs==2023.6.0 +seaborn==0.12.2 +matplotlib==3.7.2 pre-commit==3.3.3 diff --git a/model_data/simulation_system/requirements/predictions/predictions.txt b/model_data/simulation_system/requirements/predictions/predictions.txt index 6aeeaa45..af2d681e 100644 --- a/model_data/simulation_system/requirements/predictions/predictions.txt +++ b/model_data/simulation_system/requirements/predictions/predictions.txt @@ -1,6 +1,5 @@ boto3 autogluon==0.8.2 pandas==1.5.3 -s3fs seaborn==0.12.2 -matplotlib==3.7.2 \ No newline at end of file +matplotlib==3.7.2 diff --git a/model_data/simulation_system/requirements/training/training-dev.txt b/model_data/simulation_system/requirements/training/training-dev.txt index bcba4f18..ea205270 100644 --- a/model_data/simulation_system/requirements/training/training-dev.txt +++ b/model_data/simulation_system/requirements/training/training-dev.txt @@ -1,5 +1,4 @@ autogluon==0.8.2 pandas==1.5.3 seaborn==0.12.2 -s3fs==2023.6.0 pre-commit==3.3.3 diff --git a/model_data/simulation_system/requirements/training/training.txt b/model_data/simulation_system/requirements/training/training.txt index cb2e1f87..17e4c8da 100644 --- a/model_data/simulation_system/requirements/training/training.txt +++ b/model_data/simulation_system/requirements/training/training.txt @@ -1,4 +1,3 @@ autogluon==0.8.2 pandas==1.5.3 seaborn==0.12.2 -s3fs==2023.6.0 diff --git a/model_data/simulation_system/training.py b/model_data/simulation_system/training.py index 4cc19ed3..b68d1c4d 100644 --- a/model_data/simulation_system/training.py +++ b/model_data/simulation_system/training.py @@ -10,7 +10,7 @@ from core.Logger import logger from core.Metrics import Metrics, sort_by_metric from core.DataLoader import dataloader_factory from core.FeatureProcessor import FeatureProcessor -from core.CloudClient import S3FSClient +from core.CloudClient import BotoClient from core.RegistryHandler import RegistryHandler from core.Settings import ( MODEL_DIRECTORY, @@ -30,7 +30,8 @@ TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT) RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local") -CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT) +CLIENT = BotoClient(runtime_environment=RUNTIME_ENVIRONMENT) +# CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT) # FOR TESTING @@ -105,8 +106,8 @@ def training( logger.info("--- Loading data ---") dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT) - train_df = dataloader.load(filepath=train_filepath) - test_df = dataloader.load(filepath=test_filepath) + train_df = dataloader.load(client=CLIENT, filepath=train_filepath) + test_df = dataloader.load(client=CLIENT, filepath=test_filepath) if train_df is None or test_df is None: raise ValueError("No data Loaded - cancelling pipeline") @@ -153,7 +154,7 @@ def training( ) logger.info("--- Save Model ---") - model.save_model(output_filepath=model.output_filepath, s3fs_client=CLIENT) + model.save_model(output_filepath=model.output_filepath, client=CLIENT) logger.info("--- Generate evaluation metrics ---") metrics = Metrics() @@ -166,8 +167,6 @@ def training( metrics=metrics, ) - metrics.upload_metrics(output_filepath=metric_output_path, s3fs_client=CLIENT) - logger.info("--- Generate metric outputs using predictions ---") # metrics.generate_plot_suite() @@ -179,7 +178,7 @@ def training( output_filepath=plot_output_path, ) - metrics.upload_metrics(output_filepath=plot_output_path, s3fs_client=CLIENT) + metrics.upload_metrics(output_filepath=metric_output_path, client=CLIENT) # TODO: for cml, we might want to have class that outputs all data and plots to add to the report # If we want residual plot/ any plots, we will need to self host @@ -196,7 +195,7 @@ def training( f"Optimised version of best model can be found at: {deployment_model_path}" ) - model.save_model(output_filepath=deployment_model_path, s3fs_client=CLIENT) + model.save_model(output_filepath=deployment_model_path, client=CLIENT) # TODO: Need a model registry - for now have this as a CSV # Save this in the model directory @@ -208,7 +207,7 @@ def training( registry_path = Path(MODEL_DIRECTORY) / target_column / REGISTRY_FILE registry_df = registry_handler.load_registry( - registry_path=registry_path, s3fs_client=CLIENT, metrics=metrics + registry_path=registry_path, client=CLIENT, metrics=metrics ) model_details_df = pd.DataFrame( @@ -235,7 +234,7 @@ def training( registry_path.parent.mkdir(parents=True, exist_ok=True) registry_df.to_csv(registry_path, index=False) - registry_handler.save_registry(output_filepath=registry_path, s3fs_client=CLIENT) + registry_handler.save_registry(output_filepath=registry_path, client=CLIENT) logger.info("--- Clean up ---") if RUNTIME_ENVIRONMENT != "local" and Path(MODEL_DIRECTORY).exists():