Model/model_data/simulation_system/training.py
2023-08-18 11:09:21 +01:00

181 lines
No EOL
6.6 KiB
Python

import argparse
import boto3
import os
from pathlib import Path
from datetime import datetime
from typing import List
from core.Logger import logger
from core.DataLoader import DataLoader
from core.FeatureProcessor import FeatureProcessor
from MLModel.Models import AutogluonModel
import pandas as pd
from core.Settings import (
MODEL_DIRECTORY,
BASE_REGISTRY_PATH,
REGISTRY_FILE,
MODEL_FOLDER,
METRICS_FOLDER,
DEPLOYMENT_FOLDER,
SUBSAMPLE_FACTOR,
MODEL_HYPERPARAMETERS
)
TIMESTAMP = datetime.now().strftime(format="%Y-%m-%d_%H-%M-%S")
# FOR TESTING
# train_filepath = "./model_build_data/change_data/rdsap_full/train_validation_data.parquet"
# test_filepath = "./model_build_data/change_data/rdsap_full/test_data.parquet"
# target_column = "RDSAP_CHANGE"
# model_type = "autogluon"
# hyperparameter = HYPERPARAMETERS
# SUBSAMPLE_FACTOR = 200
# SESSION = boto3.Session()
# S3_CLIENT = SESSION.client(
# service_name="s3",
# aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID", 'admin'),
# aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY", 'password'),
# endpoint_url=os.environ.get("ENDPOINT_URL", "http://localhost:9000")
# )
# S3_CLIENT.create_bucket
# S3_CLIENT.list_buckets()
def ingest_arguments() -> argparse.Namespace:
"""
Helper function to take in arguments from script start
"""
parser = argparse.ArgumentParser(description='Inputs for training script')
parser.add_argument('--train-filepath', type=str, help='Location of Parquet dataset to load for training', required=True)
parser.add_argument('--test-filepath', type=str, help='Location of Parquet dataset to load for testing', required=True)
parser.add_argument('--model-type', type=str, help='The type of model to train', choices=["autogluon"], default="autogluon")
parser.add_argument('--target-column', type=str, help='The response variable', choices=["RDSAP_CHANGE", "HEAT_DEMAND_CHANGE"], default='RDSAP_CHANGE')
args = parser.parse_args()
return args
def training(
train_filepath: str,
test_filepath: str,
target_column: str = "RDSAP_CHANGE",
model_type: str = "autogluon",
hyperparameters: dict = None
) -> None:
"""
Pipeline to run training on the dataset
"""
logger.info('--- Loading data ---')
dataloader = DataLoader()
train_df = dataloader.load(filepath=train_filepath)
test_df = dataloader.load(filepath=test_filepath)
logger.info('--- Feature processing ---')
feature_processor = FeatureProcessor()
subsample_amount = round(len(train_df)/SUBSAMPLE_FACTOR)
train_df = feature_processor.process(train_df, target_column=target_column, subsample_amount=subsample_amount)
test_df = feature_processor.process(test_df, target_column=target_column)
logger.info('--- Build Model ---')
logger.info("--- Load Hyperparameters ---")
if hyperparameters is None:
logger.info("Use base hyperparameters in settings")
hyperparameters = MODEL_HYPERPARAMETERS[model_type]
logger.info(f'Hyperparameters are: {hyperparameters}')
if model_type == "autogluon":
model_root = f"{target_column}-{hyperparameters['presets']}-{hyperparameters['time_limit']}-{TIMESTAMP}".lower()
output_base = Path(MODEL_DIRECTORY) / target_column / model_type / model_root
model = AutogluonModel(
output_filepath = output_base / MODEL_FOLDER
)
else:
logger.error("No alternative model implemented yet")
exit(1)
model.train_model(
data=train_df,
target_column=target_column,
hyperparameters=hyperparameters
)
logger.info("--- Save Model ---")
model.save_model(output_filepath=model.output_filepath)
logger.info('--- Generate evaluation metrics ---')
metrics_df = model.model_evaluation(
validation_data=test_df,
target_column=target_column,
metrics_location = output_base / METRICS_FOLDER
)
# TODO: introduce a seperate script for model optimisation, and from there, optimise for deployment
# Imagining for now that the model trained here is the best model amongst all models built
logger.info("--- Optimising model for deployment ---")
deployment_model_path = model.optimise_model_for_deployment(deployment_path= output_base / DEPLOYMENT_FOLDER)
logger.info("Optimised version of best model can be found at: {deployment_model_path}")
# TODO: Need a model registry - for now have this as a CSV
# Save this in the model directory
logger.info("--- Append registry with new model ---")
registry_path = BASE_REGISTRY_PATH / target_column / REGISTRY_FILE
if registry_path.exists():
logger.info("Registry file found - Loading into Dataframe")
registry_df = pd.read_csv(registry_path, index_col=None)
else:
registry_df = pd.DataFrame(columns=['model_type', 'model_name', 'model_location', 'mean_absolute_error', 'root_mean_squared_error', 'mean_squared_error', 'r2', 'pearsonr', 'median_absolute_error', 'mape', 'best_model'])
model_details_df = pd.DataFrame(
[{
'model_type': model_type,
'model_name': model_root,
'model_location': deployment_model_path
}]
)
registry_row = pd.concat([model_details_df, metrics_df], axis=1)
registry_df = pd.concat([registry_df, registry_row], axis=0).reset_index(drop=True)
# TODO: will need a rebuild script metric script -i.e. if we add new metrics, we will want to load models and regenerate new metrics
# TODO: decide metric to optimise to
registry_df = registry_df.sort_values("mean_absolute_error", ascending=False).reset_index(drop=True)
registry_df['best_model'] = [False]*len(registry_df)
registry_df.loc[0, 'best_model'] = True
logger.info("--- Saving new model to registry ---")
registry_df.to_csv(registry_path, index=False)
logger.info("--- Training Pipeline Complete --- ")
if __name__ == "__main__":
logger.info('---Begin Pipeline---')
logger.info('---Ingest Arguments---')
args = ingest_arguments()
# To run script: python3 training.py --train-filepath ./model_build_data/change_data/rdsap_full/train_validation_data.parquet --test-filepath ./model_build_data/change_data/rdsap_full/test_data.parquet
# TODO: Ingest hyper parameters from somewhere - currently change at the top of script
training(
train_filepath=args.train_filepath,
test_filepath=args.test_filepath,
target_column=args.target_column,
model_type=args.model_type
)