start refactor of dataclient

This commit is contained in:
Michael Duong 2023-09-17 10:50:26 +00:00
parent 17f9697108
commit c62f32c1e5
7 changed files with 256 additions and 240 deletions

View file

@ -1,5 +1,5 @@
model_type: AutogluonAutoML
model_save_filepath: ./data/model/autogluonmodel/
model_type: SKLearnLinearRegression
model_save_filepath: ./data/model/model.joblib
SKLearnLinearRegression: null
@ -10,6 +10,6 @@ AutogluonAutoML:
output_filepath: ./data/model/autogluonmodel/
problem_type: regression
eval_metric: mean_absolute_error
time_limit: 200
presets: medium_quality
excluded_model_types: null
time_limit: 400
presets: high_quality
excluded_model_types: ['KNN']

View file

@ -4,5 +4,5 @@ feature_processor_config:
subsample_seed: 0
target: RDSAP_CHANGE
drop_columns: ["UPRN", "HEAT_DEMAND_CHANGE", "CARBON_CHANGE"]
# retain_features: ["TOTAL_FLOOR_AREA_STARTING", "SAP_STARTING", "TOTAL_FLOOR_AREA_ENDING"]
retain_features: null
retain_features: ["TOTAL_FLOOR_AREA_STARTING", "SAP_STARTING", "HEAT_DEMAND_STARTING", "CARBON_STARTING", "NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS", "FIXED_LIGHTING_OUTLETS_COUNT", "PHOTO_SUPPLY_STARTING", "MULTI_GLAZE_PROPORTION_STARTING", "LOW_ENERGY_LIGHTING_STARTING", "NUMBER_OPEN_FIREPLACES_STARTING", "EXTENSION_COUNT_STARTING", "FLOOR_HEIGHT_STARTING", "PHOTO_SUPPLY_ENDING", "MULTI_GLAZE_PROPORTION_ENDING", "LOW_ENERGY_LIGHTING_ENDING", "NUMBER_OPEN_FIREPLACES_ENDING", "EXTENSION_COUNT_ENDING", "TOTAL_FLOOR_AREA_ENDING", "FLOOR_HEIGHT_ENDING", "DAYS_TO_STARTING", "DAYS_TO_ENDING"]
# retain_features: null

View file

@ -2,7 +2,7 @@ input_dataclient_type: aws-s3
output_dataclient_type: local
datahandler_type: parquet
data_filepath: s3://retrofit-data-dev/sap_change_model/dataset.parquet
train_proportion: 0.1
train_proportion: 0.9
output_train_filepath: ./data/prepared_data/train.parquet
output_test_filepath: ./data/prepared_data/test.parquet

View file

@ -4,27 +4,34 @@ Implementations of the DataClient Protocol
import os
import boto3
import pandas as pd
from pathlib import Path
from io import BytesIO
from typing import List
from typing import List, Union
from core.interface.InterfaceDataClient import DataClient
from core.Logger import logger
def dataclient_factory(dataclient_type: str) -> DataClient:
def dataclient_factory(
dataclient_type: str, dataclient_config: Union[dict, None]
) -> DataClient:
"""
Determine which dataclient to use
"""
if dataclient_config is None:
dataclient_config = {}
dataclients = {
"local": LocalClient(),
"aws-s3": AWSS3Client(),
"local": LocalClient,
"aws-s3": AWSS3Client,
# ADD MORE DATACLIENTS HERE
}
if dataclient_type not in dataclients:
raise ValueError("Dataclient type specified is not in factory")
return dataclients[dataclient_type]
return dataclients[dataclient_type](**dataclient_config)
def validate_dict_keys(keys_1: List[str], keys_2: List[str], config_type: str):
@ -32,144 +39,142 @@ def validate_dict_keys(keys_1: List[str], keys_2: List[str], config_type: str):
raise ValueError(f"Incorrect {config_type} keys specified")
# class MinioClient:
# """
# Using the Minio s3 client, to do local testing
# """
# ACCEPTED_CONFIG_KEYS = [
# "aws_access_key_id",
# "aws_secret_access_key",
# "endpoint_url",
# ]
# ACCEPTED_LOAD_CONFIG_KEYS = []
# ACCEPTED_SAVE_CONFIG_KEYS = []
# def ingest_configurations(self, config: dict) -> None:
# """
# Load all configuration into the instance (self.config)
# """
# validate_dict_keys(
# keys_1=list(config.keys()),
# keys_2=self.ACCEPTED_CONFIG_KEYS,
# config_type="config",
# )
# self.config = config
# def establish_client(self) -> None:
# """
# With the given configurations, create the connection to the client (self.client)
# """
# ...
# def download_data(self, download_config: dict) -> pd.DataFrame:
# """
# When the client is established, we can load data
# """
# validate_dict_keys(
# keys_1=list(download_config.keys()),
# keys_2=self.ACCEPTED_LOAD_CONFIG_KEYS,
# config_type="load_config",
# )
# return pd.DataFrame()
# def save_data(self, obj: object, save_config: dict) -> None:
# """
# When the client is established, we can save out objects
# """
# validate_dict_keys(
# keys_1=list(save_config.keys()),
# keys_2=self.ACCEPTED_SAVE_CONFIG_KEYS,
# config_type="save_config",
# )
class AWSS3Client:
"""
Using Boto3, set up the AWS client
"""
ACCEPTED_CONFIG_KEYS = [
"AWS_ACCESS_KEY_ID",
"AWS_SECRET_ACCESS_KEY",
]
def __init__(
self,
AWS_ACCESS_KEY_ID: Union[str, None],
AWS_SECRET_ACCESS_KEY: Union[str, None],
ENDPOINT_URL: Union[str, None],
):
self.AWS_ACCESS_KEY_ID = AWS_ACCESS_KEY_ID
self.AWS_SECRET_ACCESS_KEY = AWS_SECRET_ACCESS_KEY
self.ENDPOINT_URL = ENDPOINT_URL
self._establish_client()
ACCEPTED_LOAD_CONFIG_KEYS = []
ACCEPTED_SAVE_CONFIG_KEYS = []
def ingest_configurations(self, config: dict) -> None:
"""
Load all configuration into the instance (self.config)
"""
validate_dict_keys(
keys_1=self.ACCEPTED_CONFIG_KEYS,
keys_2=list(config.keys()),
config_type="Ingest Config",
)
self.config = config
def establish_client(self) -> None:
def _establish_client(self) -> None:
"""
With the given configurations, create the connection to the client (self.client)
"""
logger.info(f"Establishing S3 Client")
session = boto3.Session()
if (
self.config["AWS_ACCESS_KEY_ID"] is None
and self.config["AWS_SECRET_ACCESS_KEY"] is None
):
if self.AWS_ACCESS_KEY_ID is None and self.AWS_SECRET_ACCESS_KEY is None:
self.client = session.client(service_name="s3") # Using local credentials
else:
self.client = session.client(
service_name="s3",
aws_access_key_id=self.config["AWS_ACCESS_KEY_ID"],
aws_secret_access_key=self.config["AWS_SECRET_ACCESS_KEY"],
aws_access_key_id=self.AWS_ACCESS_KEY_ID,
aws_secret_access_key=self.AWS_SECRET_ACCESS_KEY,
endpoint_url=self.ENDPOINT_URL,
)
def download_data(self, location: dict) -> None:
def load_data(
self, location: str, filetype: str, load_config: Union[dict, None] = None
) -> pd.DataFrame:
"""
When the client is established, we can download data to a local file
Generic to load data
"""
...
def load_data_as_buffer(self, location: str) -> BytesIO:
"""
When the client is established, we can load data in a buffer
"""
if not location.startswith("s3://"):
raise ValueError("S3 file path specified without s3://")
if load_config is None:
load_config = {}
load_methods = {
".parquet": self._load_parquet,
# "": _load_directory(**load_config),
# ADD MORE load_methods HERE
}
if filetype not in load_methods:
raise ValueError("load methods specified is not in factory")
return load_methods[filetype](location=location, load_config=load_config)
def save_data(
self,
obj: object,
location: str,
filetype: str,
save_config: Union[dict, None] = None,
) -> None:
"""
Generic to save data
"""
if not location.startswith("s3://"):
raise ValueError("S3 file path specified without s3://")
if save_config is None:
save_config = {}
save_methods = {
".parquet": self._save_parquet,
# "": _save_directory(**save_config),
# ADD MORE save_methods HERE
}
if filetype not in save_methods:
raise ValueError("save_methods specified is not in factory")
return save_methods[filetype](
obj=obj, location=location, save_config=save_config
)
def _save_parquet(self, obj: object, location: str, save_config: dict):
"""
Save object as parquet
"""
buffer = BytesIO()
obj.to_parquet(buffer, index=False)
bucket, key = location.strip("s3://").split("/", 1)
self.client.upload_fileobj(buffer, bucket, key)
def _load_parquet(self, location: str, load_config: dict) -> pd.DataFrame:
"""
Load a parquet file
"""
bucket, key = location.strip("s3://").split("/", 1)
buffer = BytesIO()
self.client.download_fileobj(bucket, key, buffer)
buffer.seek(0)
return buffer
df = pd.read_parquet(buffer, **load_config)
def load_database(self, database_location: dict) -> None:
"""
When the client is established, we can read from a database
"""
...
return df
def upload_data(self, location: str) -> None:
"""
When the client is established, we can save out objects from a local file
"""
...
# def load_data_as_buffer(self, location: str) -> BytesIO:
# """
# When the client is established, we can load data in a buffer
# """
def upload_data_from_buffer(self, buffer: BytesIO, location: str) -> None:
"""
When the client is established, we can save out objects from a buffer
"""
if not location.startswith("s3://"):
raise ValueError("S3 file path specified without s3://")
# bucket, key = location.strip("s3://").split("/", 1)
# buffer = BytesIO()
# self.client.download_fileobj(bucket, key, buffer)
# buffer.seek(0)
bucket, key = location.strip("s3://").split("/", 1)
self.client.upload_fileobj(buffer, bucket, key)
# return buffer
# def upload_data_from_buffer(self, buffer: BytesIO, location: str) -> None:
# """
# When the client is established, we can save out objects from a buffer
# """
# if not location.startswith("s3://"):
# raise ValueError("S3 file path specified without s3://")
# bucket, key = location.strip("s3://").split("/", 1)
# self.client.upload_fileobj(buffer, bucket, key)
class LocalClient:
@ -177,54 +182,103 @@ class LocalClient:
Interacting with data locally
"""
def ingest_configurations(self, config: dict) -> None:
def __init__(self):
"""
Load all configuration into the instance (self.config)
No initialisation needed for local client
"""
logger.info("Local - No configuration required")
self._establish_client()
def establish_client(self) -> None:
def _establish_client(self) -> None:
"""
With the given configurations, create the connection to the client (self.client)
"""
logger.info("Local - No establishing client required")
def download_data(self, location: dict) -> None:
def load_data(
self, location: str, filetype: str, load_config: Union[dict, None] = None
) -> pd.DataFrame:
"""
When the client is established, we can download data to a file
Generic to load data
"""
...
def load_data_as_buffer(self, location: str) -> BytesIO:
"""
When the client is established, we can load data from a buffer
"""
with open(location, "rb") as file:
# Read the entire file into a BytesIO object
buffer = BytesIO(file.read())
buffer.seek(0)
if load_config is None:
load_config = {}
return buffer
load_methods = {
".parquet": self._load_parquet,
# "": _load_directory(**load_config),
# ADD MORE load_methods HERE
}
def load_database(self, database_location: dict) -> None:
"""
When the client is established, we can read from a database
"""
...
if filetype not in load_methods:
raise ValueError("load methods specified is not in factory")
def upload_data(self, location: str) -> None:
"""
When the client is established, we can save out objects from a file
"""
...
return load_methods[filetype](location=location, load_config=load_config)
def upload_data_from_buffer(self, buffer: BytesIO, location: str) -> None:
def save_data(
self,
obj: object,
location: str,
filetype: str,
save_config: Union[dict, None] = None,
) -> None:
"""
When the client is established, we can save out objects from a buffer
Generic to save data
"""
if not Path(location).parent.exists():
os.makedirs(Path(location).parent)
# Write the contents of the buffer to the local file
with open(location, "wb") as f:
f.write(buffer.getvalue())
if save_config is None:
save_config = {}
save_methods = {
".parquet": self._save_parquet,
# "": _save_directory(**save_config),
# ADD MORE save_methods HERE
}
if filetype not in save_methods:
raise ValueError("save_methods specified is not in factory")
return save_methods[filetype](
obj=obj, location=location, save_config=save_config
)
def _load_parquet(self, location: str, load_config: dict) -> pd.DataFrame:
    """
    Read a parquet file from the local filesystem

    Args:
        location: Path handed to pd.read_parquet.
        load_config: Keyword arguments forwarded to pd.read_parquet.

    Returns:
        The DataFrame produced by pd.read_parquet.
    """
    return pd.read_parquet(location, **load_config)
def _save_parquet(self, obj: object, location: str, save_config: dict):
"""
Save object as parquet
"""
obj.to_parquet(location, **save_config)
# def load_data_as_buffer(self, location: str) -> BytesIO:
# """
# When the client is established, we can load data from a buffer
# """
# with open(location, "rb") as file:
# # Read the entire file into a BytesIO object
# buffer = BytesIO(file.read())
# buffer.seek(0)
# return buffer
# def upload_data_from_buffer(self, buffer: BytesIO, location: str) -> None:
# """
# When the client is established, we can save out objects from a buffer
# """
# if not Path(location).parent.exists():
# os.makedirs(Path(location).parent)
# # Write the contents of the buffer to the local file
# with open(location, "wb") as f:
# f.write(buffer.getvalue())

View file

@ -4,7 +4,7 @@ Interface for all DataClient i.e. s3, database, local etc
import pandas as pd
from io import BytesIO
from typing import Protocol
from typing import Protocol, Union
class DataClient(Protocol):
@ -12,44 +12,22 @@ class DataClient(Protocol):
Declare the methods required for a DataClient
"""
def ingest_configurations(self, config: dict) -> None:
"""
Load all configuration into the instance (self.config)
"""
...
def establish_client(self) -> None:
def _establish_client(self) -> None:
"""
With the given configurations, create the connection to the client (self.client)
"""
...
def download_data(self, location: dict) -> None:
def load_data(
self, location: str, filetype: str, load_config: Union[dict, None]
) -> pd.DataFrame:
"""
When the client is established, we can load data
Generic to load data
"""
...
def load_data_as_buffer(self, location: str) -> BytesIO:
def save_data(
self, obj: object, location: str, filetype: str, save_config: Union[dict, None]
) -> None:
"""
When the client is established, we can load data
Generic to save data
"""
...
def load_database(self, database_location: dict) -> None:
"""
When the client is established, we can read from a database
"""
...
def upload_data(self, location: str) -> None:
"""
When the client is established, we can save out objects
"""
...
def upload_data_from_buffer(self, buffer: BytesIO, location: str) -> None:
"""
When the client is established, we can save out objects
"""
...

View file

@ -11,12 +11,12 @@ stages:
configs/prepare_data.yaml:
output_test_filepath: ./data/prepared_data/test.parquet
output_train_filepath: ./data/prepared_data/train.parquet
train_proportion: 0.1
train_proportion: 0.9
outs:
- path: data/prepared_data/
hash: md5
md5: febdc8362200167078dfa578cf2bc889.dir
size: 24296908
md5: 5cbabd20ff23b9d6734c5c68684dc8dc.dir
size: 11982694
nfiles: 2
build_model:
cmd: python build_model.py
@ -27,8 +27,8 @@ stages:
size: 3948
- path: data/prepared_data
hash: md5
md5: febdc8362200167078dfa578cf2bc889.dir
size: 24296908
md5: 5cbabd20ff23b9d6734c5c68684dc8dc.dir
size: 11982694
nfiles: 2
params:
configs/build_model.yaml:
@ -36,32 +36,33 @@ stages:
output_filepath: ./data/model/autogluonmodel/
problem_type: regression
eval_metric: mean_absolute_error
time_limit: 200
presets: medium_quality
time_limit: 400
presets: high_quality
excluded_model_types:
- KNN
SKLearnLinearRegression:
SKLearnSVMRegression:
kernel: linear
model_save_filepath: ./data/model/autogluonmodel/
model_type: AutogluonAutoML
model_save_filepath: ./data/model/model.joblib
model_type: SKLearnLinearRegression
outs:
- path: data/model/
hash: md5
md5: 154f823d56a9892948a633789d9b08a5.dir
size: 680552724
nfiles: 18
md5: f53ceced818ffe9e3ae327492d5a049a.dir
size: 1832
nfiles: 1
generate_predictions:
cmd: python generate_predictions.py
deps:
- path: data/model
hash: md5
md5: 154f823d56a9892948a633789d9b08a5.dir
size: 680552724
nfiles: 18
md5: f53ceced818ffe9e3ae327492d5a049a.dir
size: 1832
nfiles: 1
- path: data/prepared_data
hash: md5
md5: febdc8362200167078dfa578cf2bc889.dir
size: 24296908
md5: 5cbabd20ff23b9d6734c5c68684dc8dc.dir
size: 11982694
nfiles: 2
- path: generate_predictions.py
hash: md5
@ -77,21 +78,21 @@ stages:
outs:
- path: data/predictions/
hash: md5
md5: d8abefde18d78588158ef6acf282e2ed.dir
size: 2948553
md5: e71d1d864228b3f3994217bfcdbcc5b7.dir
size: 643090
nfiles: 1
generate_metrics:
cmd: python generate_metrics.py
deps:
- path: data/predictions
hash: md5
md5: d8abefde18d78588158ef6acf282e2ed.dir
size: 2948553
md5: e71d1d864228b3f3994217bfcdbcc5b7.dir
size: 643090
nfiles: 1
- path: data/prepared_data
hash: md5
md5: febdc8362200167078dfa578cf2bc889.dir
size: 24296908
md5: 5cbabd20ff23b9d6734c5c68684dc8dc.dir
size: 11982694
nfiles: 2
- path: generate_metrics.py
hash: md5
@ -107,8 +108,8 @@ stages:
outs:
- path: metrics/metrics.json
hash: md5
md5: f5aaae75ea74241500cd1ce76751c579
size: 182
md5: 915100dc1b46b4517a3e1d71d211849d
size: 179
startup_cleanup:
cmd: python startup_cleanup.py
deps:

View file

@ -6,17 +6,14 @@ Loading data from a client
import os
import yaml
import pandas as pd
from typing import Optional, Tuple, Union
from typing import Tuple, Union
from pathlib import Path
from sklearn.datasets import load_diabetes
from sklearn.model_selection import train_test_split
from core.interface.InterfaceDataClient import DataClient
from core.interface.InterfaceDataHandler import DataHandler
from core.interface.InterfaceFeatureProcessor import FeatureProcessor
from configs.feature_processor_logic import business_logic, new_feature_funcs
from core.Logger import logger
from core.DataClient import dataclient_factory
from core.DataHandler import datahandler_factory
from core.FeatureProcessor import feature_processor_factory
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local")
@ -31,20 +28,9 @@ feature_process_path = Path(__file__).parent / "configs" / "feature_processor.ya
feature_process_params = yaml.safe_load(open(feature_process_path))
def use_dummy_data() -> pd.DataFrame:
diabetes_data = load_diabetes()
x_data = pd.DataFrame(diabetes_data["data"], columns=diabetes_data["feature_names"]) # type: ignore
y_data = pd.DataFrame(diabetes_data["target"], columns=["target"]) # type: ignore
data = pd.concat([x_data, y_data], axis=1)
return data
def prepare_data(
input_dataclient: DataClient,
output_dataclient: DataClient,
datahandler: DataHandler,
feature_processor: FeatureProcessor,
data_filepath: str,
train_proportion: float,
@ -64,7 +50,11 @@ def prepare_data(
logger.info("--- Loading data ---")
logger.info("--------------------")
data = datahandler.load_data(dataclient=input_dataclient, location=data_filepath)
data_filetype = Path(data_filepath).suffix
data = input_dataclient.load_data(
location=data_filepath, filetype=data_filetype, load_config={}
)
logger.info("--------------------------")
logger.info("--- Feature Processing ---")
@ -93,13 +83,15 @@ def prepare_data(
logger.info("--- Outputting data ---")
logger.info("-----------------------")
datahandler.save_data(
dataclient=output_dataclient, obj=train, location=output_train_filepath
output_train_filetype = Path(output_train_filepath).suffix
output_dataclient.save_data(
obj=train, location=output_train_filepath, filetype=output_train_filetype
)
if test is not None:
datahandler.save_data(
dataclient=output_dataclient, obj=test, location=output_test_filepath
output_test_filetype = Path(output_test_filepath).suffix
output_dataclient.save_data(
obj=test, location=output_test_filepath, filetype=output_test_filetype
)
return train, test
@ -118,22 +110,14 @@ if __name__ == "__main__":
input_dataclient_type = prepare_data_params["input_dataclient_type"]
output_dataclient_type = prepare_data_params["output_dataclient_type"]
input_dataclient = dataclient_factory(input_dataclient_type)
output_dataclient = dataclient_factory(output_dataclient_type)
input_dataclient.ingest_configurations(config=client_params[input_dataclient_type])
input_dataclient.establish_client()
output_dataclient.ingest_configurations(
config=client_params[output_dataclient_type]
input_dataclient = dataclient_factory(
dataclient_type=input_dataclient_type,
dataclient_config=client_params[input_dataclient_type],
)
output_dataclient = dataclient_factory(
dataclient_type=output_dataclient_type,
dataclient_config=client_params[output_dataclient_type],
)
output_dataclient.establish_client()
logger.info("-----------------------------")
logger.info(f"--- Initiate DataHandler ---")
logger.info("-----------------------------")
datahandler = datahandler_factory(prepare_data_params["datahandler_type"])
logger.info("----------------------------------")
logger.info(f"--- Initiate FeatureProcessor ---")
@ -150,7 +134,6 @@ if __name__ == "__main__":
prepare_data(
input_dataclient=input_dataclient,
output_dataclient=output_dataclient,
datahandler=datahandler,
feature_processor=feature_processor,
data_filepath=prepare_data_params["data_filepath"],
train_proportion=prepare_data_params["train_proportion"],