Added handler and dockerfiles

This commit is contained in:
Michael Duong 2023-08-31 14:46:10 +01:00
parent a137cccc05
commit cfc004077a
14 changed files with 490 additions and 304 deletions

View file

@ -9,8 +9,9 @@ Key tasks:
- Generate Inference
"""
from numpy import ndarray
from pathlib import Path
from typing import Protocol, NamedTuple
from typing import Protocol, NamedTuple, Any
import pandas as pd
@ -30,13 +31,13 @@ class MLModel(Protocol):
"""
def train_model(
self, data: pd.DataFrame, target_column: str, hyperparameter: dict
self, data: pd.DataFrame, target_column: str, hyperparameters: dict
) -> None:
"""
For the given data and hyperparameters (specified to the model), a model is trained
"""
def generate_predictions(self, data: pd.DataFrame) -> pd.DataFrame:
def generate_predictions(self, data: pd.DataFrame) -> ndarray[Any, Any] | None:
"""
For the given dataframe, model is loaded and predictions are generated
"""
@ -45,8 +46,8 @@ class MLModel(Protocol):
self,
validation_data: pd.DataFrame,
target_column: str,
metrics_location: Path = None,
) -> NamedTuple:
metrics_location: Path | None = None,
) -> pd.DataFrame | None:
"""
For any validation data, a set of predictions and metrics is returned
"""
@ -56,7 +57,7 @@ class MLModel(Protocol):
Performance post-processing on the model to ensure it is ready for deployment
"""
def model_metadata(self) -> dict:
def model_metadata(self) -> dict | None:
"""
Extract out model metadata as dictionary
"""

View file

@ -8,12 +8,14 @@ Key tasks:
- Generate Inference
"""
from typing import Any
from pathlib import Path
import pandas as pd
from autogluon.tabular import TabularDataset, TabularPredictor
from sklearn.metrics import mean_absolute_percentage_error
from core.Logger import logger
from core.Metrics import Metrics
from core.Settings import METRIC_FILENAME
from MLModel.BaseMLModel import MLModel
AUTOGLUON_HYPERPARAMETERS = [
@ -23,13 +25,13 @@ AUTOGLUON_HYPERPARAMETERS = [
"presets",
"excluded_model_types",
]
METRIC_FILENAME = "metrics.csv"
def model_factory(model_type: str, hyperparameters: dict = None) -> MLModel:
def model_factory(model_type: str, hyperparameters: dict) -> dict:
"""
Use factory pattern to register the different ML implementations
"""
model_types = {
"autogluon": {
"model": AutogluonModel,
@ -45,25 +47,27 @@ class AutogluonModel:
Autogluon model that implements the MLModel Protocol
"""
def __init__(self, output_filepath: Path = None) -> None:
def __init__(self, output_filepath: Path | None = None) -> None:
self.model = None
self.output_filepath = output_filepath
self.predictions = None
def load_model(self, filepath: Path) -> None:
def load_model(self, filepath: str | Path) -> None:
"""
Providing a path, this function will load the model to be used. Will load to internal variable
"""
filepath = str(filepath)
self.model = TabularPredictor.load(path=filepath)
def save_model(self, output_filepath: Path = None) -> None:
def save_model(self, output_filepath: Path | None = None) -> None:
"""
Providing a path, this function will save the model to be used.
"""
logger.info("Using AutoGluon Model - Model saving already occured")
def train_model(
self, data: pd.DataFrame, target_column: str, hyperparameters: dict = None
self, data: pd.DataFrame, target_column: str, hyperparameters: dict
) -> None:
"""
For the given data and hyperparameters, a model is trained
@ -92,7 +96,7 @@ class AutogluonModel:
excluded_model_types=hyperparameters["excluded_model_types"],
)
def generate_predictions(self, data: pd.DataFrame) -> pd.DataFrame:
def generate_predictions(self, data: pd.DataFrame) -> pd.Series:
"""
For the given dataframe, model is loaded and predictions are generated
"""
@ -101,7 +105,7 @@ class AutogluonModel:
print("No model loaded/ trained")
exit(1)
predictions = self.model.predict(data)
predictions = pd.Series(self.model.predict(data))
return predictions
@ -110,7 +114,7 @@ class AutogluonModel:
validation_data: pd.DataFrame,
target_column: str,
metrics: Metrics,
metrics_location: Path = None,
metrics_location: Path | None = None,
metric_filename: str = METRIC_FILENAME,
) -> pd.DataFrame:
"""
@ -118,6 +122,7 @@ class AutogluonModel:
"""
if metrics_location is None:
logger.warning("Metrics will be outputted to current folder")
metrics_location = Path()
if self.model is None:
logger.error("No model loaded/ trained - Unable to generate evaluation")
@ -126,18 +131,13 @@ class AutogluonModel:
# Generate predictions, load the metrics suite, and generate metrics between the two
predictions = self.generate_predictions(validation_data)
performance = self.model.evaluate(validation_data)
performance = metrics.generate_metric_suite(
actuals=validation_data[target_column], predictions=predictions
)
logger.info("Prediction used for evaluations are saved in self.prediction")
self.predictions = predictions
# TODO: Can have a custom metric class that defines all different metrics we want
metric_mape = mean_absolute_percentage_error(
validation_data[target_column], predictions
)
performance["mape"] = metric_mape
logger.info("Saving metric file as metric.csv")
metrics_location.mkdir(exist_ok=True)
@ -148,7 +148,9 @@ class AutogluonModel:
return metrics_df
def optimise_model_for_deployment(self, deployment_path: Path = None) -> str:
def optimise_model_for_deployment(
self, deployment_path: Path | str | None = None
) -> Any:
"""
We can optimise the deployment for a autogluon model
"""
@ -158,11 +160,18 @@ class AutogluonModel:
if deployment_path is None:
raise ValueError("Deployment path required")
deployment_path = str(deployment_path)
# This will return a string path of the location
return self.model.clone_for_deployment(deployment_path)
def model_metadata(self) -> dict:
def model_metadata(self) -> dict[str, Any]:
"""
For Autogluon model, use the inbuilt model info method
"""
if self.model is None:
logger.error("No Model loaded/ trained")
exit(1)
return self.model.info()

View file

@ -9,7 +9,7 @@ class DataLoader(Protocol):
"""
@staticmethod
def load(filepath: str, index_col: str = None) -> pd.DataFrame:
def load(filepath: str, index_col: str | None = None) -> pd.DataFrame | None:
"""
Loading data from the relevant source
"""
@ -21,7 +21,7 @@ class LocalDataLoader:
"""
@staticmethod
def load(filepath: str, index_col: str = None) -> pd.DataFrame:
def load(filepath: str, index_col: str | None = None) -> pd.DataFrame:
if not os.path.exists(filepath):
raise FileNotFoundError(f"File not found: {filepath}")
@ -44,7 +44,7 @@ class S3MockDataLoader:
"""
@staticmethod
def load(filepath: str, index_col: str = None) -> pd.DataFrame:
def load(filepath: str, index_col: str | None = None) -> pd.DataFrame:
# TODO: Ingest these as environment variables in the docker compose file
storage_options = {
@ -75,7 +75,7 @@ class S3DataLoader:
"""
@staticmethod
def load(filepath: str, index_col: str = None) -> pd.DataFrame:
def load(filepath: str, index_col: str | None = None) -> pd.DataFrame:
storage_options = {
"key": os.environ.get("AWS_ACCESS_KEY_ID"),
@ -96,7 +96,7 @@ class S3DataLoader:
return df
def dataloader_factory(runtime_environment: str = None) -> DataLoader:
def dataloader_factory(runtime_environment: str | None = None) -> DataLoader:
"""
Use factory pattern to determine which loading method we use
"""

View file

@ -1,7 +1,7 @@
from pathlib import Path
import numpy as np
import pandas as pd
from model_data.BaseUtility import Definitions
from BaseUtility import Definitions
from simulation_system.core.Settings import (
DATA_PROCESSOR_SETTINGS,
EARLIEST_EPC_DATE,
@ -11,7 +11,7 @@ from simulation_system.core.Settings import (
TOTAL_FLOOR_AREA_NATIONAL_AVERAGE,
FLOOR_LEVEL_MAP,
BUILT_FORM_REMAP,
COLUMNS_TO_MERGE_ON
COLUMNS_TO_MERGE_ON,
)
from typing import List
@ -23,7 +23,6 @@ class DataProcessor:
def __init__(self, filepath: Path) -> None:
self.filepath = filepath
self.data = None
def load_data(self, low_memory=False) -> None:
self.data = pd.read_csv(self.filepath, low_memory=low_memory)
@ -32,16 +31,20 @@ class DataProcessor:
"""
Load data and begin initial cleaning
"""
self.load_data(low_memory=DATA_PROCESSOR_SETTINGS['low_memory'])
self.load_data(low_memory=DATA_PROCESSOR_SETTINGS["low_memory"])
self.confine_data()
# TODO: CLean number of heated rooms and habitable rooms
self.recast_df_columns(column_mappings=DATA_PROCESSOR_SETTINGS['column_mappings'])
# TODO: CLean number of heated rooms and habitable rooms
self.recast_df_columns(
column_mappings=DATA_PROCESSOR_SETTINGS["column_mappings"]
)
self.clean_multi_glaze_proportion()
self.retain_multiple_epc_properties(epc_minimum_count=DATA_PROCESSOR_SETTINGS['epc_minimum_count'])
self.retain_multiple_epc_properties(
epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"]
)
self.remap_columns()
if DATA_PROCESSOR_SETTINGS['epc_minimum_count'] >= 1:
if DATA_PROCESSOR_SETTINGS["epc_minimum_count"] >= 1:
# If we have multiple EPC records, we can try and do filling
self.fill_na_fields()
@ -53,11 +56,15 @@ class DataProcessor:
"""
If we have a minimum of 2 epcs, we can do back fill and forward fill on certain data fields
"""
# Each uprn can fille backward from recent and forward fill from oldest
# Each uprn can fille backward from recent and forward fill from oldest
# The groupby changes the order and we use the index to make the original data
filled_data = self.data.groupby("UPRN", group_keys=True)[columns_to_fill].apply(
lambda group: group.fillna(method='bfill').fillna(method='ffill')
).reset_index().set_index('level_1').sort_index()
filled_data = (
self.data.groupby("UPRN", group_keys=True)[columns_to_fill]
.apply(lambda group: group.fillna(method="bfill").fillna(method="ffill"))
.reset_index()
.set_index("level_1")
.sort_index()
)
self.data[columns_to_fill] = filled_data[columns_to_fill]
@ -67,15 +74,20 @@ class DataProcessor:
"""
# Map all anomaly values to None
data_anomaly_map = dict(zip(Definitions.DATA_ANOMALY_MATCHES, [None] * len(Definitions.DATA_ANOMALY_MATCHES)))
data_anomaly_map = dict(
zip(
Definitions.DATA_ANOMALY_MATCHES,
[None] * len(Definitions.DATA_ANOMALY_MATCHES),
)
)
# Use replace function to map data (if exists in key), to corresponding value - i.e. Remove invalid values
data = self.data.replace(data_anomaly_map)
data = data.replace(np.NAN, None)
# Remap certain columns
data['FLOOR_LEVEL'] = data['FLOOR_LEVEL'].replace(FLOOR_LEVEL_MAP)
data['BUILT_FROM'] = data['BUILT_FORM'].replace(BUILT_FORM_REMAP)
data["FLOOR_LEVEL"] = data["FLOOR_LEVEL"].replace(FLOOR_LEVEL_MAP)
data["BUILT_FROM"] = data["BUILT_FORM"].replace(BUILT_FORM_REMAP)
self.data = data
@ -84,80 +96,130 @@ class DataProcessor:
def median_without_missing(group):
return group[AVERAGE_FIXED_FEATURES].median(skipna=True)
cleaning_averages = self.data.groupby(
["PROPERTY_TYPE", "BUILT_FORM", "CONSTRUCTION_AGE_BAND", "NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"],
observed=True,
dropna=False
).apply(median_without_missing).reset_index()
cleaning_averages = (
self.data.groupby(
[
"PROPERTY_TYPE",
"BUILT_FORM",
"CONSTRUCTION_AGE_BAND",
"NUMBER_HABITABLE_ROOMS",
"NUMBER_HEATED_ROOMS",
],
observed=True,
dropna=False,
)
.apply(median_without_missing)
.reset_index()
)
general_averages = self.data.groupby(["PROPERTY_TYPE", "BUILT_FORM"], observed=True).apply(
median_without_missing).reset_index()
general_averages = (
self.data.groupby(["PROPERTY_TYPE", "BUILT_FORM"], observed=True)
.apply(median_without_missing)
.reset_index()
)
property_averages = self.data.groupby(["PROPERTY_TYPE"], observed=True).apply(
median_without_missing).reset_index()
property_averages = (
self.data.groupby(["PROPERTY_TYPE"], observed=True)
.apply(median_without_missing)
.reset_index()
)
built_form_averages = self.data.groupby(["BUILT_FORM"], observed=True).apply(
median_without_missing).reset_index()
built_form_averages = (
self.data.groupby(["BUILT_FORM"], observed=True)
.apply(median_without_missing)
.reset_index()
)
# We can clean up any NA's in the cleaning averages with the general averages here
cleaning_averages_filled = pd.merge(cleaning_averages, general_averages, on=['PROPERTY_TYPE', 'BUILT_FORM'],
suffixes=['', '_AVERAGE'])
cleaning_averages_filled = pd.merge(cleaning_averages_filled, property_averages, on=['PROPERTY_TYPE'],
suffixes=['', '_PROPERTY_AVERAGE'])
cleaning_averages_filled = pd.merge(cleaning_averages_filled, built_form_averages, on=['BUILT_FORM'],
suffixes=['', '_BUILT_FORM_AVERAGE'])
cleaning_averages_filled = pd.merge(
cleaning_averages,
general_averages,
on=["PROPERTY_TYPE", "BUILT_FORM"],
suffixes=["", "_AVERAGE"],
)
cleaning_averages_filled = pd.merge(
cleaning_averages_filled,
property_averages,
on=["PROPERTY_TYPE"],
suffixes=["", "_PROPERTY_AVERAGE"],
)
cleaning_averages_filled = pd.merge(
cleaning_averages_filled,
built_form_averages,
on=["BUILT_FORM"],
suffixes=["", "_BUILT_FORM_AVERAGE"],
)
# Replace any missing NAN values with averages for the same Property type and built form
cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna(
cleaning_averages_filled['TOTAL_FLOOR_AREA_AVERAGE'])
cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna(
cleaning_averages_filled['FLOOR_HEIGHT_AVERAGE'])
cleaning_averages_filled["TOTAL_FLOOR_AREA"] = cleaning_averages_filled[
"TOTAL_FLOOR_AREA"
].fillna(cleaning_averages_filled["TOTAL_FLOOR_AREA_AVERAGE"])
cleaning_averages_filled["FLOOR_HEIGHT"] = cleaning_averages_filled[
"FLOOR_HEIGHT"
].fillna(cleaning_averages_filled["FLOOR_HEIGHT_AVERAGE"])
cleaning_averages_filled = cleaning_averages_filled.drop(
columns=['TOTAL_FLOOR_AREA_AVERAGE', 'FLOOR_HEIGHT_AVERAGE'])
columns=["TOTAL_FLOOR_AREA_AVERAGE", "FLOOR_HEIGHT_AVERAGE"]
)
# If there are still NA values, i.e. the averages do not have values for a specific group of property type
# and built form,
# we can use just the property type average as the replacement
cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna(
cleaning_averages_filled['TOTAL_FLOOR_AREA_PROPERTY_AVERAGE'])
cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna(
cleaning_averages_filled['FLOOR_HEIGHT_PROPERTY_AVERAGE'])
cleaning_averages_filled["TOTAL_FLOOR_AREA"] = cleaning_averages_filled[
"TOTAL_FLOOR_AREA"
].fillna(cleaning_averages_filled["TOTAL_FLOOR_AREA_PROPERTY_AVERAGE"])
cleaning_averages_filled["FLOOR_HEIGHT"] = cleaning_averages_filled[
"FLOOR_HEIGHT"
].fillna(cleaning_averages_filled["FLOOR_HEIGHT_PROPERTY_AVERAGE"])
cleaning_averages_filled = cleaning_averages_filled.drop(
columns=['TOTAL_FLOOR_AREA_PROPERTY_AVERAGE', 'FLOOR_HEIGHT_PROPERTY_AVERAGE'])
columns=[
"TOTAL_FLOOR_AREA_PROPERTY_AVERAGE",
"FLOOR_HEIGHT_PROPERTY_AVERAGE",
]
)
# If there are still NA values, use BUILT FORM averages
cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna(
cleaning_averages_filled['TOTAL_FLOOR_AREA_BUILT_FORM_AVERAGE'])
cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna(
cleaning_averages_filled['FLOOR_HEIGHT_BUILT_FORM_AVERAGE'])
cleaning_averages_filled["TOTAL_FLOOR_AREA"] = cleaning_averages_filled[
"TOTAL_FLOOR_AREA"
].fillna(cleaning_averages_filled["TOTAL_FLOOR_AREA_BUILT_FORM_AVERAGE"])
cleaning_averages_filled["FLOOR_HEIGHT"] = cleaning_averages_filled[
"FLOOR_HEIGHT"
].fillna(cleaning_averages_filled["FLOOR_HEIGHT_BUILT_FORM_AVERAGE"])
cleaning_averages_filled = cleaning_averages_filled.drop(
columns=['TOTAL_FLOOR_AREA_BUILT_FORM_AVERAGE', 'FLOOR_HEIGHT_BUILT_FORM_AVERAGE'])
columns=[
"TOTAL_FLOOR_AREA_BUILT_FORM_AVERAGE",
"FLOOR_HEIGHT_BUILT_FORM_AVERAGE",
]
)
# If there are still NA values, use the average across all properties in the constituency
cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna(
cleaning_averages_filled['TOTAL_FLOOR_AREA'].mean())
cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna(
cleaning_averages_filled['FLOOR_HEIGHT'].mean())
cleaning_averages_filled["TOTAL_FLOOR_AREA"] = cleaning_averages_filled[
"TOTAL_FLOOR_AREA"
].fillna(cleaning_averages_filled["TOTAL_FLOOR_AREA"].mean())
cleaning_averages_filled["FLOOR_HEIGHT"] = cleaning_averages_filled[
"FLOOR_HEIGHT"
].fillna(cleaning_averages_filled["FLOOR_HEIGHT"].mean())
# If the constituency is all NA values, then take the UK average values
cleaning_averages_filled['TOTAL_FLOOR_AREA'] = cleaning_averages_filled['TOTAL_FLOOR_AREA'].fillna(
TOTAL_FLOOR_AREA_NATIONAL_AVERAGE)
cleaning_averages_filled['FLOOR_HEIGHT'] = cleaning_averages_filled['FLOOR_HEIGHT'].fillna(
FLOOR_HEIGHT_NATIONAL_AVERAGE)
cleaning_averages_filled["TOTAL_FLOOR_AREA"] = cleaning_averages_filled[
"TOTAL_FLOOR_AREA"
].fillna(TOTAL_FLOOR_AREA_NATIONAL_AVERAGE)
cleaning_averages_filled["FLOOR_HEIGHT"] = cleaning_averages_filled[
"FLOOR_HEIGHT"
].fillna(FLOOR_HEIGHT_NATIONAL_AVERAGE)
return cleaning_averages_filled
def retain_multiple_epc_properties(self, epc_minimum_count: int = 1) -> None:
'''
"""
Reduce the data further by keeping only datasets with multiple EPCs
'''
"""
counts = self.data.groupby("UPRN").size().reset_index()
counts.columns = ["UPRN", "count"]
# take UPRNS with multiple EPCs
counts = counts[counts["count"] > epc_minimum_count]
self.data = pd.merge(self.data, counts, on='UPRN')
self.data = pd.merge(self.data, counts, on="UPRN")
def recast_df_columns(self, column_mappings: dict) -> None:
"""
@ -166,7 +228,7 @@ class DataProcessor:
for key, values in column_mappings.items():
if key not in self.data.columns:
print('Column mapping incorrectly specified')
print("Column mapping incorrectly specified")
exit(1)
for value in values:
self.data[key] = self.data[key].astype(value)
@ -189,13 +251,16 @@ class DataProcessor:
self.data = self.data[~pd.isnull(self.data["UPRN"])]
self.data = self.data[self.data["LODGEMENT_DATE"] >= EARLIEST_EPC_DATE]
self.data = self.data[self.data["TRANSACTION_TYPE"] != "new dwelling"]
self.data = self.data[~self.data["FLOOR_LEVEL"].isin(["top floor", "mid floor"])]
self.data = self.data[
~self.data["FLOOR_LEVEL"].isin(["top floor", "mid floor"])
]
def clean_multi_glaze_proportion(self) -> None:
"""
If there is no multi-glaze proportion but the windows are fully glazed, then we should assume a score of 100
"""
no_multi_glaze_proportion_index = pd.isnull(self.data["MULTI_GLAZE_PROPORTION"]) & (
self.data["WINDOWS_DESCRIPTION"].isin(FULLY_GLAZED_DESCRIPTIONS))
self.data.loc[no_multi_glaze_proportion_index, 'MULTI_GLAZE_PROPORTION'] = 100
no_multi_glaze_proportion_index = pd.isnull(
self.data["MULTI_GLAZE_PROPORTION"]
) & (self.data["WINDOWS_DESCRIPTION"].isin(FULLY_GLAZED_DESCRIPTIONS))
self.data.loc[no_multi_glaze_proportion_index, "MULTI_GLAZE_PROPORTION"] = 100

View file

@ -31,12 +31,12 @@ class FeatureProcessor:
return df
@staticmethod
def retain_features(df: pd.DataFrame, features: List[str] = None):
def retain_features(df: pd.DataFrame, features: List[str] | None = None):
"""
Determine which columns to keep for modelling
"""
if features is None:
features = df.columns
features = df.columns.to_list()
else:
if not set(features).issubset(df.columns):
logger.error("Features defined is not contained in data")
@ -47,7 +47,9 @@ class FeatureProcessor:
return df
@staticmethod
def subsample_data(df: pd.DataFrame, subsample_amount: int = None) -> pd.DataFrame:
def subsample_data(
df: pd.DataFrame, subsample_amount: int | None = None
) -> pd.DataFrame:
"""
Sample data to reduce number of rows for model building if needed
"""
@ -60,8 +62,8 @@ class FeatureProcessor:
self,
df: pd.DataFrame,
target_column: str = "RDSAP_CHANGE",
features: List[str] = None,
subsample_amount: int = None,
features: List[str] | None = None,
subsample_amount: int | None = None,
) -> pd.DataFrame:
"""
Pipeline to get data ready for building a model

View file

@ -6,8 +6,41 @@ Key tasks:
"""
import pandas as pd
from core.Settings import OPTIMISE_METRIC
from MLModel.BaseMLModel import MLModel
from pathlib import Path
import seaborn as sns
import matplotlib.pyplot as plt
from core.Settings import (
RESIDUAL_TRUE_LABEL,
RESIDUAL_PREDICTION_LABEL,
SEABORN_RESIDUAL_AXIS_FONTSIZE,
SEABORN_RESIDUAL_TITLE_FONTSIZE,
SEABORN_RESIDUAL_STYLE,
SEABORN_RESIDUAL_ASPECT_RATIO,
SEABORN_RESIDUAL_PLOT_DPI,
SEABORN_RESIDUAL_RANGE,
SEABORN_RESIDUAL_LINE_COLOUR,
SEABORN_RESIDUAL_LINE_WIDTH,
)
from sklearn.metrics import (
mean_absolute_error,
median_absolute_error,
mean_squared_error,
mean_absolute_percentage_error,
)
# Dummy example of new metric that can be added - must be true and prediction as arguments
def max_error(y_true: pd.Series, y_pred: pd.Series) -> float:
    """
    Return the largest absolute difference between true and predicted values.

    The residuals are taken as absolute values so that over-predictions
    (negative residuals) count as errors too; a signed maximum would report a
    negative or understated "max error" when the model over-predicts.

    Args:
        y_true: Observed target values.
        y_pred: Predicted target values, aligned with ``y_true``.

    Returns:
        The maximum of ``|y_true - y_pred|``.

    Raises:
        ValueError: If the inputs are empty (raised by the builtin ``max``).
    """
    return max(abs(y_true - y_pred))
# Metric functions run for every evaluation. Each entry is called as
# metric(actuals, predictions) and its __name__ is used as the metric label.
METRIC_TO_APPLY = [
    mean_absolute_error,
    median_absolute_error,
    mean_squared_error,
    mean_absolute_percentage_error,
    # max_error
]
def sort_by_metric(
@ -16,7 +49,8 @@ def sort_by_metric(
"""
Helper function to sort data frame by metric and append a best model flag
"""
data = data.sort_values(optimse_metric, ascending=False).reset_index(drop=True)
# Ascending as we want lowest error values
data = data.sort_values(optimse_metric, ascending=True).reset_index(drop=True)
data[best_model_column_name] = [False] * len(data)
data.loc[0, best_model_column_name] = True
@ -29,38 +63,68 @@ class Metrics:
"""
@staticmethod
def metric_1(predictions: pd.Series, actuals: pd.Series) -> float:
"""
Can leverage ML packages like sklearn for individual metrics like MAPE etc
"""
pass
@staticmethod
def metric_2(predictions: pd.Series, actuals: pd.Series) -> float:
"""
Can leverage ML packages like sklearn for individual metrics like MAPE etc
"""
pass
def list_metric_functions(self) -> list:
def list_metric_functions() -> list:
"""
Gather all metric functions to run
"""
pass
return [metric_to_apply.__name__ for metric_to_apply in METRIC_TO_APPLY]
def generate_metric_suite(
self, model: MLModel, data: pd.DataFrame, target_column: str
) -> pd.Series:
@staticmethod
def generate_metric_suite(actuals: pd.Series, predictions: pd.Series) -> pd.Series:
"""
For the given actuals and predictions, iterate over all metrics to generate a Series of metric values
"""
predictions = model.generate_predictions(data=data)
actuals = data[target_column]
metric_dict = {}
for key, metric_function in asd: # TODO:
metric_dict[key] = metric_function(predictions, actuals)
for metric_function in METRIC_TO_APPLY:
metric_dict[metric_function.__name__] = metric_function(
actuals, predictions
)
metrics = pd.Series([metric_dict])
metrics = pd.Series(metric_dict)
return metrics
@staticmethod
def generate_plot_suite() -> None:
    """
    Placeholder for producing all metric plots.

    Not implemented yet: the body consists only of this docstring, so a call
    does nothing and returns None.
    """
@staticmethod
def generate_residual_plot(
    actuals: pd.Series,
    predictions: pd.Series,
    target_column: str,
    output_filepath: Path | str,
) -> None:
    """
    Scatter the true values against the predicted values and save the image.

    The plot is drawn on its own figure, which is always closed afterwards, so
    repeated calls neither draw on top of an earlier plot nor accumulate open
    figures.

    Args:
        actuals: Observed target values.
        predictions: Predicted target values, paired positionally with ``actuals``.
        target_column: Name of the target, used in the axis labels.
        output_filepath: Location the image is written to.
    """
    # TODO: could move this into a model.metric_outputs method; for now it lives here
    residual_df = pd.DataFrame(
        list(zip(actuals, predictions)),
        columns=[RESIDUAL_TRUE_LABEL, RESIDUAL_PREDICTION_LABEL],
    )

    # Apply the seaborn style before the axes are created so it takes effect
    sns.set(style=SEABORN_RESIDUAL_STYLE)
    fig, ax = plt.subplots()
    try:
        sns.scatterplot(
            x=RESIDUAL_TRUE_LABEL,
            y=RESIDUAL_PREDICTION_LABEL,
            data=residual_df,
            ax=ax,
        )

        # Image formatting
        ax.set_aspect(SEABORN_RESIDUAL_ASPECT_RATIO)
        ax.set_xlabel(
            f"True {target_column}", fontsize=SEABORN_RESIDUAL_AXIS_FONTSIZE
        )
        ax.set_ylabel(
            f"Predicted {target_column}", fontsize=SEABORN_RESIDUAL_AXIS_FONTSIZE
        )
        ax.set_title("Residuals", fontsize=SEABORN_RESIDUAL_TITLE_FONTSIZE)

        # Reference line with identical x and y values (true == predicted)
        ax.plot(
            SEABORN_RESIDUAL_RANGE,
            SEABORN_RESIDUAL_RANGE,
            SEABORN_RESIDUAL_LINE_COLOUR,
            linewidth=SEABORN_RESIDUAL_LINE_WIDTH,
        )

        fig.tight_layout()
        fig.savefig(output_filepath, dpi=SEABORN_RESIDUAL_PLOT_DPI)
    finally:
        # Release the figure even if plotting or saving fails
        plt.close(fig)

View file

@ -2,6 +2,8 @@
# TODO: migrate to dynaconf
from pathlib import Path
METRIC_FILENAME = "metrics.csv"
OPTIMISE_METRIC = "mean_absolute_error"
BEST_MODEL_COLUMN_NAME = "best_model"

View file

@ -18,21 +18,21 @@ services:
timeout: 20s
retries: 3
# simulation_system_training:
# build:
# context: ./
# dockerfile: ./Dockerfiles/Dockerfile.training
# image: simulation_system_training
# environment:
# ENDPOINT_URL: http://minio:9000/
# AWS_ACCESS_KEY_ID: *MINIO_USER
# AWS_SECRET_ACCESS_KEY: *MINIO_PASS
# tty: true
# depends_on:
# minio:
# condition: service_healthy
# command:
# ["bash"]
simulation_system_training:
build:
context: ./
dockerfile: ./Dockerfiles/Dockerfile.training
image: simulation_system_training
environment:
ENDPOINT_URL: http://minio:9000/
AWS_ACCESS_KEY_ID: *MINIO_USER
AWS_SECRET_ACCESS_KEY: *MINIO_PASS
tty: true
depends_on:
minio:
condition: service_healthy
# command:
# ["bash"]
# simulation_system_prediction:
# build:

View file

@ -10,15 +10,16 @@ from core.Settings import (
COMPONENT_FEATURES,
RDSAP_RESPONSE,
HEAT_DEMAND_RESPONSE,
COLUMNS_TO_MERGE_ON
COLUMNS_TO_MERGE_ON,
)
from core.DataProcessor import DataProcessor
DATA_DIRECTORY = Path(__file__).parent / 'data' / 'all-domestic-certificates'
DATA_DIRECTORY = Path(__file__).parent / "data" / "all-domestic-certificates"
# TODO: Have a look at temporal features
def app():
# Get all the files in the directory
@ -29,7 +30,7 @@ def app():
directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()]
dataset = []
# 116
# 116
# 128048706
# PosixPath('/home/ubuntu/Documents/python/hestia/Model/model_data/simulation_system/data/all-domestic
# -certificates/domestic-E09000021-Kingston-upon-Thames')
@ -48,12 +49,14 @@ def app():
fixed_data = {}
# If a property has changed building type, we can ignore the epc rating i.e. this should be 1 unique row
if max(property_data[MANDATORY_FIXED_FEATURES].nunique()) > 1:
if any(property_data[MANDATORY_FIXED_FEATURES].nunique() > 1):
continue
# Take the latest row for both the LATEST_FEILDS and MANDATORY FIELDS
# Take the latest row for both the LATEST_FEILDS and MANDATORY FIELDS
latest_field_data = property_data[LATEST_FIELD].iloc[-1].to_dict()
mandatory_field_data = property_data[MANDATORY_FIXED_FEATURES].iloc[-1].to_dict()
mandatory_field_data = (
property_data[MANDATORY_FIXED_FEATURES].iloc[-1].to_dict()
)
# Taking just the last row, which is the percentage change from the latest to previous one only
# property_data[AVERAGE_FIXED_FEATURES].fillna(value=0).pct_change().iloc[-1] > 0.1
@ -63,17 +66,25 @@ def app():
cleaned_columns_to_merge_on = na_columns.index[~na_columns].to_list()
# Get the corresponding groupby and merge, and fill in NA values
cleaning_averages_to_merge = cleaning_averages.groupby(cleaned_columns_to_merge_on)[
['TOTAL_FLOOR_AREA', 'FLOOR_HEIGHT']].mean()
cleaning_averages_to_merge = cleaning_averages.groupby(
cleaned_columns_to_merge_on
)[["TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]].mean()
modified_property_data = pd.merge(property_data, cleaning_averages_to_merge, on=cleaned_columns_to_merge_on,
suffixes=['', '_AVERAGE'])
modified_property_data['TOTAL_FLOOR_AREA'] = modified_property_data['TOTAL_FLOOR_AREA'].fillna(
modified_property_data['TOTAL_FLOOR_AREA_AVERAGE'])
modified_property_data['FLOOR_HEIGHT'] = modified_property_data['FLOOR_HEIGHT'].fillna(
modified_property_data['FLOOR_HEIGHT_AVERAGE'])
modified_property_data = pd.merge(
property_data,
cleaning_averages_to_merge,
on=cleaned_columns_to_merge_on,
suffixes=["", "_AVERAGE"],
)
modified_property_data["TOTAL_FLOOR_AREA"] = modified_property_data[
"TOTAL_FLOOR_AREA"
].fillna(modified_property_data["TOTAL_FLOOR_AREA_AVERAGE"])
modified_property_data["FLOOR_HEIGHT"] = modified_property_data[
"FLOOR_HEIGHT"
].fillna(modified_property_data["FLOOR_HEIGHT_AVERAGE"])
modified_property_data = modified_property_data.drop(
columns=['TOTAL_FLOOR_AREA_AVERAGE', 'FLOOR_HEIGHT_AVERAGE'])
columns=["TOTAL_FLOOR_AREA_AVERAGE", "FLOOR_HEIGHT_AVERAGE"]
)
for field in AVERAGE_FIXED_FEATURES:
@ -94,8 +105,9 @@ def app():
# We include the lodgement date here as we probably need to factor time into the
# model, since EPC standards and rigour have changed over time
variable_data = modified_property_data[
COMPONENT_FEATURES + ["LODGEMENT_DATE", RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE]
]
COMPONENT_FEATURES
+ ["LODGEMENT_DATE", RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE]
]
# Note: we look at changes between subsequent EPCS, however we could look at other permutations
# e.g. first vs second, second vs third and also first vs third
@ -107,15 +119,24 @@ def app():
starting_record = variable_data.iloc[idx]
ending_record = variable_data.iloc[idx + 1]
rdsap_change = ending_record[RDSAP_RESPONSE] - starting_record[RDSAP_RESPONSE]
heat_demand_change = ending_record[HEAT_DEMAND_RESPONSE] - starting_record[HEAT_DEMAND_RESPONSE]
rdsap_change = (
ending_record[RDSAP_RESPONSE] - starting_record[RDSAP_RESPONSE]
)
heat_demand_change = (
ending_record[HEAT_DEMAND_RESPONSE]
- starting_record[HEAT_DEMAND_RESPONSE]
)
# TODO: We need to pre-process the data. For instance, rather than using static for roofs, walls and
# floors, we may want to use the U-value. We may also want to handle the (assumed) tags
# within descriptions
starting_record = starting_record[COMPONENT_FEATURES + ["LODGEMENT_DATE"]].add_suffix("_STARTING")
ending_record = ending_record[COMPONENT_FEATURES + ["LODGEMENT_DATE"]].add_suffix("_ENDING")
starting_record = starting_record[
COMPONENT_FEATURES + ["LODGEMENT_DATE"]
].add_suffix("_STARTING")
ending_record = ending_record[
COMPONENT_FEATURES + ["LODGEMENT_DATE"]
].add_suffix("_ENDING")
features = pd.concat([starting_record, ending_record])
@ -125,14 +146,14 @@ def app():
"RDSAP_CHANGE": rdsap_change,
"HEAT_DEMAND_CHANGE": heat_demand_change,
**fixed_data,
**features.to_dict()
**features.to_dict(),
}
)
dataset.extend(property_model_data)
output = pd.DataFrame(dataset)
output.to_parquet('./dataset.parquet')
output.to_parquet("./dataset.parquet")
if __name__ == "__main__":

View file

@ -2,6 +2,8 @@ import os
import urllib.parse
from predictions import prediction
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "dev")
def handler(event, context):
"""
@ -11,17 +13,32 @@ def handler(event, context):
# Assuming a file in a bucket landing for now?
# Assuming we have a model to use
bucket = event["Records"][0]["s3"]["bucket"]["name"]
key = urllib.parse.unquote_plus(
event["Records"][0]["s3"]["bucket"]["key"], encoding="utf-8"
)
# bucket = event["Records"][0]["s3"]["bucket"]["name"]
# key = urllib.parse.unquote_plus(
# event["Records"][0]["s3"]["bucket"]["key"], encoding="utf-8"
# )
prediction_file = bucket + "/" + key
payload = event["body"]
data_path = payload["file_location"]
property_id = payload["property_id"]
portfolio_id = payload["portfolio_id"]
created_at = payload["created_at"]
# prediction_file = bucket + "/" + key
# TODO: put a model into s3, both locally and in aws
model_path = os.environ.get("MODEL_PATH", "http://minio:9000/data/model_directory/")
# model_path = os.environ.get("MODEL_PATH", "http://minio:9000/data/model_directory/")
model_path = os.environ.get(
"MODEL_PATH",
"s3://retrofit-model-directory-{RUNTIME_ENVIRONMENT}/RDSAP_CHANGE/autogluon/rdsap_change-medium_quality-30-2023-08-30_11-43-41/deployment/",
)
try:
prediction(model_path=model_path, data_path=prediction_file)
outputs = prediction(model_path=model_path, data_path=data_path)
# Store into s3, with key of {portfolio_id}-{property_id}
outputs.to_csv(
f"s3://retrofit-sap-prediction-{RUNTIME_ENVIRONMENT}/{portfolio_id}/{property_id}/{created_at}.csv"
)
except (Exception, KeyError, ValueError):
print("Prediction failed")

View file

@ -2,6 +2,7 @@
Script to load MLModel class and generate predictions
"""
import os
import json
import argparse
import pandas as pd
@ -9,7 +10,7 @@ from typing import Optional
from datetime import datetime
from MLModel.Models import AutogluonModel
from core.Logger import logger
from core.DataLoader import DataLoader
from core.DataLoader import dataloader_factory
from core.Settings import (
BASE_REGISTRY_PATH,
REGISTRY_FILE,
@ -20,6 +21,7 @@ from core.Settings import (
)
TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT)
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "dev")
# FOR TESTING
# For now just loading data first and then passing into function (i.e. as if we receive json data and convert to
@ -58,19 +60,26 @@ def ingest_arguments() -> argparse.Namespace:
def prediction(
target_column: str = "RDSAP_CHANGE",
model_path: str = None,
data: pd.DataFrame = None,
model_path: str | None = None,
data: Optional[pd.DataFrame | str] = None,
data_path: Optional[str] = None,
):
"""
Main pipeline function
"""
registry_path = BASE_REGISTRY_PATH / target_column / REGISTRY_FILE
if RUNTIME_ENVIRONMENT == "local":
registry_path = BASE_REGISTRY_PATH / target_column / REGISTRY_FILE
if registry_path is None or not registry_path.exists():
logger.error("No registry path provided or registry doesn't exist")
exit(1)
if registry_path is None or not registry_path.exists():
logger.error("No registry path provided or registry doesn't exist")
exit(1)
elif RUNTIME_ENVIRONMENT == "dev":
registry_path = (
"s3://retrofit-model-directory-dev/RDSAP_CHANGE/model_registry.csv"
)
else:
# NotImplemented is the binary-operator sentinel, not an exception; calling it
# raises TypeError. NotImplementedError is the exception meant here.
raise NotImplementedError("TO be implemented")
if model_path is not None:
logger.info("User specified a model to load - ignoring registry")
@ -98,13 +107,17 @@ def prediction(
exit(1)
if data_path and data is None:
logger.info("Loading data from provided path")
data = DataLoader().load(filepath=data_path, index_col="UPRN")
dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT)
data = dataloader.load(filepath=data_path, index_col="UPRN")
# TODO: DOWNSAMPLING DOWN TO JUST USE ONE FOR PREDICTION
data = data.sample(1)
if data is None:
raise ValueError("No data loaded")
# # TODO: DOWNSAMPLING DOWN TO JUST USE ONE FOR PREDICTION
# data = data.sample(1)
else:
logger.info("Using data provided")
data = json.loads(data)
data = json.loads(str(data))
data = pd.DataFrame([data])
print(data)
@ -121,6 +134,7 @@ def prediction(
logger.info("--- Generating Predictions ---")
prediction = model.generate_predictions(data=data)
return pd.concat([data["recommendation_id"], prediction], axis=1)
# Save prediction somewhere?
# prediction.to_csv("s3?")
@ -128,23 +142,23 @@ def prediction(
# TODO: Check how we want to structure outputs
# For now, just categorise by uprn and timestamp
# Assume one uprn coming in for now
uprn = data.index.values[0]
# uprn = data.index.values[0]
# Saving prediction local for now
# TODO: change uprn to TARGET_ID, put in setting
logger.info("--- Outputting prediction and metadata --- ")
output_base = PREDICTION_LOCATION / target_column / uprn / TIMESTAMP
output_base.mkdir(parents=True, exist_ok=True)
# # Saving prediction local for now
# # TODO: change uprn to TARGET_ID, put in setting
# logger.info("--- Outputting prediction and metadata --- ")
# output_base = PREDICTION_LOCATION / target_column / uprn / TIMESTAMP
# output_base.mkdir(parents=True, exist_ok=True)
json_prediction = prediction.to_json(output_base / PREDICTION_FILE)
prediction_metadata = {
"model_type": model_type,
"model_name": model_name,
"model_location": model_location,
"model_settings": model.model_metadata(),
}
# json_prediction = prediction.to_json(output_base / PREDICTION_FILE)
# prediction_metadata = {
# "model_type": model_type,
# "model_name": model_name,
# "model_location": model_location,
# "model_settings": model.model_metadata(),
# }
pd.DataFrame([prediction_metadata]).to_json(output_base / METADATA_FILE)
# pd.DataFrame([prediction_metadata]).to_json(output_base / METADATA_FILE)
return json_prediction

View file

@ -6,20 +6,23 @@ Key task:
- Save the new metrics out to s3 bucket
"""
import os
import argparse
from s3pathlib import S3Path
import pandas as pd
from tqdm import tqdm
from core.Logger import logger
from core.Metrics import Metrics, sort_by_metric
from core.DataLoader import DataLoader
from core.DataLoader import dataloader_factory
from core.Settings import (
OPTIMISE_METRIC,
MODEL_DIRECTORY,
REGISTRY_FILE,
BEST_MODEL_COLUMN_NAME,
)
from MLModel.Models import AutogluonModel, model_factory
from MLModel.Models import model_factory
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local")
def ingest_arguments() -> argparse.Namespace:
@ -54,7 +57,7 @@ def regenerate_metrics(test_filepath: str, target_column: str) -> None:
"""
logger.info("--- Loading test data ---")
dataloader = DataLoader()
dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT)
test_df = dataloader.load(filepath=test_filepath)
logger.info("--- Loading model registry ---")

View file

@ -2,79 +2,109 @@ from core.Logger import logger
import argparse
import pandas as pd
from pathlib import Path
from core.Settings import (
RANDOM_SEED,
TRAIN_AND_VALIDATION_DATA_NAME,
TEST_DATA_NAME
)
from core.Settings import RANDOM_SEED, TRAIN_AND_VALIDATION_DATA_NAME, TEST_DATA_NAME
def ingest_arguments() -> argparse.Namespace:
"""
Helper function to take in arguments from script start
"""
parser = argparse.ArgumentParser(description='Inputs for training script')
parser = argparse.ArgumentParser(description="Inputs for training script")
parser.add_argument('--filepath', type=str, help='Location of Parquet dataset to load', required=True)
parser.add_argument('--output-folder', type=str, help='Location of Parquet dataset to save', required=True)
parser.add_argument('--percentage', type=float, help='Percentage of data to use as test data', default=None)
parser.add_argument('--volume', type=int, help='Volume of data to use as test data', default=None)
parser.add_argument('--sampling', type=str, help='Type of sampling to do for test data', choices=['random', 'stratified'], default='random')
parser.add_argument(
"--filepath",
type=str,
help="Location of Parquet dataset to load",
required=True,
)
parser.add_argument(
"--output-folder",
type=str,
help="Location of Parquet dataset to save",
required=True,
)
parser.add_argument(
"--percentage",
type=float,
help="Percentage of data to use as test data",
default=None,
)
parser.add_argument(
"--volume", type=int, help="Volume of data to use as test data", default=None
)
parser.add_argument(
"--sampling",
type=str,
help="Type of sampling to do for test data",
choices=["random", "stratified"],
default="random",
)
args = parser.parse_args()
return args
def main(filepath: str, output_folder: str, percentage: float, volume: int, sampling: str):
def main(
filepath: str, output_folder: str, percentage: float, volume: int, sampling: str
):
"""
Load a dataset in and split out the training+validation data and the test data.
"""
logger.info('---Loading Data---')
logger.info("---Loading Data---")
data = pd.read_parquet(filepath).reset_index(drop=True)
if percentage and volume is None:
test_amount = round(len(data)*percentage)
test_amount = round(len(data) * percentage)
elif percentage is None and volume:
test_amount = volume
elif percentage is None and volume is None:
logger.error('No amount specified - please specify either a percentage or volume')
logger.error(
"No amount specified - please specify either a percentage or volume"
)
exit(1)
else:
logger.info('Both percentage and volume specified - taking largest of the two')
test_amount = max(round(len(data)*percentage), volume)
logger.info("Both percentage and volume specified - taking largest of the two")
test_amount = max(round(len(data) * percentage), volume)
logger.info(f'---Extracting {test_amount} from dataset to be test data')
logger.info(f"---Extracting {test_amount} from dataset to be test data")
if sampling == 'random':
logger.info('--- Using random sample method ---')
train_validation_data = pd.DataFrame()
test_data = pd.DataFrame()
if sampling == "random":
logger.info("--- Using random sample method ---")
sample_index = data.sample(n=test_amount, random_state=RANDOM_SEED).index
train_validation_data = data.drop(sample_index)
test_data = data.iloc[sample_index]
elif sampling =='stratified':
# Not yet implemented
elif sampling == "stratified":
# Not yet implemented
pass
logger.info('--- Saving data ---')
logger.info("--- Saving data ---")
train_validation_data.to_parquet(Path(output_folder)/ TRAIN_AND_VALIDATION_DATA_NAME)
test_data.to_parquet(Path(output_folder)/ TEST_DATA_NAME)
train_validation_data.to_parquet(
Path(output_folder) / TRAIN_AND_VALIDATION_DATA_NAME
)
test_data.to_parquet(Path(output_folder) / TEST_DATA_NAME)
logger.info(" ---Pipeline complete---")
logger.info(' ---Pipeline complete---')
if __name__ == "__main__":
logger.info('--- Generate test data pipeline ---')
logger.info("--- Generate test data pipeline ---")
args = ingest_arguments()
main(
filepath=args.filepath,
filepath=args.filepath,
output_folder=args.output_folder,
percentage=args.percentage,
volume=args.volume,
sampling=args.sampling
)
percentage=args.percentage,
volume=args.volume,
sampling=args.sampling,
)

View file

@ -8,11 +8,9 @@ import os
from pathlib import Path
from datetime import datetime
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from MLModel.Models import AutogluonModel, model_factory
from MLModel.Models import model_factory
from core.Logger import logger
from core.Metrics import Metrics
from core.Metrics import Metrics, sort_by_metric
from core.DataLoader import dataloader_factory
from core.FeatureProcessor import FeatureProcessor
from core.Settings import (
@ -25,17 +23,9 @@ from core.Settings import (
SUBSAMPLE_FACTOR,
MODEL_HYPERPARAMETERS,
TIMESTAMP_FORMAT,
RESIDUAL_TRUE_LABEL,
RESIDUAL_PREDICTION_LABEL,
RESIDUAL_FILE,
SEABORN_RESIDUAL_AXIS_FONTSIZE,
SEABORN_RESIDUAL_TITLE_FONTSIZE,
SEABORN_RESIDUAL_STYLE,
SEABORN_RESIDUAL_ASPECT_RATIO,
SEABORN_RESIDUAL_PLOT_DPI,
SEABORN_RESIDUAL_RANGE,
SEABORN_RESIDUAL_LINE_COLOUR,
SEABORN_RESIDUAL_LINE_WIDTH,
BEST_MODEL_COLUMN_NAME,
OPTIMISE_METRIC,
)
TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT)
@ -120,7 +110,7 @@ def training(
test_filepath: str,
target_column: str = "RDSAP_CHANGE",
model_type: str = "autogluon",
hyperparameters: dict = None,
hyperparameters: dict | None = None,
) -> None:
"""
Pipeline to run training on the dataset
@ -131,6 +121,9 @@ def training(
train_df = dataloader.load(filepath=train_filepath)
test_df = dataloader.load(filepath=test_filepath)
if train_df is None or test_df is None:
raise ValueError("No data Loaded - cancelling pipeline")
logger.info("--- Feature processing ---")
feature_processor = FeatureProcessor()
@ -165,6 +158,7 @@ def training(
)
output_base = Path(MODEL_DIRECTORY) / target_column / model_type / model_root
# Will need to pass output path to model (for saving purposes)
model = model_toolkit["model"](output_filepath=output_base / MODEL_FOLDER)
model.train_model(
@ -175,56 +169,26 @@ def training(
model.save_model(output_filepath=model.output_filepath)
logger.info("--- Generate evaluation metrics ---")
# TODO: replace this with metrics class
# metrics_df = model.model_evaluation(
# validation_data=test_df,
# target_column=target_column,
# metrics_location=output_base / METRICS_FOLDER,
# metrics = Metrics
# )
metrics = Metrics()
metrics_df = model.model_evaluation(
validation_data=test_df,
target_column=target_column,
metrics_location=output_base / METRICS_FOLDER,
metrics=metrics,
)
logger.info("--- Generate metric outputs using predictions ---")
# TODO: can have a model.metric_outputs method
# For now just do it here
residual_df = pd.DataFrame(
list(zip(test_df[target_column], model.predictions)),
columns=[RESIDUAL_TRUE_LABEL, RESIDUAL_PREDICTION_LABEL],
)
# image formatting
# TODO: move to settings file , AXIS_FONT, TITLE_FONT
sns.set(style=SEABORN_RESIDUAL_STYLE)
ax = sns.scatterplot(
x=RESIDUAL_TRUE_LABEL, y=RESIDUAL_PREDICTION_LABEL, data=residual_df
)
ax.set_aspect(SEABORN_RESIDUAL_ASPECT_RATIO)
ax.set_xlabel(f"True {target_column}", fontsize=SEABORN_RESIDUAL_AXIS_FONTSIZE)
ax.set_ylabel(
f"Predicted {target_column}", fontsize=SEABORN_RESIDUAL_AXIS_FONTSIZE
) # ylabel
ax.set_title("Residuals", fontsize=SEABORN_RESIDUAL_TITLE_FONTSIZE)
# Square aspect ratio
ax.plot(
SEABORN_RESIDUAL_RANGE,
SEABORN_RESIDUAL_RANGE,
SEABORN_RESIDUAL_LINE_COLOUR,
linewidth=SEABORN_RESIDUAL_LINE_WIDTH,
)
plt.tight_layout()
plt.savefig(
output_base / METRICS_FOLDER / RESIDUAL_FILE, dpi=SEABORN_RESIDUAL_PLOT_DPI
# metrics.generate_plot_suite()
metrics.generate_residual_plot(
actuals=test_df[target_column],
predictions=model.predictions,
target_column=target_column,
output_filepath=output_base / METRICS_FOLDER / RESIDUAL_FILE,
)
# TODO: for cml, we might want to have class that outputs all data and plots to add to the report
# If we want residual plot/ any plots, we will need to self host
# plt.savefig(RESIDUAL_FILE, dpi=120)
# TODO: introduce a separate script for model optimisation, and from there, optimise for deployment
# Imagining for now that the model trained here is the best model amongst all models built
@ -252,21 +216,14 @@ def training(
registry_df = pd.read_csv(registry_path, index_col=None)
else:
# TODO: Move columns into settings: MODEL_DETAILS and Metrics class columns
registry_df = pd.DataFrame(
columns=[
"model_type",
"model_name",
"model_location",
"mean_absolute_error",
"root_mean_squared_error",
"mean_squared_error",
"r2",
"pearsonr",
"median_absolute_error",
"mape",
"best_model",
]
)
columns = [
BEST_MODEL_COLUMN_NAME,
"model_type",
"model_name",
"model_location",
] + metrics.list_metric_functions()
registry_df = pd.DataFrame(columns=columns)
model_details_df = pd.DataFrame(
[
@ -284,11 +241,12 @@ def training(
# TODO: will need a metric rebuild script - i.e. if we add new metrics, we will want to load models and
# regenerate new metrics
# TODO: decide metric to optimise to
registry_df = registry_df.sort_values(
"mean_absolute_error", ascending=False
).reset_index(drop=True)
registry_df["best_model"] = [False] * len(registry_df)
registry_df.loc[0, "best_model"] = True
registry_df = sort_by_metric(
registry_df,
optimse_metric=OPTIMISE_METRIC,
best_model_column_name=BEST_MODEL_COLUMN_NAME,
)
logger.info("--- Saving new model to registry ---")
# Ensure the directory exists