diff --git a/.gitignore b/.gitignore index be9da3aa..4e43467e 100644 --- a/.gitignore +++ b/.gitignore @@ -127,6 +127,7 @@ venv/ ENV/ env.bak/ venv.bak/ +.training_env/ # Spyder project settings .spyderproject @@ -255,4 +256,5 @@ model_data/.idea/ model_data/simulation_system/.idea/ model_data/simulation_system/data* - +model_data/simulation_system/model_directory/ +model_data/simulation_system/predictions/ diff --git a/model_data/simulation_system/.pre-commit-config.yaml b/model_data/simulation_system/.pre-commit-config.yaml new file mode 100644 index 00000000..cbc34112 --- /dev/null +++ b/model_data/simulation_system/.pre-commit-config.yaml @@ -0,0 +1,11 @@ +repos: +- repo: https://github.com/pre-commit/pre-commit-hooks + rev: v2.3.0 + hooks: + - id: check-yaml + - id: end-of-file-fixer + - id: trailing-whitespace +- repo: https://github.com/psf/black + rev: 22.10.0 + hooks: + - id: black \ No newline at end of file diff --git a/model_data/simulation_system/Dockerfiles/Dockerfile.prediction b/model_data/simulation_system/Dockerfiles/Dockerfile.prediction new file mode 100644 index 00000000..c0efdb5f --- /dev/null +++ b/model_data/simulation_system/Dockerfiles/Dockerfile.prediction @@ -0,0 +1,37 @@ +FROM python:3.10.12-slim as release + +ARG USER=nroot +ARG UID=1000 +ARG GID=100 + +# Install patches +RUN apt-get update && apt-get upgrade -y \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists + +# Install python packages +COPY ../requirements/predictions/predictions.txt requirements.txt +RUN pip install --no-cache-dir -r requirements.txt + +# Create and configure non-root user +RUN useradd \ + --create-home \ + --uid ${UID} \ + --gid ${GID} \ + --shell /bin/bash \ + ${USER} + +# Copy the project code to user home directory +COPY --chown=${UID}:${GID} ./core /home/simulation_system/core +COPY --chown=${UID}:${GID} ./MLModel /home/simulation_system/MLModel +COPY --chown=${UID}:${GID} ./predictions.py /home/simulation_system/predictions.py + +# FOR TESTING +# COPY --chown=${UID}:${GID} ./model_build_data /home/simulation_system/model_build_data + +# Switch user +USER ${USER} +WORKDIR /home/simulation_system + +# Run the python command +CMD ["python3", "predictions.py", "--data-path", "./model_build_data/change_data/rdsap_full/test_data.parquet"] diff --git a/model_data/simulation_system/Dockerfiles/Dockerfile.prediction.lambda b/model_data/simulation_system/Dockerfiles/Dockerfile.prediction.lambda new file mode 100644 index 00000000..8f9a045e --- /dev/null +++ b/model_data/simulation_system/Dockerfiles/Dockerfile.prediction.lambda @@ -0,0 +1,14 @@ +FROM public.ecr.aws/lambda/python:3.10 + +# Install python packages +COPY ../requirements/predictions/predictions.txt requirements.txt +RUN pip install --no-cache-dir -r requirements.txt + +# Copy the project code to user home directory +COPY ./core ${LAMBDA_TASK_ROOT}/simulation_system/core +COPY ./MLModel ${LAMBDA_TASK_ROOT}/simulation_system/MLModel +COPY ./predictions.py ${LAMBDA_TASK_ROOT}/simulation_system/predictions.py +COPY ./handlers/predictions_app.py ${LAMBDA_TASK_ROOT}/simulation_system/predictions_app.py + +# Run off a lambda trigger +CMD [ "prediction_app.handler" ] diff --git a/model_data/simulation_system/Dockerfiles/Dockerfile.training b/model_data/simulation_system/Dockerfiles/Dockerfile.training new file mode 100644 index 00000000..dcba0499 --- /dev/null +++ b/model_data/simulation_system/Dockerfiles/Dockerfile.training @@ -0,0 +1,37 @@ +FROM python:3.10.12-slim as release + +ARG USER=nroot +ARG UID=1000 +ARG GID=100 + +# Install patches +RUN apt-get update && apt-get upgrade -y \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists + +# Install python packages +COPY ../requirements/training/training.txt requirements.txt +RUN pip install --no-cache-dir -r requirements.txt + +# Create and configure non-root user +RUN useradd \ + --create-home \ + --uid ${UID} \ + --gid ${GID} \ + --shell /bin/bash \ + ${USER} + +# Copy the project code to user home directory +COPY --chown=${UID}:${GID} ./core /home/simulation_system/core +COPY --chown=${UID}:${GID} ./MLModel /home/simulation_system/MLModel +COPY --chown=${UID}:${GID} ./training.py /home/simulation_system/training.py + +# FOR TESTING +# COPY --chown=${UID}:${GID} ./model_build_data /home/simulation_system/model_build_data + +# Switch user +USER ${USER} +WORKDIR /home/simulation_system + +# Run the python command +CMD ["python3", "training.py", "--train-filepath", "./model_build_data/change_data/rdsap_full/train_validation_data.parquet", "--test-filepath", "./model_build_data/change_data/rdsap_full/test_data.parquet"] diff --git a/model_data/simulation_system/Dockerfiles/Dockerfile.training.lambda b/model_data/simulation_system/Dockerfiles/Dockerfile.training.lambda new file mode 100644 index 00000000..34d3fe42 --- /dev/null +++ b/model_data/simulation_system/Dockerfiles/Dockerfile.training.lambda @@ -0,0 +1,14 @@ +FROM public.ecr.aws/lambda/python:3.10 + +# Install python packages +COPY ../requirements/training/training.txt requirements.txt +RUN pip install --no-cache-dir -r requirements.txt + +# Copy the project code to user home directory +COPY ./core ${LAMBDA_TASK_ROOT}/simulation_system/core +COPY ./MLModel ${LAMBDA_TASK_ROOT}/simulation_system/MLModel +COPY ./training.py ${LAMBDA_TASK_ROOT}/simulation_system/training.py +COPY ./handlers/training_app.py ${LAMBDA_TASK_ROOT}/simulation_system/training_app.py + +# Run off a lambda trigger +CMD [ "training_app.handler" ] diff --git a/model_data/simulation_system/MLModel/BaseMLModel.py b/model_data/simulation_system/MLModel/BaseMLModel.py index 42106a33..e631880d 100644 --- a/model_data/simulation_system/MLModel/BaseMLModel.py +++ b/model_data/simulation_system/MLModel/BaseMLModel.py @@ -5,19 +5,20 @@ This is the base protocol: Key tasks: - Template Model class for different model types - Save model -- Load Model +- Load Model - Generate Inference """ +from numpy import ndarray from pathlib import Path -from typing import Protocol, NamedTuple +from typing import Protocol, NamedTuple, Any import pandas as pd class MLModel(Protocol): - ''' + """ Base ML Model protocol - ''' + """ def load_model(self, filepath: Path) -> None: """ @@ -30,21 +31,23 @@ class MLModel(Protocol): """ def train_model( - self, - data: pd.DataFrame, - target_column: str, - hyperparameter: dict - ) -> None: + self, data: pd.DataFrame, target_column: str, hyperparameters: dict + ) -> None: """ For the given data and hyperparameters (specified to the model), a model is trained """ - def generate_predictions(self, data: pd.DataFrame) -> pd.DataFrame: + def generate_predictions(self, data: pd.DataFrame) -> ndarray[Any, Any] | None: """ For the given dataframe, model is loaded and predictions are generated """ - def model_evaluation(self, validation_data: pd.DataFrame, target_column: str, metrics_location: Path = None) -> NamedTuple: + def model_evaluation( + self, + validation_data: pd.DataFrame, + target_column: str, + metrics_location: Path | None = None, + ) -> pd.DataFrame | None: """ For any validation data, a set of predictions and metrics are return """ @@ -53,7 +56,8 @@ class MLModel(Protocol): """ Perfomance post processing on Model to ensure ready for deployment """ - - def generate_meta_data(self): + + def model_metadata(self) -> dict | None: """ + Extract out model metadata as dictionary """ diff --git a/model_data/simulation_system/MLModel/Models.py b/model_data/simulation_system/MLModel/Models.py index ccf6fdf8..dcf6a387 100644 --- a/model_data/simulation_system/MLModel/Models.py +++ b/model_data/simulation_system/MLModel/Models.py @@ -1,22 +1,45 @@ """ -Different implementations of the MLModel Protocol +Different implementations of the MLModel Protocol Uses the BaseMLModel protocol Key tasks: - Template Model class for different model types - Save model -- Load Model +- Load Model - Generate Inference """ -from typing import NamedTuple +from typing import Any from pathlib import Path import pandas as pd from autogluon.tabular import TabularDataset, TabularPredictor from sklearn.metrics import mean_absolute_percentage_error -from model_data.simulation_system.core.Logger import logger +from core.Logger import logger +from core.Metrics import Metrics +from core.Settings import METRIC_FILENAME +from MLModel.BaseMLModel import MLModel -AUTOGLUON_HYPERPARAMETERS = ['problem_type', 'eval_metric', 'time_limit', 'presets', 'excluded_model_types'] -METRIC_FILENAME = "metrics.csv" +AUTOGLUON_HYPERPARAMETERS = [ + "problem_type", + "eval_metric", + "time_limit", + "presets", + "excluded_model_types", +] + + +def model_factory(model_type: str, hyperparameters: dict) -> dict: + """ + Use factory pattern to register the different ML implementations + """ + + model_types = { + "autogluon": { + "model": AutogluonModel, + "naming_attributes": f"{hyperparameters['presets']}-{hyperparameters['time_limit']}", + }, + } + + return model_types[model_type] class AutogluonModel: @@ -24,28 +47,28 @@ class AutogluonModel: Autogluon model that implements the MLModel Protocol """ - def __init__(self, output_filepath: Path = None) -> None: + def __init__(self, output_filepath: Path | None = None) -> None: self.model = None self.output_filepath = output_filepath self.predictions = None - def load_model(self, filepath: Path) -> None: + def load_model(self, filepath: str | Path) -> None: """ Providing a path, this function will load the model to be used. Will load to internal variable """ + filepath = str(filepath) + self.model = TabularPredictor.load(path=filepath) - def save_model(self, output_filepath: Path = None) -> None: + def save_model(self, output_filepath: Path | None = None) -> None: """ Providing a path, this function will save the model to be used. """ logger.info("Using AutoGluon Model - Model saving already occured") def train_model( - self, - data: pd.DataFrame, - target_column: str, - hyperparameters: dict = None) -> None: + self, data: pd.DataFrame, target_column: str, hyperparameters: dict + ) -> None: """ For the given data and hyperparameters, a model is trained """ @@ -54,7 +77,9 @@ class AutogluonModel: exit(1) if set(AUTOGLUON_HYPERPARAMETERS) != set(hyperparameters.keys()): - print("Hyperparameters (dict) is incorrectly defined - please check what hyperparameters are required") + print( + "Hyperparameters (dict) is incorrectly defined - please check what hyperparameters are required" + ) exit(1) AGdata = TabularDataset(data=data) @@ -62,16 +87,16 @@ class AutogluonModel: self.model = TabularPredictor( label=target_column, path=self.output_filepath, - problem_type=hyperparameters['problem_type'], - eval_metric=hyperparameters['eval_metric'] + problem_type=hyperparameters["problem_type"], + eval_metric=hyperparameters["eval_metric"], ).fit( AGdata, - time_limit=hyperparameters['time_limit'], - presets=hyperparameters['presets'], - excluded_model_types=hyperparameters['excluded_model_types'] + time_limit=hyperparameters["time_limit"], + presets=hyperparameters["presets"], + excluded_model_types=hyperparameters["excluded_model_types"], ) - def generate_predictions(self, data: pd.DataFrame) -> pd.DataFrame: + def generate_predictions(self, data: pd.DataFrame) -> pd.Series: """ For the given dataframe, model is loaded and predictions are generated """ @@ -80,7 +105,7 @@ class AutogluonModel: print("No model loaded/ trained") exit(1) - predictions = self.model.predict(data) + predictions = pd.Series(self.model.predict(data)) return predictions @@ -88,30 +113,31 @@ class AutogluonModel: self, validation_data: pd.DataFrame, target_column: str, - metrics_location: Path = None, - metric_filename: str = METRIC_FILENAME + metrics: Metrics, + metrics_location: Path | None = None, + metric_filename: str = METRIC_FILENAME, ) -> pd.DataFrame: """ For any validation data, a set of predictions and metrics are return """ if metrics_location is None: logger.warning("Metrics will be outputted to current folder") + metrics_location = Path() if self.model is None: logger.error("No model loaded/ trained - Unable to generate evaluation") exit(1) - performance = self.model.evaluate(validation_data) + # Generate prediction, load metrics suite, generate metrics betweeen the two predictions = self.generate_predictions(validation_data) + performance = metrics.generate_metric_suite( + actuals=validation_data[target_column], predictions=predictions + ) + logger.info("Prediction used for evaluations are saved in self.prediction") self.predictions = predictions - # TODO: Can have a custom metric class that defines all different metrics we want - metric_mape = mean_absolute_percentage_error(validation_data[target_column], predictions) - - performance['mape'] = metric_mape - logger.info("Saving metric file as metric.csv") metrics_location.mkdir(exist_ok=True) @@ -122,7 +148,9 @@ class AutogluonModel: return metrics_df - def optimise_model_for_deployment(self, deployment_path: Path = None) -> str: + def optimise_model_for_deployment( + self, deployment_path: Path | str | None = None + ) -> Any: """ We can optimise the deployment for a autogluon model """ @@ -132,5 +160,18 @@ class AutogluonModel: if deployment_path is None: raise ValueError("Deployment path required") + deployment_path = str(deployment_path) + # This will return a string path of the location return self.model.clone_for_deployment(deployment_path) + + def model_metadata(self) -> dict[str, Any]: + """ + For Autogluon model, use the inbuilt model info method + """ + + if self.model is None: + logger.error("No Model loaded/ trained") + exit(1) + + return self.model.info() diff --git a/model_data/simulation_system/Makefile b/model_data/simulation_system/Makefile index 97df8d3e..4aad42b1 100644 --- a/model_data/simulation_system/Makefile +++ b/model_data/simulation_system/Makefile @@ -1,14 +1,33 @@ +export PYENV_ROOT=$(HOME)/.pyenv +export PATH := $(PYENV_ROOT)/bin:$(PATH) +PYTHON_VERSION ?= 3.10.12 + .PHONY: init init: build docker +.PHONY: training_env +env: + pyenv install $(PYTHON_VERSION) || echo "Proceeding..." # || is to swallow non-zero response if python version already is installed + pyenv global $(PYTHON_VERSION) + python3 -m venv .training_env + . .training_env/bin/activate + pip install --upgrade pip + pip install -r requirements/training/training-dev.txt && pre-commit install + + echo " --- TO ACTIVATE THE ENVIRONMENT --- " + echo "Run source .training_env/bin/activate to activate the virtual environment" + +.PHONY: check-all +check-all: pre-commit run -a + .PHONY: build -build: +build: docker-compose build .PHONY: docker -docker: +docker: docker-compose up -d .PHONY: down -down: - docker compose down \ No newline at end of file +down: + docker compose down diff --git a/model_data/simulation_system/README.md b/model_data/simulation_system/README.md index b6fe8327..e98767ea 100644 --- a/model_data/simulation_system/README.md +++ b/model_data/simulation_system/README.md @@ -3,6 +3,13 @@ Starter Readme: Steps for pipeline: +- (WIP) Set up the training development environment + - Change directory to this folder (simulation_system) + - Run the following command `make env PYTHON_VERSION=3.10.12` + - This will install the specified python version using `pyenv` and select this version as the global python version + - It will install all training packages as specified in the training-dev.txt requirements file, including the pre-commit hooks + - Run `source .training_env/bin/activate` to use this environment + - (WIP) Use Makefile to start up mock up s3 service - By running `make init`, this will run the `docker-compose build` and `docker-compose up -d`, which spins up a S3 service - This docker compose is running in detached mode `-d`, so will no output anything to the terminal @@ -27,7 +34,7 @@ Steps for pipeline: - Once model build is finished, you can run the `prediction.py` file to generate prediction - By default, the prediction pipeline will select the best model based on **mean absolute error** from the model registry - - This can be overwritten by specifying a model_path, which will load an alternative model + - This can be overwritten by specifying a model_path, which will load an alternative model - There are two ways of getting data into the pipeline: - Using the `--data` argument: - This is a JSON string which can be passed as `python3 predictions.py --data '{"TOTAL_FLOOR_AREA": 1}'` @@ -55,12 +62,16 @@ Steps for pipeline: - Add precommit hooks (linters, branch names, etc) - Sphinx documentation - Sort out local mock up services - - Sort out Model Registry + - Sort out Model Registry - Sort out Data version control + - pre-commit hooks: + - The types of hooks that we want (safety, bandit, iso8 etc) + - The customisations we require + - Add sphinx documentation - Data Science: - - Implement a metrics class, to hold all metric + - Implement a metrics class, to hold all metric - Rebuild metrics script (Could be a one off but good to have) - - Determine metrics + - Determine metrics - Implement and test custom model (Tensorflow Decision Trees etc) - Orchestration: - - Lambda handler for the pipeline \ No newline at end of file + - Lambda handler for the pipeline diff --git a/model_data/simulation_system/core/DataLoader.py b/model_data/simulation_system/core/DataLoader.py index dcd7af16..352815bf 100644 --- a/model_data/simulation_system/core/DataLoader.py +++ b/model_data/simulation_system/core/DataLoader.py @@ -1,25 +1,118 @@ import pandas as pd import os +from typing import Protocol -class DataLoader: +class DataLoader(Protocol): + """ + Interface for all DataLoader classes + """ @staticmethod - def load(filepath: str, index_col: str = None) -> pd.DataFrame: + def load(filepath: str, index_col: str | None = None) -> pd.DataFrame | None: """ - Load different datasets + Loading data from the relevant source """ + +class LocalDataLoader: + """ + Implements the DataLoader Protocol for local files + """ + + @staticmethod + def load(filepath: str, index_col: str | None = None) -> pd.DataFrame: + if not os.path.exists(filepath): raise FileNotFoundError(f"File not found: {filepath}") - if filepath.endswith('.parquet'): + if filepath.endswith(".parquet"): df = pd.read_parquet(filepath) if index_col is not None: df = df.set_index(index_col) - elif filepath.endswith('.csv'): + elif filepath.endswith(".csv"): df = pd.read_csv(filepath, index_col=index_col) else: raise ValueError(f"File format not supported for file: {filepath}") return df + + +class S3MockDataLoader: + """ + Implements the DataLoader Protocol for s3 files, hosting locally in a mocked service + """ + + @staticmethod + def load(filepath: str, index_col: str | None = None) -> pd.DataFrame: + + # TODO: Ingest these as environment variables in the docker compose file + storage_options = { + "key": os.environ.get("AWS_ACCESS_KEY_ID", "admin"), + "secret": os.environ.get("AWS_SECRET_ACCESS_KEY", "password"), + "client_kwargs": { + "endpoint_url": os.environ.get("ENDPOINT_URL", "http://localhost:9000") + }, + } + + if filepath.endswith(".parquet"): + df = pd.read_parquet(filepath, storage_options=storage_options) + if index_col is not None: + df = df.set_index(index_col) + elif filepath.endswith(".csv"): + df = pd.read_csv( + filepath, index_col=index_col, storage_options=storage_options + ) + else: + raise ValueError(f"File format not supported for file: {filepath}") + + return df + + +class S3DataLoader: + """ + Implements the DataLoader Protocol for s3 files + """ + + @staticmethod + def load(filepath: str, index_col: str | None = None) -> pd.DataFrame: + + storage_options = { + "key": os.environ.get("AWS_ACCESS_KEY_ID"), + "secret": os.environ.get("AWS_SECRET_ACCESS_KEY"), + } + + if filepath.endswith(".parquet"): + df = pd.read_parquet(filepath, storage_options=storage_options) + if index_col is not None: + df = df.set_index(index_col) + elif filepath.endswith(".csv"): + df = pd.read_csv( + filepath, index_col=index_col, storage_options=storage_options + ) + else: + raise ValueError(f"File format not supported for file: {filepath}") + + return df + + +def dataloader_factory(runtime_environment: str | None = None) -> DataLoader: + """ + Use factory pattern to determine which loading method we use + """ + + if runtime_environment is None: + runtime_environment = "local" + + dataloader_types = { + "local": LocalDataLoader(), + "local-mock": S3MockDataLoader(), + "dev": S3DataLoader(), + "staging": S3DataLoader(), + "prod": S3DataLoader(), + } + + if runtime_environment not in dataloader_types: + raise ValueError("Incorrect runtime environment specified") + + return dataloader_types[runtime_environment] diff --git a/model_data/simulation_system/core/DataProcessor.py b/model_data/simulation_system/core/DataProcessor.py index 7b50f486..4c37176d 100644 --- a/model_data/simulation_system/core/DataProcessor.py +++ b/model_data/simulation_system/core/DataProcessor.py @@ -1,7 +1,7 @@ from pathlib import Path import numpy as np import pandas as pd -from model_data.BaseUtility import Definitions +from BaseUtility import Definitions from simulation_system.core.Settings import ( DATA_PROCESSOR_SETTINGS, EARLIEST_EPC_DATE, @@ -11,7 +11,7 @@ from simulation_system.core.Settings import ( TOTAL_FLOOR_AREA_NATIONAL_AVERAGE, FLOOR_LEVEL_MAP, BUILT_FORM_REMAP, - COLUMNS_TO_MERGE_ON + COLUMNS_TO_MERGE_ON, ) from typing import List @@ -23,7 +23,6 @@ class DataProcessor: def __init__(self, filepath: Path) -> None: self.filepath = filepath - self.data = None def load_data(self, low_memory=False) -> None: self.data = pd.read_csv(self.filepath, low_memory=low_memory) @@ -32,16 +31,20 @@ class DataProcessor: """ Load data and begin initial cleaning """ - self.load_data(low_memory=DATA_PROCESSOR_SETTINGS['low_memory']) + self.load_data(low_memory=DATA_PROCESSOR_SETTINGS["low_memory"]) self.confine_data() - # TODO: CLean number of heated rooms and habitable rooms - self.recast_df_columns(column_mappings=DATA_PROCESSOR_SETTINGS['column_mappings']) + # TODO: CLean number of heated rooms and habitable rooms + self.recast_df_columns( + column_mappings=DATA_PROCESSOR_SETTINGS["column_mappings"] + ) self.clean_multi_glaze_proportion() - self.retain_multiple_epc_properties(epc_minimum_count=DATA_PROCESSOR_SETTINGS['epc_minimum_count']) + self.retain_multiple_epc_properties( + epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"] + ) self.remap_columns() - if DATA_PROCESSOR_SETTINGS['epc_minimum_count'] >= 1: + if DATA_PROCESSOR_SETTINGS["epc_minimum_count"] >= 1: # If we have multiple EPC records, we can try and do filling self.fill_na_fields() @@ -53,11 +56,15 @@ class DataProcessor: """ If we have a minimum of 2 epcs, we can do back fill and forward fill on certain data fields """ - # Each uprn can fille backward from recent and forward fill from oldest + # Each uprn can fille backward from recent and forward fill from oldest # The groupby changes the order and we use the index to make the original data - filled_data = self.data.groupby("UPRN", group_keys=True)[columns_to_fill].apply( - lambda group: group.fillna(method='bfill').fillna(method='ffill') - ).reset_index().set_index('level_1').sort_index() + filled_data = ( + self.data.groupby("UPRN", group_keys=True)[columns_to_fill] + .apply(lambda group: group.fillna(method="bfill").fillna(method="ffill")) + .reset_index() + .set_index("level_1") + .sort_index() + ) self.data[columns_to_fill] = filled_data[columns_to_fill] @@ -67,15 +74,20 @@ class DataProcessor: """ # Map all anomaly values to None - data_anomaly_map = dict(zip(Definitions.DATA_ANOMALY_MATCHES, [None] * len(Definitions.DATA_ANOMALY_MATCHES))) + data_anomaly_map = dict( + zip( + Definitions.DATA_ANOMALY_MATCHES, + [None] * len(Definitions.DATA_ANOMALY_MATCHES), + ) + ) # Use replace function to map data (if exists in key), to corresponding value - i.e. Remove invalid values data = self.data.replace(data_anomaly_map) data = data.replace(np.NAN, None) # Remap certain columns - data['FLOOR_LEVEL'] = data['FLOOR_LEVEL'].replace(FLOOR_LEVEL_MAP) - data['BUILT_FROM'] = data['BUILT_FORM'].replace(BUILT_FORM_REMAP) + data["FLOOR_LEVEL"] = data["FLOOR_LEVEL"].replace(FLOOR_LEVEL_MAP) + data["BUILT_FROM"] = data["BUILT_FORM"].replace(BUILT_FORM_REMAP) self.data = data @@ -84,80 +96,130 @@ class DataProcessor: def median_without_missing(group): return group[AVERAGE_FIXED_FEATURES].median(skipna=True) - cleaning_averages = self.data.groupby( - ["PROPERTY_TYPE", "BUILT_FORM", "CONSTRUCTION_AGE_BAND", "NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"], - observed=True, - dropna=False - ).apply(median_without_missing).reset_index() + cleaning_averages = ( + self.data.groupby( + [ + "PROPERTY_TYPE", + "BUILT_FORM", + "CONSTRUCTION_AGE_BAND", + "NUMBER_HABITABLE_ROOMS", + "NUMBER_HEATED_ROOMS", + ], + observed=True, + dropna=False, + ) + .apply(median_without_missing) + .reset_index() + ) - general_averages = self.data.groupby(["PROPERTY_TYPE", "BUILT_FORM"], observed=True).apply( - median_without_missing).reset_index() + general_averages = ( + self.data.groupby(["PROPERTY_TYPE", "BUILT_FORM"], observed=True) + .apply(median_without_missing) + .reset_index() + ) - property_averages = self.data.groupby(["PROPERTY_TYPE"], observed=True).apply( - median_without_missing).reset_index() + property_averages = ( + self.data.groupby(["PROPERTY_TYPE"], observed=True) + .apply(median_without_missing) + .reset_index() + ) - built_form_averages = self.data.groupby(["BUILT_FORM"], observed=True).apply( - median_without_missing).reset_index() + built_form_averages = ( + self.data.groupby(["BUILT_FORM"], observed=True) + .apply(median_without_missing) + .reset_index() + ) # We can clean up any NA's in the cleaning averages with the general averages here - cleaning_averages_filled = pd.merge(cleaning_averages, general_averages, on=['PROPERTY_TYPE', 'BUILT_FORM'], - suffixes=['', '_AVERAGE']) - cleaning_averages_filled = pd.merge(cleaning_averages_filled, property_averages, on=['PROPERTY_TYPE'], - suffixes=['', '_PROPERTY_AVERAGE']) - cleaning_averages_filled = pd.merge(cleaning_averages_filled, built_form_averages, on=['BUILT_FORM'], - suffixes=['', '_BUILT_FORM_AVERAGE']) + cleaning_averages_filled = pd.merge( + cleaning_averages, + general_averages, + on=["PROPERTY_TYPE", "BUILT_FORM"], + suffixes=["", "_AVERAGE"], + ) + cleaning_averages_filled = pd.merge( + cleaning_averages_filled, + property_averages, + on=["PROPERTY_TYPE"], + suffixes=["", "_PROPERTY_AVERAGE"], + ) + cleaning_averages_filled = pd.merge( + cleaning_averages_filled, + built_form_averages, + on=["BUILT_FORM"], + suffixes=["", "_BUILT_FORM_AVERAGE"], + ) # Replace any missing NAN values with averages for the same Property type and built form - cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna( - cleaning_averages_filled['TOTAL_FLOOR_AREA_AVERAGE']) - cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna( - cleaning_averages_filled['FLOOR_HEIGHT_AVERAGE']) + cleaning_averages_filled["TOTAL_FLOOR_AREA"] = cleaning_averages_filled[ + "TOTAL_FLOOR_AREA" + ].fillna(cleaning_averages_filled["TOTAL_FLOOR_AREA_AVERAGE"]) + cleaning_averages_filled["FLOOR_HEIGHT"] = cleaning_averages_filled[ + "FLOOR_HEIGHT" + ].fillna(cleaning_averages_filled["FLOOR_HEIGHT_AVERAGE"]) cleaning_averages_filled = cleaning_averages_filled.drop( - columns=['TOTAL_FLOOR_AREA_AVERAGE', 'FLOOR_HEIGHT_AVERAGE']) + columns=["TOTAL_FLOOR_AREA_AVERAGE", "FLOOR_HEIGHT_AVERAGE"] + ) # If there are still NA values i.e. the averages do not have values for a speicifc group of property tyope # and built form # We can use just the property type average and replace - cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna( - cleaning_averages_filled['TOTAL_FLOOR_AREA_PROPERTY_AVERAGE']) - cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna( - cleaning_averages_filled['FLOOR_HEIGHT_PROPERTY_AVERAGE']) + cleaning_averages_filled["TOTAL_FLOOR_AREA"] = cleaning_averages_filled[ + "TOTAL_FLOOR_AREA" + ].fillna(cleaning_averages_filled["TOTAL_FLOOR_AREA_PROPERTY_AVERAGE"]) + cleaning_averages_filled["FLOOR_HEIGHT"] = cleaning_averages_filled[ + "FLOOR_HEIGHT" + ].fillna(cleaning_averages_filled["FLOOR_HEIGHT_PROPERTY_AVERAGE"]) cleaning_averages_filled = cleaning_averages_filled.drop( - columns=['TOTAL_FLOOR_AREA_PROPERTY_AVERAGE', 'FLOOR_HEIGHT_PROPERTY_AVERAGE']) + columns=[ + "TOTAL_FLOOR_AREA_PROPERTY_AVERAGE", + "FLOOR_HEIGHT_PROPERTY_AVERAGE", + ] + ) # If there are still NA values, use BUILT FORM averages - cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna( - cleaning_averages_filled['TOTAL_FLOOR_AREA_BUILT_FORM_AVERAGE']) - cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna( - cleaning_averages_filled['FLOOR_HEIGHT_BUILT_FORM_AVERAGE']) + cleaning_averages_filled["TOTAL_FLOOR_AREA"] = cleaning_averages_filled[ + "TOTAL_FLOOR_AREA" + ].fillna(cleaning_averages_filled["TOTAL_FLOOR_AREA_BUILT_FORM_AVERAGE"]) + cleaning_averages_filled["FLOOR_HEIGHT"] = cleaning_averages_filled[ + "FLOOR_HEIGHT" + ].fillna(cleaning_averages_filled["FLOOR_HEIGHT_BUILT_FORM_AVERAGE"]) cleaning_averages_filled = cleaning_averages_filled.drop( - columns=['TOTAL_FLOOR_AREA_BUILT_FORM_AVERAGE', 'FLOOR_HEIGHT_BUILT_FORM_AVERAGE']) + columns=[ + "TOTAL_FLOOR_AREA_BUILT_FORM_AVERAGE", + "FLOOR_HEIGHT_BUILT_FORM_AVERAGE", + ] + ) # If there still is na values, use average across all properties in consituecy - cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna( - cleaning_averages_filled['TOTAL_FLOOR_AREA'].mean()) - cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna( - cleaning_averages_filled['FLOOR_HEIGHT'].mean()) + cleaning_averages_filled["TOTAL_FLOOR_AREA"] = cleaning_averages_filled[ + "TOTAL_FLOOR_AREA" + ].fillna(cleaning_averages_filled["TOTAL_FLOOR_AREA"].mean()) + cleaning_averages_filled["FLOOR_HEIGHT"] = cleaning_averages_filled[ + "FLOOR_HEIGHT" + ].fillna(cleaning_averages_filled["FLOOR_HEIGHT"].mean()) # If the consituency is all NA values, then take UK AVERAGE VALUES - cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna( - TOTAL_FLOOR_AREA_NATIONAL_AVERAGE) - cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna( - FLOOR_HEIGHT_NATIONAL_AVERAGE) + cleaning_averages_filled["TOTAL_FLOOR_AREA"] = cleaning_averages_filled[ + "TOTAL_FLOOR_AREA" + ].fillna(TOTAL_FLOOR_AREA_NATIONAL_AVERAGE) + cleaning_averages_filled["FLOOR_HEIGHT"] = cleaning_averages_filled[ + "FLOOR_HEIGHT" + ].fillna(FLOOR_HEIGHT_NATIONAL_AVERAGE) return cleaning_averages_filled def retain_multiple_epc_properties(self, epc_minimum_count: int = 1) -> None: - ''' + """ Reduce the data futher by keeping only datasets with multiple epcs - ''' + """ counts = self.data.groupby("UPRN").size().reset_index() counts.columns = ["UPRN", "count"] # take UPRNS with multiple EPCs counts = counts[counts["count"] > epc_minimum_count] - self.data = pd.merge(self.data, counts, on='UPRN') + self.data = pd.merge(self.data, counts, on="UPRN") def recast_df_columns(self, column_mappings: dict) -> None: """ @@ -166,7 +228,7 @@ class DataProcessor: for key, values in column_mappings.items(): if key not in self.data.columns: - print('Column mapping incorrectly specified') + print("Column mapping incorrectly specified") exit(1) for value in values: self.data[key] = self.data[key].astype(value) @@ -189,13 +251,16 @@ class DataProcessor: self.data = self.data[~pd.isnull(self.data["UPRN"])] self.data = self.data[self.data["LODGEMENT_DATE"] >= EARLIEST_EPC_DATE] self.data = self.data[self.data["TRANSACTION_TYPE"] != "new dwelling"] - self.data = self.data[~self.data["FLOOR_LEVEL"].isin(["top floor", "mid floor"])] + self.data = self.data[ + ~self.data["FLOOR_LEVEL"].isin(["top floor", "mid floor"]) + ] def clean_multi_glaze_proportion(self) -> None: """ If there is no multi-glaze proportion but the windows are fully glazed, then we should assume a score of 100 """ - no_multi_glaze_proportion_index = pd.isnull(self.data["MULTI_GLAZE_PROPORTION"]) & ( - self.data["WINDOWS_DESCRIPTION"].isin(FULLY_GLAZED_DESCRIPTIONS)) - self.data.loc[no_multi_glaze_proportion_index, 'MULTI_GLAZE_PROPORTION'] = 100 + no_multi_glaze_proportion_index = pd.isnull( + self.data["MULTI_GLAZE_PROPORTION"] + ) & (self.data["WINDOWS_DESCRIPTION"].isin(FULLY_GLAZED_DESCRIPTIONS)) + self.data.loc[no_multi_glaze_proportion_index, "MULTI_GLAZE_PROPORTION"] = 100 diff --git a/model_data/simulation_system/core/FeatureProcessor.py b/model_data/simulation_system/core/FeatureProcessor.py index cefcee9b..a425e972 100644 --- a/model_data/simulation_system/core/FeatureProcessor.py +++ b/model_data/simulation_system/core/FeatureProcessor.py @@ -4,10 +4,10 @@ Create additional features from the dataset import pandas as pd from typing import List -from model_data.simulation_system.core.Logger import logger +from core.Logger import logger -RDSAP_CHANGE_DROP_COLUMNS = ['UPRN', 'HEAT_DEMAND_CHANGE'] -HEAT_DEMAND_CHANGE_DROP_COLUMNS = ['UPRN', 'RDSAP_CHANGE'] +RDSAP_CHANGE_DROP_COLUMNS = ["UPRN", "HEAT_DEMAND_CHANGE"] +HEAT_DEMAND_CHANGE_DROP_COLUMNS = ["UPRN", "RDSAP_CHANGE"] RANDOM_SEED = 0 @@ -18,7 +18,9 @@ class FeatureProcessor: """ @staticmethod - def drop_unused_columns(df: pd.DataFrame, target_column: str = "RDSAP_CHANGE") -> pd.DataFrame: + def drop_unused_columns( + df: pd.DataFrame, target_column: str = "RDSAP_CHANGE" + ) -> pd.DataFrame: """ Remove the unused columns for RDS """ @@ -29,15 +31,15 @@ class FeatureProcessor: return df @staticmethod - def retain_features(df: pd.DataFrame, features: List[str] = None): + def retain_features(df: pd.DataFrame, features: List[str] | None = None): """ Determine which columns to keep for modelling """ if features is None: - features = df.columns + features = df.columns.to_list() else: if not set(features).issubset(df.columns): - logger.error('Features defined is not contained in data') + logger.error("Features defined is not contained in data") exit(1) df = df[features] @@ -45,7 +47,9 @@ class FeatureProcessor: return df @staticmethod - def subsample_data(df: pd.DataFrame, subsample_amount: int = None) -> pd.DataFrame: + def subsample_data( + df: pd.DataFrame, subsample_amount: int | None = None + ) -> pd.DataFrame: """ Sample data to reduce number of rows for model building if needed """ @@ -58,8 +62,8 @@ class FeatureProcessor: self, df: pd.DataFrame, target_column: str = "RDSAP_CHANGE", - features: List[str] = None, - subsample_amount: int = None + features: List[str] | None = None, + subsample_amount: int | None = None, ) -> pd.DataFrame: """ Pipeline to get data ready for building a model diff --git a/model_data/simulation_system/core/Metrics.py b/model_data/simulation_system/core/Metrics.py new file mode 100644 index 00000000..99edae92 --- /dev/null +++ b/model_data/simulation_system/core/Metrics.py @@ -0,0 +1,130 @@ +""" +Generate metrics and enable regeneration of metrics if new metrics are generated +Key tasks: +- Specify metric functions that take in prediction vs actual to generate a metric value +- Given a model and test data, produce a suite of all metrics +""" + +import pandas as pd +from pathlib import Path +import seaborn as sns +import matplotlib.pyplot as plt +from core.Settings import ( + RESIDUAL_TRUE_LABEL, + RESIDUAL_PREDICTION_LABEL, + SEABORN_RESIDUAL_AXIS_FONTSIZE, + SEABORN_RESIDUAL_TITLE_FONTSIZE, + SEABORN_RESIDUAL_STYLE, + SEABORN_RESIDUAL_ASPECT_RATIO, + SEABORN_RESIDUAL_PLOT_DPI, + SEABORN_RESIDUAL_RANGE, + SEABORN_RESIDUAL_LINE_COLOUR, + SEABORN_RESIDUAL_LINE_WIDTH, +) +from sklearn.metrics import ( + mean_absolute_error, + median_absolute_error, + mean_squared_error, + mean_absolute_percentage_error, +) + + +# Dummy example of new metric that can be added - must be true and prediction as arguments +def max_error(y_true: pd.Series, y_pred: pd.Series): + return max(y_true - y_pred) + + +METRIC_TO_APPLY = [ + mean_absolute_error, + median_absolute_error, + mean_squared_error, + mean_absolute_percentage_error, + # max_error +] + + +def sort_by_metric( + data: pd.DataFrame, optimse_metric: str, best_model_column_name: str +) -> pd.DataFrame: + """ + Helper function to sort data frame by metric and append a best model flag + """ + # Ascending as we want lowest error values + data = data.sort_values(optimse_metric, ascending=True).reset_index(drop=True) + data[best_model_column_name] = [False] * len(data) + data.loc[0, best_model_column_name] = True + + return data + + +class Metrics: + """ + All metric functions used to generate a dictionary of metrics + """ + + @staticmethod + def list_metric_functions() -> list: + """ + Gather all metric functions to run + """ + return [metric_to_apply.__name__ for metric_to_apply in METRIC_TO_APPLY] + + @staticmethod + def generate_metric_suite(actuals: pd.Series, predictions: pd.Series) -> pd.Series: + """ + For the model, test data and target, generate predictions and then iterative over all metrics to generate a Series of metric values + """ + + metric_dict = {} + for metric_function in METRIC_TO_APPLY: + metric_dict[metric_function.__name__] = metric_function( + actuals, predictions + ) + + metrics = pd.Series(metric_dict) + + return metrics + + @staticmethod + def generate_plot_suite(): + """ + Can do all metric ploting + """ + + @staticmethod + def generate_residual_plot( + actuals: pd.Series, + predictions: pd.Series, + target_column: str, + output_filepath: Path | str, + ): + + # TODO: can have a model.metric_outputs method + # FOr not just do it here + residual_df = pd.DataFrame( + list(zip(actuals, predictions)), + columns=[RESIDUAL_TRUE_LABEL, RESIDUAL_PREDICTION_LABEL], + ) + + # image formatting + sns.set(style=SEABORN_RESIDUAL_STYLE) + ax = sns.scatterplot( + x=RESIDUAL_TRUE_LABEL, y=RESIDUAL_PREDICTION_LABEL, data=residual_df + ) + ax.set_aspect(SEABORN_RESIDUAL_ASPECT_RATIO) + ax.set_xlabel(f"True {target_column}", fontsize=SEABORN_RESIDUAL_AXIS_FONTSIZE) + ax.set_ylabel( + f"Predicted {target_column}", fontsize=SEABORN_RESIDUAL_AXIS_FONTSIZE + ) # ylabel + ax.set_title("Residuals", fontsize=SEABORN_RESIDUAL_TITLE_FONTSIZE) + + # Square aspect ratio + ax.plot( + SEABORN_RESIDUAL_RANGE, + SEABORN_RESIDUAL_RANGE, + SEABORN_RESIDUAL_LINE_COLOUR, + linewidth=SEABORN_RESIDUAL_LINE_WIDTH, + ) + + plt.tight_layout() + plt.savefig(output_filepath, dpi=SEABORN_RESIDUAL_PLOT_DPI) diff --git a/model_data/simulation_system/core/Settings.py b/model_data/simulation_system/core/Settings.py index e562a39b..f595a3d2 100644 --- a/model_data/simulation_system/core/Settings.py +++ b/model_data/simulation_system/core/Settings.py @@ -1,45 +1,65 @@ -# Using a simply python file as settings for now +# Using a simply python file as settings for now # TODO: migrate to dynaconf from pathlib import Path +METRIC_FILENAME = "metrics.csv" + +OPTIMISE_METRIC = "mean_absolute_error" +BEST_MODEL_COLUMN_NAME = "best_model" + +# TODO: remove these setting elsewhere for CML +RESIDUAL_TRUE_LABEL = "true" +RESIDUAL_PREDICTION_LABEL = "pred" +RESIDUAL_FILE = "residual.png" +SEABORN_RESIDUAL_AXIS_FONTSIZE = 12 +SEABORN_RESIDUAL_TITLE_FONTSIZE = 22 +SEABORN_RESIDUAL_STYLE = "whitegrid" +SEABORN_RESIDUAL_ASPECT_RATIO = "equal" +SEABORN_RESIDUAL_PLOT_DPI = 120 +SEABORN_RESIDUAL_RANGE = [-100, 100] +SEABORN_RESIDUAL_LINE_COLOUR = "black" +SEABORN_RESIDUAL_LINE_WIDTH = 1 + # Can move to a hyperparmeters file # If anything we might want to have a file that can be loaded and sent to this script MODEL_HYPERPARAMETERS = { "autogluon": { - 'problem_type': 'regression', - 'eval_metric': 'mean_absolute_error', - 'time_limit': 30, - 'presets': 'medium_quality', - 'excluded_model_types': None + "problem_type": "regression", + "eval_metric": "mean_absolute_error", + "time_limit": 30, + "presets": "medium_quality", + "excluded_model_types": None, } } +TIMESTAMP_FORMAT = "%Y_%m_%d_%H_%M_%S" + RANDOM_SEED = 0 SUBSAMPLE_FACTOR = 200 -TRAIN_AND_VALIDATION_DATA_NAME = 'train_validation_data.parquet' -TEST_DATA_NAME = 'test_data.parquet' +TRAIN_AND_VALIDATION_DATA_NAME = "train_validation_data.parquet" +TEST_DATA_NAME = "test_data.parquet" REGISTRY_FILE = "model_registry.csv" MODEL_DIRECTORY = "model_directory" -BASE_REGISTRY_PATH = Path(__file__).parent.parent / MODEL_DIRECTORY +BASE_REGISTRY_PATH = Path(__file__).parent.parent / MODEL_DIRECTORY PREDICTION_LOCATION = Path("predictions") -PREDICTION_FILE = 'prediction.json' -METADATA_FILE = 'metadata.json' +PREDICTION_FILE = "prediction.json" +METADATA_FILE = "metadata.json" MODEL_FOLDER = "model" METRICS_FOLDER = "metrics" -DEPLOYMENT_FOLDER = "deployment" +DEPLOYMENT_FOLDER = "deployment" TOTAL_FLOOR_AREA_NATIONAL_AVERAGE = 70 FLOOR_HEIGHT_NATIONAL_AVERAGE = 2.45 COLUMNS_TO_MERGE_ON = [ - "PROPERTY_TYPE", - "BUILT_FORM", - "CONSTRUCTION_AGE_BAND", + "PROPERTY_TYPE", + "BUILT_FORM", + "CONSTRUCTION_AGE_BAND", "NUMBER_HABITABLE_ROOMS", - "NUMBER_HEATED_ROOMS" - ] + "NUMBER_HEATED_ROOMS", +] FULLY_GLAZED_DESCRIPTIONS = [ "Fully double glazed", @@ -50,48 +70,45 @@ FULLY_GLAZED_DESCRIPTIONS = [ ] FIXED_FEATURES = [ - 'PROPERTY_TYPE', - 'BUILT_FORM', - 'CONSTRUCTION_AGE_BAND', - 'NUMBER_HABITABLE_ROOMS', - 'CONSTITUENCY', - 'NUMBER_HEATED_ROOMS', - 'FIXED_LIGHTING_OUTLETS_COUNT', - 'FLOOR_HEIGHT', - 'FLOOR_LEVEL', - 'TOTAL_FLOOR_AREA', + "PROPERTY_TYPE", + "BUILT_FORM", + "CONSTRUCTION_AGE_BAND", + "NUMBER_HABITABLE_ROOMS", + "CONSTITUENCY", + "NUMBER_HEATED_ROOMS", + "FIXED_LIGHTING_OUTLETS_COUNT", + "FLOOR_HEIGHT", + "FLOOR_LEVEL", + "TOTAL_FLOOR_AREA", ] COMPONENT_FEATURES = [ - 'TRANSACTION_TYPE', - 'WALLS_DESCRIPTION', - 'FLOOR_DESCRIPTION', - 'LIGHTING_DESCRIPTION', - 'ROOF_DESCRIPTION', - 'MAINHEAT_DESCRIPTION', - 'HOTWATER_DESCRIPTION', - 'MAIN_FUEL', - 'MECHANICAL_VENTILATION', - 'SECONDHEAT_DESCRIPTION', - 'ENERGY_TARIFF', # Not sure if this is relevant - 'SOLAR_WATER_HEATING_FLAG', - 'PHOTO_SUPPLY', - 'WINDOWS_DESCRIPTION', - 'GLAZED_TYPE', - 'MULTI_GLAZE_PROPORTION', - 'LIGHTING_DESCRIPTION', - 'LOW_ENERGY_LIGHTING', - 'NUMBER_OPEN_FIREPLACES', - 'MAINHEATCONT_DESCRIPTION', - 'EXTENSION_COUNT', + "TRANSACTION_TYPE", + "WALLS_DESCRIPTION", + "FLOOR_DESCRIPTION", + "LIGHTING_DESCRIPTION", + "ROOF_DESCRIPTION", + "MAINHEAT_DESCRIPTION", + "HOTWATER_DESCRIPTION", + "MAIN_FUEL", + "MECHANICAL_VENTILATION", + "SECONDHEAT_DESCRIPTION", + "ENERGY_TARIFF", # Not sure if this is relevant + "SOLAR_WATER_HEATING_FLAG", + "PHOTO_SUPPLY", + "WINDOWS_DESCRIPTION", + "GLAZED_TYPE", + "MULTI_GLAZE_PROPORTION", + "LIGHTING_DESCRIPTION", + "LOW_ENERGY_LIGHTING", + "NUMBER_OPEN_FIREPLACES", + "MAINHEATCONT_DESCRIPTION", + "EXTENSION_COUNT", # 'GLAZED_AREA', # May not need this since we have MULTI_GLAZE_PROPORTION ] # For these fields, we take an average if we have multiple values -AVERAGE_FIXED_FEATURES = [ - "TOTAL_FLOOR_AREA", - "FLOOR_HEIGHT" -] +AVERAGE_FIXED_FEATURES = ["TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"] # For these fields, we take the latest value if we have multiple values # Since more recent EPCs have been conducted with more rigour, we assume that the latest value is @@ -105,11 +122,7 @@ LATEST_FIELD = [ ] # If we see thee features changing, we don't use the EPC, since deem it not to be reliable -MANDATORY_FIXED_FEATURES = [ - "PROPERTY_TYPE", - "BUILT_FORM", - "CONSTITUENCY" -] +MANDATORY_FIXED_FEATURES = ["PROPERTY_TYPE", "BUILT_FORM", "CONSTITUENCY"] # For particularly old EPC data, we have inconsistent records so we'll only include EPCS that were # conducted after 2010, since SAP09 was introduced in 2009 an later SAP12 was introduced in England @@ -119,14 +132,16 @@ EARLIEST_EPC_DATE = "2014-08-01" RDSAP_RESPONSE = "CURRENT_ENERGY_EFFICIENCY" HEAT_DEMAND_RESPONSE = "ENERGY_CONSUMPTION_CURRENT" + def ordinal(n): if 10 <= n % 100 <= 20: - suffix = 'th' + suffix = "th" else: - suffix = {1: 'st', 2: 'nd', 3: 'rd'}.get(n % 10, 'th') + suffix = {1: "st", 2: "nd", 3: "rd"}.get(n % 10, "th") return str(n) + suffix + FLOOR_LEVEL_MAP = { "Basement": -1, "Ground": 0, @@ -145,8 +160,7 @@ BUILT_FORM_REMAP = { } DATA_PROCESSOR_SETTINGS = { - 'low_memory': False, - 'epc_minimum_count': 1, - 'column_mappings': {'UPRN': [int, str]} + "low_memory": False, + "epc_minimum_count": 1, + "column_mappings": {"UPRN": [int, str]}, } - diff --git a/model_data/simulation_system/docker-compose.yml b/model_data/simulation_system/docker-compose.yml index 55f181bc..c801473b 100644 --- a/model_data/simulation_system/docker-compose.yml +++ b/model_data/simulation_system/docker-compose.yml @@ -2,7 +2,7 @@ version: '3' services: minio: - image: minio/minio + image: minio/minio:RELEASE.2022-05-26T05-48-41Z ports: - "9000:9000" - "9001:9001" @@ -12,6 +12,45 @@ services: MINIO_ROOT_USER: &MINIO_USER admin MINIO_ROOT_PASSWORD: &MINIO_PASS password command: server --console-address ":9001" /data + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"] + interval: 30s + timeout: 20s + retries: 3 + + simulation_system_training: + build: + context: ./ + dockerfile: ./Dockerfiles/Dockerfile.training + image: simulation_system_training + environment: + ENDPOINT_URL: http://minio:9000/ + AWS_ACCESS_KEY_ID: *MINIO_USER + AWS_SECRET_ACCESS_KEY: *MINIO_PASS + tty: true + depends_on: + minio: + condition: service_healthy + # command: + # ["bash"] + + # simulation_system_prediction: + # build: + # context: ./ + # dockerfile: ./Dockerfiles/Dockerfile.prediction + # image: simulation_system_prediction + # environment: + # ENDPOINT_URL: http://minio:9000/ + # AWS_ACCESS_KEY_ID: *MINIO_USER + # AWS_SECRET_ACCESS_KEY: *MINIO_PASS + # tty: true + # depends_on: + # simulation_system_training: + # condition: service_completed_successfully + # command: + # ["bash"] + + # volumes: -# minio_storage: {} \ No newline at end of file +# minio_storage: {} diff --git a/model_data/simulation_system/generate_rdsap_change.py b/model_data/simulation_system/generate_rdsap_change.py index 2400e7c7..c1ca56a6 100644 --- a/model_data/simulation_system/generate_rdsap_change.py +++ b/model_data/simulation_system/generate_rdsap_change.py @@ -10,15 +10,16 @@ from core.Settings import ( COMPONENT_FEATURES, RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE, - COLUMNS_TO_MERGE_ON + COLUMNS_TO_MERGE_ON, ) from core.DataProcessor import DataProcessor -DATA_DIRECTORY = Path(__file__).parent / 'data' / 'all-domestic-certificates' +DATA_DIRECTORY = Path(__file__).parent / "data" / "all-domestic-certificates" # TODO: Have a look at temporal features + def app(): # Get all the files in the directory @@ -29,7 +30,7 @@ def app(): directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()] dataset = [] - # 116 + # 116 # 128048706 # PosixPath('/home/ubuntu/Documents/python/hestia/Model/model_data/simulation_system/data/all-domestic # -certificates/domestic-E09000021-Kingston-upon-Thames') @@ -48,12 +49,14 @@ def app(): fixed_data = {} # If a property has changed building type, we can ignore the epc rating i.e. this should be 1 unique row - if max(property_data[MANDATORY_FIXED_FEATURES].nunique()) > 1: + if any(property_data[MANDATORY_FIXED_FEATURES].nunique() > 1): continue - # Take the latest row for both the LATEST_FEILDS and MANDATORY FIELDS + # Take the latest row for both the LATEST_FEILDS and MANDATORY FIELDS latest_field_data = property_data[LATEST_FIELD].iloc[-1].to_dict() - mandatory_field_data = property_data[MANDATORY_FIXED_FEATURES].iloc[-1].to_dict() + mandatory_field_data = ( + property_data[MANDATORY_FIXED_FEATURES].iloc[-1].to_dict() + ) # Taking just the last row, which is the percentage change from the latest to previous one only # property_data[AVERAGE_FIXED_FEATURES].fillna(value=0).pct_change().iloc[-1] > 0.1 @@ -63,17 +66,25 @@ def app(): cleaned_columns_to_merge_on = na_columns.index[~na_columns].to_list() # Get the corresponding groupby and merge, and fill in NA values - cleaning_averages_to_merge = cleaning_averages.groupby(cleaned_columns_to_merge_on)[ - ['TOTAL_FLOOR_AREA', 'FLOOR_HEIGHT']].mean() + cleaning_averages_to_merge = cleaning_averages.groupby( + cleaned_columns_to_merge_on + )[["TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]].mean() - modified_property_data = pd.merge(property_data, cleaning_averages_to_merge, on=cleaned_columns_to_merge_on, - suffixes=['', '_AVERAGE']) - modified_property_data['TOTAL_FLOOR_AREA'] = modified_property_data['TOTAL_FLOOR_AREA'].fillna( - modified_property_data['TOTAL_FLOOR_AREA_AVERAGE']) - modified_property_data['FLOOR_HEIGHT'] = modified_property_data['FLOOR_HEIGHT'].fillna( - modified_property_data['FLOOR_HEIGHT_AVERAGE']) + modified_property_data = pd.merge( + property_data, + cleaning_averages_to_merge, + on=cleaned_columns_to_merge_on, + suffixes=["", "_AVERAGE"], + ) + modified_property_data["TOTAL_FLOOR_AREA"] = modified_property_data[ + "TOTAL_FLOOR_AREA" + ].fillna(modified_property_data["TOTAL_FLOOR_AREA_AVERAGE"]) + modified_property_data["FLOOR_HEIGHT"] = modified_property_data[ + "FLOOR_HEIGHT" + ].fillna(modified_property_data["FLOOR_HEIGHT_AVERAGE"]) modified_property_data = modified_property_data.drop( - columns=['TOTAL_FLOOR_AREA_AVERAGE', 'FLOOR_HEIGHT_AVERAGE']) + columns=["TOTAL_FLOOR_AREA_AVERAGE", "FLOOR_HEIGHT_AVERAGE"] + ) for field in AVERAGE_FIXED_FEATURES: @@ -94,8 +105,9 @@ def app(): # We include the lodgement date here as we probably need to factor time into the # model, since EPC standards and rigour have changed over time variable_data = modified_property_data[ - COMPONENT_FEATURES + ["LODGEMENT_DATE", RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE] - ] + COMPONENT_FEATURES + + ["LODGEMENT_DATE", RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE] + ] # Note: we look at changes between subsequent EPCS, however we could look at other permutations # e.g. first vs second, second vs third and also first vs third @@ -107,15 +119,24 @@ def app(): starting_record = variable_data.iloc[idx] ending_record = variable_data.iloc[idx + 1] - rdsap_change = ending_record[RDSAP_RESPONSE] - starting_record[RDSAP_RESPONSE] - heat_demand_change = ending_record[HEAT_DEMAND_RESPONSE] - starting_record[HEAT_DEMAND_RESPONSE] + rdsap_change = ( + ending_record[RDSAP_RESPONSE] - starting_record[RDSAP_RESPONSE] + ) + heat_demand_change = ( + ending_record[HEAT_DEMAND_RESPONSE] + - starting_record[HEAT_DEMAND_RESPONSE] + ) # TODO: We need to pre-process the data. For instance, rather than using static for roofs, walls and # floors, we may want to use the U-value. We may also want to handle the (assumed) tags # within descriptions - starting_record = starting_record[COMPONENT_FEATURES + ["LODGEMENT_DATE"]].add_suffix("_STARTING") - ending_record = ending_record[COMPONENT_FEATURES + ["LODGEMENT_DATE"]].add_suffix("_ENDING") + starting_record = starting_record[ + COMPONENT_FEATURES + ["LODGEMENT_DATE"] + ].add_suffix("_STARTING") + ending_record = ending_record[ + COMPONENT_FEATURES + ["LODGEMENT_DATE"] + ].add_suffix("_ENDING") features = pd.concat([starting_record, ending_record]) @@ -125,14 +146,14 @@ def app(): "RDSAP_CHANGE": rdsap_change, "HEAT_DEMAND_CHANGE": heat_demand_change, **fixed_data, - **features.to_dict() + **features.to_dict(), } ) dataset.extend(property_model_data) output = pd.DataFrame(dataset) - output.to_parquet('./dataset.parquet') + output.to_parquet("./dataset.parquet") if __name__ == "__main__": diff --git a/model_data/simulation_system/handlers/predictions_app.py b/model_data/simulation_system/handlers/predictions_app.py new file mode 100644 index 00000000..1f3a5c8b --- /dev/null +++ b/model_data/simulation_system/handlers/predictions_app.py @@ -0,0 +1,44 @@ +import os +import urllib.parse +from predictions import prediction + +RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "dev") + + +def handler(event, context): + """ + Take in event and trigger the prediction pipeline + """ + + # Assuming a file in a bucket landing for now? + # Assuming we have a model to use + + # bucket = event["Records"][0]["s3"]["bucket"]["name"] + # key = urllib.parse.unquote_plus( + # event["Records"][0]["s3"]["bucket"]["key"], encoding="utf-8" + # ) + + payload = event["body"] + data_path = payload["file_location"] + property_id = payload["property_id"] + portfolio_id = payload["portfolio_id"] + created_at = payload["created_at"] + + # prediction_file = bucket + "/" + key + + # TODO: put a model into s3, both locally and in aws + # model_path = os.environ.get("MODEL_PATH", "http://minio:9000/data/model_directory/") + model_path = os.environ.get( + "MODEL_PATH", + "s3://retrofit-model-directory-{RUNTIME_ENVIRONMENT}/RDSAP_CHANGE/autogluon/rdsap_change-medium_quality-30-2023-08-30_11-43-41/deployment/", + ) + + try: + outputs = prediction(model_path=model_path, data_path=data_path) + # Store into s3, with key of {portfolio_id}-{property_id} + outputs.to_csv( + f"s3://retrofit-sap-prediction-{RUNTIME_ENVIRONMENT}/{portfolio_id}/{property_id}/{created_at}.csv" + ) + + except (Exception, KeyError, ValueError): + print("Prediction failed") diff --git a/model_data/simulation_system/predictions.py b/model_data/simulation_system/predictions.py index aa6c2d0f..22104993 100644 --- a/model_data/simulation_system/predictions.py +++ b/model_data/simulation_system/predictions.py @@ -2,24 +2,26 @@ Script to load MLModel class and generate predictions """ +import os import json import argparse -from model_data.simulation_system.MLModel.Models import AutogluonModel -from model_data.simulation_system.core.Logger import logger -from model_data.simulation_system.core.DataLoader import DataLoader import pandas as pd from typing import Optional from datetime import datetime -from model_data.simulation_system.core.Settings import ( +from MLModel.Models import AutogluonModel +from core.Logger import logger +from core.DataLoader import dataloader_factory +from core.Settings import ( BASE_REGISTRY_PATH, REGISTRY_FILE, PREDICTION_LOCATION, PREDICTION_FILE, - METADATA_FILE + METADATA_FILE, + TIMESTAMP_FORMAT, ) -TIMESTAMP = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") - +TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT) +RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "dev") # FOR TESTING # For now just loading data first and then passing into function (i.e. as if we receive json data and convert to @@ -33,30 +35,51 @@ def ingest_arguments() -> argparse.Namespace: Helper function to take in arguments from script start """ - parser = argparse.ArgumentParser(description='Inputs for training script') - parser.add_argument('--target-column', type=str, help='The response variable you are predicting for', - choices=['RDSAP_CHANGE', 'HEAT_DEMAND_CHANGE'], default='RDSAP_CHANGE') - parser.add_argument('--model-path', type=str, - help='If you wish to use a specific model, specify the model path here') - parser.add_argument('--data', type=str, help='Json data for predictions') - parser.add_argument('--data-path', type=str, help='Location of Parquet dataset to load for training') + parser = argparse.ArgumentParser(description="Inputs for training script") + parser.add_argument( + "--target-column", + type=str, + help="The response variable you are predicting for", + choices=["RDSAP_CHANGE", "HEAT_DEMAND_CHANGE"], + default="RDSAP_CHANGE", + ) + parser.add_argument( + "--model-path", + type=str, + help="If you wish to use a specific model, specify the model path here", + ) + parser.add_argument("--data", type=str, help="Json data for predictions") + parser.add_argument( + "--data-path", type=str, help="Location of Parquet dataset to load for training" + ) args = parser.parse_args() return args -def prediction(target_column: str = "RDSAP_CHANGE", model_path: str = None, data: pd.DataFrame = None, - data_path: Optional[str] = None): +def prediction( + target_column: str = "RDSAP_CHANGE", + model_path: str | None = None, + data: Optional[pd.DataFrame | str] = None, + data_path: Optional[str] = None, +): """ Main pipeline function """ - registry_path = BASE_REGISTRY_PATH / target_column / REGISTRY_FILE + if RUNTIME_ENVIRONMENT == "local": + registry_path = BASE_REGISTRY_PATH / target_column / REGISTRY_FILE - if registry_path is None or not registry_path.exists(): - logger.error("No registry path provided or registry doesn't exist") - exit(1) + if registry_path is None or not registry_path.exists(): + logger.error("No registry path provided or registry doesn't exist") + exit(1) + elif RUNTIME_ENVIRONMENT == "dev": + registry_path = ( + "s3://retrofit-model-directory-dev/RDSAP_CHANGE/model_registry.csv" + ) + else: + raise NotImplemented("TO be implemented") if model_path is not None: logger.info("User specified a model to load - ignoring registry") @@ -67,11 +90,11 @@ def prediction(target_column: str = "RDSAP_CHANGE", model_path: str = None, data # TODO: Think about where registry will sit/ type logger.info("Loading best model from registry") registry_df = pd.read_csv(registry_path) - best_model_df = registry_df[registry_df['best_model']] + best_model_df = registry_df[registry_df["best_model"]] - model_location = best_model_df['model_location'].values[0] - model_type = best_model_df['model_type'].values[0] - model_name = best_model_df['model_name'].values[0] + model_location = best_model_df["model_location"].values[0] + model_type = best_model_df["model_type"].values[0] + model_name = best_model_df["model_name"].values[0] logger.info("--- Model Info: ---") logger.info(f"Model type: {model_type}") @@ -84,23 +107,34 @@ def prediction(target_column: str = "RDSAP_CHANGE", model_path: str = None, data exit(1) if data_path and data is None: logger.info("Loading data from provided path") - data = DataLoader().load(filepath=data_path, index_col="UPRN") + dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT) + data = dataloader.load(filepath=data_path, index_col="UPRN") - # TODO: DOWNSAMPLING DOWN TO JUST USE ONE FOR PREDICTION - data = data.sample(1) + if data is None: + raise ValueError("No data loaded") + + # # TODO: DOWNSAMPLING DOWN TO JUST USE ONE FOR PREDICTION + # data = data.sample(1) else: - logger.info('Using data provided') - data = json.loads(data) + logger.info("Using data provided") + data = json.loads(str(data)) data = pd.DataFrame([data]) print(data) logger.info("--- Loading Model ---") - model = AutogluonModel() + + if model_type == "autogluon": + logger.info("Using an Autogluon model") + model = AutogluonModel() + else: + logger.error("No other model currently") + exit(1) model.load_model(filepath=model_location) logger.info("--- Generating Predictions ---") prediction = model.generate_predictions(data=data) + return pd.concat([data["recommendation_id"], prediction], axis=1) # Save prediction some where? # prediction.to_csv("s3?") @@ -108,24 +142,23 @@ def prediction(target_column: str = "RDSAP_CHANGE", model_path: str = None, data # TODO: Check how we want to structure outputs # For now, just categorise by uprn and timestamp # Assume one uprn coming in for now - uprn = data.index.values[0] + # uprn = data.index.values[0] - # Saving prediction local for now - # TODO: change uprn to TARGET_ID, put in setting - logger.info("--- Outputting prediction and metadata --- ") - output_base = PREDICTION_LOCATION / target_column / uprn / TIMESTAMP - output_base.mkdir(parents=True, exist_ok=True) + # # Saving prediction local for now + # # TODO: change uprn to TARGET_ID, put in setting + # logger.info("--- Outputting prediction and metadata --- ") + # output_base = PREDICTION_LOCATION / target_column / uprn / TIMESTAMP + # output_base.mkdir(parents=True, exist_ok=True) - # TODO: change model.model.info to a class method for MLModel - json_prediction = prediction.to_json(output_base / PREDICTION_FILE) - prediction_metadata = { - "model_type": model_type, - "model_name": model_name, - "model_location": model_location, - "model_settings": model.model.info() - } + # json_prediction = prediction.to_json(output_base / PREDICTION_FILE) + # prediction_metadata = { + # "model_type": model_type, + # "model_name": model_name, + # "model_location": model_location, + # "model_settings": model.model_metadata(), + # } - pd.DataFrame([prediction_metadata]).to_json(output_base / METADATA_FILE) + # pd.DataFrame([prediction_metadata]).to_json(output_base / METADATA_FILE) return json_prediction @@ -134,6 +167,10 @@ if __name__ == "__main__": args = ingest_arguments() # Data can be passed in as JSON string: python3 predictions.py --data '{"TOTAL_FLOOR_AREA": 1}' - # Data path can be passed as so: python3 predictions.py --data-path - # ../simulation_system/model_build_data/change_data/rdsap_full/test_data.parquet - prediction(target_column=args.target_column, model_path=args.model_path, data=args.data, data_path=args.data_path) + # Data path can be passed as so: python3 predictions.py --data-path ./model_build_data/change_data/rdsap_full/test_data.parquet + prediction( + target_column=args.target_column, + model_path=args.model_path, + data=args.data, + data_path=args.data_path, + ) diff --git a/model_data/simulation_system/regenerate_metrics.py b/model_data/simulation_system/regenerate_metrics.py new file mode 100644 index 00000000..025892f5 --- /dev/null +++ b/model_data/simulation_system/regenerate_metrics.py @@ -0,0 +1,118 @@ +""" +Script to regenerate metrics for all the models in the model registry +Key task: +- Load model registry +- For each model in the registry, generate the metrics (Key questions here is what if the test data changes) +- Save the new metrics out to s3 bucket +""" + +import os +import argparse +from s3pathlib import S3Path +import pandas as pd +from tqdm import tqdm +from core.Logger import logger +from core.Metrics import Metrics, sort_by_metric +from core.DataLoader import dataloader_factory +from core.Settings import ( + OPTIMISE_METRIC, + MODEL_DIRECTORY, + REGISTRY_FILE, + BEST_MODEL_COLUMN_NAME, +) +from MLModel.Models import model_factory + +RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local") + + +def ingest_arguments() -> argparse.Namespace: + """ + Helper function to take in arguments from script start + """ + + parser = argparse.ArgumentParser(description="Inputs for training script") + + parser.add_argument( + "--test-filepath", + type=str, + help="Location of Parquet dataset to load for testing", + required=True, + ) + parser.add_argument( + "--target-column", + type=str, + help="The response variable", + choices=["RDSAP_CHANGE", "HEAT_DEMAND_CHANGE"], + default="RDSAP_CHANGE", + ) + + args = parser.parse_args() + + return args + + +def regenerate_metrics(test_filepath: str, target_column: str) -> None: + """ + Recreate all metrics for all models + """ + + logger.info("--- Loading test data ---") + dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT) + test_df = dataloader.load(filepath=test_filepath) + + logger.info("--- Loading model registry ---") + logger.info(f"Loading registry for {target_column} models") + registry_df = dataloader.load( + filepath=S3Path(MODEL_DIRECTORY, target_column, REGISTRY_FILE).uri + ) + + logger.info("Extract non-metric columns") + registry_df = registry_df[["model_type", "model_name", "model_location"]] + + logger.info("--- Regenerating metrics ---") + + metric_suite = Metrics() + + metrics_df = pd.DataFrame(columns=metric_suite.list_metric_functions()) + + for _, row in tqdm(registry_df.iterrows()): + + logger.info(f"--- Loading Model ({row['model_name']}) ---") + + model = model_factory(model_type=row["model_type"])() + + model.load_model(filepath=row["model_location"]) + + metrics = metric_suite.generate_metric_suite( + model=model, data=test_df, target_column=target_column + ) + + # Add metrics row by row + metrics_df = pd.concat([metrics_df, metrics], axis=0).reset_index(drop=True) + + # Add metrics df to registry df side by side + registry_df = pd.concat([registry_df, metrics_df], axis=1) + + logger.info(f"--- Sorting by Optimise Metric ({OPTIMISE_METRIC}) ---") + + registry_df = sort_by_metric( + data=registry_df, + optimse_metric=OPTIMISE_METRIC, + best_model_column_name=BEST_MODEL_COLUMN_NAME, + ) + + logger.info("--- Saving model metrics ---") + + registry_df.to_csv(S3Path(MODEL_DIRECTORY, target_column, REGISTRY_FILE).uri) + + +if __name__ == "__main__": + + logger.info("---Begin Pipeline to regenerate metrics---") + + logger.info("---Ingest Arguments---") + args = ingest_arguments() + + regenerate_metrics( + test_filepath=args.test_filepath, target_column=args.target_column + ) diff --git a/model_data/simulation_system/requirements/predictions/predictions-dev.txt b/model_data/simulation_system/requirements/predictions/predictions-dev.txt new file mode 100644 index 00000000..0574e0f5 --- /dev/null +++ b/model_data/simulation_system/requirements/predictions/predictions-dev.txt @@ -0,0 +1,3 @@ +autogluon==0.8.2 +pandas==1.5.3 +pre-commit==3.3.3 diff --git a/model_data/simulation_system/requirements/prediction.txt b/model_data/simulation_system/requirements/predictions/predictions.txt similarity index 54% rename from model_data/simulation_system/requirements/prediction.txt rename to model_data/simulation_system/requirements/predictions/predictions.txt index f9ce32bf..ae7457ee 100644 --- a/model_data/simulation_system/requirements/prediction.txt +++ b/model_data/simulation_system/requirements/predictions/predictions.txt @@ -1,2 +1,2 @@ autogluon==0.8.2 -pandas==1.5.3 \ No newline at end of file +pandas==1.5.3 diff --git a/model_data/simulation_system/requirements/training/training-dev.txt b/model_data/simulation_system/requirements/training/training-dev.txt new file mode 100644 index 00000000..ea205270 --- /dev/null +++ b/model_data/simulation_system/requirements/training/training-dev.txt @@ -0,0 +1,4 @@ +autogluon==0.8.2 +pandas==1.5.3 +seaborn==0.12.2 +pre-commit==3.3.3 diff --git a/model_data/simulation_system/requirements/training.txt b/model_data/simulation_system/requirements/training/training.txt similarity index 100% rename from model_data/simulation_system/requirements/training.txt rename to model_data/simulation_system/requirements/training/training.txt diff --git a/model_data/simulation_system/test_data_generation.py b/model_data/simulation_system/test_data_generation.py index d57c90f8..8d1dbf2b 100644 --- a/model_data/simulation_system/test_data_generation.py +++ b/model_data/simulation_system/test_data_generation.py @@ -2,79 +2,109 @@ from core.Logger import logger import argparse import pandas as pd from pathlib import Path -from core.Settings import ( - RANDOM_SEED, - TRAIN_AND_VALIDATION_DATA_NAME, - TEST_DATA_NAME -) +from core.Settings import RANDOM_SEED, TRAIN_AND_VALIDATION_DATA_NAME, TEST_DATA_NAME + def ingest_arguments() -> argparse.Namespace: """ Helper function to take in arguments from script start """ - parser = argparse.ArgumentParser(description='Inputs for training script') + parser = argparse.ArgumentParser(description="Inputs for training script") - parser.add_argument('--filepath', type=str, help='Location of Parquet dataset to load', required=True) - parser.add_argument('--output-folder', type=str, help='Location of Parquet dataset to save', required=True) - parser.add_argument('--percentage', type=float, help='Percentage of data to use as test data', default=None) - parser.add_argument('--volume', type=int, help='Volume of data to use as test data', default=None) - parser.add_argument('--sampling', type=str, help='Type of sampling to do for test data', choices=['random', 'stratified'], default='random') + parser.add_argument( + "--filepath", + type=str, + help="Location of Parquet dataset to load", + required=True, + ) + parser.add_argument( + "--output-folder", + type=str, + help="Location of Parquet dataset to save", + required=True, + ) + parser.add_argument( + "--percentage", + type=float, + help="Percentage of data to use as test data", + default=None, + ) + parser.add_argument( + "--volume", type=int, help="Volume of data to use as test data", default=None + ) + parser.add_argument( + "--sampling", + type=str, + help="Type of sampling to do for test data", + choices=["random", "stratified"], + default="random", + ) args = parser.parse_args() return args -def main(filepath: str, output_folder: str, percentage: float, volume: int, sampling: str): + +def main( + filepath: str, output_folder: str, percentage: float, volume: int, sampling: str +): """ Load a dataset in and split out the training+validation data and the test data. """ - logger.info('---Loading Data---') + logger.info("---Loading Data---") data = pd.read_parquet(filepath).reset_index(drop=True) if percentage and volume is None: - test_amount = round(len(data)*percentage) + test_amount = round(len(data) * percentage) elif percentage is None and volume: test_amount = volume elif percentage is None and volume is None: - logger.error('No amount specified - please specify either a percentage or volume') + logger.error( + "No amount specified - please specify either a percentage or volume" + ) exit(1) else: - logger.info('Both percentage and volume specified - taking largest of the two') - test_amount = max(round(len(data)*percentage), volume) + logger.info("Both percentage and volume specified - taking largest of the two") + test_amount = max(round(len(data) * percentage), volume) - logger.info(f'---Extracting {test_amount} from dataset to be test data') + logger.info(f"---Extracting {test_amount} from dataset to be test data") - if sampling == 'random': - logger.info('--- Using random sample method ---') + train_validation_data = pd.DataFrame() + test_data = pd.DataFrame() + + if sampling == "random": + logger.info("--- Using random sample method ---") sample_index = data.sample(n=test_amount, random_state=RANDOM_SEED).index train_validation_data = data.drop(sample_index) test_data = data.iloc[sample_index] - elif sampling =='stratified': - # Not yet implemented + elif sampling == "stratified": + # Not yet implemented pass - logger.info('--- Saving data ---') + logger.info("--- Saving data ---") - train_validation_data.to_parquet(Path(output_folder)/ TRAIN_AND_VALIDATION_DATA_NAME) - test_data.to_parquet(Path(output_folder)/ TEST_DATA_NAME) + train_validation_data.to_parquet( + Path(output_folder) / TRAIN_AND_VALIDATION_DATA_NAME + ) + test_data.to_parquet(Path(output_folder) / TEST_DATA_NAME) + + logger.info(" ---Pipeline complete---") - logger.info(' ---Pipeline complete---') if __name__ == "__main__": - logger.info('--- Generate test data pipeline ---') + logger.info("--- Generate test data pipeline ---") args = ingest_arguments() main( - filepath=args.filepath, + filepath=args.filepath, output_folder=args.output_folder, - percentage=args.percentage, - volume=args.volume, - sampling=args.sampling - ) - + percentage=args.percentage, + volume=args.volume, + sampling=args.sampling, + ) diff --git a/model_data/simulation_system/training.py b/model_data/simulation_system/training.py index 2a1dfcfa..47f70772 100644 --- a/model_data/simulation_system/training.py +++ b/model_data/simulation_system/training.py @@ -1,13 +1,19 @@ import argparse + +import os + +# from s3pathlib import S3Path + # import boto3 from pathlib import Path from datetime import datetime -from model_data.simulation_system.core.Logger import logger -from model_data.simulation_system.core.DataLoader import DataLoader -from model_data.simulation_system.core.FeatureProcessor import FeatureProcessor -from model_data.simulation_system.MLModel.Models import AutogluonModel import pandas as pd -from model_data.simulation_system.core.Settings import ( +from MLModel.Models import model_factory +from core.Logger import logger +from core.Metrics import Metrics, sort_by_metric +from core.DataLoader import dataloader_factory +from core.FeatureProcessor import FeatureProcessor +from core.Settings import ( MODEL_DIRECTORY, BASE_REGISTRY_PATH, REGISTRY_FILE, @@ -15,13 +21,24 @@ from model_data.simulation_system.core.Settings import ( METRICS_FOLDER, DEPLOYMENT_FOLDER, SUBSAMPLE_FACTOR, - MODEL_HYPERPARAMETERS + MODEL_HYPERPARAMETERS, + TIMESTAMP_FORMAT, + RESIDUAL_FILE, + BEST_MODEL_COLUMN_NAME, + OPTIMISE_METRIC, ) -import seaborn as sns -import matplotlib.pyplot as plt -TIMESTAMP = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") +TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT) +RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local") + +# STORAGE_OPTIONS = { +# "key": os.environ.get("AWS_ACCESS_KEY_ID", 'admin'), +# "secret": os.environ.get("AWS_SECRET_ACCESS_KEY", 'password'), +# "client_kwargs": { +# "endpoint_url": os.environ.get("ENDPOINT_URL", "http://localhost:9000") +# } +# } # FOR TESTING # train_filepath = "./model_build_data/change_data/rdsap_full/train_validation_data.parquet" @@ -43,21 +60,45 @@ TIMESTAMP = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") # S3_CLIENT.create_bucket # S3_CLIENT.list_buckets() +# df = pd.read_parquet( +# "s3://model_build_data/change_data/rdsap_full/train_validation_data.parquet", +# storage_options=STORAGE_OPTIONS +# ) + + def ingest_arguments() -> argparse.Namespace: """ Helper function to take in arguments from script start """ - parser = argparse.ArgumentParser(description='Inputs for training script') + parser = argparse.ArgumentParser(description="Inputs for training script") - parser.add_argument('--train-filepath', type=str, help='Location of Parquet dataset to load for training', - required=True) - parser.add_argument('--test-filepath', type=str, help='Location of Parquet dataset to load for testing', - required=True) - parser.add_argument('--model-type', type=str, help='The type of model to train', choices=["autogluon"], - default="autogluon") - parser.add_argument('--target-column', type=str, help='The response variable', - choices=["RDSAP_CHANGE", "HEAT_DEMAND_CHANGE"], default='RDSAP_CHANGE') + parser.add_argument( + "--train-filepath", + type=str, + help="Location of Parquet dataset to load for training", + required=True, + ) + parser.add_argument( + "--test-filepath", + type=str, + help="Location of Parquet dataset to load for testing", + required=True, + ) + parser.add_argument( + "--model-type", + type=str, + help="The type of model to train", + choices=["autogluon"], + default="autogluon", + ) + parser.add_argument( + "--target-column", + type=str, + help="The response variable", + choices=["RDSAP_CHANGE", "HEAT_DEMAND_CHANGE"], + default="RDSAP_CHANGE", + ) args = parser.parse_args() @@ -69,116 +110,129 @@ def training( test_filepath: str, target_column: str = "RDSAP_CHANGE", model_type: str = "autogluon", - hyperparameters: dict = None + hyperparameters: dict | None = None, ) -> None: """ Pipeline to run training on the dataset """ - logger.info('--- Loading data ---') - dataloader = DataLoader() + logger.info("--- Loading data ---") + dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT) train_df = dataloader.load(filepath=train_filepath) test_df = dataloader.load(filepath=test_filepath) - logger.info('--- Feature processing ---') + if train_df is None or test_df is None: + raise ValueError("No data Loaded - cancelling pipeline") + + logger.info("--- Feature processing ---") feature_processor = FeatureProcessor() + # This is for convenience for now subsample_amount = round(len(train_df) / SUBSAMPLE_FACTOR) - train_df = feature_processor.process(train_df, target_column=target_column, subsample_amount=subsample_amount) + train_df = feature_processor.process( + train_df, target_column=target_column, subsample_amount=subsample_amount + ) test_df = feature_processor.process(test_df, target_column=target_column) - logger.info('--- Build Model ---') + logger.info("--- Build Model ---") logger.info("--- Load Hyperparameters ---") if hyperparameters is None: logger.info("Use base hyperparameters in settings") hyperparameters = MODEL_HYPERPARAMETERS[model_type] - logger.info(f'Hyperparameters are: {hyperparameters}') + logger.info(f"Hyperparameters are: {hyperparameters}") - if model_type == "autogluon": - model_root = f"{target_column}-{hyperparameters['presets']}-{hyperparameters['time_limit']}-{TIMESTAMP}".lower() - output_base = Path(MODEL_DIRECTORY) / target_column / model_type / model_root + logger.info( + "--- Loading model configuration (Model type and Naming convention) ---" + ) + # We might want to have hyperparameters in the names to make models more recognisable + model_toolkit = model_factory( + model_type=model_type, hyperparameters=hyperparameters + ) - model = AutogluonModel( - output_filepath=output_base / MODEL_FOLDER - ) - else: - raise ValueError("No alternative model implemented yet") + model_root = ( + f"{target_column}-{model_toolkit['naming_attributes']}-{TIMESTAMP}".lower() + ) + output_base = Path(MODEL_DIRECTORY) / target_column / model_type / model_root + + # Will need to pass output path to model (for saving purposes) + model = model_toolkit["model"](output_filepath=output_base / MODEL_FOLDER) model.train_model( - data=train_df, - target_column=target_column, - hyperparameters=hyperparameters + data=train_df, target_column=target_column, hyperparameters=hyperparameters ) logger.info("--- Save Model ---") model.save_model(output_filepath=model.output_filepath) - logger.info('--- Generate evaluation metrics ---') + logger.info("--- Generate evaluation metrics ---") + metrics = Metrics() + metrics_df = model.model_evaluation( validation_data=test_df, target_column=target_column, - metrics_location=output_base / METRICS_FOLDER + metrics_location=output_base / METRICS_FOLDER, + metrics=metrics, ) logger.info("--- Generate metric outputs using predictions ---") - # TODO: can have a model.metric_outputs method - # FOr not just do it here - residual_df = pd.DataFrame(list(zip(test_df[target_column], model.predictions)), columns=['true', 'pred']) - - # image formatting - # TODO: move to settings file , AXIS_FONT, TITLE_FONT - axis_fs = 18 # fontsize - title_fs = 22 # fontsize - sns.set(style="whitegrid") - ax = sns.scatterplot(x="true", y="pred", data=residual_df) - ax.set_aspect('equal') - ax.set_xlabel(f'True {target_column}', fontsize=axis_fs) - ax.set_ylabel(f'Predicted {target_column}', fontsize=axis_fs) # ylabel - ax.set_title('Residuals', fontsize=title_fs) - - # Square aspect ratio - ax.plot([-100, 100], [-100, 100], 'black', linewidth=1) - - plt.tight_layout() - RESIDUAL_FILE = "residuals.png" - plt.savefig(output_base / METRICS_FOLDER / RESIDUAL_FILE, dpi=120) + # metrics.generate_plot_suite() + metrics.generate_residual_plot( + actuals=test_df[target_column], + predictions=model.predictions, + target_column=target_column, + output_filepath=output_base / METRICS_FOLDER / RESIDUAL_FILE, + ) # TODO: for cml, we might want to have class that outputs all data and plots to add to the report # If we want residual plot/ any plots, we will need to self host - # plt.savefig(RESIDUAL_FILE, dpi=120) # TODO: introduce a seperate script for model optimisation, and from there, optimise for deployment # Imagining for now that the model trained here is the best model amongst all models built logger.info("--- Optimising model for deployment ---") - deployment_model_path = model.optimise_model_for_deployment(deployment_path=output_base / DEPLOYMENT_FOLDER) - logger.info(f"Optimised version of best model can be found at: {deployment_model_path}") + deployment_model_path = model.optimise_model_for_deployment( + deployment_path=output_base / DEPLOYMENT_FOLDER + ) + logger.info( + f"Optimised version of best model can be found at: {deployment_model_path}" + ) # TODO: Need a model registry - for now have this as a CSV # Save this in the model directory + # Loading registry from s3 logger.info("--- Append registry with new model ---") + registry_path = BASE_REGISTRY_PATH / target_column / REGISTRY_FILE + # registry_path = S3Path(MODEL_DIRECTORY, target_column, REGISTRY_FILE).uri + # registry = RegistryHandler(location=registry_path) if registry_path.exists(): logger.info("Registry file found - Loading into Dataframe") registry_df = pd.read_csv(registry_path, index_col=None) else: - # TODO: Moved columns into settings: MODEL_DETAILS and Metrics class columns - registry_df = pd.DataFrame( - columns=['model_type', 'model_name', 'model_location', 'mean_absolute_error', 'root_mean_squared_error', - 'mean_squared_error', 'r2', 'pearsonr', 'median_absolute_error', 'mape', 'best_model']) + # TODO: Moved columns into settings: MODEL_DETAILS and Metrics class columns + columns = [ + BEST_MODEL_COLUMN_NAME, + "model_type", + "model_name", + "model_location", + ] + metrics.list_metric_functions() + + registry_df = pd.DataFrame(columns=columns) model_details_df = pd.DataFrame( - [{ - 'model_type': model_type, - 'model_name': model_root, - 'model_location': deployment_model_path - }] + [ + { + "model_type": model_type, + "model_name": model_root, + "model_location": deployment_model_path, + } + ] ) registry_row = pd.concat([model_details_df, metrics_df], axis=1) @@ -187,9 +241,12 @@ def training( # TODO: will need a rebuild script metric script -i.e. if we add new metrics, we will want to load models and # regenerate new metrics # TODO: decide metric to optimise to - registry_df = registry_df.sort_values("mean_absolute_error", ascending=False).reset_index(drop=True) - registry_df['best_model'] = [False] * len(registry_df) - registry_df.loc[0, 'best_model'] = True + + registry_df = sort_by_metric( + registry_df, + optimse_metric=OPTIMISE_METRIC, + best_model_column_name=BEST_MODEL_COLUMN_NAME, + ) logger.info("--- Saving new model to registry ---") # Ensure the directory exists @@ -200,9 +257,10 @@ def training( if __name__ == "__main__": - logger.info('---Begin Pipeline---') - logger.info('---Ingest Arguments---') + logger.info("---Begin Pipeline---") + + logger.info("---Ingest Arguments---") args = ingest_arguments() # To run script: python3 training.py --train-filepath @@ -213,5 +271,5 @@ if __name__ == "__main__": train_filepath=args.train_filepath, test_filepath=args.test_filepath, target_column=args.target_column, - model_type=args.model_type + model_type=args.model_type, )