mirror of
https://github.com/Hestia-Homes/ML.git
synced 2026-06-08 11:17:25 +00:00
commit
dd1143a8b2
13 changed files with 293 additions and 421 deletions
14
.github/workflows/MLPipelinePostMerge.yml
vendored
14
.github/workflows/MLPipelinePostMerge.yml
vendored
|
|
@ -157,7 +157,13 @@ jobs:
|
|||
dvc push -r dev
|
||||
|
||||
Register-New-Model-Dev:
|
||||
if: github.event.pull_request.merged == true
|
||||
needs: [Register-Major-Model-Dev, Register-Minor-Model-Dev, Register-Patch-Model-Dev]
|
||||
if: |
|
||||
always() &&
|
||||
(needs.Register-Major-Model-Dev.result == 'success' || needs.Register-Major-Model-Dev.result == 'skipped') &&
|
||||
(needs.Register-Minor-Model-Dev.result == 'success' || needs.Register-Minor-Model-Dev.result == 'skipped') &&
|
||||
(needs.Register-Patch-Model-Dev.result == 'success' || needs.Register-Patch-Model-Dev.result == 'skipped')
|
||||
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
|
|
@ -182,9 +188,10 @@ jobs:
|
|||
git config user.email "Github-Bot@no-reply.com"
|
||||
|
||||
latest_version=$(gto show ${REGISTER_MODEL_NAME}@latest --ref)
|
||||
|
||||
|
||||
new_tag=${latest_version}#dev
|
||||
|
||||
git pull #Get new model registry md file changes
|
||||
git tag -a ${new_tag} -m "Registering Latest Version to Dev"
|
||||
git push origin ${new_tag}
|
||||
|
||||
|
|
@ -195,8 +202,7 @@ jobs:
|
|||
|
||||
|
||||
Register-Prediction-Image-Dev:
|
||||
needs: Promote-Artefacts-To-Dev
|
||||
# needs: [Promote-Artefacts-To-Dev, Register-New-Model-Dev] WILL ADD BACK ONCE REGISTER WORKS
|
||||
needs: [Promote-Artefacts-To-Dev, Register-New-Model-Dev]
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
|
|
|
|||
|
|
@ -12,7 +12,6 @@ from core.Logger import logger
|
|||
from core.interface.InterfaceModels import MLModel
|
||||
from core.interface.InterfaceDataClient import DataClient
|
||||
from core.DataClient import dataclient_factory
|
||||
from core.DataHandler import datahandler_factory
|
||||
from core.MLModels import model_factory
|
||||
|
||||
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local")
|
||||
|
|
@ -46,14 +45,12 @@ def build_model(
|
|||
if train_data is None:
|
||||
if train_filepath is None:
|
||||
raise ValueError(f"Need {train_filepath} if no data supplied")
|
||||
train_data = datahandler.load_data(
|
||||
dataclient=dataclient, location=train_filepath
|
||||
)
|
||||
train_data = dataclient.load_data(location=train_filepath)
|
||||
|
||||
if test_data is None:
|
||||
if test_filepath is None:
|
||||
raise ValueError(f"Need {test_filepath} if no data supplied")
|
||||
test_data = datahandler.load_data(dataclient=dataclient, location=test_filepath)
|
||||
test_data = dataclient.load_data(location=test_filepath)
|
||||
|
||||
logger.info("----------------------")
|
||||
logger.info("--- Training model ---")
|
||||
|
|
@ -80,14 +77,9 @@ if __name__ == "__main__":
|
|||
logger.info(f"--- Initiate DataClient ---")
|
||||
logger.info("----------------------------")
|
||||
|
||||
# Output of previous prepare data step, will be where the data is
|
||||
dataclient = dataclient_factory(prepare_data_params["output_dataclient_type"])
|
||||
|
||||
logger.info("-----------------------------")
|
||||
logger.info(f"--- Initiate DataHandler ---")
|
||||
logger.info("-----------------------------")
|
||||
|
||||
datahandler = datahandler_factory(prepare_data_params["datahandler_type"])
|
||||
|
||||
logger.info("-------------------------")
|
||||
logger.info(f"--- Initiate MLModel ---")
|
||||
logger.info("-------------------------")
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
model_type: AutogluonAutoML
|
||||
model_save_filepath: ./data/model/autogluonmodel/
|
||||
model_type: SKLearnLinearRegression
|
||||
model_save_filepath: ./data/model/model.joblib
|
||||
|
||||
SKLearnLinearRegression: null
|
||||
|
||||
|
|
@ -10,6 +10,6 @@ AutogluonAutoML:
|
|||
output_filepath: ./data/model/autogluonmodel/
|
||||
problem_type: regression
|
||||
eval_metric: mean_absolute_error
|
||||
time_limit: 200
|
||||
presets: medium_quality
|
||||
excluded_model_types: null
|
||||
time_limit: 400
|
||||
presets: high_quality
|
||||
excluded_model_types: ['KNN']
|
||||
|
|
|
|||
|
|
@ -4,5 +4,5 @@ feature_processor_config:
|
|||
subsample_seed: 0
|
||||
target: RDSAP_CHANGE
|
||||
drop_columns: ["UPRN", "HEAT_DEMAND_CHANGE", "CARBON_CHANGE"]
|
||||
# retain_features: ["TOTAL_FLOOR_AREA_STARTING", "SAP_STARTING", "TOTAL_FLOOR_AREA_ENDING"]
|
||||
retain_features: null
|
||||
retain_features: ["TOTAL_FLOOR_AREA_STARTING", "SAP_STARTING", "HEAT_DEMAND_STARTING", "CARBON_STARTING", "NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS", "FIXED_LIGHTING_OUTLETS_COUNT", "PHOTO_SUPPLY_STARTING", "MULTI_GLAZE_PROPORTION_STARTING", "LOW_ENERGY_LIGHTING_STARTING", "NUMBER_OPEN_FIREPLACES_STARTING", "EXTENSION_COUNT_STARTING", "FLOOR_HEIGHT_STARTING", "PHOTO_SUPPLY_ENDING", "MULTI_GLAZE_PROPORTION_ENDING", "LOW_ENERGY_LIGHTING_ENDING", "NUMBER_OPEN_FIREPLACES_ENDING", "EXTENSION_COUNT_ENDING", "TOTAL_FLOOR_AREA_ENDING", "FLOOR_HEIGHT_ENDING", "DAYS_TO_STARTING", "DAYS_TO_ENDING"]
|
||||
# retain_features: null
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ input_dataclient_type: aws-s3
|
|||
output_dataclient_type: local
|
||||
datahandler_type: parquet
|
||||
data_filepath: s3://retrofit-data-dev/sap_change_model/dataset.parquet
|
||||
train_proportion: 0.1
|
||||
train_proportion: 0.9
|
||||
output_train_filepath: ./data/prepared_data/train.parquet
|
||||
output_test_filepath: ./data/prepared_data/test.parquet
|
||||
|
||||
|
|
|
|||
|
|
@ -3,28 +3,36 @@ Implementations of the DataClient Protocol
|
|||
"""
|
||||
|
||||
import os
|
||||
import json
|
||||
import boto3
|
||||
import pandas as pd
|
||||
from pathlib import Path
|
||||
from io import BytesIO
|
||||
from typing import List
|
||||
from typing import List, Union
|
||||
from core.interface.InterfaceDataClient import DataClient
|
||||
from core.Logger import logger
|
||||
|
||||
|
||||
def dataclient_factory(dataclient_type: str) -> DataClient:
|
||||
def dataclient_factory(
|
||||
dataclient_type: str, dataclient_config: Union[dict, None] = None
|
||||
) -> DataClient:
|
||||
"""
|
||||
Determine which dataclient to use
|
||||
"""
|
||||
|
||||
if dataclient_config is None:
|
||||
dataclient_config = {}
|
||||
|
||||
dataclients = {
|
||||
"local": LocalClient(),
|
||||
"aws-s3": AWSS3Client(),
|
||||
"local": LocalClient,
|
||||
"aws-s3": AWSS3Client,
|
||||
# ADD MORE DATACLIENTS HERE
|
||||
}
|
||||
|
||||
if dataclient_type not in dataclients:
|
||||
raise ValueError("Dataclient type specified is not in factory")
|
||||
|
||||
return dataclients[dataclient_type]
|
||||
return dataclients[dataclient_type](**dataclient_config)
|
||||
|
||||
|
||||
def validate_dict_keys(keys_1: List[str], keys_2: List[str], config_type: str):
|
||||
|
|
@ -32,144 +40,145 @@ def validate_dict_keys(keys_1: List[str], keys_2: List[str], config_type: str):
|
|||
raise ValueError(f"Incorrect {config_type} keys specified")
|
||||
|
||||
|
||||
# class MinioClient:
|
||||
# """
|
||||
# Using the Minio s3 client, to do local testing
|
||||
# """
|
||||
|
||||
# ACCEPTED_CONFIG_KEYS = [
|
||||
# "aws_access_key_id",
|
||||
# "aws_secret_access_key",
|
||||
# "endpoint_url",
|
||||
# ]
|
||||
# ACCEPTED_LOAD_CONFIG_KEYS = []
|
||||
# ACCEPTED_SAVE_CONFIG_KEYS = []
|
||||
|
||||
# def ingest_configurations(self, config: dict) -> None:
|
||||
# """
|
||||
# Load all configuration into the instance (self.config)
|
||||
# """
|
||||
# validate_dict_keys(
|
||||
# keys_1=list(config.keys()),
|
||||
# keys_2=self.ACCEPTED_CONFIG_KEYS,
|
||||
# config_type="config",
|
||||
# )
|
||||
|
||||
# self.config = config
|
||||
|
||||
# def establish_client(self) -> None:
|
||||
# """
|
||||
# With the given configurations, create the connection to the client (self.client)
|
||||
# """
|
||||
|
||||
# ...
|
||||
|
||||
# def download_data(self, download_config: dict) -> pd.DataFrame:
|
||||
# """
|
||||
# When the client is established, we can load data
|
||||
# """
|
||||
# validate_dict_keys(
|
||||
# keys_1=list(download_config.keys()),
|
||||
# keys_2=self.ACCEPTED_LOAD_CONFIG_KEYS,
|
||||
# config_type="load_config",
|
||||
# )
|
||||
|
||||
# return pd.DataFrame()
|
||||
|
||||
# def save_data(self, obj: object, save_config: dict) -> None:
|
||||
# """
|
||||
# When the client is established, we can save out objects
|
||||
# """
|
||||
# validate_dict_keys(
|
||||
# keys_1=list(save_config.keys()),
|
||||
# keys_2=self.ACCEPTED_SAVE_CONFIG_KEYS,
|
||||
# config_type="save_config",
|
||||
# )
|
||||
|
||||
|
||||
class AWSS3Client:
|
||||
"""
|
||||
Using Boto3, set up the AWS client
|
||||
"""
|
||||
|
||||
ACCEPTED_CONFIG_KEYS = [
|
||||
"AWS_ACCESS_KEY_ID",
|
||||
"AWS_SECRET_ACCESS_KEY",
|
||||
]
|
||||
def __init__(
    self,
    AWS_ACCESS_KEY_ID: Union[str, None] = None,
    AWS_SECRET_ACCESS_KEY: Union[str, None] = None,
    ENDPOINT_URL: Union[str, None] = None,
):
    """
    Store the connection settings, then establish the S3 client

    Every setting defaults to None so the client can be constructed with
    no configuration at all: dataclient_factory unpacks an empty dict
    into this constructor when it is called without a dataclient_config.
    """
    self.AWS_ACCESS_KEY_ID = AWS_ACCESS_KEY_ID
    self.AWS_SECRET_ACCESS_KEY = AWS_SECRET_ACCESS_KEY
    self.ENDPOINT_URL = ENDPOINT_URL

    # The connection is created eagerly, so a constructed instance is ready to use
    self._establish_client()
|
||||
|
||||
ACCEPTED_LOAD_CONFIG_KEYS = []
|
||||
ACCEPTED_SAVE_CONFIG_KEYS = []
|
||||
|
||||
def ingest_configurations(self, config: dict) -> None:
|
||||
"""
|
||||
Load all configuration into the instance (self.config)
|
||||
"""
|
||||
validate_dict_keys(
|
||||
keys_1=self.ACCEPTED_CONFIG_KEYS,
|
||||
keys_2=list(config.keys()),
|
||||
config_type="Ingest Config",
|
||||
)
|
||||
self.config = config
|
||||
|
||||
def establish_client(self) -> None:
|
||||
def _establish_client(self) -> None:
|
||||
"""
|
||||
With the given configurations, create the connection to the client (self.client)
|
||||
"""
|
||||
logger.info(f"Establishing S3 Client")
|
||||
session = boto3.Session()
|
||||
|
||||
if (
|
||||
self.config["AWS_ACCESS_KEY_ID"] is None
|
||||
and self.config["AWS_SECRET_ACCESS_KEY"] is None
|
||||
):
|
||||
if self.AWS_ACCESS_KEY_ID is None and self.AWS_SECRET_ACCESS_KEY is None:
|
||||
self.client = session.client(service_name="s3") # Using local credentials
|
||||
else:
|
||||
self.client = session.client(
|
||||
service_name="s3",
|
||||
aws_access_key_id=self.config["AWS_ACCESS_KEY_ID"],
|
||||
aws_secret_access_key=self.config["AWS_SECRET_ACCESS_KEY"],
|
||||
aws_access_key_id=self.AWS_ACCESS_KEY_ID,
|
||||
aws_secret_access_key=self.AWS_SECRET_ACCESS_KEY,
|
||||
endpoint_url=self.ENDPOINT_URL,
|
||||
)
|
||||
|
||||
def download_data(self, location: dict) -> None:
|
||||
def load_data(
|
||||
self, location: str, load_config: Union[dict, None] = None
|
||||
) -> pd.DataFrame:
|
||||
"""
|
||||
When the client is established, we can download data to a local file
|
||||
Generic to load data
|
||||
"""
|
||||
...
|
||||
|
||||
def load_data_as_buffer(self, location: str) -> BytesIO:
|
||||
"""
|
||||
When the client is established, we can load data in a buffer
|
||||
"""
|
||||
if not location.startswith("s3://"):
|
||||
raise ValueError("S3 file path specified without s3://")
|
||||
|
||||
if load_config is None:
|
||||
load_config = {}
|
||||
|
||||
filetype = Path(location).suffix
|
||||
|
||||
load_methods = {
|
||||
".parquet": self._load_parquet,
|
||||
# "": _load_directory(**load_config),
|
||||
# ADD MORE load_methods HERE
|
||||
}
|
||||
|
||||
if filetype not in load_methods:
|
||||
raise ValueError("load methods specified is not in factory")
|
||||
|
||||
return load_methods[filetype](location=location, load_config=load_config)
|
||||
|
||||
def save_data(
    self,
    obj: object,
    location: str,
    save_config: Union[dict, None] = None,
) -> None:
    """
    Save an object to S3, choosing the writer from the file extension

    location must start with "s3://"; a ValueError is raised otherwise,
    and also when no writer is registered for the extension.
    save_config is handed to the writer; None is treated as an empty dict.
    """
    if not location.startswith("s3://"):
        raise ValueError("S3 file path specified without s3://")

    # One writer per supported extension
    writers = {
        ".parquet": self._save_parquet,
        # ADD MORE save_methods HERE
    }

    writer = writers.get(Path(location).suffix)
    if writer is None:
        raise ValueError("save_methods specified is not in factory")

    options = {} if save_config is None else save_config
    return writer(obj=obj, location=location, save_config=options)
|
||||
|
||||
def _save_parquet(self, obj: object, location: str, save_config: dict):
    """
    Serialise obj to parquet in memory and upload it to S3

    obj must provide a pandas-style to_parquet(buffer, **kwargs) method.
    location is an "s3://bucket/key" path. The entries of save_config are
    forwarded to to_parquet; index=False is used unless save_config
    overrides it.
    """
    write_options = {"index": False, **(save_config or {})}

    buffer = BytesIO()
    obj.to_parquet(buffer, **write_options)
    # Writing leaves the cursor at the end of the buffer and upload_fileobj
    # reads from the current position, so rewind or nothing is uploaded
    buffer.seek(0)

    # str.strip("s3://") removes any of the characters 's', '3', ':' and '/'
    # from both ends (mangling buckets such as "sap-data"), so cut the
    # scheme off as a prefix instead
    scheme = "s3://"
    path = location[len(scheme):] if location.startswith(scheme) else location
    bucket, key = path.split("/", 1)
    self.client.upload_fileobj(buffer, bucket, key)
|
||||
|
||||
def _load_parquet(self, location: str, load_config: dict) -> pd.DataFrame:
|
||||
"""
|
||||
Load a parquet file
|
||||
"""
|
||||
|
||||
bucket, key = location.strip("s3://").split("/", 1)
|
||||
buffer = BytesIO()
|
||||
self.client.download_fileobj(bucket, key, buffer)
|
||||
buffer.seek(0)
|
||||
|
||||
return buffer
|
||||
df = pd.read_parquet(buffer, **load_config)
|
||||
|
||||
def load_database(self, database_location: dict) -> None:
|
||||
"""
|
||||
When the client is established, we can read from a database
|
||||
"""
|
||||
...
|
||||
return df
|
||||
|
||||
def upload_data(self, location: str) -> None:
|
||||
"""
|
||||
When the client is established, we can save out objects from a local file
|
||||
"""
|
||||
...
|
||||
# def load_data_as_buffer(self, location: str) -> BytesIO:
|
||||
# """
|
||||
# When the client is established, we can load data in a buffer
|
||||
# """
|
||||
|
||||
def upload_data_from_buffer(self, buffer: BytesIO, location: str) -> None:
|
||||
"""
|
||||
When the client is established, we can save out objects from a buffer
|
||||
"""
|
||||
if not location.startswith("s3://"):
|
||||
raise ValueError("S3 file path specified without s3://")
|
||||
# bucket, key = location.strip("s3://").split("/", 1)
|
||||
# buffer = BytesIO()
|
||||
# self.client.download_fileobj(bucket, key, buffer)
|
||||
# buffer.seek(0)
|
||||
|
||||
bucket, key = location.strip("s3://").split("/", 1)
|
||||
self.client.upload_fileobj(buffer, bucket, key)
|
||||
# return buffer
|
||||
|
||||
# def upload_data_from_buffer(self, buffer: BytesIO, location: str) -> None:
|
||||
# """
|
||||
# When the client is established, we can save out objects from a buffer
|
||||
# """
|
||||
# if not location.startswith("s3://"):
|
||||
# raise ValueError("S3 file path specified without s3://")
|
||||
|
||||
# bucket, key = location.strip("s3://").split("/", 1)
|
||||
# self.client.upload_fileobj(buffer, bucket, key)
|
||||
|
||||
|
||||
class LocalClient:
|
||||
|
|
@ -177,54 +186,105 @@ class LocalClient:
|
|||
Interacting with data locally
|
||||
"""
|
||||
|
||||
def ingest_configurations(self, config: dict) -> None:
|
||||
def __init__(self):
|
||||
"""
|
||||
Load all configuration into the instance (self.config)
|
||||
No initialisation needed for local client
|
||||
"""
|
||||
logger.info("Local - No configuration required")
|
||||
self._establish_client()
|
||||
|
||||
def establish_client(self) -> None:
|
||||
def _establish_client(self) -> None:
|
||||
"""
|
||||
With the given configurations, create the connection to the client (self.client)
|
||||
"""
|
||||
logger.info("Local - No establishing client required")
|
||||
|
||||
def download_data(self, location: dict) -> None:
|
||||
def load_data(
|
||||
self, location: str, load_config: Union[dict, None] = None
|
||||
) -> pd.DataFrame:
|
||||
"""
|
||||
When the client is established, we can download data to a file
|
||||
Generic to load data
|
||||
"""
|
||||
...
|
||||
|
||||
def load_data_as_buffer(self, location: str) -> BytesIO:
|
||||
"""
|
||||
When the client is established, we can load data from a buffer
|
||||
"""
|
||||
with open(location, "rb") as file:
|
||||
# Read the entire file into a BytesIO object
|
||||
buffer = BytesIO(file.read())
|
||||
buffer.seek(0)
|
||||
if load_config is None:
|
||||
load_config = {}
|
||||
|
||||
return buffer
|
||||
filetype = Path(location).suffix
|
||||
|
||||
def load_database(self, database_location: dict) -> None:
|
||||
"""
|
||||
When the client is established, we can read from a database
|
||||
"""
|
||||
...
|
||||
load_methods = {
|
||||
".parquet": self._load_parquet,
|
||||
# "": _load_directory(**load_config),
|
||||
# ADD MORE load_methods HERE
|
||||
}
|
||||
|
||||
def upload_data(self, location: str) -> None:
|
||||
"""
|
||||
When the client is established, we can save out objects from a file
|
||||
"""
|
||||
...
|
||||
if filetype not in load_methods:
|
||||
raise ValueError("load methods specified is not in factory")
|
||||
|
||||
def upload_data_from_buffer(self, buffer: BytesIO, location: str) -> None:
|
||||
return load_methods[filetype](location=location, load_config=load_config)
|
||||
|
||||
def save_data(
|
||||
self,
|
||||
obj: object,
|
||||
location: str,
|
||||
save_config: Union[dict, None] = None,
|
||||
) -> None:
|
||||
"""
|
||||
When the client is established, we can save out objects from a buffer
|
||||
Generic to save data
|
||||
"""
|
||||
if not Path(location).parent.exists():
|
||||
os.makedirs(Path(location).parent)
|
||||
|
||||
if save_config is None:
|
||||
save_config = {}
|
||||
|
||||
save_methods = {
|
||||
".parquet": self._save_parquet,
|
||||
".json": self._save_json
|
||||
# "": _save_directory(**save_config),
|
||||
# ADD MORE save_methods HERE
|
||||
}
|
||||
|
||||
filetype = Path(location).suffix
|
||||
|
||||
if filetype not in save_methods:
|
||||
raise ValueError("save_methods specified is not in factory")
|
||||
|
||||
return save_methods[filetype](
|
||||
obj=obj, location=location, save_config=save_config
|
||||
)
|
||||
|
||||
def _load_parquet(self, location: str, load_config: dict) -> pd.DataFrame:
    """
    Read the parquet file at location into a DataFrame

    Every entry of load_config is forwarded to pandas.read_parquet as a
    keyword argument.
    """
    return pd.read_parquet(location, **load_config)
|
||||
|
||||
def _save_parquet(self, obj: pd.DataFrame, location: str, save_config: dict):
    """
    Write the DataFrame to location as a parquet file

    Every entry of save_config is forwarded to DataFrame.to_parquet as a
    keyword argument.
    """
    write_parquet = obj.to_parquet
    write_parquet(location, **save_config)
|
||||
|
||||
def _save_json(self, obj: dict, location: str, save_config: dict):
    """
    Save object as a UTF-8 encoded json file

    The entries of save_config are forwarded to json.dumps as keyword
    arguments (for example indent or sort_keys); with an empty
    save_config the output is the compact json.dumps default.
    """
    # Serialise before opening the file, so an object that cannot be
    # serialised does not leave a truncated file behind
    json_bytes = json.dumps(obj, **(save_config or {})).encode("utf-8")

    with open(location, "wb") as f:
        f.write(json_bytes)
|
||||
|
|
|
|||
|
|
@ -1,86 +0,0 @@
|
|||
"""
|
||||
Implementations of the datahandler Protocol
|
||||
"""
|
||||
|
||||
import json
|
||||
import pandas as pd
|
||||
from io import BytesIO
|
||||
from typing import List
|
||||
from core.interface.InterfaceDataHandler import DataHandler
|
||||
from core.interface.InterfaceDataClient import DataClient
|
||||
|
||||
|
||||
def datahandler_factory(datahandler_type: str) -> DataHandler:
    """
    Determine which datahandler to use

    Returns a new handler instance for datahandler_type ("parquet" or
    "json") and raises ValueError for any other value.
    """
    # Map to the classes and construct only the one requested, rather than
    # building every handler on each call
    datahandlers = {
        "parquet": ParquetHandler,
        "json": JSONHandler,
        # ADD MORE DATAHANDLERS HERE
    }

    if datahandler_type not in datahandlers:
        raise ValueError("Dataloader type specified is not in factory")

    return datahandlers[datahandler_type]()
|
||||
|
||||
|
||||
def validate_dict_keys(keys_1: List[str], keys_2: List[str], config_type: str):
    """
    Check that every key in keys_1 also appears in keys_2

    Raises ValueError, naming config_type, when keys_1 holds a key that
    keys_2 does not.
    """
    unexpected = set(keys_1).difference(keys_2)
    if unexpected:
        raise ValueError(f"Incorrect {config_type} keys specified")
|
||||
|
||||
|
||||
class ParquetHandler:
    """
    Load and save Parquet datasets
    """

    def load_data(self, dataclient: DataClient, location: str) -> pd.DataFrame:
        """
        Fetch location through the dataclient as a buffer and parse it as parquet
        """
        df = pd.read_parquet(dataclient.load_data_as_buffer(location=location))
        return df

    def save_data(
        self, dataclient: DataClient, obj: pd.DataFrame, location: str
    ) -> None:
        """
        Serialise obj to parquet in memory and upload it through the dataclient

        The index is not written (index=False).
        """
        # Convert the Pandas DataFrame to a Parquet buffer
        parquet_buffer = BytesIO()
        obj.to_parquet(parquet_buffer, index=False)

        # Writing leaves the cursor at the end of the buffer; rewind so a
        # client that reads from the current position gets the whole payload
        # (JSONHandler.save_data rewinds its buffer the same way)
        parquet_buffer.seek(0)

        dataclient.upload_data_from_buffer(buffer=parquet_buffer, location=location)
|
||||
|
||||
|
||||
class JSONHandler:
    """
    Load and save JSON documents
    """

    def load_data(self, dataclient: DataClient, location: str) -> pd.DataFrame:
        """
        Not implemented yet: the body is a placeholder and returns None
        """
        ...

    def save_data(self, dataclient: DataClient, obj: dict, location: str) -> None:
        """
        Serialise obj to UTF-8 encoded json and upload it through the dataclient
        """
        # Compact json.dumps output (no indent), encoded as UTF-8
        json_bytes = json.dumps(obj).encode("utf-8")

        # A BytesIO built from its initial bytes starts at position 0, so the
        # client can read the whole payload without an explicit rewind
        buffer = BytesIO(json_bytes)

        dataclient.upload_data_from_buffer(buffer=buffer, location=location)
|
||||
|
|
@ -4,7 +4,7 @@ Interface for all DataClient i.e. s3, database, local etc
|
|||
|
||||
import pandas as pd
|
||||
from io import BytesIO
|
||||
from typing import Protocol
|
||||
from typing import Protocol, Union
|
||||
|
||||
|
||||
class DataClient(Protocol):
|
||||
|
|
@ -12,44 +12,20 @@ class DataClient(Protocol):
|
|||
Declare the methods required for a DataClient
|
||||
"""
|
||||
|
||||
def ingest_configurations(self, config: dict) -> None:
|
||||
"""
|
||||
Load all configuration into the instance (self.config)
|
||||
"""
|
||||
...
|
||||
|
||||
def establish_client(self) -> None:
|
||||
def _establish_client(self) -> None:
|
||||
"""
|
||||
With the given configurations, create the connection to the client (self.client)
|
||||
"""
|
||||
...
|
||||
|
||||
def download_data(self, location: dict) -> None:
|
||||
def load_data(self, location: str, load_config: Union[dict, None]) -> pd.DataFrame:
|
||||
"""
|
||||
When the client is established, we can load data
|
||||
Generic to load data
|
||||
"""
|
||||
...
|
||||
|
||||
def load_data_as_buffer(self, location: str) -> BytesIO:
|
||||
def save_data(
|
||||
self, obj: object, location: str, save_config: Union[dict, None]
|
||||
) -> None:
|
||||
"""
|
||||
When the client is established, we can load data
|
||||
Generic to save data
|
||||
"""
|
||||
...
|
||||
|
||||
def load_database(self, database_location: dict) -> None:
|
||||
"""
|
||||
When the client is established, we can read from a database
|
||||
"""
|
||||
...
|
||||
|
||||
def upload_data(self, location: str) -> None:
|
||||
"""
|
||||
When the client is established, we can save out objects
|
||||
"""
|
||||
...
|
||||
|
||||
def upload_data_from_buffer(self, buffer: BytesIO, location: str) -> None:
|
||||
"""
|
||||
When the client is established, we can save out objects
|
||||
"""
|
||||
...
|
||||
|
|
|
|||
|
|
@ -1,26 +0,0 @@
|
|||
"""
|
||||
Interface for all DataHandler i.e. Parquet data, csv data
|
||||
"""
|
||||
|
||||
import pandas as pd
|
||||
from typing import Protocol, Union, Any
|
||||
from core.interface.InterfaceDataClient import DataClient
|
||||
|
||||
|
||||
class DataHandler(Protocol):
|
||||
"""
|
||||
Declare the methods required for a DataClient
|
||||
"""
|
||||
|
||||
def load_data(self, dataclient: DataClient, location: str) -> pd.DataFrame:
|
||||
"""
|
||||
When the client is established, we can load data
|
||||
"""
|
||||
...
|
||||
|
||||
def save_data(
|
||||
self, dataclient: DataClient, obj: Union[pd.DataFrame, dict, Any], location: str
|
||||
) -> None:
|
||||
"""
|
||||
When the client is established, we can save out objects
|
||||
"""
|
||||
|
|
@ -5,30 +5,30 @@ stages:
|
|||
deps:
|
||||
- path: prepare_data.py
|
||||
hash: md5
|
||||
md5: 9c31bfb1b75ea3c9685ec459cbb50e62
|
||||
size: 5921
|
||||
md5: 2cfe9e3012280e0cecdb84da12c974d9
|
||||
size: 5009
|
||||
params:
|
||||
configs/prepare_data.yaml:
|
||||
output_test_filepath: ./data/prepared_data/test.parquet
|
||||
output_train_filepath: ./data/prepared_data/train.parquet
|
||||
train_proportion: 0.1
|
||||
train_proportion: 0.9
|
||||
outs:
|
||||
- path: data/prepared_data/
|
||||
hash: md5
|
||||
md5: febdc8362200167078dfa578cf2bc889.dir
|
||||
size: 24296908
|
||||
md5: ea0a2baf3931e692d6344ba609331089.dir
|
||||
size: 13232732
|
||||
nfiles: 2
|
||||
build_model:
|
||||
cmd: python build_model.py
|
||||
deps:
|
||||
- path: build_model.py
|
||||
hash: md5
|
||||
md5: 662cd6b1562fbbc2c7d30dd0f2375a66
|
||||
size: 3948
|
||||
md5: 46bcc34f20c6851cd987640889eefde6
|
||||
size: 3671
|
||||
- path: data/prepared_data
|
||||
hash: md5
|
||||
md5: febdc8362200167078dfa578cf2bc889.dir
|
||||
size: 24296908
|
||||
md5: ea0a2baf3931e692d6344ba609331089.dir
|
||||
size: 13232732
|
||||
nfiles: 2
|
||||
params:
|
||||
configs/build_model.yaml:
|
||||
|
|
@ -36,37 +36,38 @@ stages:
|
|||
output_filepath: ./data/model/autogluonmodel/
|
||||
problem_type: regression
|
||||
eval_metric: mean_absolute_error
|
||||
time_limit: 200
|
||||
presets: medium_quality
|
||||
time_limit: 400
|
||||
presets: high_quality
|
||||
excluded_model_types:
|
||||
- KNN
|
||||
SKLearnLinearRegression:
|
||||
SKLearnSVMRegression:
|
||||
kernel: linear
|
||||
model_save_filepath: ./data/model/autogluonmodel/
|
||||
model_type: AutogluonAutoML
|
||||
model_save_filepath: ./data/model/model.joblib
|
||||
model_type: SKLearnLinearRegression
|
||||
outs:
|
||||
- path: data/model/
|
||||
hash: md5
|
||||
md5: 154f823d56a9892948a633789d9b08a5.dir
|
||||
size: 680552724
|
||||
nfiles: 18
|
||||
md5: eb2b910dec66481e75bb6058622f6e55.dir
|
||||
size: 1832
|
||||
nfiles: 1
|
||||
generate_predictions:
|
||||
cmd: python generate_predictions.py
|
||||
deps:
|
||||
- path: data/model
|
||||
hash: md5
|
||||
md5: 154f823d56a9892948a633789d9b08a5.dir
|
||||
size: 680552724
|
||||
nfiles: 18
|
||||
md5: eb2b910dec66481e75bb6058622f6e55.dir
|
||||
size: 1832
|
||||
nfiles: 1
|
||||
- path: data/prepared_data
|
||||
hash: md5
|
||||
md5: febdc8362200167078dfa578cf2bc889.dir
|
||||
size: 24296908
|
||||
md5: ea0a2baf3931e692d6344ba609331089.dir
|
||||
size: 13232732
|
||||
nfiles: 2
|
||||
- path: generate_predictions.py
|
||||
hash: md5
|
||||
md5: 32c0ecd082e1f8fc4426338d6629979c
|
||||
size: 4686
|
||||
md5: d412c8c9b48b59a29f569633280a6e7f
|
||||
size: 4237
|
||||
params:
|
||||
configs/generate_predictions.yaml:
|
||||
input_dataclient_type: local
|
||||
|
|
@ -77,26 +78,26 @@ stages:
|
|||
outs:
|
||||
- path: data/predictions/
|
||||
hash: md5
|
||||
md5: d8abefde18d78588158ef6acf282e2ed.dir
|
||||
size: 2948553
|
||||
md5: 85ec3fa0cb387a7775eccd23185f7966.dir
|
||||
size: 643406
|
||||
nfiles: 1
|
||||
generate_metrics:
|
||||
cmd: python generate_metrics.py
|
||||
deps:
|
||||
- path: data/predictions
|
||||
hash: md5
|
||||
md5: d8abefde18d78588158ef6acf282e2ed.dir
|
||||
size: 2948553
|
||||
md5: 85ec3fa0cb387a7775eccd23185f7966.dir
|
||||
size: 643406
|
||||
nfiles: 1
|
||||
- path: data/prepared_data
|
||||
hash: md5
|
||||
md5: febdc8362200167078dfa578cf2bc889.dir
|
||||
size: 24296908
|
||||
md5: ea0a2baf3931e692d6344ba609331089.dir
|
||||
size: 13232732
|
||||
nfiles: 2
|
||||
- path: generate_metrics.py
|
||||
hash: md5
|
||||
md5: 4709c42d93f8e717a3d9e4958e46cd76
|
||||
size: 4587
|
||||
md5: 5577a28107458dc1e6bcaaa098388095
|
||||
size: 4144
|
||||
params:
|
||||
configs/generate_metrics.yaml:
|
||||
dataclient_type: local
|
||||
|
|
@ -107,8 +108,8 @@ stages:
|
|||
outs:
|
||||
- path: metrics/metrics.json
|
||||
hash: md5
|
||||
md5: f5aaae75ea74241500cd1ce76751c579
|
||||
size: 182
|
||||
md5: d79f798a272e6b50597be4d08ae48fa8
|
||||
size: 180
|
||||
startup_cleanup:
|
||||
cmd: python startup_cleanup.py
|
||||
deps:
|
||||
|
|
|
|||
|
|
@ -5,17 +5,14 @@ After the model is built, we can evaluate its performance
|
|||
|
||||
import os
|
||||
import yaml
|
||||
import json
|
||||
import pandas as pd
|
||||
from pathlib import Path
|
||||
from core.interface.InterfaceModels import MLModel
|
||||
from core.interface.InterfaceMetrics import MLMetrics
|
||||
from core.interface.InterfaceDataClient import DataClient
|
||||
from core.interface.InterfaceDataHandler import DataHandler
|
||||
from core.DataClient import dataclient_factory
|
||||
from core.MLModels import model_factory
|
||||
from core.MLMetrics import metrics_factory
|
||||
from core.DataHandler import datahandler_factory
|
||||
from core.Logger import logger
|
||||
|
||||
|
||||
|
|
@ -43,9 +40,8 @@ feature_process_params = yaml.safe_load(open(feature_process_path))
|
|||
|
||||
|
||||
def generate_metrics(
|
||||
dataclient: DataClient,
|
||||
input_datahandler: DataHandler,
|
||||
output_datahandler: DataHandler,
|
||||
input_dataclient: DataClient,
|
||||
output_dataclient: DataClient,
|
||||
model: MLModel,
|
||||
metrics: MLMetrics,
|
||||
target: str,
|
||||
|
|
@ -62,17 +58,15 @@ def generate_metrics(
|
|||
logger.info("--- Loading test data ---")
|
||||
logger.info("-------------------------")
|
||||
|
||||
test_data = input_datahandler.load_data(
|
||||
dataclient=dataclient, location=test_data_filepath
|
||||
test_data = input_dataclient.load_data(
|
||||
location=test_data_filepath,
|
||||
)
|
||||
|
||||
logger.info("---------------------------")
|
||||
logger.info("--- Loading predictions ---")
|
||||
logger.info("---------------------------")
|
||||
|
||||
predictions = input_datahandler.load_data(
|
||||
dataclient=dataclient, location=predictions_output_filepath
|
||||
)
|
||||
predictions = input_dataclient.load_data(location=predictions_output_filepath)
|
||||
|
||||
logger.info("--------------------------")
|
||||
logger.info("--- Generating metrics ---")
|
||||
|
|
@ -87,9 +81,7 @@ def generate_metrics(
|
|||
logger.info("--- Saving metrics ---")
|
||||
logger.info("----------------------")
|
||||
|
||||
output_datahandler.save_data(
|
||||
dataclient=dataclient, obj=metrics_output, location=metrics_output_filepath
|
||||
)
|
||||
output_dataclient.save_data(obj=metrics_output, location=metrics_output_filepath)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
@ -100,23 +92,18 @@ if __name__ == "__main__":
|
|||
|
||||
model = model_factory(build_model_params["model_type"])
|
||||
|
||||
# Use data client for input and output, as we use dvc to cache later to the cloud
|
||||
dataclient_type = generate_metrics_params["dataclient_type"]
|
||||
dataclient = dataclient_factory(dataclient_type)
|
||||
dataclient.ingest_configurations(client_params[dataclient_type])
|
||||
dataclient.establish_client()
|
||||
dataclient = dataclient_factory(
|
||||
dataclient_type=dataclient_type,
|
||||
dataclient_config=client_params[dataclient_type],
|
||||
)
|
||||
|
||||
input_datahandler = datahandler_factory(
|
||||
generate_metrics_params["input_datahandler_type"]
|
||||
)
|
||||
output_datahandler = datahandler_factory(
|
||||
generate_metrics_params["output_datahandler_type"]
|
||||
)
|
||||
metrics = metrics_factory(generate_metrics_params["metrics_type"])
|
||||
|
||||
generate_metrics(
|
||||
dataclient=dataclient,
|
||||
input_datahandler=input_datahandler,
|
||||
output_datahandler=output_datahandler,
|
||||
input_dataclient=dataclient,
|
||||
output_dataclient=dataclient,
|
||||
model=model,
|
||||
metrics=metrics,
|
||||
target=feature_process_params["feature_processor_config"]["target"],
|
||||
|
|
|
|||
|
|
@ -5,15 +5,12 @@ After the model is built, we can evaluate its performance
|
|||
|
||||
import os
|
||||
import yaml
|
||||
import json
|
||||
import pandas as pd
|
||||
from pathlib import Path
|
||||
from core.interface.InterfaceModels import MLModel
|
||||
from core.interface.InterfaceDataClient import DataClient
|
||||
from core.interface.InterfaceDataHandler import DataHandler
|
||||
from core.DataClient import dataclient_factory
|
||||
from core.MLModels import model_factory
|
||||
from core.DataHandler import datahandler_factory
|
||||
from core.Logger import logger
|
||||
|
||||
|
||||
|
|
@ -40,7 +37,6 @@ feature_process_params = yaml.safe_load(open(feature_process_path))
|
|||
def generate_predictions(
|
||||
input_dataclient: DataClient,
|
||||
output_dataclient: DataClient,
|
||||
datahandler: DataHandler,
|
||||
model: MLModel,
|
||||
target: str,
|
||||
model_filepath: str,
|
||||
|
|
@ -56,9 +52,7 @@ def generate_predictions(
|
|||
logger.info("--- Loading test data ---")
|
||||
logger.info("-------------------------")
|
||||
|
||||
test_data = datahandler.load_data(
|
||||
dataclient=input_dataclient, location=test_data_filepath
|
||||
)
|
||||
test_data = input_dataclient.load_data(location=test_data_filepath)
|
||||
|
||||
logger.info("---------------------")
|
||||
logger.info("--- Loading model ---")
|
||||
|
|
@ -83,10 +77,8 @@ def generate_predictions(
|
|||
predictions_df = pd.DataFrame(predictions)
|
||||
predictions_df.columns = [predictions_column_name]
|
||||
|
||||
datahandler.save_data(
|
||||
dataclient=output_dataclient,
|
||||
obj=predictions_df,
|
||||
location=predictions_output_filepath,
|
||||
output_dataclient.save_data(
|
||||
obj=predictions_df, location=predictions_output_filepath
|
||||
)
|
||||
|
||||
|
||||
|
|
@ -102,23 +94,20 @@ if __name__ == "__main__":
|
|||
# For predictions, we will want a cloud data client
|
||||
|
||||
input_dataclient_type = generate_predictions_params["input_dataclient_type"]
|
||||
input_dataclient = dataclient_factory(input_dataclient_type)
|
||||
input_dataclient.ingest_configurations(config=client_params[input_dataclient_type])
|
||||
input_dataclient.establish_client()
|
||||
input_dataclient = dataclient_factory(
|
||||
dataclient_type=input_dataclient_type,
|
||||
dataclient_config=client_params[input_dataclient_type],
|
||||
)
|
||||
|
||||
output_dataclient_type = generate_predictions_params["output_dataclient_type"]
|
||||
output_dataclient = dataclient_factory(output_dataclient_type)
|
||||
output_dataclient.ingest_configurations(
|
||||
config=client_params[output_dataclient_type]
|
||||
output_dataclient = dataclient_factory(
|
||||
dataclient_type=output_dataclient_type,
|
||||
dataclient_config=client_params[output_dataclient_type],
|
||||
)
|
||||
output_dataclient.establish_client()
|
||||
|
||||
datahandler = datahandler_factory(prepare_data_params["datahandler_type"])
|
||||
|
||||
generate_predictions(
|
||||
input_dataclient=input_dataclient,
|
||||
output_dataclient=output_dataclient,
|
||||
datahandler=datahandler,
|
||||
model=model,
|
||||
target=feature_process_params["feature_processor_config"]["target"],
|
||||
model_filepath=build_model_params["model_save_filepath"],
|
||||
|
|
|
|||
|
|
@ -6,17 +6,14 @@ Loading data from a client
|
|||
import os
|
||||
import yaml
|
||||
import pandas as pd
|
||||
from typing import Optional, Tuple, Union
|
||||
from typing import Tuple, Union
|
||||
from pathlib import Path
|
||||
from sklearn.datasets import load_diabetes
|
||||
from sklearn.model_selection import train_test_split
|
||||
from core.interface.InterfaceDataClient import DataClient
|
||||
from core.interface.InterfaceDataHandler import DataHandler
|
||||
from core.interface.InterfaceFeatureProcessor import FeatureProcessor
|
||||
from configs.feature_processor_logic import business_logic, new_feature_funcs
|
||||
from core.Logger import logger
|
||||
from core.DataClient import dataclient_factory
|
||||
from core.DataHandler import datahandler_factory
|
||||
from core.FeatureProcessor import feature_processor_factory
|
||||
|
||||
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local")
|
||||
|
|
@ -31,20 +28,9 @@ feature_process_path = Path(__file__).parent / "configs" / "feature_processor.ya
|
|||
feature_process_params = yaml.safe_load(open(feature_process_path))
|
||||
|
||||
|
||||
def use_dummy_data() -> pd.DataFrame:
|
||||
diabetes_data = load_diabetes()
|
||||
|
||||
x_data = pd.DataFrame(diabetes_data["data"], columns=diabetes_data["feature_names"]) # type: ignore
|
||||
y_data = pd.DataFrame(diabetes_data["target"], columns=["target"]) # type: ignore
|
||||
|
||||
data = pd.concat([x_data, y_data], axis=1)
|
||||
return data
|
||||
|
||||
|
||||
def prepare_data(
|
||||
input_dataclient: DataClient,
|
||||
output_dataclient: DataClient,
|
||||
datahandler: DataHandler,
|
||||
feature_processor: FeatureProcessor,
|
||||
data_filepath: str,
|
||||
train_proportion: float,
|
||||
|
|
@ -64,7 +50,7 @@ def prepare_data(
|
|||
logger.info("--- Loading data ---")
|
||||
logger.info("--------------------")
|
||||
|
||||
data = datahandler.load_data(dataclient=input_dataclient, location=data_filepath)
|
||||
data = input_dataclient.load_data(location=data_filepath, load_config={})
|
||||
|
||||
logger.info("--------------------------")
|
||||
logger.info("--- Feature Processing ---")
|
||||
|
|
@ -93,14 +79,10 @@ def prepare_data(
|
|||
logger.info("--- Outputting data ---")
|
||||
logger.info("-----------------------")
|
||||
|
||||
datahandler.save_data(
|
||||
dataclient=output_dataclient, obj=train, location=output_train_filepath
|
||||
)
|
||||
output_dataclient.save_data(obj=train, location=output_train_filepath)
|
||||
|
||||
if test is not None:
|
||||
datahandler.save_data(
|
||||
dataclient=output_dataclient, obj=test, location=output_test_filepath
|
||||
)
|
||||
output_dataclient.save_data(obj=test, location=output_test_filepath)
|
||||
|
||||
return train, test
|
||||
|
||||
|
|
@ -118,22 +100,14 @@ if __name__ == "__main__":
|
|||
input_dataclient_type = prepare_data_params["input_dataclient_type"]
|
||||
output_dataclient_type = prepare_data_params["output_dataclient_type"]
|
||||
|
||||
input_dataclient = dataclient_factory(input_dataclient_type)
|
||||
output_dataclient = dataclient_factory(output_dataclient_type)
|
||||
|
||||
input_dataclient.ingest_configurations(config=client_params[input_dataclient_type])
|
||||
input_dataclient.establish_client()
|
||||
|
||||
output_dataclient.ingest_configurations(
|
||||
config=client_params[output_dataclient_type]
|
||||
input_dataclient = dataclient_factory(
|
||||
dataclient_type=input_dataclient_type,
|
||||
dataclient_config=client_params[input_dataclient_type],
|
||||
)
|
||||
output_dataclient = dataclient_factory(
|
||||
dataclient_type=output_dataclient_type,
|
||||
dataclient_config=client_params[output_dataclient_type],
|
||||
)
|
||||
output_dataclient.establish_client()
|
||||
|
||||
logger.info("-----------------------------")
|
||||
logger.info(f"--- Initiate DataHandler ---")
|
||||
logger.info("-----------------------------")
|
||||
|
||||
datahandler = datahandler_factory(prepare_data_params["datahandler_type"])
|
||||
|
||||
logger.info("----------------------------------")
|
||||
logger.info(f"--- Initiate FeatureProcessor ---")
|
||||
|
|
@ -150,7 +124,6 @@ if __name__ == "__main__":
|
|||
prepare_data(
|
||||
input_dataclient=input_dataclient,
|
||||
output_dataclient=output_dataclient,
|
||||
datahandler=datahandler,
|
||||
feature_processor=feature_processor,
|
||||
data_filepath=prepare_data_params["data_filepath"],
|
||||
train_proportion=prepare_data_params["train_proportion"],
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue