restructuring repo wip

This commit is contained in:
Khalim Conn-Kowlessar 2023-10-05 14:45:56 +01:00
parent f94cbd4385
commit 9d14953efb
40 changed files with 228 additions and 2730 deletions

View file

@ -127,7 +127,7 @@ class Property(Definitions):
"""
ventilation = self.data["mechanical-ventilation"]
# perform some simple cleaning - when checking 300k properties, the only unique values were
# perform some simple cleaning - when checking 300k properties, the only unique values were
# {'', 'mechanical, supply and extract', 'NO DATA!', 'natural', 'mechanical, extract only'}
if ventilation in self.DATA_ANOMALY_MATCHES or ventilation in [""]:
ventilation = None
@ -145,7 +145,7 @@ class Property(Definitions):
- solar_pv
This is based on the "photo-supply" field in the EPC data.
When checking 100k properties, either the value was "" or a stringified number
When checking 100k properties, either the value was "" or a stringified number
"""
solar_pv = self.data["photo-supply"]

View file

@ -253,7 +253,7 @@ async def trigger_plan(body: PlanTriggerRequest):
materials_by_type = filter_materials(materials)
cleaned = get_cleaned()
logger.info("Getting components and properties recommendations")
logger.info("Getting components and property_change recommendations")
# TODO: Move this to a class. We probably want a Recommender class which takes the injects the optimisers
# in as a dependency and then the optimisers can take the input measures in as part of the setup() method

View file

@ -27,7 +27,7 @@ ENVIRONMENT = os.getenv("ENVIRONMENT", "dev")
def app():
"""
For a pre-defined list of constituencies and property data_types, we'll download EPC data from the API
and produce a dataset of cleaned fields so that when we get new properties, we can quickly
and produce a dataset of cleaned fields so that when we get new properties, we can quickly
sanitise any description data
Currently, this application is just run on a local machine

View file

@ -16,7 +16,6 @@ from model_data.simulation_system.core.Settings import (
RDSAP_RESPONSE,
MAX_SAP_SCORE,
fill_na_map,
FIXED_DESCRIPTON_MAPPED_FEATURES
)
from typing import List
@ -348,7 +347,7 @@ class DataProcessor:
cleaning_averages_filled = cleaning_averages_filled.drop(columns=f"{variable}_BUILT_FORM_AVERAGE")
# If there still is na values, use average across all properties in consituecy
# If there are still NA values, use the average across all properties in the constituency
cleaning_averages_filled[variable] = cleaning_averages_filled[
variable
].fillna(cleaning_averages_filled[variable].mean())
@ -528,126 +527,3 @@ class DataProcessor:
df[column] = df[column].astype(bool)
return df
@classmethod
def difference_data(cls, df: pd.DataFrame):
    """
    Convert paired starting/ending feature columns into differenced features (ending minus starting),
    which is useful for modelling the difference responses.

    A feature pair is either `<name>_STARTING` / `<name>_ENDING` or `<name>` / `<name>_ENDING`.
    Columns listed in FIXED_FEATURES, FIXED_DESCRIPTON_MAPPED_FEATURES and the key columns below are left
    untouched. Non-numerical features are one-hot encoded first and every level is differenced separately.

    Note: the thermal transmittance columns of the dataframe passed in are coerced to numeric in place.

    :param df: Dataframe holding the starting and ending versions of each feature
    :return: Dataframe where each pair has been replaced by `<name>_DIFF` (numerical) or
        `<name>_<level>_DIFF` (categorical) columns
    :raises Exception: If a column that is not in FIXED_DESCRIPTON_MAPPED_FEATURES has no ending counterpart
    """
    # We ensure that the u value columns are co-erced to a numerical format
    uvalue_columns = [col for col in df.columns if "thermal_transmittance" in col]
    for uvalue_col in uvalue_columns:
        df[uvalue_col] = pd.to_numeric(df[uvalue_col])

    key_columns = [
        "RDSAP_CHANGE", "HEAT_DEMAND_CHANGE", "CARBON_CHANGE",
        "SAP_STARTING", "HEAT_DEMAND_STARTING",
        "CARBON_STARTING", "UPRN", "CONSTITUENCY",
        "SAP_ENDING", "CARBON_ENDING", "HEAT_DEMAND_ENDING",
        "DAYS_TO_STARTING", "DAYS_TO_ENDING"
    ]
    ignore_cols = set(FIXED_FEATURES + FIXED_DESCRIPTON_MAPPED_FEATURES + key_columns)

    # Keep the dataframe's own column order so the output column order is deterministic between runs
    # (iterating over a set of strings is not)
    columns = [x for x in df.columns if x not in ignore_cols]
    column_lookup = set(columns)

    non_numerical_columns = [
        col for col in df.select_dtypes(exclude=['number']).columns.tolist() if col in column_lookup
    ]
    non_numerical_lookup = set(non_numerical_columns)
    # Record the levels before encoding, since get_dummies replaces the original columns
    levels = {col: df[col].unique().tolist() for col in non_numerical_columns}
    df = pd.get_dummies(df, columns=non_numerical_columns)

    # We make sure there is a starting and ending version of the column
    diff_columns = []
    no_diff_columns = []  # Store for debugging
    for col in columns:
        if "_ENDING" in col:
            # Don't keep the endings
            continue
        # We have a starting column so check if we have an ending
        if col.replace("_STARTING", "") + "_ENDING" in column_lookup:
            diff_columns.append(col)
        else:
            no_diff_columns.append(col)

    if any(c not in FIXED_DESCRIPTON_MAPPED_FEATURES for c in no_diff_columns):
        raise Exception("Something went wrong, potentially missed a differencing column")

    datatypes = df.dtypes

    # Note: We also difference columns like floor area and floor height. We should experiment with this.
    # Starting floor area will heavily impact the starting sap value so that feature may be encapsulated by
    # the starting value, therefore to explain any differences in the new floor area, it may be enough to
    # just consider the difference however we can play around with this.

    # Do the differencing
    cols_to_append = {}
    for starting_col in diff_columns:
        base_col = starting_col.replace("_STARTING", "")
        if "_STARTING" in starting_col:
            ending_col = starting_col.replace("_STARTING", "_ENDING")
        else:
            ending_col = starting_col + "_ENDING"

        if starting_col not in non_numerical_lookup:
            cols_to_append[f"{base_col}_DIFF"] = df[ending_col] - df[starting_col]
            df = df.drop(columns=[starting_col, ending_col])
            continue

        # De-duplicate the levels while preserving the order in which they were first seen
        level_values = list(dict.fromkeys(levels[starting_col] + levels[ending_col]))
        level_cols = []
        for level in level_values:
            starting_level_col = "_".join([starting_col, str(level)])
            ending_level_col = "_".join([ending_col, str(level)])
            has_starting = starting_level_col in df.columns
            has_ending = ending_level_col in df.columns
            diff_name = f"{base_col}_{level}_DIFF"

            if not has_starting and not has_ending:
                # get_dummies creates no column for missing values, so a NaN level has nothing to difference
                continue

            if not has_starting:
                # We have no starting, just ending
                if datatypes[ending_level_col].name == "bool":
                    cols_to_append[diff_name] = df[ending_level_col].astype(int)
                else:
                    cols_to_append[diff_name] = df[ending_level_col]
                level_cols.append(ending_level_col)
            elif not has_ending:
                # We have no ending, just starting. The level was present at the start and is gone at the
                # end, so the difference is minus the starting indicator
                if datatypes[starting_level_col].name == "bool":
                    cols_to_append[diff_name] = -1 * df[starting_level_col].astype(int)
                else:
                    cols_to_append[diff_name] = -1 * df[starting_level_col]
                level_cols.append(starting_level_col)
            else:
                if datatypes[starting_level_col].name == "bool":
                    cols_to_append[diff_name] = (
                        df[ending_level_col].astype(int) - df[starting_level_col].astype(int)
                    )
                else:
                    cols_to_append[diff_name] = df[ending_level_col] - df[starting_level_col]
                level_cols.extend([starting_level_col, ending_level_col])

        # Drop the columns
        df = df.drop(columns=level_cols)

    cols_to_append = pd.DataFrame(cols_to_append)
    df = pd.concat([df, cols_to_append], axis=1)

    # Perform a final coercing of string True/False columns to boolean
    df = cls.coerce_boolean_columns(df, cols_to_ignore=key_columns)
    return df

View file

@ -4,7 +4,7 @@ from tqdm import tqdm
import msgpack
from pathlib import Path
from model_data.simulation_system.core.Settings import (
from etl.property_change.settings import (
MANDATORY_FIXED_FEATURES,
LATEST_FIELD,
COMPONENT_FEATURES,
@ -14,8 +14,8 @@ from model_data.simulation_system.core.Settings import (
EARLIEST_EPC_DATE,
CARBON_RESPONSE,
)
from model_data.simulation_system.core.DataProcessor import DataProcessor
from utils.s3 import save_dataframe_to_s3_parquet, read_from_s3, read_dataframe_from_s3_parquet
from etl.property_change.DataProcessor import DataProcessor
from utils.s3 import save_dataframe_to_s3_parquet, read_from_s3
from recommendations.rdsap_tables import england_wales_age_band_lookup
from recommendations.recommendation_utils import (
get_wall_u_value, get_roof_u_value, get_floor_u_value, estimate_perimeter,

View file

@ -0,0 +1,213 @@
# Using a simple Python file as settings for now
# TODO: migrate to dynaconf
from pathlib import Path
# Metric file name and the metric/column names used when picking the best model
METRIC_FILENAME = "metrics.csv"
OPTIMISE_METRIC = "mean_absolute_error"
BEST_MODEL_COLUMN_NAME = "best_model"
# TODO: move these residual plot settings elsewhere for CML
RESIDUAL_TRUE_LABEL = "true"
RESIDUAL_PREDICTION_LABEL = "pred"
RESIDUAL_FILE = "residual.png"
SEABORN_RESIDUAL_AXIS_FONTSIZE = 12
SEABORN_RESIDUAL_TITLE_FONTSIZE = 22
SEABORN_RESIDUAL_STYLE = "whitegrid"
SEABORN_RESIDUAL_ASPECT_RATIO = "equal"
SEABORN_RESIDUAL_PLOT_DPI = 120
SEABORN_RESIDUAL_RANGE = [-100, 100]
SEABORN_RESIDUAL_LINE_COLOUR = "black"
SEABORN_RESIDUAL_LINE_WIDTH = 1
# Can move to a hyperparameters file
# If anything we might want to have a file that can be loaded and sent to this script
MODEL_HYPERPARAMETERS = {
"autogluon": {
"problem_type": "regression",
"eval_metric": "mean_absolute_error",
"time_limit": 45,
"presets": "medium_quality",
"excluded_model_types": None,
}
}
TIMESTAMP_FORMAT = "%Y_%m_%d_%H_%M_%S"
RANDOM_SEED = 0
SUBSAMPLE_FACTOR = 200
TRAIN_AND_VALIDATION_DATA_NAME = "train_validation_data.parquet"
TEST_DATA_NAME = "test_data.parquet"
REGISTRY_FILE = "model_registry.csv"
MODEL_DIRECTORY = "model_directory"
# Resolves to <grandparent directory of this file>/model_directory
BASE_REGISTRY_PATH = Path(__file__).parent.parent / MODEL_DIRECTORY
PREDICTION_LOCATION = Path("predictions")
PREDICTION_FILE = "prediction.json"
METADATA_FILE = "metadata.json"
MODEL_FOLDER = "model"
METRICS_FOLDER = "metrics"
DEPLOYMENT_FOLDER = "deployment"
# NOTE(review): units are not stated here - presumably square metres and metres; confirm
TOTAL_FLOOR_AREA_NATIONAL_AVERAGE = 70
FLOOR_HEIGHT_NATIONAL_AVERAGE = 2.45
AVERAGE_FIXED_FEATURES = [
"TOTAL_FLOOR_AREA",
"FLOOR_HEIGHT",
"FIXED_LIGHTING_OUTLETS_COUNT",
]
COLUMNS_TO_MERGE_ON = [
"PROPERTY_TYPE",
"BUILT_FORM",
"CONSTRUCTION_AGE_BAND",
"NUMBER_HABITABLE_ROOMS",
"NUMBER_HEATED_ROOMS",
]
FULLY_GLAZED_DESCRIPTIONS = [
"Fully double glazed",
"High performance glazing",
"Fully triple glazed",
"Full secondary glazing",
"Multiple glazing throughout",
]
FIXED_FEATURES = [
"PROPERTY_TYPE",
"BUILT_FORM",
"CONSTRUCTION_AGE_BAND",
"NUMBER_HABITABLE_ROOMS",
"CONSTITUENCY",
"NUMBER_HEATED_ROOMS",
"FIXED_LIGHTING_OUTLETS_COUNT",
]
COMPONENT_FEATURES = [
"TRANSACTION_TYPE",
"WALLS_DESCRIPTION",
"FLOOR_DESCRIPTION",
"LIGHTING_DESCRIPTION",
"ROOF_DESCRIPTION",
"MAINHEAT_DESCRIPTION",
"HOTWATER_DESCRIPTION",
"MAIN_FUEL",
"MECHANICAL_VENTILATION",
"SECONDHEAT_DESCRIPTION",
"ENERGY_TARIFF", # Not sure if this is relevant
"SOLAR_WATER_HEATING_FLAG",
"PHOTO_SUPPLY",
"WINDOWS_DESCRIPTION",
"GLAZED_TYPE",
"MULTI_GLAZE_PROPORTION",
"LOW_ENERGY_LIGHTING",
"NUMBER_OPEN_FIREPLACES",
"MAINHEATCONT_DESCRIPTION",
"EXTENSION_COUNT",
"TOTAL_FLOOR_AREA",
"FLOOR_HEIGHT",
# 'GLAZED_AREA', # May not need this since we have MULTI_GLAZE_PROPORTION
]
# For these fields, we take the latest value if we have multiple values
# Since more recent EPCs have been conducted with more rigour, we assume that the latest value is
# the most accurate
LATEST_FIELD = [
"NUMBER_HABITABLE_ROOMS",
"NUMBER_HEATED_ROOMS",
"FIXED_LIGHTING_OUTLETS_COUNT",
"CONSTRUCTION_AGE_BAND", # This is a field we'll probably want to use Verisk data for
]
# If we see these features changing, we don't use the EPC, since we deem it not to be reliable
MANDATORY_FIXED_FEATURES = ["PROPERTY_TYPE", "BUILT_FORM", "CONSTITUENCY"]
# For particularly old EPC data, we have inconsistent records so we'll only include EPCs from the cutoff
# date below onwards. SAP09 was introduced in 2009 and SAP12 was later introduced in England and Wales
# from 31 July 2014, so the cutoff sits just after the SAP12 introduction.
# NOTE(review): this comment previously said "after 2010", which did not match the value - confirm intent
EARLIEST_EPC_DATE = "2014-08-01"
# Names of the EPC columns used as the modelling responses
RDSAP_RESPONSE = "CURRENT_ENERGY_EFFICIENCY"
HEAT_DEMAND_RESPONSE = "ENERGY_CONSUMPTION_CURRENT"
CARBON_RESPONSE = "CO2_EMISSIONS_CURRENT"
def ordinal(n):
    """
    Return the integer n followed by its English ordinal suffix, e.g. 1 -> "1st", 12 -> "12th", 23 -> "23rd"

    :param n: Integer to convert
    :return: String made up of n and its suffix
    """
    last_two_digits = n % 100
    if last_two_digits < 10 or last_two_digits > 20:
        # Outside the 10-20 range the suffix is driven purely by the last digit
        suffix = {1: "st", 2: "nd", 3: "rd"}.get(n % 10, "th")
    else:
        # 10th to 20th (and 110th to 120th etc.) always take "th"
        suffix = "th"
    return str(n) + suffix
# Maps the different floor level labels (names, zero padded strings, ordinals, plain strings and integers)
# onto an integer floor number. Later entries overwrite earlier ones when keys collide.
FLOOR_LEVEL_MAP = {
"Basement": -1,
"Ground": 0,
"ground floor": 0,
"20+": 20,
"21st or above": 21,
# "00", "01", ..., "20"
**{str(i).zfill(2): i for i in range(0, 21)},
# "-1th", "0th", "1st", ..., "20th"
**{ordinal(i): i for i in range(-1, 21)},
# "-1", "0", ..., "20"
**{str(i): i for i in range(-1, 21)},
# Integers map to themselves
**{i: i for i in range(-1, 21)},
}
# Collapses the "Enclosed" terrace variants onto their plain equivalents
BUILT_FORM_REMAP = {
"Enclosed End-Terrace": "End-Terrace",
"Enclosed Mid-Terrace": "Mid-Terrace",
}
# NOTE(review): presumably consumed by DataProcessor - confirm how each key is used
DATA_PROCESSOR_SETTINGS = {
"low_memory": False,
"epc_minimum_count": 1,
"column_mappings": {"UPRN": [int, str]},
}
# This has a manual mapping of the column types required
COLUMNTYPES = {
'UPRN': 'object', 'TOTAL_FLOOR_AREA': 'float64', 'FLOOR_HEIGHT': 'float64', 'PROPERTY_TYPE': 'object',
'BUILT_FORM': 'object', 'CONSTITUENCY': 'object', 'NUMBER_HABITABLE_ROOMS': 'float64',
'NUMBER_HEATED_ROOMS': 'float64', 'FIXED_LIGHTING_OUTLETS_COUNT': 'float64',
'CONSTRUCTION_AGE_BAND': 'object',
'TRANSACTION_TYPE': 'object',
'WALLS_DESCRIPTION': 'object',
'FLOOR_DESCRIPTION': 'object',
'LIGHTING_DESCRIPTION': 'object',
'ROOF_DESCRIPTION': 'object',
'MAINHEAT_DESCRIPTION': 'object',
'HOTWATER_DESCRIPTION': 'object', 'MAIN_FUEL': 'object',
'MECHANICAL_VENTILATION': 'object',
'SECONDHEAT_DESCRIPTION': 'object', 'ENERGY_TARIFF': 'object',
'SOLAR_WATER_HEATING_FLAG': 'object', 'PHOTO_SUPPLY': 'float64',
'WINDOWS_DESCRIPTION': 'object',
'GLAZED_TYPE': 'object',
'MULTI_GLAZE_PROPORTION': 'float64',
'LOW_ENERGY_LIGHTING': 'float64',
'NUMBER_OPEN_FIREPLACES': 'float64',
'MAINHEATCONT_DESCRIPTION': 'object',
'EXTENSION_COUNT': 'float64',
'LODGEMENT_DATE': 'object',
}
# For modelling, we don't allow records with more than 100 SAP points
MAX_SAP_SCORE = 100
# Default value used to fill missing entries, keyed by column name
fill_na_map = {
# There are some descriptions, such as "To be used only when there is no heating/hot-water system or data is from
# a community network" that could be clustered with unknown fuel
"MAIN_FUEL": "UNKNOWN",
"MECHANICAL_VENTILATION": "Unknown",
"SECONDHEAT_DESCRIPTION": "None",
"ENERGY_TARIFF": "Unknown",
# We set solar water heating flag to N - we could investigate using a different category entirely
"SOLAR_WATER_HEATING_FLAG": "N",
"GLAZED_TYPE": "not defined",
"MULTI_GLAZE_PROPORTION": 0,
"LOW_ENERGY_LIGHTING": 0,
"MAINHEATCONT_DESCRIPTION": "Unknown",
"EXTENSION_COUNT": 0,
"NUMBER_OPEN_FIREPLACES": 0
}

View file

@ -56,7 +56,7 @@ class BoreholeClient:
# EXAMPLE
# There are ~1.4 million entries in this dataset and so we firstly want to reduce the number of
# entries in here if possible before we produce any form of comparison between our properties, to infer
# entries in here if possible before we produce any form of comparison between our properties, to infer
# the distance from the property to the nearest borehole
# Let's take a sample

View file

View file

@ -1,5 +1,5 @@
"""
This script produces the dataset used to model the wall area of properties, which is used to estimate the cost
This script produces the dataset used to model the wall area of properties, which is used to estimate the cost
of insulation measures within homes
"""
import os

View file

@ -0,0 +1 @@
PyPDF2

View file

@ -83,7 +83,7 @@ resource "aws_db_instance" "default" {
publicly_accessible = true
}
# Set up the bucket that recieve the csv uploads of properties to be retrofit
# Set up the bucket that receives the csv uploads of properties to be retrofitted
module "s3_presignable_bucket" {
source = "./modules/s3_presignable_bucket"
bucketname = "retrofit-plan-inputs-${var.stage}"

View file

@ -1,650 +0,0 @@
import numpy as np
import pandas as pd
import statsmodels.api as sm
import matplotlib.pyplot as plt
from typing import Dict, Optional, List
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, explained_variance_score, \
median_absolute_error, mean_absolute_percentage_error
from sklearn.ensemble import RandomForestRegressor
from sklearn.inspection import permutation_importance
from model_data.EpcClean import EpcClean
from statsmodels.stats.outliers_influence import variance_inflation_factor
from tqdm import tqdm
from utils.logger import setup_logger
logger = setup_logger()
class SapModel:
# We want to estimate for making improvements on different property components
RESPONSE = "current-energy-efficiency"
# We could potentially build models by constituency to avoid having too many
# features in the model
BASE_FEATURES = [
"property-type",
"built-form",
"construction-age-band",
"number-habitable-rooms",
"constituency",
"number-heated-rooms",
"transaction-type"
]
COMPONENT_FEATURES = [
"walls-description",
"floor-description",
"lighting-description",
"roof-description",
"mainheat-description",
"hotwater-description",
"main-fuel",
"mechanical-ventilation",
"secondheat-description",
"energy-tariff",
"solar-water-heating-flag",
"photo-supply",
"windows-description",
"glazed-type",
"glazed-area",
"multi-glaze-proportion",
# "lighting-description" # Might not need to use this
"low-energy-lighting",
"number-open-fireplaces",
"mainheatcont-description",
"fixed-lighting-outlets-count",
"floor-height",
"floor-level",
"total-floor-area",
"extension-count",
]
CATEGORICAL_COLS = [
"property-type",
"built-form",
"number-habitable-rooms",
"constituency",
"number-heated-rooms",
"mainheat-description",
"hotwater-description",
"main-fuel",
"mechanical-ventilation",
"secondheat-description",
"energy-tariff",
"solar-water-heating-flag",
"windows-description",
"glazed-type",
"glazed-area",
"construction-age-band",
"lighting-description",
"mainheatcont-description",
"floor-level",
]
NUMERICAL_COLUMNS = [
"photo-supply", "multi-glaze-proportion", "low-energy-lighting", "number-open-fireplaces",
"fixed-lighting-outlets-count",
"floor-height",
"total-floor-area",
"extension-count",
]
# For the moment, we store records of the best performing models as a benchmark for future improvements
BEST_FIT = {
'MAPE': 0.04646530042225876, 'Mean Squared Error': 18.635209563729763,
'Mean Absolute Error': 2.856347408023325, 'R2 Score': 0.800701753826118,
'Explained Variance Score': 0.800701753826118, 'Median Absolute Error': 1.9026758012120197
}
BEST_PREDICT = {
'MAPE': 0.04346083528432316, 'Mean Squared Error': 21.16036509335514,
'Mean Absolute Error': 3.0440540802375833, 'R2 Score': 0.7219965012634312,
'Explained Variance Score': 0.7220620137390414, 'Median Absolute Error': 1.9031967986967828
}
BEST_FINAL = {
'MAPE': 0.04841470773386795, 'Mean Squared Error': 21.323052316630914, 'Mean Absolute Error': 2.988547998636157,
'R2 Score': 0.7633662459299112, 'Explained Variance Score': 0.7633785339028832,
'Median Absolute Error': 1.9487883489495985
}
BUCKET_VARIABLES = [
"number-open-fireplaces", "fixed-lighting-outlets-count", 'extension-count', 'multi-glaze-proportion'
]
def __init__(
    self, data: List[Dict],
    cleaner: EpcClean,
    test_size: Optional[float] = 0.2,
    random_state: Optional[int] = None
):
    """
    Store the raw records and initialise every piece of modelling state to an empty placeholder

    :param data: List of record dictionaries, converted into a dataframe
    :param cleaner: Cleaner object whose `cleaned` lookups are merged onto the modelling data
    :param test_size: Proportion of the data held back for testing; None falls back to 0.2
    :param random_state: Seed used for the train/test split and random forest; None falls back to 42
    """
    self.df = pd.DataFrame(data)
    self.cleaner = cleaner

    # An explicit None is treated the same as leaving the argument out
    self.test_size = test_size if test_size is not None else 0.2
    self.random_state = 42 if random_state is None else random_state

    # Set by create_dataset()
    self.model_data = None

    # Set by make_training_test()
    self.train_x = None
    self.test_x = None
    self.train_y = None
    self.test_y = None

    # Set by fit_model()
    self.test_model = None
    self.final_model = None
    self.fit_error = None
    self.predict_error = None
    self.final_error = None
    self.fit_df = None
    self.predict_df = None
    self.final_fit_df = None
    self.diagnosis = {}

    # Each entry starts as its own empty dataframe
    self.worst = {
        name: pd.DataFrame()
        for name in (
            "fit_errors", "prediction_errors", "fit_x", "prediction_x", "final_errors", "final_x"
        )
    }
def run(self, plot: bool = False) -> None:
    """
    Pipeline entry point: build the modelling dataset, fit the model and optionally plot the fit.

    Any exception raised along the way is logged and swallowed rather than propagated.

    :param plot: Boolean to indicate whether to plot the regression
    """
    try:
        self.create_dataset()
        self.fit_model()
        if not plot:
            return
        self.plot_regression(self.fit_df)
    except Exception as e:
        for message in ("An error occurred during execution.", str(e)):
            logger.error(message)
def _merge_with_u_values(
    self, model_data: pd.DataFrame, description: str, thermal_transmittance: str
) -> pd.DataFrame:
    """
    Left-join the cleaned u-values for one building component onto the model data

    :param model_data: Pandas dataframe which is the main modelling dataset
    :param description: Component name (e.g. "walls"); the join uses the "<description>-description" column
    :param thermal_transmittance: Name of the thermal transmittance column in the cleaned data
    :return: Model data with an extra "<description>_u_value" column
    """
    description_column = f"{description}-description"

    # Build the lookup of original description -> u-value from the cleaner output
    lookup = pd.DataFrame(self.cleaner.cleaned[description_column])
    lookup = lookup[["original_description", thermal_transmittance]]
    lookup = lookup.rename(columns={thermal_transmittance: f"{description}_u_value"})

    merged = model_data.merge(
        lookup,
        how="left",
        left_on=description_column,
        right_on="original_description"
    )
    # The join key from the lookup side is no longer needed
    return merged.drop(columns=["original_description"])
def _append_cleaned_data(self, model_data: pd.DataFrame) -> pd.DataFrame:
    """
    Merge the cleaned walls/floor/roof u-values and the low energy lighting proportion onto the model data

    :param model_data: Original model data.
    :return: Model data with cleaned data appended.
    """
    for component in ("walls", "floor", "roof"):
        model_data = self._merge_with_u_values(model_data, component, "thermal_transmittance")

    # Lighting has a proportion rather than a u-value, so it is joined on directly
    lighting = pd.DataFrame(self.cleaner.cleaned["lighting-description"])
    lighting = lighting[["original_description", "low_energy_proportion"]]
    with_lighting = model_data.merge(
        lighting,
        how="left",
        left_on="lighting-description",
        right_on="original_description"
    )
    return with_lighting.drop(columns=["original_description"])
@staticmethod
def _convert_transaction_type(model_data: pd.DataFrame) -> pd.DataFrame:
"""
Converts transaction type to boolean
:param model_data: Model data with transaction type.
:return: Model data with converted transaction type.
"""
model_data["is_rdsap"] = model_data["transaction-type"] != "new dwelling"
model_data = model_data.drop(columns=["transaction-type"])
return model_data
@staticmethod
def bucket_and_fill(df: pd.DataFrame, column_name: str, n_bins: int = 10) -> pd.DataFrame:
"""
Simple utility function to bucket up features into bins and then fill any missing values with "NO_RECORD"
:param df: Dataframe of features to be binned
:param column_name: Name of the column to be binned
:param n_bins: Number of bins to use
:return: Dataframe with binned column
"""
# Check if the column is numerical
if np.issubdtype(df[column_name].dtype, np.number):
# Create a new categorical column from numerical one by binning the data
df[column_name + "_bucket"] = pd.cut(df[column_name], bins=n_bins).astype(str)
# Replace missing data with "NO_RECORD"
df[column_name + "_bucket"] = df[column_name + "_bucket"].fillna("NO_RECORD")
df[column_name + "_bucket"] = np.where(
df[column_name + "_bucket"] == "nan",
"NO_RECORD",
df[column_name + "_bucket"]
)
return df
def _clean_numericals(self, model_data):
    """
    Bucket the BUCKET_VARIABLES columns and convert the remaining numerical columns to floats

    :param model_data: Model data holding every column in NUMERICAL_COLUMNS
    :return: Model data where the bucket variables hold bucket labels and the other numericals are floats
    """
    bucket_variables = self.BUCKET_VARIABLES

    # Try binning numericals
    for col in bucket_variables:
        model_data[col] = pd.to_numeric(model_data[col], errors='coerce')
        if model_data[col].isnull().all():
            # Nothing to bin when every value is missing, so the whole column gets the missing label
            model_data[col + "_bucket"] = "NO_RECORD"
        else:
            model_data = self.bucket_and_fill(model_data, col)

    # Replace the data with the binned version
    bucket_names = {col + "_bucket": col for col in bucket_variables}
    model_data = model_data.drop(columns=bucket_variables).rename(columns=bucket_names)

    # Basic fill the rest of the columns with 0 - currently this provided the best performance
    for col in self.NUMERICAL_COLUMNS:
        if col in bucket_variables:
            continue
        filled = np.where(model_data[col] == "", "0", model_data[col])
        model_data[col] = filled.astype(float)

    return model_data
@staticmethod
def clean_missings(model_data: pd.DataFrame) -> pd.DataFrame:
"""
Fills categorical missing data with sensible values
:param model_data: Original model data.
:return: Model data with cleaned categorical data.
"""
# Cleaning of energy-tariff and construction-age-band hurt prediction performance, indicating there is
# potentially
# a notable difference between a "" missing and a "NO DATA!" missing, worth differentiating
model_data["mechanical-ventilation"] = np.where(
model_data["mechanical-ventilation"] == "", "NO DATA!", model_data["mechanical-ventilation"]
)
model_data["solar-water-heating-flag"] = np.where(
model_data["solar-water-heating-flag"] == "", "N", model_data["solar-water-heating-flag"]
)
model_data["glazed-type"] = np.where(
model_data["glazed-type"] == "", "NO DATA!", model_data["glazed-type"]
)
model_data["glazed-area"] = np.where(
model_data["glazed-area"] == "", "NO DATA!", model_data["glazed-type"]
)
return model_data
def create_dataset(self):
    """
    Build the modelling dataset from the raw records and store it on self.model_data.

    Steps: select the response and feature columns, merge on the cleaned u-values, fill missing values,
    convert the transaction type, clean the numericals, keep only rows with all three u-values and finally
    cast the categorical columns and the response.
    """
    logger.info("Creating modelling dataset")
    selected = [self.RESPONSE] + self.COMPONENT_FEATURES + self.BASE_FEATURES
    model_data = self.df[selected].reset_index(drop=True)
    # Keep track of the original row position
    model_data["idx"] = model_data.index.copy()

    # Run the cleaning steps in order: u-values, missing values, transaction type, numericals
    model_data = self._append_cleaned_data(model_data)
    model_data = self.clean_missings(model_data)
    model_data = self._convert_transaction_type(model_data)
    model_data = self._clean_numericals(model_data)

    # Take just entries with U-values
    # TODO: Rather than doing this, do we want to include the estimated u-values?
    # Since this ends up with just 2k entries
    u_value_columns = ["walls_u_value", "floor_u_value", "roof_u_value"]
    has_all_u_values = model_data[u_value_columns].notnull().all(axis=1)
    model_data = model_data[has_all_u_values]

    # The raw descriptions are represented by their u-values and transaction type by is_rdsap
    excluded = {"walls-description", "floor-description", "roof-description", "transaction-type"}
    candidates = (
        self.BASE_FEATURES + self.COMPONENT_FEATURES + u_value_columns + [self.RESPONSE, "idx", "is_rdsap"]
    )
    model_data = model_data[[name for name in candidates if name not in excluded]]

    for col in self.CATEGORICAL_COLS:
        model_data[col] = model_data[col].astype('category')

    # Convert response
    model_data[self.RESPONSE] = model_data[self.RESPONSE].astype(float)
    self.model_data = model_data
def make_training_test(self, x):
    """
    Split the data into training and test sets and store them on the instance

    :param x: Dataframe holding the features and the response column
    """
    features = x.drop(self.RESPONSE, axis=1)
    response = x[self.RESPONSE]
    split = train_test_split(
        features,
        response,
        test_size=self.test_size,
        random_state=self.random_state
    )
    self.train_x, self.test_x, self.train_y, self.test_y = split
@staticmethod
def remove_zero_std_cols(train_x, test_x=None, threshold=1e-3):
"""
Utility function to remove columns that have zero standard deviation from both test and train sets
:param train_x: Training data to remove columns from
:param test_x: If provided, remove the same columns from the test data
:param threshold: float value, if the standard deviation is below this threshold, the column is considered
to have zero standard deviation
:return: Tuple of train_x and test_x (if provided). If test_x is not provided, a null placeholder is returned
"""
# Compute standard deviations
std_devs = train_x.std()
# Find columns with zero or near-zero standard deviation
zero_std_cols = std_devs[std_devs <= threshold].index
# Drop these columns from the training data
train_x = train_x.drop(zero_std_cols, axis=1)
if test_x is not None:
# Ensure the test data has the same columns
test_x = test_x[train_x.columns]
return train_x, test_x
return train_x, None
def fit_model(self):
    """
    Main function to fit the model and produce accuracy metrics.

    A testing model is fitted on the training split and scored on both splits against the stored
    benchmarks, then a final model is fitted on all of the data using the same columns. Results are stored
    on the instance (test_model, final_model, the error dictionaries, the fit/predict dataframes,
    worst and diagnosis).
    """
    x = pd.get_dummies(self.model_data, columns=self.CATEGORICAL_COLS + self.BUCKET_VARIABLES, drop_first=True)
    # Convert booleans to integer and any leftover object columns to float
    for col in x.columns:
        if x[col].dtype == bool:
            x[col] = x[col].astype(int)
        if x[col].dtype == object:
            x[col] = x[col].astype(float)

    # Create the training and test sets for each run
    self.make_training_test(x)
    self.train_x, self.test_x = self.remove_zero_std_cols(self.train_x, self.test_x)

    logger.info("Detecting multi-collinearity in training dataset")
    self.detect_multi_collinearity()

    # Add a constant to the independent value. has_constant="add" forces the constant in, since the default
    # ("skip") silently leaves it out when a column happens to be constant within the split (e.g. a dummy
    # that is always 1 in the test set), which would leave the test columns out of line with the model
    train_x = sm.add_constant(self.train_x, has_constant="add")
    test_x = sm.add_constant(self.test_x, has_constant="add")

    # idx only tracks the original row, so it is kept for reporting but not used as a feature
    train_idx = train_x["idx"].copy()
    test_idx = test_x["idx"].copy()
    train_x = train_x.drop(columns=["idx"])
    test_x = test_x.drop(columns=["idx"])

    logger.info("Fitting testing model")
    # make regression model
    model = sm.OLS(self.train_y, train_x)
    # fit model and print results
    self.test_model = model.fit()

    train_predictions = self.test_model.fittedvalues
    test_predictions = self.test_model.predict(test_x)

    self.fit_error, self.worst["fit_errors"] = self.calculate_regression_metrics(
        y_true=self.train_y, y_pred=train_predictions
    )
    # Predict on new data
    self.predict_error, self.worst["prediction_errors"] = self.calculate_regression_metrics(
        y_true=self.test_y, y_pred=test_predictions
    )

    fit_success = self.check_successes(self.fit_error, self.BEST_FIT)
    predict_success = self.check_successes(self.predict_error, self.BEST_PREDICT)

    # Assigned by index, so only rows in the training split get a value
    self.model_data['fit'] = self.test_model.fittedvalues

    # The worst errors over index heavily for flats
    self.worst["fit_x"] = self.model_data[self.model_data.index.isin(self.worst["fit_errors"].index)]
    self.worst["prediction_x"] = self.model_data[self.model_data.index.isin(self.worst["prediction_errors"].index)]

    self.fit_df = pd.DataFrame(
        {
            "fit": train_predictions,
            "actual": self.train_y,
            "idx": train_idx
        }
    ).sort_values("actual", ascending=True)
    self.predict_df = pd.DataFrame(
        {
            "predictions": test_predictions,
            "actual": self.test_y,
            "idx": test_idx
        }
    )
    self.diagnosis = {
        "fit_success": fit_success,
        "predict_success": predict_success,
        "summary": self.test_model.summary()
    }

    # We're now ready to fit the final model
    # For the moment, the pre-processing at the top of this function merely removes columns, so we
    # just need to remove the columns that were removed from the training data from the final model
    logger.info("Fitting final model")
    y = x[self.RESPONSE]
    # Select the training columns FIRST and then add the constant. Previously the constant was added and
    # then discarded by the column selection (self.train_x has no constant column), so the final model was
    # fitted without an intercept, unlike the testing model
    x = sm.add_constant(x[self.train_x.columns], has_constant="add")
    idx = x["idx"].copy()
    x = x.drop(columns=["idx"])

    final_model = sm.OLS(y, x)
    # fit model and print results
    self.final_model = final_model.fit()
    final_predictions = self.final_model.fittedvalues

    self.final_error, self.worst["final_errors"] = self.calculate_regression_metrics(
        y_true=y, y_pred=final_predictions
    )
    self.final_fit_df = pd.DataFrame(
        {
            "fit": final_predictions,
            "actual": y,
            "idx": idx
        }
    ).sort_values("actual", ascending=True)
@staticmethod
def check_successes(experiment_error, best_error):
"""
Simple function to check if the experiment error is better than the best error
:param experiment_error: output of calculate_regression_metrics() on the experiment
:param best_error: Current benchmark best error
:return:
"""
successes = []
for k in experiment_error:
if k in ["Explained Variance Score", "R2 Score"]:
# We want to maximise this so we want experiment error to be higher
successes.append(
{
"measure": k,
"success": experiment_error[k] >= best_error[k],
"difference": abs(experiment_error[k] - best_error[k])
}
)
continue
successes.append(
{
"measure": k,
"success": experiment_error[k] <= best_error[k],
"difference": abs(experiment_error[k] - best_error[k])
}
)
return pd.DataFrame(successes)
def rf_importance(self, train_x, train_y, test_x, test_y):
    """
    Estimate feature importance using a random forest, to get a sense of the key features
    which are driving model performance

    :param train_x: Training data covariates to build the importance model on
    :param train_y: Training data response to build the importance model on
    :param test_x: Test data covariates used for the permutation importance
    :param test_y: Test data response used for the permutation importance
    :return: Tuple of (random forest importances, permutation importances), each a pandas dataframe
        ranked from most important to least
    """
    forest = RandomForestRegressor(random_state=self.random_state)
    forest.fit(train_x, train_y)

    # Pair each feature name with the importance from the fitted forest
    importance_rows = [
        {"Feature": feature, "rf_importance": importance}
        for feature, importance in zip(train_x.columns, forest.feature_importances_)
    ]
    ranked = pd.DataFrame(importance_rows).sort_values(by="rf_importance", ascending=False)

    return ranked, self.permuation_importance(forest, test_x, test_y)
@staticmethod
def permuation_importance(rf, test_x, test_y):
    """
    Produce the permutation importance for a given model, scored on negative mean squared error

    :param rf: Random forest model to calculate permutation importance for
    :param test_x: Test covariates to be used for permutation importance
    :param test_y: Test response to be used for permutation importance
    :return: Pandas dataframe of mean permutation importance per feature, most important first
    """
    result = permutation_importance(rf, test_x, test_y, scoring='neg_mean_squared_error')
    importance_table = pd.DataFrame(
        {
            "Feature": test_x.columns,
            "perm_importance": result.importances_mean
        }
    )
    return importance_table.sort_values(by="perm_importance", ascending=False)
def detect_multi_collinearity(self):
    """
    Drop training features whose variance inflation factor (VIF) is infinite.

    A VIF is computed for every column of ``self.train_x``. Columns with an infinite VIF are
    removed from ``self.train_x`` unless they are on the protected list, and ``self.test_x`` is
    then reduced to the same columns.

    :return: None - ``self.train_x`` and ``self.test_x`` are replaced in place
    """
    # There are some features we do not want to remove
    protected = ["walls_u_value", "floor_u_value", "roof_u_value", "idx", "is_rdsap"]

    # Get the VIFs for each variable
    column_positions = range(self.train_x.shape[1])
    scores = [variance_inflation_factor(self.train_x.values, position) for position in tqdm(column_positions)]
    vifs = pd.DataFrame({"features": self.train_x.columns, "vif": scores})

    # Rank by VIF, highest first, and ignore anything protected
    vifs = vifs.sort_values("vif", ascending=False)
    candidates = vifs[~vifs["features"].isin(protected)]
    infinite = candidates[np.isinf(candidates["vif"])]

    # Acceptable drop variables:
    # main-fuel_Gas: mains gas
    # glazed-type_NO DATA!
    # glazed-area_NO DATA!
    self.train_x = self.train_x.drop(columns=infinite["features"].values)
    self.test_x = self.test_x[self.train_x.columns]
@staticmethod
def plot_regression(df):
    """
    Plot the "fit" and "actual" columns of a dataframe against their row position.

    :param df: Dataframe holding a 'fit' column and an 'actual' column
    :return: None - the figure is displayed with ``plt.show()``
    """
    # Pull both series out up front so a missing column fails before anything is drawn
    lines = [
        (df['fit'], 'red', 'Fit'),
        (df['actual'], 'blue', 'Actual'),
    ]

    # x-values are simply the sequential row positions
    positions = np.arange(len(df))
    for values, colour, legend_label in lines:
        plt.plot(positions, values, color=colour, label=legend_label)

    plt.xlabel('Index')
    plt.ylabel('Value')
    plt.title('Linear Regression - Fit vs Actual')
    plt.legend()
    plt.show()
@staticmethod
def calculate_regression_metrics(y_true, y_pred, n=20):
    """
    Calculate accuracy metrics for a regression and list the worst individual errors.

    Args:
        y_true (array-like): Array of true target values.
        y_pred (array-like): Array of predicted target values.
        n (int): Number of rows, ranked by absolute residual, to return as the worst errors.

    Returns:
        tuple: ``(metrics, worst_errors)`` where ``metrics`` is a dictionary of the calculated
        metrics and ``worst_errors`` is a dataframe with columns 'Fit' (prediction),
        'Actual' (true value), 'Residual' (Actual - Fit) and 'Absolute Residual'.
    """
    metrics = {
        'MAPE': mean_absolute_percentage_error(y_true, y_pred),
        'Mean Squared Error': mean_squared_error(y_true, y_pred),
        'Mean Absolute Error': mean_absolute_error(y_true, y_pred),
        'R2 Score': r2_score(y_true, y_pred),
        'Explained Variance Score': explained_variance_score(y_true, y_pred),
        'Median Absolute Error': median_absolute_error(y_true, y_pred)
    }

    # 'Fit' is the model output and 'Actual' is the ground truth (these were previously swapped).
    # Values are paired by position, matching how the metrics above pair them, so a pandas index
    # on one input cannot silently misalign the two columns. The index of y_true is kept, when
    # it has one, so the worst rows can be traced back to the source data.
    row_index = y_true.index if isinstance(y_true, pd.Series) else None
    errors = pd.DataFrame(
        {
            'Fit': np.ravel(y_pred),
            'Actual': np.ravel(y_true),
        },
        index=row_index,
    )
    errors['Residual'] = errors['Actual'] - errors['Fit']
    errors['Absolute Residual'] = np.abs(errors['Residual'])

    worst_errors = errors.nlargest(n, 'Absolute Residual')
    return metrics, worst_errors

View file

@ -1,207 +0,0 @@
import pickle
import pandas as pd
import numpy as np
from model_data.EpcClean import EpcClean
class UvalueEstimations:
    """
    Summarise median thermal transmittance (U-value) for building components.

    Records whose component description contains the marker text are grouped by a set of
    property attributes plus a floor-area decile, and the median thermal transmittance and
    sample count of each group are stored on the instance.
    """

    # Only records whose description contains this text are used to build the summaries
    _MEASURED_MARKER = "Average thermal transmittance"

    def __init__(self, data: list):
        """
        Initialize the UvalueEstimations class.

        :param data: The input data as a list of dictionaries, to be converted to a dataframe
        """
        self.data = pd.DataFrame(data)

        self.walls = None
        self.walls_decile_data = {}

        self.roofs = None

        self.floors = None
        self.floors_decile_data = {}

    def get_estimates(self, cleaner: "EpcClean"):
        """
        Calculate U-value estimates for walls, roofs, and floors.

        :param cleaner: An instance of the EpcClean class used for cleaning data.
        """
        self.set_walls(cleaner)
        self.set_roofs(cleaner)
        self.set_floors(cleaner)

    def set_walls(self, cleaner: "EpcClean"):
        """
        Set U-value estimates for walls.

        Populates ``self.walls`` (summary dataframe) and ``self.walls_decile_data``.

        :param cleaner: An instance of the EpcClean class used for cleaning data.
        """
        self.walls, self.walls_decile_data = self._summarise_component("walls", cleaner)

    def set_roofs(self, cleaner: "EpcClean"):
        """
        Set U-value estimates for roofs.

        Not implemented yet: ``self.roofs`` is left as None.

        :param cleaner: An instance of the EpcClean class used for cleaning data.
        """
        pass

    def set_floors(self, cleaner: "EpcClean"):
        """
        Set U-value estimates for floors.

        Populates ``self.floors`` (summary dataframe) and ``self.floors_decile_data``.

        :param cleaner: An instance of the EpcClean class used for cleaning data.
        """
        self.floors, self.floors_decile_data = self._summarise_component("floor", cleaner)

    def _summarise_component(self, component: str, cleaner: "EpcClean"):
        """
        Build the grouped U-value summary shared by walls and floors.

        :param component: Column prefix of the component, e.g. "walls" or "floor"
        :param cleaner: Object whose ``cleaned[f"{component}-description"]`` holds records with
            "original_description" and "thermal_transmittance" fields
        :return: Tuple of (summary dataframe, dict holding the decile labels and boundaries)
        """
        description_column = f"{component}-description"
        attribute_columns = [
            "local-authority",
            "property-type",
            f"{component}-energy-eff",
            f"{component}-env-eff",
            "built-form",
            "number-habitable-rooms",
            "number-heated-rooms",
        ]

        # na=False: records with a missing description are treated as "no match" instead of
        # producing a NaN entry in the mask, which cannot be used for boolean indexing
        has_marker = self.data[description_column].str.contains(self._MEASURED_MARKER, na=False)

        # Take just the columns we want. Copy so the assignments below operate on an independent
        # frame rather than on a slice of self.data
        component_df = self.data.loc[
            has_marker, attribute_columns + [description_column, "total-floor-area"]
        ].copy()
        component_df["total-floor-area"] = component_df["total-floor-area"].astype(float)
        component_df, decile_labels, decile_boundaries = self.classify_into_deciles(
            component_df, "total-floor-area"
        )

        # We now get the U-values by looking each description up in the cleaned data
        cleaned_lookup = pd.DataFrame(cleaner.cleaned[description_column])[
            ["original_description", "thermal_transmittance"]
        ]
        component_df = component_df.merge(
            cleaned_lookup,
            how="left",
            right_on="original_description",
            left_on=description_column
        )

        group_columns = attribute_columns + ["total-floor-area_group"]
        summary = component_df.groupby(
            group_columns,
            observed=True
        ).agg({"thermal_transmittance": ["median", "size"]}).reset_index()
        # Flatten the two-level column index produced by the multi-function aggregation
        summary.columns = group_columns + ["median_thermal_transmittance", "n_samples"]

        decile_data = {
            "decile_labels": decile_labels,
            "decile_boundaries": decile_boundaries
        }
        return summary, decile_data

    @staticmethod
    def classify_into_deciles(df: pd.DataFrame, column: str) -> (pd.DataFrame, list, list):
        """
        Break a column in a Pandas DataFrame into deciles and label every row with its decile.

        The group column is added to ``df`` itself (the input frame is modified).

        :param df: The input Pandas DataFrame.
        :param column: The column name to break into deciles.
        :return: A tuple containing:
            - The DataFrame with the decile group column (named ``column + "_group"``).
            - The list of decile labels.
            - The decile boundaries (11 values: the 0th, 10th, ..., 100th percentiles).
        """
        # Calculate decile boundaries
        decile_boundaries = np.percentile(df[column], np.arange(0, 101, 10))

        # Create decile labels
        decile_labels = [f"Decile {i + 1}" for i in range(10)]

        # Assign decile labels to existing values
        df[column + "_group"] = pd.cut(df[column], bins=decile_boundaries, labels=decile_labels,
                                       include_lowest=True)

        return df, decile_labels, decile_boundaries

    @staticmethod
    def classify_decile_newvalues(decile_boundaries, decile_labels, new_values: list) -> list:
        """
        Classify new values into existing deciles based on decile definitions.

        :param decile_boundaries: The list of decile boundaries.
        :param decile_labels: The list of decile labels.
        :param new_values: A list of new values to classify.
        :return: The classifications for the new values as a list.
        """
        # Classify new values based on decile definitions
        classifications = pd.cut(new_values, bins=decile_boundaries, labels=decile_labels, include_lowest=True)

        return classifications.tolist()

    def _save(self, filename):
        """
        Useful utility function to store this object, which is particularly handy for unit testing

        :param filename: Path of the file the pickled object is written to
        :return:
        """
        with open(filename, 'wb') as f:
            pickle.dump(self, f)

View file

@ -1,40 +0,0 @@
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
def create_heatmap_plots(data, response_var, pivot_var1, pivot_var2, order1=None, order2=None):
    """
    Draw an annotated heatmap of a response variable pivoted by two other variables.

    :param data: List of dictionaries, input data.
    :param response_var: String, response variable to be plotted (cast to float before pivoting).
    :param pivot_var1: String, variable used for the heatmap rows.
    :param pivot_var2: String, variable used for the heatmap columns.
    :param order1: List, the order of categories for pivot_var1. Optional.
    :param order2: List, the order of categories for pivot_var2. Optional.

    Returns:
        None. Displays the generated plot.
    """
    frame = pd.DataFrame(data)

    # The response may arrive as strings, so make sure it is numeric before aggregating
    frame[response_var] = frame[response_var].astype(float)

    grid = frame.pivot_table(values=response_var, index=pivot_var1, columns=pivot_var2)

    # Apply any caller-supplied ordering: rows first, then columns
    if order1 is not None:
        grid = grid.reindex(order1)
    if order2 is not None:
        grid = grid[order2]

    plt.figure(figsize=(10, 6))
    sns.heatmap(grid, annot=True, fmt=".2f", cmap='coolwarm')
    plt.title(f"Heatmap of {response_var} by {pivot_var1} and {pivot_var2}")
    plt.show()

View file

@ -1,38 +0,0 @@
# Image that runs predictions.py as a non-root user.
FROM python:3.10.12-slim AS release

# Name and ids of the non-root user the container runs as
ARG USER=nroot
ARG UID=1000
ARG GID=100

# Install patches
RUN apt-get update && apt-get upgrade -y \
    && apt-get install libgomp1 -y \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists

# Install python packages
# COPY sources are resolved relative to the build context and cannot reach outside it, so the
# path is given from the context root (no leading "../")
COPY requirements/predictions/predictions.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

# Create and configure non-root user
RUN useradd \
    --create-home \
    --uid ${UID} \
    --gid ${GID} \
    --shell /bin/bash \
    ${USER}

# Copy the project code to user home directory
COPY --chown=${UID}:${GID} ./core /home/simulation_system/core
COPY --chown=${UID}:${GID} ./MLModel /home/simulation_system/MLModel
COPY --chown=${UID}:${GID} ./predictions.py /home/simulation_system/predictions.py

# FOR TESTING
# COPY --chown=${UID}:${GID} ./model_build_data /home/simulation_system/model_build_data

# Switch user
USER ${USER}
WORKDIR /home/simulation_system

# Run the python command
CMD ["python3", "predictions.py", "--data-path", "s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data_with_id.parquet"]

View file

@ -1,22 +0,0 @@
# Lambda container image for the predictions handler.
FROM public.ecr.aws/lambda/python:3.10

# Set the working directory
WORKDIR ${LAMBDA_TASK_ROOT}/simulation_system
# key=value form; the space-separated "ENV key value" form is legacy syntax
ENV PYTHONPATH="${PYTHONPATH}:${LAMBDA_TASK_ROOT}/simulation_system"

# Install necessary build tools - required to test locally
# The yum cache is cleared in the same layer so it does not end up in the image
RUN yum install -y gcc python3-devel \
    && yum clean all

# Install python packages
COPY requirements/predictions/predictions.txt ./requirements.txt
RUN pip install --no-cache-dir -r ./requirements.txt

# Copy the project code to the working directory
COPY ./core ./core
COPY ./MLModel ./MLModel
COPY ./predictions.py ./predictions.py
COPY ./handlers/predictions_app.py ./predictions_app.py
COPY ./__init__.py ./__init__.py

# Run off a lambda trigger
CMD [ "simulation_system.predictions_app.handler" ]

View file

@ -1,38 +0,0 @@
# Image that runs training.py as a non-root user.
FROM python:3.10.12-slim AS release

# Name and ids of the non-root user the container runs as
ARG USER=nroot
ARG UID=1000
ARG GID=100

# Install patches
RUN apt-get update && apt-get upgrade -y \
    && apt-get install libgomp1 -y \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists

# Install python packages
# COPY sources are resolved relative to the build context and cannot reach outside it, so the
# path is given from the context root (no leading "../")
COPY requirements/training/training.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

# Create and configure non-root user
RUN useradd \
    --create-home \
    --uid ${UID} \
    --gid ${GID} \
    --shell /bin/bash \
    ${USER}

# Copy the project code to user home directory
COPY --chown=${UID}:${GID} ./core /home/simulation_system/core
COPY --chown=${UID}:${GID} ./MLModel /home/simulation_system/MLModel
COPY --chown=${UID}:${GID} ./training.py /home/simulation_system/training.py

# FOR TESTING
# COPY --chown=${UID}:${GID} ./model_build_data /home/simulation_system/model_build_data

# Switch user
USER ${USER}
WORKDIR /home/simulation_system

# Run the python command
CMD ["python3", "training.py", "--train-filepath", "s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/train_validation_data.parquet", "--test-filepath", "s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet"]

View file

@ -1,14 +0,0 @@
# Lambda container image for the training handler.
FROM public.ecr.aws/lambda/python:3.10

# Install python packages
# COPY sources are resolved relative to the build context and cannot reach outside it, so the
# path is given from the context root (no leading "../")
COPY requirements/training/training.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

# Copy the project code into the Lambda task root
COPY ./core ${LAMBDA_TASK_ROOT}/simulation_system/core
COPY ./MLModel ${LAMBDA_TASK_ROOT}/simulation_system/MLModel
COPY ./training.py ${LAMBDA_TASK_ROOT}/simulation_system/training.py
COPY ./handlers/training_app.py ${LAMBDA_TASK_ROOT}/simulation_system/training_app.py

# Run off a lambda trigger
# NOTE(review): training_app.py is copied under simulation_system/, but the handler is given as
# "training_app.handler" and no WORKDIR/PYTHONPATH is set here - confirm the module is importable
CMD [ "training_app.handler" ]

View file

@ -1,66 +0,0 @@
"""
BaseMLModel class
This is the base protocol:
- Any implementation will be in its own separate file
Key tasks:
- Template Model class for different model types
- Save model
- Load Model
- Generate Inference
"""
from numpy import ndarray
from pathlib import Path
from typing import Protocol, NamedTuple, Any
import pandas as pd
from training import S3FSClient
class MLModel(Protocol):
    """
    Structural interface that every model implementation is expected to provide.
    """

    def load_model(self, filepath: Path) -> None:
        """
        Load the model stored at ``filepath`` and keep it on the instance for later use.
        """
        ...

    def save_model(
        self, output_filepath: Path, s3_client: S3FSClient | None = None
    ) -> None:
        """
        Save the current model to ``output_filepath``, optionally via the given S3 client.
        """
        ...

    def train_model(
        self, data: pd.DataFrame, target_column: str, hyperparameters: dict
    ) -> None:
        """
        Train a model on ``data`` to predict ``target_column`` using model-specific hyperparameters.
        """
        ...

    def generate_predictions(self, data: pd.DataFrame) -> ndarray[Any, Any] | None:
        """
        Generate predictions for the rows of ``data`` with the loaded/trained model.
        """
        ...

    def model_evaluation(
        self,
        validation_data: pd.DataFrame,
        target_column: str,
        metrics_location: Path | None = None,
    ) -> pd.DataFrame | None:
        """
        Produce predictions for ``validation_data`` and return the resulting metrics.
        """
        ...

    def optimise_model_for_deployment(self):
        """
        Post-process the model so that it is ready for deployment.
        """
        ...

    def model_metadata(self) -> dict | None:
        """
        Return the model's metadata as a dictionary.
        """
        ...

View file

@ -1,215 +0,0 @@
"""
Different implementations of the MLModel Protocol
Uses the BaseMLModel protocol
Key tasks:
- Template Model class for different model types
- Save model
- Load Model
- Generate Inference
"""
import os
from typing import Any
from pathlib import Path
import pandas as pd
from autogluon.tabular import TabularDataset, TabularPredictor
from core.Logger import logger
from core.Metrics import Metrics
from core.Settings import METRIC_FILENAME
from core.CloudClient import BotoClient
# Exact set of hyperparameter keys an Autogluon training run must be given
AUTOGLUON_HYPERPARAMETERS = [
    "problem_type",
    "eval_metric",
    "time_limit",
    "presets",
    "excluded_model_types",
]


def model_factory(model_type: str, hyperparameters: dict) -> dict:
    """
    Look up the registered implementation for a model type.

    :param model_type: Key of the implementation to use; only "autogluon" is registered
    :param hyperparameters: Hyperparameters of the run; "presets" and "time_limit" are read to
        build the naming string
    :return: Dictionary with the model class under "model" and the naming string under
        "naming_attributes"
    """
    autogluon_entry = {
        "model": AutogluonModel,
        "naming_attributes": f"{hyperparameters['presets']}-{hyperparameters['time_limit']}",
    }
    registry = {"autogluon": autogluon_entry}
    return registry[model_type]
class AutogluonModel:
    """
    Autogluon model that implements the MLModel Protocol
    """

    def __init__(self, output_filepath: Path | None = None) -> None:
        """
        :param output_filepath: Directory the predictor is written to during training
        """
        # Predictor instance; None until load_model() or train_model() has run
        self.model = None
        self.output_filepath = output_filepath
        # Predictions from the most recent model_evaluation() call
        self.predictions = None

    def load_model(
        self,
        filepath: str | Path,
        client: BotoClient,
        model_folder: str = "local_model",
    ) -> None:
        """
        Providing a path, this function will load the model to be used. Will load to internal variable

        :param filepath: Location of the model; used as a local path when there is no s3 client
        :param client: Cloud client wrapper; ``client.client is None`` means local development
        :param model_folder: Local folder the model is downloaded into before loading
        """
        filepath = str(filepath)
        if client.client is None:
            logger.info("In local development mode - no need for s3 client")
            self.model = TabularPredictor.load(path=filepath)
        else:
            logger.info(
                "Loading model from s3 with filepath: %s and model_folder: %s",
                filepath,
                model_folder,
            )
            client.download_model(filepath=filepath, model_folder=model_folder)
            self.model = TabularPredictor.load(path=str(Path(model_folder) / filepath))

    def save_model(self, output_filepath: Path, client: BotoClient) -> None:
        """
        Providing a path, this function will save the model to be used.

        :param output_filepath: Local directory holding the trained model files
        :param client: Cloud client wrapper; ``client.client is None`` means local development
        """
        if client.client is None:
            logger.info("In local development mode - no need for s3 client")
            logger.info("Using AutoGluon Model - Model saving already occured")
        else:
            logger.info("Saving model into s3")
            self.directory_upload(
                client=client,
                local_directory=str(output_filepath),
                bucket_name=client.model_bucket,
            )
            logger.info("Save complete")

    def directory_upload(self, client, local_directory, bucket_name):
        """
        Upload every file found under ``local_directory`` to the given bucket.

        :param client: Cloud client wrapper whose ``client`` attribute exposes ``upload_file``
        :param local_directory: Directory that is walked recursively
        :param bucket_name: Destination bucket
        """
        # Iterate through the local directory and upload each file
        for root, _dirs, files in os.walk(local_directory):
            for file in files:
                local_file_path = os.path.join(root, file)

                # The object key is the full local path (not the path relative to
                # local_directory): load_model() rebuilds the same path under model_folder
                client.client.upload_file(local_file_path, bucket_name, local_file_path)
                logger.info(
                    "Uploaded %s to %s/%s", local_file_path, bucket_name, local_file_path
                )

    def train_model(
        self, data: pd.DataFrame, target_column: str, hyperparameters: dict
    ) -> None:
        """
        For the given data and hyperparameters, a model is trained

        :param data: Training data, including the target column
        :param target_column: Name of the column to predict
        :param hyperparameters: Must contain exactly the keys in AUTOGLUON_HYPERPARAMETERS
        :raises SystemExit: If no output_filepath was set or the hyperparameter keys are wrong
        """
        if self.output_filepath is None:
            logger.error("Please specify a output_filepath in order to train a model")
            # Same effect as exit(1), without relying on the builtin that `site` injects
            raise SystemExit(1)

        if set(AUTOGLUON_HYPERPARAMETERS) != set(hyperparameters.keys()):
            logger.error(
                "Hyperparameters (dict) is incorrectly defined - please check what hyperparameters are required"
            )
            raise SystemExit(1)

        AGdata = TabularDataset(data=data)

        self.model = TabularPredictor(
            label=target_column,
            path=self.output_filepath,
            problem_type=hyperparameters["problem_type"],
            eval_metric=hyperparameters["eval_metric"],
        ).fit(
            AGdata,
            time_limit=hyperparameters["time_limit"],
            presets=hyperparameters["presets"],
            excluded_model_types=hyperparameters["excluded_model_types"],
        )

    def generate_predictions(self, data: pd.DataFrame) -> pd.Series:
        """
        For the given dataframe, predictions are generated with the loaded/trained model

        :param data: Rows to predict for
        :return: Predictions as a pandas Series
        :raises SystemExit: If no model has been loaded or trained
        """
        if self.model is None:
            logger.error("No model loaded/ trained")
            raise SystemExit(1)

        predictions = pd.Series(self.model.predict(data))

        return predictions

    def model_evaluation(
        self,
        validation_data: pd.DataFrame,
        target_column: str,
        metrics: Metrics,
        metrics_location: Path | None = None,
        metric_filename: str = METRIC_FILENAME,
    ) -> pd.DataFrame:
        """
        For any validation data, a set of predictions and metrics are returned

        The metrics are also written to ``metrics_location`` as a CSV file and a markdown file.

        :param validation_data: Data to evaluate on, including the target column
        :param target_column: Name of the column holding the actual values
        :param metrics: Metrics suite used to score the predictions
        :param metrics_location: Output directory; defaults to the current folder
        :param metric_filename: Name of the CSV file; the markdown file reuses its base name
        :return: Single-row dataframe of the generated metrics
        :raises SystemExit: If no model has been loaded or trained
        """
        if metrics_location is None:
            logger.warning("Metrics will be outputted to current folder")
            metrics_location = Path()

        if self.model is None:
            logger.error("No model loaded/ trained - Unable to generate evaluation")
            raise SystemExit(1)

        # Generate prediction, load metrics suite, generate metrics between the two
        predictions = self.generate_predictions(validation_data)
        performance = metrics.generate_metric_suite(
            actuals=validation_data[target_column], predictions=predictions
        )

        logger.info("Prediction used for evaluations are saved in self.prediction")
        self.predictions = predictions

        logger.info("Saving metric file as %s", metric_filename)
        # parents=True so a nested, not-yet-existing metrics folder does not fail the run
        metrics_location.mkdir(parents=True, exist_ok=True)
        metrics_df = pd.DataFrame([performance])
        metrics_df.to_csv(metrics_location / metric_filename)
        markdown_filename = metric_filename.split(".")[0] + ".md"
        metrics_df.to_markdown(metrics_location / markdown_filename)

        return metrics_df

    def optimise_model_for_deployment(
        self, deployment_path: Path | str | None = None
    ) -> Any:
        """
        We can optimise the deployment for a autogluon model

        :param deployment_path: Location the deployment copy of the model is written to
        :return: Whatever ``clone_for_deployment`` returns (the location, per the comment below)
        :raises ValueError: If there is no model or no deployment path
        """
        if self.model is None:
            raise ValueError("No model to optimise for deployment")

        if deployment_path is None:
            raise ValueError("Deployment path required")

        deployment_path = str(deployment_path)

        # This will return a string path of the location
        return self.model.clone_for_deployment(deployment_path)

    def model_metadata(self) -> dict[str, Any]:
        """
        For Autogluon model, use the inbuilt model info method

        :return: Result of the predictor's ``info()`` call
        :raises SystemExit: If no model has been loaded or trained
        """
        if self.model is None:
            logger.error("No Model loaded/ trained")
            raise SystemExit(1)

        return self.model.info()

View file

@ -1,33 +0,0 @@
export PYENV_ROOT=$(HOME)/.pyenv
export PATH := $(PYENV_ROOT)/bin:$(PATH)

PYTHON_VERSION ?= 3.10.12

# Build the images and start the local services
.PHONY: init
init: build docker

# Create the training virtual environment and install the development requirements
.PHONY: env
env:
	pyenv install $(PYTHON_VERSION) || echo "Proceeding..." # || is to swallow non-zero response if python version already is installed
	pyenv global $(PYTHON_VERSION)
	python3 -m venv .training_env
# Every recipe line runs in its own shell, so the activation and the installs that depend on it
# have to be chained on one logical line
	. .training_env/bin/activate \
		&& pip install --upgrade pip \
		&& pip install -r requirements/training/training-dev.txt \
		&& pre-commit install
	echo " --- TO ACTIVATE THE ENVIRONMENT --- "
	echo "Run source .training_env/bin/activate to activate the virtual environment"

# Run every pre-commit hook against all files
.PHONY: check-all
check-all:
	pre-commit run -a

.PHONY: build
build:
	docker-compose build

.PHONY: docker
docker:
	docker-compose up -d

.PHONY: down
down:
	docker compose down

View file

@ -1,77 +0,0 @@
# Simulation System
Starter Readme:
Steps for pipeline:
- (WIP) Set up the training development environment
- Change directory to this folder (simulation_system)
- Run the following command `make env PYTHON_VERSION=3.10.12`
- This will install the specified python version using `pyenv` and select this version as the global python version
- It will install all training packages as specified in the training-dev.txt requirements file, including the pre-commit hooks
- Run `source .training_env/bin/activate` to use this environment
- (WIP) Use Makefile to start up mock up s3 service
- By running `make init`, this will run the `docker-compose build` and `docker-compose up -d`, which spins up a S3 service
- This docker compose is running in detached mode `-d`, so it will not output anything to the terminal
- Once the Minio service is run, you can run the `training.py` file to start a model build process
- This will output a model, for a given target column, and add model name composed of some of the hyperparameters
- An example of running this file is:
- `python3 training.py --train-filepath ./model_build_data/change_data/rdsap_full/train_validation_data.parquet --test-filepath ./model_build_data/change_data/rdsap_full/test_data.parquet`
- Outputs of the pipeline are:
- A model directory bucket
- A target variable prefix (i.e. RDSAP_CHANGE or HEAT_DEMAND_CHANGE)
- A model type prefix (i.e. autogluon, tensorflow etc)
- A model name prefix (i.e. rdsap_change_medium_quality_60_TIMESTAMP)
- This model name is made up of target variable, quality, time spent training and timestamp
- Within this prefix, there are three folders:
- model
- The model path that can be loaded in the codebase
- deployment
- The optimised model that can be deployed (may or may not need this)
- metrics
- The metrics generated from the model (may or may not need this as this can be contained in the registry)
- Once the model build is finished, you can run the `predictions.py` file to generate predictions
- By default, the prediction pipeline will select the best model based on **mean absolute error** from the model registry
- This can be overwritten by specifying a model_path, which will load an alternative model
- There are two ways of getting data into the pipeline:
- Using the `--data` argument:
- This is a JSON string which can be passed as `python3 predictions.py --data '{"TOTAL_FLOOR_AREA": 1}'`
- Note the single and double quotation marks, as this affects the ingestion
- Using the `--data-path` argument:
- This can be a filepath (Can imagine that we might want to pull data from S3/ DB)
- An example of running the file is:
- `python3 predictions.py --data-path ../simulation_system/model_build_data/change_data/rdsap_full/test_data.parquet`
- Outputs of the pipeline are:
- prediction bucket
- a Target variables prefix (i.e. RDSAP_CHANGE or HEAT_DEMAND_CHANGE)
- a uprn prefix (i.e 0123456789)
- a `prediction.json`
- a `metadata.json`
- This is all the metadata from the model (can change this if needed)
- NOTE: If you wish to change any settings, these are currently all in the `Settings.py` file
- It will be separated out eventually but for now, it works to keep track of anything that we might want to respecify.
- I.e. the hyperparameters for models are in here but will move into a separate configuration file
# TODO:
- Structure/ MLOps:
- Add configuration files (dev, staging, prod), including hyperparameters
- Add precommit hooks (linters, branch names, etc)
- Sphinx documentation
- Sort out local mock up services
- Sort out Model Registry
- Sort out Data version control
- pre-commit hooks:
- The types of hooks that we want (safety, bandit, iso8 etc)
- The customisations we require
- Add sphinx documentation
- Data Science:
- Implement a metrics class, to hold all metrics
- Rebuild metrics script (Could be a one off but good to have)
- Determine metrics
- Implement and test custom model (Tensorflow Decision Trees etc)
- Orchestration:
- Lambda handler for the pipeline

View file

@ -1,58 +0,0 @@
# Local development stack: a MinIO server plus the training image that talks to it.
version: '3'
services:
minio:
image: minio/minio:RELEASE.2022-05-26T05-48-41Z
ports:
# 9000 is the endpoint the training service is pointed at (ENDPOINT_URL below);
# 9001 is the console address passed to the server command
- "9000:9000"
- "9001:9001"
volumes:
- ./data:/data
environment:
# The anchors let the training service reuse these values as its AWS credentials
# NOTE(review): hard-coded credentials - fine for a local mock only, confirm they are never reused
MINIO_ROOT_USER: &MINIO_USER admin
MINIO_ROOT_PASSWORD: &MINIO_PASS password
command: server --console-address ":9001" /data
# The training service waits for this check to pass (depends_on ... service_healthy)
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3
simulation_system_training:
build:
context: ./
dockerfile: ./Dockerfiles/Dockerfile.training
image: simulation_system_training
environment:
RUNTIME_ENVIRONMENT: local-mock
ENDPOINT_URL: http://minio:9000/
AWS_ACCESS_KEY_ID: *MINIO_USER
AWS_SECRET_ACCESS_KEY: *MINIO_PASS
tty: true
depends_on:
minio:
condition: service_healthy
# command:
# ["bash"]
# Prediction service, currently disabled; it would start once the training service has completed
# simulation_system_prediction:
# build:
# context: ./
# dockerfile: ./Dockerfiles/Dockerfile.prediction
# image: simulation_system_prediction
# environment:
# RUNTIME_ENVIRONMENT: local-mock
# ENDPOINT_URL: http://minio:9000/
# AWS_ACCESS_KEY_ID: *MINIO_USER
# AWS_SECRET_ACCESS_KEY: *MINIO_PASS
# tty: true
# depends_on:
# simulation_system_training:
# condition: service_completed_successfully
# command:
# ["bash"]
# volumes:
# minio_storage: {}

View file

@ -1,118 +0,0 @@
from pathlib import Path
from core.Settings import (
RDSAP_RESPONSE,
FLOOR_LEVEL_MAP,
BUILT_FORM_REMAP,
EARLIEST_EPC_DATE,
FULLY_GLAZED_DESCRIPTIONS,
FIXED_FEATURES,
LATEST_FIELD,
COMPONENT_FEATURES
)
from model_data.BaseUtility import Definitions
from tqdm import tqdm
import pandas as pd
import numpy as np
from autogluon.tabular import TabularDataset, TabularPredictor
RANDOM_SEED = 0
DATA_DIRECTORY = Path(__file__).parent / 'data' / 'all-domestic-certificates'
# Columns downcast to the smallest float type that fits, to reduce memory usage
FLOAT_COLUMNS = [
    'NUMBER_OPEN_FIREPLACES',
    'EXTENSION_COUNT',
    'TOTAL_FLOOR_AREA',
    'PHOTO_SUPPLY',
    'FIXED_LIGHTING_OUTLETS_COUNT',
    'FLOOR_HEIGHT',
    'NUMBER_HABITABLE_ROOMS',
    'LOW_ENERGY_LIGHTING',
    'MULTI_GLAZE_PROPORTION',
    'NUMBER_HEATED_ROOMS'
]


def create_raw_data():
    """
    Extract all information to do a simple predictor for RDSAP

    Reads ``certificates.csv`` from every sub-directory of DATA_DIRECTORY, filters and cleans
    each file, keeps only the modelling columns, and writes two parquet files to the current
    directory: the combined data, and the combined data with incomplete rows dropped.
    """
    directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()]
    # To try this on a subset, slice the list here, e.g. directories[0:10]

    # Map all anomaly values to None. The mapping is the same for every file, so build it once
    data_anomaly_map = dict.fromkeys(Definitions.DATA_ANOMALY_MATCHES)

    dfs = []
    for directory in tqdm(directories):
        filepath = directory / "certificates.csv"
        df = pd.read_csv(filepath, low_memory=False)

        # Remove any bad uprns and ignore old/bad data
        df = df[~pd.isnull(df["UPRN"])]
        df = df[df["LODGEMENT_DATE"] >= EARLIEST_EPC_DATE]
        df = df[df["TRANSACTION_TYPE"] != "new dwelling"]
        df = df[~df["FLOOR_LEVEL"].isin(["top floor", "mid floor"])]

        # Change multi glaze proportion: a missing proportion on a fully glazed description is 100
        no_multi_glaze_proportion_index = pd.isnull(df["MULTI_GLAZE_PROPORTION"]) & (
            df["WINDOWS_DESCRIPTION"].isin(FULLY_GLAZED_DESCRIPTIONS))
        df.loc[no_multi_glaze_proportion_index, 'MULTI_GLAZE_PROPORTION'] = 100

        # Recast
        df["UPRN"] = df["UPRN"].astype(int).astype(str)
        df['MAIN_HEATING_CONTROLS'] = df['MAIN_HEATING_CONTROLS'].astype(float)

        # Sort Data
        df = df.sort_values(["UPRN", "LODGEMENT_DATE"], ascending=True)

        # Use replace function to map data (if exists in key), to corresponding value - i.e. Remove invalid values
        df = df.replace(data_anomaly_map)
        # np.nan rather than the np.NAN alias, which NumPy 2 no longer provides
        df = df.replace(np.nan, None)

        # Remap certain columns
        df['FLOOR_LEVEL'] = df['FLOOR_LEVEL'].replace(FLOOR_LEVEL_MAP)
        # Write the remap back to BUILT_FORM itself; it previously went to a misspelt
        # 'BUILT_FROM' column, leaving BUILT_FORM unmapped
        df['BUILT_FORM'] = df['BUILT_FORM'].replace(BUILT_FORM_REMAP)

        # Keep only possible modelling columns
        df = df[[RDSAP_RESPONSE] + list(set(FIXED_FEATURES + LATEST_FIELD + COMPONENT_FEATURES))]

        # Reduce memory usage
        df[RDSAP_RESPONSE] = pd.to_numeric(df[RDSAP_RESPONSE], downcast='unsigned')
        df[FLOAT_COLUMNS] = df[FLOAT_COLUMNS].apply(pd.to_numeric, downcast='float')

        dfs.append(df)

    data = pd.concat(dfs)
    data.to_parquet('./energy_predictor_data.parquet')

    cleaned_data = data.dropna()
    # Gives you primarily flats
    cleaned_data.to_parquet('./energy_predictor_cleaned_data.parquet')
def main():
    """
    Train an Autogluon regressor for RDSAP_RESPONSE on a 1% sample and evaluate it on the test set.

    The evaluation, predictions and feature importance are computed but not returned or stored.
    """
    training_frame = TabularDataset(
        data='./model_build_data/energy_data/cleaned_data/train_validation_data.parquet'
    )
    # Train on roughly 1% of the rows, sampled reproducibly
    sample_count = round(len(training_frame) / 100)
    training_frame = training_frame.sample(sample_count, random_state=RANDOM_SEED)

    predictor = TabularPredictor(
        label=RDSAP_RESPONSE,
        path="agModels-predictENERGY",
        problem_type="regression",
        eval_metric='mean_absolute_error'
    )
    predictor_RDSAP = predictor.fit(
        training_frame,
        time_limit=800,
        presets='high_quality',
        excluded_model_types=['KNN', 'CAT']
    )

    holdout_frame = TabularDataset('./model_build_data/energy_data/cleaned_data/test_data.parquet')

    holdout_performance = predictor_RDSAP.evaluate(holdout_frame)
    holdout_predictions = predictor_RDSAP.predict(holdout_frame)
    predictor_RDSAP.feature_importance(holdout_frame)


if __name__ == "__main__":
    main()

View file

@ -1,188 +0,0 @@
"""
Script to load MLModel class and generate predictions
"""
import os
import json
import argparse
from pathlib import Path
import pandas as pd
from typing import Optional
from datetime import datetime
from MLModel.Models import AutogluonModel
from core.Logger import logger
from core.DataLoader import dataloader_factory
from core.CloudClient import BotoClient
from core.Metrics import Metrics
from core.RegistryHandler import RegistryHandler
from core.Settings import (
BASE_REGISTRY_PATH,
REGISTRY_FILE,
PREDICTION_LOCATION,
PREDICTION_FILE,
METADATA_FILE,
TIMESTAMP_FORMAT,
MODEL_DIRECTORY,
)
# Timestamp string captured once, at import time, formatted with TIMESTAMP_FORMAT
TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT)
# Name of the runtime environment; falls back to "dev" when the variable is not set
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "dev")
# Shared cloud client, built once at import time for the selected runtime environment
CLIENT = BotoClient(runtime_environment=RUNTIME_ENVIRONMENT)
# FOR TESTING
# For now just loading data first and then passing into function (i.e. as if we receive json data and convert to
# DataFrame)
# TEST_DATA = DataLoader.load(filepath="../simulation_system/model_build_data/change_data/rdsap_full/test_data.parquet")
# DATA = TEST_DATA.sample(1)
# For testing in dev s3
# Data path can be passed as so:
# python3 predictions.py --data-path s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet
# data_path="s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data_with_id.parquet"
def ingest_arguments() -> argparse.Namespace:
"""
Helper function to take in arguments from script start
"""
parser = argparse.ArgumentParser(description="Inputs for training script")
parser.add_argument(
"--target-column",
type=str,
help="The response variable you are predicting for",
choices=["RDSAP_CHANGE", "HEAT_DEMAND_CHANGE"],
default="RDSAP_CHANGE",
)
parser.add_argument(
"--model-path",
type=str,
help="If you wish to use a specific model, specify the model path here",
)
parser.add_argument("--data", type=str, help="Json data for predictions")
parser.add_argument(
"--data-path", type=str, help="Location of Parquet dataset to load for training"
)
args = parser.parse_args()
return args
def prediction(
    target_column: str = "RDSAP_CHANGE",
    model_path: str | None = None,
    data: Optional[pd.DataFrame | str] = None,
    data_path: Optional[str] = None,
):
    """
    Main pipeline function: load a model and generate predictions for the supplied data.

    The model is either the one found at model_path or, when model_path is None, the row
    flagged in the "best_model" column of the registry for target_column.

    :param target_column: Response variable whose model registry is consulted
    :param model_path: Location of a specific model to load instead of the registry's best model
    :param data: Input data, either an already-built DataFrame or a JSON string describing one record
    :param data_path: Location of a dataset to load when data is not given
    :return: DataFrame holding the "id" column of the input data alongside the predictions
    :raises SystemExit: If neither data nor data_path is provided
    :raises ValueError: If loading from data_path yields no data, or the model type is unsupported
    """
    if model_path is not None:
        logger.info("User specified a model to load - ignoring registry")
        model_location = model_path
        # Autogluon is the only model type supported below, so a user-supplied model is
        # treated as an Autogluon model. Using the path itself as the type (as before)
        # could never match "autogluon" and always ended in a ValueError.
        model_type = "autogluon"
        model_name = model_path
    else:
        # TODO: Think about where registry will sit/ type
        logger.info("Loading best model from registry")
        metrics = Metrics()
        registry_handler = RegistryHandler()
        registry_path = Path(MODEL_DIRECTORY) / target_column / REGISTRY_FILE
        registry_df = registry_handler.load_registry(
            registry_path=registry_path, client=CLIENT, metrics=metrics
        )
        best_model_df = registry_df[registry_df["best_model"]]
        model_location = best_model_df["model_location"].values[0]
        model_type = best_model_df["model_type"].values[0]
        model_name = best_model_df["model_name"].values[0]

    logger.info("--- Model Info: ---")
    logger.info(f"Model type: {model_type}")
    logger.info(f"Model name: {model_name}")
    logger.info(f"Model location: {model_location}")

    logger.info("--- Loading Data ---")
    if data is None:
        if not data_path:
            logger.error("No Data/Data Path passed")
            # Same exit status as before, without depending on the site-provided exit() helper
            raise SystemExit(1)
        logger.info("Loading data from provided path")
        dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT)
        data = dataloader.load(client=CLIENT, filepath=data_path, index_col=None)
        if data is None:
            raise ValueError("No data loaded")
    else:
        # Explicitly passed data takes precedence over data_path
        logger.info("Using data provided")
        if not isinstance(data, pd.DataFrame):
            # A JSON string describing a single record becomes a one-row DataFrame
            data = pd.DataFrame([json.loads(str(data))])

    logger.info("--- Loading Model ---")
    if model_type == "autogluon":
        logger.info("Using an Autogluon model")
        model = AutogluonModel()
    else:
        raise ValueError("No other model currently")

    # In lambda, only the /tmp folder is writable
    model_folder = "/tmp" if RUNTIME_ENVIRONMENT in ["dev", "prod"] else "local_model"
    model.load_model(filepath=model_location, client=CLIENT, model_folder=model_folder)

    logger.info("--- Generating Predictions ---")
    predictions = model.generate_predictions(data=data)

    # TODO: Check how we want to structure outputs. The intended follow-up is to persist the
    # prediction (PREDICTION_FILE) and its metadata (METADATA_FILE: model type, name, location
    # and settings) under PREDICTION_LOCATION / target_column / <TARGET_ID> / TIMESTAMP.
    return pd.concat([data["id"], predictions], axis=1)
if __name__ == "__main__":
    # Script entry point. Input can be supplied in one of two ways:
    #   - as a JSON string: python3 predictions.py --data '{"TOTAL_FLOOR_AREA": 1}'
    #   - as a dataset path: python3 predictions.py --data-path
    #     ./model_build_data/change_data/rdsap_full/test_data.parquet
    cli_options = ingest_arguments()
    prediction(
        data_path=cli_options.data_path,
        data=cli_options.data,
        model_path=cli_options.model_path,
        target_column=cli_options.target_column,
    )

View file

@ -1,118 +0,0 @@
"""
Script to regenerate metrics for all the models in the model registry
Key task:
- Load model registry
- For each model in the registry, generate the metrics (a key question here is what happens if the test data changes)
- Save the new metrics out to s3 bucket
"""
import os
import argparse
from s3pathlib import S3Path
import pandas as pd
from tqdm import tqdm
from core.Logger import logger
from core.Metrics import Metrics, sort_by_metric
from core.DataLoader import dataloader_factory
from core.Settings import (
OPTIMISE_METRIC,
MODEL_DIRECTORY,
REGISTRY_FILE,
BEST_MODEL_COLUMN_NAME,
)
from MLModel.Models import model_factory
# Runtime environment name read from the RUNTIME_ENVIRONMENT variable; falls back to "local" when unset
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local")
def ingest_arguments() -> argparse.Namespace:
"""
Helper function to take in arguments from script start
"""
parser = argparse.ArgumentParser(description="Inputs for training script")
parser.add_argument(
"--test-filepath",
type=str,
help="Location of Parquet dataset to load for testing",
required=True,
)
parser.add_argument(
"--target-column",
type=str,
help="The response variable",
choices=["RDSAP_CHANGE", "HEAT_DEMAND_CHANGE"],
default="RDSAP_CHANGE",
)
args = parser.parse_args()
return args
def regenerate_metrics(test_filepath: str, target_column: str) -> None:
    """
    Recreate all metrics for all models in the registry of target_column.

    The registry is loaded, stripped back to its non-metric columns, every listed model is
    re-evaluated against the test data, and the re-sorted registry is written back to the
    location it was loaded from.

    :param test_filepath: Location of the dataset used to evaluate every model
    :param target_column: Response variable whose registry is regenerated
    """
    logger.info("--- Loading test data ---")
    dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT)
    test_df = dataloader.load(filepath=test_filepath)

    logger.info("--- Loading model registry ---")
    logger.info(f"Loading registry for {target_column} models")
    # Built once so the registry is read from and written to the same location
    registry_uri = S3Path(MODEL_DIRECTORY, target_column, REGISTRY_FILE).uri
    registry_df = dataloader.load(filepath=registry_uri)

    logger.info("Extract non-metric columns")
    # Reset the index so the side-by-side concat below pairs rows by position, whatever
    # index the loaded registry carried
    registry_df = registry_df[
        ["model_type", "model_name", "model_location"]
    ].reset_index(drop=True)

    logger.info("--- Regenerating metrics ---")
    metric_suite = Metrics()
    # The empty frame pins the metric columns even when the registry has no rows
    metric_frames = [pd.DataFrame(columns=metric_suite.list_metric_functions())]
    for _, row in tqdm(registry_df.iterrows(), total=len(registry_df)):
        logger.info(f"--- Loading Model ({row['model_name']}) ---")
        model = model_factory(model_type=row["model_type"])()
        model.load_model(filepath=row["model_location"])
        metrics = metric_suite.generate_metric_suite(
            model=model, data=test_df, target_column=target_column
        )
        metric_frames.append(metrics)

    # Stack all metric rows in one go instead of re-concatenating on every iteration
    metrics_df = pd.concat(metric_frames, axis=0).reset_index(drop=True)

    # Add metrics df to registry df side by side
    registry_df = pd.concat([registry_df, metrics_df], axis=1)

    logger.info(f"--- Sorting by Optimise Metric ({OPTIMISE_METRIC}) ---")
    registry_df = sort_by_metric(
        data=registry_df,
        optimse_metric=OPTIMISE_METRIC,
        best_model_column_name=BEST_MODEL_COLUMN_NAME,
    )

    logger.info("--- Saving model metrics ---")
    # index=False matches how the training pipeline writes the registry and keeps an
    # unnamed index column out of the file
    registry_df.to_csv(registry_uri, index=False)
if __name__ == "__main__":
    # Script entry point: parse the CLI options, then regenerate the registry metrics
    logger.info("---Begin Pipeline to regenerate metrics---")
    logger.info("---Ingest Arguments---")
    cli_options = ingest_arguments()
    regenerate_metrics(
        target_column=cli_options.target_column,
        test_filepath=cli_options.test_filepath,
    )

View file

@ -1,208 +0,0 @@
absl-py==1.4.0
accelerate==0.16.0
aiohttp==3.8.5
aiohttp-cors==0.7.0
aiosignal==1.3.1
aliyun-python-sdk-core==2.13.36
aliyun-python-sdk-kms==2.16.1
antlr4-python3-runtime==4.9.3
asttokens==2.2.1
async-timeout==4.0.3
attrs==23.1.0
autogluon==0.8.2
autogluon.common==0.8.2
autogluon.core==0.8.2
autogluon.features==0.8.2
autogluon.multimodal==0.8.2
autogluon.tabular==0.8.2
autogluon.timeseries==0.8.2
backcall==0.2.0
beautifulsoup4==4.12.2
blessed==1.20.0
blis==0.7.10
boto3==1.28.25
botocore==1.31.25
cachetools==5.3.1
catalogue==2.0.9
catboost==1.2
certifi==2023.7.22
cffi==1.15.1
charset-normalizer==3.2.0
click==8.1.6
cloudpickle==2.2.1
colorama==0.4.6
colorful==0.5.5
comm==0.1.4
confection==0.1.1
contourpy==1.1.0
crcmod==1.7
cryptography==41.0.3
cycler==0.11.0
cymem==2.0.7
datasets==2.14.4
debugpy==1.6.7
decorator==5.1.1
defusedxml==0.7.1
dill==0.3.7
distlib==0.3.7
evaluate==0.3.0
executing==1.2.0
fastai==2.7.12
fastcore==1.5.29
fastdownload==0.0.7
fastprogress==1.0.3
filelock==3.12.2
fonttools==4.42.0
frozenlist==1.4.0
fsspec==2023.6.0
future==0.18.3
gdown==4.7.1
gluonts==0.13.3
google-api-core==2.11.1
google-auth==2.22.0
google-auth-oauthlib==1.0.0
googleapis-common-protos==1.60.0
gpustat==1.1
graphviz==0.20.1
grpcio==1.50.0
huggingface-hub==0.16.4
hyperopt==0.2.7
idna==3.4
imageio==2.31.1
ipykernel==6.25.1
ipython==8.14.0
jedi==0.19.0
Jinja2==3.1.2
jmespath==0.10.0
joblib==1.3.2
jsonschema==4.17.3
jupyter_client==8.3.0
jupyter_core==5.3.1
kiwisolver==1.4.4
langcodes==3.3.0
lightgbm==3.3.5
lightning-utilities==0.9.0
llvmlite==0.40.1
Markdown==3.4.4
markdown-it-py==3.0.0
MarkupSafe==2.1.3
matplotlib==3.7.2
matplotlib-inline==0.1.6
mdurl==0.1.2
mlforecast==0.7.3
model-index==0.1.11
msgpack==1.0.5
multidict==6.0.4
multiprocess==0.70.15
murmurhash==1.0.9
nest-asyncio==1.5.7
networkx==3.1
nlpaug==1.1.11
nltk==3.8.1
nptyping==2.4.1
numba==0.57.1
numpy==1.24.4
nvidia-ml-py==12.535.77
oauthlib==3.2.2
omegaconf==2.2.3
opencensus==0.11.2
opencensus-context==0.1.3
opendatalab==0.0.10
openmim==0.3.9
openxlab==0.0.17
ordered-set==4.1.0
oss2==2.17.0
packaging==23.1
pandas==1.5.3
parso==0.8.3
pathy==0.10.2
patsy==0.5.3
pexpect==4.8.0
pickleshare==0.7.5
Pillow==9.5.0
platformdirs==3.10.0
plotly==5.16.0
preshed==3.0.8
prometheus-client==0.17.1
prompt-toolkit==3.0.39
protobuf==3.20.2
psutil==5.9.5
ptyprocess==0.7.0
pure-eval==0.2.2
py-spy==0.3.14
py4j==0.10.9.7
pyarrow==12.0.1
pyasn1==0.5.0
pyasn1-modules==0.3.0
pycparser==2.21
pycryptodome==3.18.0
pydantic==1.10.12
Pygments==2.16.1
pyparsing==3.0.9
pyrsistent==0.19.3
PySocks==1.7.1
pytesseract==0.3.10
python-dateutil==2.8.2
pytorch-lightning==1.9.5
pytorch-metric-learning==1.7.3
pytz==2023.3
PyWavelets==1.4.1
PyYAML==6.0.1
pyzmq==25.1.0
ray==2.3.1
regex==2023.8.8
requests==2.28.2
requests-oauthlib==1.3.1
responses==0.18.0
rich==13.4.2
rsa==4.9
s3transfer==0.6.1
safetensors==0.3.2
scikit-image==0.19.3
scikit-learn==1.2.2
scipy==1.11.1
seaborn==0.12.2
sentencepiece==0.1.99
seqeval==1.2.2
six==1.16.0
smart-open==6.3.0
soupsieve==2.4.1
spacy==3.6.1
spacy-legacy==3.0.12
spacy-loggers==1.0.4
srsly==2.4.7
stack-data==0.6.2
statsforecast==1.4.0
statsmodels==0.14.0
tabulate==0.9.0
tenacity==8.2.2
tensorboard==2.14.0
tensorboard-data-server==0.7.1
tensorboardX==2.6.2
text-unidecode==1.3
thinc==8.1.12
threadpoolctl==3.2.0
tifffile==2023.7.18
timm==0.9.5
tokenizers==0.13.3
toolz==0.12.0
torch==1.13.1
torchmetrics==0.11.4
torchvision==0.14.1
tornado==6.3.2
tqdm==4.65.1
traitlets==5.9.0
transformers==4.26.1
typer==0.9.0
typing_extensions==4.7.1
tzdata==2023.3
ujson==5.8.0
urllib3==1.26.16
virtualenv==20.24.3
wasabi==1.1.2
wcwidth==0.2.6
Werkzeug==2.3.6
window-ops==0.0.14
xgboost==1.7.6
xxhash==3.3.0
yarl==1.9.2

View file

@ -1,109 +0,0 @@
from model_data.simulation_system.core.Logger import logger
import argparse
import pandas as pd
from pathlib import Path
from model_data.simulation_system.core.Settings import RANDOM_SEED, TRAIN_AND_VALIDATION_DATA_NAME, TEST_DATA_NAME
def ingest_arguments() -> argparse.Namespace:
"""
Helper function to take in arguments from script start
"""
parser = argparse.ArgumentParser(description="Inputs for training script")
parser.add_argument(
"--filepath",
type=str,
help="Location of Parquet dataset to load",
required=True,
)
parser.add_argument(
"--output-folder",
type=str,
help="Location of Parquet dataset to save",
required=True,
)
parser.add_argument(
"--percentage",
type=float,
help="Percentage of data to use as test data",
default=None,
)
parser.add_argument(
"--volume", type=int, help="Volume of data to use as test data", default=None
)
parser.add_argument(
"--sampling",
type=str,
help="Type of sampling to do for test data",
choices=["random", "stratified"],
default="random",
)
args = parser.parse_args()
return args
def main(
    filepath: str,
    output_folder: str,
    percentage: float | None,
    volume: int | None,
    sampling: str,
):
    """
    Load a dataset in and split out the training+validation data and the test data.

    :param filepath: Location of the Parquet dataset to split
    :param output_folder: Folder the two resulting Parquet files are written to
    :param percentage: Fraction of the rows to use as test data, or None
    :param volume: Number of rows to use as test data, or None. When both percentage and
        volume are given, the larger resulting amount is used
    :param sampling: Sampling method; only "random" is implemented
    :raises SystemExit: If neither percentage nor volume is given
    :raises NotImplementedError: If sampling is "stratified"
    :raises ValueError: If sampling is not a recognised method
    """
    logger.info("---Loading Data---")
    data = pd.read_parquet(filepath).reset_index(drop=True)

    if percentage is None and volume is None:
        logger.error(
            "No amount specified - please specify either a percentage or volume"
        )
        # Same exit status as before, without depending on the site-provided exit() helper
        raise SystemExit(1)

    # Compare against None rather than truthiness so that 0 / 0.0 are valid amounts
    candidate_amounts = []
    if percentage is not None:
        candidate_amounts.append(round(len(data) * percentage))
    if volume is not None:
        candidate_amounts.append(volume)
    if len(candidate_amounts) == 2:
        logger.info("Both percentage and volume specified - taking largest of the two")
    test_amount = max(candidate_amounts)

    logger.info(f"---Extracting {test_amount} from dataset to be test data")
    if sampling == "random":
        logger.info("--- Using random sample method ---")
        test_data = data.sample(n=test_amount, random_state=RANDOM_SEED)
        train_validation_data = data.drop(test_data.index)
    elif sampling == "stratified":
        # Fail loudly: falling through here used to write two empty datasets
        raise NotImplementedError("Stratified sampling is not yet implemented")
    else:
        raise ValueError(f"Unknown sampling method: {sampling}")

    logger.info("--- Saving data ---")
    train_validation_data.to_parquet(
        Path(output_folder) / TRAIN_AND_VALIDATION_DATA_NAME
    )
    test_data.to_parquet(Path(output_folder) / TEST_DATA_NAME)
    logger.info(" ---Pipeline complete---")
if __name__ == "__main__":
    # Script entry point: parse the CLI options, then split the dataset
    logger.info("--- Generate test data pipeline ---")
    cli_options = ingest_arguments()
    main(
        sampling=cli_options.sampling,
        volume=cli_options.volume,
        percentage=cli_options.percentage,
        output_folder=cli_options.output_folder,
        filepath=cli_options.filepath,
    )

View file

@ -1,259 +0,0 @@
import argparse
import os
import shutil
from pathlib import Path
from datetime import datetime
import pandas as pd
from MLModel.Models import model_factory
from core.Logger import logger
from core.Metrics import Metrics, sort_by_metric
from core.DataLoader import dataloader_factory
from core.FeatureProcessor import FeatureProcessor
from core.CloudClient import BotoClient
from core.RegistryHandler import RegistryHandler
from core.Settings import (
MODEL_DIRECTORY,
REGISTRY_FILE,
MODEL_FOLDER,
METRICS_FOLDER,
DEPLOYMENT_FOLDER,
SUBSAMPLE_FACTOR,
MODEL_HYPERPARAMETERS,
TIMESTAMP_FORMAT,
RESIDUAL_FILE,
BEST_MODEL_COLUMN_NAME,
OPTIMISE_METRIC,
)
# Timestamp captured once, at import time, and formatted with TIMESTAMP_FORMAT
TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT)
# Runtime environment name read from the RUNTIME_ENVIRONMENT variable; falls back to "local" when unset
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local")
# Shared cloud client, constructed at import time for the selected runtime environment
CLIENT = BotoClient(runtime_environment=RUNTIME_ENVIRONMENT)
# CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT)
# FOR TESTING
# train_filepath = "./model_build_data/change_data/rdsap_full/train_validation_data.parquet"
# test_filepath = "./model_build_data/change_data/rdsap_full/test_data.parquet"
# target_column = "RDSAP_CHANGE"
# model_type = "autogluon"
# hyperparameter = HYPERPARAMETERS
# SUBSAMPLE_FACTOR = 200
# Mock location
# train_filepath = "s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/train_validation_data.parquet"
# test_filepath = "s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet"
# To run script in local mode:
# python3 training.py --train-filepath ./model_build_data/change_data/rdsap_full/train_validation_data.parquet --test-filepath ./model_build_data/change_data/rdsap_full/test_data.parquet
# To run script in local-mock mode:
# python3 training.py --train-filepath s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/train_validation_data.parquet --test-filepath s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet
def ingest_arguments() -> argparse.Namespace:
    """
    Read the training script's options from the command line.

    Both dataset locations are mandatory; the model type and target column fall back
    to their defaults when omitted.
    """
    # Each flag mapped to the keyword arguments handed to add_argument, in declaration order
    option_specs = {
        "--train-filepath": {
            "type": str,
            "help": "Location of Parquet dataset to load for training",
            "required": True,
        },
        "--test-filepath": {
            "type": str,
            "help": "Location of Parquet dataset to load for testing",
            "required": True,
        },
        "--model-type": {
            "type": str,
            "help": "The type of model to train",
            "choices": ["autogluon"],
            "default": "autogluon",
        },
        "--target-column": {
            "type": str,
            "help": "The response variable",
            "choices": ["RDSAP_CHANGE", "HEAT_DEMAND_CHANGE"],
            "default": "RDSAP_CHANGE",
        },
    }
    cli_parser = argparse.ArgumentParser(description="Inputs for training script")
    for flag, options in option_specs.items():
        cli_parser.add_argument(flag, **options)
    return cli_parser.parse_args()
def training(
    train_filepath: str,
    test_filepath: str,
    target_column: str = "RDSAP_CHANGE",
    model_type: str = "autogluon",
    hyperparameters: dict | None = None,
) -> None:
    """
    Pipeline to run training on the dataset.

    Steps, in order: load the train and test data, run feature processing, train and save
    the model, generate evaluation metrics and a residual plot, save a deployment-optimised
    copy of the model, append the model to the registry (re-sorted by OPTIMISE_METRIC), and
    finally remove the local model directory when not running in the "local" environment.

    :param train_filepath: Location of the dataset used for training
    :param test_filepath: Location of the dataset used for evaluation
    :param target_column: Name of the response column
    :param model_type: Key passed to model_factory and used to look up default hyperparameters
    :param hyperparameters: Model hyperparameters; MODEL_HYPERPARAMETERS[model_type] is used when None
    :raises ValueError: If either dataset load returns None
    """
    logger.info("--- Loading data ---")
    dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT)
    train_df = dataloader.load(client=CLIENT, filepath=train_filepath)
    test_df = dataloader.load(client=CLIENT, filepath=test_filepath)
    if train_df is None or test_df is None:
        raise ValueError("No data Loaded - cancelling pipeline")

    logger.info("--- Feature processing ---")
    feature_processor = FeatureProcessor()
    # This is for convenience for now
    # Only the training data is given a subsample amount: 1/SUBSAMPLE_FACTOR of its row count
    subsample_amount = round(len(train_df) / SUBSAMPLE_FACTOR)
    train_df = feature_processor.process(
        train_df, target_column=target_column, subsample_amount=subsample_amount
    )
    test_df = feature_processor.process(test_df, target_column=target_column)

    logger.info("--- Build Model ---")
    logger.info("--- Load Hyperparameters ---")
    if hyperparameters is None:
        logger.info("Use base hyperparameters in settings")
        hyperparameters = MODEL_HYPERPARAMETERS[model_type]
    logger.info(f"Hyperparameters are: {hyperparameters}")
    logger.info(
        "--- Loading model configuration (Model type and Naming convention) ---"
    )
    # We might want to have hyperparameters in the names to make models more recognisable
    # model_factory's result is indexed by "naming_attributes" and "model" below
    model_toolkit = model_factory(
        model_type=model_type, hyperparameters=hyperparameters
    )
    # Lower-cased model name: <target column>-<naming attributes>-<import-time timestamp>
    model_root = (
        f"{target_column}-{model_toolkit['naming_attributes']}-{TIMESTAMP}".lower()
    )
    # Everything produced for this model lives under this folder
    output_base = Path(MODEL_DIRECTORY) / target_column / model_type / model_root
    # Will need to pass output path to model (for saving purposes)
    model = model_toolkit["model"](output_filepath=output_base / MODEL_FOLDER)
    model.train_model(
        data=train_df, target_column=target_column, hyperparameters=hyperparameters
    )

    logger.info("--- Save Model ---")
    model.save_model(output_filepath=model.output_filepath, client=CLIENT)

    logger.info("--- Generate evaluation metrics ---")
    metrics = Metrics()
    metric_output_path = output_base / METRICS_FOLDER
    # The test data is what gets passed as the validation data for evaluation
    metrics_df = model.model_evaluation(
        validation_data=test_df,
        target_column=target_column,
        metrics_location=metric_output_path,
        metrics=metrics,
    )

    logger.info("--- Generate metric outputs using predictions ---")
    # metrics.generate_plot_suite()
    plot_output_path = output_base / METRICS_FOLDER / RESIDUAL_FILE
    # Reads model.predictions - presumably populated by model_evaluation above; verify in the model class
    metrics.generate_residual_plot(
        actuals=test_df[target_column],
        predictions=model.predictions,
        target_column=target_column,
        output_filepath=plot_output_path,
    )
    metrics.upload_metrics(output_filepath=metric_output_path, client=CLIENT)
    # TODO: for cml, we might want to have class that outputs all data and plots to add to the report
    # If we want residual plot/ any plots, we will need to self host

    # TODO: introduce a separate script for model optimisation, and from there, optimise for deployment
    # Imagining for now that the model trained here is the best model amongst all models built
    logger.info("--- Optimising model for deployment ---")
    deployment_model_path = output_base / DEPLOYMENT_FOLDER
    model.optimise_model_for_deployment(deployment_path=deployment_model_path)
    logger.info(
        f"Optimised version of best model can be found at: {deployment_model_path}"
    )
    model.save_model(output_filepath=deployment_model_path, client=CLIENT)

    # TODO: Need a model registry - for now have this as a CSV
    # Save this in the model directory
    # Loading registry from s3
    logger.info("--- Append registry with new model ---")
    registry_handler = RegistryHandler()
    registry_path = Path(MODEL_DIRECTORY) / target_column / REGISTRY_FILE
    registry_df = registry_handler.load_registry(
        registry_path=registry_path, client=CLIENT, metrics=metrics
    )
    # The registry records the deployment-optimised copy as the model's location
    model_details_df = pd.DataFrame(
        [
            {
                "model_type": model_type,
                "model_name": model_root,
                "model_location": deployment_model_path,
            }
        ]
    )
    # One registry row: model details and its metrics side by side
    registry_row = pd.concat([model_details_df, metrics_df], axis=1)
    registry_df = pd.concat([registry_df, registry_row], axis=0).reset_index(drop=True)
    registry_df = sort_by_metric(
        registry_df,
        optimse_metric=OPTIMISE_METRIC,
        best_model_column_name=BEST_MODEL_COLUMN_NAME,
    )

    logger.info("--- Saving new model to registry ---")
    # Ensure the directory exists
    registry_path.parent.mkdir(parents=True, exist_ok=True)
    # Written to the local path first, then handed to the registry handler with the client
    registry_df.to_csv(registry_path, index=False)
    registry_handler.save_registry(output_filepath=registry_path, client=CLIENT)

    logger.info("--- Clean up ---")
    # The local model directory is kept only when running in the "local" environment
    if RUNTIME_ENVIRONMENT != "local" and Path(MODEL_DIRECTORY).exists():
        logger.info("Removing local development files not in s3")
        shutil.rmtree(Path(MODEL_DIRECTORY))
    logger.info("--- Training Pipeline Complete --- ")
if __name__ == "__main__":
    # Script entry point: parse the CLI options, then run the training pipeline
    logger.info("---Begin Pipeline---")
    logger.info("---Ingest Arguments---")
    cli_options = ingest_arguments()
    training(
        model_type=cli_options.model_type,
        target_column=cli_options.target_column,
        test_filepath=cli_options.test_filepath,
        train_filepath=cli_options.train_filepath,
    )

View file

@ -1,35 +0,0 @@
# Temporary input data
# One record per property, each holding the first address line and the postcode
input_data = [
    {"address1": first_line, "postcode": postcode}
    for first_line, postcode in (
        ("28 Distillery Wharf", "w6 9bf"),
        ("Flat 14 Godley V C House", "E2 0LP"),
        ("49 Elderfield Road", "E5 0LF"),
        ("26 Stanhope Road", "N6 5NG"),
        ("Flat 3 Frederick Building", "N1 4BD"),
        ("Flat 4 Frederick Building", "N1 4BD"),
        ("Flat 28, 22 Adelina Grove", "E1 3BX"),
        ("Flat 39, 239 Long Lane", "SE1 4PT"),
    )
]

View file

@ -1,37 +0,0 @@
import pytest
from model_data.EpcClean import EpcClean
from model_data.analysis.SapModel import SapModel
from model_data.tests.test_data.sap_model_data import data
class TestSapModel:
    """Tests for SapModel built from the sap_model_data test dataset."""

    @pytest.fixture
    def cleaner(self):
        """EpcClean instance on which clean() has already been run for the test data."""
        cleaner = EpcClean(data=data)
        cleaner.clean()
        return cleaner

    @pytest.fixture
    def model(self, cleaner):
        """SapModel constructed from the test data and the cleaner fixture."""
        model = SapModel(data, cleaner=cleaner)
        return model

    def test_run(self, model):
        """run() populates both models and reproduces the recorded error metrics."""
        assert model.final_model is None
        assert model.test_model is None

        model.run()

        assert model.final_model is not None
        assert model.test_model is not None

        # Note - this will potentially be different on different machines so may need to adjust these tests accordingly
        # when running in CI/CD
        # abs() makes the tolerance two-sided: without it any value below the expected
        # one (however far off) satisfied the check
        tolerance = 0.00001
        assert abs(model.fit_error['Median Absolute Error'] - 1.7316860436422203) < tolerance
        assert abs(model.predict_error['Median Absolute Error'] - 2.85481857667385) < tolerance
        # final model doesn't do that well on this test data
        assert abs(model.final_error['Median Absolute Error'] - 10.050349496213855) < tolerance

View file

@ -91,8 +91,8 @@ class WallRecommendations(Definitions):
if self.property.walls["thermal_transmittance_unit"] != self.U_VALUE_UNIT:
raise NotImplementedError("Haven't handled the case of other u value units yet")
# TODO: It's worth thinking about this logic because depending on when properties were built,
# they're likely to be of a certain standard. E.g. properties built within a certain time
# TODO: It's worth thinking about this logic because depending on when property_change were built,
# they're likely to be of a certain standard. E.g. property_change built within a certain time
# period are likely to have cavity walls
# We can't detect it's a cavity wall, but it was built after 1990 so likely built with insulation already

View file

@ -230,7 +230,7 @@ class TestWallRecommendations:
The important data for this recommendation is:
- u value of 0.16
- property built in 2014
Since properties built after 1990 are typically built with insulation and this property
Since property_change built after 1990 are typically built with insulation and this property
already has really good insulation, we do NOT recommend any measures for this property
"""
input_properties[0].year_built = 2014

View file

@ -1,62 +0,0 @@
# Serverless configuration for the "sapmodel" service: one container-image Lambda
# exposed over HTTP behind a custom domain
service: sapmodel

provider:
  name: aws
  region: eu-west-2
  architecture: x86_64
  # Every value below is read from the environment of the deploying process
  environment:
    RUNTIME_ENVIRONMENT: ${env:RUNTIME_ENVIRONMENT}
    MODEL_DIRECTORY_BUCKET: ${env:MODEL_DIRECTORY_BUCKET}
    PREDICTIONS_BUCKET: ${env:PREDICTIONS_BUCKET}
    DATA_BUCKET: ${env:DATA_BUCKET}
    DOMAIN_NAME: ${env:DOMAIN_NAME}
    ECR_URI: ${env:ECR_URI}
    GITHUB_SHA: ${env:GITHUB_SHA}
  iam:
    role:
      name: sapmodel_s3_access
      statements:
        # Allow reading from MODEL_DIRECTORY_BUCKET and DATA_BUCKET
        # NOTE(review): s3:* grants every S3 action on these buckets, which is broader than
        # the read-only access described above; the narrower actions are left commented out
        - Effect: Allow
          Action:
            - s3:*
            # - s3:GetObject
            # - s3:ListBucket
          Resource:
            - arn:aws:s3:::${env:MODEL_DIRECTORY_BUCKET}
            - arn:aws:s3:::${env:MODEL_DIRECTORY_BUCKET}/*
            - arn:aws:s3:::${env:DATA_BUCKET}
            - arn:aws:s3:::${env:DATA_BUCKET}/*
        # Allow reading and writing to PREDICTIONS_BUCKET
        # NOTE(review): s3:* is again broader than the read/write actions commented out below
        - Effect: Allow
          Action:
            - s3:*
            # - s3:GetObject
            # - s3:PutObject
            # - s3:ListBucket
          Resource:
            - arn:aws:s3:::${env:PREDICTIONS_BUCKET}
            - arn:aws:s3:::${env:PREDICTIONS_BUCKET}/*

plugins:
  - serverless-domain-manager

custom:
  # Served at api.<DOMAIN_NAME>/sapmodel; the certificate ARN is looked up from the
  # /ssl_certificate_arn SSM parameter
  customDomain:
    domainName: api.${self:provider.environment.DOMAIN_NAME}
    basePath: 'sapmodel'
    createRoute53Record: true
    certificateArn: ${ssm:/ssl_certificate_arn}

functions:
  sap_prediction_lambda:
    # Image tag is the commit SHA supplied through GITHUB_SHA
    image:
      uri: ${env:ECR_URI}:${env:GITHUB_SHA}
    # role: sapPredictionLambdaRole
    events:
      - http:
          path: /predict
          method: POST
    timeout: 120 # Set max run time to 2 minutes - we shouldn't need this much time so this can be reviewed