add feature processor

This commit is contained in:
Michael Duong 2023-09-12 09:55:47 +01:00
parent 027f9700c1
commit d717f85354
7 changed files with 248 additions and 28 deletions

View file

@ -0,0 +1,7 @@
feature_processor_type: dataframe
feature_processor_config:
subsample_amount: null
subsample_seed: 0
target: RDSAP_CHANGE
drop_columns: ["UPRN", "HEAT_DEMAND_CHANGE"]
retain_features: null

View file

@ -0,0 +1,13 @@
"""
During the feature processor step, we can apply additional business logic and feature generation by defining them here
"""
"""
Business Logic dict + functions
"""
business_logic = {}
"""
New features dict + function
"""
new_feature_funcs = {}

View file

@ -2,9 +2,7 @@
Implementations of the DataClient Protocol
"""
import os
import boto3
import pandas as pd
from io import BytesIO
from typing import List
from core.interface.InterfaceDataClient import DataClient
@ -129,15 +127,15 @@ class AWSS3Client:
aws_secret_access_key=self.config["AWS_SECRET_ACCESS_KEY"],
)
def download_data(self, location: dict) -> pd.DataFrame:
def download_data(self, location: dict) -> None:
"""
When the client is established, we can load data
When the client is established, we can download data to a local file
"""
...
def load_data_as_buffer(self, location: str) -> BytesIO:
"""
When the client is established, we can load data
When the client is established, we can load data in a buffer
"""
if not location.startswith("s3://"):
raise ValueError("S3 file path specified without s3://")
@ -157,13 +155,13 @@ class AWSS3Client:
def upload_data(self, location: str) -> None:
"""
When the client is established, we can save out objects
When the client is established, we can save out objects from a local file
"""
...
def upload_data_from_buffer(self, buffer: BytesIO, location: str) -> None:
"""
When the client is established, we can save out objects
When the client is established, we can save out objects from a buffer
"""
if not location.startswith("s3://"):
raise ValueError("S3 file path specified without s3://")
@ -189,15 +187,15 @@ class LocalClient:
"""
logger.info("Local - No establishing client required")
def download_data(self, location: dict) -> pd.DataFrame:
def download_data(self, location: dict) -> None:
"""
When the client is established, we can load data
When the client is established, we can download data to a file
"""
...
def load_data_as_buffer(self, location: str) -> BytesIO:
"""
When the client is established, we can load data
When the client is established, we can load data from a buffer
"""
...
@ -209,13 +207,13 @@ class LocalClient:
def upload_data(self, location: str) -> None:
"""
When the client is established, we can save out objects
When the client is established, we can save out objects from a file
"""
...
def upload_data_from_buffer(self, buffer: BytesIO, location: str) -> None:
"""
When the client is established, we can save out objects
When the client is established, we can save out objects from a buffer
"""
# Write the contents of the buffer to the local file

View file

@ -0,0 +1,144 @@
"""
Interface for all FeatureProcessors
"""
"""
Create additional features from the dataset
"""
import pandas as pd
from typing import List, Callable, Union
from core.interface.InterfaceFeatureProcessor import FeatureProcessor
from core.Logger import logger
def feature_processor_factory(feature_processor_type: str) -> FeatureProcessor:
"""
Determine which dataclient to use
"""
feature_processor = {
"dataframe": DataFrameFeatureProcessor(),
# ADD MORE DATACLIENTS HERE
}
if feature_processor_type not in feature_processor:
raise ValueError("Dataloader type specified is not in factory")
return feature_processor[feature_processor_type]
def validate_dict_keys(keys_1: List[str], keys_2: List[str], config_type: str):
if not set(keys_1).issubset(keys_2):
raise ValueError(f"Incorrect {config_type} keys specified")
class DataFrameFeatureProcessor:
"""
Handle all feature manipulation before modelling
"""
ACCEPTED_FEATURE_PROCESS_CONFIG_KEYS = [
"subsample_amount",
"subsample_seed",
"target",
"drop_columns",
"retain_features",
]
@staticmethod
def drop_unused_columns(df: pd.DataFrame, drop_columns: List[str]) -> pd.DataFrame:
"""
Remove the unused columns for RDS
"""
df = df.drop(columns=drop_columns)
return df
@staticmethod
def retain_features(df: pd.DataFrame, retain_features: List[str] | None = None):
"""
Determine which columns to keep for modelling
"""
if retain_features is None:
retain_features = df.columns.to_list()
else:
if not set(retain_features).issubset(df.columns):
logger.error("Features defined is not contained in data")
exit(1)
df = df[retain_features]
return df
@staticmethod
def subsample_data(
df: pd.DataFrame, subsample_amount: int | None = None, subsample_seed: int = 0
) -> pd.DataFrame:
"""
Sample data to reduce number of rows for model building if needed
"""
if subsample_amount:
df = df.sample(subsample_amount, random_state=subsample_seed)
return df
@staticmethod
def apply_business_logic(
df: pd.DataFrame, business_logic: Union[dict[str, Callable], None]
):
"""
If we need any additional business logic to be applied, post data cleaning
"""
if business_logic is None or len(business_logic) == 0:
return df
# TODO: to test
for _, value in business_logic.items():
df = value(df)
return df
@staticmethod
def generate_new_features(
df: pd.DataFrame, new_feature_funcs: Union[dict[str, Callable], None]
):
"""
We can iterative over all keys (new feature column names), and apply their Calleabl function
"""
if new_feature_funcs is None or len(new_feature_funcs) == 0:
return df
# TODO: to test
for key, value in new_feature_funcs.items():
df[key] = value(df)
return df
def feature_process(
self,
obj: pd.DataFrame,
feature_processor_config: dict,
business_logic: dict | None = None,
new_feature_funcs: dict | None = None,
) -> pd.DataFrame:
"""
Pipeline to get data ready for building a model
"""
validate_dict_keys(
self.ACCEPTED_FEATURE_PROCESS_CONFIG_KEYS,
list(feature_processor_config.keys()),
config_type="Feature Process Config",
)
df = self.subsample_data(
obj,
subsample_amount=feature_processor_config["subsample_amount"],
subsample_seed=feature_processor_config["subsample_seed"],
)
df = self.drop_unused_columns(
df, drop_columns=feature_processor_config["drop_columns"]
)
df = self.retain_features(
df, retain_features=feature_processor_config["retain_features"]
)
df = self.apply_business_logic(df, business_logic=business_logic)
df = self.generate_new_features(df, new_feature_funcs=new_feature_funcs)
return df

View file

@ -24,7 +24,7 @@ class DataClient(Protocol):
"""
...
def download_data(self, location: dict) -> pd.DataFrame:
def download_data(self, location: dict) -> None:
"""
When the client is established, we can load data
"""

View file

@ -0,0 +1,23 @@
"""
Interface for all FeatureProcessors
"""
import pandas as pd
from typing import Protocol, Union, Any
class FeatureProcessor(Protocol):
"""
Declare the methods required for a FeatureProcessor
"""
def feature_process(
self,
obj: Union[pd.DataFrame, Any],
feature_processor_config: dict,
business_logic: dict,
new_feature_funcs: dict,
) -> Union[pd.DataFrame, Any]:
"""
Apply all transformation to the data and return the same object back (for now)
"""

View file

@ -12,14 +12,20 @@ from sklearn.datasets import load_diabetes
from sklearn.model_selection import train_test_split
from core.interface.InterfaceDataClient import DataClient
from core.interface.InterfaceDataHandler import DataHandler
from core.interface.InterfaceFeatureProcessor import FeatureProcessor
from configs.feature_processor_logic import business_logic, new_feature_funcs
from core.Logger import logger
from core.DataClient import dataclient_factory
from core.DataHandler import datahandler_factory
from core.FeatureProcessor import feature_processor_factory
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local")
params_path = Path(__file__).parent / "configs" / "prepare_data.yaml"
params = yaml.safe_load(open(params_path))
prepare_data_path = Path(__file__).parent / "configs" / "prepare_data.yaml"
prepare_data_params = yaml.safe_load(open(prepare_data_path))
feature_process_path = Path(__file__).parent / "configs" / "feature_processor.yaml"
feature_process_params = yaml.safe_load(open(feature_process_path))
def use_dummy_data() -> pd.DataFrame:
@ -36,8 +42,12 @@ def prepare_data(
input_dataclient: DataClient,
output_dataclient: DataClient,
datahandler: DataHandler,
feature_processor: FeatureProcessor,
data_filepath: str,
train_proportion: float,
feature_processor_config: dict,
business_logic: dict,
new_feature_funcs: dict,
output_train_filepath: str = "train.parquet",
output_test_filepath: str = "test.parquet",
) -> Tuple[pd.DataFrame, pd.DataFrame]:
@ -53,6 +63,17 @@ def prepare_data(
data = datahandler.load_data(dataclient=input_dataclient, location=data_filepath)
logger.info("--------------------------")
logger.info("--- Feature Processing ---")
logger.info("--------------------------")
data = feature_processor.feature_process(
data,
feature_processor_config=feature_processor_config,
business_logic=business_logic,
new_feature_funcs=new_feature_funcs,
)
logger.info("----------------------")
logger.info("--- Splitting data ---")
logger.info("----------------------")
@ -61,10 +82,6 @@ def prepare_data(
data, train_size=train_proportion, test_size=(1 - train_proportion)
)
logger.info("--------------------------")
logger.info("--- Feature Processing ---")
logger.info("--------------------------")
logger.info("-----------------------")
logger.info("--- Outputting data ---")
logger.info("-----------------------")
@ -89,20 +106,34 @@ if __name__ == "__main__":
logger.info(f"--- Initiate DataClient ---")
logger.info("----------------------------")
input_dataclient = dataclient_factory(params["input_dataclient_type"])
output_dataclient = dataclient_factory(params["output_dataclient_type"])
input_dataclient = dataclient_factory(prepare_data_params["input_dataclient_type"])
output_dataclient = dataclient_factory(
prepare_data_params["output_dataclient_type"]
)
input_dataclient.ingest_configurations(config=params["input_dataclient"])
input_dataclient.ingest_configurations(
config=prepare_data_params["input_dataclient"]
)
input_dataclient.establish_client()
output_dataclient.ingest_configurations(config=params["output_dataclient"])
output_dataclient.ingest_configurations(
config=prepare_data_params["output_dataclient"]
)
output_dataclient.establish_client()
logger.info("-----------------------------")
logger.info(f"--- Initiate DataHandler ---")
logger.info("-----------------------------")
datahandler = datahandler_factory(params["datahandler_type"])
datahandler = datahandler_factory(prepare_data_params["datahandler_type"])
logger.info("----------------------------------")
logger.info(f"--- Initiate FeatureProcessor ---")
logger.info("----------------------------------")
feature_processor = feature_processor_factory(
feature_process_params["feature_processor_type"]
)
logger.info("---------------------------")
logger.info(f"--- Prepare Data Stage ---")
@ -112,10 +143,14 @@ if __name__ == "__main__":
input_dataclient=input_dataclient,
output_dataclient=output_dataclient,
datahandler=datahandler,
data_filepath=params["data_filepath"],
train_proportion=params["train_proportion"],
output_train_filepath=params["output_train_filepath"],
output_test_filepath=params["output_test_filepath"],
feature_processor=feature_processor,
data_filepath=prepare_data_params["data_filepath"],
train_proportion=prepare_data_params["train_proportion"],
output_train_filepath=prepare_data_params["output_train_filepath"],
output_test_filepath=prepare_data_params["output_test_filepath"],
feature_processor_config=feature_process_params["feature_processor_config"],
business_logic=business_logic,
new_feature_funcs=new_feature_funcs,
)
logger.info("-------------------------------")