Model/model_data/simulation_system/predictions.py
2023-09-01 15:43:13 +01:00

185 lines
6 KiB
Python

"""
Script to load MLModel class and generate predictions
"""
import os
import json
import argparse
import pandas as pd
from typing import Optional
from datetime import datetime
from MLModel.Models import AutogluonModel
from core.Logger import logger
from core.DataLoader import dataloader_factory
from core.CloudClient import S3FSClient
from core.Settings import (
BASE_REGISTRY_PATH,
REGISTRY_FILE,
PREDICTION_LOCATION,
PREDICTION_FILE,
METADATA_FILE,
TIMESTAMP_FORMAT,
)
# Timestamp captured once, when the module is imported, rendered with TIMESTAMP_FORMAT.
TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT)
# Environment name read from the RUNTIME_ENVIRONMENT env var; falls back to "dev" when unset.
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "dev")
# Shared S3 client, constructed at import time for the selected environment.
CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT)
# FOR TESTING
# For now just loading data first and then passing into function (i.e. as if we receive json data and convert to
# DataFrame)
# TEST_DATA = DataLoader.load(filepath="../simulation_system/model_build_data/change_data/rdsap_full/test_data.parquet")
# DATA = TEST_DATA.sample(1)
# For testing in dev s3
# Data path can be passed as so:
# python3 predictions.py --data-path s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet
# data_path="s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet"
def ingest_arguments() -> argparse.Namespace:
"""
Helper function to take in arguments from script start
"""
parser = argparse.ArgumentParser(description="Inputs for training script")
parser.add_argument(
"--target-column",
type=str,
help="The response variable you are predicting for",
choices=["RDSAP_CHANGE", "HEAT_DEMAND_CHANGE"],
default="RDSAP_CHANGE",
)
parser.add_argument(
"--model-path",
type=str,
help="If you wish to use a specific model, specify the model path here",
)
parser.add_argument("--data", type=str, help="Json data for predictions")
parser.add_argument(
"--data-path", type=str, help="Location of Parquet dataset to load for training"
)
args = parser.parse_args()
return args
def prediction(
    target_column: str = "RDSAP_CHANGE",
    model_path: str | None = None,
    data: Optional[pd.DataFrame | str] = None,
    data_path: Optional[str] = None,
):
    """
    Main pipeline function

    Selects a model (explicit path or the registry's best model), prepares the
    input data, loads the model and returns its predictions next to the input
    ids.

    Args:
        target_column: Response variable being predicted. Only used to locate
            the registry file when RUNTIME_ENVIRONMENT is "local".
        model_path: Location of a specific model to load. When given, the
            registry is not read and the model is loaded as an Autogluon model.
        data: Input rows, either a DataFrame (used as-is) or a JSON string
            describing a single record. Takes precedence over ``data_path``.
        data_path: Location of a dataset to load through the environment's
            dataloader when ``data`` is not supplied.

    Returns:
        The input "id" values concatenated column-wise with the output of the
        model's ``generate_predictions``.

    Raises:
        NotImplementedError: RUNTIME_ENVIRONMENT is neither "local" nor "dev".
        SystemExit: Exit code 1 when the local registry file is missing, the
            registry flags no best model, no data/data path is passed, or the
            model type is not "autogluon".
        ValueError: The dataloader returned None.
        KeyError: The input data has neither an "id" column nor an index
            named "id".
    """
    # --- Locate the model registry for the current environment ---
    if RUNTIME_ENVIRONMENT == "local":
        registry_path = BASE_REGISTRY_PATH / target_column / REGISTRY_FILE
        if not registry_path.exists():
            logger.error("No registry path provided or registry doesn't exist")
            raise SystemExit(1)
    elif RUNTIME_ENVIRONMENT == "dev":
        # NOTE(review): the dev registry is pinned to RDSAP_CHANGE regardless
        # of target_column - confirm whether that is intended.
        registry_path = "s3://retrofit-model-directory-dev/model_directory/RDSAP_CHANGE/model_registry.csv"
    else:
        # NotImplemented is a comparison sentinel, not an exception; calling it
        # raises TypeError. NotImplementedError is the exception to raise here.
        raise NotImplementedError("TO be implemented")

    # --- Choose the model ---
    if model_path is not None:
        logger.info("User specified a model to load - ignoring registry")
        model_location = model_path
        model_name = model_path
        # Autogluon is the only model type implemented below, so an explicitly
        # supplied path is loaded as one (previously the path itself was used
        # as the type, which could never pass the type check).
        model_type = "autogluon"
    else:
        # TODO: Think about where registry will sit/ type
        logger.info("Loading best model from registry")
        registry_df = pd.read_csv(registry_path)
        best_model_df = registry_df[registry_df["best_model"]]
        if best_model_df.empty:
            logger.error("No model flagged as best_model in registry")
            raise SystemExit(1)
        model_location = best_model_df["model_location"].values[0]
        model_type = best_model_df["model_type"].values[0]
        model_name = best_model_df["model_name"].values[0]

    logger.info("--- Model Info: ---")
    logger.info(f"Model type: {model_type}")
    logger.info(f"Model name: {model_name}")
    logger.info(f"Model location: {model_location}")

    # --- Prepare the input data ---
    logger.info("--- Loading Data ---")
    if data is None and not data_path:
        logger.error("No Data/Data Path passed")
        raise SystemExit(1)

    if data is None:
        logger.info("Loading data from provided path")
        dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT)
        data = dataloader.load(filepath=data_path, index_col="id")
        if data is None:
            raise ValueError("No data loaded")
    else:
        logger.info("Using data provided")
        # A DataFrame is already in the required form; anything else is
        # treated as a JSON document describing one record.
        if not isinstance(data, pd.DataFrame):
            data = pd.DataFrame([json.loads(str(data))])
    print(data)

    # --- Load the model ---
    logger.info("--- Loading Model ---")
    if model_type != "autogluon":
        logger.error("No other model currently")
        raise SystemExit(1)
    logger.info("Using an Autogluon model")
    model = AutogluonModel()
    model.load_model(
        filepath=model_location, s3_client=CLIENT, model_folder="local_model"
    )

    # --- Predict ---
    logger.info("--- Generating Predictions ---")
    predictions = model.generate_predictions(data=data)

    if "id" in data.columns:
        ids = data["id"]
    elif data.index.name == "id":
        # Data loaded with index_col="id" presumably carries the ids in its
        # index rather than in a column - verify against the dataloader.
        ids = data.index.to_series(name="id")
    else:
        raise KeyError("id")

    # TODO: Check how we want to structure outputs. Persisting the prediction
    # and its metadata (model type, name, location, settings) under
    # PREDICTION_LOCATION / target_column / <id> / TIMESTAMP is not implemented
    # yet; for now the result is only returned to the caller.
    return pd.concat([ids, predictions], axis=1)
if __name__ == "__main__":
    # Script entry point: parse the command line and run the pipeline.
    #
    # Inline JSON record:
    #   python3 predictions.py --data '{"TOTAL_FLOOR_AREA": 1}'
    # Dataset on disk:
    #   python3 predictions.py --data-path ./model_build_data/change_data/rdsap_full/test_data.parquet
    cli_args = ingest_arguments()
    run_kwargs = {
        "target_column": cli_args.target_column,
        "model_path": cli_args.model_path,
        "data": cli_args.data,
        "data_path": cli_args.data_path,
    }
    prediction(**run_kwargs)