import argparse
# import boto3
import os
from pathlib import Path
from datetime import datetime
from typing import List, Optional

from model_data.simulation_system.core.Logger import logger
from model_data.simulation_system.core.DataLoader import DataLoader
from model_data.simulation_system.core.FeatureProcessor import FeatureProcessor
from model_data.simulation_system.MLModel.Models import AutogluonModel
import pandas as pd
from model_data.simulation_system.core.Settings import (
    MODEL_DIRECTORY,
    BASE_REGISTRY_PATH,
    REGISTRY_FILE,
    MODEL_FOLDER,
    METRICS_FOLDER,
    DEPLOYMENT_FOLDER,
    SUBSAMPLE_FACTOR,
    MODEL_HYPERPARAMETERS
)
import seaborn as sns
import matplotlib.pyplot as plt

# Captured once at import time so every artefact written by a single run shares the same suffix.
TIMESTAMP = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

# File name of the true-vs-predicted scatter plot written into the metrics folder.
RESIDUAL_FILE = "residuals.png"

# FOR TESTING
# train_filepath = "./model_build_data/change_data/rdsap_full/train_validation_data.parquet"
# test_filepath = "./model_build_data/change_data/rdsap_full/test_data.parquet"
# target_column = "RDSAP_CHANGE"
# model_type = "autogluon"
# hyperparameter = HYPERPARAMETERS
# SUBSAMPLE_FACTOR = 200

# SESSION = boto3.Session()
# S3_CLIENT = SESSION.client(
#     service_name="s3",
#     aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID", 'admin'),
#     aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY", 'password'),
#     endpoint_url=os.environ.get("ENDPOINT_URL", "http://localhost:9000")
# )
# S3_CLIENT.create_bucket
# S3_CLIENT.list_buckets()


def ingest_arguments() -> argparse.Namespace:
    """
    Helper function to take in arguments from script start

    Returns:
        Namespace exposing ``train_filepath``, ``test_filepath``, ``model_type`` and
        ``target_column`` as parsed from the command line.
    """
    parser = argparse.ArgumentParser(description='Inputs for training script')
    parser.add_argument(
        '--train-filepath',
        type=str,
        help='Location of Parquet dataset to load for training',
        required=True
    )
    parser.add_argument(
        '--test-filepath',
        type=str,
        help='Location of Parquet dataset to load for testing',
        required=True
    )
    parser.add_argument(
        '--model-type',
        type=str,
        help='The type of model to train',
        choices=["autogluon"],
        default="autogluon"
    )
    parser.add_argument(
        '--target-column',
        type=str,
        help='The response variable',
        choices=["RDSAP_CHANGE", "HEAT_DEMAND_CHANGE"],
        default='RDSAP_CHANGE'
    )
    args = parser.parse_args()

    return args


def _save_residual_plot(residual_df: pd.DataFrame, target_column: str, output_path: Path) -> None:
    """
    Draw the true-vs-predicted scatter plot and save it as an image at ``output_path``.

    Args:
        residual_df: Frame with a ``true`` and a ``pred`` column.
        target_column: Name of the response variable, used only for the axis labels.
        output_path: Destination image file; its parent directory is created if missing.
    """
    # image formatting
    # TODO: move to settings file , AXIS_FONT, TITLE_FONT
    axis_fs = 18  # fontsize
    title_fs = 22  # fontsize

    # The theme has to be set before the axes are created for it to take effect.
    sns.set(style="whitegrid")

    # Use a dedicated figure rather than pyplot's implicit "current" one, so repeated calls in the same
    # process never draw on top of a previous plot and the figure can be released afterwards.
    fig, ax = plt.subplots()
    try:
        sns.scatterplot(x="true", y="pred", data=residual_df, ax=ax)
        # Square aspect ratio
        ax.set_aspect('equal')
        ax.set_xlabel(f'True {target_column}', fontsize=axis_fs)
        ax.set_ylabel(f'Predicted {target_column}', fontsize=axis_fs)  # ylabel
        ax.set_title('Residuals', fontsize=title_fs)

        # y = x reference line: points on it are perfect predictions
        ax.plot([-100, 100], [-100, 100], 'black', linewidth=1)

        fig.tight_layout()

        # Ensure the directory exists, mirroring what is done before the registry is written
        output_path.parent.mkdir(parents=True, exist_ok=True)
        fig.savefig(output_path, dpi=120)
    finally:
        plt.close(fig)


def _update_registry(
        registry_path: Path,
        model_type: str,
        model_name: str,
        model_location,
        metrics_df: pd.DataFrame
) -> pd.DataFrame:
    """
    Append the newly trained model to the CSV registry, re-flag the best model and save the file.

    Args:
        registry_path: Location of the registry CSV; it is created (with its directory) if missing.
        model_type: Value stored in the ``model_type`` column.
        model_name: Value stored in the ``model_name`` column.
        model_location: Value stored in the ``model_location`` column.
        metrics_df: Metrics of the new model, joined column-wise onto the model details.
            NOTE(review): the join aligns on index, so this presumably holds a single row
            with index 0 - confirm against ``model_evaluation``.

    Returns:
        The registry frame exactly as written to disk.
    """
    if registry_path.exists():
        logger.info("Registry file found - Loading into Dataframe")
        registry_df = pd.read_csv(registry_path, index_col=None)
    else:
        # TODO: Moved columns into settings: MODEL_DETAILS and Metrics class columns
        registry_df = pd.DataFrame(
            columns=['model_type', 'model_name', 'model_location', 'mean_absolute_error',
                     'root_mean_squared_error', 'mean_squared_error', 'r2', 'pearsonr',
                     'median_absolute_error', 'mape', 'best_model'])

    model_details_df = pd.DataFrame(
        [{
            'model_type': model_type,
            'model_name': model_name,
            'model_location': model_location
        }]
    )

    registry_row = pd.concat([model_details_df, metrics_df], axis=1)
    registry_df = pd.concat([registry_df, registry_row], axis=0).reset_index(drop=True)

    # TODO: will need a rebuild script metric script -i.e. if we add new metrics, we will want to load models and
    # regenerate new metrics
    # TODO: decide metric to optimise to
    # NOTE(review): sorting descending puts the LARGEST mean_absolute_error in row 0, which is then flagged
    # as the best model. That is only right if the metric is stored in a "higher is better" (e.g. negated)
    # form - confirm against what model_evaluation returns before changing the sort direction.
    registry_df = registry_df.sort_values("mean_absolute_error", ascending=False).reset_index(drop=True)
    registry_df['best_model'] = False
    registry_df.loc[0, 'best_model'] = True

    logger.info("--- Saving new model to registry ---")
    # Ensure the directory exists
    registry_path.parent.mkdir(parents=True, exist_ok=True)
    registry_df.to_csv(registry_path, index=False)

    return registry_df


def training(
        train_filepath: str,
        test_filepath: str,
        target_column: str = "RDSAP_CHANGE",
        model_type: str = "autogluon",
        hyperparameters: Optional[dict] = None
) -> None:
    """
    Pipeline to run training on the dataset

    Loads and processes the train/test data, trains and saves the model, writes the evaluation
    metrics and a residual plot, optimises the model for deployment and records it in the registry.

    Args:
        train_filepath: Location of the dataset used for training.
        test_filepath: Location of the dataset used for evaluation.
        target_column: Name of the response variable column.
        model_type: Type of model to train; only ``"autogluon"`` is implemented.
        hyperparameters: Model hyperparameters. When ``None`` the entry for ``model_type`` in
            ``MODEL_HYPERPARAMETERS`` is used. For autogluon the ``presets`` and ``time_limit``
            keys are read to build the model name.

    Raises:
        ValueError: If ``model_type`` is anything other than ``"autogluon"``.
    """

    logger.info('--- Loading data ---')
    dataloader = DataLoader()
    train_df = dataloader.load(filepath=train_filepath)
    test_df = dataloader.load(filepath=test_filepath)

    logger.info('--- Feature processing ---')
    feature_processor = FeatureProcessor()
    # Only the training data is subsampled; the test data is processed in full.
    subsample_amount = round(len(train_df) / SUBSAMPLE_FACTOR)
    train_df = feature_processor.process(train_df, target_column=target_column, subsample_amount=subsample_amount)
    test_df = feature_processor.process(test_df, target_column=target_column)

    logger.info('--- Build Model ---')

    logger.info("--- Load Hyperparameters ---")
    if hyperparameters is None:
        logger.info("Use base hyperparameters in settings")
        hyperparameters = MODEL_HYPERPARAMETERS[model_type]

    logger.info(f'Hyperparameters are: {hyperparameters}')

    if model_type == "autogluon":
        # The model name encodes target, presets, time limit and run timestamp so runs never collide.
        model_root = f"{target_column}-{hyperparameters['presets']}-{hyperparameters['time_limit']}-{TIMESTAMP}".lower()
        output_base = Path(MODEL_DIRECTORY) / target_column / model_type / model_root
        model = AutogluonModel(
            output_filepath=output_base / MODEL_FOLDER
        )
    else:
        raise ValueError("No alternative model implemented yet")

    model.train_model(
        data=train_df,
        target_column=target_column,
        hyperparameters=hyperparameters
    )

    logger.info("--- Save Model ---")
    model.save_model(output_filepath=model.output_filepath)

    logger.info('--- Generate evaluation metrics ---')
    metrics_dir = output_base / METRICS_FOLDER
    metrics_df = model.model_evaluation(
        validation_data=test_df,
        target_column=target_column,
        metrics_location=metrics_dir
    )

    logger.info("--- Generate metric outputs using predictions ---")
    # TODO: can have a model.metric_outputs method
    # For now just do it here
    # model.predictions is read only after model_evaluation has run on the test data.
    residual_df = pd.DataFrame(list(zip(test_df[target_column], model.predictions)), columns=['true', 'pred'])
    _save_residual_plot(residual_df, target_column=target_column, output_path=metrics_dir / RESIDUAL_FILE)

    # TODO: for cml, we might want to have class that outputs all data and plots to add to the report
    # If we want residual plot/ any plots, we will need to self host
    # plt.savefig(RESIDUAL_FILE, dpi=120)

    # TODO: introduce a separate script for model optimisation, and from there, optimise for deployment
    # Imagining for now that the model trained here is the best model amongst all models built
    logger.info("--- Optimising model for deployment ---")
    deployment_model_path = model.optimise_model_for_deployment(deployment_path=output_base / DEPLOYMENT_FOLDER)
    logger.info(f"Optimised version of best model can be found at: {deployment_model_path}")

    # TODO: Need a model registry - for now have this as a CSV
    # Save this in the model directory
    logger.info("--- Append registry with new model ---")
    registry_path = BASE_REGISTRY_PATH / target_column / REGISTRY_FILE
    _update_registry(
        registry_path=registry_path,
        model_type=model_type,
        model_name=model_root,
        model_location=deployment_model_path,
        metrics_df=metrics_df
    )

    logger.info("--- Training Pipeline Complete --- ")


if __name__ == "__main__":
    logger.info('---Begin Pipeline---')

    logger.info('---Ingest Arguments---')
    args = ingest_arguments()

    # To run script: python3 training.py --train-filepath
    # ./model_build_data/change_data/rdsap_full/train_validation_data.parquet --test-filepath
    # ./model_build_data/change_data/rdsap_full/test_data.parquet

    # TODO: Ingest hyper parameters from somewhere - currently change at the top of script
    training(
        train_filepath=args.train_filepath,
        test_filepath=args.test_filepath,
        target_column=args.target_column,
        model_type=args.model_type
    )