add pipeline structure

This commit is contained in:
Michael Duong 2023-09-09 10:07:30 +00:00
parent d907c64ee6
commit 0d18b440c1
25 changed files with 171 additions and 120 deletions

View file

@ -0,0 +1,12 @@
# Pre commit hooks
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v2.3.0
hooks:
- id: check-yaml
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/psf/black
rev: 22.10.0
hooks:
- id: black

View file

@ -11,7 +11,7 @@ dev-pyenv:
pyenv install ${PYTHON_VERSION} || echo "Python version already installed"
pyenv global ${PYTHON_VERSION}
python3 -m venv .dev_env
. .dev_env/bin/activate && pip install --upgrade pip && pip install -r src/training/requirements/requirements-dev.txt && pre-commit install
. .dev_env/bin/activate && pip install --upgrade pip && pip install -r src/pipeline/training/requirements/requirements-dev.txt && pre-commit install
echo "TO ACTIVATE ENVIRONMENT, USE THE FOLLOWING COMMAND"
echo "source .dev_env/bin/activate"

View file

@ -2,4 +2,3 @@
FROM python:3.9-slim
RUN pip install -r experimentation/requirements/training.txt

View file

@ -2,4 +2,4 @@
This folder contains the inference codebase to:
- Load a model
- Generate a prediction
- Generate a prediction

View file

@ -1,3 +1,3 @@
"""
Pipeline that stitches all steps together
"""
"""

View file

@ -25,13 +25,11 @@ Workflow:
- Use `dvc metrics show` to view current metrics score
- Adjust parameters/ codebase
- When happy with changes, use `dvc exp run` to trigger an experiment
- Due to cache, only need stages are re-run
- Due to cache, only need stages are re-run
- Use `dvc metrics diff` to check the change in metrics
- Use `dvc exp show` to view all experiments
- NOTE: the last experiment will always be applied to the workspace!
- Use `dvc exp show` to view all experiments
- NOTE: the last experiment will always be applied to the workspace!
- After running experiments, you can apply the the best model to workspace using `dvc exp apply [EXPERIMENT_NAME]`
- This experiment will have the corresponding .dvc files for the hashed model and data
- Use version control as normal
- git add, git commit etc

View file

@ -3,7 +3,7 @@ Second Pipieline step:
Once we have the features, we build a model
"""
import os
import os
import yaml
import pandas as pd
from typing import Union
@ -22,39 +22,42 @@ prepare_data_params = yaml.safe_load(open(prepare_data_path))
build_model_path = Path(__file__).parent / "configs" / "build_model.yaml"
build_model_params = yaml.safe_load(open(build_model_path))
def build_model(
dataclient: DataClient,
model: MLModel,
target: str,
model_save_location: str,
model_hyperparameters: dict,
train_location: Union[str, None] = None,
test_location: Union[str, None] = None,
train_data: Union[pd.DataFrame, None] = None,
test_data: Union[pd.DataFrame, None] = None,
pipeline_mode: bool = False
):
dataclient: DataClient,
model: MLModel,
target: str,
model_save_location: str,
model_hyperparameters: dict,
train_location: Union[str, None] = None,
test_location: Union[str, None] = None,
train_data: Union[pd.DataFrame, None] = None,
test_data: Union[pd.DataFrame, None] = None,
pipeline_mode: bool = False,
):
logger.info("--------------------------------------")
logger.info("--- Loading Data for build process ---")
logger.info("--------------------------------------")
if train_data is None:
# TODO: replace this with the data client to load
# TODO: replace this with the data client to load
if train_location is None:
raise ValueError(f"Need {train_location}")
raise ValueError(f"Need {train_location}")
train_data = pd.read_parquet(train_location)
if test_data is None:
# TODO: replace this with the data client to load
# TODO: replace this with the data client to load
if test_location is None:
raise ValueError(f"Need {test_location}")
raise ValueError(f"Need {test_location}")
test_data = pd.read_parquet(test_location)
logger.info("----------------------")
logger.info("--- Training model ---")
logger.info("----------------------")
model.train_model(data=train_data, target=target, model_hyperparameters=model_hyperparameters)
model.train_model(
data=train_data, target=target, model_hyperparameters=model_hyperparameters
)
logger.info("--------------------")
logger.info("--- Saving model ---")
@ -62,7 +65,7 @@ def build_model(
model.save_model(path=Path(model_save_location))
# TODO: replace this with the data client to load
# TODO: replace this with the data client to load
# TODO: can fine tune model here if need with the test data
@ -76,13 +79,13 @@ if __name__ == "__main__":
logger.info(f"--- Initiate DataClient ---")
logger.info("----------------------------")
dataclient = dataclient_factory(prepare_data_params['client_type'])
dataclient = dataclient_factory(prepare_data_params["client_type"])
logger.info("-------------------------")
logger.info(f"--- Initiate MLModel ---")
logger.info("-------------------------")
model_type = build_model_params['model_type']
model_type = build_model_params["model_type"]
model = model_factory(model_type)
logger.info("--------------------------")
@ -92,13 +95,13 @@ if __name__ == "__main__":
build_model(
dataclient=dataclient,
model=model,
target=build_model_params['target'],
model_save_location=build_model_params['model_save_location'],
target=build_model_params["target"],
model_save_location=build_model_params["model_save_location"],
model_hyperparameters=build_model_params[model_type],
train_location=prepare_data_params['output_train_filename'],
test_location=prepare_data_params['output_test_filename']
)
train_location=prepare_data_params["output_train_filename"],
test_location=prepare_data_params["output_test_filename"],
)
logger.info("-------------------------------")
logger.info(f"--- {__file__} - Complete! ---")
logger.info("-------------------------===---")
logger.info("-------------------------===---")

View file

@ -5,4 +5,4 @@ test_location: ./data/prepared_data/test.parquet
model_save_location: ./data/model/model.joblib
SKLearnSVMRegression:
kernel: "linear"
kernel: "linear"

View file

@ -1,3 +1,3 @@
"""
Stitch all yaml configuration files together, override some settings (such as bucket location) based off environment variables
"""
"""

View file

@ -1,4 +1,4 @@
metrics_type: Regression
test_data_location: ./data/prepared_data/
predictions_output_location: ./data/predictions/predictions.csv
metrics_output_location: ./metrics/metrics.json
metrics_output_location: ./metrics/metrics.json

View file

@ -5,4 +5,4 @@ output_location: ./data/prepared_data/
output_train_filename: train.parquet
output_test_filename: test.parquet
cache_o
# cache_o

View file

@ -6,6 +6,7 @@ import pandas as pd
from typing import List
from core.interface.InterfaceDataClient import DataClient
def dataclient_factory(dataclient_type: str) -> DataClient:
"""
Determine which dataclient to use
@ -17,7 +18,7 @@ def dataclient_factory(dataclient_type: str) -> DataClient:
if dataclient_type not in dataclients:
raise ValueError("Dataclient type specified is not in factory")
return dataclients[dataclient_type]
@ -25,12 +26,17 @@ def validate_dict_keys(keys_1: List[str], keys_2: List[str], config_type: str):
if not set(keys_1).issubset(keys_2):
raise ValueError(f"Incorrect {config_type} keys specified")
class MinioClient:
"""
Using the Minio s3 client, to do local testing
"""
ACCEPTED_CONFIG_KEYS = ["aws_access_key_id", "aws_secret_access_key", "endpoint_url"]
ACCEPTED_CONFIG_KEYS = [
"aws_access_key_id",
"aws_secret_access_key",
"endpoint_url",
]
ACCEPTED_LOAD_CONFIG_KEYS = []
ACCEPTED_SAVE_CONFIG_KEYS = []
@ -38,10 +44,14 @@ class MinioClient:
"""
Load all configuration into the instance (self.config)
"""
validate_dict_keys(keys_1=list(config.keys()), keys_2=self.ACCEPTED_CONFIG_KEYS, config_type="config")
validate_dict_keys(
keys_1=list(config.keys()),
keys_2=self.ACCEPTED_CONFIG_KEYS,
config_type="config",
)
self.config = config
def establish_client(self) -> None:
"""
With the given configurations, create the connection to the client (self.client)
@ -53,14 +63,20 @@ class MinioClient:
"""
When the client is established, we can load data
"""
validate_dict_keys(keys_1=list(load_config.keys()), keys_2=self.ACCEPTED_LOAD_CONFIG_KEYS, config_type="load_config")
validate_dict_keys(
keys_1=list(load_config.keys()),
keys_2=self.ACCEPTED_LOAD_CONFIG_KEYS,
config_type="load_config",
)
return pd.DataFrame()
def save_data(self, obj: object, save_config: dict) -> None:
"""
When the client is established, we can save out objects
"""
validate_dict_keys(keys_1=list(save_config.keys()), keys_2=self.ACCEPTED_SAVE_CONFIG_KEYS, config_type="save_config")
validate_dict_keys(
keys_1=list(save_config.keys()),
keys_2=self.ACCEPTED_SAVE_CONFIG_KEYS,
config_type="save_config",
)

View file

@ -2,25 +2,27 @@
Logger that will be used throughout the application
"""
import logging
import logging
def setup_logger():
# Create a logger
logger = logging.getLogger()
# Set the log level
logger.setLevel(logging.INFO)
# Create a formatter
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
# Create a stream handler to direct logs to stdout
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
# Add the stream handler to the logger
logger.addHandler(stream_handler)
return logger
logger = setup_logger()
logger = setup_logger()

View file

@ -10,10 +10,11 @@ from sklearn.metrics import (
mean_absolute_error,
median_absolute_error,
mean_squared_error,
mean_absolute_percentage_error
mean_absolute_percentage_error,
)
from core.interface.InterfaceMetrics import MLMetrics
def metrics_factory(metrics_type: str) -> MLMetrics:
metrics = {
"Regression": RegressionMetrics(),
@ -22,9 +23,10 @@ def metrics_factory(metrics_type: str) -> MLMetrics:
if metrics_type not in metrics:
raise ValueError("Metrics type specified is not in factory")
return metrics[metrics_type]
class RegressionMetrics:
METRIC_TO_APPLY = [
@ -36,7 +38,7 @@ class RegressionMetrics:
]
def generate_metrics(
self, target: Union[pd.DataFrame, pd.Series], predictions: pd.Series
self, target: Union[pd.DataFrame, pd.Series], predictions: pd.Series
) -> dict:
"""
Method to generate metrics
@ -44,8 +46,6 @@ class RegressionMetrics:
metric_dict = {}
for metric_function in self.METRIC_TO_APPLY:
metric_dict[metric_function.__name__] = metric_function(
target, predictions
)
metric_dict[metric_function.__name__] = metric_function(target, predictions)
return metric_dict
return metric_dict

View file

@ -7,7 +7,7 @@ Implementations of MLModels, all of which will have four methods to:
"""
import os
import joblib
import joblib
import pandas as pd
from pathlib import Path
from typing import Union, List
@ -15,6 +15,7 @@ from sklearn import linear_model
from sklearn.svm import SVR
from core.interface.InterfaceModels import MLModel
def model_factory(model_type: str) -> MLModel:
"""
Determine which model to use from the model type
@ -27,7 +28,7 @@ def model_factory(model_type: str) -> MLModel:
if model_type not in models:
raise ValueError("Model type specified is not in factory")
return models[model_type]
@ -37,7 +38,6 @@ def validate_dict_keys(keys_1: List[str], keys_2: List[str], config_type: str):
class SKLearnLinearRegression:
def load_model(self, path: Union[Path, str]) -> None:
"""
Method to load a model
@ -51,7 +51,7 @@ class SKLearnLinearRegression:
"""
if self.model is None:
raise KeyError("No model trained/ loaded - unable to save")
if not path.parent.exists():
os.mkdir(path.parent)
@ -60,7 +60,9 @@ class SKLearnLinearRegression:
return string_path
def train_model(self, data: pd.DataFrame, target: str, model_hyperparameters: dict) -> None:
def train_model(
self, data: pd.DataFrame, target: str, model_hyperparameters: dict
) -> None:
"""
Method to train a model
"""
@ -70,10 +72,9 @@ class SKLearnLinearRegression:
y_train = data[target]
self.model.fit(x_train, y_train)
def predict(self, data: pd.DataFrame) -> pd.Series:
"""
Method to predict
Method to predict
"""
self.predictions = pd.Series(self.model.predict(data))
return self.predictions
@ -82,21 +83,21 @@ class SKLearnLinearRegression:
class SKLearnSVMRegression:
MODEL_HYPERPARAMETERS = ["kernel"]
def load_model(self, path: Union[Path, str]) -> None:
"""
Method to load a model
"""
string_path = str(path)
self.model = joblib.load(string_path)
def save_model(self, path: Path) -> str:
"""
Method to save a model
"""
if self.model is None:
raise KeyError("No model trained/ loaded - unable to save")
if not path.parent.exists():
os.mkdir(path.parent)
@ -105,23 +106,28 @@ class SKLearnSVMRegression:
return string_path
def train_model(self, data: pd.DataFrame, target: str, model_hyperparameters: dict) -> None:
def train_model(
self, data: pd.DataFrame, target: str, model_hyperparameters: dict
) -> None:
"""
Method to train a model
"""
validate_dict_keys(list(model_hyperparameters.keys()), self.MODEL_HYPERPARAMETERS, config_type="Train_model_config")
self.model = SVR(kernel=model_hyperparameters['kernel'])
validate_dict_keys(
list(model_hyperparameters.keys()),
self.MODEL_HYPERPARAMETERS,
config_type="Train_model_config",
)
self.model = SVR(kernel=model_hyperparameters["kernel"])
x_train = data.iloc[:, data.columns != target]
y_train = data[target]
self.model.fit(x_train, y_train)
def predict(self, data: pd.DataFrame) -> pd.Series:
"""
Method to predict
Method to predict
"""
self.predictions = pd.Series(self.model.predict(data))
return self.predictions
return self.predictions

View file

@ -5,6 +5,7 @@ Interface for all DataClient i.e. s3, database, local etc
import pandas as pd
from typing import Protocol
class DataClient(Protocol):
"""
Declare the methods required for a DataClient
@ -15,7 +16,7 @@ class DataClient(Protocol):
Load all configuration into the instance (self.config)
"""
...
def establish_client(self) -> None:
"""
With the given configurations, create the connection to the client (self.client)

View file

@ -5,15 +5,16 @@ Define the interface for creating metrics
import pandas as pd
from typing import Protocol, Union
class MLMetrics(Protocol):
"""
All metrics will need to have the following interface to interact with the ML Pipeline
"""
def generate_metrics(
self, target: Union[pd.DataFrame, pd.Series], predictions: pd.Series
self, target: Union[pd.DataFrame, pd.Series], predictions: pd.Series
) -> dict:
"""
Method to generate metrics
"""
...
...

View file

@ -2,10 +2,11 @@
Define the protocol for models in this pipeline
"""
import pandas as pd
import pandas as pd
from pathlib import Path
from typing import Protocol, Union
class MLModel(Protocol):
"""
All models will need to have the following interface to interact with the ML pipeline
@ -16,14 +17,16 @@ class MLModel(Protocol):
Method to load a model
"""
...
def save_model(self, path: Path) -> str:
"""
Method to save a model
"""
...
def train_model(self, data: pd.DataFrame, target: str, model_hyperparameters: dict) -> None:
def train_model(
self, data: pd.DataFrame, target: str, model_hyperparameters: dict
) -> None:
"""
Method to train a model
"""
@ -31,6 +34,6 @@ class MLModel(Protocol):
def predict(self, data: pd.DataFrame) -> pd.Series:
"""
Method to predict
Method to predict
"""
...
...

View file

@ -10,7 +10,7 @@ import pandas as pd
from pathlib import Path
from core.interface.InterfaceModels import MLModel
from core.interface.InterfaceMetrics import MLMetrics
from core.interface.InterfaceDataClient import DataClient
from core.interface.InterfaceDataClient import DataClient
from core.DataClient import dataclient_factory
from core.MLModels import model_factory
from core.MLMetrics import metrics_factory
@ -37,7 +37,7 @@ def generate_metrics(
model_location: str,
test_data_location: str,
predictions_output_location: str,
metrics_output_location: str
metrics_output_location: str,
):
"""
For a given model, we generate prediction and evaluate this against the true target
@ -61,7 +61,9 @@ def generate_metrics(
logger.info("------------------------------")
# Clean test data for now
prediction_data = test_data.drop(columns=target) if target in test_data.columns else test_data
prediction_data = (
test_data.drop(columns=target) if target in test_data.columns else test_data
)
predictions = model.predict(data=prediction_data)
@ -73,7 +75,7 @@ def generate_metrics(
if not Path(predictions_output_location).parent.exists():
os.mkdir(Path(predictions_output_location).parent)
predictions.to_json(predictions_output_location)
logger.info("--------------------------")
@ -92,27 +94,30 @@ def generate_metrics(
if not Path(metrics_output_location).parent.exists():
os.mkdir(Path(metrics_output_location).parent)
with open(metrics_output_location, "w") as f:
json.dump(metrics_output, f)
if __name__ == "__main__":
logger.info("----------------------------")
logger.info(f"--- {__file__} - Start! ---")
logger.info("----------------------------")
model = model_factory(build_model_params['model_type'])
dataclient = dataclient_factory(prepare_data_params['dataclient_type'])
metrics = metrics_factory(generate_metrics_params['metrics_type'])
model = model_factory(build_model_params["model_type"])
dataclient = dataclient_factory(prepare_data_params["dataclient_type"])
metrics = metrics_factory(generate_metrics_params["metrics_type"])
generate_metrics(
dataclient=dataclient,
model=model,
metrics=metrics,
target=build_model_params["target"],
model_location=build_model_params["model_save_location"],
test_data_location=generate_metrics_params["test_data_location"],
predictions_output_location=generate_metrics_params["predictions_output_location"],
metrics_output_location=generate_metrics_params["metrics_output_location"]
)
dataclient=dataclient,
model=model,
metrics=metrics,
target=build_model_params["target"],
model_location=build_model_params["model_save_location"],
test_data_location=generate_metrics_params["test_data_location"],
predictions_output_location=generate_metrics_params[
"predictions_output_location"
],
metrics_output_location=generate_metrics_params["metrics_output_location"],
)

View file

@ -23,16 +23,22 @@ params = yaml.safe_load(open(params_path))
def use_dummy_data() -> pd.DataFrame:
diabetes_data = load_diabetes()
x_data = pd.DataFrame(diabetes_data['data'], columns=diabetes_data['feature_names']) # type: ignore
y_data = pd.DataFrame(diabetes_data['target'], columns=['target']) # type: ignore
x_data = pd.DataFrame(diabetes_data["data"], columns=diabetes_data["feature_names"]) # type: ignore
y_data = pd.DataFrame(diabetes_data["target"], columns=["target"]) # type: ignore
data = pd.concat([x_data, y_data], axis=1)
return data
def prepare_data(dataclient: DataClient, train_proportion: float, output_location: str, output_train_filename: str = "train.parquet", output_test_filename: str = "test.parquet") -> Tuple[pd.DataFrame, pd.DataFrame]:
def prepare_data(
dataclient: DataClient,
train_proportion: float,
output_location: str,
output_train_filename: str = "train.parquet",
output_test_filename: str = "test.parquet",
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""
Given a client and location, load data into the pipeline
Given a client and location, load data into the pipeline
:param dataclient: DataClient, Determines how to get data from the given provider (cloud or local)
:param pipeline_mode: bool, Default False, this caches out the file for experimentation, objects returned in pipeline mode
"""
@ -49,15 +55,13 @@ def prepare_data(dataclient: DataClient, train_proportion: float, output_locatio
logger.info("----------------------")
train, test = train_test_split(
data, train_size=train_proportion, test_size=(1-train_proportion)
data, train_size=train_proportion, test_size=(1 - train_proportion)
)
logger.info("--------------------------")
logger.info("--- Feature Processing ---")
logger.info("--------------------------")
logger.info("-----------------------")
logger.info("--- Outputting data ---")
logger.info("-----------------------")
@ -69,13 +73,14 @@ def prepare_data(dataclient: DataClient, train_proportion: float, output_locatio
logger.info("--- Outputting train and test data ---")
train.to_csv(output_path / output_train_filename, index=False)
test.to_csv(output_path/ output_test_filename, index=False)
test.to_csv(output_path / output_test_filename, index=False)
# client.save_data(obj=train)
# client.save_data(obj=test)
return train, test
if __name__ == "__main__":
logger.info("----------------------------")
@ -86,16 +91,16 @@ if __name__ == "__main__":
logger.info(f"--- Initiate DataClient ---")
logger.info("----------------------------")
dataclient = dataclient_factory(params['dataclient_type'])
dataclient = dataclient_factory(params["dataclient_type"])
logger.info("---------------------------")
logger.info(f"--- Prepare Data Stage ---")
logger.info("---------------------------")
prepare_data(
dataclient=dataclient,
train_proportion=params['train_proportion'],
output_location=params['output_location']
dataclient=dataclient,
train_proportion=params["train_proportion"],
output_location=params["output_location"],
)
logger.info("-------------------------------")

View file

@ -6,4 +6,4 @@ gto==1.0.4
scikit-learn==1.3.0
pre-commit==3.3.3
sphinx==7.2.5
sphinx_rtd_theme==1.3.0
sphinx_rtd_theme==1.3.0

View file

@ -2,4 +2,4 @@ boto3==1.28.41
pandas==1.5.3
dvc==3.18.0
gto==1.0.4
scikit-learn==1.3.0
scikit-learn==1.3.0