diff --git a/modules/ml-pipeline/.dvc/config b/modules/ml-pipeline/.dvc/config index e69de29..03ccfbc 100644 --- a/modules/ml-pipeline/.dvc/config +++ b/modules/ml-pipeline/.dvc/config @@ -0,0 +1,2 @@ +['remote "myremote"'] + url = /tmp/dvcstore diff --git a/modules/ml-pipeline/.gitignore b/modules/ml-pipeline/.gitignore index 2a3b661..cd87845 100644 --- a/modules/ml-pipeline/.gitignore +++ b/modules/ml-pipeline/.gitignore @@ -1,3 +1,2 @@ .dev_env/ -data/ __pycache__/ diff --git a/modules/ml-pipeline/src/pipeline/training/build_model.py b/modules/ml-pipeline/src/pipeline/training/build_model.py index 4ab4838..77dd25e 100644 --- a/modules/ml-pipeline/src/pipeline/training/build_model.py +++ b/modules/ml-pipeline/src/pipeline/training/build_model.py @@ -29,8 +29,8 @@ def build_model( target: str, model_save_location: str, model_hyperparameters: dict, - train_location: Union[str, None] = None, - test_location: Union[str, None] = None, + train_filepath: Union[str, None] = None, + test_filepath: Union[str, None] = None, train_data: Union[pd.DataFrame, None] = None, test_data: Union[pd.DataFrame, None] = None, pipeline_mode: bool = False, @@ -41,15 +41,15 @@ def build_model( if train_data is None: # TODO: replace this with the data client to load - if train_location is None: - raise ValueError(f"Need {train_location}") - train_data = pd.read_parquet(train_location) + if train_filepath is None: + raise ValueError(f"Need {train_filepath}") + train_data = pd.read_parquet(train_filepath) if test_data is None: # TODO: replace this with the data client to load - if test_location is None: - raise ValueError(f"Need {test_location}") - test_data = pd.read_parquet(test_location) + if test_filepath is None: + raise ValueError(f"Need {test_filepath}") + test_data = pd.read_parquet(test_filepath) logger.info("----------------------") logger.info("--- Training model ---") @@ -79,7 +79,7 @@ if __name__ == "__main__": logger.info(f"--- Initiate DataClient ---") logger.info("----------------------------") - dataclient = dataclient_factory(prepare_data_params["client_type"]) + dataclient = dataclient_factory(prepare_data_params["dataclient_type"]) logger.info("-------------------------") logger.info(f"--- Initiate MLModel ---") @@ -98,8 +98,8 @@ if __name__ == "__main__": target=build_model_params["target"], model_save_location=build_model_params["model_save_location"], model_hyperparameters=build_model_params[model_type], - train_location=prepare_data_params["output_train_filename"], - test_location=prepare_data_params["output_test_filename"], + train_filepath=prepare_data_params["output_train_filepath"], + test_filepath=prepare_data_params["output_test_filepath"], ) logger.info("-------------------------------") diff --git a/modules/ml-pipeline/src/pipeline/training/configs/build_model.yaml b/modules/ml-pipeline/src/pipeline/training/configs/build_model.yaml index 94e6aa8..940f5ce 100644 --- a/modules/ml-pipeline/src/pipeline/training/configs/build_model.yaml +++ b/modules/ml-pipeline/src/pipeline/training/configs/build_model.yaml @@ -4,5 +4,8 @@ target: target test_location: ./data/prepared_data/test.parquet model_save_location: ./data/model/model.joblib + +SKLearnLinearRegression: null + SKLearnSVMRegression: kernel: "linear" diff --git a/modules/ml-pipeline/src/pipeline/training/configs/generate_metrics.yaml b/modules/ml-pipeline/src/pipeline/training/configs/generate_metrics.yaml index a032918..a370f9f 100644 --- a/modules/ml-pipeline/src/pipeline/training/configs/generate_metrics.yaml +++ b/modules/ml-pipeline/src/pipeline/training/configs/generate_metrics.yaml @@ -1,4 +1,2 @@ metrics_type: Regression -test_data_location: ./data/prepared_data/ -predictions_output_location: ./data/predictions/predictions.csv -metrics_output_location: ./metrics/metrics.json +metrics_output_filepath: ./metrics/metrics.json diff --git a/modules/ml-pipeline/src/pipeline/training/configs/generate_predictions.yaml b/modules/ml-pipeline/src/pipeline/training/configs/generate_predictions.yaml new file mode 100644 index 0000000..89444ad --- /dev/null +++ b/modules/ml-pipeline/src/pipeline/training/configs/generate_predictions.yaml @@ -0,0 +1,2 @@ +test_data_filepath: ./data/prepared_data/test.parquet +predictions_output_filepath: ./data/predictions/predictions.parquet diff --git a/modules/ml-pipeline/src/pipeline/training/configs/prepare_data.yaml b/modules/ml-pipeline/src/pipeline/training/configs/prepare_data.yaml index 17b36ce..9a0c3bd 100644 --- a/modules/ml-pipeline/src/pipeline/training/configs/prepare_data.yaml +++ b/modules/ml-pipeline/src/pipeline/training/configs/prepare_data.yaml @@ -1,8 +1,7 @@ dataclient_type: minio data_location: s3://dev_bucket train_proportion: 0.8 -output_location: ./data/prepared_data/ -output_train_filename: train.parquet -output_test_filename: test.parquet +output_train_filepath: ./data/prepared_data/train.parquet +output_test_filepath: ./data/prepared_data/test.parquet # cache_o diff --git a/modules/ml-pipeline/src/pipeline/training/data/model/.gitignore b/modules/ml-pipeline/src/pipeline/training/data/model/.gitignore new file mode 100644 index 0000000..565a9d5 --- /dev/null +++ b/modules/ml-pipeline/src/pipeline/training/data/model/.gitignore @@ -0,0 +1 @@ +/model.joblib diff --git a/modules/ml-pipeline/src/pipeline/training/data/model/model.joblib.dvc b/modules/ml-pipeline/src/pipeline/training/data/model/model.joblib.dvc new file mode 100644 index 0000000..d86a291 --- /dev/null +++ b/modules/ml-pipeline/src/pipeline/training/data/model/model.joblib.dvc @@ -0,0 +1,5 @@ +outs: +- md5: 1edb002d64bf2b5e9853b1f81ab4c47c + size: 1096 + hash: md5 + path: model.joblib diff --git a/modules/ml-pipeline/src/pipeline/training/data/predictions/.gitignore b/modules/ml-pipeline/src/pipeline/training/data/predictions/.gitignore new file mode 100644 index 0000000..c481d45 --- /dev/null +++ b/modules/ml-pipeline/src/pipeline/training/data/predictions/.gitignore @@ -0,0 +1 @@ +/predictions.parquet diff --git a/modules/ml-pipeline/src/pipeline/training/data/predictions/predictions.parquet.dvc b/modules/ml-pipeline/src/pipeline/training/data/predictions/predictions.parquet.dvc new file mode 100644 index 0000000..e0c1b42 --- /dev/null +++ b/modules/ml-pipeline/src/pipeline/training/data/predictions/predictions.parquet.dvc @@ -0,0 +1,5 @@ +outs: +- md5: dc6e327c8048a3b485baf73bd5a0284c + size: 2531 + hash: md5 + path: predictions.parquet diff --git a/modules/ml-pipeline/src/pipeline/training/data/prepared_data/.gitignore b/modules/ml-pipeline/src/pipeline/training/data/prepared_data/.gitignore new file mode 100644 index 0000000..8b913b6 --- /dev/null +++ b/modules/ml-pipeline/src/pipeline/training/data/prepared_data/.gitignore @@ -0,0 +1,2 @@ +/test.parquet +/train.parquet diff --git a/modules/ml-pipeline/src/pipeline/training/data/prepared_data/test.parquet.dvc b/modules/ml-pipeline/src/pipeline/training/data/prepared_data/test.parquet.dvc new file mode 100644 index 0000000..fb9df54 --- /dev/null +++ b/modules/ml-pipeline/src/pipeline/training/data/prepared_data/test.parquet.dvc @@ -0,0 +1,5 @@ +outs: +- md5: 13c6c812693b10cfb32daf286a8dd372 + size: 13587 + hash: md5 + path: test.parquet diff --git a/modules/ml-pipeline/src/pipeline/training/data/prepared_data/train.parquet.dvc b/modules/ml-pipeline/src/pipeline/training/data/prepared_data/train.parquet.dvc new file mode 100644 index 0000000..4b908ad --- /dev/null +++ b/modules/ml-pipeline/src/pipeline/training/data/prepared_data/train.parquet.dvc @@ -0,0 +1,5 @@ +outs: +- md5: 92b2085257087bf509e437b23cf51270 + size: 22565 + hash: md5 + path: train.parquet diff --git a/modules/ml-pipeline/src/pipeline/training/dvc.yaml b/modules/ml-pipeline/src/pipeline/training/dvc.yaml new file mode 100644 index 0000000..e69de29 diff --git a/modules/ml-pipeline/src/pipeline/training/generate_metrics.py b/modules/ml-pipeline/src/pipeline/training/generate_metrics.py index 7a77e2a..e1a4638 100644 --- a/modules/ml-pipeline/src/pipeline/training/generate_metrics.py +++ b/modules/ml-pipeline/src/pipeline/training/generate_metrics.py @@ -25,6 +25,11 @@ prepare_data_params = yaml.safe_load(open(prepare_data_path)) build_model_path = Path(__file__).parent / "configs" / "build_model.yaml" build_model_params = yaml.safe_load(open(build_model_path)) +generate_predictions_path = ( + Path(__file__).parent / "configs" / "generate_predictions.yaml" +) +generate_predictions_params = yaml.safe_load(open(generate_predictions_path)) + generate_metrics_path = Path(__file__).parent / "configs" / "generate_metrics.yaml" generate_metrics_params = yaml.safe_load(open(generate_metrics_path)) @@ -34,10 +39,9 @@ def generate_metrics( model: MLModel, metrics: MLMetrics, target: str, - model_location: str, - test_data_location: str, - predictions_output_location: str, - metrics_output_location: str, + test_data_filepath: str, + predictions_output_filepath: str, + metrics_output_filepath: str, ): """ For a given model, we generate prediction and evaluate this against the true target @@ -48,35 +52,14 @@ def generate_metrics( logger.info("-------------------------") # TODO: replace with client loader here - test_data = pd.read_parquet(test_data_location) + test_data = pd.read_parquet(test_data_filepath) - logger.info("---------------------") - logger.info("--- Loading model ---") - logger.info("---------------------") + logger.info("---------------------------") + logger.info("--- Loading predictions ---") + logger.info("---------------------------") - model.load_model(model_location) - - logger.info("------------------------------") - logger.info("--- Generating predictions ---") - logger.info("------------------------------") - - # Clean test data for now - prediction_data = ( - test_data.drop(columns=target) if target in test_data.columns else test_data - ) - - predictions = model.predict(data=prediction_data) - - logger.info("--------------------------") - logger.info("--- Saving predictions ---") - logger.info("--------------------------") - - # TODO: replace with client - - if not Path(predictions_output_location).parent.exists(): - os.mkdir(Path(predictions_output_location).parent) - - predictions.to_json(predictions_output_location) + # TODO: replace with client loader here + predictions = pd.read_parquet(predictions_output_filepath) logger.info("--------------------------") logger.info("--- Generating metrics ---") @@ -92,10 +75,10 @@ def generate_metrics( # TODO: replace with client - if not Path(metrics_output_location).parent.exists(): - os.mkdir(Path(metrics_output_location).parent) + if not Path(metrics_output_filepath).parent.exists(): + os.mkdir(Path(metrics_output_filepath).parent) - with open(metrics_output_location, "w") as f: + with open(metrics_output_filepath, "w") as f: json.dump(metrics_output, f) @@ -114,10 +97,9 @@ if __name__ == "__main__": model=model, metrics=metrics, target=build_model_params["target"], - model_location=build_model_params["model_save_location"], - test_data_location=generate_metrics_params["test_data_location"], - predictions_output_location=generate_metrics_params[ - "predictions_output_location" + test_data_filepath=generate_predictions_params["test_data_filepath"], + predictions_output_filepath=generate_predictions_params[ + "predictions_output_filepath" ], - metrics_output_location=generate_metrics_params["metrics_output_location"], + metrics_output_filepath=generate_metrics_params["metrics_output_filepath"], ) diff --git a/modules/ml-pipeline/src/pipeline/training/generate_predictions.py b/modules/ml-pipeline/src/pipeline/training/generate_predictions.py new file mode 100644 index 0000000..ff67344 --- /dev/null +++ b/modules/ml-pipeline/src/pipeline/training/generate_predictions.py @@ -0,0 +1,101 @@ +""" +Third part of the pipeline: +After the model is built, we can evaluate its performance +""" + +import os +import yaml +import json +import pandas as pd +from pathlib import Path +from core.interface.InterfaceModels import MLModel +from core.interface.InterfaceDataClient import DataClient +from core.DataClient import dataclient_factory +from core.MLModels import model_factory +from core.MLMetrics import metrics_factory +from core.Logger import logger + + +RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local") + +prepare_data_path = Path(__file__).parent / "configs" / "prepare_data.yaml" +prepare_data_params = yaml.safe_load(open(prepare_data_path)) + +build_model_path = Path(__file__).parent / "configs" / "build_model.yaml" +build_model_params = yaml.safe_load(open(build_model_path)) + +generate_predictions_path = ( + Path(__file__).parent / "configs" / "generate_predictions.yaml" +) +generate_predictions_params = yaml.safe_load(open(generate_predictions_path)) + + +def generate_predictions( + dataclient: DataClient, + model: MLModel, + target: str, + model_location: str, + test_data_filepath: str, + predictions_output_filepath: str, +): + """ + For a given model, we generate prediction and evaluate this against the true target + """ + + logger.info("-------------------------") + logger.info("--- Loading test data ---") + logger.info("-------------------------") + + # TODO: replace with client loader here + test_data = pd.read_parquet(test_data_filepath) + + logger.info("---------------------") + logger.info("--- Loading model ---") + logger.info("---------------------") + + model.load_model(model_location) + + logger.info("------------------------------") + logger.info("--- Generating predictions ---") + logger.info("------------------------------") + + # Clean test data for now + prediction_data = ( + test_data.drop(columns=target) if target in test_data.columns else test_data + ) + + predictions = model.predict(data=prediction_data) + + logger.info("--------------------------") + logger.info("--- Saving predictions ---") + logger.info("--------------------------") + + # TODO: replace with client + + if not Path(predictions_output_filepath).parent.exists(): + os.mkdir(Path(predictions_output_filepath).parent) + + pd.DataFrame(predictions, columns=["predictions"]).to_parquet( + predictions_output_filepath + ) + + +if __name__ == "__main__": + + logger.info("----------------------------") + logger.info(f"--- {__file__} - Start! ---") + logger.info("----------------------------") + + model = model_factory(build_model_params["model_type"]) + dataclient = dataclient_factory(prepare_data_params["dataclient_type"]) + + generate_predictions( + dataclient=dataclient, + model=model, + target=build_model_params["target"], + model_location=build_model_params["model_save_location"], + test_data_filepath=generate_predictions_params["test_data_filepath"], + predictions_output_filepath=generate_predictions_params[ + "predictions_output_filepath" + ], + ) diff --git a/modules/ml-pipeline/src/pipeline/training/metrics/metrics.json b/modules/ml-pipeline/src/pipeline/training/metrics/metrics.json new file mode 100644 index 0000000..97a3c80 --- /dev/null +++ b/modules/ml-pipeline/src/pipeline/training/metrics/metrics.json @@ -0,0 +1 @@ +{"mean_absolute_error": 44.778045422034474, "median_absolute_error": 34.43498827111321, "mean_squared_error": 3157.322503544217, "mean_absolute_percentage_error": 0.3775608927188337} diff --git a/modules/ml-pipeline/src/pipeline/training/prepare_data.py b/modules/ml-pipeline/src/pipeline/training/prepare_data.py index d25efd9..dca9b57 100644 --- a/modules/ml-pipeline/src/pipeline/training/prepare_data.py +++ b/modules/ml-pipeline/src/pipeline/training/prepare_data.py @@ -33,9 +33,8 @@ def use_dummy_data() -> pd.DataFrame: def prepare_data( dataclient: DataClient, train_proportion: float, - output_location: str, - output_train_filename: str = "train.parquet", - output_test_filename: str = "test.parquet", + output_train_filepath: str = "train.parquet", + output_test_filepath: str = "test.parquet", ) -> Tuple[pd.DataFrame, pd.DataFrame]: """ Given a client and location, load data into the pipeline @@ -67,13 +66,17 @@ def prepare_data( logger.info("-----------------------") # TODO: REPLACE WITH CLIENT - output_path = Path(output_location) - if not output_path.exists(): - os.makedirs(output_path) + output_directory = Path(output_train_filepath) + if not output_directory.parent.exists(): + os.makedirs(output_directory.parent) + + output_directory = Path(output_test_filepath) + if not output_directory.parent.exists(): + os.makedirs(output_directory.parent) logger.info("--- Outputting train and test data ---") - train.to_csv(output_path / output_train_filename, index=False) - test.to_csv(output_path / output_test_filename, index=False) + train.to_parquet(output_train_filepath) + test.to_parquet(output_test_filepath) # client.save_data(obj=train) # client.save_data(obj=test) @@ -100,7 +103,8 @@ if __name__ == "__main__": prepare_data( dataclient=dataclient, train_proportion=params["train_proportion"], - output_location=params["output_location"], + output_train_filepath=params["output_train_filepath"], + output_test_filepath=params["output_test_filepath"], ) logger.info("-------------------------------") diff --git a/modules/ml-pipeline/src/pipeline/training/requirements/requirements-dev.txt b/modules/ml-pipeline/src/pipeline/training/requirements/requirements-dev.txt index 66281aa..f15e78f 100644 --- a/modules/ml-pipeline/src/pipeline/training/requirements/requirements-dev.txt +++ b/modules/ml-pipeline/src/pipeline/training/requirements/requirements-dev.txt @@ -4,6 +4,7 @@ pandas==1.5.3 dvc==3.18.0 gto==1.0.4 scikit-learn==1.3.0 +pyarrow==13.0.0 pre-commit==3.3.3 sphinx==7.2.5 sphinx_rtd_theme==1.3.0