Merge pull request #218 from Hestia-Homes/main

handling lack of permissions when getting model from s3
This commit is contained in:
KhalimCK 2023-09-05 11:30:15 +01:00 committed by GitHub
commit c63d7f449e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 322 additions and 156 deletions

View file

@ -9,6 +9,11 @@ jobs:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v3
# TODO: use dvc to pull data, mkdir s3-mock, load data, then use docker compose
# - name: Build docker compose stack
# run: |
# cd model_data/simulation_system
# docker-compose up -d
- uses: actions/setup-python@v4 - uses: actions/setup-python@v4
- uses: iterative/setup-cml@v1 - uses: iterative/setup-cml@v1
- name: Train model - name: Train model

View file

@ -6,6 +6,7 @@ ARG GID=100
# Install patches # Install patches
RUN apt-get update && apt-get upgrade -y \ RUN apt-get update && apt-get upgrade -y \
&& apt-get install libgomp1 -y \
&& apt-get clean \ && apt-get clean \
&& rm -rf /var/lib/apt/lists && rm -rf /var/lib/apt/lists
@ -34,4 +35,4 @@ USER ${USER}
WORKDIR /home/simulation_system WORKDIR /home/simulation_system
# Run the python command # Run the python command
CMD ["python3", "predictions.py", "--data-path", "./model_build_data/change_data/rdsap_full/test_data.parquet"] CMD ["python3", "predictions.py", "--data-path", "s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data_with_id.parquet"]

View file

@ -6,6 +6,7 @@ ARG GID=100
# Install patches # Install patches
RUN apt-get update && apt-get upgrade -y \ RUN apt-get update && apt-get upgrade -y \
&& apt-get install libgomp1 -y \
&& apt-get clean \ && apt-get clean \
&& rm -rf /var/lib/apt/lists && rm -rf /var/lib/apt/lists
@ -34,4 +35,4 @@ USER ${USER}
WORKDIR /home/simulation_system WORKDIR /home/simulation_system
# Run the python command # Run the python command
CMD ["python3", "training.py", "--train-filepath", "./model_build_data/change_data/rdsap_full/train_validation_data.parquet", "--test-filepath", "./model_build_data/change_data/rdsap_full/test_data.parquet"] CMD ["python3", "training.py", "--train-filepath", "s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/train_validation_data.parquet", "--test-filepath", "s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet"]

View file

@ -8,6 +8,7 @@ Key tasks:
- Generate Inference - Generate Inference
""" """
import os
from typing import Any from typing import Any
from pathlib import Path from pathlib import Path
import pandas as pd import pandas as pd
@ -15,7 +16,7 @@ from autogluon.tabular import TabularDataset, TabularPredictor
from core.Logger import logger from core.Logger import logger
from core.Metrics import Metrics from core.Metrics import Metrics
from core.Settings import METRIC_FILENAME from core.Settings import METRIC_FILENAME
from core.CloudClient import S3FSClient from core.CloudClient import BotoClient
AUTOGLUON_HYPERPARAMETERS = [ AUTOGLUON_HYPERPARAMETERS = [
"problem_type", "problem_type",
@ -54,34 +55,55 @@ class AutogluonModel:
def load_model( def load_model(
self, self,
filepath: str | Path, filepath: str | Path,
s3_client: S3FSClient, client: BotoClient,
model_folder: str = "local_model", model_folder: str = "local_model",
) -> None: ) -> None:
""" """
Providing a path, this function will load the model to be used. Will load to internal variable Providing a path, this function will load the model to be used. Will load to internal variable
""" """
filepath = str(filepath) filepath = str(filepath)
if s3_client.client is None: if client.client is None:
logger.info("In local development mode - no need for s3 client") logger.info("In local development mode - no need for s3 client")
self.model = TabularPredictor.load(path=filepath) self.model = TabularPredictor.load(path=filepath)
else: else:
logger.info(f"Loading model from s3") logger.info(f"Loading model from s3")
s3_client.download_model(filepath=filepath, model_folder=model_folder) client.download_model(filepath=filepath, model_folder=model_folder)
self.model = TabularPredictor.load(path=model_folder) self.model = TabularPredictor.load(path=model_folder)
def save_model(self, output_filepath: Path, s3fs_client: S3FSClient) -> None: def save_model(self, output_filepath: Path, client: BotoClient) -> None:
""" """
Providing a path, this function will save the model to be used. Providing a path, this function will save the model to be used.
""" """
if s3fs_client.client is None: if client.client is None:
logger.info("In local development mode - no need for s3 client") logger.info("In local development mode - no need for s3 client")
logger.info("Using AutoGluon Model - Model saving already occured") logger.info("Using AutoGluon Model - Model saving already occured")
else: else:
logger.info(f"Saving model into s3") logger.info(f"Saving model into s3")
s3_location = s3fs_client.model_bucket + "/" + str(output_filepath)
s3fs_client.client.put(str(output_filepath), s3_location, recursive=True) self.directory_upload(
client=client,
local_directory=str(output_filepath),
bucket_name=client.model_bucket,
)
logger.info("Save complete") logger.info("Save complete")
def directory_upload(self, client, local_directory, bucket_name):
    """
    Upload every file found under ``local_directory`` to an S3 bucket.

    The directory is walked recursively. Each file is stored under a key equal
    to its local path (``local_directory`` included, not the path relative to
    it), so the bucket reproduces the local folder layout.

    :param client: Object whose ``client`` attribute provides
        ``upload_file(filename, bucket, key)``.
    :param local_directory: Root folder whose files are uploaded.
    :param bucket_name: Name of the destination bucket.
    """
    for root, _dirs, files in os.walk(local_directory):
        for file in files:
            local_file_path = os.path.join(root, file)
            # S3 keys use "/" as the delimiter whatever the local OS separator is
            s3_object_key = local_file_path.replace(os.sep, "/")

            client.client.upload_file(local_file_path, bucket_name, s3_object_key)
            logger.info(f"Uploaded {local_file_path} to {bucket_name}/{s3_object_key}")
def train_model( def train_model(
self, data: pd.DataFrame, target_column: str, hyperparameters: dict self, data: pd.DataFrame, target_column: str, hyperparameters: dict
) -> None: ) -> None:

View file

@ -3,17 +3,105 @@ Set up the client to be used for downloading and uploading model files
""" """
import os import os
import s3fs import boto3
from core.Logger import logger from core.Logger import logger
class S3FSClient: # class S3FSClient:
# """
# Set up the correct client to upload files to s3
# """
# def __init__(self, runtime_environment: str = "local") -> None:
# self.client: s3fs.S3FileSystem | None = None
# self.model_bucket: str
# self.client_factory(runtime_environment)
# self.determine_model_bucket(runtime_environment)
# def client_factory(self, runtime_environment: str = "local"):
# """
# Select the correct s3 client to use
# """
# if runtime_environment == "local":
# logger.info("No S3 client setup required")
# elif runtime_environment == "local-mock":
# logger.info(f"S3 settings for {runtime_environment}")
# self.client = s3fs.S3FileSystem(
# key=os.environ.get("AWS_ACCESS_KEY_ID", "admin"),
# secret=os.environ.get("AWS_SECRET_ACCESS_KEY", "password"),
# client_kwargs={
# "endpoint_url": os.environ.get(
# "ENDPOINT_URL", "http://localhost:9000"
# )
# },
# )
# elif runtime_environment in ["dev", "staging", "prod"]:
# logger.info(f"S3 settings for {runtime_environment}")
# # Key/ token should be in session/lambda for this
# self.client = s3fs.S3FileSystem()
# else:
# raise NotImplementedError("No correspnding runtime environment")
# def determine_model_bucket(self, runtime_environment: str) -> None:
# """
# For the given environment, return the correct bucket for models
# """
# if runtime_environment == "local":
# logger.info("In local development - no need for s3")
# elif runtime_environment in ["local-mock", "dev"]:
# # TODO: get from environment
# self.model_bucket = "retrofit-model-directory-dev"
# elif runtime_environment in ["staging", "prod"]:
# self.model_bucket = f"retrofit-model-directory-{runtime_environment}"
# else:
# raise NotImplementedError("No corresponding runtime environment")
# def download_model(self, filepath: str, model_folder: str):
# """
# For the file path, download the model locally so that we can load the model
# """
# if self.client is None:
# logger.info("No need to download model as local development")
# else:
# def list_files_recursively(folder_path, client):
# all_files = []
# for root, dirs, files in client.walk(folder_path):
# for file in files:
# s3_path = os.path.join(root, file)
# all_files.append(s3_path)
# return all_files
# # List all files in the specified S3 folder and its subfolders
# files = list_files_recursively(
# f"{self.model_bucket}/{filepath}", client=self.client
# )
# # Download each file
# for file in files:
# # Extract the filename from the S3 path
# filename = file.split(filepath)[-1]
# # Define the local path where you want to save the file
# local_path = os.path.join(model_folder, filename)
# # Download the file from S3 to the local directory
# self.client.get(file, local_path)
# print(f"Downloaded {filename} to {local_path}")
# print("Download completed.")
class BotoClient:
""" """
Set up the correct client to upload files to s3 Using boto3 to access the different aws storage configurations
""" """
def __init__(self, runtime_environment: str = "local") -> None: def __init__(self, runtime_environment: str = "local") -> None:
self.client: s3fs.S3FileSystem | None = None self.client = None
self.model_bucket: str self.model_bucket: str
self.client_factory(runtime_environment) self.client_factory(runtime_environment)
@ -28,19 +116,19 @@ class S3FSClient:
logger.info("No S3 client setup required") logger.info("No S3 client setup required")
elif runtime_environment == "local-mock": elif runtime_environment == "local-mock":
logger.info(f"S3 settings for {runtime_environment}") logger.info(f"S3 settings for {runtime_environment}")
self.client = s3fs.S3FileSystem( session = boto3.Session()
key=os.environ.get("AWS_ACCESS_KEY_ID", "admin"), self.client = session.client(
secret=os.environ.get("AWS_SECRET_ACCESS_KEY", "password"), service_name="s3",
client_kwargs={ aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID", "admin"),
"endpoint_url": os.environ.get( aws_secret_access_key=os.environ.get(
"ENDPOINT_URL", "http://localhost:9000" "AWS_SECRET_ACCESS_KEY", "password"
) ),
}, endpoint_url=os.environ.get("ENDPOINT_URL", "http://localhost:9000"),
) )
elif runtime_environment in ["dev", "staging", "prod"]: elif runtime_environment in ["dev", "staging", "prod"]:
logger.info(f"S3 settings for {runtime_environment}") logger.info(f"S3 settings for {runtime_environment}")
# Key/ token should be in session/lambda for this # Key/ token should be in session/lambda for this
self.client = s3fs.S3FileSystem() self.client = boto3.client("s3")
else: else:
raise NotImplementedError("No correspnding runtime environment") raise NotImplementedError("No correspnding runtime environment")
@ -62,34 +150,34 @@ class S3FSClient:
""" """
For the file path, download the model locally so that we can load the model For the file path, download the model locally so that we can load the model
""" """
# List all objects with the specified prefix in the bucket
if self.client is None: if self.client is None:
logger.info("No need to download model as local development") raise ValueError("SHould not be in here!")
else:
def list_files_recursively(folder_path, client): objects = self.client.list_objects_v2(Bucket=self.model_bucket, Prefix=filepath)
all_files = []
for root, dirs, files in client.walk(folder_path):
for file in files:
s3_path = os.path.join(root, file)
all_files.append(s3_path)
return all_files
# List all files in the specified S3 folder and its subfolders # Ensure the local directory for downloads exists
files = list_files_recursively( if not os.path.exists(model_folder):
f"{self.model_bucket}/{filepath}", client=self.client os.makedirs(model_folder)
# Download each object with the specified prefix
for obj in objects.get("Contents", []):
# Get the object key (file path)
object_key = obj["Key"]
# Determine the local file path to save the object
local_file_path = os.path.join(
model_folder, object_key.split(f"{filepath}/")[-1]
) )
# Download each file # Create the local directory if it doesn't exist
for file in files: local_directory = os.path.dirname(local_file_path)
# Extract the filename from the S3 path if not os.path.exists(local_directory):
filename = file.split(filepath)[-1] os.makedirs(local_directory)
# Define the local path where you want to save the file # Download the object from S3 to the local directory
local_path = os.path.join("local_model", filename) self.client.download_file(self.model_bucket, object_key, local_file_path)
print(f"Downloaded {object_key} to {local_file_path}")
# Download the file from S3 to the local directory print("Download completed.")
self.client.get(file, local_path)
print(f"Downloaded {filename} to {local_path}")
print("Download completed.")

View file

@ -3,9 +3,10 @@ import os
from typing import Protocol from typing import Protocol
import boto3 import boto3
from io import BytesIO, StringIO from io import BytesIO, StringIO
from core.CloudClient import BotoClient
def read_parquet_from_s3(bucket_name, file_key): def read_parquet_from_s3(client, bucket_name, file_key):
""" """
Read a CSV file from S3 using boto3 and pandas. Read a CSV file from S3 using boto3 and pandas.
@ -15,11 +16,9 @@ def read_parquet_from_s3(bucket_name, file_key):
:param aws_secret_access_key: AWS Secret Access Key :param aws_secret_access_key: AWS Secret Access Key
:return: DataFrame containing the CSV data. :return: DataFrame containing the CSV data.
""" """
# Initialize the S3 client
s3_client = boto3.client("s3")
# Get the object # Get the object
s3_object = s3_client.get_object(Bucket=bucket_name, Key=file_key) s3_object = client.get_object(Bucket=bucket_name, Key=file_key)
# Read the CSV body into a DataFrame # Read the CSV body into a DataFrame
csv_body = s3_object["Body"].read() csv_body = s3_object["Body"].read()
@ -28,7 +27,7 @@ def read_parquet_from_s3(bucket_name, file_key):
return df return df
def read_csv_from_s3(bucket_name, file_key, index_col): def read_csv_from_s3(client, bucket_name, file_key, index_col):
""" """
Read a CSV file from S3 using boto3 and pandas. Read a CSV file from S3 using boto3 and pandas.
@ -38,11 +37,9 @@ def read_csv_from_s3(bucket_name, file_key, index_col):
:param aws_secret_access_key: AWS Secret Access Key :param aws_secret_access_key: AWS Secret Access Key
:return: DataFrame containing the CSV data. :return: DataFrame containing the CSV data.
""" """
# Initialize the S3 client
s3_client = boto3.client("s3")
# Get the object # Get the object
s3_object = s3_client.get_object(Bucket=bucket_name, Key=file_key) s3_object = client.get_object(Bucket=bucket_name, Key=file_key)
# Read the CSV body into a DataFrame # Read the CSV body into a DataFrame
csv_body = s3_object["Body"].read().decode("utf-8") csv_body = s3_object["Body"].read().decode("utf-8")
@ -57,7 +54,9 @@ class DataLoader(Protocol):
""" """
@staticmethod @staticmethod
def load(filepath: str, index_col: str | None = None) -> pd.DataFrame | None: def load(
client: BotoClient, filepath: str, index_col: str | None = None
) -> pd.DataFrame | None:
""" """
Loading data from the relevant source Loading data from the relevant source
""" """
@ -69,7 +68,9 @@ class LocalDataLoader:
""" """
@staticmethod @staticmethod
def load(filepath: str, index_col: str | None = None) -> pd.DataFrame: def load(
client: BotoClient, filepath: str, index_col: str | None = None
) -> pd.DataFrame:
if not os.path.exists(filepath): if not os.path.exists(filepath):
raise FileNotFoundError(f"File not found: {filepath}") raise FileNotFoundError(f"File not found: {filepath}")
@ -86,56 +87,36 @@ class LocalDataLoader:
return df return df
class S3MockDataLoader: class S3DataLoader:
""" """
Implements the DataLoader Protocol for s3 files, hosting locally in a mocked service Implements the DataLoader Protocol for s3 files, hosting locally in a mocked service
""" """
@staticmethod @staticmethod
def load(filepath: str, index_col: str | None = None) -> pd.DataFrame: def load(
client: BotoClient, filepath: str, index_col: str | None = None
# TODO: Ingest these as environment variables in the docker compose file ) -> pd.DataFrame:
storage_options = {
"key": os.environ.get("AWS_ACCESS_KEY_ID", "admin"),
"secret": os.environ.get("AWS_SECRET_ACCESS_KEY", "password"),
"client_kwargs": {
"endpoint_url": os.environ.get("ENDPOINT_URL", "http://localhost:9000")
},
}
if not filepath.startswith("s3://"): if not filepath.startswith("s3://"):
filepath = "s3://" + filepath filepath = "s3://" + filepath
if filepath.endswith(".parquet"):
df = pd.read_parquet(filepath, storage_options=storage_options)
if index_col is not None:
df = df.set_index(index_col)
elif filepath.endswith(".csv"):
df = pd.read_csv(
filepath, index_col=index_col, storage_options=storage_options
)
else:
raise ValueError(f"File format not supported for file: {filepath}")
return df
class S3DataLoader:
"""
Implements the DataLoader Protocol for s3 files
"""
@staticmethod
def load(filepath: str, index_col: str | None = None) -> pd.DataFrame:
filepath_split = filepath.split("s3://")[-1].split("/", 1) filepath_split = filepath.split("s3://")[-1].split("/", 1)
bucket = filepath_split[0] bucket = filepath_split[0]
key = filepath_split[1] key = filepath_split[1]
if filepath.endswith(".parquet"): if filepath.endswith(".parquet"):
df = read_parquet_from_s3(bucket, key) df = read_parquet_from_s3(
client=client.client, bucket_name=bucket, file_key=key
)
if index_col is not None: if index_col is not None:
df = df.set_index(index_col) df = df.set_index(index_col)
elif filepath.endswith(".csv"): elif filepath.endswith(".csv"):
df = read_csv_from_s3(bucket, key, index_col) df = read_csv_from_s3(
client=client.client,
bucket_name=bucket,
file_key=key,
index_col=index_col,
)
else: else:
raise ValueError(f"File format not supported for file: {filepath}") raise ValueError(f"File format not supported for file: {filepath}")
@ -152,7 +133,7 @@ def dataloader_factory(runtime_environment: str | None = None) -> DataLoader:
dataloader_types = { dataloader_types = {
"local": LocalDataLoader(), "local": LocalDataLoader(),
"local-mock": S3MockDataLoader(), "local-mock": S3DataLoader(),
"dev": S3DataLoader(), "dev": S3DataLoader(),
"staging": S3DataLoader(), "staging": S3DataLoader(),
"prod": S3DataLoader(), "prod": S3DataLoader(),

View file

@ -5,11 +5,12 @@ Key tasks:
- Given a model and test data, produce a suite of all metrics - Given a model and test data, produce a suite of all metrics
""" """
import os
import pandas as pd import pandas as pd
from pathlib import Path from pathlib import Path
import seaborn as sns import seaborn as sns
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
from core.CloudClient import S3FSClient from core.CloudClient import BotoClient
from core.Logger import logger from core.Logger import logger
from core.Settings import ( from core.Settings import (
RESIDUAL_TRUE_LABEL, RESIDUAL_TRUE_LABEL,
@ -64,18 +65,40 @@ class Metrics:
All metric functions used to generate a dictionary of metrics All metric functions used to generate a dictionary of metrics
""" """
def upload_metrics(self, output_filepath: Path, s3fs_client: S3FSClient) -> None: def upload_metrics(self, output_filepath: Path, client: BotoClient) -> None:
""" """
Providing a path, this function will save the metrics folders/files. Providing a path, this function will save the metrics folders/files.
""" """
if s3fs_client.client is None: if client.client is None:
logger.info("In local development mode - no need to upload") logger.info("In local development mode - no need to upload")
else: else:
logger.info(f"Saving metrics into s3") logger.info(f"Saving metrics into s3")
s3_location = s3fs_client.model_bucket + "/" + str(output_filepath) s3_location = client.model_bucket + "/" + str(output_filepath)
s3fs_client.client.put(str(output_filepath), s3_location, recursive=True)
self.directory_upload(
client=client,
local_directory=str(output_filepath),
bucket_name=client.model_bucket,
)
logger.info("Save complete") logger.info("Save complete")
def directory_upload(self, client, local_directory, bucket_name):
    """
    Upload every file found under ``local_directory`` to an S3 bucket.

    The directory is walked recursively. Each file is stored under a key equal
    to its local path (``local_directory`` included, not the path relative to
    it), so the bucket reproduces the local folder layout.

    :param client: Object whose ``client`` attribute provides
        ``upload_file(filename, bucket, key)``.
    :param local_directory: Root folder whose files are uploaded.
    :param bucket_name: Name of the destination bucket.
    """
    for root, _dirs, files in os.walk(local_directory):
        for file in files:
            local_file_path = os.path.join(root, file)
            # S3 keys use "/" as the delimiter whatever the local OS separator is
            s3_object_key = local_file_path.replace(os.sep, "/")

            client.client.upload_file(local_file_path, bucket_name, s3_object_key)
            logger.info(f"Uploaded {local_file_path} to {bucket_name}/{s3_object_key}")
@staticmethod @staticmethod
def list_metric_functions() -> list: def list_metric_functions() -> list:
""" """

View file

@ -2,38 +2,65 @@
""" """
from io import StringIO
import pandas as pd import pandas as pd
from pathlib import Path from pathlib import Path
from core.Logger import logger from core.Logger import logger
from core.CloudClient import S3FSClient from core.CloudClient import BotoClient
from core.Metrics import Metrics from core.Metrics import Metrics
from core.Settings import BEST_MODEL_COLUMN_NAME from core.Settings import BEST_MODEL_COLUMN_NAME
def read_csv_from_s3(client, bucket_name, file_key, index_col):
    """
    Read a CSV file from S3 into a pandas DataFrame.

    :param client: S3 client exposing ``get_object(Bucket=..., Key=...)``.
    :param bucket_name: Name of the S3 bucket.
    :param file_key: Key of the file (including directory path within the bucket).
    :param index_col: Forwarded to ``pandas.read_csv`` as ``index_col``;
        ``None`` keeps the default integer index.
    :return: DataFrame containing the CSV data.
    """
    # Get the object
    s3_object = client.get_object(Bucket=bucket_name, Key=file_key)

    # The body is read fully as bytes and decoded as UTF-8 before parsing
    csv_body = s3_object["Body"].read().decode("utf-8")
    return pd.read_csv(StringIO(csv_body), index_col=index_col)
class RegistryHandler: class RegistryHandler:
""" """
Handles the loading of the registry depending on the environment Handles the loading of the registry depending on the environment
""" """
def load_registry( def load_registry(self, registry_path: Path, client: BotoClient, metrics: Metrics):
self, registry_path: Path, s3fs_client: S3FSClient, metrics: Metrics
):
""" """
Depening on the environment, we will have to load from locally or s3 (mock/real) Depening on the environment, we will have to load from locally or s3 (mock/real)
""" """
if s3fs_client.client is None: if client.client is None:
logger.info("Using local development - no need for s3 load") logger.info("Using local development - no need for s3 load")
return self.load_local_registry( return self.load_local_registry(
registry_path=registry_path, metrics=metrics registry_path=registry_path, metrics=metrics
) )
s3_location = "s3://" + s3fs_client.model_bucket + "/" + str(registry_path)
logger.info(f"Check if registry exists") logger.info(f"Check if registry exists")
if s3fs_client.client.exists(s3_location):
registry_df = pd.read_csv( check_exists = client.client.list_objects_v2(
s3fs_client.client.open(s3_location), index_col=None Bucket=client.model_bucket, Prefix=str(registry_path)
)
if "Contents" in check_exists:
logger.info("Loading existing registry")
registry_df = read_csv_from_s3(
client=client.client,
bucket_name=client.model_bucket,
file_key=str(registry_path),
index_col=None,
) )
else: else:
logger.info("No registry found - creating new one") logger.info("No registry found - creating new one")
@ -70,14 +97,18 @@ class RegistryHandler:
return registry_df return registry_df
def save_registry(self, output_filepath: Path, s3fs_client: S3FSClient) -> None: def save_registry(self, output_filepath: Path, client: BotoClient) -> None:
""" """
Providing a path, this function will save the model to be used. Providing a path, this function will save the model to be used.
""" """
if s3fs_client.client is None: if client.client is None:
logger.info("In local development mode - no need for s3 client") logger.info("In local development mode - no need for s3 client")
else: else:
logger.info(f"Saving registry into s3") logger.info(f"Saving registry into s3")
s3_location = s3fs_client.model_bucket + "/" + str(output_filepath) s3_location = client.model_bucket + "/" + str(output_filepath)
s3fs_client.client.put(str(output_filepath), s3_location, recursive=True)
client.client.upload_file(
str(output_filepath), client.model_bucket, str(output_filepath)
)
logger.info("Save complete") logger.info("Save complete")

View file

@ -18,19 +18,20 @@ services:
timeout: 20s timeout: 20s
retries: 3 retries: 3
# simulation_system_training: simulation_system_training:
# build: build:
# context: ./ context: ./
# dockerfile: ./Dockerfiles/Dockerfile.training dockerfile: ./Dockerfiles/Dockerfile.training
# image: simulation_system_training image: simulation_system_training
# environment: environment:
# ENDPOINT_URL: http://minio:9000/ RUNTIME_ENVIRONMENT: local-mock
# AWS_ACCESS_KEY_ID: *MINIO_USER ENDPOINT_URL: http://minio:9000/
# AWS_SECRET_ACCESS_KEY: *MINIO_PASS AWS_ACCESS_KEY_ID: *MINIO_USER
# tty: true AWS_SECRET_ACCESS_KEY: *MINIO_PASS
# depends_on: tty: true
# minio: depends_on:
# condition: service_healthy minio:
condition: service_healthy
# command: # command:
# ["bash"] # ["bash"]
@ -40,6 +41,7 @@ services:
# dockerfile: ./Dockerfiles/Dockerfile.prediction # dockerfile: ./Dockerfiles/Dockerfile.prediction
# image: simulation_system_prediction # image: simulation_system_prediction
# environment: # environment:
# RUNTIME_ENVIRONMENT: local-mock
# ENDPOINT_URL: http://minio:9000/ # ENDPOINT_URL: http://minio:9000/
# AWS_ACCESS_KEY_ID: *MINIO_USER # AWS_ACCESS_KEY_ID: *MINIO_USER
# AWS_SECRET_ACCESS_KEY: *MINIO_PASS # AWS_SECRET_ACCESS_KEY: *MINIO_PASS
@ -47,7 +49,7 @@ services:
# depends_on: # depends_on:
# simulation_system_training: # simulation_system_training:
# condition: service_completed_successfully # condition: service_completed_successfully
# command: # command:
# ["bash"] # ["bash"]

View file

@ -5,13 +5,16 @@ Script to load MLModel class and generate predictions
import os import os
import json import json
import argparse import argparse
from pathlib import Path
import pandas as pd import pandas as pd
from typing import Optional from typing import Optional
from datetime import datetime from datetime import datetime
from MLModel.Models import AutogluonModel from MLModel.Models import AutogluonModel
from core.Logger import logger from core.Logger import logger
from core.DataLoader import dataloader_factory from core.DataLoader import dataloader_factory
from core.CloudClient import S3FSClient from core.CloudClient import BotoClient
from core.Metrics import Metrics
from core.RegistryHandler import RegistryHandler
from core.Settings import ( from core.Settings import (
BASE_REGISTRY_PATH, BASE_REGISTRY_PATH,
REGISTRY_FILE, REGISTRY_FILE,
@ -19,13 +22,13 @@ from core.Settings import (
PREDICTION_FILE, PREDICTION_FILE,
METADATA_FILE, METADATA_FILE,
TIMESTAMP_FORMAT, TIMESTAMP_FORMAT,
MODEL_DIRECTORY,
) )
TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT) TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT)
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "dev") RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "dev")
CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT) CLIENT = BotoClient(runtime_environment=RUNTIME_ENVIRONMENT)
# FOR TESTING # FOR TESTING
# For now just loading data first and then passing into function (i.e. as if we receive json data and convert to # For now just loading data first and then passing into function (i.e. as if we receive json data and convert to
@ -36,7 +39,7 @@ CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT)
# For testing in dev s3 # For testing in dev s3
# Data path can be passed as so: # Data path can be passed as so:
# python3 predictions.py --data-path s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet # python3 predictions.py --data-path s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet
# data_path="s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet" # data_path="s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data_with_id.parquet"
def ingest_arguments() -> argparse.Namespace: def ingest_arguments() -> argparse.Namespace:
@ -83,7 +86,7 @@ def prediction(
if registry_path is None or not registry_path.exists(): if registry_path is None or not registry_path.exists():
logger.error("No registry path provided or registry doesn't exist") logger.error("No registry path provided or registry doesn't exist")
exit(1) exit(1)
elif RUNTIME_ENVIRONMENT == "dev": elif RUNTIME_ENVIRONMENT in ["local-mock", "dev"]:
registry_path = "s3://retrofit-model-directory-dev/model_directory/RDSAP_CHANGE/model_registry.csv" registry_path = "s3://retrofit-model-directory-dev/model_directory/RDSAP_CHANGE/model_registry.csv"
else: else:
raise NotImplemented("TO be implemented") raise NotImplemented("TO be implemented")
@ -96,7 +99,17 @@ def prediction(
else: else:
# TODO: Think about where registry will sit/ type # TODO: Think about where registry will sit/ type
logger.info("Loading best model from registry") logger.info("Loading best model from registry")
registry_df = pd.read_csv(registry_path)
metrics = Metrics()
registry_handler = RegistryHandler()
registry_path = Path(MODEL_DIRECTORY) / target_column / REGISTRY_FILE
registry_df = registry_handler.load_registry(
registry_path=registry_path, client=CLIENT, metrics=metrics
)
# registry_df = pd.read_csv(registry_path)
best_model_df = registry_df[registry_df["best_model"]] best_model_df = registry_df[registry_df["best_model"]]
model_location = best_model_df["model_location"].values[0] model_location = best_model_df["model_location"].values[0]
@ -115,7 +128,7 @@ def prediction(
if data_path and data is None: if data_path and data is None:
logger.info("Loading data from provided path") logger.info("Loading data from provided path")
dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT) dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT)
data = dataloader.load(filepath=data_path, index_col="id") data = dataloader.load(client=CLIENT, filepath=data_path, index_col=None)
if data is None: if data is None:
raise ValueError("No data loaded") raise ValueError("No data loaded")
@ -137,14 +150,14 @@ def prediction(
logger.error("No other model currently") logger.error("No other model currently")
exit(1) exit(1)
model.load_model( model.load_model(filepath=model_location, client=CLIENT, model_folder="local_model")
filepath=model_location, s3_client=CLIENT, model_folder="local_model"
)
logger.info("--- Generating Predictions ---") logger.info("--- Generating Predictions ---")
prediction = model.generate_predictions(data=data) prediction = model.generate_predictions(data=data)
return pd.concat([pd.Series(data.index, name='id'), prediction], axis=1) # logger.info(pd.concat([data["id"], prediction], axis=1))
return pd.concat([data["id"], prediction], axis=1)
# Save prediction some where? # Save prediction some where?
# prediction.to_csv("s3?") # prediction.to_csv("s3?")

View file

@ -1,4 +1,6 @@
boto3
autogluon==0.8.2 autogluon==0.8.2
pandas==1.5.3 pandas==1.5.3
s3fs==2023.6.0 seaborn==0.12.2
matplotlib==3.7.2
pre-commit==3.3.3 pre-commit==3.3.3

View file

@ -1,6 +1,5 @@
boto3 boto3
autogluon==0.8.2 autogluon==0.8.2
pandas==1.5.3 pandas==1.5.3
s3fs
seaborn==0.12.2 seaborn==0.12.2
matplotlib==3.7.2 matplotlib==3.7.2

View file

@ -1,5 +1,4 @@
autogluon==0.8.2 autogluon==0.8.2
pandas==1.5.3 pandas==1.5.3
seaborn==0.12.2 seaborn==0.12.2
s3fs==2023.6.0
pre-commit==3.3.3 pre-commit==3.3.3

View file

@ -1,4 +1,3 @@
autogluon==0.8.2 autogluon==0.8.2
pandas==1.5.3 pandas==1.5.3
seaborn==0.12.2 seaborn==0.12.2
s3fs==2023.6.0

View file

@ -10,7 +10,7 @@ from core.Logger import logger
from core.Metrics import Metrics, sort_by_metric from core.Metrics import Metrics, sort_by_metric
from core.DataLoader import dataloader_factory from core.DataLoader import dataloader_factory
from core.FeatureProcessor import FeatureProcessor from core.FeatureProcessor import FeatureProcessor
from core.CloudClient import S3FSClient from core.CloudClient import BotoClient
from core.RegistryHandler import RegistryHandler from core.RegistryHandler import RegistryHandler
from core.Settings import ( from core.Settings import (
MODEL_DIRECTORY, MODEL_DIRECTORY,
@ -30,7 +30,8 @@ TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT)
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local") RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local")
CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT) CLIENT = BotoClient(runtime_environment=RUNTIME_ENVIRONMENT)
# CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT)
# FOR TESTING # FOR TESTING
@ -105,8 +106,8 @@ def training(
logger.info("--- Loading data ---") logger.info("--- Loading data ---")
dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT) dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT)
train_df = dataloader.load(filepath=train_filepath) train_df = dataloader.load(client=CLIENT, filepath=train_filepath)
test_df = dataloader.load(filepath=test_filepath) test_df = dataloader.load(client=CLIENT, filepath=test_filepath)
if train_df is None or test_df is None: if train_df is None or test_df is None:
raise ValueError("No data Loaded - cancelling pipeline") raise ValueError("No data Loaded - cancelling pipeline")
@ -153,7 +154,7 @@ def training(
) )
logger.info("--- Save Model ---") logger.info("--- Save Model ---")
model.save_model(output_filepath=model.output_filepath, s3fs_client=CLIENT) model.save_model(output_filepath=model.output_filepath, client=CLIENT)
logger.info("--- Generate evaluation metrics ---") logger.info("--- Generate evaluation metrics ---")
metrics = Metrics() metrics = Metrics()
@ -166,8 +167,6 @@ def training(
metrics=metrics, metrics=metrics,
) )
metrics.upload_metrics(output_filepath=metric_output_path, s3fs_client=CLIENT)
logger.info("--- Generate metric outputs using predictions ---") logger.info("--- Generate metric outputs using predictions ---")
# metrics.generate_plot_suite() # metrics.generate_plot_suite()
@ -179,7 +178,7 @@ def training(
output_filepath=plot_output_path, output_filepath=plot_output_path,
) )
metrics.upload_metrics(output_filepath=plot_output_path, s3fs_client=CLIENT) metrics.upload_metrics(output_filepath=metric_output_path, client=CLIENT)
# TODO: for cml, we might want to have a class that outputs all data and plots to add to the report # TODO: for cml, we might want to have a class that outputs all data and plots to add to the report
# If we want residual plot/ any plots, we will need to self host # If we want residual plot/ any plots, we will need to self host
@ -196,7 +195,7 @@ def training(
f"Optimised version of best model can be found at: {deployment_model_path}" f"Optimised version of best model can be found at: {deployment_model_path}"
) )
model.save_model(output_filepath=deployment_model_path, s3fs_client=CLIENT) model.save_model(output_filepath=deployment_model_path, client=CLIENT)
# TODO: Need a model registry - for now have this as a CSV # TODO: Need a model registry - for now have this as a CSV
# Save this in the model directory # Save this in the model directory
@ -208,7 +207,7 @@ def training(
registry_path = Path(MODEL_DIRECTORY) / target_column / REGISTRY_FILE registry_path = Path(MODEL_DIRECTORY) / target_column / REGISTRY_FILE
registry_df = registry_handler.load_registry( registry_df = registry_handler.load_registry(
registry_path=registry_path, s3fs_client=CLIENT, metrics=metrics registry_path=registry_path, client=CLIENT, metrics=metrics
) )
model_details_df = pd.DataFrame( model_details_df = pd.DataFrame(
@ -235,7 +234,7 @@ def training(
registry_path.parent.mkdir(parents=True, exist_ok=True) registry_path.parent.mkdir(parents=True, exist_ok=True)
registry_df.to_csv(registry_path, index=False) registry_df.to_csv(registry_path, index=False)
registry_handler.save_registry(output_filepath=registry_path, s3fs_client=CLIENT) registry_handler.save_registry(output_filepath=registry_path, client=CLIENT)
logger.info("--- Clean up ---") logger.info("--- Clean up ---")
if RUNTIME_ENVIRONMENT != "local" and Path(MODEL_DIRECTORY).exists(): if RUNTIME_ENVIRONMENT != "local" and Path(MODEL_DIRECTORY).exists():

View file

@ -19,13 +19,14 @@ provider:
# Allow reading from MODEL_DIRECTORY_BUCKET and DATA_BUCKET # Allow reading from MODEL_DIRECTORY_BUCKET and DATA_BUCKET
- Effect: Allow - Effect: Allow
Action: Action:
- s3:GetObject - s3:*
- s3:ListBucket # - s3:GetObject
# - s3:ListBucket
Resource: Resource:
- arn:aws:s3:::${env:MODEL_DIRECTORY_BUCKET} - arn:aws:s3:::${env:MODEL_DIRECTORY_BUCKET}
- arn:aws:s3:::${env:MODEL_DIRECTORY_BUCKET}/* - arn:aws:s3:::${env:MODEL_DIRECTORY_BUCKET}/*
- arn:aws:s3:::${env:DATA_BUCKET} - arn:aws:s3:::${env:DATA_BUCKET}
- arn:aws:s3:::${env:DATA_BUCKET}/* - arn:aws:s3:::${env:DATA_BUCKET}/*
# Allow reading and writing to PREDICTIONS_BUCKET # Allow reading and writing to PREDICTIONS_BUCKET
- Effect: Allow - Effect: Allow
Action: Action: