"""Command-line training pipeline.

Loads train/test Parquet datasets, runs feature processing, trains a model,
writes evaluation metrics, produces a deployment-optimised copy of the model
and records the run in a per-target CSV model registry.
"""
import argparse
# import boto3
import os
from pathlib import Path
from datetime import datetime
from typing import List, Optional

import pandas as pd

from core.Logger import logger
from core.DataLoader import DataLoader
from core.FeatureProcessor import FeatureProcessor
from MLModel.Models import AutogluonModel
from core.Settings import (
    MODEL_DIRECTORY,
    BASE_REGISTRY_PATH,
    REGISTRY_FILE,
    MODEL_FOLDER,
    METRICS_FOLDER,
    DEPLOYMENT_FOLDER,
    SUBSAMPLE_FACTOR,
    MODEL_HYPERPARAMETERS
)

# Captured once at import time so every artefact of a run shares one suffix.
TIMESTAMP = datetime.now().strftime(format="%Y-%m-%d_%H-%M-%S")

# Column layout used when a registry file is created for the first time.
_REGISTRY_COLUMNS = [
    'model_type',
    'model_name',
    'model_location',
    'mean_absolute_error',
    'root_mean_squared_error',
    'mean_squared_error',
    'r2',
    'pearsonr',
    'median_absolute_error',
    'mape',
    'best_model',
]

# FOR TESTING
# train_filepath = "./model_build_data/change_data/rdsap_full/train_validation_data.parquet"
# test_filepath = "./model_build_data/change_data/rdsap_full/test_data.parquet"
# target_column = "RDSAP_CHANGE"
# model_type = "autogluon"
# hyperparameter = HYPERPARAMETERS
# SUBSAMPLE_FACTOR = 200

# SESSION = boto3.Session()
# S3_CLIENT = SESSION.client(
#     service_name="s3",
#     aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID", 'admin'),
#     aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY", 'password'),
#     endpoint_url=os.environ.get("ENDPOINT_URL", "http://localhost:9000")
# )

# S3_CLIENT.create_bucket
# S3_CLIENT.list_buckets()


def ingest_arguments() -> argparse.Namespace:
    """
    Helper function to take in arguments from script start

    Returns:
        The parsed namespace with ``train_filepath``, ``test_filepath``,
        ``model_type`` and ``target_column`` attributes.
    """
    parser = argparse.ArgumentParser(description='Inputs for training script')
    parser.add_argument(
        '--train-filepath',
        type=str,
        help='Location of Parquet dataset to load for training',
        required=True,
    )
    parser.add_argument(
        '--test-filepath',
        type=str,
        help='Location of Parquet dataset to load for testing',
        required=True,
    )
    parser.add_argument(
        '--model-type',
        type=str,
        help='The type of model to train',
        choices=["autogluon"],
        default="autogluon",
    )
    parser.add_argument(
        '--target-column',
        type=str,
        help='The response variable',
        choices=["RDSAP_CHANGE", "HEAT_DEMAND_CHANGE"],
        default='RDSAP_CHANGE',
    )
    return parser.parse_args()


def _update_registry(
    registry_path: Path,
    model_type: str,
    model_name: str,
    model_location,
    metrics_df: pd.DataFrame,
) -> pd.DataFrame:
    """
    Append one model to the CSV registry, re-flag the best model and save.

    Args:
        registry_path: CSV file holding the registry; created if missing.
        model_type: Value written to the ``model_type`` column.
        model_name: Value written to the ``model_name`` column.
        model_location: Value written to the ``model_location`` column.
        metrics_df: Metric columns placed alongside the model details.

    Returns:
        The registry as written to disk.
    """
    if registry_path.exists():
        logger.info("Registry file found - Loading into Dataframe")
        registry_df = pd.read_csv(registry_path, index_col=None)
    else:
        registry_df = pd.DataFrame(columns=_REGISTRY_COLUMNS)

    model_details_df = pd.DataFrame(
        [{
            'model_type': model_type,
            'model_name': model_name,
            'model_location': model_location
        }]
    )
    # Column-wise concat aligns on the index, so the details row (index 0) and
    # the metrics are joined by index label.
    registry_row = pd.concat([model_details_df, metrics_df], axis=1)
    registry_df = pd.concat([registry_df, registry_row], axis=0).reset_index(drop=True)

    # TODO: will need a rebuild script metric script -i.e. if we add new metrics,
    # we will want to load models and regenerate new metrics
    # TODO: decide metric to optimise to
    # NOTE(review): the descending sort marks the row with the LARGEST
    # mean_absolute_error as best. That is only right if metrics are stored in
    # higher-is-better form (e.g. negated errors) - confirm against the output
    # of model_evaluation before changing.
    registry_df = registry_df.sort_values("mean_absolute_error", ascending=False).reset_index(drop=True)
    registry_df['best_model'] = False
    registry_df.loc[0, 'best_model'] = True

    logger.info("--- Saving new model to registry ---")
    # The per-target registry folder may not exist on the first run.
    registry_path.parent.mkdir(parents=True, exist_ok=True)
    registry_df.to_csv(registry_path, index=False)
    return registry_df


def training(
    train_filepath: str,
    test_filepath: str,
    target_column: str = "RDSAP_CHANGE",
    model_type: str = "autogluon",
    hyperparameters: Optional[dict] = None
) -> None:
    """
    Pipeline to run training on the dataset

    Args:
        train_filepath: Location of the dataset used for training.
        test_filepath: Location of the dataset used for evaluation.
        target_column: Name of the response variable.
        model_type: Model family to train; only "autogluon" is implemented.
        hyperparameters: Training settings. When None, the entry for
            ``model_type`` in MODEL_HYPERPARAMETERS is used. The 'presets' and
            'time_limit' keys are required because they form the model name.

    Raises:
        SystemExit: With code 1 when ``model_type`` is not implemented.
    """
    # Fail fast: reject unsupported model types before any data is loaded and
    # before MODEL_HYPERPARAMETERS is indexed with an unknown key.
    if model_type != "autogluon":
        logger.error("No alternative model implemented yet")
        raise SystemExit(1)

    logger.info('--- Loading data ---')
    dataloader = DataLoader()
    train_df = dataloader.load(filepath=train_filepath)
    test_df = dataloader.load(filepath=test_filepath)

    logger.info('--- Feature processing ---')
    feature_processor = FeatureProcessor()
    # Only the training set is subsampled; the test set is processed in full.
    subsample_amount = round(len(train_df)/SUBSAMPLE_FACTOR)
    train_df = feature_processor.process(train_df, target_column=target_column, subsample_amount=subsample_amount)
    test_df = feature_processor.process(test_df, target_column=target_column)

    logger.info('--- Build Model ---')
    logger.info("--- Load Hyperparameters ---")
    if hyperparameters is None:
        logger.info("Use base hyperparameters in settings")
        hyperparameters = MODEL_HYPERPARAMETERS[model_type]
    logger.info(f'Hyperparameters are: {hyperparameters}')

    model_root = f"{target_column}-{hyperparameters['presets']}-{hyperparameters['time_limit']}-{TIMESTAMP}".lower()
    output_base = Path(MODEL_DIRECTORY) / target_column / model_type / model_root
    model = AutogluonModel(
        output_filepath=output_base / MODEL_FOLDER
    )

    model.train_model(
        data=train_df,
        target_column=target_column,
        hyperparameters=hyperparameters
    )

    logger.info("--- Save Model ---")
    model.save_model(output_filepath=model.output_filepath)

    logger.info('--- Generate evaluation metrics ---')
    metrics_df = model.model_evaluation(
        validation_data=test_df,
        target_column=target_column,
        metrics_location=output_base / METRICS_FOLDER
    )

    # TODO: introduce a separate script for model optimisation, and from there, optimise for deployment
    # Imagining for now that the model trained here is the best model amongst all models built
    logger.info("--- Optimising model for deployment ---")
    deployment_model_path = model.optimise_model_for_deployment(deployment_path=output_base / DEPLOYMENT_FOLDER)
    logger.info(f"Optimised version of best model can be found at: {deployment_model_path}")

    # TODO: Need a model registry - for now have this as a CSV
    # Save this in the model directory
    logger.info("--- Append registry with new model ---")
    registry_path = Path(BASE_REGISTRY_PATH) / target_column / REGISTRY_FILE
    _update_registry(
        registry_path=registry_path,
        model_type=model_type,
        model_name=model_root,
        model_location=deployment_model_path,
        metrics_df=metrics_df,
    )

    logger.info("--- Training Pipeline Complete --- ")


def main() -> None:
    """Parse the command-line arguments and run the training pipeline."""
    logger.info('---Begin Pipeline---')
    logger.info('---Ingest Arguments---')
    args = ingest_arguments()

    # To run script: python3 training.py --train-filepath ./model_build_data/change_data/rdsap_full/train_validation_data.parquet --test-filepath ./model_build_data/change_data/rdsap_full/test_data.parquet
    # TODO: Ingest hyper parameters from somewhere - currently change at the top of script
    training(
        train_filepath=args.train_filepath,
        test_filepath=args.test_filepath,
        target_column=args.target_column,
        model_type=args.model_type
    )


if __name__ == "__main__":
    main()