From b1decfe25302fd2a8306c07530085d169a1103ec Mon Sep 17 00:00:00 2001 From: Michael Duong Date: Wed, 30 Aug 2023 16:36:36 +0100 Subject: [PATCH] add metric class and regneration script --- .../simulation_system/MLModel/Models.py | 15 +++ .../simulation_system/core/DataLoader.py | 45 ++++++- model_data/simulation_system/core/Metrics.py | 66 ++++++++++ model_data/simulation_system/core/Settings.py | 3 + .../simulation_system/docker-compose.yml | 43 ++++++- model_data/simulation_system/predictions.py | 10 +- .../simulation_system/regenerate_metrics.py | 115 ++++++++++++++++++ model_data/simulation_system/training.py | 23 +++- 8 files changed, 311 insertions(+), 9 deletions(-) create mode 100644 model_data/simulation_system/core/Metrics.py create mode 100644 model_data/simulation_system/regenerate_metrics.py diff --git a/model_data/simulation_system/MLModel/Models.py b/model_data/simulation_system/MLModel/Models.py index fcb25654..25668bc2 100644 --- a/model_data/simulation_system/MLModel/Models.py +++ b/model_data/simulation_system/MLModel/Models.py @@ -13,6 +13,7 @@ import pandas as pd from autogluon.tabular import TabularDataset, TabularPredictor from sklearn.metrics import mean_absolute_percentage_error from core.Logger import logger +from MLModel.BaseMLModel import MLModel AUTOGLUON_HYPERPARAMETERS = [ "problem_type", @@ -24,6 +25,20 @@ AUTOGLUON_HYPERPARAMETERS = [ METRIC_FILENAME = "metrics.csv" +def select_model(model_type: str) -> MLModel: + """ + Helper function to select the model to use + """ + + if model_type == "autogluon": + model = AutogluonModel() + else: + logger.error("No other model currently implemented") + exit(1) + + return model + + class AutogluonModel: """ Autogluon model that implements the MLModel Protocol diff --git a/model_data/simulation_system/core/DataLoader.py b/model_data/simulation_system/core/DataLoader.py index dcd7af16..17fd36b9 100644 --- a/model_data/simulation_system/core/DataLoader.py +++ b/model_data/simulation_system/core/DataLoader.py @@ -3,7 +3,6 @@ import os class DataLoader: - @staticmethod def load(filepath: str, index_col: str = None) -> pd.DataFrame: """ @@ -13,13 +12,53 @@ class DataLoader: if not os.path.exists(filepath): raise FileNotFoundError(f"File not found: {filepath}") - if filepath.endswith('.parquet'): + if filepath.endswith(".parquet"): df = pd.read_parquet(filepath) if index_col is not None: df = df.set_index(index_col) - elif filepath.endswith('.csv'): + elif filepath.endswith(".csv"): df = pd.read_csv(filepath, index_col=index_col) else: raise ValueError(f"File format not supported for file: {filepath}") return df + + @staticmethod + def s3_load(filepath: str, index_col: str = None) -> pd.DataFrame: + """ + Load different datasets from s3 + """ + + STORAGE_OPTIONS = { + "key": os.environ.get("AWS_ACCESS_KEY_ID", "admin"), + "secret": os.environ.get("AWS_SECRET_ACCESS_KEY", "password"), + "client_kwargs": { + # "endpoint_url": os.environ.get("ENDPOINT_URL", "http://localhost:9000") + "endpoint_url": os.environ.get("ENDPOINT_URL") + }, + } + + if filepath.endswith(".parquet"): + df = pd.read_parquet(filepath, storage_options=STORAGE_OPTIONS) + if index_col is not None: + df = df.set_index(index_col) + elif filepath.endswith(".csv"): + df = pd.read_csv( + filepath, index_col=index_col, storage_options=STORAGE_OPTIONS + ) + else: + raise ValueError(f"File format not supported for file: {filepath}") + + return df + + def process(self, filepath: str, index_col: str = None) -> pd.DataFrame: + """ + Based off the filepath, we choose a loader style + """ + + if filepath.startswith("s3://"): + df = self.s3_load(filepath=filepath, index_col=index_col) + else: + df = self.load(filepath=filepath, index_col=index_col) + + return df diff --git a/model_data/simulation_system/core/Metrics.py b/model_data/simulation_system/core/Metrics.py new file mode 100644 index 00000000..15fb93f9 --- /dev/null +++ b/model_data/simulation_system/core/Metrics.py @@ -0,0 +1,66 @@ +""" +Generate metrics and enable regeneration of metrics if new metrics are generated +Key tasks: +- Specify metric functions that take in prediction vs actual to generate a metric value +- Given a model and test data, produce a suite of all metrics +""" + +import pandas as pd +from core.Settings import OPTIMISE_METRIC +from MLModel.BaseMLModel import MLModel + + +def sort_by_metric( + data: pd.DataFrame, optimse_metric: str, best_model_column_name: str +) -> pd.DataFrame: + """ + Helper function to sort data frame by metric and append a best model flag + """ + data = data.sort_values(optimse_metric, ascending=False).reset_index(drop=True) + data[best_model_column_name] = [False] * len(data) + data.loc[0, best_model_column_name] = True + + return data + + +class Metrics: + """ + All metric functions used to generate a dictionary of metrics + """ + + @staticmethod + def metric_1(predictions: pd.Series, actuals: pd.Series) -> float: + """ + Can leverage ML packages like sklearn for individual metrics like MAPE etc + """ + pass + + @staticmethod + def metric_2(predictions: pd.Series, actuals: pd.Series) -> float: + """ + Can leverage ML packages like sklearn for individual metrics like MAPE etc + """ + pass + + def list_metric_functions(self) -> list: + """ + Gather all metric functions to run + """ + pass + + def generate_metric_suite( + self, model: MLModel, data: pd.DataFrame, target_column: str + ) -> pd.Series: + """ + For the model, test data and target, generate predictions and then iterative over all metrics to generate a Series of metric values + """ + predictions = model.generate_predictions(data=data) + actuals = data[target_column] + + metric_dict = {} + for key, metric_function in asd: # TODO: + metric_dict[key] = metric_function(predictions, actuals) + + metrics = pd.Series([metric_dict]) + + return metrics diff --git a/model_data/simulation_system/core/Settings.py b/model_data/simulation_system/core/Settings.py index 3b9c8abf..5c1c37a2 100644 --- a/model_data/simulation_system/core/Settings.py +++ b/model_data/simulation_system/core/Settings.py @@ -2,6 +2,9 @@ # TODO: migrate to dynaconf from pathlib import Path +OPTIMISE_METRIC = "mean_absolute_error" +BEST_MODEL_COLUMN_NAME = "best_model" + # TODO: remove these setting elsewhere for CML RESIDUAL_TRUE_LABEL = "true" RESIDUAL_PREDICTION_LABEL = "pred" diff --git a/model_data/simulation_system/docker-compose.yml b/model_data/simulation_system/docker-compose.yml index 55f181bc..f79b6c48 100644 --- a/model_data/simulation_system/docker-compose.yml +++ b/model_data/simulation_system/docker-compose.yml @@ -2,7 +2,7 @@ version: '3' services: minio: - image: minio/minio + image: minio/minio:RELEASE.2022-05-26T05-48-41Z ports: - "9000:9000" - "9001:9001" @@ -12,6 +12,45 @@ services: MINIO_ROOT_USER: &MINIO_USER admin MINIO_ROOT_PASSWORD: &MINIO_PASS password command: server --console-address ":9001" /data + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"] + interval: 30s + timeout: 20s + retries: 3 + + # simulation_system_training: + # build: + # context: ./ + # dockerfile: ./Dockerfiles/Dockerfile.training + # image: simulation_system_training + # environment: + # ENDPOINT_URL: http://minio:9000/ + # AWS_ACCESS_KEY_ID: *MINIO_USER + # AWS_SECRET_ACCESS_KEY: *MINIO_PASS + # tty: true + # depends_on: + # minio: + # condition: service_healthy + # command: + # ["bash"] + + # simulation_system_prediction: + # build: + # context: ./ + # dockerfile: ./Dockerfiles/Dockerfile.prediction + # image: simulation_system_prediction + # environment: + # ENDPOINT_URL: http://minio:9000/ + # AWS_ACCESS_KEY_ID: *MINIO_USER + # AWS_SECRET_ACCESS_KEY: *MINIO_PASS + # tty: true + # depends_on: + # simulation_system_training: + # condition: service_completed_successfully + # command: + # ["bash"] + + # volumes: -# minio_storage: {} \ No newline at end of file +# minio_storage: {} diff --git a/model_data/simulation_system/predictions.py b/model_data/simulation_system/predictions.py index bc85b74a..680193be 100644 --- a/model_data/simulation_system/predictions.py +++ b/model_data/simulation_system/predictions.py @@ -109,7 +109,13 @@ def prediction( print(data) logger.info("--- Loading Model ---") - model = AutogluonModel() + + if model_type == "autogluon": + logger.info("Using an Autogluon model") + model = AutogluonModel() + else: + logger.error("No other model currently") + exit(1) model.load_model(filepath=model_location) @@ -147,7 +153,7 @@ if __name__ == "__main__": args = ingest_arguments() # Data can be passed in as JSON string: python3 predictions.py --data '{"TOTAL_FLOOR_AREA": 1}' - # Data path can be passed as so: python3 predictions.py --data-path ../simulation_system/model_build_data/change_data/rdsap_full/test_data.parquet + # Data path can be passed as so: python3 predictions.py --data-path ./model_build_data/change_data/rdsap_full/test_data.parquet prediction( target_column=args.target_column, model_path=args.model_path, diff --git a/model_data/simulation_system/regenerate_metrics.py b/model_data/simulation_system/regenerate_metrics.py new file mode 100644 index 00000000..1386f7d9 --- /dev/null +++ b/model_data/simulation_system/regenerate_metrics.py @@ -0,0 +1,115 @@ +""" +Script to regenerate metrics for all the models in the model registry +Key task: +- Load model registry +- For each model in the registry, generate the metrics (Key questions here is what if the test data changes) +- Save the new metrics out to s3 bucket +""" + +import argparse +from s3pathlib import S3Path +import pandas as pd +from tqdm import tqdm +from core.Logger import logger +from core.Metrics import Metrics, sort_by_metric +from core.DataLoader import DataLoader +from core.Settings import ( + OPTIMISE_METRIC, + MODEL_DIRECTORY, + REGISTRY_FILE, + BEST_MODEL_COLUMN_NAME, +) +from MLModel.Models import AutogluonModel, select_model + + +def ingest_arguments() -> argparse.Namespace: + """ + Helper function to take in arguments from script start + """ + + parser = argparse.ArgumentParser(description="Inputs for training script") + + parser.add_argument( + "--test-filepath", + type=str, + help="Location of Parquet dataset to load for testing", + required=True, + ) + parser.add_argument( + "--target-column", + type=str, + help="The response variable", + choices=["RDSAP_CHANGE", "HEAT_DEMAND_CHANGE"], + default="RDSAP_CHANGE", + ) + + args = parser.parse_args() + + return args + + +def regenerate_metrics(test_filepath: str, target_column: str) -> None: + """ + Recreate all metrics for all models + """ + + logger.info("--- Loading test data ---") + dataloader = DataLoader() + test_df = dataloader.load(filepath=test_filepath) + + logger.info("--- Loading model registry ---") + logger.info(f"Loading registry for {target_column} models") + registry_df = dataloader.load( + filepath=S3Path(MODEL_DIRECTORY, target_column, REGISTRY_FILE).uri + ) + + logger.info("Extract non-metric columns") + registry_df = registry_df[["model_type", "model_name", "model_location"]] + + logger.info("--- Regenerating metrics ---") + + metric_suite = Metrics() + + metrics_df = pd.DataFrame(columns=metric_suite.list_metric_functions()) + + for _, row in tqdm(registry_df.iterrows()): + + logger.info(f"--- Loading Model ({row['model_name']}) ---") + + model = select_model(model_type=row["model_type"]) + + model.load_model(filepath=row["model_location"]) + + metrics = metric_suite.generate_metric_suite( + model=model, data=test_df, target_column=target_column + ) + + # Add metrics row by row + metrics_df = pd.concat([metrics_df, metrics], axis=0).reset_index(drop=True) + + # Add metrics df to registry df side by side + registry_df = pd.concat([registry_df, metrics_df], axis=1) + + logger.info(f"--- Sorting by Optimise Metric ({OPTIMISE_METRIC}) ---") + + registry_df = sort_by_metric( + data=registry_df, + optimse_metric=OPTIMISE_METRIC, + best_model_column_name=BEST_MODEL_COLUMN_NAME, + ) + + logger.info("--- Saving model metrics ---") + + registry_df.to_csv(S3Path(MODEL_DIRECTORY, target_column, REGISTRY_FILE).uri) + + +if __name__ == "__main__": + + logger.info("---Begin Pipeline to regenerate metrics---") + + logger.info("---Ingest Arguments---") + args = ingest_arguments() + + regenerate_metrics( + test_filepath=args.test_filepath, target_column=args.target_column + ) diff --git a/model_data/simulation_system/training.py b/model_data/simulation_system/training.py index c2ed5c21..0b4cd5b3 100644 --- a/model_data/simulation_system/training.py +++ b/model_data/simulation_system/training.py @@ -1,4 +1,6 @@ import argparse +import os +from s3pathlib import S3Path # import boto3 from pathlib import Path @@ -35,6 +37,14 @@ from core.Settings import ( TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT) +# STORAGE_OPTIONS = { +# "key": os.environ.get("AWS_ACCESS_KEY_ID", 'admin'), +# "secret": os.environ.get("AWS_SECRET_ACCESS_KEY", 'password'), +# "client_kwargs": { +# "endpoint_url": os.environ.get("ENDPOINT_URL", "http://localhost:9000") +# } +# } + # FOR TESTING # train_filepath = "./model_build_data/change_data/rdsap_full/train_validation_data.parquet" # test_filepath = "./model_build_data/change_data/rdsap_full/test_data.parquet" @@ -55,6 +65,11 @@ TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT) # S3_CLIENT.create_bucket # S3_CLIENT.list_buckets() +# df = pd.read_parquet( +# "s3://model_build_data/change_data/rdsap_full/train_validation_data.parquet", +# storage_options=STORAGE_OPTIONS +# ) + def ingest_arguments() -> argparse.Namespace: """ @@ -108,8 +123,8 @@ def training( logger.info("--- Loading data ---") dataloader = DataLoader() - train_df = dataloader.load(filepath=train_filepath) - test_df = dataloader.load(filepath=test_filepath) + train_df = dataloader.process(filepath=train_filepath) + test_df = dataloader.process(filepath=test_filepath) logger.info("--- Feature processing ---") @@ -205,8 +220,12 @@ def training( # TODO: Need a model registry - for now have this as a CSV # Save this in the model directory + # Loading registry from s3 logger.info("--- Append registry with new model ---") + registry_path = BASE_REGISTRY_PATH / target_column / REGISTRY_FILE + # registry_path = S3Path(MODEL_DIRECTORY, target_column, REGISTRY_FILE).uri + # registry = RegistryHandler(location=registry_path) if registry_path.exists(): logger.info("Registry file found - Loading into Dataframe")