Merge pull request #183 from Hestia-Homes/mlmodel

Mlmodel
This commit is contained in:
quandanrepo 2023-08-31 14:49:40 +01:00 committed by GitHub
commit 3e833a8c73
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 1228 additions and 378 deletions

4
.gitignore vendored
View file

@ -127,6 +127,7 @@ venv/
ENV/
env.bak/
venv.bak/
.training_env/
# Spyder project settings
.spyderproject
@ -255,4 +256,5 @@ model_data/.idea/
model_data/simulation_system/.idea/
model_data/simulation_system/data*
model_data/simulation_system/model_directory/
model_data/simulation_system/predictions/

View file

@ -0,0 +1,11 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v2.3.0
hooks:
- id: check-yaml
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/psf/black
rev: 22.10.0
hooks:
- id: black

View file

@ -0,0 +1,37 @@
# Image that installs the prediction requirements and runs predictions.py as a non-root user
FROM python:3.10.12-slim as release
# Build-time overrides for the non-root account created further down
ARG USER=nroot
ARG UID=1000
ARG GID=100
# Install patches
RUN apt-get update && apt-get upgrade -y \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists
# Install python packages
# NOTE(review): the source path starts with ../ - confirm it resolves inside the build context used for this image
COPY ../requirements/predictions/predictions.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
# Create and configure non-root user
RUN useradd \
--create-home \
--uid ${UID} \
--gid ${GID} \
--shell /bin/bash \
${USER}
# Copy the project code to /home/simulation_system, owned by the non-root user
COPY --chown=${UID}:${GID} ./core /home/simulation_system/core
COPY --chown=${UID}:${GID} ./MLModel /home/simulation_system/MLModel
COPY --chown=${UID}:${GID} ./predictions.py /home/simulation_system/predictions.py
# FOR TESTING
# COPY --chown=${UID}:${GID} ./model_build_data /home/simulation_system/model_build_data
# Switch user
USER ${USER}
WORKDIR /home/simulation_system
# Run the python command
# NOTE(review): ./model_build_data is only copied by the commented-out testing line above,
# so this default data path presumably has to be provided at run time - confirm
CMD ["python3", "predictions.py", "--data-path", "./model_build_data/change_data/rdsap_full/test_data.parquet"]

View file

@ -0,0 +1,14 @@
FROM public.ecr.aws/lambda/python:3.10
# Install python packages
COPY ../requirements/predictions/predictions.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
# Copy the project code into the Lambda task root
COPY ./core ${LAMBDA_TASK_ROOT}/simulation_system/core
COPY ./MLModel ${LAMBDA_TASK_ROOT}/simulation_system/MLModel
COPY ./predictions.py ${LAMBDA_TASK_ROOT}/simulation_system/predictions.py
COPY ./handlers/predictions_app.py ${LAMBDA_TASK_ROOT}/simulation_system/predictions_app.py
# Run off a lambda trigger; the module name must match the copied predictions_app.py
# NOTE(review): the handler module is copied under simulation_system/ - confirm the runtime resolves it from there
CMD [ "predictions_app.handler" ]

View file

@ -0,0 +1,37 @@
# Image that installs the training requirements and runs training.py as a non-root user
FROM python:3.10.12-slim as release
# Build-time overrides for the non-root account created further down
ARG USER=nroot
ARG UID=1000
ARG GID=100
# Install patches
RUN apt-get update && apt-get upgrade -y \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists
# Install python packages
# NOTE(review): the source path starts with ../ - confirm it resolves inside the build context used for this image
COPY ../requirements/training/training.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
# Create and configure non-root user
RUN useradd \
--create-home \
--uid ${UID} \
--gid ${GID} \
--shell /bin/bash \
${USER}
# Copy the project code to /home/simulation_system, owned by the non-root user
COPY --chown=${UID}:${GID} ./core /home/simulation_system/core
COPY --chown=${UID}:${GID} ./MLModel /home/simulation_system/MLModel
COPY --chown=${UID}:${GID} ./training.py /home/simulation_system/training.py
# FOR TESTING
# COPY --chown=${UID}:${GID} ./model_build_data /home/simulation_system/model_build_data
# Switch user
USER ${USER}
WORKDIR /home/simulation_system
# Run the python command
# NOTE(review): ./model_build_data is only copied by the commented-out testing line above,
# so these default train/test paths presumably have to be provided at run time - confirm
CMD ["python3", "training.py", "--train-filepath", "./model_build_data/change_data/rdsap_full/train_validation_data.parquet", "--test-filepath", "./model_build_data/change_data/rdsap_full/test_data.parquet"]

View file

@ -0,0 +1,14 @@
FROM public.ecr.aws/lambda/python:3.10
# Install python packages
# NOTE(review): the source path starts with ../ - confirm it resolves inside the build context used for this image
COPY ../requirements/training/training.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
# Copy the project code into the Lambda task root, under simulation_system/
COPY ./core ${LAMBDA_TASK_ROOT}/simulation_system/core
COPY ./MLModel ${LAMBDA_TASK_ROOT}/simulation_system/MLModel
COPY ./training.py ${LAMBDA_TASK_ROOT}/simulation_system/training.py
COPY ./handlers/training_app.py ${LAMBDA_TASK_ROOT}/simulation_system/training_app.py
# Run off a lambda trigger
# NOTE(review): training_app.py is copied under simulation_system/ - confirm the runtime resolves the handler from there
CMD [ "training_app.handler" ]

View file

@ -5,19 +5,20 @@ This is the base protocol:
Key tasks:
- Template Model class for different model types
- Save model
- Load Model
- Load Model
- Generate Inference
"""
from numpy import ndarray
from pathlib import Path
from typing import Protocol, NamedTuple
from typing import Protocol, NamedTuple, Any
import pandas as pd
class MLModel(Protocol):
'''
"""
Base ML Model protocol
'''
"""
def load_model(self, filepath: Path) -> None:
"""
@ -30,21 +31,23 @@ class MLModel(Protocol):
"""
def train_model(
self,
data: pd.DataFrame,
target_column: str,
hyperparameter: dict
) -> None:
self, data: pd.DataFrame, target_column: str, hyperparameters: dict
) -> None:
"""
For the given data and hyperparameters (specified to the model), a model is trained
"""
def generate_predictions(self, data: pd.DataFrame) -> pd.DataFrame:
def generate_predictions(self, data: pd.DataFrame) -> ndarray[Any, Any] | None:
"""
For the given dataframe, model is loaded and predictions are generated
"""
def model_evaluation(self, validation_data: pd.DataFrame, target_column: str, metrics_location: Path = None) -> NamedTuple:
def model_evaluation(
self,
validation_data: pd.DataFrame,
target_column: str,
metrics_location: Path | None = None,
) -> pd.DataFrame | None:
"""
For any validation data, a set of predictions and metrics are return
"""
@ -53,7 +56,8 @@ class MLModel(Protocol):
"""
Perfomance post processing on Model to ensure ready for deployment
"""
def generate_meta_data(self):
def model_metadata(self) -> dict | None:
"""
Extract out model metadata as dictionary
"""

View file

@ -1,22 +1,45 @@
"""
Different implementations of the MLModel Protocol
Different implementations of the MLModel Protocol
Uses the BaseMLModel protocol
Key tasks:
- Template Model class for different model types
- Save model
- Load Model
- Load Model
- Generate Inference
"""
from typing import NamedTuple
from typing import Any
from pathlib import Path
import pandas as pd
from autogluon.tabular import TabularDataset, TabularPredictor
from sklearn.metrics import mean_absolute_percentage_error
from model_data.simulation_system.core.Logger import logger
from core.Logger import logger
from core.Metrics import Metrics
from core.Settings import METRIC_FILENAME
from MLModel.BaseMLModel import MLModel
AUTOGLUON_HYPERPARAMETERS = ['problem_type', 'eval_metric', 'time_limit', 'presets', 'excluded_model_types']
METRIC_FILENAME = "metrics.csv"
AUTOGLUON_HYPERPARAMETERS = [
"problem_type",
"eval_metric",
"time_limit",
"presets",
"excluded_model_types",
]
def model_factory(model_type: str, hyperparameters: dict) -> dict:
    """
    Look up the registered ML implementation for ``model_type``.

    Returns the registry entry for the requested type: a dict holding the
    model class under "model" and, under "naming_attributes", a string built
    from the "presets" and "time_limit" hyperparameters.
    """
    autogluon_entry = {
        "model": AutogluonModel,
        "naming_attributes": f"{hyperparameters['presets']}-{hyperparameters['time_limit']}",
    }
    registry = {"autogluon": autogluon_entry}
    return registry[model_type]
class AutogluonModel:
@ -24,28 +47,28 @@ class AutogluonModel:
Autogluon model that implements the MLModel Protocol
"""
def __init__(self, output_filepath: Path = None) -> None:
def __init__(self, output_filepath: Path | None = None) -> None:
self.model = None
self.output_filepath = output_filepath
self.predictions = None
def load_model(self, filepath: Path) -> None:
def load_model(self, filepath: str | Path) -> None:
"""
Providing a path, this function will load the model to be used. Will load to internal variable
"""
filepath = str(filepath)
self.model = TabularPredictor.load(path=filepath)
def save_model(self, output_filepath: Path = None) -> None:
def save_model(self, output_filepath: Path | None = None) -> None:
"""
Providing a path, this function will save the model to be used.
"""
logger.info("Using AutoGluon Model - Model saving already occured")
def train_model(
self,
data: pd.DataFrame,
target_column: str,
hyperparameters: dict = None) -> None:
self, data: pd.DataFrame, target_column: str, hyperparameters: dict
) -> None:
"""
For the given data and hyperparameters, a model is trained
"""
@ -54,7 +77,9 @@ class AutogluonModel:
exit(1)
if set(AUTOGLUON_HYPERPARAMETERS) != set(hyperparameters.keys()):
print("Hyperparameters (dict) is incorrectly defined - please check what hyperparameters are required")
print(
"Hyperparameters (dict) is incorrectly defined - please check what hyperparameters are required"
)
exit(1)
AGdata = TabularDataset(data=data)
@ -62,16 +87,16 @@ class AutogluonModel:
self.model = TabularPredictor(
label=target_column,
path=self.output_filepath,
problem_type=hyperparameters['problem_type'],
eval_metric=hyperparameters['eval_metric']
problem_type=hyperparameters["problem_type"],
eval_metric=hyperparameters["eval_metric"],
).fit(
AGdata,
time_limit=hyperparameters['time_limit'],
presets=hyperparameters['presets'],
excluded_model_types=hyperparameters['excluded_model_types']
time_limit=hyperparameters["time_limit"],
presets=hyperparameters["presets"],
excluded_model_types=hyperparameters["excluded_model_types"],
)
def generate_predictions(self, data: pd.DataFrame) -> pd.DataFrame:
def generate_predictions(self, data: pd.DataFrame) -> pd.Series:
"""
For the given dataframe, model is loaded and predictions are generated
"""
@ -80,7 +105,7 @@ class AutogluonModel:
print("No model loaded/ trained")
exit(1)
predictions = self.model.predict(data)
predictions = pd.Series(self.model.predict(data))
return predictions
@ -88,30 +113,31 @@ class AutogluonModel:
self,
validation_data: pd.DataFrame,
target_column: str,
metrics_location: Path = None,
metric_filename: str = METRIC_FILENAME
metrics: Metrics,
metrics_location: Path | None = None,
metric_filename: str = METRIC_FILENAME,
) -> pd.DataFrame:
"""
For any validation data, a set of predictions and metrics are return
"""
if metrics_location is None:
logger.warning("Metrics will be outputted to current folder")
metrics_location = Path()
if self.model is None:
logger.error("No model loaded/ trained - Unable to generate evaluation")
exit(1)
performance = self.model.evaluate(validation_data)
# Generate prediction, load metrics suite, generate metrics betweeen the two
predictions = self.generate_predictions(validation_data)
performance = metrics.generate_metric_suite(
actuals=validation_data[target_column], predictions=predictions
)
logger.info("Prediction used for evaluations are saved in self.prediction")
self.predictions = predictions
# TODO: Can have a custom metric class that defines all different metrics we want
metric_mape = mean_absolute_percentage_error(validation_data[target_column], predictions)
performance['mape'] = metric_mape
logger.info("Saving metric file as metric.csv")
metrics_location.mkdir(exist_ok=True)
@ -122,7 +148,9 @@ class AutogluonModel:
return metrics_df
def optimise_model_for_deployment(self, deployment_path: Path = None) -> str:
def optimise_model_for_deployment(
self, deployment_path: Path | str | None = None
) -> Any:
"""
We can optimise the deployment for a autogluon model
"""
@ -132,5 +160,18 @@ class AutogluonModel:
if deployment_path is None:
raise ValueError("Deployment path required")
deployment_path = str(deployment_path)
# This will return a string path of the location
return self.model.clone_for_deployment(deployment_path)
def model_metadata(self) -> dict[str, Any]:
"""
For Autogluon model, use the inbuilt model info method
"""
if self.model is None:
logger.error("No Model loaded/ trained")
exit(1)
return self.model.info()

View file

@ -1,14 +1,33 @@
export PYENV_ROOT=$(HOME)/.pyenv
export PATH := $(PYENV_ROOT)/bin:$(PATH)
PYTHON_VERSION ?= 3.10.12
.PHONY: init
init: build docker
# Create the .training_env virtual environment and install the dev requirements
# plus the pre-commit hooks into it.
# Each recipe line runs in its own shell, so the activation is chained with the
# installs; otherwise pip and pre-commit would run outside the virtual environment.
.PHONY: env
env:
	pyenv install $(PYTHON_VERSION) || echo "Proceeding..." # || is to swallow non-zero response if python version already is installed
	pyenv global $(PYTHON_VERSION)
	python3 -m venv .training_env
	. .training_env/bin/activate && \
		pip install --upgrade pip && \
		pip install -r requirements/training/training-dev.txt && \
		pre-commit install
	echo " --- TO ACTIVATE THE ENVIRONMENT --- "
	echo "Run source .training_env/bin/activate to activate the virtual environment"
# Run every configured pre-commit hook against all files.
# The command must sit on its own tab-indented recipe line; written after the
# colon it is parsed as a list of prerequisites instead of a command.
.PHONY: check-all
check-all:
	pre-commit run -a
.PHONY: build
build:
build:
docker-compose build
.PHONY: docker
docker:
docker:
docker-compose up -d
.PHONY: down
down:
docker compose down
down:
docker compose down

View file

@ -3,6 +3,13 @@
Starter Readme:
Steps for pipeline:
- (WIP) Set up the training development environment
- Change directory to this folder (simulation_system)
- Run the following command `make env PYTHON_VERSION=3.10.12`
- This will install the specified python version using `pyenv` and select this version as the global python version
- It will install all training packages as specified in the training-dev.txt requirements file, including the pre-commit hooks
- Run `source .training_env/bin/activate` to use this environment
- (WIP) Use Makefile to start up mock up s3 service
- By running `make init`, this will run the `docker-compose build` and `docker-compose up -d`, which spins up a S3 service
- This docker compose is running in detached mode `-d`, so it will not output anything to the terminal
@ -27,7 +34,7 @@ Steps for pipeline:
- Once model build is finished, you can run the `predictions.py` file to generate predictions
- By default, the prediction pipeline will select the best model based on **mean absolute error** from the model registry
- This can be overwritten by specifying a model_path, which will load an alternative model
- This can be overwritten by specifying a model_path, which will load an alternative model
- There are two ways of getting data into the pipeline:
- Using the `--data` argument:
- This is a JSON string which can be passed as `python3 predictions.py --data '{"TOTAL_FLOOR_AREA": 1}'`
@ -55,12 +62,16 @@ Steps for pipeline:
- Add precommit hooks (linters, branch names, etc)
- Sphinx documentation
- Sort out local mock up services
- Sort out Model Registry
- Sort out Model Registry
- Sort out Data version control
- pre-commit hooks:
- The types of hooks that we want (safety, bandit, iso8 etc)
- The customisations we require
- Add sphinx documentation
- Data Science:
- Implement a metrics class, to hold all metric
- Implement a metrics class, to hold all metric
- Rebuild metrics script (Could be a one off but good to have)
- Determine metrics
- Determine metrics
- Implement and test custom model (Tensorflow Decision Trees etc)
- Orchestration:
- Lambda handler for the pipeline
- Lambda handler for the pipeline

View file

@ -1,25 +1,118 @@
import pandas as pd
import os
from typing import Protocol
class DataLoader:
class DataLoader(Protocol):
"""
Interface for all DataLoader classes
"""
@staticmethod
def load(filepath: str, index_col: str = None) -> pd.DataFrame:
def load(filepath: str, index_col: str | None = None) -> pd.DataFrame | None:
"""
Load different datasets
Loading data from the relevant source
"""
class LocalDataLoader:
"""
Implements the DataLoader Protocol for local files
"""
@staticmethod
def load(filepath: str, index_col: str | None = None) -> pd.DataFrame:
if not os.path.exists(filepath):
raise FileNotFoundError(f"File not found: {filepath}")
if filepath.endswith('.parquet'):
if filepath.endswith(".parquet"):
df = pd.read_parquet(filepath)
if index_col is not None:
df = df.set_index(index_col)
elif filepath.endswith('.csv'):
elif filepath.endswith(".csv"):
df = pd.read_csv(filepath, index_col=index_col)
else:
raise ValueError(f"File format not supported for file: {filepath}")
return df
class S3MockDataLoader:
"""
Implements the DataLoader Protocol for s3 files, hosting locally in a mocked service
"""
@staticmethod
def load(filepath: str, index_col: str | None = None) -> pd.DataFrame:
# TODO: Ingest these as environment variables in the docker compose file
storage_options = {
"key": os.environ.get("AWS_ACCESS_KEY_ID", "admin"),
"secret": os.environ.get("AWS_SECRET_ACCESS_KEY", "password"),
"client_kwargs": {
"endpoint_url": os.environ.get("ENDPOINT_URL", "http://localhost:9000")
},
}
if filepath.endswith(".parquet"):
df = pd.read_parquet(filepath, storage_options=storage_options)
if index_col is not None:
df = df.set_index(index_col)
elif filepath.endswith(".csv"):
df = pd.read_csv(
filepath, index_col=index_col, storage_options=storage_options
)
else:
raise ValueError(f"File format not supported for file: {filepath}")
return df
class S3DataLoader:
"""
Implements the DataLoader Protocol for s3 files
"""
@staticmethod
def load(filepath: str, index_col: str | None = None) -> pd.DataFrame:
storage_options = {
"key": os.environ.get("AWS_ACCESS_KEY_ID"),
"secret": os.environ.get("AWS_SECRET_ACCESS_KEY"),
}
if filepath.endswith(".parquet"):
df = pd.read_parquet(filepath, storage_options=storage_options)
if index_col is not None:
df = df.set_index(index_col)
elif filepath.endswith(".csv"):
df = pd.read_csv(
filepath, index_col=index_col, storage_options=storage_options
)
else:
raise ValueError(f"File format not supported for file: {filepath}")
return df
def dataloader_factory(runtime_environment: str | None = None) -> DataLoader:
    """
    Pick the data loader that matches the runtime environment.

    A missing environment is treated as "local". Raises ValueError when the
    environment name is not one of the registered keys.
    """
    environment = "local" if runtime_environment is None else runtime_environment

    loaders = {
        "local": LocalDataLoader(),
        "local-mock": S3MockDataLoader(),
    }
    # The deployed environments all read from real s3
    for deployed_environment in ("dev", "staging", "prod"):
        loaders[deployed_environment] = S3DataLoader()

    if environment in loaders:
        return loaders[environment]
    raise ValueError("Incorrect runtime environment specified")

View file

@ -1,7 +1,7 @@
from pathlib import Path
import numpy as np
import pandas as pd
from model_data.BaseUtility import Definitions
from BaseUtility import Definitions
from simulation_system.core.Settings import (
DATA_PROCESSOR_SETTINGS,
EARLIEST_EPC_DATE,
@ -11,7 +11,7 @@ from simulation_system.core.Settings import (
TOTAL_FLOOR_AREA_NATIONAL_AVERAGE,
FLOOR_LEVEL_MAP,
BUILT_FORM_REMAP,
COLUMNS_TO_MERGE_ON
COLUMNS_TO_MERGE_ON,
)
from typing import List
@ -23,7 +23,6 @@ class DataProcessor:
def __init__(self, filepath: Path) -> None:
self.filepath = filepath
self.data = None
def load_data(self, low_memory=False) -> None:
self.data = pd.read_csv(self.filepath, low_memory=low_memory)
@ -32,16 +31,20 @@ class DataProcessor:
"""
Load data and begin initial cleaning
"""
self.load_data(low_memory=DATA_PROCESSOR_SETTINGS['low_memory'])
self.load_data(low_memory=DATA_PROCESSOR_SETTINGS["low_memory"])
self.confine_data()
# TODO: CLean number of heated rooms and habitable rooms
self.recast_df_columns(column_mappings=DATA_PROCESSOR_SETTINGS['column_mappings'])
# TODO: CLean number of heated rooms and habitable rooms
self.recast_df_columns(
column_mappings=DATA_PROCESSOR_SETTINGS["column_mappings"]
)
self.clean_multi_glaze_proportion()
self.retain_multiple_epc_properties(epc_minimum_count=DATA_PROCESSOR_SETTINGS['epc_minimum_count'])
self.retain_multiple_epc_properties(
epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"]
)
self.remap_columns()
if DATA_PROCESSOR_SETTINGS['epc_minimum_count'] >= 1:
if DATA_PROCESSOR_SETTINGS["epc_minimum_count"] >= 1:
# If we have multiple EPC records, we can try and do filling
self.fill_na_fields()
@ -53,11 +56,15 @@ class DataProcessor:
"""
If we have a minimum of 2 epcs, we can do back fill and forward fill on certain data fields
"""
# Each uprn can fille backward from recent and forward fill from oldest
# Each uprn can fille backward from recent and forward fill from oldest
# The groupby changes the order and we use the index to make the original data
filled_data = self.data.groupby("UPRN", group_keys=True)[columns_to_fill].apply(
lambda group: group.fillna(method='bfill').fillna(method='ffill')
).reset_index().set_index('level_1').sort_index()
filled_data = (
self.data.groupby("UPRN", group_keys=True)[columns_to_fill]
.apply(lambda group: group.fillna(method="bfill").fillna(method="ffill"))
.reset_index()
.set_index("level_1")
.sort_index()
)
self.data[columns_to_fill] = filled_data[columns_to_fill]
@ -67,15 +74,20 @@ class DataProcessor:
"""
# Map all anomaly values to None
data_anomaly_map = dict(zip(Definitions.DATA_ANOMALY_MATCHES, [None] * len(Definitions.DATA_ANOMALY_MATCHES)))
data_anomaly_map = dict(
zip(
Definitions.DATA_ANOMALY_MATCHES,
[None] * len(Definitions.DATA_ANOMALY_MATCHES),
)
)
# Use replace function to map data (if exists in key), to corresponding value - i.e. Remove invalid values
data = self.data.replace(data_anomaly_map)
data = data.replace(np.NAN, None)
# Remap certain columns
data['FLOOR_LEVEL'] = data['FLOOR_LEVEL'].replace(FLOOR_LEVEL_MAP)
data['BUILT_FROM'] = data['BUILT_FORM'].replace(BUILT_FORM_REMAP)
data["FLOOR_LEVEL"] = data["FLOOR_LEVEL"].replace(FLOOR_LEVEL_MAP)
data["BUILT_FROM"] = data["BUILT_FORM"].replace(BUILT_FORM_REMAP)
self.data = data
@ -84,80 +96,130 @@ class DataProcessor:
def median_without_missing(group):
return group[AVERAGE_FIXED_FEATURES].median(skipna=True)
cleaning_averages = self.data.groupby(
["PROPERTY_TYPE", "BUILT_FORM", "CONSTRUCTION_AGE_BAND", "NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"],
observed=True,
dropna=False
).apply(median_without_missing).reset_index()
cleaning_averages = (
self.data.groupby(
[
"PROPERTY_TYPE",
"BUILT_FORM",
"CONSTRUCTION_AGE_BAND",
"NUMBER_HABITABLE_ROOMS",
"NUMBER_HEATED_ROOMS",
],
observed=True,
dropna=False,
)
.apply(median_without_missing)
.reset_index()
)
general_averages = self.data.groupby(["PROPERTY_TYPE", "BUILT_FORM"], observed=True).apply(
median_without_missing).reset_index()
general_averages = (
self.data.groupby(["PROPERTY_TYPE", "BUILT_FORM"], observed=True)
.apply(median_without_missing)
.reset_index()
)
property_averages = self.data.groupby(["PROPERTY_TYPE"], observed=True).apply(
median_without_missing).reset_index()
property_averages = (
self.data.groupby(["PROPERTY_TYPE"], observed=True)
.apply(median_without_missing)
.reset_index()
)
built_form_averages = self.data.groupby(["BUILT_FORM"], observed=True).apply(
median_without_missing).reset_index()
built_form_averages = (
self.data.groupby(["BUILT_FORM"], observed=True)
.apply(median_without_missing)
.reset_index()
)
# We can clean up any NA's in the cleaning averages with the general averages here
cleaning_averages_filled = pd.merge(cleaning_averages, general_averages, on=['PROPERTY_TYPE', 'BUILT_FORM'],
suffixes=['', '_AVERAGE'])
cleaning_averages_filled = pd.merge(cleaning_averages_filled, property_averages, on=['PROPERTY_TYPE'],
suffixes=['', '_PROPERTY_AVERAGE'])
cleaning_averages_filled = pd.merge(cleaning_averages_filled, built_form_averages, on=['BUILT_FORM'],
suffixes=['', '_BUILT_FORM_AVERAGE'])
cleaning_averages_filled = pd.merge(
cleaning_averages,
general_averages,
on=["PROPERTY_TYPE", "BUILT_FORM"],
suffixes=["", "_AVERAGE"],
)
cleaning_averages_filled = pd.merge(
cleaning_averages_filled,
property_averages,
on=["PROPERTY_TYPE"],
suffixes=["", "_PROPERTY_AVERAGE"],
)
cleaning_averages_filled = pd.merge(
cleaning_averages_filled,
built_form_averages,
on=["BUILT_FORM"],
suffixes=["", "_BUILT_FORM_AVERAGE"],
)
# Replace any missing NAN values with averages for the same Property type and built form
cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna(
cleaning_averages_filled['TOTAL_FLOOR_AREA_AVERAGE'])
cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna(
cleaning_averages_filled['FLOOR_HEIGHT_AVERAGE'])
cleaning_averages_filled["TOTAL_FLOOR_AREA"] = cleaning_averages_filled[
"TOTAL_FLOOR_AREA"
].fillna(cleaning_averages_filled["TOTAL_FLOOR_AREA_AVERAGE"])
cleaning_averages_filled["FLOOR_HEIGHT"] = cleaning_averages_filled[
"FLOOR_HEIGHT"
].fillna(cleaning_averages_filled["FLOOR_HEIGHT_AVERAGE"])
cleaning_averages_filled = cleaning_averages_filled.drop(
columns=['TOTAL_FLOOR_AREA_AVERAGE', 'FLOOR_HEIGHT_AVERAGE'])
columns=["TOTAL_FLOOR_AREA_AVERAGE", "FLOOR_HEIGHT_AVERAGE"]
)
# If there are still NA values i.e. the averages do not have values for a speicifc group of property tyope
# and built form
# We can use just the property type average and replace
cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna(
cleaning_averages_filled['TOTAL_FLOOR_AREA_PROPERTY_AVERAGE'])
cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna(
cleaning_averages_filled['FLOOR_HEIGHT_PROPERTY_AVERAGE'])
cleaning_averages_filled["TOTAL_FLOOR_AREA"] = cleaning_averages_filled[
"TOTAL_FLOOR_AREA"
].fillna(cleaning_averages_filled["TOTAL_FLOOR_AREA_PROPERTY_AVERAGE"])
cleaning_averages_filled["FLOOR_HEIGHT"] = cleaning_averages_filled[
"FLOOR_HEIGHT"
].fillna(cleaning_averages_filled["FLOOR_HEIGHT_PROPERTY_AVERAGE"])
cleaning_averages_filled = cleaning_averages_filled.drop(
columns=['TOTAL_FLOOR_AREA_PROPERTY_AVERAGE', 'FLOOR_HEIGHT_PROPERTY_AVERAGE'])
columns=[
"TOTAL_FLOOR_AREA_PROPERTY_AVERAGE",
"FLOOR_HEIGHT_PROPERTY_AVERAGE",
]
)
# If there are still NA values, use BUILT FORM averages
cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna(
cleaning_averages_filled['TOTAL_FLOOR_AREA_BUILT_FORM_AVERAGE'])
cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna(
cleaning_averages_filled['FLOOR_HEIGHT_BUILT_FORM_AVERAGE'])
cleaning_averages_filled["TOTAL_FLOOR_AREA"] = cleaning_averages_filled[
"TOTAL_FLOOR_AREA"
].fillna(cleaning_averages_filled["TOTAL_FLOOR_AREA_BUILT_FORM_AVERAGE"])
cleaning_averages_filled["FLOOR_HEIGHT"] = cleaning_averages_filled[
"FLOOR_HEIGHT"
].fillna(cleaning_averages_filled["FLOOR_HEIGHT_BUILT_FORM_AVERAGE"])
cleaning_averages_filled = cleaning_averages_filled.drop(
columns=['TOTAL_FLOOR_AREA_BUILT_FORM_AVERAGE', 'FLOOR_HEIGHT_BUILT_FORM_AVERAGE'])
columns=[
"TOTAL_FLOOR_AREA_BUILT_FORM_AVERAGE",
"FLOOR_HEIGHT_BUILT_FORM_AVERAGE",
]
)
# If there still is na values, use average across all properties in consituecy
cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna(
cleaning_averages_filled['TOTAL_FLOOR_AREA'].mean())
cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna(
cleaning_averages_filled['FLOOR_HEIGHT'].mean())
cleaning_averages_filled["TOTAL_FLOOR_AREA"] = cleaning_averages_filled[
"TOTAL_FLOOR_AREA"
].fillna(cleaning_averages_filled["TOTAL_FLOOR_AREA"].mean())
cleaning_averages_filled["FLOOR_HEIGHT"] = cleaning_averages_filled[
"FLOOR_HEIGHT"
].fillna(cleaning_averages_filled["FLOOR_HEIGHT"].mean())
# If the consituency is all NA values, then take UK AVERAGE VALUES
cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna(
TOTAL_FLOOR_AREA_NATIONAL_AVERAGE)
cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna(
FLOOR_HEIGHT_NATIONAL_AVERAGE)
cleaning_averages_filled["TOTAL_FLOOR_AREA"] = cleaning_averages_filled[
"TOTAL_FLOOR_AREA"
].fillna(TOTAL_FLOOR_AREA_NATIONAL_AVERAGE)
cleaning_averages_filled["FLOOR_HEIGHT"] = cleaning_averages_filled[
"FLOOR_HEIGHT"
].fillna(FLOOR_HEIGHT_NATIONAL_AVERAGE)
return cleaning_averages_filled
def retain_multiple_epc_properties(self, epc_minimum_count: int = 1) -> None:
'''
"""
Reduce the data futher by keeping only datasets with multiple epcs
'''
"""
counts = self.data.groupby("UPRN").size().reset_index()
counts.columns = ["UPRN", "count"]
# take UPRNS with multiple EPCs
counts = counts[counts["count"] > epc_minimum_count]
self.data = pd.merge(self.data, counts, on='UPRN')
self.data = pd.merge(self.data, counts, on="UPRN")
def recast_df_columns(self, column_mappings: dict) -> None:
"""
@ -166,7 +228,7 @@ class DataProcessor:
for key, values in column_mappings.items():
if key not in self.data.columns:
print('Column mapping incorrectly specified')
print("Column mapping incorrectly specified")
exit(1)
for value in values:
self.data[key] = self.data[key].astype(value)
@ -189,13 +251,16 @@ class DataProcessor:
self.data = self.data[~pd.isnull(self.data["UPRN"])]
self.data = self.data[self.data["LODGEMENT_DATE"] >= EARLIEST_EPC_DATE]
self.data = self.data[self.data["TRANSACTION_TYPE"] != "new dwelling"]
self.data = self.data[~self.data["FLOOR_LEVEL"].isin(["top floor", "mid floor"])]
self.data = self.data[
~self.data["FLOOR_LEVEL"].isin(["top floor", "mid floor"])
]
def clean_multi_glaze_proportion(self) -> None:
"""
If there is no multi-glaze proportion but the windows are fully glazed, then we should assume a score of 100
"""
no_multi_glaze_proportion_index = pd.isnull(self.data["MULTI_GLAZE_PROPORTION"]) & (
self.data["WINDOWS_DESCRIPTION"].isin(FULLY_GLAZED_DESCRIPTIONS))
self.data.loc[no_multi_glaze_proportion_index, 'MULTI_GLAZE_PROPORTION'] = 100
no_multi_glaze_proportion_index = pd.isnull(
self.data["MULTI_GLAZE_PROPORTION"]
) & (self.data["WINDOWS_DESCRIPTION"].isin(FULLY_GLAZED_DESCRIPTIONS))
self.data.loc[no_multi_glaze_proportion_index, "MULTI_GLAZE_PROPORTION"] = 100

View file

@ -4,10 +4,10 @@ Create additional features from the dataset
import pandas as pd
from typing import List
from model_data.simulation_system.core.Logger import logger
from core.Logger import logger
RDSAP_CHANGE_DROP_COLUMNS = ['UPRN', 'HEAT_DEMAND_CHANGE']
HEAT_DEMAND_CHANGE_DROP_COLUMNS = ['UPRN', 'RDSAP_CHANGE']
RDSAP_CHANGE_DROP_COLUMNS = ["UPRN", "HEAT_DEMAND_CHANGE"]
HEAT_DEMAND_CHANGE_DROP_COLUMNS = ["UPRN", "RDSAP_CHANGE"]
RANDOM_SEED = 0
@ -18,7 +18,9 @@ class FeatureProcessor:
"""
@staticmethod
def drop_unused_columns(df: pd.DataFrame, target_column: str = "RDSAP_CHANGE") -> pd.DataFrame:
def drop_unused_columns(
df: pd.DataFrame, target_column: str = "RDSAP_CHANGE"
) -> pd.DataFrame:
"""
Remove the unused columns for RDS
"""
@ -29,15 +31,15 @@ class FeatureProcessor:
return df
@staticmethod
def retain_features(df: pd.DataFrame, features: List[str] = None):
def retain_features(df: pd.DataFrame, features: List[str] | None = None):
"""
Determine which columns to keep for modelling
"""
if features is None:
features = df.columns
features = df.columns.to_list()
else:
if not set(features).issubset(df.columns):
logger.error('Features defined is not contained in data')
logger.error("Features defined is not contained in data")
exit(1)
df = df[features]
@ -45,7 +47,9 @@ class FeatureProcessor:
return df
@staticmethod
def subsample_data(df: pd.DataFrame, subsample_amount: int = None) -> pd.DataFrame:
def subsample_data(
df: pd.DataFrame, subsample_amount: int | None = None
) -> pd.DataFrame:
"""
Sample data to reduce number of rows for model building if needed
"""
@ -58,8 +62,8 @@ class FeatureProcessor:
self,
df: pd.DataFrame,
target_column: str = "RDSAP_CHANGE",
features: List[str] = None,
subsample_amount: int = None
features: List[str] | None = None,
subsample_amount: int | None = None,
) -> pd.DataFrame:
"""
Pipeline to get data ready for building a model

View file

@ -0,0 +1,130 @@
"""
Generate metrics and enable regeneration of metrics if new metrics are generated
Key tasks:
- Specify metric functions that take in prediction vs actual to generate a metric value
- Given a model and test data, produce a suite of all metrics
"""
import pandas as pd
from pathlib import Path
import seaborn as sns
import matplotlib.pyplot as plt
from core.Settings import (
RESIDUAL_TRUE_LABEL,
RESIDUAL_PREDICTION_LABEL,
SEABORN_RESIDUAL_AXIS_FONTSIZE,
SEABORN_RESIDUAL_TITLE_FONTSIZE,
SEABORN_RESIDUAL_STYLE,
SEABORN_RESIDUAL_ASPECT_RATIO,
SEABORN_RESIDUAL_PLOT_DPI,
SEABORN_RESIDUAL_RANGE,
SEABORN_RESIDUAL_LINE_COLOUR,
SEABORN_RESIDUAL_LINE_WIDTH,
)
from sklearn.metrics import (
mean_absolute_error,
median_absolute_error,
mean_squared_error,
mean_absolute_percentage_error,
)
# Dummy example of new metric that can be added - must be true and prediction as arguments
def max_error(y_true: pd.Series, y_pred: pd.Series):
    """
    Return the largest absolute difference between true and predicted values.

    The absolute residual is used so that a large over-prediction (negative
    residual) counts as much as a large under-prediction; taking the maximum
    of the signed residuals would silently ignore over-predictions.

    Raises ValueError if the inputs are empty.
    """
    return max(abs(y_true - y_pred))
# Metric functions evaluated for every model. Each entry is called as
# metric(actuals, predictions) and its __name__ is used as the metric's label,
# so any function added here must accept the true values first and the
# predictions second. max_error is kept commented out, i.e. currently disabled.
METRIC_TO_APPLY = [
mean_absolute_error,
median_absolute_error,
mean_squared_error,
mean_absolute_percentage_error,
# max_error
]
def sort_by_metric(
    data: pd.DataFrame, optimse_metric: str, best_model_column_name: str
) -> pd.DataFrame:
    """
    Helper function to sort data frame by metric and append a best model flag

    The rows are sorted ascending by `optimse_metric` (lower error is better),
    the index is reset, and a boolean column `best_model_column_name` is added
    that is True only for the first (lowest metric) row. The input frame is
    not modified; a new frame is returned.

    An empty frame is returned with the flag column added and no rows.
    """
    # Ascending as we want lowest error values
    data = data.sort_values(optimse_metric, ascending=True).reset_index(drop=True)
    data[best_model_column_name] = False
    # Only flag a best model when there is one: assigning to .loc[0] on an
    # empty frame would insert a phantom row instead.
    if len(data) > 0:
        data.loc[0, best_model_column_name] = True
    return data
class Metrics:
    """
    All metric functions used to generate a dictionary of metrics
    """

    @staticmethod
    def list_metric_functions() -> list:
        """
        Return the names of all metric functions in METRIC_TO_APPLY, in order
        """
        return [metric_to_apply.__name__ for metric_to_apply in METRIC_TO_APPLY]

    @staticmethod
    def generate_metric_suite(actuals: pd.Series, predictions: pd.Series) -> pd.Series:
        """
        Apply every metric in METRIC_TO_APPLY to the actual and predicted values

        Returns a Series of metric values indexed by metric function name.
        """
        return pd.Series(
            {
                metric_function.__name__: metric_function(actuals, predictions)
                for metric_function in METRIC_TO_APPLY
            }
        )

    @staticmethod
    def generate_plot_suite():
        """
        Placeholder for generating all metric plots - not implemented yet
        """

    @staticmethod
    def generate_residual_plot(
        actuals: pd.Series,
        predictions: pd.Series,
        target_column: str,
        output_filepath: Path | str,
    ) -> None:
        """
        Save a scatter plot of predicted against true values to `output_filepath`

        `target_column` is only used to label the axes. A reference line is
        drawn through SEABORN_RESIDUAL_RANGE on both axes (the y = x diagonal).
        """
        # TODO: can have a model.metric_outputs method
        # For now just do it here
        residual_df = pd.DataFrame(
            list(zip(actuals, predictions)),
            columns=[RESIDUAL_TRUE_LABEL, RESIDUAL_PREDICTION_LABEL],
        )

        # image formatting
        sns.set(style=SEABORN_RESIDUAL_STYLE)

        # Draw on a dedicated figure rather than the implicit current axes, so
        # repeated calls do not overlay points from earlier plots.
        fig, ax = plt.subplots()
        try:
            sns.scatterplot(
                x=RESIDUAL_TRUE_LABEL,
                y=RESIDUAL_PREDICTION_LABEL,
                data=residual_df,
                ax=ax,
            )
            # Square aspect ratio
            ax.set_aspect(SEABORN_RESIDUAL_ASPECT_RATIO)
            ax.set_xlabel(
                f"True {target_column}", fontsize=SEABORN_RESIDUAL_AXIS_FONTSIZE
            )
            ax.set_ylabel(
                f"Predicted {target_column}", fontsize=SEABORN_RESIDUAL_AXIS_FONTSIZE
            )
            ax.set_title("Residuals", fontsize=SEABORN_RESIDUAL_TITLE_FONTSIZE)

            # Reference line where prediction equals the true value
            ax.plot(
                SEABORN_RESIDUAL_RANGE,
                SEABORN_RESIDUAL_RANGE,
                SEABORN_RESIDUAL_LINE_COLOUR,
                linewidth=SEABORN_RESIDUAL_LINE_WIDTH,
            )

            fig.tight_layout()
            fig.savefig(output_filepath, dpi=SEABORN_RESIDUAL_PLOT_DPI)
        finally:
            # Always release the figure, even if plotting or saving fails
            plt.close(fig)

View file

@ -1,45 +1,65 @@
# Using a simply python file as settings for now
# Using a simply python file as settings for now
# TODO: migrate to dynaconf
from pathlib import Path
METRIC_FILENAME = "metrics.csv"
OPTIMISE_METRIC = "mean_absolute_error"
BEST_MODEL_COLUMN_NAME = "best_model"
# TODO: remove these setting elsewhere for CML
RESIDUAL_TRUE_LABEL = "true"
RESIDUAL_PREDICTION_LABEL = "pred"
RESIDUAL_FILE = "residual.png"
SEABORN_RESIDUAL_AXIS_FONTSIZE = 12
SEABORN_RESIDUAL_TITLE_FONTSIZE = 22
SEABORN_RESIDUAL_STYLE = "whitegrid"
SEABORN_RESIDUAL_ASPECT_RATIO = "equal"
SEABORN_RESIDUAL_PLOT_DPI = 120
SEABORN_RESIDUAL_RANGE = [-100, 100]
SEABORN_RESIDUAL_LINE_COLOUR = "black"
SEABORN_RESIDUAL_LINE_WIDTH = 1
# Can move to a hyperparameters file
# If anything we might want to have a file that can be loaded and sent to this script
MODEL_HYPERPARAMETERS = {
"autogluon": {
'problem_type': 'regression',
'eval_metric': 'mean_absolute_error',
'time_limit': 30,
'presets': 'medium_quality',
'excluded_model_types': None
"problem_type": "regression",
"eval_metric": "mean_absolute_error",
"time_limit": 30,
"presets": "medium_quality",
"excluded_model_types": None,
}
}
TIMESTAMP_FORMAT = "%Y_%m_%d_%H_%M_%S"
RANDOM_SEED = 0
SUBSAMPLE_FACTOR = 200
TRAIN_AND_VALIDATION_DATA_NAME = 'train_validation_data.parquet'
TEST_DATA_NAME = 'test_data.parquet'
TRAIN_AND_VALIDATION_DATA_NAME = "train_validation_data.parquet"
TEST_DATA_NAME = "test_data.parquet"
REGISTRY_FILE = "model_registry.csv"
MODEL_DIRECTORY = "model_directory"
BASE_REGISTRY_PATH = Path(__file__).parent.parent / MODEL_DIRECTORY
BASE_REGISTRY_PATH = Path(__file__).parent.parent / MODEL_DIRECTORY
PREDICTION_LOCATION = Path("predictions")
PREDICTION_FILE = 'prediction.json'
METADATA_FILE = 'metadata.json'
PREDICTION_FILE = "prediction.json"
METADATA_FILE = "metadata.json"
MODEL_FOLDER = "model"
METRICS_FOLDER = "metrics"
DEPLOYMENT_FOLDER = "deployment"
DEPLOYMENT_FOLDER = "deployment"
TOTAL_FLOOR_AREA_NATIONAL_AVERAGE = 70
FLOOR_HEIGHT_NATIONAL_AVERAGE = 2.45
COLUMNS_TO_MERGE_ON = [
"PROPERTY_TYPE",
"BUILT_FORM",
"CONSTRUCTION_AGE_BAND",
"PROPERTY_TYPE",
"BUILT_FORM",
"CONSTRUCTION_AGE_BAND",
"NUMBER_HABITABLE_ROOMS",
"NUMBER_HEATED_ROOMS"
]
"NUMBER_HEATED_ROOMS",
]
FULLY_GLAZED_DESCRIPTIONS = [
"Fully double glazed",
@ -50,48 +70,45 @@ FULLY_GLAZED_DESCRIPTIONS = [
]
FIXED_FEATURES = [
'PROPERTY_TYPE',
'BUILT_FORM',
'CONSTRUCTION_AGE_BAND',
'NUMBER_HABITABLE_ROOMS',
'CONSTITUENCY',
'NUMBER_HEATED_ROOMS',
'FIXED_LIGHTING_OUTLETS_COUNT',
'FLOOR_HEIGHT',
'FLOOR_LEVEL',
'TOTAL_FLOOR_AREA',
"PROPERTY_TYPE",
"BUILT_FORM",
"CONSTRUCTION_AGE_BAND",
"NUMBER_HABITABLE_ROOMS",
"CONSTITUENCY",
"NUMBER_HEATED_ROOMS",
"FIXED_LIGHTING_OUTLETS_COUNT",
"FLOOR_HEIGHT",
"FLOOR_LEVEL",
"TOTAL_FLOOR_AREA",
]
COMPONENT_FEATURES = [
'TRANSACTION_TYPE',
'WALLS_DESCRIPTION',
'FLOOR_DESCRIPTION',
'LIGHTING_DESCRIPTION',
'ROOF_DESCRIPTION',
'MAINHEAT_DESCRIPTION',
'HOTWATER_DESCRIPTION',
'MAIN_FUEL',
'MECHANICAL_VENTILATION',
'SECONDHEAT_DESCRIPTION',
'ENERGY_TARIFF', # Not sure if this is relevant
'SOLAR_WATER_HEATING_FLAG',
'PHOTO_SUPPLY',
'WINDOWS_DESCRIPTION',
'GLAZED_TYPE',
'MULTI_GLAZE_PROPORTION',
'LIGHTING_DESCRIPTION',
'LOW_ENERGY_LIGHTING',
'NUMBER_OPEN_FIREPLACES',
'MAINHEATCONT_DESCRIPTION',
'EXTENSION_COUNT',
"TRANSACTION_TYPE",
"WALLS_DESCRIPTION",
"FLOOR_DESCRIPTION",
"LIGHTING_DESCRIPTION",
"ROOF_DESCRIPTION",
"MAINHEAT_DESCRIPTION",
"HOTWATER_DESCRIPTION",
"MAIN_FUEL",
"MECHANICAL_VENTILATION",
"SECONDHEAT_DESCRIPTION",
"ENERGY_TARIFF", # Not sure if this is relevant
"SOLAR_WATER_HEATING_FLAG",
"PHOTO_SUPPLY",
"WINDOWS_DESCRIPTION",
"GLAZED_TYPE",
"MULTI_GLAZE_PROPORTION",
"LIGHTING_DESCRIPTION",
"LOW_ENERGY_LIGHTING",
"NUMBER_OPEN_FIREPLACES",
"MAINHEATCONT_DESCRIPTION",
"EXTENSION_COUNT",
# 'GLAZED_AREA', # May not need this since we have MULTI_GLAZE_PROPORTION
]
# For these fields, we take an average if we have multiple values
AVERAGE_FIXED_FEATURES = [
"TOTAL_FLOOR_AREA",
"FLOOR_HEIGHT"
]
AVERAGE_FIXED_FEATURES = ["TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]
# For these fields, we take the latest value if we have multiple values
# Since more recent EPCs have been conducted with more rigour, we assume that the latest value is
@ -105,11 +122,7 @@ LATEST_FIELD = [
]
# If we see these features changing, we don't use the EPC, since we deem it not to be reliable
MANDATORY_FIXED_FEATURES = [
"PROPERTY_TYPE",
"BUILT_FORM",
"CONSTITUENCY"
]
MANDATORY_FIXED_FEATURES = ["PROPERTY_TYPE", "BUILT_FORM", "CONSTITUENCY"]
# For particularly old EPC data, we have inconsistent records so we'll only include EPCS that were
# conducted after 2010, since SAP09 was introduced in 2009 an later SAP12 was introduced in England
@ -119,14 +132,16 @@ EARLIEST_EPC_DATE = "2014-08-01"
RDSAP_RESPONSE = "CURRENT_ENERGY_EFFICIENCY"
HEAT_DEMAND_RESPONSE = "ENERGY_CONSUMPTION_CURRENT"
def ordinal(n):
if 10 <= n % 100 <= 20:
suffix = 'th'
suffix = "th"
else:
suffix = {1: 'st', 2: 'nd', 3: 'rd'}.get(n % 10, 'th')
suffix = {1: "st", 2: "nd", 3: "rd"}.get(n % 10, "th")
return str(n) + suffix
FLOOR_LEVEL_MAP = {
"Basement": -1,
"Ground": 0,
@ -145,8 +160,7 @@ BUILT_FORM_REMAP = {
}
DATA_PROCESSOR_SETTINGS = {
'low_memory': False,
'epc_minimum_count': 1,
'column_mappings': {'UPRN': [int, str]}
"low_memory": False,
"epc_minimum_count": 1,
"column_mappings": {"UPRN": [int, str]},
}

View file

@ -2,7 +2,7 @@ version: '3'
services:
minio:
image: minio/minio
image: minio/minio:RELEASE.2022-05-26T05-48-41Z
ports:
- "9000:9000"
- "9001:9001"
@ -12,6 +12,45 @@ services:
MINIO_ROOT_USER: &MINIO_USER admin
MINIO_ROOT_PASSWORD: &MINIO_PASS password
command: server --console-address ":9001" /data
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3
simulation_system_training:
build:
context: ./
dockerfile: ./Dockerfiles/Dockerfile.training
image: simulation_system_training
environment:
ENDPOINT_URL: http://minio:9000/
AWS_ACCESS_KEY_ID: *MINIO_USER
AWS_SECRET_ACCESS_KEY: *MINIO_PASS
tty: true
depends_on:
minio:
condition: service_healthy
# command:
# ["bash"]
# simulation_system_prediction:
# build:
# context: ./
# dockerfile: ./Dockerfiles/Dockerfile.prediction
# image: simulation_system_prediction
# environment:
# ENDPOINT_URL: http://minio:9000/
# AWS_ACCESS_KEY_ID: *MINIO_USER
# AWS_SECRET_ACCESS_KEY: *MINIO_PASS
# tty: true
# depends_on:
# simulation_system_training:
# condition: service_completed_successfully
# command:
# ["bash"]
# volumes:
# minio_storage: {}
# minio_storage: {}

View file

@ -10,15 +10,16 @@ from core.Settings import (
COMPONENT_FEATURES,
RDSAP_RESPONSE,
HEAT_DEMAND_RESPONSE,
COLUMNS_TO_MERGE_ON
COLUMNS_TO_MERGE_ON,
)
from core.DataProcessor import DataProcessor
DATA_DIRECTORY = Path(__file__).parent / 'data' / 'all-domestic-certificates'
DATA_DIRECTORY = Path(__file__).parent / "data" / "all-domestic-certificates"
# TODO: Have a look at temporal features
def app():
# Get all the files in the directory
@ -29,7 +30,7 @@ def app():
directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()]
dataset = []
# 116
# 116
# 128048706
# PosixPath('/home/ubuntu/Documents/python/hestia/Model/model_data/simulation_system/data/all-domestic
# -certificates/domestic-E09000021-Kingston-upon-Thames')
@ -48,12 +49,14 @@ def app():
fixed_data = {}
# If a property has changed building type, we can ignore the epc rating i.e. this should be 1 unique row
if max(property_data[MANDATORY_FIXED_FEATURES].nunique()) > 1:
if any(property_data[MANDATORY_FIXED_FEATURES].nunique() > 1):
continue
# Take the latest row for both the LATEST_FEILDS and MANDATORY FIELDS
# Take the latest row for both the LATEST_FEILDS and MANDATORY FIELDS
latest_field_data = property_data[LATEST_FIELD].iloc[-1].to_dict()
mandatory_field_data = property_data[MANDATORY_FIXED_FEATURES].iloc[-1].to_dict()
mandatory_field_data = (
property_data[MANDATORY_FIXED_FEATURES].iloc[-1].to_dict()
)
# Taking just the last row, which is the percentage change from the latest to previous one only
# property_data[AVERAGE_FIXED_FEATURES].fillna(value=0).pct_change().iloc[-1] > 0.1
@ -63,17 +66,25 @@ def app():
cleaned_columns_to_merge_on = na_columns.index[~na_columns].to_list()
# Get the corresponding groupby and merge, and fill in NA values
cleaning_averages_to_merge = cleaning_averages.groupby(cleaned_columns_to_merge_on)[
['TOTAL_FLOOR_AREA', 'FLOOR_HEIGHT']].mean()
cleaning_averages_to_merge = cleaning_averages.groupby(
cleaned_columns_to_merge_on
)[["TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]].mean()
modified_property_data = pd.merge(property_data, cleaning_averages_to_merge, on=cleaned_columns_to_merge_on,
suffixes=['', '_AVERAGE'])
modified_property_data['TOTAL_FLOOR_AREA'] = modified_property_data['TOTAL_FLOOR_AREA'].fillna(
modified_property_data['TOTAL_FLOOR_AREA_AVERAGE'])
modified_property_data['FLOOR_HEIGHT'] = modified_property_data['FLOOR_HEIGHT'].fillna(
modified_property_data['FLOOR_HEIGHT_AVERAGE'])
modified_property_data = pd.merge(
property_data,
cleaning_averages_to_merge,
on=cleaned_columns_to_merge_on,
suffixes=["", "_AVERAGE"],
)
modified_property_data["TOTAL_FLOOR_AREA"] = modified_property_data[
"TOTAL_FLOOR_AREA"
].fillna(modified_property_data["TOTAL_FLOOR_AREA_AVERAGE"])
modified_property_data["FLOOR_HEIGHT"] = modified_property_data[
"FLOOR_HEIGHT"
].fillna(modified_property_data["FLOOR_HEIGHT_AVERAGE"])
modified_property_data = modified_property_data.drop(
columns=['TOTAL_FLOOR_AREA_AVERAGE', 'FLOOR_HEIGHT_AVERAGE'])
columns=["TOTAL_FLOOR_AREA_AVERAGE", "FLOOR_HEIGHT_AVERAGE"]
)
for field in AVERAGE_FIXED_FEATURES:
@ -94,8 +105,9 @@ def app():
# We include the lodgement date here as we probably need to factor time into the
# model, since EPC standards and rigour have changed over time
variable_data = modified_property_data[
COMPONENT_FEATURES + ["LODGEMENT_DATE", RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE]
]
COMPONENT_FEATURES
+ ["LODGEMENT_DATE", RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE]
]
# Note: we look at changes between subsequent EPCS, however we could look at other permutations
# e.g. first vs second, second vs third and also first vs third
@ -107,15 +119,24 @@ def app():
starting_record = variable_data.iloc[idx]
ending_record = variable_data.iloc[idx + 1]
rdsap_change = ending_record[RDSAP_RESPONSE] - starting_record[RDSAP_RESPONSE]
heat_demand_change = ending_record[HEAT_DEMAND_RESPONSE] - starting_record[HEAT_DEMAND_RESPONSE]
rdsap_change = (
ending_record[RDSAP_RESPONSE] - starting_record[RDSAP_RESPONSE]
)
heat_demand_change = (
ending_record[HEAT_DEMAND_RESPONSE]
- starting_record[HEAT_DEMAND_RESPONSE]
)
# TODO: We need to pre-process the data. For instance, rather than using static for roofs, walls and
# floors, we may want to use the U-value. We may also want to handle the (assumed) tags
# within descriptions
starting_record = starting_record[COMPONENT_FEATURES + ["LODGEMENT_DATE"]].add_suffix("_STARTING")
ending_record = ending_record[COMPONENT_FEATURES + ["LODGEMENT_DATE"]].add_suffix("_ENDING")
starting_record = starting_record[
COMPONENT_FEATURES + ["LODGEMENT_DATE"]
].add_suffix("_STARTING")
ending_record = ending_record[
COMPONENT_FEATURES + ["LODGEMENT_DATE"]
].add_suffix("_ENDING")
features = pd.concat([starting_record, ending_record])
@ -125,14 +146,14 @@ def app():
"RDSAP_CHANGE": rdsap_change,
"HEAT_DEMAND_CHANGE": heat_demand_change,
**fixed_data,
**features.to_dict()
**features.to_dict(),
}
)
dataset.extend(property_model_data)
output = pd.DataFrame(dataset)
output.to_parquet('./dataset.parquet')
output.to_parquet("./dataset.parquet")
if __name__ == "__main__":

View file

@ -0,0 +1,44 @@
import os
import urllib.parse
from predictions import prediction
# Name of the environment this code runs in, read from the RUNTIME_ENVIRONMENT
# environment variable and defaulting to "dev". It is interpolated into the S3
# bucket names used by the handler below.
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "dev")
def handler(event, context):
    """
    Take in event and trigger the prediction pipeline

    Reads `file_location`, `property_id`, `portfolio_id` and `created_at` from
    `event["body"]`, runs `prediction` on the file and writes the result as a
    CSV to the prediction bucket under
    `{portfolio_id}/{property_id}/{created_at}.csv`.

    The model is taken from the MODEL_PATH environment variable, falling back
    to a fixed deployment in the model directory bucket of the current
    RUNTIME_ENVIRONMENT.

    Returns a dict with `statusCode` 200 and the output location as `body` on
    success, or `statusCode` 500 if prediction or the upload raised.

    NOTE(review): assumes event["body"] is already a mapping, not a JSON
    string - confirm against how this function is invoked.
    """
    # TODO: an alternative trigger is a file landing in a bucket, in which
    # case the bucket/key would be read from event["Records"][0]["s3"]
    payload = event["body"]
    data_path = payload["file_location"]
    property_id = payload["property_id"]
    portfolio_id = payload["portfolio_id"]
    created_at = payload["created_at"]

    # TODO: put a model into s3, both locally and in aws
    # The default must be an f-string, otherwise the literal text
    # "{RUNTIME_ENVIRONMENT}" ends up in the bucket name.
    model_path = os.environ.get(
        "MODEL_PATH",
        f"s3://retrofit-model-directory-{RUNTIME_ENVIRONMENT}/RDSAP_CHANGE/autogluon/rdsap_change-medium_quality-30-2023-08-30_11-43-41/deployment/",
    )

    # Store into s3, with key of {portfolio_id}/{property_id}/{created_at}
    output_location = f"s3://retrofit-sap-prediction-{RUNTIME_ENVIRONMENT}/{portfolio_id}/{property_id}/{created_at}.csv"

    try:
        outputs = prediction(model_path=model_path, data_path=data_path)
        outputs.to_csv(output_location)
    except Exception as error:
        # Top-level boundary: report the cause rather than hiding it, and tell
        # the caller the request failed instead of returning silently.
        print(f"Prediction failed: {error!r}")
        return {"statusCode": 500, "body": "Prediction failed"}

    return {"statusCode": 200, "body": output_location}

View file

@ -2,24 +2,26 @@
Script to load MLModel class and generate predictions
"""
import os
import json
import argparse
from model_data.simulation_system.MLModel.Models import AutogluonModel
from model_data.simulation_system.core.Logger import logger
from model_data.simulation_system.core.DataLoader import DataLoader
import pandas as pd
from typing import Optional
from datetime import datetime
from model_data.simulation_system.core.Settings import (
from MLModel.Models import AutogluonModel
from core.Logger import logger
from core.DataLoader import dataloader_factory
from core.Settings import (
BASE_REGISTRY_PATH,
REGISTRY_FILE,
PREDICTION_LOCATION,
PREDICTION_FILE,
METADATA_FILE
METADATA_FILE,
TIMESTAMP_FORMAT,
)
TIMESTAMP = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT)
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "dev")
# FOR TESTING
# For now just loading data first and then passing into function (i.e. as if we receive json data and convert to
@ -33,30 +35,51 @@ def ingest_arguments() -> argparse.Namespace:
Helper function to take in arguments from script start
"""
parser = argparse.ArgumentParser(description='Inputs for training script')
parser.add_argument('--target-column', type=str, help='The response variable you are predicting for',
choices=['RDSAP_CHANGE', 'HEAT_DEMAND_CHANGE'], default='RDSAP_CHANGE')
parser.add_argument('--model-path', type=str,
help='If you wish to use a specific model, specify the model path here')
parser.add_argument('--data', type=str, help='Json data for predictions')
parser.add_argument('--data-path', type=str, help='Location of Parquet dataset to load for training')
parser = argparse.ArgumentParser(description="Inputs for training script")
parser.add_argument(
"--target-column",
type=str,
help="The response variable you are predicting for",
choices=["RDSAP_CHANGE", "HEAT_DEMAND_CHANGE"],
default="RDSAP_CHANGE",
)
parser.add_argument(
"--model-path",
type=str,
help="If you wish to use a specific model, specify the model path here",
)
parser.add_argument("--data", type=str, help="Json data for predictions")
parser.add_argument(
"--data-path", type=str, help="Location of Parquet dataset to load for training"
)
args = parser.parse_args()
return args
def prediction(target_column: str = "RDSAP_CHANGE", model_path: str = None, data: pd.DataFrame = None,
data_path: Optional[str] = None):
def prediction(
target_column: str = "RDSAP_CHANGE",
model_path: str | None = None,
data: Optional[pd.DataFrame | str] = None,
data_path: Optional[str] = None,
):
"""
Main pipeline function
"""
registry_path = BASE_REGISTRY_PATH / target_column / REGISTRY_FILE
if RUNTIME_ENVIRONMENT == "local":
registry_path = BASE_REGISTRY_PATH / target_column / REGISTRY_FILE
if registry_path is None or not registry_path.exists():
logger.error("No registry path provided or registry doesn't exist")
exit(1)
if registry_path is None or not registry_path.exists():
logger.error("No registry path provided or registry doesn't exist")
exit(1)
elif RUNTIME_ENVIRONMENT == "dev":
registry_path = (
"s3://retrofit-model-directory-dev/RDSAP_CHANGE/model_registry.csv"
)
else:
raise NotImplemented("TO be implemented")
if model_path is not None:
logger.info("User specified a model to load - ignoring registry")
@ -67,11 +90,11 @@ def prediction(target_column: str = "RDSAP_CHANGE", model_path: str = None, data
# TODO: Think about where registry will sit/ type
logger.info("Loading best model from registry")
registry_df = pd.read_csv(registry_path)
best_model_df = registry_df[registry_df['best_model']]
best_model_df = registry_df[registry_df["best_model"]]
model_location = best_model_df['model_location'].values[0]
model_type = best_model_df['model_type'].values[0]
model_name = best_model_df['model_name'].values[0]
model_location = best_model_df["model_location"].values[0]
model_type = best_model_df["model_type"].values[0]
model_name = best_model_df["model_name"].values[0]
logger.info("--- Model Info: ---")
logger.info(f"Model type: {model_type}")
@ -84,23 +107,34 @@ def prediction(target_column: str = "RDSAP_CHANGE", model_path: str = None, data
exit(1)
if data_path and data is None:
logger.info("Loading data from provided path")
data = DataLoader().load(filepath=data_path, index_col="UPRN")
dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT)
data = dataloader.load(filepath=data_path, index_col="UPRN")
# TODO: DOWNSAMPLING DOWN TO JUST USE ONE FOR PREDICTION
data = data.sample(1)
if data is None:
raise ValueError("No data loaded")
# # TODO: DOWNSAMPLING DOWN TO JUST USE ONE FOR PREDICTION
# data = data.sample(1)
else:
logger.info('Using data provided')
data = json.loads(data)
logger.info("Using data provided")
data = json.loads(str(data))
data = pd.DataFrame([data])
print(data)
logger.info("--- Loading Model ---")
model = AutogluonModel()
if model_type == "autogluon":
logger.info("Using an Autogluon model")
model = AutogluonModel()
else:
logger.error("No other model currently")
exit(1)
model.load_model(filepath=model_location)
logger.info("--- Generating Predictions ---")
prediction = model.generate_predictions(data=data)
return pd.concat([data["recommendation_id"], prediction], axis=1)
# Save prediction some where?
# prediction.to_csv("s3?")
@ -108,24 +142,23 @@ def prediction(target_column: str = "RDSAP_CHANGE", model_path: str = None, data
# TODO: Check how we want to structure outputs
# For now, just categorise by uprn and timestamp
# Assume one uprn coming in for now
uprn = data.index.values[0]
# uprn = data.index.values[0]
# Saving prediction local for now
# TODO: change uprn to TARGET_ID, put in setting
logger.info("--- Outputting prediction and metadata --- ")
output_base = PREDICTION_LOCATION / target_column / uprn / TIMESTAMP
output_base.mkdir(parents=True, exist_ok=True)
# # Saving prediction local for now
# # TODO: change uprn to TARGET_ID, put in setting
# logger.info("--- Outputting prediction and metadata --- ")
# output_base = PREDICTION_LOCATION / target_column / uprn / TIMESTAMP
# output_base.mkdir(parents=True, exist_ok=True)
# TODO: change model.model.info to a class method for MLModel
json_prediction = prediction.to_json(output_base / PREDICTION_FILE)
prediction_metadata = {
"model_type": model_type,
"model_name": model_name,
"model_location": model_location,
"model_settings": model.model.info()
}
# json_prediction = prediction.to_json(output_base / PREDICTION_FILE)
# prediction_metadata = {
# "model_type": model_type,
# "model_name": model_name,
# "model_location": model_location,
# "model_settings": model.model_metadata(),
# }
pd.DataFrame([prediction_metadata]).to_json(output_base / METADATA_FILE)
# pd.DataFrame([prediction_metadata]).to_json(output_base / METADATA_FILE)
return json_prediction
@ -134,6 +167,10 @@ if __name__ == "__main__":
args = ingest_arguments()
# Data can be passed in as JSON string: python3 predictions.py --data '{"TOTAL_FLOOR_AREA": 1}'
# Data path can be passed as so: python3 predictions.py --data-path
# ../simulation_system/model_build_data/change_data/rdsap_full/test_data.parquet
prediction(target_column=args.target_column, model_path=args.model_path, data=args.data, data_path=args.data_path)
# Data path can be passed as so: python3 predictions.py --data-path ./model_build_data/change_data/rdsap_full/test_data.parquet
prediction(
target_column=args.target_column,
model_path=args.model_path,
data=args.data,
data_path=args.data_path,
)

View file

@ -0,0 +1,118 @@
"""
Script to regenerate metrics for all the models in the model registry
Key task:
- Load model registry
- For each model in the registry, generate the metrics (a key question here is what happens if the test data changes)
- Save the new metrics out to s3 bucket
"""
import os
import argparse
from s3pathlib import S3Path
import pandas as pd
from tqdm import tqdm
from core.Logger import logger
from core.Metrics import Metrics, sort_by_metric
from core.DataLoader import dataloader_factory
from core.Settings import (
OPTIMISE_METRIC,
MODEL_DIRECTORY,
REGISTRY_FILE,
BEST_MODEL_COLUMN_NAME,
)
from MLModel.Models import model_factory
# Name of the environment this script runs in, read from the RUNTIME_ENVIRONMENT
# environment variable and defaulting to "local". It is passed to
# dataloader_factory to pick the data loader.
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local")
def ingest_arguments() -> argparse.Namespace:
"""
Helper function to take in arguments from script start
"""
parser = argparse.ArgumentParser(description="Inputs for training script")
parser.add_argument(
"--test-filepath",
type=str,
help="Location of Parquet dataset to load for testing",
required=True,
)
parser.add_argument(
"--target-column",
type=str,
help="The response variable",
choices=["RDSAP_CHANGE", "HEAT_DEMAND_CHANGE"],
default="RDSAP_CHANGE",
)
args = parser.parse_args()
return args
def regenerate_metrics(test_filepath: str, target_column: str) -> None:
    """
    Recreate all metrics for all models

    Loads the test data and the model registry for `target_column`, and for
    every model in the registry loads it, predicts on the test data and
    computes the full metric suite. The registry is then rebuilt from the
    model columns plus the fresh metrics, sorted by OPTIMISE_METRIC with the
    best model flagged, and written back to the registry location.
    """
    logger.info("--- Loading test data ---")
    dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT)
    test_df = dataloader.load(filepath=test_filepath)

    logger.info("--- Loading model registry ---")
    logger.info(f"Loading registry for {target_column} models")
    registry_uri = S3Path(MODEL_DIRECTORY, target_column, REGISTRY_FILE).uri
    registry_df = dataloader.load(filepath=registry_uri)

    logger.info("Extract non-metric columns")
    # Reset the index so the metrics can be aligned row by row further down
    registry_df = registry_df[
        ["model_type", "model_name", "model_location"]
    ].reset_index(drop=True)

    logger.info("--- Regenerating metrics ---")
    metric_suite = Metrics()
    metric_rows = []

    for _, row in tqdm(registry_df.iterrows(), total=len(registry_df)):
        logger.info(f"--- Loading Model ({row['model_name']}) ---")
        model = model_factory(model_type=row["model_type"])()
        model.load_model(filepath=row["model_location"])

        # NOTE(review): test_df still contains the target column here -
        # confirm generate_predictions ignores it, and that it returns one
        # prediction per row in the same order as test_df.
        predictions = model.generate_predictions(data=test_df)

        # generate_metric_suite takes the true and predicted values, not the
        # model itself
        metrics = metric_suite.generate_metric_suite(
            actuals=test_df[target_column], predictions=predictions
        )
        metric_rows.append(metrics)

    # One row of metrics per model, in the same order as the registry
    metrics_df = pd.DataFrame(
        metric_rows, columns=metric_suite.list_metric_functions()
    ).reset_index(drop=True)

    # Add metrics df to registry df side by side
    registry_df = pd.concat([registry_df, metrics_df], axis=1)

    logger.info(f"--- Sorting by Optimise Metric ({OPTIMISE_METRIC}) ---")
    registry_df = sort_by_metric(
        data=registry_df,
        optimse_metric=OPTIMISE_METRIC,
        best_model_column_name=BEST_MODEL_COLUMN_NAME,
    )

    logger.info("--- Saving model metrics ---")
    registry_df.to_csv(registry_uri)
if __name__ == "__main__":
    # Script entry point: parse the command line, then rebuild the metrics
    logger.info("---Begin Pipeline to regenerate metrics---")

    logger.info("---Ingest Arguments---")
    cli_arguments = ingest_arguments()
    chosen_test_filepath = cli_arguments.test_filepath
    chosen_target_column = cli_arguments.target_column

    regenerate_metrics(
        test_filepath=chosen_test_filepath,
        target_column=chosen_target_column,
    )

View file

@ -0,0 +1,3 @@
autogluon==0.8.2
pandas==1.5.3
pre-commit==3.3.3

View file

@ -0,0 +1,4 @@
autogluon==0.8.2
pandas==1.5.3
seaborn==0.12.2
pre-commit==3.3.3

View file

@ -2,79 +2,109 @@ from core.Logger import logger
import argparse
import pandas as pd
from pathlib import Path
from core.Settings import (
RANDOM_SEED,
TRAIN_AND_VALIDATION_DATA_NAME,
TEST_DATA_NAME
)
from core.Settings import RANDOM_SEED, TRAIN_AND_VALIDATION_DATA_NAME, TEST_DATA_NAME
def ingest_arguments() -> argparse.Namespace:
"""
Helper function to take in arguments from script start
"""
parser = argparse.ArgumentParser(description='Inputs for training script')
parser = argparse.ArgumentParser(description="Inputs for training script")
parser.add_argument('--filepath', type=str, help='Location of Parquet dataset to load', required=True)
parser.add_argument('--output-folder', type=str, help='Location of Parquet dataset to save', required=True)
parser.add_argument('--percentage', type=float, help='Percentage of data to use as test data', default=None)
parser.add_argument('--volume', type=int, help='Volume of data to use as test data', default=None)
parser.add_argument('--sampling', type=str, help='Type of sampling to do for test data', choices=['random', 'stratified'], default='random')
parser.add_argument(
"--filepath",
type=str,
help="Location of Parquet dataset to load",
required=True,
)
parser.add_argument(
"--output-folder",
type=str,
help="Location of Parquet dataset to save",
required=True,
)
parser.add_argument(
"--percentage",
type=float,
help="Percentage of data to use as test data",
default=None,
)
parser.add_argument(
"--volume", type=int, help="Volume of data to use as test data", default=None
)
parser.add_argument(
"--sampling",
type=str,
help="Type of sampling to do for test data",
choices=["random", "stratified"],
default="random",
)
args = parser.parse_args()
return args
def main(filepath: str, output_folder: str, percentage: float, volume: int, sampling: str):
def main(
filepath: str, output_folder: str, percentage: float, volume: int, sampling: str
):
"""
Load a dataset in and split out the training+validation data and the test data.
"""
logger.info('---Loading Data---')
logger.info("---Loading Data---")
data = pd.read_parquet(filepath).reset_index(drop=True)
if percentage and volume is None:
test_amount = round(len(data)*percentage)
test_amount = round(len(data) * percentage)
elif percentage is None and volume:
test_amount = volume
elif percentage is None and volume is None:
logger.error('No amount specified - please specify either a percentage or volume')
logger.error(
"No amount specified - please specify either a percentage or volume"
)
exit(1)
else:
logger.info('Both percentage and volume specified - taking largest of the two')
test_amount = max(round(len(data)*percentage), volume)
logger.info("Both percentage and volume specified - taking largest of the two")
test_amount = max(round(len(data) * percentage), volume)
logger.info(f'---Extracting {test_amount} from dataset to be test data')
logger.info(f"---Extracting {test_amount} from dataset to be test data")
if sampling == 'random':
logger.info('--- Using random sample method ---')
train_validation_data = pd.DataFrame()
test_data = pd.DataFrame()
if sampling == "random":
logger.info("--- Using random sample method ---")
sample_index = data.sample(n=test_amount, random_state=RANDOM_SEED).index
train_validation_data = data.drop(sample_index)
test_data = data.iloc[sample_index]
elif sampling =='stratified':
# Not yet implemented
elif sampling == "stratified":
# Not yet implemented
pass
logger.info('--- Saving data ---')
logger.info("--- Saving data ---")
train_validation_data.to_parquet(Path(output_folder)/ TRAIN_AND_VALIDATION_DATA_NAME)
test_data.to_parquet(Path(output_folder)/ TEST_DATA_NAME)
train_validation_data.to_parquet(
Path(output_folder) / TRAIN_AND_VALIDATION_DATA_NAME
)
test_data.to_parquet(Path(output_folder) / TEST_DATA_NAME)
logger.info(" ---Pipeline complete---")
logger.info(' ---Pipeline complete---')
if __name__ == "__main__":
logger.info('--- Generate test data pipeline ---')
logger.info("--- Generate test data pipeline ---")
args = ingest_arguments()
main(
filepath=args.filepath,
filepath=args.filepath,
output_folder=args.output_folder,
percentage=args.percentage,
volume=args.volume,
sampling=args.sampling
)
percentage=args.percentage,
volume=args.volume,
sampling=args.sampling,
)

View file

@ -1,13 +1,19 @@
import argparse
import os
# from s3pathlib import S3Path
# import boto3
from pathlib import Path
from datetime import datetime
from model_data.simulation_system.core.Logger import logger
from model_data.simulation_system.core.DataLoader import DataLoader
from model_data.simulation_system.core.FeatureProcessor import FeatureProcessor
from model_data.simulation_system.MLModel.Models import AutogluonModel
import pandas as pd
from model_data.simulation_system.core.Settings import (
from MLModel.Models import model_factory
from core.Logger import logger
from core.Metrics import Metrics, sort_by_metric
from core.DataLoader import dataloader_factory
from core.FeatureProcessor import FeatureProcessor
from core.Settings import (
MODEL_DIRECTORY,
BASE_REGISTRY_PATH,
REGISTRY_FILE,
@ -15,13 +21,24 @@ from model_data.simulation_system.core.Settings import (
METRICS_FOLDER,
DEPLOYMENT_FOLDER,
SUBSAMPLE_FACTOR,
MODEL_HYPERPARAMETERS
MODEL_HYPERPARAMETERS,
TIMESTAMP_FORMAT,
RESIDUAL_FILE,
BEST_MODEL_COLUMN_NAME,
OPTIMISE_METRIC,
)
import seaborn as sns
import matplotlib.pyplot as plt
TIMESTAMP = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT)
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local")
# STORAGE_OPTIONS = {
# "key": os.environ.get("AWS_ACCESS_KEY_ID", 'admin'),
# "secret": os.environ.get("AWS_SECRET_ACCESS_KEY", 'password'),
# "client_kwargs": {
# "endpoint_url": os.environ.get("ENDPOINT_URL", "http://localhost:9000")
# }
# }
# FOR TESTING
# train_filepath = "./model_build_data/change_data/rdsap_full/train_validation_data.parquet"
@ -43,21 +60,45 @@ TIMESTAMP = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
# S3_CLIENT.create_bucket
# S3_CLIENT.list_buckets()
# df = pd.read_parquet(
# "s3://model_build_data/change_data/rdsap_full/train_validation_data.parquet",
# storage_options=STORAGE_OPTIONS
# )
def ingest_arguments() -> argparse.Namespace:
"""
Helper function to take in arguments from script start
"""
parser = argparse.ArgumentParser(description='Inputs for training script')
parser = argparse.ArgumentParser(description="Inputs for training script")
parser.add_argument('--train-filepath', type=str, help='Location of Parquet dataset to load for training',
required=True)
parser.add_argument('--test-filepath', type=str, help='Location of Parquet dataset to load for testing',
required=True)
parser.add_argument('--model-type', type=str, help='The type of model to train', choices=["autogluon"],
default="autogluon")
parser.add_argument('--target-column', type=str, help='The response variable',
choices=["RDSAP_CHANGE", "HEAT_DEMAND_CHANGE"], default='RDSAP_CHANGE')
parser.add_argument(
"--train-filepath",
type=str,
help="Location of Parquet dataset to load for training",
required=True,
)
parser.add_argument(
"--test-filepath",
type=str,
help="Location of Parquet dataset to load for testing",
required=True,
)
parser.add_argument(
"--model-type",
type=str,
help="The type of model to train",
choices=["autogluon"],
default="autogluon",
)
parser.add_argument(
"--target-column",
type=str,
help="The response variable",
choices=["RDSAP_CHANGE", "HEAT_DEMAND_CHANGE"],
default="RDSAP_CHANGE",
)
args = parser.parse_args()
@ -69,116 +110,129 @@ def training(
test_filepath: str,
target_column: str = "RDSAP_CHANGE",
model_type: str = "autogluon",
hyperparameters: dict = None
hyperparameters: dict | None = None,
) -> None:
"""
Pipeline to run training on the dataset
"""
logger.info('--- Loading data ---')
dataloader = DataLoader()
logger.info("--- Loading data ---")
dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT)
train_df = dataloader.load(filepath=train_filepath)
test_df = dataloader.load(filepath=test_filepath)
logger.info('--- Feature processing ---')
if train_df is None or test_df is None:
raise ValueError("No data Loaded - cancelling pipeline")
logger.info("--- Feature processing ---")
feature_processor = FeatureProcessor()
# This is for convenience for now
subsample_amount = round(len(train_df) / SUBSAMPLE_FACTOR)
train_df = feature_processor.process(train_df, target_column=target_column, subsample_amount=subsample_amount)
train_df = feature_processor.process(
train_df, target_column=target_column, subsample_amount=subsample_amount
)
test_df = feature_processor.process(test_df, target_column=target_column)
logger.info('--- Build Model ---')
logger.info("--- Build Model ---")
logger.info("--- Load Hyperparameters ---")
if hyperparameters is None:
logger.info("Use base hyperparameters in settings")
hyperparameters = MODEL_HYPERPARAMETERS[model_type]
logger.info(f'Hyperparameters are: {hyperparameters}')
logger.info(f"Hyperparameters are: {hyperparameters}")
if model_type == "autogluon":
model_root = f"{target_column}-{hyperparameters['presets']}-{hyperparameters['time_limit']}-{TIMESTAMP}".lower()
output_base = Path(MODEL_DIRECTORY) / target_column / model_type / model_root
logger.info(
"--- Loading model configuration (Model type and Naming convention) ---"
)
# We might want to have hyperparameters in the names to make models more recognisable
model_toolkit = model_factory(
model_type=model_type, hyperparameters=hyperparameters
)
model = AutogluonModel(
output_filepath=output_base / MODEL_FOLDER
)
else:
raise ValueError("No alternative model implemented yet")
model_root = (
f"{target_column}-{model_toolkit['naming_attributes']}-{TIMESTAMP}".lower()
)
output_base = Path(MODEL_DIRECTORY) / target_column / model_type / model_root
# Will need to pass output path to model (for saving purposes)
model = model_toolkit["model"](output_filepath=output_base / MODEL_FOLDER)
model.train_model(
data=train_df,
target_column=target_column,
hyperparameters=hyperparameters
data=train_df, target_column=target_column, hyperparameters=hyperparameters
)
logger.info("--- Save Model ---")
model.save_model(output_filepath=model.output_filepath)
logger.info('--- Generate evaluation metrics ---')
logger.info("--- Generate evaluation metrics ---")
metrics = Metrics()
metrics_df = model.model_evaluation(
validation_data=test_df,
target_column=target_column,
metrics_location=output_base / METRICS_FOLDER
metrics_location=output_base / METRICS_FOLDER,
metrics=metrics,
)
logger.info("--- Generate metric outputs using predictions ---")
# TODO: can have a model.metric_outputs method
# For now, just do it here
residual_df = pd.DataFrame(list(zip(test_df[target_column], model.predictions)), columns=['true', 'pred'])
# image formatting
# TODO: move to settings file , AXIS_FONT, TITLE_FONT
axis_fs = 18 # fontsize
title_fs = 22 # fontsize
sns.set(style="whitegrid")
ax = sns.scatterplot(x="true", y="pred", data=residual_df)
ax.set_aspect('equal')
ax.set_xlabel(f'True {target_column}', fontsize=axis_fs)
ax.set_ylabel(f'Predicted {target_column}', fontsize=axis_fs) # ylabel
ax.set_title('Residuals', fontsize=title_fs)
# Square aspect ratio
ax.plot([-100, 100], [-100, 100], 'black', linewidth=1)
plt.tight_layout()
RESIDUAL_FILE = "residuals.png"
plt.savefig(output_base / METRICS_FOLDER / RESIDUAL_FILE, dpi=120)
# metrics.generate_plot_suite()
metrics.generate_residual_plot(
actuals=test_df[target_column],
predictions=model.predictions,
target_column=target_column,
output_filepath=output_base / METRICS_FOLDER / RESIDUAL_FILE,
)
# TODO: for cml, we might want to have a class that outputs all data and plots to add to the report
# If we want residual plot/ any plots, we will need to self host
# plt.savefig(RESIDUAL_FILE, dpi=120)
# TODO: introduce a separate script for model optimisation, and from there, optimise for deployment
# Imagining for now that the model trained here is the best model amongst all models built
logger.info("--- Optimising model for deployment ---")
deployment_model_path = model.optimise_model_for_deployment(deployment_path=output_base / DEPLOYMENT_FOLDER)
logger.info(f"Optimised version of best model can be found at: {deployment_model_path}")
deployment_model_path = model.optimise_model_for_deployment(
deployment_path=output_base / DEPLOYMENT_FOLDER
)
logger.info(
f"Optimised version of best model can be found at: {deployment_model_path}"
)
# TODO: Need a model registry - for now have this as a CSV
# Save this in the model directory
# Loading registry from s3
logger.info("--- Append registry with new model ---")
registry_path = BASE_REGISTRY_PATH / target_column / REGISTRY_FILE
# registry_path = S3Path(MODEL_DIRECTORY, target_column, REGISTRY_FILE).uri
# registry = RegistryHandler(location=registry_path)
if registry_path.exists():
logger.info("Registry file found - Loading into Dataframe")
registry_df = pd.read_csv(registry_path, index_col=None)
else:
# TODO: Move columns into settings: MODEL_DETAILS and Metrics class columns
registry_df = pd.DataFrame(
columns=['model_type', 'model_name', 'model_location', 'mean_absolute_error', 'root_mean_squared_error',
'mean_squared_error', 'r2', 'pearsonr', 'median_absolute_error', 'mape', 'best_model'])
# TODO: Move columns into settings: MODEL_DETAILS and Metrics class columns
columns = [
BEST_MODEL_COLUMN_NAME,
"model_type",
"model_name",
"model_location",
] + metrics.list_metric_functions()
registry_df = pd.DataFrame(columns=columns)
model_details_df = pd.DataFrame(
[{
'model_type': model_type,
'model_name': model_root,
'model_location': deployment_model_path
}]
[
{
"model_type": model_type,
"model_name": model_root,
"model_location": deployment_model_path,
}
]
)
registry_row = pd.concat([model_details_df, metrics_df], axis=1)
@ -187,9 +241,12 @@ def training(
# TODO: will need a script to rebuild metrics - i.e. if we add new metrics, we will want to load models and
# regenerate the metrics
# TODO: decide metric to optimise to
registry_df = registry_df.sort_values("mean_absolute_error", ascending=False).reset_index(drop=True)
registry_df['best_model'] = [False] * len(registry_df)
registry_df.loc[0, 'best_model'] = True
registry_df = sort_by_metric(
registry_df,
optimse_metric=OPTIMISE_METRIC,
best_model_column_name=BEST_MODEL_COLUMN_NAME,
)
logger.info("--- Saving new model to registry ---")
# Ensure the directory exists
@ -200,9 +257,10 @@ def training(
if __name__ == "__main__":
logger.info('---Begin Pipeline---')
logger.info('---Ingest Arguments---')
logger.info("---Begin Pipeline---")
logger.info("---Ingest Arguments---")
args = ingest_arguments()
# To run script: python3 training.py --train-filepath
@ -213,5 +271,5 @@ if __name__ == "__main__":
train_filepath=args.train_filepath,
test_filepath=args.test_filepath,
target_column=args.target_column,
model_type=args.model_type
model_type=args.model_type,
)