Model/model_data/simulation_system/training.py
2023-09-01 15:42:04 +01:00

260 lines
8.3 KiB
Python

import argparse
import os
import shutil
from pathlib import Path
from datetime import datetime
import pandas as pd
from MLModel.Models import model_factory
from core.Logger import logger
from core.Metrics import Metrics, sort_by_metric
from core.DataLoader import dataloader_factory
from core.FeatureProcessor import FeatureProcessor
from core.CloudClient import S3FSClient
from core.RegistryHandler import RegistryHandler
from core.Settings import (
MODEL_DIRECTORY,
REGISTRY_FILE,
MODEL_FOLDER,
METRICS_FOLDER,
DEPLOYMENT_FOLDER,
SUBSAMPLE_FACTOR,
MODEL_HYPERPARAMETERS,
TIMESTAMP_FORMAT,
RESIDUAL_FILE,
BEST_MODEL_COLUMN_NAME,
OPTIMISE_METRIC,
)
TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT)
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "dev")
CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT)
# FOR TESTING
# train_filepath = "./model_build_data/change_data/rdsap_full/train_validation_data.parquet"
# test_filepath = "./model_build_data/change_data/rdsap_full/test_data.parquet"
# target_column = "RDSAP_CHANGE"
# model_type = "autogluon"
# hyperparameter = HYPERPARAMETERS
# SUBSAMPLE_FACTOR = 200
# Mock location
# train_filepath = "s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/train_validation_data.parquet"
# test_filepath = "s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet"
# To run script in local mode:
# python3 training.py --train-filepath ./model_build_data/change_data/rdsap_full/train_validation_data.parquet --test-filepath ./model_build_data/change_data/rdsap_full/test_data.parquet
# To run script in local-mock mode:
# python3 training.py --train-filepath s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/train_validation_data.parquet --test-filepath s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet
def ingest_arguments() -> argparse.Namespace:
"""
Helper function to take in arguments from script start
"""
parser = argparse.ArgumentParser(description="Inputs for training script")
parser.add_argument(
"--train-filepath",
type=str,
help="Location of Parquet dataset to load for training",
required=True,
)
parser.add_argument(
"--test-filepath",
type=str,
help="Location of Parquet dataset to load for testing",
required=True,
)
parser.add_argument(
"--model-type",
type=str,
help="The type of model to train",
choices=["autogluon"],
default="autogluon",
)
parser.add_argument(
"--target-column",
type=str,
help="The response variable",
choices=["RDSAP_CHANGE", "HEAT_DEMAND_CHANGE"],
default="RDSAP_CHANGE",
)
args = parser.parse_args()
return args
def training(
train_filepath: str,
test_filepath: str,
target_column: str = "RDSAP_CHANGE",
model_type: str = "autogluon",
hyperparameters: dict | None = None,
) -> None:
"""
Pipeline to run training on the dataset
"""
logger.info("--- Loading data ---")
dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT)
train_df = dataloader.load(filepath=train_filepath)
test_df = dataloader.load(filepath=test_filepath)
if train_df is None or test_df is None:
raise ValueError("No data Loaded - cancelling pipeline")
logger.info("--- Feature processing ---")
feature_processor = FeatureProcessor()
# This is for convenience for now
subsample_amount = round(len(train_df) / SUBSAMPLE_FACTOR)
train_df = feature_processor.process(
train_df, target_column=target_column, subsample_amount=subsample_amount
)
test_df = feature_processor.process(test_df, target_column=target_column)
logger.info("--- Build Model ---")
logger.info("--- Load Hyperparameters ---")
if hyperparameters is None:
logger.info("Use base hyperparameters in settings")
hyperparameters = MODEL_HYPERPARAMETERS[model_type]
logger.info(f"Hyperparameters are: {hyperparameters}")
logger.info(
"--- Loading model configuration (Model type and Naming convention) ---"
)
# We might want to have hyperparameters in the names to make models more recognisable
model_toolkit = model_factory(
model_type=model_type, hyperparameters=hyperparameters
)
model_root = (
f"{target_column}-{model_toolkit['naming_attributes']}-{TIMESTAMP}".lower()
)
output_base = Path(MODEL_DIRECTORY) / target_column / model_type / model_root
# Will need to pass output path to model (for saving purposes)
model = model_toolkit["model"](output_filepath=output_base / MODEL_FOLDER)
model.train_model(
data=train_df, target_column=target_column, hyperparameters=hyperparameters
)
logger.info("--- Save Model ---")
model.save_model(output_filepath=model.output_filepath, s3fs_client=CLIENT)
logger.info("--- Generate evaluation metrics ---")
metrics = Metrics()
metric_output_path = output_base / METRICS_FOLDER
metrics_df = model.model_evaluation(
validation_data=test_df,
target_column=target_column,
metrics_location=metric_output_path,
metrics=metrics,
)
metrics.upload_metrics(output_filepath=metric_output_path, s3fs_client=CLIENT)
logger.info("--- Generate metric outputs using predictions ---")
# metrics.generate_plot_suite()
plot_output_path = output_base / METRICS_FOLDER / RESIDUAL_FILE
metrics.generate_residual_plot(
actuals=test_df[target_column],
predictions=model.predictions,
target_column=target_column,
output_filepath=plot_output_path,
)
metrics.upload_metrics(output_filepath=plot_output_path, s3fs_client=CLIENT)
# TODO: for cml, we might want to have class that outputs all data and plots to add to the report
# If we want residual plot/ any plots, we will need to self host
# TODO: introduce a seperate script for model optimisation, and from there, optimise for deployment
# Imagining for now that the model trained here is the best model amongst all models built
logger.info("--- Optimising model for deployment ---")
deployment_model_path = output_base / DEPLOYMENT_FOLDER
model.optimise_model_for_deployment(deployment_path=deployment_model_path)
logger.info(
f"Optimised version of best model can be found at: {deployment_model_path}"
)
model.save_model(output_filepath=deployment_model_path, s3fs_client=CLIENT)
# TODO: Need a model registry - for now have this as a CSV
# Save this in the model directory
# Loading registry from s3
logger.info("--- Append registry with new model ---")
registry_handler = RegistryHandler()
registry_path = Path(MODEL_DIRECTORY) / target_column / REGISTRY_FILE
registry_df = registry_handler.load_registry(
registry_path=registry_path, s3fs_client=CLIENT, metrics=metrics
)
model_details_df = pd.DataFrame(
[
{
"model_type": model_type,
"model_name": model_root,
"model_location": deployment_model_path,
}
]
)
registry_row = pd.concat([model_details_df, metrics_df], axis=1)
registry_df = pd.concat([registry_df, registry_row], axis=0).reset_index(drop=True)
registry_df = sort_by_metric(
registry_df,
optimse_metric=OPTIMISE_METRIC,
best_model_column_name=BEST_MODEL_COLUMN_NAME,
)
logger.info("--- Saving new model to registry ---")
# Ensure the directory exists
registry_path.parent.mkdir(parents=True, exist_ok=True)
registry_df.to_csv(registry_path, index=False)
registry_handler.save_registry(output_filepath=registry_path, s3fs_client=CLIENT)
logger.info("--- Clean up ---")
if RUNTIME_ENVIRONMENT != "local" and Path(MODEL_DIRECTORY).exists():
logger.info("Removing local development files not in s3")
shutil.rmtree(Path(MODEL_DIRECTORY))
logger.info("--- Training Pipeline Complete --- ")
if __name__ == "__main__":
logger.info("---Begin Pipeline---")
logger.info("---Ingest Arguments---")
args = ingest_arguments()
training(
train_filepath=args.train_filepath,
test_filepath=args.test_filepath,
target_column=args.target_column,
model_type=args.model_type,
)