add metric class and regeneration script

This commit is contained in:
Michael Duong 2023-08-30 16:36:36 +01:00
parent d8c19d5382
commit b1decfe253
8 changed files with 311 additions and 9 deletions

View file

@ -13,6 +13,7 @@ import pandas as pd
from autogluon.tabular import TabularDataset, TabularPredictor
from sklearn.metrics import mean_absolute_percentage_error
from core.Logger import logger
from MLModel.BaseMLModel import MLModel
AUTOGLUON_HYPERPARAMETERS = [
"problem_type",
@ -24,6 +25,20 @@ AUTOGLUON_HYPERPARAMETERS = [
METRIC_FILENAME = "metrics.csv"
def select_model(model_type: str) -> MLModel:
    """
    Helper function to select the model to use

    Parameters:
        model_type: identifier of the model implementation; only
            "autogluon" is currently supported.

    Returns:
        A freshly constructed (not yet loaded) model instance.

    Raises:
        SystemExit: with exit code 1 when ``model_type`` is not recognised
            (the error is logged first).
    """
    # Local import: the builtin ``exit`` is injected by the ``site`` module for
    # interactive use and is not guaranteed to exist (e.g. under ``python -S``).
    import sys

    if model_type == "autogluon":
        return AutogluonModel()

    logger.error("No other model currently implemented")
    sys.exit(1)
class AutogluonModel:
"""
Autogluon model that implements the MLModel Protocol

View file

@ -3,7 +3,6 @@ import os
class DataLoader:
@staticmethod
def load(filepath: str, index_col: str = None) -> pd.DataFrame:
"""
@ -13,13 +12,53 @@ class DataLoader:
if not os.path.exists(filepath):
raise FileNotFoundError(f"File not found: {filepath}")
if filepath.endswith('.parquet'):
if filepath.endswith(".parquet"):
df = pd.read_parquet(filepath)
if index_col is not None:
df = df.set_index(index_col)
elif filepath.endswith('.csv'):
elif filepath.endswith(".csv"):
df = pd.read_csv(filepath, index_col=index_col)
else:
raise ValueError(f"File format not supported for file: {filepath}")
return df
@staticmethod
def s3_load(filepath: str, index_col: str = None) -> pd.DataFrame:
    """
    Read a parquet or CSV dataset from S3-compatible storage.

    Credentials come from the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY
    environment variables (falling back to "admin" / "password"), and the
    endpoint from ENDPOINT_URL (None when unset).

    Parameters:
        filepath: location of the file; must end in ".parquet" or ".csv".
        index_col: optional column to use as the DataFrame index.

    Raises:
        ValueError: when the file extension is neither parquet nor CSV.
    """
    s3_options = {
        "key": os.environ.get("AWS_ACCESS_KEY_ID", "admin"),
        "secret": os.environ.get("AWS_SECRET_ACCESS_KEY", "password"),
        # No default endpoint: set ENDPOINT_URL (e.g. http://localhost:9000)
        # to target a non-default S3 endpoint.
        "client_kwargs": {"endpoint_url": os.environ.get("ENDPOINT_URL")},
    }

    if filepath.endswith(".parquet"):
        frame = pd.read_parquet(filepath, storage_options=s3_options)
        # read_parquet has no index_col argument, so the index is set afterwards
        return frame if index_col is None else frame.set_index(index_col)

    if filepath.endswith(".csv"):
        return pd.read_csv(
            filepath, index_col=index_col, storage_options=s3_options
        )

    raise ValueError(f"File format not supported for file: {filepath}")
def process(self, filepath: str, index_col: str = None) -> pd.DataFrame:
    """
    Load a dataset, choosing the loader from the shape of the filepath.

    Paths starting with "s3://" are delegated to ``s3_load``; every other
    path is delegated to ``load``. ``index_col`` is forwarded unchanged.
    """
    loader = self.s3_load if filepath.startswith("s3://") else self.load
    return loader(filepath=filepath, index_col=index_col)

View file

@ -0,0 +1,66 @@
"""
Generate metrics and enable regeneration of metrics if new metrics are generated
Key tasks:
- Specify metric functions that take in prediction vs actual to generate a metric value
- Given a model and test data, produce a suite of all metrics
"""
import pandas as pd
from core.Settings import OPTIMISE_METRIC
from MLModel.BaseMLModel import MLModel
def sort_by_metric(
    data: pd.DataFrame,
    optimse_metric: str,
    best_model_column_name: str,
    ascending: bool = False,
) -> pd.DataFrame:
    """
    Helper function to sort data frame by metric and append a best model flag

    Parameters:
        data: frame containing one row per model and a column named
            ``optimse_metric``.
        optimse_metric: name of the column to rank by.
        best_model_column_name: name of the boolean flag column to add.
        ascending: sort direction. Defaults to False (largest value first),
            which matches the previous behaviour; pass True for metrics
            where lower is better (e.g. error metrics).

    Returns:
        A new, re-indexed frame sorted by the metric in which only the first
        row has ``best_model_column_name`` set to True. An empty input yields
        an empty frame that still carries the flag column.
    """
    ranked = data.sort_values(optimse_metric, ascending=ascending).reset_index(
        drop=True
    )
    ranked[best_model_column_name] = False
    # Guard the empty case: assigning to .loc[0, ...] on an empty frame would
    # silently enlarge it with a bogus "best model" row.
    if len(ranked):
        ranked.loc[0, best_model_column_name] = True
    return ranked
class Metrics:
    """
    All metric functions used to generate a dictionary of metrics

    Every public ``@staticmethod`` defined on this class (or on a subclass)
    is treated as a metric function with the signature
    ``(predictions, actuals) -> float``. The method name is used as the
    metric's key in the generated suite.
    """

    @staticmethod
    def metric_1(predictions: pd.Series, actuals: pd.Series) -> float:
        """
        Placeholder metric - not implemented yet, so it currently returns None.

        Can leverage ML packages like sklearn for individual metrics like MAPE etc
        """

    @staticmethod
    def metric_2(predictions: pd.Series, actuals: pd.Series) -> float:
        """
        Placeholder metric - not implemented yet, so it currently returns None.

        Can leverage ML packages like sklearn for individual metrics like MAPE etc
        """

    def list_metric_functions(self) -> list:
        """
        Gather all metric functions to run

        Returns:
            The names of all public static methods on this class and its
            bases, base classes first and in definition order.
        """
        names = []
        # Walk the MRO from the most generic class down so inherited metrics
        # come first and overrides in subclasses are not listed twice.
        for klass in reversed(type(self).__mro__):
            for name, attribute in vars(klass).items():
                if (
                    isinstance(attribute, staticmethod)
                    and not name.startswith("_")
                    and name not in names
                ):
                    names.append(name)
        return names

    def generate_metric_suite(
        self, model: "MLModel", data: pd.DataFrame, target_column: str
    ) -> pd.Series:
        """
        For the model, test data and target, generate predictions and then
        iterate over all metrics to generate a Series of metric values

        Parameters:
            model: object exposing ``generate_predictions(data=...)``.
            data: test data; must contain ``target_column``.
            target_column: name of the column holding the actual values.

        Returns:
            A Series indexed by metric name, one value per metric function.
        """
        predictions = model.generate_predictions(data=data)
        actuals = data[target_column]

        metric_values = {
            name: getattr(self, name)(predictions, actuals)
            for name in self.list_metric_functions()
        }
        # Key the Series by metric name so rows from several models can be
        # stacked into a frame with one column per metric.
        return pd.Series(metric_values)

View file

@ -2,6 +2,9 @@
# TODO: migrate to dynaconf
from pathlib import Path
OPTIMISE_METRIC = "mean_absolute_error"
BEST_MODEL_COLUMN_NAME = "best_model"
# TODO: remove these setting elsewhere for CML
RESIDUAL_TRUE_LABEL = "true"
RESIDUAL_PREDICTION_LABEL = "pred"

View file

@ -2,7 +2,7 @@ version: '3'
services:
minio:
image: minio/minio
image: minio/minio:RELEASE.2022-05-26T05-48-41Z
ports:
- "9000:9000"
- "9001:9001"
@ -12,6 +12,45 @@ services:
MINIO_ROOT_USER: &MINIO_USER admin
MINIO_ROOT_PASSWORD: &MINIO_PASS password
command: server --console-address ":9001" /data
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3
# simulation_system_training:
# build:
# context: ./
# dockerfile: ./Dockerfiles/Dockerfile.training
# image: simulation_system_training
# environment:
# ENDPOINT_URL: http://minio:9000/
# AWS_ACCESS_KEY_ID: *MINIO_USER
# AWS_SECRET_ACCESS_KEY: *MINIO_PASS
# tty: true
# depends_on:
# minio:
# condition: service_healthy
# command:
# ["bash"]
# simulation_system_prediction:
# build:
# context: ./
# dockerfile: ./Dockerfiles/Dockerfile.prediction
# image: simulation_system_prediction
# environment:
# ENDPOINT_URL: http://minio:9000/
# AWS_ACCESS_KEY_ID: *MINIO_USER
# AWS_SECRET_ACCESS_KEY: *MINIO_PASS
# tty: true
# depends_on:
# simulation_system_training:
# condition: service_completed_successfully
# command:
# ["bash"]
# volumes:
# minio_storage: {}
# minio_storage: {}

View file

@ -109,7 +109,13 @@ def prediction(
print(data)
logger.info("--- Loading Model ---")
model = AutogluonModel()
if model_type == "autogluon":
logger.info("Using an Autogluon model")
model = AutogluonModel()
else:
logger.error("No other model currently")
exit(1)
model.load_model(filepath=model_location)
@ -147,7 +153,7 @@ if __name__ == "__main__":
args = ingest_arguments()
# Data can be passed in as JSON string: python3 predictions.py --data '{"TOTAL_FLOOR_AREA": 1}'
# Data path can be passed as so: python3 predictions.py --data-path ../simulation_system/model_build_data/change_data/rdsap_full/test_data.parquet
# Data path can be passed as so: python3 predictions.py --data-path ./model_build_data/change_data/rdsap_full/test_data.parquet
prediction(
target_column=args.target_column,
model_path=args.model_path,

View file

@ -0,0 +1,115 @@
"""
Script to regenerate metrics for all the models in the model registry
Key task:
- Load model registry
- For each model in the registry, generate the metrics (key question here: what if the test data changes?)
- Save the new metrics out to s3 bucket
"""
import argparse
from s3pathlib import S3Path
import pandas as pd
from tqdm import tqdm
from core.Logger import logger
from core.Metrics import Metrics, sort_by_metric
from core.DataLoader import DataLoader
from core.Settings import (
OPTIMISE_METRIC,
MODEL_DIRECTORY,
REGISTRY_FILE,
BEST_MODEL_COLUMN_NAME,
)
from MLModel.Models import AutogluonModel, select_model
def ingest_arguments(argv: "list[str] | None" = None) -> argparse.Namespace:
    """
    Helper function to take in arguments from script start

    Parameters:
        argv: optional list of argument strings to parse instead of the
            process command line. Defaults to None, in which case argparse
            reads ``sys.argv[1:]`` as before.

    Returns:
        Namespace with ``test_filepath`` (required) and ``target_column``
        (defaults to "RDSAP_CHANGE").

    Raises:
        SystemExit: raised by argparse when a required argument is missing
            or a value is not one of the allowed choices.
    """
    parser = argparse.ArgumentParser(
        description="Inputs for metric regeneration script"
    )
    parser.add_argument(
        "--test-filepath",
        type=str,
        help="Location of Parquet dataset to load for testing",
        required=True,
    )
    parser.add_argument(
        "--target-column",
        type=str,
        help="The response variable",
        choices=["RDSAP_CHANGE", "HEAT_DEMAND_CHANGE"],
        default="RDSAP_CHANGE",
    )
    return parser.parse_args(argv)
def regenerate_metrics(test_filepath: str, target_column: str) -> None:
    """
    Recreate all metrics for all models

    Loads the test data and the model registry for ``target_column``,
    recomputes the metric suite for every registered model, flags the best
    model and writes the refreshed registry back to its original location.

    Parameters:
        test_filepath: local path or "s3://" URI of the test dataset.
        target_column: response variable; also selects which registry is
            loaded.
    """
    dataloader = DataLoader()

    logger.info("--- Loading test data ---")
    # ``process`` dispatches on the path, so both local files and s3 URIs work
    test_df = dataloader.process(filepath=test_filepath)

    logger.info("--- Loading model registry ---")
    logger.info(f"Loading registry for {target_column} models")
    registry_uri = S3Path(MODEL_DIRECTORY, target_column, REGISTRY_FILE).uri
    # The registry location is an s3 URI, which the plain local ``load``
    # rejects because it checks os.path.exists first.
    registry_df = dataloader.process(filepath=registry_uri)

    logger.info("Extract non-metric columns")
    registry_df = registry_df[
        ["model_type", "model_name", "model_location"]
    ].reset_index(drop=True)

    logger.info("--- Regenerating metrics ---")
    metric_suite = Metrics()
    metric_rows = []
    for _, row in tqdm(registry_df.iterrows(), total=len(registry_df)):
        logger.info(f"--- Loading Model ({row['model_name']}) ---")
        model = select_model(model_type=row["model_type"])
        model.load_model(filepath=row["model_location"])

        # Each suite is expected to be a Series keyed by metric name
        metric_rows.append(
            metric_suite.generate_metric_suite(
                model=model, data=test_df, target_column=target_column
            )
        )

    # One row per model, one column per metric; built once instead of
    # concatenating inside the loop.
    metrics_df = pd.DataFrame(
        metric_rows, columns=metric_suite.list_metric_functions()
    ).reset_index(drop=True)

    # Add metrics df to registry df side by side (both share a 0..n-1 index)
    registry_df = pd.concat([registry_df, metrics_df], axis=1)

    logger.info(f"--- Sorting by Optimise Metric ({OPTIMISE_METRIC}) ---")
    # NOTE(review): sort_by_metric ranks largest-first; if OPTIMISE_METRIC is
    # an error metric this flags the worst model as best - confirm intent.
    registry_df = sort_by_metric(
        data=registry_df,
        optimse_metric=OPTIMISE_METRIC,
        best_model_column_name=BEST_MODEL_COLUMN_NAME,
    )

    logger.info("--- Saving model metrics ---")
    # NOTE(review): written without the storage options DataLoader.s3_load
    # uses - confirm credentials/endpoint are picked up from the environment.
    registry_df.to_csv(registry_uri)
if __name__ == "__main__":
    # Script entry point: announce the run, read the command-line arguments,
    # then rebuild the metrics for every model in the registry.
    logger.info("---Begin Pipeline to regenerate metrics---")
    logger.info("---Ingest Arguments---")
    cli_args = ingest_arguments()
    regenerate_metrics(
        target_column=cli_args.target_column,
        test_filepath=cli_args.test_filepath,
    )

View file

@ -1,4 +1,6 @@
import argparse
import os
from s3pathlib import S3Path
# import boto3
from pathlib import Path
@ -35,6 +37,14 @@ from core.Settings import (
TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT)
# STORAGE_OPTIONS = {
# "key": os.environ.get("AWS_ACCESS_KEY_ID", 'admin'),
# "secret": os.environ.get("AWS_SECRET_ACCESS_KEY", 'password'),
# "client_kwargs": {
# "endpoint_url": os.environ.get("ENDPOINT_URL", "http://localhost:9000")
# }
# }
# FOR TESTING
# train_filepath = "./model_build_data/change_data/rdsap_full/train_validation_data.parquet"
# test_filepath = "./model_build_data/change_data/rdsap_full/test_data.parquet"
@ -55,6 +65,11 @@ TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT)
# S3_CLIENT.create_bucket
# S3_CLIENT.list_buckets()
# df = pd.read_parquet(
# "s3://model_build_data/change_data/rdsap_full/train_validation_data.parquet",
# storage_options=STORAGE_OPTIONS
# )
def ingest_arguments() -> argparse.Namespace:
"""
@ -108,8 +123,8 @@ def training(
logger.info("--- Loading data ---")
dataloader = DataLoader()
train_df = dataloader.load(filepath=train_filepath)
test_df = dataloader.load(filepath=test_filepath)
train_df = dataloader.process(filepath=train_filepath)
test_df = dataloader.process(filepath=test_filepath)
logger.info("--- Feature processing ---")
@ -205,8 +220,12 @@ def training(
# TODO: Need a model registry - for now have this as a CSV
# Save this in the model directory
# Loading registry from s3
logger.info("--- Append registry with new model ---")
registry_path = BASE_REGISTRY_PATH / target_column / REGISTRY_FILE
# registry_path = S3Path(MODEL_DIRECTORY, target_column, REGISTRY_FILE).uri
# registry = RegistryHandler(location=registry_path)
if registry_path.exists():
logger.info("Registry file found - Loading into Dataframe")