initial commit

This commit is contained in:
Michael Duong 2023-09-08 18:57:06 +01:00
commit fb81191ec8
26 changed files with 787 additions and 0 deletions

1
.pre-commit-config.yaml Normal file
View file

@ -0,0 +1 @@
# Pre commit hooks

11
README.md Normal file
View file

@ -0,0 +1,11 @@
# ML Toolkit
Creating a ML-toolkit that can be reused:
- ML pipeline:
- A dummy pipeline that has data version control, experiment
tracking and a model registry
- ML monitoring:
- A bolt-on service that can implement model monitoring

View file

@ -0,0 +1,4 @@
# ML-Monitoring
Concept:
- To have a plug in a play model to do model monitoring

View file

@ -0,0 +1,5 @@
# Dockerfile that grabs the current dvc hashed model
FROM python:3.9-slim
RUN pip install -r experimentation/requirements/training.txt

View file

@ -0,0 +1,5 @@
# Inference
This folder contains the inference codebase to:
- Load a model
- Generate a prediction

View file

@ -0,0 +1,3 @@
"""
Pipeline that stitches all steps together
"""

View file

@ -0,0 +1,37 @@
# Training
This folder contains the code base for training experimentation.
To understand the pipeline, run `dvc dag`
There are 3 main steps:
- Preparing data
- This is loading data (locally or from s3)
- Splitting the data into train and validation
- Creating additional features (if needed)
- **Data is cached**
- This will be down to the dvc remote location
- Build model
- For the prepared data, we build a model using our configurations
- Model is saved (locally or s3)
- **Model is cached**
- This will be down to the dvc remote location
- Generate Metrics
- For the given model, we generate metrics on validation data/test data
- **Metrics are cached**
- This will be down to the dvc remote location
Workflow:
- Use `dvc metrics show` to view current metrics score
- Adjust parameters/ codebase
- When happy with changes, use `dvc exp run` to trigger an experiment
- Due to cache, only need stages are re-run
- Use `dvc metrics diff` to check the change in metrics
- Use `dvc exp show` to view all experiments
- NOTE: the last experiment will always be applied to the workspace!
- After running experiments, you can apply the the best model to workspace using `dvc exp apply [EXPERIMENT_NAME]`
- This experiment will have the corresponding .dvc files for the hashed model and data
- Use version control as normal
- git add, git commit etc

View file

@ -0,0 +1,104 @@
"""
Second Pipieline step:
Once we have the features, we build a model
"""
import os
import yaml
import pandas as pd
from typing import Union
from pathlib import Path
from core.Logger import logger
from core.interface.InterfaceModels import MLModel
from core.interface.InterfaceDataClient import DataClient
from core.DataClient import dataclient_factory
from core.MLModels import model_factory
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local")
prepare_data_path = Path(__file__).parent / "configs" / "prepare_data.yaml"
prepare_data_params = yaml.safe_load(open(prepare_data_path))
build_model_path = Path(__file__).parent / "configs" / "build_model.yaml"
build_model_params = yaml.safe_load(open(build_model_path))
def build_model(
dataclient: DataClient,
model: MLModel,
target: str,
model_save_location: str,
model_hyperparameters: dict,
train_location: Union[str, None] = None,
test_location: Union[str, None] = None,
train_data: Union[pd.DataFrame, None] = None,
test_data: Union[pd.DataFrame, None] = None,
pipeline_mode: bool = False
):
logger.info("--------------------------------------")
logger.info("--- Loading Data for build process ---")
logger.info("--------------------------------------")
if train_data is None:
# TODO: replace this with the data client to load
if train_location is None:
raise ValueError(f"Need {train_location}")
train_data = pd.read_parquet(train_location)
if test_data is None:
# TODO: replace this with the data client to load
if test_location is None:
raise ValueError(f"Need {test_location}")
test_data = pd.read_parquet(test_location)
logger.info("----------------------")
logger.info("--- Training model ---")
logger.info("----------------------")
model.train_model(data=train_data, target=target, model_hyperparameters=model_hyperparameters)
logger.info("--------------------")
logger.info("--- Saving model ---")
logger.info("--------------------")
model.save_model(path=Path(model_save_location))
# TODO: replace this with the data client to load
# TODO: can fine tune model here if need with the test data
if __name__ == "__main__":
logger.info("----------------------------")
logger.info(f"--- {__file__} - Start! ---")
logger.info("----------------------------")
logger.info("----------------------------")
logger.info(f"--- Initiate DataClient ---")
logger.info("----------------------------")
dataclient = dataclient_factory(prepare_data_params['client_type'])
logger.info("-------------------------")
logger.info(f"--- Initiate MLModel ---")
logger.info("-------------------------")
model_type = build_model_params['model_type']
model = model_factory(model_type)
logger.info("--------------------------")
logger.info(f"--- Build Model Stage ---")
logger.info("--------------------------")
build_model(
dataclient=dataclient,
model=model,
target=build_model_params['target'],
model_save_location=build_model_params['model_save_location'],
model_hyperparameters=build_model_params[model_type],
train_location=prepare_data_params['output_train_filename'],
test_location=prepare_data_params['output_test_filename']
)
logger.info("-------------------------------")
logger.info(f"--- {__file__} - Complete! ---")
logger.info("-------------------------===---")

View file

@ -0,0 +1,8 @@
model_type: SKLearnLinearRegression
train_location: ./data/prepared_data/train.parquet
target: target
test_location: ./data/prepared_data/test.parquet
model_save_location: ./data/model/model.joblib
SKLearnSVMRegression:
kernel: "linear"

View file

@ -0,0 +1,3 @@
"""
Stitch all yaml configuration files together, override some settings (such as bucket location) based off environment variables
"""

View file

@ -0,0 +1,4 @@
metrics_type: Regression
test_data_location: ./data/prepared_data/
predictions_output_location: ./data/predictions/predictions.csv
metrics_output_location: ./metrics/metrics.json

View file

@ -0,0 +1,8 @@
dataclient: minio
data_location: s3://dev_bucket
train_proportion: 0.8
output_location: ./data/prepared_data/
output_train_filename: train.parquet
output_test_filename: test.parquet
cache_o

View file

@ -0,0 +1,66 @@
"""
Implementations of the DataClient Protocol
"""
import pandas as pd
from typing import List
from core.interface.InterfaceDataClient import DataClient
def dataclient_factory(dataclient_type: str) -> DataClient:
"""
Determine which dataclient to use
"""
dataclients = {
"minio": MinioClient(),
# ADD MORE DATACLIENTS HERE
}
if dataclient_type not in dataclients:
raise ValueError("Dataclient type specified is not in factory")
return dataclients[dataclient_type]
def validate_dict_keys(keys_1: List[str], keys_2: List[str], config_type: str):
if not set(keys_1).issubset(keys_2):
raise ValueError(f"Incorrect {config_type} keys specified")
class MinioClient:
"""
Using the Minio s3 client, to do local testing
"""
ACCEPTED_CONFIG_KEYS = ["aws_access_key_id", "aws_secret_access_key", "endpoint_url"]
ACCEPTED_LOAD_CONFIG_KEYS = []
ACCEPTED_SAVE_CONFIG_KEYS = []
def ingest_configurations(self, config: dict) -> None:
"""
Load all configuration into the instance (self.config)
"""
validate_dict_keys(keys_1=list(config.keys()), keys_2=self.ACCEPTED_CONFIG_KEYS, config_type="config")
self.config = config
def establish_client(self) -> None:
"""
With the given configurations, create the connection to the client (self.client)
"""
...
def load_data(self, load_config: dict) -> pd.DataFrame:
"""
When the client is established, we can load data
"""
validate_dict_keys(keys_1=list(load_config.keys()), keys_2=self.ACCEPTED_LOAD_CONFIG_KEYS, config_type="load_config")
return pd.DataFrame()
def save_data(self, obj: object, save_config: dict) -> None:
"""
When the client is established, we can save out objects
"""
validate_dict_keys(keys_1=list(save_config.keys()), keys_2=self.ACCEPTED_SAVE_CONFIG_KEYS, config_type="save_config")

View file

@ -0,0 +1,26 @@
"""
Logger that will be used throughout the application
"""
import logging
def setup_logger():
# Create a logger
logger = logging.getLogger()
# Set the log level
logger.setLevel(logging.INFO)
# Create a formatter
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
# Create a stream handler to direct logs to stdout
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
# Add the stream handler to the logger
logger.addHandler(stream_handler)
return logger
logger = setup_logger()

View file

@ -0,0 +1,51 @@
"""
Implementation of MLMetrics, all of which will have two methods:
- Generate Metrics Suite
- Generate Plot Suite
"""
import pandas as pd
from typing import Union
from sklearn.metrics import (
mean_absolute_error,
median_absolute_error,
mean_squared_error,
mean_absolute_percentage_error
)
from core.interface.InterfaceMetrics import MLMetrics
def metrics_factory(metrics_type: str) -> MLMetrics:
metrics = {
"Regression": RegressionMetrics(),
# ADD OTHER METRIC CLASSES HERE
}
if metrics_type not in metrics:
raise ValueError("Metrics type specified is not in factory")
return metrics[metrics_type]
class RegressionMetrics:
METRIC_TO_APPLY = [
mean_absolute_error,
median_absolute_error,
mean_squared_error,
mean_absolute_percentage_error,
# max_error
]
def generate_metrics(
self, target: Union[pd.DataFrame, pd.Series], predictions: pd.Series
) -> dict:
"""
Method to generate metrics
"""
metric_dict = {}
for metric_function in self.METRIC_TO_APPLY:
metric_dict[metric_function.__name__] = metric_function(
target, predictions
)
return metric_dict

View file

@ -0,0 +1,127 @@
""""
Implementations of MLModels, all of which will have four methods to:
- Load model
- Save Model
- Train Model
- Geenrate predictions
"""
import os
import joblib
import pandas as pd
from pathlib import Path
from typing import Union, List
from sklearn import linear_model
from sklearn.svm import SVR
from core.interface.InterfaceModels import MLModel
def model_factory(model_type: str) -> MLModel:
"""
Determine which model to use from the model type
"""
models = {
"SKLearnLinearRegression": SKLearnLinearRegression(),
"SKLearnSVMRegression": SKLearnSVMRegression(),
# ADD OTHER MODELS HERE
}
if model_type not in models:
raise ValueError("Model type specified is not in factory")
return models[model_type]
def validate_dict_keys(keys_1: List[str], keys_2: List[str], config_type: str):
if not set(keys_1).issubset(keys_2):
raise ValueError(f"Incorrect {config_type} keys specified")
class SKLearnLinearRegression:
def load_model(self, path: Union[Path, str]) -> None:
"""
Method to load a model
"""
string_path = str(path)
self.model = joblib.load(string_path)
def save_model(self, path: Path) -> str:
"""
Method to save a model
"""
if self.model is None:
raise KeyError("No model trained/ loaded - unable to save")
if not path.parent.exists():
os.mkdir(path.parent)
string_path = str(path)
joblib.dump(self.model, string_path)
return string_path
def train_model(self, data: pd.DataFrame, target: str, model_hyperparameters: dict) -> None:
"""
Method to train a model
"""
self.model = linear_model.LinearRegression()
x_train = data.iloc[:, data.columns != target]
y_train = data[target]
self.model.fit(x_train, y_train)
def predict(self, data: pd.DataFrame) -> pd.Series:
"""
Method to predict
"""
self.predictions = pd.Series(self.model.predict(data))
return self.predictions
class SKLearnSVMRegression:
MODEL_HYPERPARAMETERS = ["kernel"]
def load_model(self, path: Union[Path, str]) -> None:
"""
Method to load a model
"""
string_path = str(path)
self.model = joblib.load(string_path)
def save_model(self, path: Path) -> str:
"""
Method to save a model
"""
if self.model is None:
raise KeyError("No model trained/ loaded - unable to save")
if not path.parent.exists():
os.mkdir(path.parent)
string_path = str(path)
joblib.dump(self.model, string_path)
return string_path
def train_model(self, data: pd.DataFrame, target: str, model_hyperparameters: dict) -> None:
"""
Method to train a model
"""
validate_dict_keys(list(model_hyperparameters.keys()), self.MODEL_HYPERPARAMETERS, config_type="Train_model_config")
self.model = SVR(kernel=model_hyperparameters['kernel'])
x_train = data.iloc[:, data.columns != target]
y_train = data[target]
self.model.fit(x_train, y_train)
def predict(self, data: pd.DataFrame) -> pd.Series:
"""
Method to predict
"""
self.predictions = pd.Series(self.model.predict(data))
return self.predictions

View file

@ -0,0 +1,34 @@
"""
Interface for all DataClient i.e. s3, database, local etc
"""
import pandas as pd
from typing import Protocol
class DataClient(Protocol):
"""
Declare the methods required for a DataClient
"""
def ingest_configurations(self, config: dict) -> None:
"""
Load all configuration into the instance (self.config)
"""
...
def establish_client(self) -> None:
"""
With the given configurations, create the connection to the client (self.client)
"""
...
def load_data(self, load_config: dict) -> pd.DataFrame:
"""
When the client is established, we can load data
"""
...
def save_data(self, obj: object, save_config: dict) -> None:
"""
When the client is established, we can save out objects
"""

View file

@ -0,0 +1,19 @@
"""
Define the interface for creating metrics
"""
import pandas as pd
from typing import Protocol, Union
class MLMetrics(Protocol):
"""
All metrics will need to have the following interface to interact with the ML Pipeline
"""
def generate_metrics(
self, target: Union[pd.DataFrame, pd.Series], predictions: pd.Series
) -> dict:
"""
Method to generate metrics
"""
...

View file

@ -0,0 +1,36 @@
"""
Define the protocol for models in this pipeline
"""
import pandas as pd
from pathlib import Path
from typing import Protocol, Union
class MLModel(Protocol):
"""
All models will need to have the following interface to interact with the ML pipeline
"""
def load_model(self, path: Union[Path, str]) -> None:
"""
Method to load a model
"""
...
def save_model(self, path: Path) -> str:
"""
Method to save a model
"""
...
def train_model(self, data: pd.DataFrame, target: str, model_hyperparameters: dict) -> None:
"""
Method to train a model
"""
...
def predict(self, data: pd.DataFrame) -> pd.Series:
"""
Method to predict
"""
...

View file

@ -0,0 +1,118 @@
"""
Third part of the pipeline:
After the model is built, we can evaluate its performance
"""
import os
import yaml
import json
import pandas as pd
from pathlib import Path
from core.interface.InterfaceModels import MLModel
from core.interface.InterfaceMetrics import MLMetrics
from core.interface.InterfaceDataClient import DataClient
from core.DataClient import dataclient_factory
from core.MLModels import model_factory
from core.MLMetrics import metrics_factory
from core.Logger import logger
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local")
prepare_data_path = Path(__file__).parent / "configs" / "prepare_data.yaml"
prepare_data_params = yaml.safe_load(open(prepare_data_path))
build_model_path = Path(__file__).parent / "configs" / "build_model.yaml"
build_model_params = yaml.safe_load(open(build_model_path))
generate_metrics_path = Path(__file__).parent / "configs" / "generate_metrics.yaml"
generate_metrics_params = yaml.safe_load(open(generate_metrics_path))
def generate_metrics(
dataclient: DataClient,
model: MLModel,
metrics: MLMetrics,
target: str,
model_location: str,
test_data_location: str,
predictions_output_location: str,
metrics_output_location: str
):
"""
For a given model, we generate prediction and evaluate this against the true target
"""
logger.info("-------------------------")
logger.info("--- Loading test data ---")
logger.info("-------------------------")
# TODO: replace with client loader here
test_data = pd.read_parquet(test_data_location)
logger.info("---------------------")
logger.info("--- Loading model ---")
logger.info("---------------------")
model.load_model(model_location)
logger.info("------------------------------")
logger.info("--- Generating predictions ---")
logger.info("------------------------------")
# Clean test data for now
prediction_data = test_data.drop(columns=target) if target in test_data.columns else test_data
predictions = model.predict(data=prediction_data)
logger.info("--------------------------")
logger.info("--- Saving predictions ---")
logger.info("--------------------------")
# TODO: replace with client
if not Path(predictions_output_location).parent.exists():
os.mkdir(Path(predictions_output_location).parent)
predictions.to_json(predictions_output_location)
logger.info("--------------------------")
logger.info("--- Generating metrics ---")
logger.info("--------------------------")
metrics_output = metrics.generate_metrics(
target=test_data[target], predictions=predictions
)
logger.info("----------------------")
logger.info("--- Saving metrics ---")
logger.info("----------------------")
# TODO: replace with client
if not Path(metrics_output_location).parent.exists():
os.mkdir(Path(metrics_output_location).parent)
with open(metrics_output_location, "w") as f:
json.dump(metrics_output, f)
if __name__ == "__main__":
logger.info("----------------------------")
logger.info(f"--- {__file__} - Start! ---")
logger.info("----------------------------")
model = model_factory(build_model_params['model_type'])
dataclient = dataclient_factory(prepare_data_params['dataclient_type'])
metrics = metrics_factory(generate_metrics_params['metrics_type'])
generate_metrics(
dataclient=dataclient,
model=model,
metrics=metrics,
target=build_model_params["target"],
model_location=build_model_params["model_save_location"],
test_data_location=generate_metrics_params["test_data_location"],
predictions_output_location=generate_metrics_params["predictions_output_location"],
metrics_output_location=generate_metrics_params["metrics_output_location"]
)

View file

@ -0,0 +1,103 @@
"""
First part of the pipeline:
Loading data from a client
"""
import os
import yaml
import pandas as pd
from typing import Optional, Tuple, Union
from pathlib import Path
from sklearn.datasets import load_diabetes
from sklearn.model_selection import train_test_split
from core.interface.InterfaceDataClient import DataClient
from core.Logger import logger
from core.DataClient import dataclient_factory
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local")
params_path = Path(__file__).parent / "configs" / "prepare_data.yaml"
params = yaml.safe_load(open(params_path))
def use_dummy_data() -> pd.DataFrame:
diabetes_data = load_diabetes()
x_data = pd.DataFrame(diabetes_data['data'], columns=diabetes_data['feature_names']) # type: ignore
y_data = pd.DataFrame(diabetes_data['target'], columns=['target']) # type: ignore
data = pd.concat([x_data, y_data], axis=1)
return data
def prepare_data(dataclient: DataClient, train_proportion: float, output_location: str, output_train_filename: str = "train.parquet", output_test_filename: str = "test.parquet") -> Tuple[pd.DataFrame, pd.DataFrame]:
"""
Given a client and location, load data into the pipeline
:param dataclient: DataClient, Determines how to get data from the given provider (cloud or local)
:param pipeline_mode: bool, Default False, this caches out the file for experimentation, objects returned in pipeline mode
"""
logger.info("--------------------")
logger.info("--- Loading data ---")
logger.info("--------------------")
# TODO: REPLACE THIS WIL CLIENT AND LOAD DATA
data = use_dummy_data()
logger.info("----------------------")
logger.info("--- Splitting data ---")
logger.info("----------------------")
train, test = train_test_split(
data, train_size=train_proportion, test_size=(1-train_proportion)
)
logger.info("--------------------------")
logger.info("--- Feature Processing ---")
logger.info("--------------------------")
logger.info("-----------------------")
logger.info("--- Outputting data ---")
logger.info("-----------------------")
# TODO: REPLACE WITH CLIENT
output_path = Path(output_location)
if not output_path.exists():
os.mkdir(output_path)
logger.info("--- Outputting train and test data ---")
train.to_csv(output_path / output_train_filename, index=False)
test.to_csv(output_path/ output_test_filename, index=False)
# client.save_data(obj=train)
# client.save_data(obj=test)
return train, test
if __name__ == "__main__":
logger.info("----------------------------")
logger.info(f"--- {__file__} - Start! ---")
logger.info("----------------------------")
logger.info("----------------------------")
logger.info(f"--- Initiate DataClient ---")
logger.info("----------------------------")
dataclient = dataclient_factory(params['dataclient_type'])
logger.info("---------------------------")
logger.info(f"--- Prepare Data Stage ---")
logger.info("---------------------------")
prepare_data(
dataclient=dataclient,
train_proportion=params['train_proportion'],
output_location=params['output_location']
)
logger.info("-------------------------------")
logger.info(f"--- {__file__} - Complete! ---")
logger.info("-------------------------===---")

View file

@ -0,0 +1,9 @@
joblib==1.3.2
boto3==1.28.41
pandas==1.5.3
dvc==3.18.0
gto==1.0.4
scikit-learn==1.3.0
pre-commit==3.3.3
sphinx==7.2.5
sphinx_rtd_theme==1.3.0

View file

@ -0,0 +1,5 @@
boto3==1.28.41
pandas==1.5.3
dvc==3.18.0
gto==1.0.4
scikit-learn==1.3.0