ML/modules/ml-pipeline/src/pipeline/core/MLModels.py
2023-10-22 21:05:07 +00:00

247 lines
6.8 KiB
Python

""""
Implementations of MLModels, all of which have four methods to:
- Load a model
- Save a model
- Train a model
- Generate predictions
"""
import os
import joblib
import pandas as pd
from pathlib import Path
from typing import Union, List
from sklearn import linear_model
from sklearn.svm import SVR
from autogluon.tabular import TabularDataset, TabularPredictor
from core.interface.InterfaceModels import MLModel
from core.Logger import logger
def model_factory(model_type: str) -> MLModel:
    """
    Build the model implementation registered under ``model_type``.

    Args:
        model_type: Name of the model implementation to construct. Must be
            one of the keys of the registry defined in this function.

    Returns:
        A newly constructed instance of the matching model class.

    Raises:
        ValueError: If ``model_type`` is not a registered model name.
    """
    # Map names to classes rather than instances so that only the requested
    # model is constructed, instead of instantiating every registered model
    # on each call.
    models = {
        "SKLearnLinearRegression": SKLearnLinearRegression,
        "SKLearnSVMRegression": SKLearnSVMRegression,
        "AutogluonAutoML": AutogluonAutoML,
        # ADD OTHER MODELS HERE
    }
    if model_type not in models:
        raise ValueError("Model type specified is not in factory")
    return models[model_type]()
def validate_dict_keys(keys_1: List[str], keys_2: List[str], config_type: str):
    """
    Check that every key in ``keys_1`` also appears in ``keys_2``.

    Args:
        keys_1: Keys that were supplied (e.g. by a user configuration).
        keys_2: Keys that are accepted.
        config_type: Label for the configuration being checked; it is
            interpolated into the error message.

    Raises:
        ValueError: If ``keys_1`` contains a key that is not in ``keys_2``.
    """
    unexpected_keys = set(keys_1).difference(keys_2)
    if unexpected_keys:
        raise ValueError(f"Incorrect {config_type} keys specified")
class SKLearnLinearRegression:
def load_model(self, path: Union[Path, str]) -> None:
"""
Method to load a model
"""
string_path = str(path)
self.model = joblib.load(string_path)
def save_model(self, path: Path) -> str:
"""
Method to save a model
"""
if self.model is None:
raise KeyError("No model trained/ loaded - unable to save")
if not path.parent.exists():
os.mkdir(path.parent)
string_path = str(path)
joblib.dump(self.model, string_path)
return string_path
def train_model(
self, data: pd.DataFrame, target: str, model_hyperparameters: dict
) -> None:
"""
Method to train a model
"""
self.model = linear_model.LinearRegression()
x_train = data.iloc[:, data.columns != target]
y_train = data[target]
self.model.fit(x_train, y_train)
def predict(
self, data: pd.DataFrame, post_prediction_logic: dict | None = None
) -> pd.Series:
"""
Method to predict
"""
self.predictions = pd.Series(self.model.predict(data))
return self.predictions
class SKLearnSVMRegression:
MODEL_HYPERPARAMETERS = ["kernel"]
def load_model(self, path: Union[Path, str]) -> None:
"""
Method to load a model
"""
string_path = str(path)
self.model = joblib.load(string_path)
def save_model(self, path: Path) -> str:
"""
Method to save a model
"""
if self.model is None:
raise KeyError("No model trained/ loaded - unable to save")
if not path.parent.exists():
os.mkdir(path.parent)
string_path = str(path)
joblib.dump(self.model, string_path)
return string_path
def train_model(
self, data: pd.DataFrame, target: str, model_hyperparameters: dict
) -> None:
"""
Method to train a model
"""
validate_dict_keys(
list(model_hyperparameters.keys()),
self.MODEL_HYPERPARAMETERS,
config_type="Train_model_config",
)
self.model = SVR(kernel=model_hyperparameters["kernel"])
x_train = data.iloc[:, data.columns != target]
y_train = data[target]
self.model.fit(x_train, y_train)
def predict(
self, data: pd.DataFrame, post_prediction_logic: dict | None = None
) -> pd.Series:
"""
Method to predict
"""
self.predictions = pd.Series(self.model.predict(data))
return self.predictions
class AutogluonAutoML:
ACCEPTED_MODEL_HYPERPAREMETERS = [
"output_filepath",
"problem_type",
"eval_metric",
"time_limit",
"presets",
"excluded_model_types",
"infer_limit",
"infer_limit_batch_size",
]
def load_model(self, path: Union[Path, str]) -> None:
"""
Method to load a model
"""
filepath = str(path)
self.model = TabularPredictor.load(path=filepath)
def save_model(self, path: Path) -> str:
"""
Method to save a model
"""
if self.model is None:
raise KeyError("No model trained/ loaded - unable to save")
logger.info(
"Using AutoGluon Model - Model saving is using optimised deployment mode"
)
logger.info("Saving optimised model")
self.model.clone_for_deployment(str(path))
return str(path)
def train_model(
self, data: pd.DataFrame, target: str, model_hyperparameters: dict
) -> None:
"""
Method to train a model
"""
validate_dict_keys(
keys_1=list(model_hyperparameters.keys()),
keys_2=self.ACCEPTED_MODEL_HYPERPAREMETERS,
config_type="Model Hyperparameters",
)
if model_hyperparameters["output_filepath"] is None:
logger.error("Please specify a output_filepath in order to train a model")
exit(1)
AGdata = TabularDataset(data=data)
self.model = TabularPredictor(
label=target,
path=model_hyperparameters["output_filepath"],
problem_type=model_hyperparameters["problem_type"],
eval_metric=model_hyperparameters["eval_metric"],
).fit(
AGdata,
time_limit=model_hyperparameters["time_limit"],
presets=model_hyperparameters["presets"],
excluded_model_types=model_hyperparameters["excluded_model_types"],
infer_limit=model_hyperparameters["infer_limit"],
infer_limit_batch_size=model_hyperparameters["infer_limit_batch_size"],
)
def predict(
self, data: pd.DataFrame, post_prediction_logic: dict | None = None
) -> pd.Series:
"""
Method to predict
"""
if post_prediction_logic is None:
post_prediction_logic = {}
if self.model is None:
print("No model loaded/ trained")
exit(1)
predictions = pd.Series(self.model.predict(data))
if len(post_prediction_logic) != 0:
predictions = self._apply_post_prediction_logic(
data=data,
predictions=predictions,
post_prediction_logic=post_prediction_logic,
)
return predictions
def _apply_post_prediction_logic(
self, data: pd.DataFrame, predictions: pd.Series, post_prediction_logic: dict
):
"""
For predictions, we can apply post processing logic to clean up predictions
"""
for _, value in post_prediction_logic.items():
predictions = value(data, predictions)
return predictions