Merge pull request #10 from Hestia-Homes/model-test

Model test
This commit is contained in:
quandanrepo 2023-09-12 14:57:03 +01:00 committed by GitHub
commit d0a27cc99b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 869 additions and 134 deletions

Binary file not shown.

View file

@ -12,6 +12,7 @@ from core.Logger import logger
from core.interface.InterfaceModels import MLModel
from core.interface.InterfaceDataClient import DataClient
from core.DataClient import dataclient_factory
from core.DataHandler import datahandler_factory
from core.MLModels import model_factory
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local")
@ -22,6 +23,9 @@ prepare_data_params = yaml.safe_load(open(prepare_data_path))
build_model_path = Path(__file__).parent / "configs" / "build_model.yaml"
build_model_params = yaml.safe_load(open(build_model_path))
feature_process_path = Path(__file__).parent / "configs" / "feature_processor.yaml"
feature_process_params = yaml.safe_load(open(feature_process_path))
def build_model(
dataclient: DataClient,
@ -40,16 +44,16 @@ def build_model(
logger.info("--------------------------------------")
if train_data is None:
# TODO: replace this with the data client to load
if train_filepath is None:
raise ValueError(f"Need {train_filepath}")
train_data = pd.read_parquet(train_filepath)
raise ValueError(f"Need {train_filepath} if no data supplied")
train_data = datahandler.load_data(
dataclient=dataclient, location=train_filepath
)
if test_data is None:
# TODO: replace this with the data client to load
if test_filepath is None:
raise ValueError(f"Need {test_filepath}")
test_data = pd.read_parquet(test_filepath)
raise ValueError(f"Need {test_filepath} if no data supplied")
test_data = datahandler.load_data(dataclient=dataclient, location=test_filepath)
logger.info("----------------------")
logger.info("--- Training model ---")
@ -65,9 +69,6 @@ def build_model(
model.save_model(path=Path(model_save_location))
# TODO: replace this with the data client to load
# TODO: can fine tune model here if need with the test data
if __name__ == "__main__":
@ -79,7 +80,13 @@ if __name__ == "__main__":
logger.info(f"--- Initiate DataClient ---")
logger.info("----------------------------")
dataclient = dataclient_factory(prepare_data_params["dataclient_type"])
dataclient = dataclient_factory(prepare_data_params["output_dataclient_type"])
logger.info("-----------------------------")
logger.info(f"--- Initiate DataHandler ---")
logger.info("-----------------------------")
datahandler = datahandler_factory(prepare_data_params["datahandler_type"])
logger.info("-------------------------")
logger.info(f"--- Initiate MLModel ---")
@ -95,7 +102,7 @@ if __name__ == "__main__":
build_model(
dataclient=dataclient,
model=model,
target=build_model_params["target"],
target=feature_process_params["feature_processor_config"]["target"],
model_save_location=build_model_params["model_save_filepath"],
model_hyperparameters=build_model_params[model_type],
train_filepath=prepare_data_params["output_train_filepath"],

View file

@ -1,8 +1,15 @@
model_type: SKLearnLinearRegression
target: target
model_save_filepath: ./data/model/model.joblib
model_type: AutogluonAutoML
model_save_filepath: ./data/model/autogluonmodel/
SKLearnLinearRegression: null
SKLearnSVMRegression:
kernel: "linear"
AutogluonAutoML:
output_filepath: ./data/model/autogluonmodel/
problem_type: regression
eval_metric: mean_absolute_error
time_limit: 200
presets: medium_quality
excluded_model_types: null

View file

@ -0,0 +1,8 @@
feature_processor_type: dataframe
feature_processor_config:
subsample_amount: null
subsample_seed: 0
target: RDSAP_CHANGE
drop_columns: ["UPRN", "HEAT_DEMAND_CHANGE"]
# retain_features: ["TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]
retain_features: null

View file

@ -0,0 +1,13 @@
"""
During the feature processor step, we can apply additional business logic and feature generation by defining them here
"""
"""
Business Logic dict + functions
"""
business_logic = {}
"""
New features dict + function
"""
new_feature_funcs = {}

View file

@ -1,2 +1,5 @@
dataclient_type: local
input_datahandler_type: parquet
output_datahandler_type: json
metrics_type: Regression
metrics_output_filepath: ./metrics/metrics.json

View file

@ -1,3 +1,5 @@
input_dataclient_type: local
output_dataclient_type: local
test_data_filepath: ./data/prepared_data/test.parquet
predictions_output_filepath: ./data/predictions/predictions.parquet
predictions_column_name: predictions

View file

@ -1,5 +1,13 @@
dataclient_type: minio
data_location: s3://dev_bucket
input_dataclient_type: aws-s3
input_dataclient:
AWS_ACCESS_KEY_ID: null
AWS_SECRET_ACCESS_KEY: null
ENDPOINT_URL: null
output_dataclient_type: local
output_dataclient:
null
datahandler_type: parquet
data_filepath: s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/train_validation_data.parquet
train_proportion: 0.8
output_train_filepath: ./data/prepared_data/train.parquet
output_test_filepath: ./data/prepared_data/test.parquet

View file

@ -0,0 +1,2 @@
artefacts: ./data
metrics: ./metrics

View file

@ -2,9 +2,13 @@
Implementations of the DataClient Protocol
"""
import pandas as pd
import os
import boto3
from pathlib import Path
from io import BytesIO
from typing import List
from core.interface.InterfaceDataClient import DataClient
from core.Logger import logger
def dataclient_factory(dataclient_type: str) -> DataClient:
@ -12,7 +16,8 @@ def dataclient_factory(dataclient_type: str) -> DataClient:
Determine which dataclient to use
"""
dataclients = {
"minio": MinioClient(),
"local": LocalClient(),
"aws-s3": AWSS3Client(),
# ADD MORE DATACLIENTS HERE
}
@ -27,15 +32,69 @@ def validate_dict_keys(keys_1: List[str], keys_2: List[str], config_type: str):
raise ValueError(f"Incorrect {config_type} keys specified")
class MinioClient:
# class MinioClient:
# """
# Using the Minio s3 client, to do local testing
# """
# ACCEPTED_CONFIG_KEYS = [
# "aws_access_key_id",
# "aws_secret_access_key",
# "endpoint_url",
# ]
# ACCEPTED_LOAD_CONFIG_KEYS = []
# ACCEPTED_SAVE_CONFIG_KEYS = []
# def ingest_configurations(self, config: dict) -> None:
# """
# Load all configuration into the instance (self.config)
# """
# validate_dict_keys(
# keys_1=list(config.keys()),
# keys_2=self.ACCEPTED_CONFIG_KEYS,
# config_type="config",
# )
# self.config = config
# def establish_client(self) -> None:
# """
# With the given configurations, create the connection to the client (self.client)
# """
# ...
# def download_data(self, download_config: dict) -> pd.DataFrame:
# """
# When the client is established, we can load data
# """
# validate_dict_keys(
# keys_1=list(download_config.keys()),
# keys_2=self.ACCEPTED_LOAD_CONFIG_KEYS,
# config_type="load_config",
# )
# return pd.DataFrame()
# def save_data(self, obj: object, save_config: dict) -> None:
# """
# When the client is established, we can save out objects
# """
# validate_dict_keys(
# keys_1=list(save_config.keys()),
# keys_2=self.ACCEPTED_SAVE_CONFIG_KEYS,
# config_type="save_config",
# )
class AWSS3Client:
"""
Using the Minio s3 client, to do local testing
Using Boto3, set up the AWS client
"""
ACCEPTED_CONFIG_KEYS = [
"aws_access_key_id",
"aws_secret_access_key",
"endpoint_url",
"AWS_ACCESS_KEY_ID",
"AWS_SECRET_ACCESS_KEY",
]
ACCEPTED_LOAD_CONFIG_KEYS = []
ACCEPTED_SAVE_CONFIG_KEYS = []
@ -45,38 +104,127 @@ class MinioClient:
Load all configuration into the instance (self.config)
"""
validate_dict_keys(
keys_1=list(config.keys()),
keys_2=self.ACCEPTED_CONFIG_KEYS,
config_type="config",
keys_1=self.ACCEPTED_CONFIG_KEYS,
keys_2=list(config.keys()),
config_type="Ingest Config",
)
self.config = config
def establish_client(self) -> None:
"""
With the given configurations, create the connection to the client (self.client)
"""
logger.info(f"Establishing S3 Client")
session = boto3.Session()
if (
self.config["AWS_ACCESS_KEY_ID"] is None
and self.config["AWS_SECRET_ACCESS_KEY"] is None
):
self.client = session.client(service_name="s3") # Using local credentials
else:
self.client = session.client(
service_name="s3",
aws_access_key_id=self.config["AWS_ACCESS_KEY_ID"],
aws_secret_access_key=self.config["AWS_SECRET_ACCESS_KEY"],
)
def download_data(self, location: dict) -> None:
"""
When the client is established, we can download data to a local file
"""
...
def load_data(self, load_config: dict) -> pd.DataFrame:
def load_data_as_buffer(self, location: str) -> BytesIO:
"""
When the client is established, we can load data
When the client is established, we can load data in a buffer
"""
validate_dict_keys(
keys_1=list(load_config.keys()),
keys_2=self.ACCEPTED_LOAD_CONFIG_KEYS,
config_type="load_config",
)
if not location.startswith("s3://"):
raise ValueError("S3 file path specified without s3://")
return pd.DataFrame()
bucket, key = location.strip("s3://").split("/", 1)
buffer = BytesIO()
self.client.download_fileobj(bucket, key, buffer)
buffer.seek(0)
def save_data(self, obj: object, save_config: dict) -> None:
return buffer
def load_database(self, database_location: dict) -> None:
"""
When the client is established, we can save out objects
When the client is established, we can read from a database
"""
validate_dict_keys(
keys_1=list(save_config.keys()),
keys_2=self.ACCEPTED_SAVE_CONFIG_KEYS,
config_type="save_config",
)
...
def upload_data(self, location: str) -> None:
"""
When the client is established, we can save out objects from a local file
"""
...
def upload_data_from_buffer(self, buffer: BytesIO, location: str) -> None:
"""
When the client is established, we can save out objects from a buffer
"""
if not location.startswith("s3://"):
raise ValueError("S3 file path specified without s3://")
bucket, key = location.strip("s3://").split("/", 1)
self.client.upload_fileobj(buffer, bucket, key)
class LocalClient:
"""
Interacting with data locally
"""
def ingest_configurations(self, config: dict) -> None:
"""
Load all configuration into the instance (self.config)
"""
logger.info("Local - No configuration required")
def establish_client(self) -> None:
"""
With the given configurations, create the connection to the client (self.client)
"""
logger.info("Local - No establishing client required")
def download_data(self, location: dict) -> None:
"""
When the client is established, we can download data to a file
"""
...
def load_data_as_buffer(self, location: str) -> BytesIO:
"""
When the client is established, we can load data from a buffer
"""
with open(location, "rb") as file:
# Read the entire file into a BytesIO object
buffer = BytesIO(file.read())
buffer.seek(0)
return buffer
def load_database(self, database_location: dict) -> None:
"""
When the client is established, we can read from a database
"""
...
def upload_data(self, location: str) -> None:
"""
When the client is established, we can save out objects from a file
"""
...
def upload_data_from_buffer(self, buffer: BytesIO, location: str) -> None:
"""
When the client is established, we can save out objects from a buffer
"""
if not Path(location).parent.exists():
os.makedirs(Path(location).parent)
# Write the contents of the buffer to the local file
with open(location, "wb") as f:
f.write(buffer.getvalue())

View file

@ -0,0 +1,86 @@
"""
Implementations of the datahandler Protocol
"""
import json
import pandas as pd
from io import BytesIO
from typing import List
from core.interface.InterfaceDataHandler import DataHandler
from core.interface.InterfaceDataClient import DataClient
def datahandler_factory(datahandler_type: str) -> DataHandler:
"""
Determine which dataclient to use
"""
datahandler = {
"parquet": ParquetHandler(),
"json": JSONHandler()
# ADD MORE DATACLIENTS HERE
}
if datahandler_type not in datahandler:
raise ValueError("Dataloader type specified is not in factory")
return datahandler[datahandler_type]
def validate_dict_keys(keys_1: List[str], keys_2: List[str], config_type: str):
if not set(keys_1).issubset(keys_2):
raise ValueError(f"Incorrect {config_type} keys specified")
class ParquetHandler:
"""
Load and save Parquet datasets
"""
def load_data(self, dataclient: DataClient, location: str) -> pd.DataFrame:
"""
When the client is established, we can load data
"""
df = pd.read_parquet(dataclient.load_data_as_buffer(location=location))
return df
def save_data(
self, dataclient: DataClient, obj: pd.DataFrame, location: str
) -> None:
"""
When the client is established, we can save out objects
"""
# Convert the Pandas DataFrame to a Parquet buffer
parquet_buffer = BytesIO()
obj.to_parquet(parquet_buffer, index=False)
dataclient.upload_data_from_buffer(buffer=parquet_buffer, location=location)
class JSONHandler:
"""
Load and save Parquet datasets
"""
def load_data(self, dataclient: DataClient, location: str) -> pd.DataFrame:
"""
When the client is established, we can load data
"""
...
def save_data(self, dataclient: DataClient, obj: dict, location: str) -> None:
"""
When the client is established, we can save out objects
"""
# Serialize the dictionary to a JSON-formatted string
json_string = json.dumps(obj) # indent for pretty formatting
# Convert the JSON string to bytes (UTF-8 encoding)
json_bytes = json_string.encode("utf-8")
# Create a BytesIO object and write the JSON bytes to it
buffer = BytesIO()
buffer.write(json_bytes)
buffer.seek(0)
dataclient.upload_data_from_buffer(buffer=buffer, location=location)

View file

@ -0,0 +1,148 @@
"""
Interface for all FeatureProcessors
"""
"""
Create additional features from the dataset
"""
import pandas as pd
from typing import List, Callable, Union
from core.interface.InterfaceFeatureProcessor import FeatureProcessor
from core.Logger import logger
def feature_processor_factory(feature_processor_type: str) -> FeatureProcessor:
"""
Determine which dataclient to use
"""
feature_processor = {
"dataframe": DataFrameFeatureProcessor(),
# ADD MORE DATACLIENTS HERE
}
if feature_processor_type not in feature_processor:
raise ValueError("Dataloader type specified is not in factory")
return feature_processor[feature_processor_type]
def validate_dict_keys(keys_1: List[str], keys_2: List[str], config_type: str):
if not set(keys_1).issubset(keys_2):
raise ValueError(f"Incorrect {config_type} keys specified")
class DataFrameFeatureProcessor:
"""
Handle all feature manipulation before modelling
"""
ACCEPTED_FEATURE_PROCESS_CONFIG_KEYS = [
"subsample_amount",
"subsample_seed",
"target",
"drop_columns",
"retain_features",
]
@staticmethod
def drop_unused_columns(df: pd.DataFrame, drop_columns: List[str]) -> pd.DataFrame:
"""
Remove the unused columns for RDS
"""
df = df.drop(columns=drop_columns)
return df
@staticmethod
def retain_features(
df: pd.DataFrame, target: str, retain_features: List[str] | None = None
) -> pd.DataFrame:
"""
Determine which columns to keep for modelling
"""
if retain_features is None:
retain_features = df.columns.to_list()
else:
if not set(retain_features).issubset(df.columns):
raise ValueError("Features defined is not contained in data")
retain_features = [target] + retain_features
df = df[retain_features]
return df
@staticmethod
def subsample_data(
df: pd.DataFrame, subsample_amount: int | None = None, subsample_seed: int = 0
) -> pd.DataFrame:
"""
Sample data to reduce number of rows for model building if needed
"""
if subsample_amount:
df = df.sample(subsample_amount, random_state=subsample_seed)
return df
@staticmethod
def apply_business_logic(
df: pd.DataFrame, business_logic: Union[dict[str, Callable], None]
) -> pd.DataFrame:
"""
If we need any additional business logic to be applied, post data cleaning
"""
if business_logic is None or len(business_logic) == 0:
return df
# TODO: to test
for _, value in business_logic.items():
df = value(df)
return df
@staticmethod
def generate_new_features(
df: pd.DataFrame, new_feature_funcs: Union[dict[str, Callable], None]
) -> pd.DataFrame:
"""
We can iterative over all keys (new feature column names), and apply their Calleabl function
"""
if new_feature_funcs is None or len(new_feature_funcs) == 0:
return df
# TODO: to test
for key, value in new_feature_funcs.items():
df[key] = value(df)
return df
def feature_process(
self,
obj: pd.DataFrame,
feature_processor_config: dict,
business_logic: dict | None = None,
new_feature_funcs: dict | None = None,
) -> pd.DataFrame:
"""
Pipeline to get data ready for building a model
"""
validate_dict_keys(
self.ACCEPTED_FEATURE_PROCESS_CONFIG_KEYS,
list(feature_processor_config.keys()),
config_type="Feature Process Config",
)
df = self.subsample_data(
obj,
subsample_amount=feature_processor_config["subsample_amount"],
subsample_seed=feature_processor_config["subsample_seed"],
)
df = self.drop_unused_columns(
df, drop_columns=feature_processor_config["drop_columns"]
)
df = self.retain_features(
df,
retain_features=feature_processor_config["retain_features"],
target=feature_processor_config["target"],
)
df = self.apply_business_logic(df, business_logic=business_logic)
df = self.generate_new_features(df, new_feature_funcs=new_feature_funcs)
return df

View file

@ -13,7 +13,9 @@ from pathlib import Path
from typing import Union, List
from sklearn import linear_model
from sklearn.svm import SVR
from autogluon.tabular import TabularDataset, TabularPredictor
from core.interface.InterfaceModels import MLModel
from core.Logger import logger
def model_factory(model_type: str) -> MLModel:
@ -23,6 +25,7 @@ def model_factory(model_type: str) -> MLModel:
models = {
"SKLearnLinearRegression": SKLearnLinearRegression(),
"SKLearnSVMRegression": SKLearnSVMRegression(),
"AutogluonAutoML": AutogluonAutoML()
# ADD OTHER MODELS HERE
}
@ -131,3 +134,78 @@ class SKLearnSVMRegression:
"""
self.predictions = pd.Series(self.model.predict(data))
return self.predictions
class AutogluonAutoML:
ACCEPTED_MODEL_HYPERPAREMETERS = [
"output_filepath",
"problem_type",
"eval_metric",
"time_limit",
"presets",
"excluded_model_types",
]
def load_model(self, path: Union[Path, str]) -> None:
"""
Method to load a model
"""
filepath = str(path)
self.model = TabularPredictor.load(path=filepath)
def save_model(self, path: Path) -> str:
"""
Method to save a model
"""
if self.model is None:
raise KeyError("No model trained/ loaded - unable to save")
logger.info("In local development mode - no need for s3 client")
logger.info("Using AutoGluon Model - Model saving already occured")
return str(path)
def train_model(
self, data: pd.DataFrame, target: str, model_hyperparameters: dict
) -> None:
"""
Method to train a model
"""
validate_dict_keys(
keys_1=list(model_hyperparameters.keys()),
keys_2=self.ACCEPTED_MODEL_HYPERPAREMETERS,
config_type="Model Hyperparameters",
)
if model_hyperparameters["output_filepath"] is None:
logger.error("Please specify a output_filepath in order to train a model")
exit(1)
AGdata = TabularDataset(data=data)
self.model = TabularPredictor(
label=target,
path=model_hyperparameters["output_filepath"],
problem_type=model_hyperparameters["problem_type"],
eval_metric=model_hyperparameters["eval_metric"],
).fit(
AGdata,
time_limit=model_hyperparameters["time_limit"],
presets=model_hyperparameters["presets"],
excluded_model_types=model_hyperparameters["excluded_model_types"],
)
def predict(self, data: pd.DataFrame) -> pd.Series:
"""
Method to predict
"""
if self.model is None:
print("No model loaded/ trained")
exit(1)
predictions = pd.Series(self.model.predict(data))
return predictions

View file

@ -3,6 +3,7 @@ Interface for all DataClient i.e. s3, database, local etc
"""
import pandas as pd
from io import BytesIO
from typing import Protocol
@ -23,13 +24,32 @@ class DataClient(Protocol):
"""
...
def load_data(self, load_config: dict) -> pd.DataFrame:
def download_data(self, location: dict) -> None:
"""
When the client is established, we can load data
"""
...
def save_data(self, obj: object, save_config: dict) -> None:
def load_data_as_buffer(self, location: str) -> BytesIO:
"""
When the client is established, we can load data
"""
...
def load_database(self, database_location: dict) -> None:
"""
When the client is established, we can read from a database
"""
...
def upload_data(self, location: str) -> None:
"""
When the client is established, we can save out objects
"""
...
def upload_data_from_buffer(self, buffer: BytesIO, location: str) -> None:
"""
When the client is established, we can save out objects
"""
...

View file

@ -0,0 +1,26 @@
"""
Interface for all DataHandler i.e. Parquet data, csv data
"""
import pandas as pd
from typing import Protocol, Union, Any
from core.interface.InterfaceDataClient import DataClient
class DataHandler(Protocol):
"""
Declare the methods required for a DataClient
"""
def load_data(self, dataclient: DataClient, location: str) -> pd.DataFrame:
"""
When the client is established, we can load data
"""
...
def save_data(
self, dataclient: DataClient, obj: Union[pd.DataFrame, dict, Any], location: str
) -> None:
"""
When the client is established, we can save out objects
"""

View file

@ -0,0 +1,23 @@
"""
Interface for all FeatureProcessors
"""
import pandas as pd
from typing import Protocol, Union, Any
class FeatureProcessor(Protocol):
"""
Declare the methods required for a FeatureProcessor
"""
def feature_process(
self,
obj: Union[pd.DataFrame, Any],
feature_processor_config: dict,
business_logic: dict,
new_feature_funcs: dict,
) -> Union[pd.DataFrame, Any]:
"""
Apply all transformation to the data and return the same object back (for now)
"""

View file

@ -5,8 +5,8 @@ stages:
deps:
- path: prepare_data.py
hash: md5
md5: 38b0836237bfa25ea0d71ca259610f4d
size: 3623
md5: 87a83e62512bff93c89f3e93c1ed248d
size: 5593
params:
configs/prepare_data.yaml:
output_test_filepath: ./data/prepared_data/test.parquet
@ -15,86 +15,108 @@ stages:
outs:
- path: data/prepared_data/
hash: md5
md5: f0d462fe6b1a856a827409a745539285.dir
size: 36169
md5: 01a8f8f0b264ac4d61307a67bfa910b4.dir
size: 4428909
nfiles: 2
build_model:
cmd: python build_model.py
deps:
- path: build_model.py
hash: md5
md5: 152d52b7754b4c6f96f3481dc26562fc
size: 3576
md5: 662cd6b1562fbbc2c7d30dd0f2375a66
size: 3948
- path: data/prepared_data
hash: md5
md5: f0d462fe6b1a856a827409a745539285.dir
size: 36169
md5: 01a8f8f0b264ac4d61307a67bfa910b4.dir
size: 4428909
nfiles: 2
params:
configs/build_model.yaml:
AutogluonAutoML:
output_filepath: ./data/model/autogluonmodel/
problem_type: regression
eval_metric: mean_absolute_error
time_limit: 200
presets: medium_quality
excluded_model_types:
SKLearnLinearRegression:
SKLearnSVMRegression:
kernel: linear
model_save_filepath: ./data/model/model.joblib
model_type: SKLearnLinearRegression
target: target
model_save_filepath: ./data/model/autogluonmodel/
model_type: AutogluonAutoML
outs:
- path: data/model/
hash: md5
md5: fb7ae4137b445dc91e840b794d72e940.dir
size: 1096
nfiles: 1
md5: 04a1e3bc625e7934c9f57a3fa2f1ea5c.dir
size: 1264795580
nfiles: 28
generate_predictions:
cmd: python generate_predictions.py
deps:
- path: data/model
hash: md5
md5: fb7ae4137b445dc91e840b794d72e940.dir
size: 1096
nfiles: 1
md5: 04a1e3bc625e7934c9f57a3fa2f1ea5c.dir
size: 1264795580
nfiles: 28
- path: data/prepared_data
hash: md5
md5: f0d462fe6b1a856a827409a745539285.dir
size: 36169
md5: 01a8f8f0b264ac4d61307a67bfa910b4.dir
size: 4428909
nfiles: 2
- path: generate_predictions.py
hash: md5
md5: 424b9d89045eaf8a5a167ab2e0e363ae
size: 3400
md5: 76c45e7575ec979e6c4c8e2cf754a720
size: 4225
params:
configs/generate_predictions.yaml:
input_dataclient_type: local
output_dataclient_type: local
predictions_column_name: predictions
predictions_output_filepath: ./data/predictions/predictions.parquet
test_data_filepath: ./data/prepared_data/test.parquet
outs:
- path: data/predictions/
hash: md5
md5: 4d5854903b25bdae15d99c934ebcfb99.dir
size: 2531
md5: 44c298a28a0bb1367bb82d5da1a5dbd0.dir
size: 672577
nfiles: 1
generate_metrics:
cmd: python generate_metrics.py
deps:
- path: data/predictions
hash: md5
md5: 4d5854903b25bdae15d99c934ebcfb99.dir
size: 2531
md5: 44c298a28a0bb1367bb82d5da1a5dbd0.dir
size: 672577
nfiles: 1
- path: data/prepared_data
hash: md5
md5: f0d462fe6b1a856a827409a745539285.dir
size: 36169
md5: 01a8f8f0b264ac4d61307a67bfa910b4.dir
size: 4428909
nfiles: 2
- path: generate_metrics.py
hash: md5
md5: b456e207b152298428ba79c083d1b6ff
size: 3728
md5: cc368845f62523575a9ed5c791e27815
size: 4329
params:
configs/generate_metrics.yaml:
dataclient_type: local
input_datahandler_type: parquet
metrics_output_filepath: ./metrics/metrics.json
metrics_type: Regression
output_datahandler_type: json
outs:
- path: metrics/metrics.json
hash: md5
md5: 3c9306e992b07491ff7e642949d6bc47
md5: 3f03e50a419af6730351a5016e2ae98a
size: 182
startup_cleanup:
cmd: python startup_cleanup.py
deps:
- path: startup_cleanup.py
hash: md5
md5: f7fe2ca33004b34530da0a3ab48c1790
size: 1458
params:
configs/startup_cleanup.yaml:
artefacts: ./data
metrics: ./metrics

View file

@ -1,4 +1,12 @@
stages:
startup_cleanup:
cmd: python startup_cleanup.py
deps:
- startup_cleanup.py
params:
- configs/startup_cleanup.yaml:
- artefacts
- metrics
prepare_data:
cmd: python prepare_data.py
deps:

View file

@ -11,9 +11,11 @@ from pathlib import Path
from core.interface.InterfaceModels import MLModel
from core.interface.InterfaceMetrics import MLMetrics
from core.interface.InterfaceDataClient import DataClient
from core.interface.InterfaceDataHandler import DataHandler
from core.DataClient import dataclient_factory
from core.MLModels import model_factory
from core.MLMetrics import metrics_factory
from core.DataHandler import datahandler_factory
from core.Logger import logger
@ -33,9 +35,14 @@ generate_predictions_params = yaml.safe_load(open(generate_predictions_path))
generate_metrics_path = Path(__file__).parent / "configs" / "generate_metrics.yaml"
generate_metrics_params = yaml.safe_load(open(generate_metrics_path))
feature_process_path = Path(__file__).parent / "configs" / "feature_processor.yaml"
feature_process_params = yaml.safe_load(open(feature_process_path))
def generate_metrics(
dataclient: DataClient,
input_datahandler: DataHandler,
output_datahandler: DataHandler,
model: MLModel,
metrics: MLMetrics,
target: str,
@ -52,15 +59,17 @@ def generate_metrics(
logger.info("--- Loading test data ---")
logger.info("-------------------------")
# TODO: replace with client loader here
test_data = pd.read_parquet(test_data_filepath)
test_data = input_datahandler.load_data(
dataclient=dataclient, location=test_data_filepath
)
logger.info("---------------------------")
logger.info("--- Loading predictions ---")
logger.info("---------------------------")
# TODO: replace with client loader here
predictions = pd.read_parquet(predictions_output_filepath)
predictions = input_datahandler.load_data(
dataclient=dataclient, location=predictions_output_filepath
)
logger.info("--------------------------")
logger.info("--- Generating metrics ---")
@ -75,13 +84,9 @@ def generate_metrics(
logger.info("--- Saving metrics ---")
logger.info("----------------------")
# TODO: replace with client
if not Path(metrics_output_filepath).parent.exists():
os.mkdir(Path(metrics_output_filepath).parent)
with open(metrics_output_filepath, "w") as f:
json.dump(metrics_output, f)
output_datahandler.save_data(
dataclient=dataclient, obj=metrics_output, location=metrics_output_filepath
)
if __name__ == "__main__":
@ -91,14 +96,22 @@ if __name__ == "__main__":
logger.info("----------------------------")
model = model_factory(build_model_params["model_type"])
dataclient = dataclient_factory(prepare_data_params["dataclient_type"])
dataclient = dataclient_factory(generate_metrics_params["dataclient_type"])
input_datahandler = datahandler_factory(
generate_metrics_params["input_datahandler_type"]
)
output_datahandler = datahandler_factory(
generate_metrics_params["output_datahandler_type"]
)
metrics = metrics_factory(generate_metrics_params["metrics_type"])
generate_metrics(
dataclient=dataclient,
input_datahandler=input_datahandler,
output_datahandler=output_datahandler,
model=model,
metrics=metrics,
target=build_model_params["target"],
target=feature_process_params["feature_processor_config"]["target"],
test_data_filepath=generate_predictions_params["test_data_filepath"],
predictions_output_filepath=generate_predictions_params[
"predictions_output_filepath"

View file

@ -10,9 +10,10 @@ import pandas as pd
from pathlib import Path
from core.interface.InterfaceModels import MLModel
from core.interface.InterfaceDataClient import DataClient
from core.interface.InterfaceDataHandler import DataHandler
from core.DataClient import dataclient_factory
from core.MLModels import model_factory
from core.MLMetrics import metrics_factory
from core.DataHandler import datahandler_factory
from core.Logger import logger
@ -29,9 +30,14 @@ generate_predictions_path = (
)
generate_predictions_params = yaml.safe_load(open(generate_predictions_path))
feature_process_path = Path(__file__).parent / "configs" / "feature_processor.yaml"
feature_process_params = yaml.safe_load(open(feature_process_path))
def generate_predictions(
dataclient: DataClient,
input_dataclient: DataClient,
output_dataclient: DataClient,
datahandler: DataHandler,
model: MLModel,
target: str,
model_filepath: str,
@ -47,8 +53,9 @@ def generate_predictions(
logger.info("--- Loading test data ---")
logger.info("-------------------------")
# TODO: replace with client loader here
test_data = pd.read_parquet(test_data_filepath)
test_data = datahandler.load_data(
dataclient=input_dataclient, location=test_data_filepath
)
logger.info("---------------------")
logger.info("--- Loading model ---")
@ -60,7 +67,6 @@ def generate_predictions(
logger.info("--- Generating predictions ---")
logger.info("------------------------------")
# Clean test data for now
prediction_data = (
test_data.drop(columns=target) if target in test_data.columns else test_data
)
@ -71,13 +77,13 @@ def generate_predictions(
logger.info("--- Saving predictions ---")
logger.info("--------------------------")
# TODO: replace with client
predictions_df = pd.DataFrame(predictions)
predictions_df.columns = [predictions_column_name]
if not Path(predictions_output_filepath).parent.exists():
os.mkdir(Path(predictions_output_filepath).parent)
pd.DataFrame(predictions, columns=[predictions_column_name]).to_parquet(
predictions_output_filepath
datahandler.save_data(
dataclient=output_dataclient,
obj=predictions_df,
location=predictions_output_filepath,
)
@ -88,12 +94,23 @@ if __name__ == "__main__":
logger.info("----------------------------")
model = model_factory(build_model_params["model_type"])
dataclient = dataclient_factory(prepare_data_params["dataclient_type"])
# We may have different locations of loading hence why we use one specified in generate_predictions.yaml
# I.e. for metric runs, this will be a local data client
# For predictions, we will want a cloud data client
input_dataclient = dataclient_factory(
generate_predictions_params["input_dataclient_type"]
)
output_dataclient = dataclient_factory(
generate_predictions_params["output_dataclient_type"]
)
datahandler = datahandler_factory(prepare_data_params["datahandler_type"])
generate_predictions(
dataclient=dataclient,
input_dataclient=input_dataclient,
output_dataclient=output_dataclient,
datahandler=datahandler,
model=model,
target=build_model_params["target"],
target=feature_process_params["feature_processor_config"]["target"],
model_filepath=build_model_params["model_save_filepath"],
test_data_filepath=generate_predictions_params["test_data_filepath"],
predictions_output_filepath=generate_predictions_params[

View file

@ -11,13 +11,21 @@ from pathlib import Path
from sklearn.datasets import load_diabetes
from sklearn.model_selection import train_test_split
from core.interface.InterfaceDataClient import DataClient
from core.interface.InterfaceDataHandler import DataHandler
from core.interface.InterfaceFeatureProcessor import FeatureProcessor
from configs.feature_processor_logic import business_logic, new_feature_funcs
from core.Logger import logger
from core.DataClient import dataclient_factory
from core.DataHandler import datahandler_factory
from core.FeatureProcessor import feature_processor_factory
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local")
params_path = Path(__file__).parent / "configs" / "prepare_data.yaml"
params = yaml.safe_load(open(params_path))
prepare_data_path = Path(__file__).parent / "configs" / "prepare_data.yaml"
prepare_data_params = yaml.safe_load(open(prepare_data_path))
feature_process_path = Path(__file__).parent / "configs" / "feature_processor.yaml"
feature_process_params = yaml.safe_load(open(feature_process_path))
def use_dummy_data() -> pd.DataFrame:
@ -31,8 +39,15 @@ def use_dummy_data() -> pd.DataFrame:
def prepare_data(
dataclient: DataClient,
input_dataclient: DataClient,
output_dataclient: DataClient,
datahandler: DataHandler,
feature_processor: FeatureProcessor,
data_filepath: str,
train_proportion: float,
feature_processor_config: dict,
business_logic: dict,
new_feature_funcs: dict,
output_train_filepath: str = "train.parquet",
output_test_filepath: str = "test.parquet",
) -> Tuple[pd.DataFrame, pd.DataFrame]:
@ -46,8 +61,18 @@ def prepare_data(
logger.info("--- Loading data ---")
logger.info("--------------------")
# TODO: REPLACE THIS WIL CLIENT AND LOAD DATA
data = use_dummy_data()
data = datahandler.load_data(dataclient=input_dataclient, location=data_filepath)
logger.info("--------------------------")
logger.info("--- Feature Processing ---")
logger.info("--------------------------")
data = feature_processor.feature_process(
data,
feature_processor_config=feature_processor_config,
business_logic=business_logic,
new_feature_funcs=new_feature_funcs,
)
logger.info("----------------------")
logger.info("--- Splitting data ---")
@ -57,29 +82,16 @@ def prepare_data(
data, train_size=train_proportion, test_size=(1 - train_proportion)
)
logger.info("--------------------------")
logger.info("--- Feature Processing ---")
logger.info("--------------------------")
logger.info("-----------------------")
logger.info("--- Outputting data ---")
logger.info("-----------------------")
# TODO: REPLACE WITH CLIENT
output_directory = Path(output_train_filepath)
if not output_directory.parent.exists():
os.makedirs(output_directory.parent)
output_directory = Path(output_test_filepath)
if not output_directory.parent.exists():
os.makedirs(output_directory.parent)
logger.info("--- Outputting train and test data ---")
train.to_parquet(output_train_filepath)
test.to_parquet(output_test_filepath)
# client.save_data(obj=train)
# client.save_data(obj=test)
datahandler.save_data(
dataclient=output_dataclient, obj=train, location=output_train_filepath
)
datahandler.save_data(
dataclient=output_dataclient, obj=test, location=output_test_filepath
)
return train, test
@ -94,17 +106,51 @@ if __name__ == "__main__":
logger.info(f"--- Initiate DataClient ---")
logger.info("----------------------------")
dataclient = dataclient_factory(params["dataclient_type"])
input_dataclient = dataclient_factory(prepare_data_params["input_dataclient_type"])
output_dataclient = dataclient_factory(
prepare_data_params["output_dataclient_type"]
)
input_dataclient.ingest_configurations(
config=prepare_data_params["input_dataclient"]
)
input_dataclient.establish_client()
output_dataclient.ingest_configurations(
config=prepare_data_params["output_dataclient"]
)
output_dataclient.establish_client()
logger.info("-----------------------------")
logger.info(f"--- Initiate DataHandler ---")
logger.info("-----------------------------")
datahandler = datahandler_factory(prepare_data_params["datahandler_type"])
logger.info("----------------------------------")
logger.info(f"--- Initiate FeatureProcessor ---")
logger.info("----------------------------------")
feature_processor = feature_processor_factory(
feature_process_params["feature_processor_type"]
)
logger.info("---------------------------")
logger.info(f"--- Prepare Data Stage ---")
logger.info("---------------------------")
prepare_data(
dataclient=dataclient,
train_proportion=params["train_proportion"],
output_train_filepath=params["output_train_filepath"],
output_test_filepath=params["output_test_filepath"],
input_dataclient=input_dataclient,
output_dataclient=output_dataclient,
datahandler=datahandler,
feature_processor=feature_processor,
data_filepath=prepare_data_params["data_filepath"],
train_proportion=prepare_data_params["train_proportion"],
output_train_filepath=prepare_data_params["output_train_filepath"],
output_test_filepath=prepare_data_params["output_test_filepath"],
feature_processor_config=feature_process_params["feature_processor_config"],
business_logic=business_logic,
new_feature_funcs=new_feature_funcs,
)
logger.info("-------------------------------")

View file

@ -1,7 +1,7 @@
joblib==1.3.2
boto3==1.28.17
pandas==1.5.3
scikit-learn==1.3.0
autogluon==0.8.2
pyarrow==13.0.0
pre-commit==3.3.3
sphinx==7.2.5

View file

@ -1,6 +1,6 @@
joblib==1.3.2
boto3==1.28.17
pandas==1.5.3
scikit-learn==1.3.0
autogluon==0.8.2
pyarrow==13.0.0
PyYAML==6.0.1

View file

@ -1,7 +1,7 @@
joblib==1.3.2
boto3==1.28.17
pandas==1.5.3
scikit-learn==1.3.0
autogluon==0.8.2
pyarrow==13.0.0
pre-commit==3.3.3
sphinx==7.2.5

View file

@ -1,3 +1,3 @@
boto3==1.28.41
pandas==1.5.3
scikit-learn==1.3.0
autogluon==0.8.2

View file

@ -0,0 +1,50 @@
"""
We remove all previous artefacts in the data folder for a dvc run
"""
import shutil
import yaml
from pathlib import Path
from core.Logger import logger
startup_cleanup_path = Path(__file__).parent / "configs" / "startup_cleanup.yaml"
startup_cleanup_params = yaml.safe_load(open(startup_cleanup_path))
def run_cleanup(artefacts_directory: str, metrics_directory: str) -> None:
"""
Remove the directory where artefacts are stored
"""
artefact_directory_path = Path(artefacts_directory)
if artefact_directory_path.exists():
logger.info(f"Removing the directory: {artefacts_directory}")
shutil.rmtree(artefact_directory_path)
metrics_directory_path = Path(metrics_directory)
if metrics_directory_path.exists():
logger.info(f"Removing the directory: {metrics_directory}")
shutil.rmtree(metrics_directory_path)
if __name__ == "__main__":
logger.info("----------------------------")
logger.info(f"--- {__file__} - Start! ---")
logger.info("----------------------------")
logger.info("---------------------")
logger.info(f"--- Run Clean up ---")
logger.info("---------------------")
run_cleanup(
artefacts_directory=startup_cleanup_params["artefacts"],
metrics_directory=startup_cleanup_params["metrics"],
)
logger.info("-------------------------------")
logger.info(f"--- {__file__} - Complete! ---")
logger.info("-------------------------------")