diff --git a/modules/ml-pipeline/src/pipeline/src/configs/feature_processor.yaml b/modules/ml-pipeline/src/pipeline/src/configs/feature_processor.yaml new file mode 100644 index 0000000..d84de4d --- /dev/null +++ b/modules/ml-pipeline/src/pipeline/src/configs/feature_processor.yaml @@ -0,0 +1,7 @@ +feature_processor_type: dataframe +feature_processor_config: + subsample_amount: null + subsample_seed: 0 + target: RDSAP_CHANGE + drop_columns: ["UPRN", "HEAT_DEMAND_CHANGE"] + retain_features: null diff --git a/modules/ml-pipeline/src/pipeline/src/configs/feature_processor_logic.py b/modules/ml-pipeline/src/pipeline/src/configs/feature_processor_logic.py new file mode 100644 index 0000000..4a7d5e1 --- /dev/null +++ b/modules/ml-pipeline/src/pipeline/src/configs/feature_processor_logic.py @@ -0,0 +1,13 @@ +""" +During the feature processor step, we can apply additional business logic and feature generation by defining them here +""" + +""" +Business Logic dict + functions +""" +business_logic = {} + +""" +New features dict + function +""" +new_feature_funcs = {} diff --git a/modules/ml-pipeline/src/pipeline/src/core/DataClient.py b/modules/ml-pipeline/src/pipeline/src/core/DataClient.py index 065fbe0..955800f 100644 --- a/modules/ml-pipeline/src/pipeline/src/core/DataClient.py +++ b/modules/ml-pipeline/src/pipeline/src/core/DataClient.py @@ -2,9 +2,7 @@ Implementations of the DataClient Protocol """ -import os import boto3 -import pandas as pd from io import BytesIO from typing import List from core.interface.InterfaceDataClient import DataClient @@ -129,15 +127,15 @@ class AWSS3Client: aws_secret_access_key=self.config["AWS_SECRET_ACCESS_KEY"], ) - def download_data(self, location: dict) -> pd.DataFrame: + def download_data(self, location: dict) -> None: """ - When the client is established, we can load data + When the client is established, we can download data to a local file """ ... def load_data_as_buffer(self, location: str) -> BytesIO: """ - When the client is established, we can load data + When the client is established, we can load data in a buffer """ if not location.startswith("s3://"): raise ValueError("S3 file path specified without s3://") @@ -157,13 +155,13 @@ class AWSS3Client: def upload_data(self, location: str) -> None: """ - When the client is established, we can save out objects + When the client is established, we can save out objects from a local file """ ... def upload_data_from_buffer(self, buffer: BytesIO, location: str) -> None: """ - When the client is established, we can save out objects + When the client is established, we can save out objects from a buffer """ if not location.startswith("s3://"): raise ValueError("S3 file path specified without s3://") @@ -189,15 +187,15 @@ class LocalClient: """ logger.info("Local - No establishing client required") - def download_data(self, location: dict) -> pd.DataFrame: + def download_data(self, location: dict) -> None: """ - When the client is established, we can load data + When the client is established, we can download data to a file """ ... def load_data_as_buffer(self, location: str) -> BytesIO: """ - When the client is established, we can load data + When the client is established, we can load data from a buffer """ ... @@ -209,13 +207,13 @@ class LocalClient: def upload_data(self, location: str) -> None: """ - When the client is established, we can save out objects + When the client is established, we can save out objects from a file """ ... def upload_data_from_buffer(self, buffer: BytesIO, location: str) -> None: """ - When the client is established, we can save out objects + When the client is established, we can save out objects from a buffer """ # Write the contents of the buffer to the local file diff --git a/modules/ml-pipeline/src/pipeline/src/core/FeatureProcessor.py b/modules/ml-pipeline/src/pipeline/src/core/FeatureProcessor.py new file mode 100644 index 0000000..0b8568f --- /dev/null +++ b/modules/ml-pipeline/src/pipeline/src/core/FeatureProcessor.py @@ -0,0 +1,144 @@ +""" +Interface for all FeatureProcessors +""" + +""" +Create additional features from the dataset +""" + +import pandas as pd +from typing import List, Callable, Union +from core.interface.InterfaceFeatureProcessor import FeatureProcessor +from core.Logger import logger + + +def feature_processor_factory(feature_processor_type: str) -> FeatureProcessor: + """ + Determine which dataclient to use + """ + feature_processor = { + "dataframe": DataFrameFeatureProcessor(), + # ADD MORE DATACLIENTS HERE + } + + if feature_processor_type not in feature_processor: + raise ValueError("Dataloader type specified is not in factory") + + return feature_processor[feature_processor_type] + + +def validate_dict_keys(keys_1: List[str], keys_2: List[str], config_type: str): + if not set(keys_1).issubset(keys_2): + raise ValueError(f"Incorrect {config_type} keys specified") + + +class DataFrameFeatureProcessor: + """ + Handle all feature manipulation before modelling + """ + + ACCEPTED_FEATURE_PROCESS_CONFIG_KEYS = [ + "subsample_amount", + "subsample_seed", + "target", + "drop_columns", + "retain_features", + ] + + @staticmethod + def drop_unused_columns(df: pd.DataFrame, drop_columns: List[str]) -> pd.DataFrame: + """ + Remove the unused columns for RDS + """ + df = df.drop(columns=drop_columns) + return df + + @staticmethod + def retain_features(df: pd.DataFrame, retain_features: List[str] | None = None): + """ + Determine which columns to keep for modelling + """ + if retain_features is None: + retain_features = df.columns.to_list() + else: + if not set(retain_features).issubset(df.columns): + logger.error("Features defined is not contained in data") + exit(1) + + df = df[retain_features] + + return df + + @staticmethod + def subsample_data( + df: pd.DataFrame, subsample_amount: int | None = None, subsample_seed: int = 0 + ) -> pd.DataFrame: + """ + Sample data to reduce number of rows for model building if needed + """ + if subsample_amount: + df = df.sample(subsample_amount, random_state=subsample_seed) + return df + + @staticmethod + def apply_business_logic( + df: pd.DataFrame, business_logic: Union[dict[str, Callable], None] + ): + """ + If we need any additional business logic to be applied, post data cleaning + """ + if business_logic is None or len(business_logic) == 0: + return df + + # TODO: to test + for _, value in business_logic.items(): + df = value(df) + + return df + + @staticmethod + def generate_new_features( + df: pd.DataFrame, new_feature_funcs: Union[dict[str, Callable], None] + ): + """ + We can iterative over all keys (new feature column names), and apply their Calleabl function + """ + if new_feature_funcs is None or len(new_feature_funcs) == 0: + return df + + # TODO: to test + for key, value in new_feature_funcs.items(): + df[key] = value(df) + + return df + + def feature_process( + self, + obj: pd.DataFrame, + feature_processor_config: dict, + business_logic: dict | None = None, + new_feature_funcs: dict | None = None, + ) -> pd.DataFrame: + """ + Pipeline to get data ready for building a model + """ + validate_dict_keys( + self.ACCEPTED_FEATURE_PROCESS_CONFIG_KEYS, + list(feature_processor_config.keys()), + config_type="Feature Process Config", + ) + + df = self.subsample_data( + obj, + subsample_amount=feature_processor_config["subsample_amount"], + subsample_seed=feature_processor_config["subsample_seed"], + ) + df = self.drop_unused_columns( + df, drop_columns=feature_processor_config["drop_columns"] + ) + df = self.retain_features( + df, retain_features=feature_processor_config["retain_features"] + ) + df = self.apply_business_logic(df, business_logic=business_logic) + df = self.generate_new_features(df, new_feature_funcs=new_feature_funcs) + return df diff --git a/modules/ml-pipeline/src/pipeline/src/core/interface/InterfaceDataClient.py b/modules/ml-pipeline/src/pipeline/src/core/interface/InterfaceDataClient.py index eb57356..3350714 100644 --- a/modules/ml-pipeline/src/pipeline/src/core/interface/InterfaceDataClient.py +++ b/modules/ml-pipeline/src/pipeline/src/core/interface/InterfaceDataClient.py @@ -24,7 +24,7 @@ class DataClient(Protocol): """ ... - def download_data(self, location: dict) -> pd.DataFrame: + def download_data(self, location: dict) -> None: """ When the client is established, we can load data """ diff --git a/modules/ml-pipeline/src/pipeline/src/core/interface/InterfaceFeatureProcessor.py b/modules/ml-pipeline/src/pipeline/src/core/interface/InterfaceFeatureProcessor.py new file mode 100644 index 0000000..6c83c75 --- /dev/null +++ b/modules/ml-pipeline/src/pipeline/src/core/interface/InterfaceFeatureProcessor.py @@ -0,0 +1,23 @@ +""" +Interface for all FeatureProcessors +""" + +import pandas as pd +from typing import Protocol, Union, Any + + +class FeatureProcessor(Protocol): + """ + Declare the methods required for a FeatureProcessor + """ + + def feature_process( + self, + obj: Union[pd.DataFrame, Any], + feature_processor_config: dict, + business_logic: dict, + new_feature_funcs: dict, + ) -> Union[pd.DataFrame, Any]: + """ + Apply all transformation to the data and return the same object back (for now) + """ diff --git a/modules/ml-pipeline/src/pipeline/src/prepare_data.py b/modules/ml-pipeline/src/pipeline/src/prepare_data.py index a51ac50..53ff8fc 100644 --- a/modules/ml-pipeline/src/pipeline/src/prepare_data.py +++ b/modules/ml-pipeline/src/pipeline/src/prepare_data.py @@ -12,14 +12,20 @@ from sklearn.datasets import load_diabetes from sklearn.model_selection import train_test_split from core.interface.InterfaceDataClient import DataClient from core.interface.InterfaceDataHandler import DataHandler +from core.interface.InterfaceFeatureProcessor import FeatureProcessor +from configs.feature_processor_logic import business_logic, new_feature_funcs from core.Logger import logger from core.DataClient import dataclient_factory from core.DataHandler import datahandler_factory +from core.FeatureProcessor import feature_processor_factory RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local") -params_path = Path(__file__).parent / "configs" / "prepare_data.yaml" -params = yaml.safe_load(open(params_path)) +prepare_data_path = Path(__file__).parent / "configs" / "prepare_data.yaml" +prepare_data_params = yaml.safe_load(open(prepare_data_path)) + +feature_process_path = Path(__file__).parent / "configs" / "feature_processor.yaml" +feature_process_params = yaml.safe_load(open(feature_process_path)) def use_dummy_data() -> pd.DataFrame: @@ -36,8 +42,12 @@ def prepare_data( input_dataclient: DataClient, output_dataclient: DataClient, datahandler: DataHandler, + feature_processor: FeatureProcessor, data_filepath: str, train_proportion: float, + feature_processor_config: dict, + business_logic: dict, + new_feature_funcs: dict, output_train_filepath: str = "train.parquet", output_test_filepath: str = "test.parquet", ) -> Tuple[pd.DataFrame, pd.DataFrame]: @@ -53,6 +63,17 @@ def prepare_data( data = datahandler.load_data(dataclient=input_dataclient, location=data_filepath) + logger.info("--------------------------") + logger.info("--- Feature Processing ---") + logger.info("--------------------------") + + data = feature_processor.feature_process( + data, + feature_processor_config=feature_processor_config, + business_logic=business_logic, + new_feature_funcs=new_feature_funcs, + ) + logger.info("----------------------") logger.info("--- Splitting data ---") logger.info("----------------------") @@ -61,10 +82,6 @@ def prepare_data( data, train_size=train_proportion, test_size=(1 - train_proportion) ) - logger.info("--------------------------") - logger.info("--- Feature Processing ---") - logger.info("--------------------------") - logger.info("-----------------------") logger.info("--- Outputting data ---") logger.info("-----------------------") @@ -89,20 +106,34 @@ if __name__ == "__main__": logger.info(f"--- Initiate DataClient ---") logger.info("----------------------------") - input_dataclient = dataclient_factory(params["input_dataclient_type"]) - output_dataclient = dataclient_factory(params["output_dataclient_type"]) + input_dataclient = dataclient_factory(prepare_data_params["input_dataclient_type"]) + output_dataclient = dataclient_factory( + prepare_data_params["output_dataclient_type"] + ) - input_dataclient.ingest_configurations(config=params["input_dataclient"]) + input_dataclient.ingest_configurations( + config=prepare_data_params["input_dataclient"] + ) input_dataclient.establish_client() - output_dataclient.ingest_configurations(config=params["output_dataclient"]) + output_dataclient.ingest_configurations( + config=prepare_data_params["output_dataclient"] + ) output_dataclient.establish_client() logger.info("-----------------------------") logger.info(f"--- Initiate DataHandler ---") logger.info("-----------------------------") - datahandler = datahandler_factory(params["datahandler_type"]) + datahandler = datahandler_factory(prepare_data_params["datahandler_type"]) + + logger.info("----------------------------------") + logger.info(f"--- Initiate FeatureProcessor ---") + logger.info("----------------------------------") + + feature_processor = feature_processor_factory( + feature_process_params["feature_processor_type"] + ) logger.info("---------------------------") logger.info(f"--- Prepare Data Stage ---") @@ -112,10 +143,14 @@ if __name__ == "__main__": input_dataclient=input_dataclient, output_dataclient=output_dataclient, datahandler=datahandler, - data_filepath=params["data_filepath"], - train_proportion=params["train_proportion"], - output_train_filepath=params["output_train_filepath"], - output_test_filepath=params["output_test_filepath"], + feature_processor=feature_processor, + data_filepath=prepare_data_params["data_filepath"], + train_proportion=prepare_data_params["train_proportion"], + output_train_filepath=prepare_data_params["output_train_filepath"], + output_test_filepath=prepare_data_params["output_test_filepath"], + feature_processor_config=feature_process_params["feature_processor_config"], + business_logic=business_logic, + new_feature_funcs=new_feature_funcs, ) logger.info("-------------------------------")