using dataclient everywhere

This commit is contained in:
Michael Duong 2023-09-12 12:51:50 +01:00
parent d717f85354
commit b3c9bc8fd7
14 changed files with 164 additions and 74 deletions

Binary file not shown.

View file

@ -12,6 +12,7 @@ from core.Logger import logger
from core.interface.InterfaceModels import MLModel
from core.interface.InterfaceDataClient import DataClient
from core.DataClient import dataclient_factory
from core.DataHandler import datahandler_factory
from core.MLModels import model_factory
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local")
@ -22,6 +23,9 @@ prepare_data_params = yaml.safe_load(open(prepare_data_path))
build_model_path = Path(__file__).parent / "configs" / "build_model.yaml"
build_model_params = yaml.safe_load(open(build_model_path))
feature_process_path = Path(__file__).parent / "configs" / "feature_processor.yaml"
feature_process_params = yaml.safe_load(open(feature_process_path))
def build_model(
dataclient: DataClient,
@ -40,16 +44,16 @@ def build_model(
logger.info("--------------------------------------")
if train_data is None:
# TODO: replace this with the data client to load
if train_filepath is None:
raise ValueError(f"Need {train_filepath}")
train_data = pd.read_parquet(train_filepath)
raise ValueError(f"Need {train_filepath} if no data supplied")
train_data = datahandler.load_data(
dataclient=dataclient, location=train_filepath
)
if test_data is None:
# TODO: replace this with the data client to load
if test_filepath is None:
raise ValueError(f"Need {test_filepath}")
test_data = pd.read_parquet(test_filepath)
raise ValueError(f"Need {test_filepath} if no data supplied")
test_data = datahandler.load_data(dataclient=dataclient, location=test_filepath)
logger.info("----------------------")
logger.info("--- Training model ---")
@ -76,7 +80,13 @@ if __name__ == "__main__":
logger.info(f"--- Initiate DataClient ---")
logger.info("----------------------------")
dataclient = dataclient_factory(prepare_data_params["dataclient_type"])
dataclient = dataclient_factory(prepare_data_params["output_dataclient_type"])
logger.info("-----------------------------")
logger.info(f"--- Initiate DataHandler ---")
logger.info("-----------------------------")
datahandler = datahandler_factory(prepare_data_params["datahandler_type"])
logger.info("-------------------------")
logger.info(f"--- Initiate MLModel ---")
@ -92,7 +102,7 @@ if __name__ == "__main__":
build_model(
dataclient=dataclient,
model=model,
target=build_model_params["target"],
target=feature_process_params["feature_processor_config"]["target"],
model_save_location=build_model_params["model_save_filepath"],
model_hyperparameters=build_model_params[model_type],
train_filepath=prepare_data_params["output_train_filepath"],

View file

@ -1,5 +1,4 @@
model_type: SKLearnLinearRegression
target: target
model_save_filepath: ./data/model/model.joblib
SKLearnLinearRegression: null

View file

@ -4,4 +4,4 @@ feature_processor_config:
subsample_seed: 0
target: RDSAP_CHANGE
drop_columns: ["UPRN", "HEAT_DEMAND_CHANGE"]
retain_features: null
retain_features: ["TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]

View file

@ -1,2 +1,5 @@
dataclient_type: local
input_datahandler_type: parquet
output_datahandler_type: json
metrics_type: Regression
metrics_output_filepath: ./metrics/metrics.json

View file

@ -1,3 +1,5 @@
input_dataclient_type: local
output_dataclient_type: local
test_data_filepath: ./data/prepared_data/test.parquet
predictions_output_filepath: ./data/predictions/predictions.parquet
predictions_column_name: predictions

View file

@ -2,7 +2,9 @@
Implementations of the DataClient Protocol
"""
import os
import boto3
from pathlib import Path
from io import BytesIO
from typing import List
from core.interface.InterfaceDataClient import DataClient
@ -197,7 +199,12 @@ class LocalClient:
"""
When the client is established, we can load data from a buffer
"""
...
with open(location, "rb") as file:
# Read the entire file into a BytesIO object
buffer = BytesIO(file.read())
buffer.seek(0)
return buffer
def load_database(self, database_location: dict) -> None:
"""
@ -215,6 +222,8 @@ class LocalClient:
"""
When the client is established, we can save out objects from a buffer
"""
if not Path(location).parent.exists():
os.makedirs(Path(location).parent)
# Write the contents of the buffer to the local file
with open(location, "wb") as f:

View file

@ -2,6 +2,7 @@
Implementations of the datahandler Protocol
"""
import json
import pandas as pd
from io import BytesIO
from typing import List
@ -15,6 +16,7 @@ def datahandler_factory(datahandler_type: str) -> DataHandler:
"""
datahandler = {
"parquet": ParquetHandler(),
"json": JSONHandler()
# ADD MORE DATAHANDLERS HERE
}
@ -52,3 +54,33 @@ class ParquetHandler:
obj.to_parquet(parquet_buffer, index=False)
dataclient.upload_data_from_buffer(buffer=parquet_buffer, location=location)
class JSONHandler:
    """
    Load and save JSON documents through a DataClient
    """

    def load_data(self, dataclient: DataClient, location: str) -> pd.DataFrame:
        """
        Loading JSON is not implemented yet.

        Raises NotImplementedError instead of silently returning None, so that
        selecting this handler for input fails where the mistake is made rather
        than later when the missing data is first used.
        """
        raise NotImplementedError(
            f"JSONHandler.load_data is not implemented (location={location!r})"
        )

    def save_data(self, dataclient: DataClient, obj: dict, location: str) -> None:
        """
        Serialise `obj` to UTF-8 encoded JSON and hand it to the client's
        `upload_data_from_buffer` for writing to `location`.

        Raises TypeError (from json.dumps) if `obj` is not JSON serialisable.
        """
        # Serialise the dictionary and encode it to bytes in one step
        json_bytes = json.dumps(obj).encode("utf-8")

        # A BytesIO created from bytes starts at position 0, so no write/seek is needed
        buffer = BytesIO(json_bytes)

        dataclient.upload_data_from_buffer(buffer=buffer, location=location)

View file

@ -54,7 +54,9 @@ class DataFrameFeatureProcessor:
return df
@staticmethod
def retain_features(df: pd.DataFrame, retain_features: List[str] | None = None):
def retain_features(
df: pd.DataFrame, target: str, retain_features: List[str] | None = None
) -> pd.DataFrame:
"""
Determine which columns to keep for modelling
"""
@ -62,8 +64,8 @@ class DataFrameFeatureProcessor:
retain_features = df.columns.to_list()
else:
if not set(retain_features).issubset(df.columns):
logger.error("Features defined is not contained in data")
exit(1)
raise ValueError("Features defined is not contained in data")
retain_features = [target] + retain_features
df = df[retain_features]
@ -83,7 +85,7 @@ class DataFrameFeatureProcessor:
@staticmethod
def apply_business_logic(
df: pd.DataFrame, business_logic: Union[dict[str, Callable], None]
):
) -> pd.DataFrame:
"""
If we need any additional business logic to be applied, post data cleaning
"""
@ -99,7 +101,7 @@ class DataFrameFeatureProcessor:
@staticmethod
def generate_new_features(
df: pd.DataFrame, new_feature_funcs: Union[dict[str, Callable], None]
):
) -> pd.DataFrame:
"""
We can iterate over all keys (new feature column names) and apply their Callable function
"""
@ -137,7 +139,9 @@ class DataFrameFeatureProcessor:
df, drop_columns=feature_processor_config["drop_columns"]
)
df = self.retain_features(
df, retain_features=feature_processor_config["retain_features"]
df,
retain_features=feature_processor_config["retain_features"],
target=feature_processor_config["target"],
)
df = self.apply_business_logic(df, business_logic=business_logic)
df = self.generate_new_features(df, new_feature_funcs=new_feature_funcs)

View file

@ -19,7 +19,7 @@ class DataHandler(Protocol):
...
def save_data(
self, dataclient: DataClient, obj: Union[pd.DataFrame, Any], location: str
self, dataclient: DataClient, obj: Union[pd.DataFrame, dict, Any], location: str
) -> None:
"""
When the client is established, we can save out objects

View file

@ -1,4 +1,3 @@
/prepared_data
/model
/predictions
.DS_Store

View file

@ -5,8 +5,8 @@ stages:
deps:
- path: prepare_data.py
hash: md5
md5: 38b0836237bfa25ea0d71ca259610f4d
size: 3623
md5: 87a83e62512bff93c89f3e93c1ed248d
size: 5593
params:
configs/prepare_data.yaml:
output_test_filepath: ./data/prepared_data/test.parquet
@ -15,20 +15,20 @@ stages:
outs:
- path: data/prepared_data/
hash: md5
md5: f0d462fe6b1a856a827409a745539285.dir
size: 36169
md5: 01a8f8f0b264ac4d61307a67bfa910b4.dir
size: 4428909
nfiles: 2
build_model:
cmd: python build_model.py
deps:
- path: build_model.py
hash: md5
md5: 152d52b7754b4c6f96f3481dc26562fc
size: 3576
md5: 58315ea127dcc127e2c22ab1205fddb2
size: 3925
- path: data/prepared_data
hash: md5
md5: f0d462fe6b1a856a827409a745539285.dir
size: 36169
md5: 01a8f8f0b264ac4d61307a67bfa910b4.dir
size: 4428909
nfiles: 2
params:
configs/build_model.yaml:
@ -37,64 +37,68 @@ stages:
kernel: linear
model_save_filepath: ./data/model/model.joblib
model_type: SKLearnLinearRegression
target: target
outs:
- path: data/model/
hash: md5
md5: fb7ae4137b445dc91e840b794d72e940.dir
size: 1096
md5: 40fa511f4f401f9d2c7da814afe198ef.dir
size: 920
nfiles: 1
generate_predictions:
cmd: python generate_predictions.py
deps:
- path: data/model
hash: md5
md5: fb7ae4137b445dc91e840b794d72e940.dir
size: 1096
md5: 40fa511f4f401f9d2c7da814afe198ef.dir
size: 920
nfiles: 1
- path: data/prepared_data
hash: md5
md5: f0d462fe6b1a856a827409a745539285.dir
size: 36169
md5: 01a8f8f0b264ac4d61307a67bfa910b4.dir
size: 4428909
nfiles: 2
- path: generate_predictions.py
hash: md5
md5: 424b9d89045eaf8a5a167ab2e0e363ae
size: 3400
md5: 13e920c0bae8ac51dd907631578f7045
size: 4126
params:
configs/generate_predictions.yaml:
input_dataclient_type: local
output_dataclient_type: local
predictions_column_name: predictions
predictions_output_filepath: ./data/predictions/predictions.parquet
test_data_filepath: ./data/prepared_data/test.parquet
outs:
- path: data/predictions/
hash: md5
md5: 4d5854903b25bdae15d99c934ebcfb99.dir
size: 2531
md5: 01e8d3483e1f90b5d92022ee4a65bbd7.dir
size: 945933
nfiles: 1
generate_metrics:
cmd: python generate_metrics.py
deps:
- path: data/predictions
hash: md5
md5: 4d5854903b25bdae15d99c934ebcfb99.dir
size: 2531
md5: 01e8d3483e1f90b5d92022ee4a65bbd7.dir
size: 945933
nfiles: 1
- path: data/prepared_data
hash: md5
md5: f0d462fe6b1a856a827409a745539285.dir
size: 36169
md5: 01a8f8f0b264ac4d61307a67bfa910b4.dir
size: 4428909
nfiles: 2
- path: generate_metrics.py
hash: md5
md5: b456e207b152298428ba79c083d1b6ff
size: 3728
md5: 6276995b5e860d0f0bb4545aa5f5d347
size: 4259
params:
configs/generate_metrics.yaml:
dataclient_type: local
input_datahandler_type: parquet
metrics_output_filepath: ./metrics/metrics.json
metrics_type: Regression
output_datahandler_type: json
outs:
- path: metrics/metrics.json
hash: md5
md5: 3c9306e992b07491ff7e642949d6bc47
size: 182
md5: 995ccf3c6c3f6a975d22aa9bc9f4964e
size: 181

View file

@ -11,9 +11,11 @@ from pathlib import Path
from core.interface.InterfaceModels import MLModel
from core.interface.InterfaceMetrics import MLMetrics
from core.interface.InterfaceDataClient import DataClient
from core.interface.InterfaceDataHandler import DataHandler
from core.DataClient import dataclient_factory
from core.MLModels import model_factory
from core.MLMetrics import metrics_factory
from core.DataHandler import datahandler_factory
from core.Logger import logger
@ -33,9 +35,14 @@ generate_predictions_params = yaml.safe_load(open(generate_predictions_path))
generate_metrics_path = Path(__file__).parent / "configs" / "generate_metrics.yaml"
generate_metrics_params = yaml.safe_load(open(generate_metrics_path))
feature_process_path = Path(__file__).parent / "configs" / "feature_processor.yaml"
feature_process_params = yaml.safe_load(open(feature_process_path))
def generate_metrics(
dataclient: DataClient,
input_datahandler: DataHandler,
output_datahandler: DataHandler,
model: MLModel,
metrics: MLMetrics,
target: str,
@ -52,15 +59,17 @@ def generate_metrics(
logger.info("--- Loading test data ---")
logger.info("-------------------------")
# TODO: replace with client loader here
test_data = pd.read_parquet(test_data_filepath)
test_data = input_datahandler.load_data(
dataclient=dataclient, location=test_data_filepath
)
logger.info("---------------------------")
logger.info("--- Loading predictions ---")
logger.info("---------------------------")
# TODO: replace with client loader here
predictions = pd.read_parquet(predictions_output_filepath)
predictions = input_datahandler.load_data(
dataclient=dataclient, location=predictions_output_filepath
)
logger.info("--------------------------")
logger.info("--- Generating metrics ---")
@ -75,13 +84,9 @@ def generate_metrics(
logger.info("--- Saving metrics ---")
logger.info("----------------------")
# TODO: replace with client
if not Path(metrics_output_filepath).parent.exists():
os.mkdir(Path(metrics_output_filepath).parent)
with open(metrics_output_filepath, "w") as f:
json.dump(metrics_output, f)
output_datahandler.save_data(
dataclient=dataclient, obj=metrics_output, location=metrics_output_filepath
)
if __name__ == "__main__":
@ -91,14 +96,22 @@ if __name__ == "__main__":
logger.info("----------------------------")
model = model_factory(build_model_params["model_type"])
dataclient = dataclient_factory(prepare_data_params["dataclient_type"])
dataclient = dataclient_factory(generate_metrics_params["dataclient_type"])
input_datahandler = datahandler_factory(
generate_metrics_params["input_datahandler_type"]
)
output_datahandler = datahandler_factory(
generate_metrics_params["output_datahandler_type"]
)
metrics = metrics_factory(generate_metrics_params["metrics_type"])
generate_metrics(
dataclient=dataclient,
input_datahandler=input_datahandler,
output_datahandler=output_datahandler,
model=model,
metrics=metrics,
target=build_model_params["target"],
target=feature_process_params["feature_processor_config"]["target"],
test_data_filepath=generate_predictions_params["test_data_filepath"],
predictions_output_filepath=generate_predictions_params[
"predictions_output_filepath"

View file

@ -10,9 +10,10 @@ import pandas as pd
from pathlib import Path
from core.interface.InterfaceModels import MLModel
from core.interface.InterfaceDataClient import DataClient
from core.interface.InterfaceDataHandler import DataHandler
from core.DataClient import dataclient_factory
from core.MLModels import model_factory
from core.MLMetrics import metrics_factory
from core.DataHandler import datahandler_factory
from core.Logger import logger
@ -29,9 +30,14 @@ generate_predictions_path = (
)
generate_predictions_params = yaml.safe_load(open(generate_predictions_path))
feature_process_path = Path(__file__).parent / "configs" / "feature_processor.yaml"
feature_process_params = yaml.safe_load(open(feature_process_path))
def generate_predictions(
dataclient: DataClient,
input_dataclient: DataClient,
output_dataclient: DataClient,
datahandler: DataHandler,
model: MLModel,
target: str,
model_filepath: str,
@ -47,8 +53,9 @@ def generate_predictions(
logger.info("--- Loading test data ---")
logger.info("-------------------------")
# TODO: replace with client loader here
test_data = pd.read_parquet(test_data_filepath)
test_data = datahandler.load_data(
dataclient=input_dataclient, location=test_data_filepath
)
logger.info("---------------------")
logger.info("--- Loading model ---")
@ -60,7 +67,6 @@ def generate_predictions(
logger.info("--- Generating predictions ---")
logger.info("------------------------------")
# Clean test data for now
prediction_data = (
test_data.drop(columns=target) if target in test_data.columns else test_data
)
@ -71,13 +77,11 @@ def generate_predictions(
logger.info("--- Saving predictions ---")
logger.info("--------------------------")
# TODO: replace with client
if not Path(predictions_output_filepath).parent.exists():
os.mkdir(Path(predictions_output_filepath).parent)
pd.DataFrame(predictions, columns=[predictions_column_name]).to_parquet(
predictions_output_filepath
predictions_df = pd.DataFrame(predictions, columns=[predictions_column_name])
datahandler.save_data(
dataclient=output_dataclient,
obj=predictions_df,
location=predictions_output_filepath,
)
@ -88,12 +92,23 @@ if __name__ == "__main__":
logger.info("----------------------------")
model = model_factory(build_model_params["model_type"])
dataclient = dataclient_factory(prepare_data_params["dataclient_type"])
# Data may be loaded from and saved to different locations, so the client types are read from generate_predictions.yaml
# E.g. for metric runs, this will be a local data client
# For predictions, we will want a cloud data client
input_dataclient = dataclient_factory(
generate_predictions_params["input_dataclient_type"]
)
output_dataclient = dataclient_factory(
generate_predictions_params["output_dataclient_type"]
)
datahandler = datahandler_factory(prepare_data_params["datahandler_type"])
generate_predictions(
dataclient=dataclient,
input_dataclient=input_dataclient,
output_dataclient=output_dataclient,
datahandler=datahandler,
model=model,
target=build_model_params["target"],
target=feature_process_params["feature_processor_config"]["target"],
model_filepath=build_model_params["model_save_filepath"],
test_data_filepath=generate_predictions_params["test_data_filepath"],
predictions_output_filepath=generate_predictions_params[