mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
commit
bc3b16548e
16 changed files with 316 additions and 151 deletions
5
.github/workflows/cml.yml
vendored
5
.github/workflows/cml.yml
vendored
|
|
@ -9,6 +9,11 @@ jobs:
|
|||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
# TODO: use dvc to pull data, mkdir s3-mock, load data, then use docker compose
|
||||
# - name: Build docker compose stack
|
||||
# run: |
|
||||
# cd model_data/simulation_system
|
||||
# docker-compose up -d
|
||||
- uses: actions/setup-python@v4
|
||||
- uses: iterative/setup-cml@v1
|
||||
- name: Train model
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ ARG GID=100
|
|||
|
||||
# Install patches
|
||||
RUN apt-get update && apt-get upgrade -y \
|
||||
&& apt-get install libgomp1 -y \
|
||||
&& apt-get clean \
|
||||
&& rm -rf /var/lib/apt/lists
|
||||
|
||||
|
|
@ -34,4 +35,4 @@ USER ${USER}
|
|||
WORKDIR /home/simulation_system
|
||||
|
||||
# Run the python command
|
||||
CMD ["python3", "predictions.py", "--data-path", "./model_build_data/change_data/rdsap_full/test_data.parquet"]
|
||||
CMD ["python3", "predictions.py", "--data-path", "s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data_with_id.parquet"]
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ ARG GID=100
|
|||
|
||||
# Install patches
|
||||
RUN apt-get update && apt-get upgrade -y \
|
||||
&& apt-get install libgomp1 -y \
|
||||
&& apt-get clean \
|
||||
&& rm -rf /var/lib/apt/lists
|
||||
|
||||
|
|
@ -34,4 +35,4 @@ USER ${USER}
|
|||
WORKDIR /home/simulation_system
|
||||
|
||||
# Run the python command
|
||||
CMD ["python3", "training.py", "--train-filepath", "./model_build_data/change_data/rdsap_full/train_validation_data.parquet", "--test-filepath", "./model_build_data/change_data/rdsap_full/test_data.parquet"]
|
||||
CMD ["python3", "training.py", "--train-filepath", "s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/train_validation_data.parquet", "--test-filepath", "s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet"]
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ Key tasks:
|
|||
- Generate Inference
|
||||
"""
|
||||
|
||||
import os
|
||||
from typing import Any
|
||||
from pathlib import Path
|
||||
import pandas as pd
|
||||
|
|
@ -15,7 +16,7 @@ from autogluon.tabular import TabularDataset, TabularPredictor
|
|||
from core.Logger import logger
|
||||
from core.Metrics import Metrics
|
||||
from core.Settings import METRIC_FILENAME
|
||||
from core.CloudClient import S3FSClient
|
||||
from core.CloudClient import BotoClient
|
||||
|
||||
AUTOGLUON_HYPERPARAMETERS = [
|
||||
"problem_type",
|
||||
|
|
@ -54,34 +55,55 @@ class AutogluonModel:
|
|||
def load_model(
|
||||
self,
|
||||
filepath: str | Path,
|
||||
s3_client: S3FSClient,
|
||||
client: BotoClient,
|
||||
model_folder: str = "local_model",
|
||||
) -> None:
|
||||
"""
|
||||
Providing a path, this function will load the model to be used. Will load to internal variable
|
||||
"""
|
||||
filepath = str(filepath)
|
||||
if s3_client.client is None:
|
||||
if client.client is None:
|
||||
logger.info("In local development mode - no need for s3 client")
|
||||
self.model = TabularPredictor.load(path=filepath)
|
||||
else:
|
||||
logger.info(f"Loading model from s3")
|
||||
s3_client.download_model(filepath=filepath, model_folder=model_folder)
|
||||
client.download_model(filepath=filepath, model_folder=model_folder)
|
||||
self.model = TabularPredictor.load(path=model_folder)
|
||||
|
||||
def save_model(self, output_filepath: Path, s3fs_client: S3FSClient) -> None:
|
||||
def save_model(self, output_filepath: Path, client: BotoClient) -> None:
|
||||
"""
|
||||
Providing a path, this function will save the model to be used.
|
||||
"""
|
||||
if s3fs_client.client is None:
|
||||
if client.client is None:
|
||||
logger.info("In local development mode - no need for s3 client")
|
||||
logger.info("Using AutoGluon Model - Model saving already occured")
|
||||
else:
|
||||
logger.info(f"Saving model into s3")
|
||||
s3_location = s3fs_client.model_bucket + "/" + str(output_filepath)
|
||||
s3fs_client.client.put(str(output_filepath), s3_location, recursive=True)
|
||||
|
||||
self.directory_upload(
|
||||
client=client,
|
||||
local_directory=str(output_filepath),
|
||||
bucket_name=client.model_bucket,
|
||||
)
|
||||
|
||||
logger.info("Save complete")
|
||||
|
||||
def directory_upload(self, client, local_directory, bucket_name):
|
||||
|
||||
# Iterate through the local directory and upload each file
|
||||
for root, dirs, files in os.walk(local_directory):
|
||||
for file in files:
|
||||
# Determine the local file path and S3 object key
|
||||
local_file_path = os.path.join(root, file)
|
||||
s3_object_key = os.path.relpath(local_file_path, local_directory)
|
||||
|
||||
# Upload the file to S3
|
||||
client.client.upload_file(local_file_path, bucket_name, local_file_path)
|
||||
|
||||
logger.info(
|
||||
f"Uploaded {local_file_path} to {bucket_name}/{local_file_path}"
|
||||
)
|
||||
|
||||
def train_model(
|
||||
self, data: pd.DataFrame, target_column: str, hyperparameters: dict
|
||||
) -> None:
|
||||
|
|
|
|||
|
|
@ -3,17 +3,105 @@ Set up the client to be used for downloading and uploading model files
|
|||
"""
|
||||
|
||||
import os
|
||||
import s3fs
|
||||
import boto3
|
||||
from core.Logger import logger
|
||||
|
||||
|
||||
class S3FSClient:
|
||||
# class S3FSClient:
|
||||
# """
|
||||
# Set up the correct client to upload files to s3
|
||||
# """
|
||||
|
||||
# def __init__(self, runtime_environment: str = "local") -> None:
|
||||
# self.client: s3fs.S3FileSystem | None = None
|
||||
# self.model_bucket: str
|
||||
|
||||
# self.client_factory(runtime_environment)
|
||||
# self.determine_model_bucket(runtime_environment)
|
||||
|
||||
# def client_factory(self, runtime_environment: str = "local"):
|
||||
# """
|
||||
# Select the correct s3 client to use
|
||||
# """
|
||||
|
||||
# if runtime_environment == "local":
|
||||
# logger.info("No S3 client setup required")
|
||||
# elif runtime_environment == "local-mock":
|
||||
# logger.info(f"S3 settings for {runtime_environment}")
|
||||
# self.client = s3fs.S3FileSystem(
|
||||
# key=os.environ.get("AWS_ACCESS_KEY_ID", "admin"),
|
||||
# secret=os.environ.get("AWS_SECRET_ACCESS_KEY", "password"),
|
||||
# client_kwargs={
|
||||
# "endpoint_url": os.environ.get(
|
||||
# "ENDPOINT_URL", "http://localhost:9000"
|
||||
# )
|
||||
# },
|
||||
# )
|
||||
# elif runtime_environment in ["dev", "staging", "prod"]:
|
||||
# logger.info(f"S3 settings for {runtime_environment}")
|
||||
# # Key/ token should be in session/lambda for this
|
||||
# self.client = s3fs.S3FileSystem()
|
||||
# else:
|
||||
# raise NotImplementedError("No correspnding runtime environment")
|
||||
|
||||
# def determine_model_bucket(self, runtime_environment: str) -> None:
|
||||
# """
|
||||
# For the given environment, return the correct bucket for models
|
||||
# """
|
||||
# if runtime_environment == "local":
|
||||
# logger.info("In local development - no need for s3")
|
||||
# elif runtime_environment in ["local-mock", "dev"]:
|
||||
# # TODO: get from enironment
|
||||
# self.model_bucket = "retrofit-model-directory-dev"
|
||||
# elif runtime_environment in ["staging", "prod"]:
|
||||
# self.model_bucket = f"retrofit-model-directory-{runtime_environment}"
|
||||
# else:
|
||||
# raise NotImplementedError("No corresponding runtime environment")
|
||||
|
||||
# def download_model(self, filepath: str, model_folder: str):
|
||||
# """
|
||||
# For the file path, download the model locally so that we can load the model
|
||||
# """
|
||||
|
||||
# if self.client is None:
|
||||
# logger.info("No need to download model as local development")
|
||||
# else:
|
||||
|
||||
# def list_files_recursively(folder_path, client):
|
||||
# all_files = []
|
||||
# for root, dirs, files in client.walk(folder_path):
|
||||
# for file in files:
|
||||
# s3_path = os.path.join(root, file)
|
||||
# all_files.append(s3_path)
|
||||
# return all_files
|
||||
|
||||
# # List all files in the specified S3 folder and its subfolders
|
||||
# files = list_files_recursively(
|
||||
# f"{self.model_bucket}/{filepath}", client=self.client
|
||||
# )
|
||||
|
||||
# # Download each file
|
||||
# for file in files:
|
||||
# # Extract the filename from the S3 path
|
||||
# filename = file.split(filepath)[-1]
|
||||
|
||||
# # Define the local path where you want to save the file
|
||||
# local_path = os.path.join(model_folder, filename)
|
||||
|
||||
# # Download the file from S3 to the local directory
|
||||
# self.client.get(file, local_path)
|
||||
# print(f"Downloaded {filename} to {local_path}")
|
||||
|
||||
# print("Download completed.")
|
||||
|
||||
|
||||
class BotoClient:
|
||||
"""
|
||||
Set up the correct client to upload files to s3
|
||||
Using boto3 to access the different aws storage configurations
|
||||
"""
|
||||
|
||||
def __init__(self, runtime_environment: str = "local") -> None:
|
||||
self.client: s3fs.S3FileSystem | None = None
|
||||
self.client = None
|
||||
self.model_bucket: str
|
||||
|
||||
self.client_factory(runtime_environment)
|
||||
|
|
@ -28,19 +116,19 @@ class S3FSClient:
|
|||
logger.info("No S3 client setup required")
|
||||
elif runtime_environment == "local-mock":
|
||||
logger.info(f"S3 settings for {runtime_environment}")
|
||||
self.client = s3fs.S3FileSystem(
|
||||
key=os.environ.get("AWS_ACCESS_KEY_ID", "admin"),
|
||||
secret=os.environ.get("AWS_SECRET_ACCESS_KEY", "password"),
|
||||
client_kwargs={
|
||||
"endpoint_url": os.environ.get(
|
||||
"ENDPOINT_URL", "http://localhost:9000"
|
||||
)
|
||||
},
|
||||
session = boto3.Session()
|
||||
self.client = session.client(
|
||||
service_name="s3",
|
||||
aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID", "admin"),
|
||||
aws_secret_access_key=os.environ.get(
|
||||
"AWS_SECRET_ACCESS_KEY", "password"
|
||||
),
|
||||
endpoint_url=os.environ.get("ENDPOINT_URL", "http://localhost:9000"),
|
||||
)
|
||||
elif runtime_environment in ["dev", "staging", "prod"]:
|
||||
logger.info(f"S3 settings for {runtime_environment}")
|
||||
# Key/ token should be in session/lambda for this
|
||||
self.client = s3fs.S3FileSystem()
|
||||
self.client = boto3.client("s3")
|
||||
else:
|
||||
raise NotImplementedError("No correspnding runtime environment")
|
||||
|
||||
|
|
@ -62,34 +150,34 @@ class S3FSClient:
|
|||
"""
|
||||
For the file path, download the model locally so that we can load the model
|
||||
"""
|
||||
# List all objects with the specified prefix in the bucket
|
||||
|
||||
if self.client is None:
|
||||
logger.info("No need to download model as local development")
|
||||
else:
|
||||
raise ValueError("SHould not be in here!")
|
||||
|
||||
def list_files_recursively(folder_path, client):
|
||||
all_files = []
|
||||
for root, dirs, files in client.walk(folder_path):
|
||||
for file in files:
|
||||
s3_path = os.path.join(root, file)
|
||||
all_files.append(s3_path)
|
||||
return all_files
|
||||
objects = self.client.list_objects_v2(Bucket=self.model_bucket, Prefix=filepath)
|
||||
|
||||
# List all files in the specified S3 folder and its subfolders
|
||||
files = list_files_recursively(
|
||||
f"{self.model_bucket}/{filepath}", client=self.client
|
||||
# Ensure the local directory for downloads exists
|
||||
if not os.path.exists(model_folder):
|
||||
os.makedirs(model_folder)
|
||||
|
||||
# Download each object with the specified prefix
|
||||
for obj in objects.get("Contents", []):
|
||||
# Get the object key (file path)
|
||||
object_key = obj["Key"]
|
||||
|
||||
# Determine the local file path to save the object
|
||||
local_file_path = os.path.join(
|
||||
model_folder, object_key.split(f"{filepath}/")[-1]
|
||||
)
|
||||
|
||||
# Download each file
|
||||
for file in files:
|
||||
# Extract the filename from the S3 path
|
||||
filename = file.split(filepath)[-1]
|
||||
# Create the local directory if it doesn't exist
|
||||
local_directory = os.path.dirname(local_file_path)
|
||||
if not os.path.exists(local_directory):
|
||||
os.makedirs(local_directory)
|
||||
|
||||
# Define the local path where you want to save the file
|
||||
local_path = os.path.join("local_model", filename)
|
||||
# Download the object from S3 to the local directory
|
||||
self.client.download_file(self.model_bucket, object_key, local_file_path)
|
||||
print(f"Downloaded {object_key} to {local_file_path}")
|
||||
|
||||
# Download the file from S3 to the local directory
|
||||
self.client.get(file, local_path)
|
||||
print(f"Downloaded {filename} to {local_path}")
|
||||
|
||||
print("Download completed.")
|
||||
print("Download completed.")
|
||||
|
|
|
|||
|
|
@ -3,9 +3,10 @@ import os
|
|||
from typing import Protocol
|
||||
import boto3
|
||||
from io import BytesIO, StringIO
|
||||
from core.CloudClient import BotoClient
|
||||
|
||||
|
||||
def read_parquet_from_s3(bucket_name, file_key):
|
||||
def read_parquet_from_s3(client, bucket_name, file_key):
|
||||
"""
|
||||
Read a CSV file from S3 using boto3 and pandas.
|
||||
|
||||
|
|
@ -15,11 +16,9 @@ def read_parquet_from_s3(bucket_name, file_key):
|
|||
:param aws_secret_access_key: AWS Secret Access Key
|
||||
:return: DataFrame containing the CSV data.
|
||||
"""
|
||||
# Initialize the S3 client
|
||||
s3_client = boto3.client("s3")
|
||||
|
||||
# Get the object
|
||||
s3_object = s3_client.get_object(Bucket=bucket_name, Key=file_key)
|
||||
s3_object = client.get_object(Bucket=bucket_name, Key=file_key)
|
||||
|
||||
# Read the CSV body into a DataFrame
|
||||
csv_body = s3_object["Body"].read()
|
||||
|
|
@ -28,7 +27,7 @@ def read_parquet_from_s3(bucket_name, file_key):
|
|||
return df
|
||||
|
||||
|
||||
def read_csv_from_s3(bucket_name, file_key, index_col):
|
||||
def read_csv_from_s3(client, bucket_name, file_key, index_col):
|
||||
"""
|
||||
Read a CSV file from S3 using boto3 and pandas.
|
||||
|
||||
|
|
@ -38,11 +37,9 @@ def read_csv_from_s3(bucket_name, file_key, index_col):
|
|||
:param aws_secret_access_key: AWS Secret Access Key
|
||||
:return: DataFrame containing the CSV data.
|
||||
"""
|
||||
# Initialize the S3 client
|
||||
s3_client = boto3.client("s3")
|
||||
|
||||
# Get the object
|
||||
s3_object = s3_client.get_object(Bucket=bucket_name, Key=file_key)
|
||||
s3_object = client.get_object(Bucket=bucket_name, Key=file_key)
|
||||
|
||||
# Read the CSV body into a DataFrame
|
||||
csv_body = s3_object["Body"].read().decode("utf-8")
|
||||
|
|
@ -57,7 +54,9 @@ class DataLoader(Protocol):
|
|||
"""
|
||||
|
||||
@staticmethod
|
||||
def load(filepath: str, index_col: str | None = None) -> pd.DataFrame | None:
|
||||
def load(
|
||||
client: BotoClient, filepath: str, index_col: str | None = None
|
||||
) -> pd.DataFrame | None:
|
||||
"""
|
||||
Loading data from the relevant source
|
||||
"""
|
||||
|
|
@ -69,7 +68,9 @@ class LocalDataLoader:
|
|||
"""
|
||||
|
||||
@staticmethod
|
||||
def load(filepath: str, index_col: str | None = None) -> pd.DataFrame:
|
||||
def load(
|
||||
client: BotoClient, filepath: str, index_col: str | None = None
|
||||
) -> pd.DataFrame:
|
||||
|
||||
if not os.path.exists(filepath):
|
||||
raise FileNotFoundError(f"File not found: {filepath}")
|
||||
|
|
@ -86,56 +87,36 @@ class LocalDataLoader:
|
|||
return df
|
||||
|
||||
|
||||
class S3MockDataLoader:
|
||||
class S3DataLoader:
|
||||
"""
|
||||
Implements the DataLoader Protocol for s3 files, hosting locally in a mocked service
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def load(filepath: str, index_col: str | None = None) -> pd.DataFrame:
|
||||
|
||||
# TODO: Ingest these as environment variables in the docker compose file
|
||||
storage_options = {
|
||||
"key": os.environ.get("AWS_ACCESS_KEY_ID", "admin"),
|
||||
"secret": os.environ.get("AWS_SECRET_ACCESS_KEY", "password"),
|
||||
"client_kwargs": {
|
||||
"endpoint_url": os.environ.get("ENDPOINT_URL", "http://localhost:9000")
|
||||
},
|
||||
}
|
||||
def load(
|
||||
client: BotoClient, filepath: str, index_col: str | None = None
|
||||
) -> pd.DataFrame:
|
||||
|
||||
if not filepath.startswith("s3://"):
|
||||
filepath = "s3://" + filepath
|
||||
|
||||
if filepath.endswith(".parquet"):
|
||||
df = pd.read_parquet(filepath, storage_options=storage_options)
|
||||
if index_col is not None:
|
||||
df = df.set_index(index_col)
|
||||
elif filepath.endswith(".csv"):
|
||||
df = pd.read_csv(
|
||||
filepath, index_col=index_col, storage_options=storage_options
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"File format not supported for file: {filepath}")
|
||||
|
||||
return df
|
||||
|
||||
|
||||
class S3DataLoader:
|
||||
"""
|
||||
Implements the DataLoader Protocol for s3 files
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def load(filepath: str, index_col: str | None = None) -> pd.DataFrame:
|
||||
filepath_split = filepath.split("s3://")[-1].split("/", 1)
|
||||
bucket = filepath_split[0]
|
||||
key = filepath_split[1]
|
||||
|
||||
if filepath.endswith(".parquet"):
|
||||
df = read_parquet_from_s3(bucket, key)
|
||||
df = read_parquet_from_s3(
|
||||
client=client.client, bucket_name=bucket, file_key=key
|
||||
)
|
||||
if index_col is not None:
|
||||
df = df.set_index(index_col)
|
||||
elif filepath.endswith(".csv"):
|
||||
df = read_csv_from_s3(bucket, key, index_col)
|
||||
df = read_csv_from_s3(
|
||||
client=client.client,
|
||||
bucket_name=bucket,
|
||||
file_key=key,
|
||||
index_col=index_col,
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"File format not supported for file: {filepath}")
|
||||
|
||||
|
|
@ -152,7 +133,7 @@ def dataloader_factory(runtime_environment: str | None = None) -> DataLoader:
|
|||
|
||||
dataloader_types = {
|
||||
"local": LocalDataLoader(),
|
||||
"local-mock": S3MockDataLoader(),
|
||||
"local-mock": S3DataLoader(),
|
||||
"dev": S3DataLoader(),
|
||||
"staging": S3DataLoader(),
|
||||
"prod": S3DataLoader(),
|
||||
|
|
|
|||
|
|
@ -5,11 +5,12 @@ Key tasks:
|
|||
- Given a model and test data, produce a suite of all metrics
|
||||
"""
|
||||
|
||||
import os
|
||||
import pandas as pd
|
||||
from pathlib import Path
|
||||
import seaborn as sns
|
||||
import matplotlib.pyplot as plt
|
||||
from core.CloudClient import S3FSClient
|
||||
from core.CloudClient import BotoClient
|
||||
from core.Logger import logger
|
||||
from core.Settings import (
|
||||
RESIDUAL_TRUE_LABEL,
|
||||
|
|
@ -64,18 +65,40 @@ class Metrics:
|
|||
All metric functions used to generate a dictionary of metrics
|
||||
"""
|
||||
|
||||
def upload_metrics(self, output_filepath: Path, s3fs_client: S3FSClient) -> None:
|
||||
def upload_metrics(self, output_filepath: Path, client: BotoClient) -> None:
|
||||
"""
|
||||
Providing a path, this function will save the metrics folders/files.
|
||||
"""
|
||||
if s3fs_client.client is None:
|
||||
if client.client is None:
|
||||
logger.info("In local development mode - no need to upload")
|
||||
else:
|
||||
logger.info(f"Saving metrics into s3")
|
||||
s3_location = s3fs_client.model_bucket + "/" + str(output_filepath)
|
||||
s3fs_client.client.put(str(output_filepath), s3_location, recursive=True)
|
||||
s3_location = client.model_bucket + "/" + str(output_filepath)
|
||||
|
||||
self.directory_upload(
|
||||
client=client,
|
||||
local_directory=str(output_filepath),
|
||||
bucket_name=client.model_bucket,
|
||||
)
|
||||
|
||||
logger.info("Save complete")
|
||||
|
||||
def directory_upload(self, client, local_directory, bucket_name):
|
||||
|
||||
# Iterate through the local directory and upload each file
|
||||
for root, dirs, files in os.walk(local_directory):
|
||||
for file in files:
|
||||
# Determine the local file path and S3 object key
|
||||
local_file_path = os.path.join(root, file)
|
||||
s3_object_key = os.path.relpath(local_file_path, local_directory)
|
||||
|
||||
# Upload the file to S3
|
||||
client.client.upload_file(local_file_path, bucket_name, local_file_path)
|
||||
|
||||
logger.info(
|
||||
f"Uploaded {local_file_path} to {bucket_name}/{local_file_path}"
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def list_metric_functions() -> list:
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -2,38 +2,65 @@
|
|||
|
||||
"""
|
||||
|
||||
from io import StringIO
|
||||
import pandas as pd
|
||||
from pathlib import Path
|
||||
from core.Logger import logger
|
||||
from core.CloudClient import S3FSClient
|
||||
from core.CloudClient import BotoClient
|
||||
from core.Metrics import Metrics
|
||||
from core.Settings import BEST_MODEL_COLUMN_NAME
|
||||
|
||||
|
||||
def read_csv_from_s3(client, bucket_name, file_key, index_col):
|
||||
"""
|
||||
Read a CSV file from S3 using boto3 and pandas.
|
||||
|
||||
:param bucket_name: Name of the S3 bucket.
|
||||
:param file_key: Key of the file (including directory path within the bucket).
|
||||
:param aws_access_key_id: AWS Access Key ID
|
||||
:param aws_secret_access_key: AWS Secret Access Key
|
||||
:return: DataFrame containing the CSV data.
|
||||
"""
|
||||
|
||||
# Get the object
|
||||
s3_object = client.get_object(Bucket=bucket_name, Key=file_key)
|
||||
|
||||
# Read the CSV body into a DataFrame
|
||||
csv_body = s3_object["Body"].read().decode("utf-8")
|
||||
df = pd.read_csv(StringIO(csv_body), index_col=index_col)
|
||||
|
||||
return df
|
||||
|
||||
|
||||
class RegistryHandler:
|
||||
"""
|
||||
Handles the loading of the registry depending on the environment
|
||||
"""
|
||||
|
||||
def load_registry(
|
||||
self, registry_path: Path, s3fs_client: S3FSClient, metrics: Metrics
|
||||
):
|
||||
def load_registry(self, registry_path: Path, client: BotoClient, metrics: Metrics):
|
||||
"""
|
||||
Depening on the environment, we will have to load from locally or s3 (mock/real)
|
||||
"""
|
||||
|
||||
if s3fs_client.client is None:
|
||||
if client.client is None:
|
||||
logger.info("Using local development - no need for s3 load")
|
||||
return self.load_local_registry(
|
||||
registry_path=registry_path, metrics=metrics
|
||||
)
|
||||
|
||||
s3_location = "s3://" + s3fs_client.model_bucket + "/" + str(registry_path)
|
||||
|
||||
logger.info(f"Check if registry exists")
|
||||
if s3fs_client.client.exists(s3_location):
|
||||
registry_df = pd.read_csv(
|
||||
s3fs_client.client.open(s3_location), index_col=None
|
||||
|
||||
check_exists = client.client.list_objects_v2(
|
||||
Bucket=client.model_bucket, Prefix=str(registry_path)
|
||||
)
|
||||
|
||||
if "Contents" in check_exists:
|
||||
logger.info("Loading existing registry")
|
||||
registry_df = read_csv_from_s3(
|
||||
client=client.client,
|
||||
bucket_name=client.model_bucket,
|
||||
file_key=str(registry_path),
|
||||
index_col=None,
|
||||
)
|
||||
else:
|
||||
logger.info("No registry found - creating new one")
|
||||
|
|
@ -70,14 +97,18 @@ class RegistryHandler:
|
|||
|
||||
return registry_df
|
||||
|
||||
def save_registry(self, output_filepath: Path, s3fs_client: S3FSClient) -> None:
|
||||
def save_registry(self, output_filepath: Path, client: BotoClient) -> None:
|
||||
"""
|
||||
Providing a path, this function will save the model to be used.
|
||||
"""
|
||||
if s3fs_client.client is None:
|
||||
if client.client is None:
|
||||
logger.info("In local development mode - no need for s3 client")
|
||||
else:
|
||||
logger.info(f"Saving registry into s3")
|
||||
s3_location = s3fs_client.model_bucket + "/" + str(output_filepath)
|
||||
s3fs_client.client.put(str(output_filepath), s3_location, recursive=True)
|
||||
s3_location = client.model_bucket + "/" + str(output_filepath)
|
||||
|
||||
client.client.upload_file(
|
||||
str(output_filepath), client.model_bucket, str(output_filepath)
|
||||
)
|
||||
|
||||
logger.info("Save complete")
|
||||
|
|
|
|||
|
|
@ -18,19 +18,20 @@ services:
|
|||
timeout: 20s
|
||||
retries: 3
|
||||
|
||||
# simulation_system_training:
|
||||
# build:
|
||||
# context: ./
|
||||
# dockerfile: ./Dockerfiles/Dockerfile.training
|
||||
# image: simulation_system_training
|
||||
# environment:
|
||||
# ENDPOINT_URL: http://minio:9000/
|
||||
# AWS_ACCESS_KEY_ID: *MINIO_USER
|
||||
# AWS_SECRET_ACCESS_KEY: *MINIO_PASS
|
||||
# tty: true
|
||||
# depends_on:
|
||||
# minio:
|
||||
# condition: service_healthy
|
||||
simulation_system_training:
|
||||
build:
|
||||
context: ./
|
||||
dockerfile: ./Dockerfiles/Dockerfile.training
|
||||
image: simulation_system_training
|
||||
environment:
|
||||
RUNTIME_ENVIRONMENT: local-mock
|
||||
ENDPOINT_URL: http://minio:9000/
|
||||
AWS_ACCESS_KEY_ID: *MINIO_USER
|
||||
AWS_SECRET_ACCESS_KEY: *MINIO_PASS
|
||||
tty: true
|
||||
depends_on:
|
||||
minio:
|
||||
condition: service_healthy
|
||||
# command:
|
||||
# ["bash"]
|
||||
|
||||
|
|
@ -40,6 +41,7 @@ services:
|
|||
# dockerfile: ./Dockerfiles/Dockerfile.prediction
|
||||
# image: simulation_system_prediction
|
||||
# environment:
|
||||
# RUNTIME_ENVIRONMENT: local-mock
|
||||
# ENDPOINT_URL: http://minio:9000/
|
||||
# AWS_ACCESS_KEY_ID: *MINIO_USER
|
||||
# AWS_SECRET_ACCESS_KEY: *MINIO_PASS
|
||||
|
|
@ -47,7 +49,7 @@ services:
|
|||
# depends_on:
|
||||
# simulation_system_training:
|
||||
# condition: service_completed_successfully
|
||||
# command:
|
||||
# command:
|
||||
# ["bash"]
|
||||
|
||||
|
||||
|
|
|
|||
Binary file not shown.
|
|
@ -5,13 +5,16 @@ Script to load MLModel class and generate predictions
|
|||
import os
|
||||
import json
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
import pandas as pd
|
||||
from typing import Optional
|
||||
from datetime import datetime
|
||||
from MLModel.Models import AutogluonModel
|
||||
from core.Logger import logger
|
||||
from core.DataLoader import dataloader_factory
|
||||
from core.CloudClient import S3FSClient
|
||||
from core.CloudClient import BotoClient
|
||||
from core.Metrics import Metrics
|
||||
from core.RegistryHandler import RegistryHandler
|
||||
from core.Settings import (
|
||||
BASE_REGISTRY_PATH,
|
||||
REGISTRY_FILE,
|
||||
|
|
@ -19,13 +22,13 @@ from core.Settings import (
|
|||
PREDICTION_FILE,
|
||||
METADATA_FILE,
|
||||
TIMESTAMP_FORMAT,
|
||||
MODEL_DIRECTORY,
|
||||
)
|
||||
|
||||
TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT)
|
||||
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "dev")
|
||||
|
||||
CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT)
|
||||
|
||||
CLIENT = BotoClient(runtime_environment=RUNTIME_ENVIRONMENT)
|
||||
|
||||
# FOR TESTING
|
||||
# For now just loading data first and then passing into function (i.e. as if we receive json data and convert to
|
||||
|
|
@ -36,7 +39,7 @@ CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT)
|
|||
# For testing in dev s3
|
||||
# Data path can be passed as so:
|
||||
# python3 predictions.py --data-path s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet
|
||||
# data_path="s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet"
|
||||
# data_path="s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data_with_id.parquet"
|
||||
|
||||
|
||||
def ingest_arguments() -> argparse.Namespace:
|
||||
|
|
@ -83,7 +86,7 @@ def prediction(
|
|||
if registry_path is None or not registry_path.exists():
|
||||
logger.error("No registry path provided or registry doesn't exist")
|
||||
exit(1)
|
||||
elif RUNTIME_ENVIRONMENT == "dev":
|
||||
elif RUNTIME_ENVIRONMENT in ["local-mock", "dev"]:
|
||||
registry_path = "s3://retrofit-model-directory-dev/model_directory/RDSAP_CHANGE/model_registry.csv"
|
||||
else:
|
||||
raise NotImplemented("TO be implemented")
|
||||
|
|
@ -96,7 +99,17 @@ def prediction(
|
|||
else:
|
||||
# TODO: Think about where registry will sit/ type
|
||||
logger.info("Loading best model from registry")
|
||||
registry_df = pd.read_csv(registry_path)
|
||||
|
||||
metrics = Metrics()
|
||||
registry_handler = RegistryHandler()
|
||||
|
||||
registry_path = Path(MODEL_DIRECTORY) / target_column / REGISTRY_FILE
|
||||
|
||||
registry_df = registry_handler.load_registry(
|
||||
registry_path=registry_path, client=CLIENT, metrics=metrics
|
||||
)
|
||||
|
||||
# registry_df = pd.read_csv(registry_path)
|
||||
best_model_df = registry_df[registry_df["best_model"]]
|
||||
|
||||
model_location = best_model_df["model_location"].values[0]
|
||||
|
|
@ -115,7 +128,7 @@ def prediction(
|
|||
if data_path and data is None:
|
||||
logger.info("Loading data from provided path")
|
||||
dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT)
|
||||
data = dataloader.load(filepath=data_path, index_col="id")
|
||||
data = dataloader.load(client=CLIENT, filepath=data_path, index_col=None)
|
||||
|
||||
if data is None:
|
||||
raise ValueError("No data loaded")
|
||||
|
|
@ -137,14 +150,14 @@ def prediction(
|
|||
logger.error("No other model currently")
|
||||
exit(1)
|
||||
|
||||
model.load_model(
|
||||
filepath=model_location, s3_client=CLIENT, model_folder="local_model"
|
||||
)
|
||||
model.load_model(filepath=model_location, client=CLIENT, model_folder="local_model")
|
||||
|
||||
logger.info("--- Generating Predictions ---")
|
||||
prediction = model.generate_predictions(data=data)
|
||||
|
||||
return pd.concat([pd.Series(data.index, name='id'), prediction], axis=1)
|
||||
# logger.info(pd.concat([data["id"], prediction], axis=1))
|
||||
|
||||
return pd.concat([data["id"], prediction], axis=1)
|
||||
|
||||
# Save prediction some where?
|
||||
# prediction.to_csv("s3?")
|
||||
|
|
|
|||
|
|
@ -1,4 +1,6 @@
|
|||
boto3
|
||||
autogluon==0.8.2
|
||||
pandas==1.5.3
|
||||
s3fs==2023.6.0
|
||||
seaborn==0.12.2
|
||||
matplotlib==3.7.2
|
||||
pre-commit==3.3.3
|
||||
|
|
|
|||
|
|
@ -1,6 +1,5 @@
|
|||
boto3
|
||||
autogluon==0.8.2
|
||||
pandas==1.5.3
|
||||
s3fs
|
||||
seaborn==0.12.2
|
||||
matplotlib==3.7.2
|
||||
matplotlib==3.7.2
|
||||
|
|
|
|||
|
|
@ -1,5 +1,4 @@
|
|||
autogluon==0.8.2
|
||||
pandas==1.5.3
|
||||
seaborn==0.12.2
|
||||
s3fs==2023.6.0
|
||||
pre-commit==3.3.3
|
||||
|
|
|
|||
|
|
@ -1,4 +1,3 @@
|
|||
autogluon==0.8.2
|
||||
pandas==1.5.3
|
||||
seaborn==0.12.2
|
||||
s3fs==2023.6.0
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ from core.Logger import logger
|
|||
from core.Metrics import Metrics, sort_by_metric
|
||||
from core.DataLoader import dataloader_factory
|
||||
from core.FeatureProcessor import FeatureProcessor
|
||||
from core.CloudClient import S3FSClient
|
||||
from core.CloudClient import BotoClient
|
||||
from core.RegistryHandler import RegistryHandler
|
||||
from core.Settings import (
|
||||
MODEL_DIRECTORY,
|
||||
|
|
@ -30,7 +30,8 @@ TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT)
|
|||
|
||||
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local")
|
||||
|
||||
CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT)
|
||||
CLIENT = BotoClient(runtime_environment=RUNTIME_ENVIRONMENT)
|
||||
# CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT)
|
||||
|
||||
|
||||
# FOR TESTING
|
||||
|
|
@ -105,8 +106,8 @@ def training(
|
|||
|
||||
logger.info("--- Loading data ---")
|
||||
dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT)
|
||||
train_df = dataloader.load(filepath=train_filepath)
|
||||
test_df = dataloader.load(filepath=test_filepath)
|
||||
train_df = dataloader.load(client=CLIENT, filepath=train_filepath)
|
||||
test_df = dataloader.load(client=CLIENT, filepath=test_filepath)
|
||||
|
||||
if train_df is None or test_df is None:
|
||||
raise ValueError("No data Loaded - cancelling pipeline")
|
||||
|
|
@ -153,7 +154,7 @@ def training(
|
|||
)
|
||||
|
||||
logger.info("--- Save Model ---")
|
||||
model.save_model(output_filepath=model.output_filepath, s3fs_client=CLIENT)
|
||||
model.save_model(output_filepath=model.output_filepath, client=CLIENT)
|
||||
|
||||
logger.info("--- Generate evaluation metrics ---")
|
||||
metrics = Metrics()
|
||||
|
|
@ -166,8 +167,6 @@ def training(
|
|||
metrics=metrics,
|
||||
)
|
||||
|
||||
metrics.upload_metrics(output_filepath=metric_output_path, s3fs_client=CLIENT)
|
||||
|
||||
logger.info("--- Generate metric outputs using predictions ---")
|
||||
# metrics.generate_plot_suite()
|
||||
|
||||
|
|
@ -179,7 +178,7 @@ def training(
|
|||
output_filepath=plot_output_path,
|
||||
)
|
||||
|
||||
metrics.upload_metrics(output_filepath=plot_output_path, s3fs_client=CLIENT)
|
||||
metrics.upload_metrics(output_filepath=metric_output_path, client=CLIENT)
|
||||
|
||||
# TODO: for cml, we might want to have class that outputs all data and plots to add to the report
|
||||
# If we want residual plot/ any plots, we will need to self host
|
||||
|
|
@ -196,7 +195,7 @@ def training(
|
|||
f"Optimised version of best model can be found at: {deployment_model_path}"
|
||||
)
|
||||
|
||||
model.save_model(output_filepath=deployment_model_path, s3fs_client=CLIENT)
|
||||
model.save_model(output_filepath=deployment_model_path, client=CLIENT)
|
||||
|
||||
# TODO: Need a model registry - for now have this as a CSV
|
||||
# Save this in the model directory
|
||||
|
|
@ -208,7 +207,7 @@ def training(
|
|||
registry_path = Path(MODEL_DIRECTORY) / target_column / REGISTRY_FILE
|
||||
|
||||
registry_df = registry_handler.load_registry(
|
||||
registry_path=registry_path, s3fs_client=CLIENT, metrics=metrics
|
||||
registry_path=registry_path, client=CLIENT, metrics=metrics
|
||||
)
|
||||
|
||||
model_details_df = pd.DataFrame(
|
||||
|
|
@ -235,7 +234,7 @@ def training(
|
|||
registry_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
registry_df.to_csv(registry_path, index=False)
|
||||
|
||||
registry_handler.save_registry(output_filepath=registry_path, s3fs_client=CLIENT)
|
||||
registry_handler.save_registry(output_filepath=registry_path, client=CLIENT)
|
||||
|
||||
logger.info("--- Clean up ---")
|
||||
if RUNTIME_ENVIRONMENT != "local" and Path(MODEL_DIRECTORY).exists():
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue