diff --git a/modules/ml-pipeline/src/pipeline/src/.DS_Store b/modules/ml-pipeline/src/pipeline/src/.DS_Store new file mode 100644 index 0000000..4dff51a Binary files /dev/null and b/modules/ml-pipeline/src/pipeline/src/.DS_Store differ diff --git a/modules/ml-pipeline/src/pipeline/src/build_model.py b/modules/ml-pipeline/src/pipeline/src/build_model.py index de9b522..dde3035 100644 --- a/modules/ml-pipeline/src/pipeline/src/build_model.py +++ b/modules/ml-pipeline/src/pipeline/src/build_model.py @@ -12,6 +12,7 @@ from core.Logger import logger from core.interface.InterfaceModels import MLModel from core.interface.InterfaceDataClient import DataClient from core.DataClient import dataclient_factory +from core.DataHandler import datahandler_factory from core.MLModels import model_factory RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local") @@ -22,6 +23,9 @@ prepare_data_params = yaml.safe_load(open(prepare_data_path)) build_model_path = Path(__file__).parent / "configs" / "build_model.yaml" build_model_params = yaml.safe_load(open(build_model_path)) +feature_process_path = Path(__file__).parent / "configs" / "feature_processor.yaml" +feature_process_params = yaml.safe_load(open(feature_process_path)) + def build_model( dataclient: DataClient, @@ -40,16 +44,16 @@ def build_model( logger.info("--------------------------------------") if train_data is None: - # TODO: replace this with the data client to load if train_filepath is None: - raise ValueError(f"Need {train_filepath}") - train_data = pd.read_parquet(train_filepath) + raise ValueError(f"Need {train_filepath} if no data supplied") + train_data = datahandler.load_data( + dataclient=dataclient, location=train_filepath + ) if test_data is None: - # TODO: replace this with the data client to load if test_filepath is None: - raise ValueError(f"Need {test_filepath}") - test_data = pd.read_parquet(test_filepath) + raise ValueError(f"Need {test_filepath} if no data supplied") + test_data = datahandler.load_data(dataclient=dataclient, location=test_filepath) logger.info("----------------------") logger.info("--- Training model ---") @@ -65,9 +69,6 @@ def build_model( model.save_model(path=Path(model_save_location)) - # TODO: replace this with the data client to load - # TODO: can fine tune model here if need with the test data - if __name__ == "__main__": @@ -79,7 +80,13 @@ if __name__ == "__main__": logger.info(f"--- Initiate DataClient ---") logger.info("----------------------------") - dataclient = dataclient_factory(prepare_data_params["dataclient_type"]) + dataclient = dataclient_factory(prepare_data_params["output_dataclient_type"]) + + logger.info("-----------------------------") + logger.info(f"--- Initiate DataHandler ---") + logger.info("-----------------------------") + + datahandler = datahandler_factory(prepare_data_params["datahandler_type"]) logger.info("-------------------------") logger.info(f"--- Initiate MLModel ---") @@ -95,7 +102,7 @@ if __name__ == "__main__": build_model( dataclient=dataclient, model=model, - target=build_model_params["target"], + target=feature_process_params["feature_processor_config"]["target"], model_save_location=build_model_params["model_save_filepath"], model_hyperparameters=build_model_params[model_type], train_filepath=prepare_data_params["output_train_filepath"], diff --git a/modules/ml-pipeline/src/pipeline/src/configs/build_model.yaml b/modules/ml-pipeline/src/pipeline/src/configs/build_model.yaml index 8a16027..a1307c1 100644 --- a/modules/ml-pipeline/src/pipeline/src/configs/build_model.yaml +++ b/modules/ml-pipeline/src/pipeline/src/configs/build_model.yaml @@ -1,8 +1,15 @@ -model_type: SKLearnLinearRegression -target: target -model_save_filepath: ./data/model/model.joblib +model_type: AutogluonAutoML +model_save_filepath: ./data/model/autogluonmodel/ SKLearnLinearRegression: null SKLearnSVMRegression: kernel: "linear" + +AutogluonAutoML: + output_filepath: ./data/model/autogluonmodel/ + problem_type: regression + eval_metric: mean_absolute_error + time_limit: 200 + presets: medium_quality + excluded_model_types: null diff --git a/modules/ml-pipeline/src/pipeline/src/configs/feature_processor.yaml b/modules/ml-pipeline/src/pipeline/src/configs/feature_processor.yaml new file mode 100644 index 0000000..233a329 --- /dev/null +++ b/modules/ml-pipeline/src/pipeline/src/configs/feature_processor.yaml @@ -0,0 +1,8 @@ +feature_processor_type: dataframe +feature_processor_config: + subsample_amount: null + subsample_seed: 0 + target: RDSAP_CHANGE + drop_columns: ["UPRN", "HEAT_DEMAND_CHANGE"] + # retain_features: ["TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"] + retain_features: null diff --git a/modules/ml-pipeline/src/pipeline/src/configs/feature_processor_logic.py b/modules/ml-pipeline/src/pipeline/src/configs/feature_processor_logic.py new file mode 100644 index 0000000..4a7d5e1 --- /dev/null +++ b/modules/ml-pipeline/src/pipeline/src/configs/feature_processor_logic.py @@ -0,0 +1,13 @@ +""" +During the feature processor step, we can apply additional business logic and feature generation by defining them here +""" + +""" +Business Logic dict + functions +""" +business_logic = {} + +""" +New features dict + function +""" +new_feature_funcs = {} diff --git a/modules/ml-pipeline/src/pipeline/src/configs/generate_metrics.yaml b/modules/ml-pipeline/src/pipeline/src/configs/generate_metrics.yaml index a370f9f..84f5897 100644 --- a/modules/ml-pipeline/src/pipeline/src/configs/generate_metrics.yaml +++ b/modules/ml-pipeline/src/pipeline/src/configs/generate_metrics.yaml @@ -1,2 +1,5 @@ +dataclient_type: local +input_datahandler_type: parquet +output_datahandler_type: json metrics_type: Regression metrics_output_filepath: ./metrics/metrics.json diff --git a/modules/ml-pipeline/src/pipeline/src/configs/generate_predictions.yaml b/modules/ml-pipeline/src/pipeline/src/configs/generate_predictions.yaml index c7f1b2d..404c33f 100644 --- a/modules/ml-pipeline/src/pipeline/src/configs/generate_predictions.yaml +++ b/modules/ml-pipeline/src/pipeline/src/configs/generate_predictions.yaml @@ -1,3 +1,5 @@ +input_dataclient_type: local +output_dataclient_type: local test_data_filepath: ./data/prepared_data/test.parquet predictions_output_filepath: ./data/predictions/predictions.parquet predictions_column_name: predictions diff --git a/modules/ml-pipeline/src/pipeline/src/configs/prepare_data.yaml b/modules/ml-pipeline/src/pipeline/src/configs/prepare_data.yaml index 9a0c3bd..736f5d2 100644 --- a/modules/ml-pipeline/src/pipeline/src/configs/prepare_data.yaml +++ b/modules/ml-pipeline/src/pipeline/src/configs/prepare_data.yaml @@ -1,5 +1,13 @@ -dataclient_type: minio -data_location: s3://dev_bucket +input_dataclient_type: aws-s3 +input_dataclient: + AWS_ACCESS_KEY_ID: null + AWS_SECRET_ACCESS_KEY: null + ENDPOINT_URL: null +output_dataclient_type: local +output_dataclient: + null +datahandler_type: parquet +data_filepath: s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/train_validation_data.parquet train_proportion: 0.8 output_train_filepath: ./data/prepared_data/train.parquet output_test_filepath: ./data/prepared_data/test.parquet diff --git a/modules/ml-pipeline/src/pipeline/src/configs/startup_cleanup.yaml b/modules/ml-pipeline/src/pipeline/src/configs/startup_cleanup.yaml new file mode 100644 index 0000000..909fb4b --- /dev/null +++ b/modules/ml-pipeline/src/pipeline/src/configs/startup_cleanup.yaml @@ -0,0 +1,2 @@ +artefacts: ./data +metrics: ./metrics diff --git a/modules/ml-pipeline/src/pipeline/src/core/DataClient.py b/modules/ml-pipeline/src/pipeline/src/core/DataClient.py index f185b91..cb5b8d7 100644 --- a/modules/ml-pipeline/src/pipeline/src/core/DataClient.py +++ b/modules/ml-pipeline/src/pipeline/src/core/DataClient.py @@ -2,9 +2,13 @@ Implementations of the DataClient Protocol """ -import pandas as pd +import os +import boto3 +from pathlib import Path +from io import BytesIO from typing import List from core.interface.InterfaceDataClient import DataClient +from core.Logger import logger def dataclient_factory(dataclient_type: str) -> DataClient: @@ -12,7 +16,8 @@ def dataclient_factory(dataclient_type: str) -> DataClient: Determine which dataclient to use """ dataclients = { - "minio": MinioClient(), + "local": LocalClient(), + "aws-s3": AWSS3Client(), # ADD MORE DATACLIENTS HERE } @@ -27,15 +32,69 @@ def validate_dict_keys(keys_1: List[str], keys_2: List[str], config_type: str): raise ValueError(f"Incorrect {config_type} keys specified") -class MinioClient: +# class MinioClient: +# """ +# Using the Minio s3 client, to do local testing +# """ + +# ACCEPTED_CONFIG_KEYS = [ +# "aws_access_key_id", +# "aws_secret_access_key", +# "endpoint_url", +# ] +# ACCEPTED_LOAD_CONFIG_KEYS = [] +# ACCEPTED_SAVE_CONFIG_KEYS = [] + +# def ingest_configurations(self, config: dict) -> None: +# """ +# Load all configuration into the instance (self.config) +# """ +# validate_dict_keys( +# keys_1=list(config.keys()), +# keys_2=self.ACCEPTED_CONFIG_KEYS, +# config_type="config", +# ) + +# self.config = config + +# def establish_client(self) -> None: +# """ +# With the given configurations, create the connection to the client (self.client) +# """ + +# ... + +# def download_data(self, download_config: dict) -> pd.DataFrame: +# """ +# When the client is established, we can load data +# """ +# validate_dict_keys( +# keys_1=list(download_config.keys()), +# keys_2=self.ACCEPTED_LOAD_CONFIG_KEYS, +# config_type="load_config", +# ) + +# return pd.DataFrame() + +# def save_data(self, obj: object, save_config: dict) -> None: +# """ +# When the client is established, we can save out objects +# """ +# validate_dict_keys( +# keys_1=list(save_config.keys()), +# keys_2=self.ACCEPTED_SAVE_CONFIG_KEYS, +# config_type="save_config", +# ) + + +class AWSS3Client: """ - Using the Minio s3 client, to do local testing + Using Boto3, set up the AWS client """ ACCEPTED_CONFIG_KEYS = [ - "aws_access_key_id", - "aws_secret_access_key", - "endpoint_url", + "AWS_ACCESS_KEY_ID", + "AWS_SECRET_ACCESS_KEY", ] ACCEPTED_LOAD_CONFIG_KEYS = [] ACCEPTED_SAVE_CONFIG_KEYS = [] @@ -45,38 +104,127 @@ class MinioClient: Load all configuration into the instance (self.config) """ validate_dict_keys( - keys_1=list(config.keys()), - keys_2=self.ACCEPTED_CONFIG_KEYS, - config_type="config", + keys_1=self.ACCEPTED_CONFIG_KEYS, + keys_2=list(config.keys()), + config_type="Ingest Config", ) - self.config = config def establish_client(self) -> None: """ With the given configurations, create the connection to the client (self.client) """ + logger.info(f"Establishing S3 Client") + session = boto3.Session() + if ( + self.config["AWS_ACCESS_KEY_ID"] is None + and self.config["AWS_SECRET_ACCESS_KEY"] is None + ): + self.client = session.client(service_name="s3") # Using local credentials + else: + self.client = session.client( + service_name="s3", + aws_access_key_id=self.config["AWS_ACCESS_KEY_ID"], + aws_secret_access_key=self.config["AWS_SECRET_ACCESS_KEY"], + ) + + def download_data(self, location: dict) -> None: + """ + When the client is established, we can download data to a local file + """ ... - def load_data(self, load_config: dict) -> pd.DataFrame: + def load_data_as_buffer(self, location: str) -> BytesIO: """ - When the client is established, we can load data + When the client is established, we can load data in a buffer """ - validate_dict_keys( - keys_1=list(load_config.keys()), - keys_2=self.ACCEPTED_LOAD_CONFIG_KEYS, - config_type="load_config", - ) + if not location.startswith("s3://"): + raise ValueError("S3 file path specified without s3://") - return pd.DataFrame() + bucket, key = location.strip("s3://").split("/", 1) + buffer = BytesIO() + self.client.download_fileobj(bucket, key, buffer) + buffer.seek(0) - def save_data(self, obj: object, save_config: dict) -> None: + return buffer + + def load_database(self, database_location: dict) -> None: """ - When the client is established, we can save out objects + When the client is established, we can read from a database """ - validate_dict_keys( - keys_1=list(save_config.keys()), - keys_2=self.ACCEPTED_SAVE_CONFIG_KEYS, - config_type="save_config", - ) + ... + + def upload_data(self, location: str) -> None: + """ + When the client is established, we can save out objects from a local file + """ + ... + + def upload_data_from_buffer(self, buffer: BytesIO, location: str) -> None: + """ + When the client is established, we can save out objects from a buffer + """ + if not location.startswith("s3://"): + raise ValueError("S3 file path specified without s3://") + + bucket, key = location.strip("s3://").split("/", 1) + self.client.upload_fileobj(buffer, bucket, key) + + +class LocalClient: + """ + Interacting with data locally + """ + + def ingest_configurations(self, config: dict) -> None: + """ + Load all configuration into the instance (self.config) + """ + logger.info("Local - No configuration required") + + def establish_client(self) -> None: + """ + With the given configurations, create the connection to the client (self.client) + """ + logger.info("Local - No establishing client required") + + def download_data(self, location: dict) -> None: + """ + When the client is established, we can download data to a file + """ + ... + + def load_data_as_buffer(self, location: str) -> BytesIO: + """ + When the client is established, we can load data from a buffer + """ + with open(location, "rb") as file: + # Read the entire file into a BytesIO object + buffer = BytesIO(file.read()) + buffer.seek(0) + + return buffer + + def load_database(self, database_location: dict) -> None: + """ + When the client is established, we can read from a database + """ + ... + + def upload_data(self, location: str) -> None: + """ + When the client is established, we can save out objects from a file + """ + ... + + def upload_data_from_buffer(self, buffer: BytesIO, location: str) -> None: + """ + When the client is established, we can save out objects from a buffer + """ + if not Path(location).parent.exists(): + os.makedirs(Path(location).parent) + + # Write the contents of the buffer to the local file + with open(location, "wb") as f: + f.write(buffer.getvalue()) diff --git a/modules/ml-pipeline/src/pipeline/src/core/DataHandler.py b/modules/ml-pipeline/src/pipeline/src/core/DataHandler.py new file mode 100644 index 0000000..f5c07c1 --- /dev/null +++ b/modules/ml-pipeline/src/pipeline/src/core/DataHandler.py @@ -0,0 +1,86 @@ +""" +Implementations of the datahandler Protocol +""" + +import json +import pandas as pd +from io import BytesIO +from typing import List +from core.interface.InterfaceDataHandler import DataHandler +from core.interface.InterfaceDataClient import DataClient + + +def datahandler_factory(datahandler_type: str) -> DataHandler: + """ + Determine which dataclient to use + """ + datahandler = { + "parquet": ParquetHandler(), + "json": JSONHandler() + # ADD MORE DATACLIENTS HERE + } + + if datahandler_type not in datahandler: + raise ValueError("Dataloader type specified is not in factory") + + return datahandler[datahandler_type] + + +def validate_dict_keys(keys_1: List[str], keys_2: List[str], config_type: str): + if not set(keys_1).issubset(keys_2): + raise ValueError(f"Incorrect {config_type} keys specified") + + +class ParquetHandler: + """ + Load and save Parquet datasets + """ + + def load_data(self, dataclient: DataClient, location: str) -> pd.DataFrame: + """ + When the client is established, we can load data + """ + df = pd.read_parquet(dataclient.load_data_as_buffer(location=location)) + return df + + def save_data( + self, dataclient: DataClient, obj: pd.DataFrame, location: str + ) -> None: + """ + When the client is established, we can save out objects + """ + # Convert the Pandas DataFrame to a Parquet buffer + parquet_buffer = BytesIO() + obj.to_parquet(parquet_buffer, index=False) + + dataclient.upload_data_from_buffer(buffer=parquet_buffer, location=location) + + +class JSONHandler: + """ + Load and save Parquet datasets + """ + + def load_data(self, dataclient: DataClient, location: str) -> pd.DataFrame: + """ + When the client is established, we can load data + """ + ... + + def save_data(self, dataclient: DataClient, obj: dict, location: str) -> None: + """ + When the client is established, we can save out objects + """ + # Serialize the dictionary to a JSON-formatted string + json_string = json.dumps(obj) # indent for pretty formatting + + # Convert the JSON string to bytes (UTF-8 encoding) + json_bytes = json_string.encode("utf-8") + + # Create a BytesIO object and write the JSON bytes to it + buffer = BytesIO() + buffer.write(json_bytes) + + buffer.seek(0) + + dataclient.upload_data_from_buffer(buffer=buffer, location=location) diff --git a/modules/ml-pipeline/src/pipeline/src/core/FeatureProcessor.py b/modules/ml-pipeline/src/pipeline/src/core/FeatureProcessor.py new file mode 100644 index 0000000..7f14e03 --- /dev/null +++ b/modules/ml-pipeline/src/pipeline/src/core/FeatureProcessor.py @@ -0,0 +1,148 @@ +""" +Interface for all FeatureProcessors +""" + +""" +Create additional features from the dataset +""" + +import pandas as pd +from typing import List, Callable, Union +from core.interface.InterfaceFeatureProcessor import FeatureProcessor +from core.Logger import logger + + +def feature_processor_factory(feature_processor_type: str) -> FeatureProcessor: + """ + Determine which dataclient to use + """ + feature_processor = { + "dataframe": DataFrameFeatureProcessor(), + # ADD MORE DATACLIENTS HERE + } + + if feature_processor_type not in feature_processor: + raise ValueError("Dataloader type specified is not in factory") + + return feature_processor[feature_processor_type] + + +def validate_dict_keys(keys_1: List[str], keys_2: List[str], config_type: str): + if not set(keys_1).issubset(keys_2): + raise ValueError(f"Incorrect {config_type} keys specified") + + +class DataFrameFeatureProcessor: + """ + Handle all feature manipulation before modelling + """ + + ACCEPTED_FEATURE_PROCESS_CONFIG_KEYS = [ + "subsample_amount", + "subsample_seed", + "target", + "drop_columns", + "retain_features", + ] + + @staticmethod + def drop_unused_columns(df: pd.DataFrame, drop_columns: List[str]) -> pd.DataFrame: + """ + Remove the unused columns for RDS + """ + df = df.drop(columns=drop_columns) + return df + + @staticmethod + def retain_features( + df: pd.DataFrame, target: str, retain_features: List[str] | None = None + ) -> pd.DataFrame: + """ + Determine which columns to keep for modelling + """ + if retain_features is None: + retain_features = df.columns.to_list() + else: + if not set(retain_features).issubset(df.columns): + raise ValueError("Features defined is not contained in data") + retain_features = [target] + retain_features + + df = df[retain_features] + + return df + + @staticmethod + def subsample_data( + df: pd.DataFrame, subsample_amount: int | None = None, subsample_seed: int = 0 + ) -> pd.DataFrame: + """ + Sample data to reduce number of rows for model building if needed + """ + if subsample_amount: + df = df.sample(subsample_amount, random_state=subsample_seed) + return df + + @staticmethod + def apply_business_logic( + df: pd.DataFrame, business_logic: Union[dict[str, Callable], None] + ) -> pd.DataFrame: + """ + If we need any additional business logic to be applied, post data cleaning + """ + if business_logic is None or len(business_logic) == 0: + return df + + # TODO: to test + for _, value in business_logic.items(): + df = value(df) + + return df + + @staticmethod + def generate_new_features( + df: pd.DataFrame, new_feature_funcs: Union[dict[str, Callable], None] + ) -> pd.DataFrame: + """ + We can iterative over all keys (new feature column names), and apply their Calleabl function + """ + if new_feature_funcs is None or len(new_feature_funcs) == 0: + return df + + # TODO: to test + for key, value in new_feature_funcs.items(): + df[key] = value(df) + + return df + + def feature_process( + self, + obj: pd.DataFrame, + feature_processor_config: dict, + business_logic: dict | None = None, + new_feature_funcs: dict | None = None, + ) -> pd.DataFrame: + """ + Pipeline to get data ready for building a model + """ + validate_dict_keys( + self.ACCEPTED_FEATURE_PROCESS_CONFIG_KEYS, + list(feature_processor_config.keys()), + config_type="Feature Process Config", + ) + + df = self.subsample_data( + obj, + subsample_amount=feature_processor_config["subsample_amount"], + subsample_seed=feature_processor_config["subsample_seed"], + ) + df = self.drop_unused_columns( + df, drop_columns=feature_processor_config["drop_columns"] + ) + df = self.retain_features( + df, + retain_features=feature_processor_config["retain_features"], + target=feature_processor_config["target"], + ) + df = self.apply_business_logic(df, business_logic=business_logic) + df = self.generate_new_features(df, new_feature_funcs=new_feature_funcs) + return df diff --git a/modules/ml-pipeline/src/pipeline/src/core/MLModels.py b/modules/ml-pipeline/src/pipeline/src/core/MLModels.py index 984c340..2c237ba 100644 --- a/modules/ml-pipeline/src/pipeline/src/core/MLModels.py +++ b/modules/ml-pipeline/src/pipeline/src/core/MLModels.py @@ -13,7 +13,9 @@ from pathlib import Path from typing import Union, List from sklearn import linear_model from sklearn.svm import SVR +from autogluon.tabular import TabularDataset, TabularPredictor from core.interface.InterfaceModels import MLModel +from core.Logger import logger def model_factory(model_type: str) -> MLModel: @@ -23,6 +25,7 @@ def model_factory(model_type: str) -> MLModel: models = { "SKLearnLinearRegression": SKLearnLinearRegression(), "SKLearnSVMRegression": SKLearnSVMRegression(), + "AutogluonAutoML": AutogluonAutoML() # ADD OTHER MODELS HERE } @@ -131,3 +134,78 @@ class SKLearnSVMRegression: """ self.predictions = pd.Series(self.model.predict(data)) return self.predictions + + +class AutogluonAutoML: + + ACCEPTED_MODEL_HYPERPAREMETERS = [ + "output_filepath", + "problem_type", + "eval_metric", + "time_limit", + "presets", + "excluded_model_types", + ] + + def load_model(self, path: Union[Path, str]) -> None: + """ + Method to load a model + """ + filepath = str(path) + self.model = TabularPredictor.load(path=filepath) + + def save_model(self, path: Path) -> str: + """ + Method to save a model + """ + if self.model is None: + raise KeyError("No model trained/ loaded - unable to save") + + logger.info("In local development mode - no need for s3 client") + logger.info("Using AutoGluon Model - Model saving already occured") + + return str(path) + + def train_model( + self, data: pd.DataFrame, target: str, model_hyperparameters: dict + ) -> None: + """ + Method to train a model + """ + + validate_dict_keys( + keys_1=list(model_hyperparameters.keys()), + keys_2=self.ACCEPTED_MODEL_HYPERPAREMETERS, + config_type="Model Hyperparameters", + ) + + if model_hyperparameters["output_filepath"] is None: + logger.error("Please specify a output_filepath in order to train a model") + exit(1) + + AGdata = TabularDataset(data=data) + + self.model = TabularPredictor( + label=target, + path=model_hyperparameters["output_filepath"], + problem_type=model_hyperparameters["problem_type"], + eval_metric=model_hyperparameters["eval_metric"], + ).fit( + AGdata, + time_limit=model_hyperparameters["time_limit"], + presets=model_hyperparameters["presets"], + excluded_model_types=model_hyperparameters["excluded_model_types"], + ) + + def predict(self, data: pd.DataFrame) -> pd.Series: + """ + Method to predict + """ + + if self.model is None: + print("No model loaded/ trained") + exit(1) + + predictions = pd.Series(self.model.predict(data)) + + return predictions diff --git a/modules/ml-pipeline/src/pipeline/src/core/interface/InterfaceDataClient.py b/modules/ml-pipeline/src/pipeline/src/core/interface/InterfaceDataClient.py index 20cfe30..3350714 100644 --- a/modules/ml-pipeline/src/pipeline/src/core/interface/InterfaceDataClient.py +++ b/modules/ml-pipeline/src/pipeline/src/core/interface/InterfaceDataClient.py @@ -3,6 +3,7 @@ Interface for all DataClient i.e. s3, database, local etc """ import pandas as pd +from io import BytesIO from typing import Protocol @@ -23,13 +24,32 @@ class DataClient(Protocol): """ ... - def load_data(self, load_config: dict) -> pd.DataFrame: + def download_data(self, location: dict) -> None: """ When the client is established, we can load data """ ... - def save_data(self, obj: object, save_config: dict) -> None: + def load_data_as_buffer(self, location: str) -> BytesIO: + """ + When the client is established, we can load data + """ + ... + + def load_database(self, database_location: dict) -> None: + """ + When the client is established, we can read from a database + """ + ... + + def upload_data(self, location: str) -> None: """ When the client is established, we can save out objects """ + ... + + def upload_data_from_buffer(self, buffer: BytesIO, location: str) -> None: + """ + When the client is established, we can save out objects + """ + ... diff --git a/modules/ml-pipeline/src/pipeline/src/core/interface/InterfaceDataHandler.py b/modules/ml-pipeline/src/pipeline/src/core/interface/InterfaceDataHandler.py new file mode 100644 index 0000000..1c21144 --- /dev/null +++ b/modules/ml-pipeline/src/pipeline/src/core/interface/InterfaceDataHandler.py @@ -0,0 +1,26 @@ +""" +Interface for all DataHandler i.e. Parquet data, csv data +""" + +import pandas as pd +from typing import Protocol, Union, Any +from core.interface.InterfaceDataClient import DataClient + + +class DataHandler(Protocol): + """ + Declare the methods required for a DataClient + """ + + def load_data(self, dataclient: DataClient, location: str) -> pd.DataFrame: + """ + When the client is established, we can load data + """ + ... + + def save_data( + self, dataclient: DataClient, obj: Union[pd.DataFrame, dict, Any], location: str + ) -> None: + """ + When the client is established, we can save out objects + """ diff --git a/modules/ml-pipeline/src/pipeline/src/core/interface/InterfaceFeatureProcessor.py b/modules/ml-pipeline/src/pipeline/src/core/interface/InterfaceFeatureProcessor.py new file mode 100644 index 0000000..6c83c75 --- /dev/null +++ b/modules/ml-pipeline/src/pipeline/src/core/interface/InterfaceFeatureProcessor.py @@ -0,0 +1,23 @@ +""" +Interface for all FeatureProcessors +""" + +import pandas as pd +from typing import Protocol, Union, Any + + +class FeatureProcessor(Protocol): + """ + Declare the methods required for a FeatureProcessor + """ + + def feature_process( + self, + obj: Union[pd.DataFrame, Any], + feature_processor_config: dict, + business_logic: dict, + new_feature_funcs: dict, + ) -> Union[pd.DataFrame, Any]: + """ + Apply all transformation to the data and return the same object back (for now) + """ diff --git a/modules/ml-pipeline/src/pipeline/src/dvc.lock b/modules/ml-pipeline/src/pipeline/src/dvc.lock index ed7c57c..01a400f 100644 --- a/modules/ml-pipeline/src/pipeline/src/dvc.lock +++ b/modules/ml-pipeline/src/pipeline/src/dvc.lock @@ -5,8 +5,8 @@ stages: deps: - path: prepare_data.py hash: md5 - md5: 38b0836237bfa25ea0d71ca259610f4d - size: 3623 + md5: 87a83e62512bff93c89f3e93c1ed248d + size: 5593 params: configs/prepare_data.yaml: output_test_filepath: ./data/prepared_data/test.parquet @@ -15,86 +15,108 @@ stages: outs: - path: data/prepared_data/ hash: md5 - md5: f0d462fe6b1a856a827409a745539285.dir - size: 36169 + md5: 01a8f8f0b264ac4d61307a67bfa910b4.dir + size: 4428909 nfiles: 2 build_model: cmd: python build_model.py deps: - path: build_model.py hash: md5 - md5: 152d52b7754b4c6f96f3481dc26562fc - size: 3576 + md5: 662cd6b1562fbbc2c7d30dd0f2375a66 + size: 3948 - path: data/prepared_data hash: md5 - md5: f0d462fe6b1a856a827409a745539285.dir - size: 36169 + md5: 01a8f8f0b264ac4d61307a67bfa910b4.dir + size: 4428909 nfiles: 2 params: configs/build_model.yaml: + AutogluonAutoML: + output_filepath: ./data/model/autogluonmodel/ + problem_type: regression + eval_metric: mean_absolute_error + time_limit: 200 + presets: medium_quality + excluded_model_types: SKLearnLinearRegression: SKLearnSVMRegression: kernel: linear - model_save_filepath: ./data/model/model.joblib - model_type: SKLearnLinearRegression - target: target + model_save_filepath: ./data/model/autogluonmodel/ + model_type: AutogluonAutoML outs: - path: data/model/ hash: md5 - md5: fb7ae4137b445dc91e840b794d72e940.dir - size: 1096 - nfiles: 1 + md5: 04a1e3bc625e7934c9f57a3fa2f1ea5c.dir + size: 1264795580 + nfiles: 28 generate_predictions: cmd: python generate_predictions.py deps: - path: data/model hash: md5 - md5: fb7ae4137b445dc91e840b794d72e940.dir - size: 1096 - nfiles: 1 + md5: 04a1e3bc625e7934c9f57a3fa2f1ea5c.dir + size: 1264795580 + nfiles: 28 - path: data/prepared_data hash: md5 - md5: f0d462fe6b1a856a827409a745539285.dir - size: 36169 + md5: 01a8f8f0b264ac4d61307a67bfa910b4.dir + size: 4428909 nfiles: 2 - path: generate_predictions.py hash: md5 - md5: 424b9d89045eaf8a5a167ab2e0e363ae - size: 3400 + md5: 76c45e7575ec979e6c4c8e2cf754a720 + size: 4225 params: configs/generate_predictions.yaml: + input_dataclient_type: local + output_dataclient_type: local predictions_column_name: predictions predictions_output_filepath: ./data/predictions/predictions.parquet test_data_filepath: ./data/prepared_data/test.parquet outs: - path: data/predictions/ hash: md5 - md5: 4d5854903b25bdae15d99c934ebcfb99.dir - size: 2531 + md5: 44c298a28a0bb1367bb82d5da1a5dbd0.dir + size: 672577 nfiles: 1 generate_metrics: cmd: python generate_metrics.py deps: - path: data/predictions hash: md5 - md5: 4d5854903b25bdae15d99c934ebcfb99.dir - size: 2531 + md5: 44c298a28a0bb1367bb82d5da1a5dbd0.dir + size: 672577 nfiles: 1 - path: data/prepared_data hash: md5 - md5: f0d462fe6b1a856a827409a745539285.dir - size: 36169 + md5: 01a8f8f0b264ac4d61307a67bfa910b4.dir + size: 4428909 nfiles: 2 - path: generate_metrics.py hash: md5 - md5: b456e207b152298428ba79c083d1b6ff - size: 3728 + md5: cc368845f62523575a9ed5c791e27815 + size: 4329 params: configs/generate_metrics.yaml: + dataclient_type: local + input_datahandler_type: parquet metrics_output_filepath: ./metrics/metrics.json metrics_type: Regression + output_datahandler_type: json outs: - path: metrics/metrics.json hash: md5 - md5: 3c9306e992b07491ff7e642949d6bc47 + md5: 3f03e50a419af6730351a5016e2ae98a size: 182 + startup_cleanup: + cmd: python startup_cleanup.py + deps: + - path: startup_cleanup.py + hash: md5 + md5: f7fe2ca33004b34530da0a3ab48c1790 + size: 1458 + params: + configs/startup_cleanup.yaml: + artefacts: ./data + metrics: ./metrics diff --git a/modules/ml-pipeline/src/pipeline/src/dvc.yaml b/modules/ml-pipeline/src/pipeline/src/dvc.yaml index d1febb1..7e98535 100644 --- a/modules/ml-pipeline/src/pipeline/src/dvc.yaml +++ b/modules/ml-pipeline/src/pipeline/src/dvc.yaml @@ -1,4 +1,12 @@ stages: + startup_cleanup: + cmd: python startup_cleanup.py + deps: + - startup_cleanup.py + params: + - configs/startup_cleanup.yaml: + - artefacts + - metrics prepare_data: cmd: python prepare_data.py deps: diff --git a/modules/ml-pipeline/src/pipeline/src/generate_metrics.py b/modules/ml-pipeline/src/pipeline/src/generate_metrics.py index 11b214c..a7def45 100644 --- a/modules/ml-pipeline/src/pipeline/src/generate_metrics.py +++ b/modules/ml-pipeline/src/pipeline/src/generate_metrics.py @@ -11,9 +11,11 @@ from pathlib import Path from core.interface.InterfaceModels import MLModel from core.interface.InterfaceMetrics import MLMetrics from core.interface.InterfaceDataClient import DataClient +from core.interface.InterfaceDataHandler import DataHandler from core.DataClient import dataclient_factory from core.MLModels import model_factory from core.MLMetrics import metrics_factory +from core.DataHandler import datahandler_factory from core.Logger import logger @@ -33,9 +35,14 @@ generate_predictions_params = yaml.safe_load(open(generate_predictions_path)) generate_metrics_path = Path(__file__).parent / "configs" / "generate_metrics.yaml" generate_metrics_params = yaml.safe_load(open(generate_metrics_path)) +feature_process_path = Path(__file__).parent / "configs" / "feature_processor.yaml" +feature_process_params = yaml.safe_load(open(feature_process_path)) + def generate_metrics( dataclient: DataClient, + input_datahandler: DataHandler, + output_datahandler: DataHandler, model: MLModel, metrics: MLMetrics, target: str, @@ -52,15 +59,17 @@ def generate_metrics( logger.info("--- Loading test data ---") logger.info("-------------------------") - # TODO: replace with client loader here - test_data = pd.read_parquet(test_data_filepath) + test_data = input_datahandler.load_data( + dataclient=dataclient, location=test_data_filepath + ) logger.info("---------------------------") logger.info("--- Loading predictions ---") logger.info("---------------------------") - # TODO: replace with client loader here - predictions = pd.read_parquet(predictions_output_filepath) + predictions = input_datahandler.load_data( + dataclient=dataclient, location=predictions_output_filepath + ) logger.info("--------------------------") logger.info("--- Generating metrics ---") @@ -75,13 +84,9 @@ def generate_metrics( logger.info("--- Saving metrics ---") logger.info("----------------------") - # TODO: replace with client - - if not Path(metrics_output_filepath).parent.exists(): - os.mkdir(Path(metrics_output_filepath).parent) - - with open(metrics_output_filepath, "w") as f: - json.dump(metrics_output, f) + output_datahandler.save_data( + dataclient=dataclient, obj=metrics_output, location=metrics_output_filepath + ) if __name__ == "__main__": @@ -91,14 +96,22 @@ if __name__ == "__main__": logger.info("----------------------------") model = model_factory(build_model_params["model_type"]) - dataclient = dataclient_factory(prepare_data_params["dataclient_type"]) + dataclient = dataclient_factory(generate_metrics_params["dataclient_type"]) + input_datahandler = datahandler_factory( + generate_metrics_params["input_datahandler_type"] + ) + output_datahandler = datahandler_factory( + generate_metrics_params["output_datahandler_type"] + ) metrics = metrics_factory(generate_metrics_params["metrics_type"]) generate_metrics( dataclient=dataclient, + input_datahandler=input_datahandler, + output_datahandler=output_datahandler, model=model, metrics=metrics, - target=build_model_params["target"], + target=feature_process_params["feature_processor_config"]["target"], test_data_filepath=generate_predictions_params["test_data_filepath"], predictions_output_filepath=generate_predictions_params[ "predictions_output_filepath" diff --git a/modules/ml-pipeline/src/pipeline/src/generate_predictions.py b/modules/ml-pipeline/src/pipeline/src/generate_predictions.py index 00d6fce..552db47 100644 --- a/modules/ml-pipeline/src/pipeline/src/generate_predictions.py +++ b/modules/ml-pipeline/src/pipeline/src/generate_predictions.py @@ -10,9 +10,10 @@ import pandas as pd from pathlib import Path from core.interface.InterfaceModels import MLModel from core.interface.InterfaceDataClient import DataClient +from core.interface.InterfaceDataHandler import DataHandler from core.DataClient import dataclient_factory from core.MLModels import model_factory -from core.MLMetrics import metrics_factory +from core.DataHandler import datahandler_factory from core.Logger import logger @@ -29,9 +30,14 @@ generate_predictions_path = ( ) generate_predictions_params = yaml.safe_load(open(generate_predictions_path)) +feature_process_path = Path(__file__).parent / "configs" / "feature_processor.yaml" +feature_process_params = yaml.safe_load(open(feature_process_path)) + def generate_predictions( - dataclient: DataClient, + input_dataclient: DataClient, + output_dataclient: DataClient, + datahandler: DataHandler, model: MLModel, target: str, model_filepath: str, @@ -47,8 +53,9 @@ def generate_predictions( logger.info("--- Loading test data ---") logger.info("-------------------------") - # TODO: replace with client loader here - test_data = pd.read_parquet(test_data_filepath) + test_data = datahandler.load_data( + dataclient=input_dataclient, location=test_data_filepath + ) logger.info("---------------------") logger.info("--- Loading model ---") @@ -60,7 +67,6 @@ def generate_predictions( logger.info("--- Generating predictions ---") logger.info("------------------------------") - # Clean test data for now prediction_data = ( test_data.drop(columns=target) if target in test_data.columns else test_data ) @@ -71,13 +77,13 @@ def generate_predictions( logger.info("--- Saving predictions ---") logger.info("--------------------------") - # TODO: replace with client + predictions_df = pd.DataFrame(predictions) + predictions_df.columns = [predictions_column_name] - if not Path(predictions_output_filepath).parent.exists(): - os.mkdir(Path(predictions_output_filepath).parent) - - pd.DataFrame(predictions, columns=[predictions_column_name]).to_parquet( - predictions_output_filepath + datahandler.save_data( + dataclient=output_dataclient, + obj=predictions_df, + location=predictions_output_filepath, ) @@ -88,12 +94,23 @@ if __name__ == "__main__": logger.info("----------------------------") model = model_factory(build_model_params["model_type"]) - dataclient = dataclient_factory(prepare_data_params["dataclient_type"]) + # We may have different locations of loading hence why we use one specified in generate_predictions.yaml + # I.e. for metric runs, this will be a local data client + # For predictions, we will want a cloud data client + input_dataclient = dataclient_factory( + generate_predictions_params["input_dataclient_type"] + ) + output_dataclient = dataclient_factory( + generate_predictions_params["output_dataclient_type"] + ) + datahandler = datahandler_factory(prepare_data_params["datahandler_type"]) generate_predictions( - dataclient=dataclient, + input_dataclient=input_dataclient, + output_dataclient=output_dataclient, + datahandler=datahandler, model=model, - target=build_model_params["target"], + target=feature_process_params["feature_processor_config"]["target"], model_filepath=build_model_params["model_save_filepath"], test_data_filepath=generate_predictions_params["test_data_filepath"], predictions_output_filepath=generate_predictions_params[ diff --git a/modules/ml-pipeline/src/pipeline/src/prepare_data.py b/modules/ml-pipeline/src/pipeline/src/prepare_data.py index cb7ed80..53ff8fc 100644 --- a/modules/ml-pipeline/src/pipeline/src/prepare_data.py +++ b/modules/ml-pipeline/src/pipeline/src/prepare_data.py @@ -11,13 +11,21 @@ from pathlib import Path from sklearn.datasets import load_diabetes from sklearn.model_selection import train_test_split from core.interface.InterfaceDataClient import DataClient +from core.interface.InterfaceDataHandler import DataHandler +from core.interface.InterfaceFeatureProcessor import FeatureProcessor +from configs.feature_processor_logic import business_logic, new_feature_funcs from core.Logger import logger from core.DataClient import dataclient_factory +from core.DataHandler import datahandler_factory +from core.FeatureProcessor import feature_processor_factory RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local") -params_path = Path(__file__).parent / "configs" / "prepare_data.yaml" -params = yaml.safe_load(open(params_path)) +prepare_data_path = Path(__file__).parent / "configs" / "prepare_data.yaml" +prepare_data_params = yaml.safe_load(open(prepare_data_path)) + +feature_process_path = Path(__file__).parent / "configs" / "feature_processor.yaml" +feature_process_params = yaml.safe_load(open(feature_process_path)) def use_dummy_data() -> pd.DataFrame: @@ -31,8 +39,15 @@ def use_dummy_data() -> pd.DataFrame: def prepare_data( - dataclient: DataClient, + input_dataclient: DataClient, + output_dataclient: DataClient, + datahandler: DataHandler, + feature_processor: FeatureProcessor, + data_filepath: str, train_proportion: float, + feature_processor_config: dict, + business_logic: dict, + new_feature_funcs: dict, output_train_filepath: str = "train.parquet", output_test_filepath: str = "test.parquet", ) -> Tuple[pd.DataFrame, pd.DataFrame]: @@ -46,8 +61,18 @@ def prepare_data( logger.info("--- Loading data ---") logger.info("--------------------") - # TODO: REPLACE THIS WIL CLIENT AND LOAD DATA - data = use_dummy_data() + data = datahandler.load_data(dataclient=input_dataclient, location=data_filepath) + + logger.info("--------------------------") + logger.info("--- Feature Processing ---") + logger.info("--------------------------") + + data = feature_processor.feature_process( + data, + feature_processor_config=feature_processor_config, + business_logic=business_logic, + new_feature_funcs=new_feature_funcs, + ) logger.info("----------------------") logger.info("--- Splitting data ---") @@ -57,29 +82,16 @@ def prepare_data( data, train_size=train_proportion, test_size=(1 - train_proportion) ) - logger.info("--------------------------") - logger.info("--- Feature Processing ---") - logger.info("--------------------------") - logger.info("-----------------------") logger.info("--- Outputting data ---") logger.info("-----------------------") - # TODO: REPLACE WITH CLIENT - output_directory = Path(output_train_filepath) - if not output_directory.parent.exists(): - os.makedirs(output_directory.parent) - - output_directory = Path(output_test_filepath) - if not output_directory.parent.exists(): - os.makedirs(output_directory.parent) - - logger.info("--- Outputting train and test data ---") - train.to_parquet(output_train_filepath) - test.to_parquet(output_test_filepath) - - # client.save_data(obj=train) - # client.save_data(obj=test) + datahandler.save_data( + dataclient=output_dataclient, obj=train, location=output_train_filepath + ) + datahandler.save_data( + dataclient=output_dataclient, obj=test, location=output_test_filepath + ) return train, test @@ -94,17 +106,51 @@ if __name__ == "__main__": logger.info(f"--- Initiate DataClient ---") logger.info("----------------------------") - dataclient = dataclient_factory(params["dataclient_type"]) + input_dataclient = dataclient_factory(prepare_data_params["input_dataclient_type"]) + output_dataclient = dataclient_factory( + prepare_data_params["output_dataclient_type"] + ) + + input_dataclient.ingest_configurations( + config=prepare_data_params["input_dataclient"] + ) + input_dataclient.establish_client() + + output_dataclient.ingest_configurations( + config=prepare_data_params["output_dataclient"] + ) + output_dataclient.establish_client() + + logger.info("-----------------------------") + logger.info(f"--- Initiate DataHandler ---") + logger.info("-----------------------------") + + datahandler = datahandler_factory(prepare_data_params["datahandler_type"]) + + logger.info("----------------------------------") + logger.info(f"--- Initiate FeatureProcessor ---") + logger.info("----------------------------------") + + feature_processor = feature_processor_factory( + feature_process_params["feature_processor_type"] + ) logger.info("---------------------------") logger.info(f"--- Prepare Data Stage ---") logger.info("---------------------------") prepare_data( - dataclient=dataclient, - train_proportion=params["train_proportion"], - output_train_filepath=params["output_train_filepath"], - output_test_filepath=params["output_test_filepath"], + input_dataclient=input_dataclient, + output_dataclient=output_dataclient, + datahandler=datahandler, + feature_processor=feature_processor, + data_filepath=prepare_data_params["data_filepath"], + train_proportion=prepare_data_params["train_proportion"], + output_train_filepath=prepare_data_params["output_train_filepath"], + output_test_filepath=prepare_data_params["output_test_filepath"], + feature_processor_config=feature_process_params["feature_processor_config"], + business_logic=business_logic, + new_feature_funcs=new_feature_funcs, ) logger.info("-------------------------------") diff --git a/modules/ml-pipeline/src/pipeline/src/requirements/predictions/requirements-dev.txt b/modules/ml-pipeline/src/pipeline/src/requirements/predictions/requirements-dev.txt index 5aac406..b4679d0 100644 --- a/modules/ml-pipeline/src/pipeline/src/requirements/predictions/requirements-dev.txt +++ b/modules/ml-pipeline/src/pipeline/src/requirements/predictions/requirements-dev.txt @@ -1,7 +1,7 @@ joblib==1.3.2 boto3==1.28.17 pandas==1.5.3 -scikit-learn==1.3.0 +autogluon==0.8.2 pyarrow==13.0.0 pre-commit==3.3.3 sphinx==7.2.5 diff --git a/modules/ml-pipeline/src/pipeline/src/requirements/predictions/requirements.txt b/modules/ml-pipeline/src/pipeline/src/requirements/predictions/requirements.txt index ab7c886..8f5fb55 100644 --- a/modules/ml-pipeline/src/pipeline/src/requirements/predictions/requirements.txt +++ b/modules/ml-pipeline/src/pipeline/src/requirements/predictions/requirements.txt @@ -1,6 +1,6 @@ joblib==1.3.2 boto3==1.28.17 pandas==1.5.3 -scikit-learn==1.3.0 +autogluon==0.8.2 pyarrow==13.0.0 PyYAML==6.0.1 diff --git a/modules/ml-pipeline/src/pipeline/src/requirements/training/requirements-dev.txt b/modules/ml-pipeline/src/pipeline/src/requirements/training/requirements-dev.txt index 5aac406..b4679d0 100644 --- a/modules/ml-pipeline/src/pipeline/src/requirements/training/requirements-dev.txt +++ b/modules/ml-pipeline/src/pipeline/src/requirements/training/requirements-dev.txt @@ -1,7 +1,7 @@ joblib==1.3.2 boto3==1.28.17 pandas==1.5.3 -scikit-learn==1.3.0 +autogluon==0.8.2 pyarrow==13.0.0 pre-commit==3.3.3 sphinx==7.2.5 diff --git a/modules/ml-pipeline/src/pipeline/src/requirements/training/requirements.txt b/modules/ml-pipeline/src/pipeline/src/requirements/training/requirements.txt index 196dfe7..11f11a1 100644 --- a/modules/ml-pipeline/src/pipeline/src/requirements/training/requirements.txt +++ b/modules/ml-pipeline/src/pipeline/src/requirements/training/requirements.txt @@ -1,3 +1,3 @@ boto3==1.28.41 pandas==1.5.3 -scikit-learn==1.3.0 +autogluon==0.8.2 diff --git a/modules/ml-pipeline/src/pipeline/src/startup_cleanup.py b/modules/ml-pipeline/src/pipeline/src/startup_cleanup.py new file mode 100644 index 0000000..d30308c --- /dev/null +++ b/modules/ml-pipeline/src/pipeline/src/startup_cleanup.py @@ -0,0 +1,50 @@ +""" +We remove all previous artefacts in the data folder for a dvc run +""" + +import shutil +import yaml +from pathlib import Path +from core.Logger import logger + +startup_cleanup_path = Path(__file__).parent / "configs" / "startup_cleanup.yaml" +startup_cleanup_params = yaml.safe_load(open(startup_cleanup_path)) + + +def run_cleanup(artefacts_directory: str, metrics_directory: str) -> None: + """ + Remove the directory where artefacts are stored + """ + artefact_directory_path = Path(artefacts_directory) + + if artefact_directory_path.exists(): + + logger.info(f"Removing the directory: {artefacts_directory}") + shutil.rmtree(artefact_directory_path) + + metrics_directory_path = Path(metrics_directory) + + if metrics_directory_path.exists(): + + logger.info(f"Removing the directory: {metrics_directory}") + shutil.rmtree(metrics_directory_path) + + +if __name__ == "__main__": + + logger.info("----------------------------") + logger.info(f"--- {__file__} - Start! ---") + logger.info("----------------------------") + + logger.info("---------------------") + logger.info(f"--- Run Clean up ---") + logger.info("---------------------") + + run_cleanup( + artefacts_directory=startup_cleanup_params["artefacts"], + metrics_directory=startup_cleanup_params["metrics"], + ) + + logger.info("-------------------------------") + logger.info(f"--- {__file__} - Complete! ---") + logger.info("-------------------------------")