change s3fs for boto3

This commit is contained in:
Michael Duong 2023-09-01 19:25:35 +01:00
parent 5cf29db634
commit 8120c0bdcd
7 changed files with 165 additions and 93 deletions

View file

@ -8,6 +8,7 @@ Key tasks:
- Generate Inference
"""
import os
from typing import Any
from pathlib import Path
import pandas as pd
@ -15,7 +16,7 @@ from autogluon.tabular import TabularDataset, TabularPredictor
from core.Logger import logger
from core.Metrics import Metrics
from core.Settings import METRIC_FILENAME
from core.CloudClient import S3FSClient
from core.CloudClient import BotoClient
AUTOGLUON_HYPERPARAMETERS = [
"problem_type",
@ -54,34 +55,55 @@ class AutogluonModel:
# NOTE(review): this hunk is rendered without +/- markers. The `s3_client` /
# S3FSClient lines appear to be the removed side and the `client` / BotoClient
# lines the added side - confirm against the commit before reading it as code.
def load_model(
self,
filepath: str | Path,
s3_client: S3FSClient,
client: BotoClient,
model_folder: str = "local_model",
) -> None:
"""
Providing a path, this function will load the model to be used. Will load to internal variable
"""
# Accept either a str or a Path by normalising to str up front.
filepath = str(filepath)
# A wrapper whose `.client` is None is treated as local development:
# the predictor is loaded directly from `filepath`.
if s3_client.client is None:
if client.client is None:
logger.info("In local development mode - no need for s3 client")
self.model = TabularPredictor.load(path=filepath)
else:
logger.info(f"Loading model from s3")
# download_model() is asked to fetch `filepath` into `model_folder`;
# the predictor is then loaded from `model_folder`, not from `filepath`.
s3_client.download_model(filepath=filepath, model_folder=model_folder)
client.download_model(filepath=filepath, model_folder=model_folder)
self.model = TabularPredictor.load(path=model_folder)
# NOTE(review): both sides of the diff are shown here without +/- markers; the
# `s3fs_client` lines appear to be the removed version and the `client` lines
# their replacement - confirm against the commit.
def save_model(self, output_filepath: Path, s3fs_client: S3FSClient) -> None:
def save_model(self, output_filepath: Path, client: BotoClient) -> None:
"""
Providing a path, this function will save the model to be used.
"""
# With no underlying client this branch only logs; nothing is written here.
if s3fs_client.client is None:
if client.client is None:
logger.info("In local development mode - no need for s3 client")
logger.info("Using AutoGluon Model - Model saving already occured")
else:
logger.info(f"Saving model into s3")
# s3fs variant: one recursive put of `output_filepath` to
# "<model_bucket>/<output_filepath>".
s3_location = s3fs_client.model_bucket + "/" + str(output_filepath)
s3fs_client.client.put(str(output_filepath), s3_location, recursive=True)
# BotoClient variant: delegates to directory_upload(), passing the local
# directory and the wrapper's model bucket.
self.directory_upload(
client=client,
local_directory=str(output_filepath),
bucket_name=client.model_bucket,
)
logger.info("Save complete")
def directory_upload(self, client, local_directory, bucket_name):
    """
    Upload every file found under a local directory to an S3 bucket.

    The object key is the local file path itself, i.e. it keeps the
    ``local_directory`` prefix rather than being relative to it.

    :param client: Wrapper whose ``.client`` attribute exposes
        ``upload_file(filename, bucket, key)``.
    :param local_directory: Directory that is walked recursively.
    :param bucket_name: Name of the destination bucket.
    """
    for root, _dirs, files in os.walk(local_directory):
        for file in files:
            local_file_path = os.path.join(root, file)
            # The key deliberately mirrors the full local path; only the
            # separator is normalised so keys always use "/" regardless of
            # the operating system (a no-op where os.sep is already "/").
            s3_object_key = local_file_path.replace(os.sep, "/")
            client.client.upload_file(local_file_path, bucket_name, s3_object_key)
            logger.info(
                f"Uploaded {local_file_path} to {bucket_name}/{s3_object_key}"
            )
def train_model(
self, data: pd.DataFrame, target_column: str, hyperparameters: dict
) -> None:

View file

@ -151,4 +151,34 @@ class BotoClient:
"""
For the file path, download the model locally so that we can load the model
"""
pass
# List all objects with the specified prefix in the bucket
if self.client is None:
raise ValueError("SHould not be in here!")
objects = self.client.list_objects_v2(Bucket=self.model_bucket, Prefix=filepath)
# Ensure the local directory for downloads exists
if not os.path.exists(model_folder):
os.makedirs(model_folder)
# Download each object with the specified prefix
for obj in objects.get("Contents", []):
# Get the object key (file path)
object_key = obj["Key"]
# Determine the local file path to save the object
local_file_path = os.path.join(
model_folder, object_key.split(f"{filepath}/")[-1]
)
# Create the local directory if it doesn't exist
local_directory = os.path.dirname(local_file_path)
if not os.path.exists(local_directory):
os.makedirs(local_directory)
# Download the object from S3 to the local directory
self.client.download_file(self.model_bucket, object_key, local_file_path)
print(f"Downloaded {object_key} to {local_file_path}")
print("Download completed.")

View file

@ -6,7 +6,7 @@ from io import BytesIO, StringIO
from core.CloudClient import BotoClient
def read_parquet_from_s3(bucket_name, file_key):
def read_parquet_from_s3(client, bucket_name, file_key):
"""
Read a Parquet file from S3 using boto3 and pandas.
@ -16,11 +16,9 @@ def read_parquet_from_s3(bucket_name, file_key):
:param aws_secret_access_key: AWS Secret Access Key
:return: DataFrame containing the Parquet data.
"""
# Initialize the S3 client
s3_client = boto3.client("s3")
# Get the object
s3_object = s3_client.get_object(Bucket=bucket_name, Key=file_key)
s3_object = client.get_object(Bucket=bucket_name, Key=file_key)
# Read the CSV body into a DataFrame
csv_body = s3_object["Body"].read()
@ -29,7 +27,7 @@ def read_parquet_from_s3(bucket_name, file_key):
return df
def read_csv_from_s3(bucket_name, file_key, index_col):
def read_csv_from_s3(client, bucket_name, file_key, index_col):
"""
Read a CSV file from S3 using boto3 and pandas.
@ -39,11 +37,9 @@ def read_csv_from_s3(bucket_name, file_key, index_col):
:param aws_secret_access_key: AWS Secret Access Key
:return: DataFrame containing the CSV data.
"""
# Initialize the S3 client
s3_client = boto3.client("s3")
# Get the object
s3_object = s3_client.get_object(Bucket=bucket_name, Key=file_key)
s3_object = client.get_object(Bucket=bucket_name, Key=file_key)
# Read the CSV body into a DataFrame
csv_body = s3_object["Body"].read().decode("utf-8")
@ -91,7 +87,7 @@ class LocalDataLoader:
return df
class S3MockDataLoader:
class S3DataLoader:
"""
Implements the DataLoader Protocol for s3 files, hosting locally in a mocked service
"""
@ -101,51 +97,26 @@ class S3MockDataLoader:
client: BotoClient, filepath: str, index_col: str | None = None
) -> pd.DataFrame:
# TODO: Ingest these as environment variables in the docker compose file
storage_options = {
"key": os.environ.get("AWS_ACCESS_KEY_ID", "admin"),
"secret": os.environ.get("AWS_SECRET_ACCESS_KEY", "password"),
"client_kwargs": {
"endpoint_url": os.environ.get("ENDPOINT_URL", "http://localhost:9000")
},
}
if not filepath.startswith("s3://"):
filepath = "s3://" + filepath
if filepath.endswith(".parquet"):
df = pd.read_parquet(filepath, storage_options=storage_options)
if index_col is not None:
df = df.set_index(index_col)
elif filepath.endswith(".csv"):
df = pd.read_csv(
filepath, index_col=index_col, storage_options=storage_options
)
else:
raise ValueError(f"File format not supported for file: {filepath}")
return df
class S3DataLoader:
"""
Implements the DataLoader Protocol for s3 files
"""
@staticmethod
def load(
client: BotoClient, filepath: str, index_col: str | None = None
) -> pd.DataFrame:
filepath_split = filepath.split("s3://")[-1].split("/", 1)
bucket = filepath_split[0]
key = filepath_split[1]
if filepath.endswith(".parquet"):
df = read_parquet_from_s3(bucket, key)
df = read_parquet_from_s3(
client=client.client, bucket_name=bucket, file_key=key
)
if index_col is not None:
df = df.set_index(index_col)
elif filepath.endswith(".csv"):
df = read_csv_from_s3(bucket, key, index_col)
df = read_csv_from_s3(
client=client.client,
bucket_name=bucket,
file_key=key,
index_col=index_col,
)
else:
raise ValueError(f"File format not supported for file: {filepath}")
@ -162,7 +133,7 @@ def dataloader_factory(runtime_environment: str | None = None) -> DataLoader:
dataloader_types = {
"local": LocalDataLoader(),
"local-mock": S3MockDataLoader(),
"local-mock": S3DataLoader(),
"dev": S3DataLoader(),
"staging": S3DataLoader(),
"prod": S3DataLoader(),

View file

@ -5,11 +5,12 @@ Key tasks:
- Given a model and test data, produce a suite of all metrics
"""
import os
import pandas as pd
from pathlib import Path
import seaborn as sns
import matplotlib.pyplot as plt
from core.CloudClient import S3FSClient
from core.CloudClient import BotoClient
from core.Logger import logger
from core.Settings import (
RESIDUAL_TRUE_LABEL,
@ -64,18 +65,40 @@ class Metrics:
All metric functions used to generate a dictionary of metrics
"""
# NOTE(review): both sides of the diff are shown here without +/- markers; the
# `s3fs_client` lines appear to be the removed version and the `client` lines
# their replacement - confirm against the commit.
def upload_metrics(self, output_filepath: Path, s3fs_client: S3FSClient) -> None:
def upload_metrics(self, output_filepath: Path, client: BotoClient) -> None:
"""
Providing a path, this function will save the metrics folders/files.
"""
# With no underlying client this branch only logs; nothing is uploaded.
if s3fs_client.client is None:
if client.client is None:
logger.info("In local development mode - no need to upload")
else:
logger.info(f"Saving metrics into s3")
s3_location = s3fs_client.model_bucket + "/" + str(output_filepath)
s3fs_client.client.put(str(output_filepath), s3_location, recursive=True)
# NOTE(review): the `s3_location` built on the next line is not passed to
# directory_upload() below - it looks like a leftover from the s3fs call.
s3_location = client.model_bucket + "/" + str(output_filepath)
self.directory_upload(
client=client,
local_directory=str(output_filepath),
bucket_name=client.model_bucket,
)
logger.info("Save complete")
def directory_upload(self, client, local_directory, bucket_name):
    """
    Upload every file found under a local directory to an S3 bucket.

    The object key is the local file path itself, i.e. it keeps the
    ``local_directory`` prefix rather than being relative to it.

    :param client: Wrapper whose ``.client`` attribute exposes
        ``upload_file(filename, bucket, key)``.
    :param local_directory: Directory that is walked recursively.
    :param bucket_name: Name of the destination bucket.
    """
    for root, _dirs, files in os.walk(local_directory):
        for file in files:
            local_file_path = os.path.join(root, file)
            # The key deliberately mirrors the full local path; only the
            # separator is normalised so keys always use "/" regardless of
            # the operating system (a no-op where os.sep is already "/").
            s3_object_key = local_file_path.replace(os.sep, "/")
            client.client.upload_file(local_file_path, bucket_name, s3_object_key)
            logger.info(
                f"Uploaded {local_file_path} to {bucket_name}/{s3_object_key}"
            )
@staticmethod
def list_metric_functions() -> list:
"""

View file

@ -2,38 +2,65 @@
"""
from io import StringIO
import pandas as pd
from pathlib import Path
from core.Logger import logger
from core.CloudClient import S3FSClient
from core.CloudClient import BotoClient
from core.Metrics import Metrics
from core.Settings import BEST_MODEL_COLUMN_NAME
def read_csv_from_s3(client, bucket_name, file_key, index_col):
    """
    Read a CSV object from S3 into a pandas DataFrame.

    :param client: Object exposing ``get_object(Bucket=..., Key=...)``; it is
        called the way a boto3 S3 client is.
    :param bucket_name: Name of the S3 bucket.
    :param file_key: Key of the file (including directory path within the bucket).
    :param index_col: Forwarded unchanged to ``pandas.read_csv`` as ``index_col``
        (``None`` keeps the default integer index).
    :return: DataFrame containing the CSV data.
    """
    # Get the object
    s3_object = client.get_object(Bucket=bucket_name, Key=file_key)

    # The body is read fully into memory and decoded as UTF-8 before parsing.
    csv_body = s3_object["Body"].read().decode("utf-8")
    df = pd.read_csv(StringIO(csv_body), index_col=index_col)

    return df
class RegistryHandler:
"""
Handles the loading of the registry depending on the environment
"""
def load_registry(
self, registry_path: Path, s3fs_client: S3FSClient, metrics: Metrics
):
def load_registry(self, registry_path: Path, client: BotoClient, metrics: Metrics):
"""
Depending on the environment, we will have to load from local storage or s3 (mock/real)
"""
if s3fs_client.client is None:
if client.client is None:
logger.info("Using local development - no need for s3 load")
return self.load_local_registry(
registry_path=registry_path, metrics=metrics
)
s3_location = "s3://" + s3fs_client.model_bucket + "/" + str(registry_path)
logger.info(f"Check if registry exists")
if s3fs_client.client.exists(s3_location):
registry_df = pd.read_csv(
s3fs_client.client.open(s3_location), index_col=None
check_exists = client.client.list_objects_v2(
Bucket=client.model_bucket, Prefix=str(registry_path)
)
if "Contents" in check_exists:
logger.info("Loading existing registry")
registry_df = read_csv_from_s3(
client=client.client,
bucket_name=client.model_bucket,
file_key=str(registry_path),
index_col=None,
)
else:
logger.info("No registry found - creating new one")
@ -70,14 +97,18 @@ class RegistryHandler:
return registry_df
# NOTE(review): both sides of the diff are shown here without +/- markers; the
# `s3fs_client` lines appear to be the removed version and the `client` lines
# their replacement - confirm against the commit.
def save_registry(self, output_filepath: Path, s3fs_client: S3FSClient) -> None:
def save_registry(self, output_filepath: Path, client: BotoClient) -> None:
"""
Providing a path, this function will save the model to be used.
"""
# NOTE(review): the docstring above says "model", while the method name and
# the log message below refer to the registry.
# With no underlying client this branch only logs; nothing is uploaded.
if s3fs_client.client is None:
if client.client is None:
logger.info("In local development mode - no need for s3 client")
else:
logger.info(f"Saving registry into s3")
s3_location = s3fs_client.model_bucket + "/" + str(output_filepath)
s3fs_client.client.put(str(output_filepath), s3_location, recursive=True)
# NOTE(review): the `s3_location` built on the next line is not used by the
# upload_file() call, which passes str(output_filepath) as both the local
# filename and the object key.
s3_location = client.model_bucket + "/" + str(output_filepath)
client.client.upload_file(
str(output_filepath), client.model_bucket, str(output_filepath)
)
logger.info("Save complete")

View file

@ -12,7 +12,7 @@ from datetime import datetime
from MLModel.Models import AutogluonModel
from core.Logger import logger
from core.DataLoader import dataloader_factory
from core.CloudClient import S3FSClient
from core.CloudClient import BotoClient
from core.Metrics import Metrics
from core.RegistryHandler import RegistryHandler
from core.Settings import (
@ -28,8 +28,7 @@ from core.Settings import (
TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT)
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local-mock")
CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT)
CLIENT = BotoClient(runtime_environment=RUNTIME_ENVIRONMENT)
# FOR TESTING
# For now just loading data first and then passing into function (i.e. as if we receive json data and convert to
@ -40,7 +39,7 @@ CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT)
# For testing in dev s3
# Data path can be passed as so:
# python3 predictions.py --data-path s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet
# data_path="s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet"
# data_path="s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data_with_id.parquet"
def ingest_arguments() -> argparse.Namespace:
@ -107,7 +106,7 @@ def prediction(
registry_path = Path(MODEL_DIRECTORY) / target_column / REGISTRY_FILE
registry_df = registry_handler.load_registry(
registry_path=registry_path, s3fs_client=CLIENT, metrics=metrics
registry_path=registry_path, client=CLIENT, metrics=metrics
)
# registry_df = pd.read_csv(registry_path)
@ -129,13 +128,13 @@ def prediction(
if data_path and data is None:
logger.info("Loading data from provided path")
dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT)
data = dataloader.load(filepath=data_path, index_col="id")
data = dataloader.load(client=CLIENT, filepath=data_path, index_col=None)
if data is None:
raise ValueError("No data loaded")
# # TODO: DOWNSAMPLING DOWN TO JUST USE ONE FOR PREDICTION
data = data.sample(1)
# data = data.sample(1)
else:
logger.info("Using data provided")
data = json.loads(str(data))
@ -151,14 +150,12 @@ def prediction(
logger.error("No other model currently")
exit(1)
model.load_model(
filepath=model_location, s3_client=CLIENT, model_folder="local_model"
)
model.load_model(filepath=model_location, client=CLIENT, model_folder="local_model")
logger.info("--- Generating Predictions ---")
prediction = model.generate_predictions(data=data)
return pd.concat([pd.Series(data.index, name="id"), prediction], axis=1)
return pd.concat([data["id"], prediction], axis=1)
# Save prediction some where?
# prediction.to_csv("s3?")

View file

@ -106,8 +106,8 @@ def training(
logger.info("--- Loading data ---")
dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT)
train_df = dataloader.load(filepath=train_filepath)
test_df = dataloader.load(filepath=test_filepath)
train_df = dataloader.load(client=CLIENT, filepath=train_filepath)
test_df = dataloader.load(client=CLIENT, filepath=test_filepath)
if train_df is None or test_df is None:
raise ValueError("No data Loaded - cancelling pipeline")
@ -154,7 +154,7 @@ def training(
)
logger.info("--- Save Model ---")
model.save_model(output_filepath=model.output_filepath, s3fs_client=CLIENT)
model.save_model(output_filepath=model.output_filepath, client=CLIENT)
logger.info("--- Generate evaluation metrics ---")
metrics = Metrics()
@ -167,8 +167,6 @@ def training(
metrics=metrics,
)
metrics.upload_metrics(output_filepath=metric_output_path, s3fs_client=CLIENT)
logger.info("--- Generate metric outputs using predictions ---")
# metrics.generate_plot_suite()
@ -180,7 +178,7 @@ def training(
output_filepath=plot_output_path,
)
metrics.upload_metrics(output_filepath=plot_output_path, s3fs_client=CLIENT)
metrics.upload_metrics(output_filepath=metric_output_path, client=CLIENT)
# TODO: for cml, we might want to have class that outputs all data and plots to add to the report
# If we want residual plot/ any plots, we will need to self host
@ -197,7 +195,7 @@ def training(
f"Optimised version of best model can be found at: {deployment_model_path}"
)
model.save_model(output_filepath=deployment_model_path, s3fs_client=CLIENT)
model.save_model(output_filepath=deployment_model_path, client=CLIENT)
# TODO: Need a model registry - for now have this as a CSV
# Save this in the model directory
@ -209,7 +207,7 @@ def training(
registry_path = Path(MODEL_DIRECTORY) / target_column / REGISTRY_FILE
registry_df = registry_handler.load_registry(
registry_path=registry_path, s3fs_client=CLIENT, metrics=metrics
registry_path=registry_path, client=CLIENT, metrics=metrics
)
model_details_df = pd.DataFrame(
@ -236,7 +234,7 @@ def training(
registry_path.parent.mkdir(parents=True, exist_ok=True)
registry_df.to_csv(registry_path, index=False)
registry_handler.save_registry(output_filepath=registry_path, s3fs_client=CLIENT)
registry_handler.save_registry(output_filepath=registry_path, client=CLIENT)
logger.info("--- Clean up ---")
if RUNTIME_ENVIRONMENT != "local" and Path(MODEL_DIRECTORY).exists():