merged in main

This commit is contained in:
Michael Duong 2023-09-06 10:50:34 +01:00
commit 2fd3c4f6ae
34 changed files with 724 additions and 276 deletions

View file

@ -9,6 +9,11 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
# TODO: use dvc to pull data, mkdir s3-mock, load data, then use docker compose
# - name: Build docker compose stack
# run: |
# cd model_data/simulation_system
# docker-compose up -d
- uses: actions/setup-python@v4
- uses: iterative/setup-cml@v1
- name: Train model

View file

@ -89,6 +89,7 @@ jobs:
ENVIRONMENT: ${{ github.ref_name }}
SECRET_KEY: ${{ secrets.NEXTAUTH_SECRET }}
PLAN_TRIGGER_BUCKET: 'retrofit-plan-inputs-${{ github.ref_name }}'
DATA_BUCKET: 'retrofit-data-${{ github.ref_name }}'
DOMAIN_NAME: ${{ steps.set_domain.outputs.domain }}
EPC_AUTH_TOKEN: ${{ steps.set_auth_token.outputs.auth_token }}
DB_HOST: ${{ steps.set_db_credentials.outputs.db_host }}

View file

@ -20,7 +20,7 @@ jobs:
- name: Install Serverless and plugins
run: |
npm install -g serverless
# npm install -g serverless-domain-manager
npm install -g serverless-domain-manager
- name: AWS credentials for dev
if: github.ref == 'refs/heads/dev'

View file

@ -0,0 +1,32 @@
columntypes = {
'UPRN': 'object', 'TOTAL_FLOOR_AREA': 'float64', 'FLOOR_HEIGHT': 'float64', 'PROPERTY_TYPE': 'object',
'BUILT_FORM': 'object', 'CONSTITUENCY': 'object', 'NUMBER_HABITABLE_ROOMS': 'float64',
'NUMBER_HEATED_ROOMS': 'float64', 'FIXED_LIGHTING_OUTLETS_COUNT': 'float64', 'FLOOR_LEVEL': 'float64',
'CONSTRUCTION_AGE_BAND': 'object', 'TRANSACTION_TYPE_STARTING': 'object',
'WALLS_DESCRIPTION_STARTING': 'object',
'FLOOR_DESCRIPTION_STARTING': 'object', 'LIGHTING_DESCRIPTION_STARTING': 'object',
'ROOF_DESCRIPTION_STARTING': 'object', 'MAINHEAT_DESCRIPTION_STARTING': 'object',
'HOTWATER_DESCRIPTION_STARTING': 'object', 'MAIN_FUEL_STARTING': 'object',
'MECHANICAL_VENTILATION_STARTING': 'object',
'SECONDHEAT_DESCRIPTION_STARTING': 'object', 'ENERGY_TARIFF_STARTING': 'object',
'SOLAR_WATER_HEATING_FLAG_STARTING': 'object', 'PHOTO_SUPPLY_STARTING': 'float64',
'WINDOWS_DESCRIPTION_STARTING': 'object', 'GLAZED_TYPE_STARTING': 'object',
'MULTI_GLAZE_PROPORTION_STARTING': 'float64', 'LOW_ENERGY_LIGHTING_STARTING': 'float64',
'NUMBER_OPEN_FIREPLACES_STARTING': 'float64', 'MAINHEATCONT_DESCRIPTION_STARTING': 'object',
'EXTENSION_COUNT_STARTING': 'float64', 'LODGEMENT_DATE_STARTING': 'object',
'TRANSACTION_TYPE_ENDING': 'object',
'WALLS_DESCRIPTION_ENDING': 'object', 'FLOOR_DESCRIPTION_ENDING': 'object',
'LIGHTING_DESCRIPTION_ENDING': 'object',
'ROOF_DESCRIPTION_ENDING': 'object', 'MAINHEAT_DESCRIPTION_ENDING': 'object',
'HOTWATER_DESCRIPTION_ENDING': 'object',
'MAIN_FUEL_ENDING': 'object', 'MECHANICAL_VENTILATION_ENDING': 'object',
'SECONDHEAT_DESCRIPTION_ENDING': 'object',
'ENERGY_TARIFF_ENDING': 'object', 'SOLAR_WATER_HEATING_FLAG_ENDING': 'object',
'PHOTO_SUPPLY_ENDING': 'float64',
'WINDOWS_DESCRIPTION_ENDING': 'object', 'GLAZED_TYPE_ENDING': 'object',
'MULTI_GLAZE_PROPORTION_ENDING': 'float64',
'LOW_ENERGY_LIGHTING_ENDING': 'float64', 'NUMBER_OPEN_FIREPLACES_ENDING': 'float64',
'MAINHEATCONT_DESCRIPTION_ENDING': 'object', 'EXTENSION_COUNT_ENDING': 'float64',
'LODGEMENT_DATE_ENDING': 'object',
'id': 'object'
}

View file

@ -15,6 +15,9 @@ from backend.app.db.utils import row2dict
from starlette.responses import Response
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import IntegrityError, OperationalError
from datetime import datetime
import pandas as pd
import requests
# database interaction functions
from backend.app.db.functions.property_functions import (
@ -22,16 +25,20 @@ from backend.app.db.functions.property_functions import (
)
from backend.app.db.functions.materials_functions import get_materials
from backend.app.db.functions.recommendations_functions import (
create_plan, create_recommendation, create_recommendation_material, create_plan_recommendations,
upload_recommendations
create_plan, create_plan_recommendations, upload_recommendations
)
from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations
from backend.app.db.connection import db_engine
from backend.app.plan.columntypes import columntypes
from model_data.optimiser.GainOptimiser import GainOptimiser
from model_data.optimiser.CostOptimiser import CostOptimiser
from backend.app.utils import epc_to_sap_lower_bound
from backend.app.utils import epc_to_sap_lower_bound, save_dataframe_to_s3_parquet, read_parquet_from_s3
from model_data.optimiser.optimiser_functions import prepare_input_measures
from model_data.simulation_system.core.DataProcessor import DataProcessor
from model_data.simulation_system.core.Settings import (
FIXED_FEATURES, COMPONENT_FEATURES, COLUMNS_TO_MERGE_ON
)
# TODO: This is placeholder until data is stored in DB
from backend.app.plan.uvalue_estimates_walls import uvalue_estimates_walls
@ -212,6 +219,8 @@ async def trigger_plan(body: PlanTriggerRequest):
# TODO: Move this to a class. We probably was a Recommender class which takes the injects the optimisers
# in as a dependency and then the optimisers can take the input measures in as part of the setup() method
recommendations = {}
recommendations_scoring_data = []
for p in input_properties:
property_recommendations = []
@ -323,7 +332,111 @@ async def trigger_plan(body: PlanTriggerRequest):
recommendations[p.id] = property_recommendations
# Once we're done, we'll store:
# Finally, we'll prepare data for predicting the impact on SAP
# TODO: We should use the cleaned data from get_components in the data rather than the raw
# values. We should create a method in Property which takes the EPC data and inserts the cleaned
# data
epc_data = p.data.copy()
epc_data = pd.DataFrame([epc_data])
epc_data.columns = [col.upper().replace("-", "_") for col in epc_data.columns]
starting_epc_data = epc_data[COMPONENT_FEATURES + ["LODGEMENT_DATE"]].copy().add_suffix("_STARTING")
ending_epc_data = epc_data[COMPONENT_FEATURES + ["LODGEMENT_DATE"]].copy().add_suffix("_ENDING")
fixed_data = epc_data[FIXED_FEATURES]
# We update the ending record with the recommended updates and we set lodgement date to today
ending_epc_data["LODGEMENT_DATE_ENDING"] = datetime.now().strftime("%Y-%m-%d")
scoring_map = {
'Solid brick, as built, no insulation (assumed)': 'Solid brick, as built, insulated (assumed)',
'Suspended, no insulation (assumed)': 'Suspended, insulated (assumed)',
'Solid, no insulation (assumed)': 'Solid, insulated (assumed)',
}
for rec in property_recommendations:
scoring_dict = {
"UPRN": p.data["uprn"],
"id": "+".join([str(p.id), str(rec["recommendation_id"])]),
"LOCAL_AUTHORITY": p.data["local-authority"],
**starting_epc_data.to_dict("records")[0],
**ending_epc_data.to_dict("records")[0],
**fixed_data.to_dict("records")[0]
}
# We update the description to indicate it's insulated
if rec["type"] == "wall_insulation":
scoring_dict["WALLS_DESCRIPTION_ENDING"] = scoring_map[p.walls["clean_description"]]
elif rec["type"] == "floor_insulation":
scoring_dict["FLOOR_DESCRIPTION_ENDING"] = scoring_map[p.floor["clean_description"]]
else:
raise NotImplementedError("Implement me")
recommendations_scoring_data.append(scoring_dict)
recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data)
# Clean the data
cleaning_data = read_parquet_from_s3(
bucket_name="retrofit-data-dev",
file_key="sap_change_model/cleaning_dataset.parquet",
)
cleaning_data = cleaning_data.rename(columns={"local-authority": "LOCAL_AUTHORITY"})
# Merge the cleaning data onto recommendations_scoring_data
recommendations_scoring_data[["FLOOR_HEIGHT", "TOTAL_FLOOR_AREA"]] = recommendations_scoring_data[
["FLOOR_HEIGHT", "TOTAL_FLOOR_AREA"]
].replace("", None)
# Perform the same cleaning as in the model
recommendations_scoring_data = DataProcessor.apply_averages_cleaning(
data_to_clean=recommendations_scoring_data,
cleaning_data=cleaning_data,
cols_to_merge_on=COLUMNS_TO_MERGE_ON + ["LOCAL_AUTHORITY"]
)
recommendations_scoring_data = recommendations_scoring_data.drop(columns=["LOCAL_AUTHORITY"])
# Note: We might need to perform the full pre-processing here
data_processor = DataProcessor(filepath=None)
data_processor.insert_data(recommendations_scoring_data)
data_processor.remap_columns()
recommendations_scoring_data = data_processor.data
# Remap column types
recommendations_scoring_data = recommendations_scoring_data.astype(columntypes)
# Store parquet file in s3 for scoring
created_at = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
file_location = "sap_change_predictions/{portfolio_id}/{timestamp}.parquet".format(
portfolio_id=body.portfolio_id,
timestamp=created_at
)
save_dataframe_to_s3_parquet(
df=recommendations_scoring_data,
bucket_name="retrofit-data-dev",
file_key=file_location
)
# Call the sap change model
response = requests.post(
url="https://api.dev.hestia.homes/sapmodel/predict",
json={
"file_location": "s3://retrofit-data-dev/" + file_location,
"property_id": 999,
"portfolio_id": 4,
"created_at": created_at
}
)
# TODO: Handle the response depending on response code
# Retrieve the predictions
predictions = read_csv_from_s3(
bucket_name="retrofit-sap-predictions-dev",
filepath=f"{body.portfolio_id}/999/{created_at}.csv"
)
predictions = pd.DataFrame(predictions)
# We round the predictions
predictions["RDSAP_CHANGE"] = predictions["RDSAP_CHANGE"].astype(float).round(0)
# Extract property_id and recommendation_id
predictions[['property_id', 'recommendation_id']] = predictions['id'].str.split('+', expand=True)
# 1) the property data
# 2) the property details (epc)
# 3) the recommendations
@ -344,6 +457,16 @@ async def trigger_plan(body: PlanTriggerRequest):
if not recommendations_to_upload:
continue
property_predictions = predictions[predictions["property_id"] == str(p.id)]
for rec in recommendations_to_upload:
# Insert the prediction for sap points
rec["sap_points"] = property_predictions[property_predictions["recommendation_id"] == str(
rec["recommendation_id"]
)]["RDSAP_CHANGE"].values[0]
if not rec["sap_points"]:
raise ValueError("Sap points missing")
# Create a plan
new_plan_id = create_plan(
session,

View file

@ -4,6 +4,8 @@ from io import StringIO
import string
import secrets
import logging
import pandas as pd
from io import BytesIO
def setup_logger(log_file=None, level=logging.INFO, overwrite_handler=False):
@ -117,3 +119,36 @@ def epc_to_sap_lower_bound(epc: str):
return 1
else:
raise ValueError("EPC rating should be between A and G")
def read_parquet_from_s3(bucket_name, file_key):
client = boto3.client('s3')
# Get the object
s3_object = client.get_object(Bucket=bucket_name, Key=file_key)
# Read the CSV body into a DataFrame
csv_body = s3_object["Body"].read()
df = pd.read_parquet(BytesIO(csv_body))
return df
def save_dataframe_to_s3_parquet(df, bucket_name, file_key):
"""
Save a pandas DataFrame to S3 as a Parquet file.
:param df: The pandas DataFrame.
:param bucket_name: Name of the S3 bucket.
:param file_key: Key of the file (including directory path within the bucket)
"""
# Convert the DataFrame to a Parquet format in memory
parquet_buffer = BytesIO()
df.to_parquet(parquet_buffer)
# Create the boto3 client
s3 = boto3.resource('s3')
# Upload the Parquet file to S3
s3.Object(bucket_name, file_key).put(Body=parquet_buffer.getvalue())

View file

@ -40,6 +40,7 @@ COPY ./model_data/config.py ./model_data/config.py
COPY ./model_data/optimiser/ ./model_data/optimiser/
COPY ./model_data/__init__.py ./model_data/__init__.py
COPY ./model_data/EpcClean.py ./model_data/EpcClean.py
COPT ./model_data/simulation_system/core/ ./model_data/simulation_system/core/
COPY ./model_data/utils.py ./model_data/utils.py
COPY ./model_data/epc_attributes/ ./model_data/epc_attributes/
COPY ./datatypes/ ./datatypes/

View file

@ -32,3 +32,5 @@ psycopg2-binary
pytz==2023.3
mip==1.15.0
boto3==1.28.3
pandas==1.5.3
pyarrow==12.0.1

View file

@ -18,3 +18,5 @@ statsmodels
scikit-learn
pyspellchecker
textblob
boto3
pyarrow

View file

@ -0,0 +1 @@
local_model/*

View file

@ -6,6 +6,7 @@ ARG GID=100
# Install patches
RUN apt-get update && apt-get upgrade -y \
&& apt-get install libgomp1 -y \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists
@ -34,4 +35,4 @@ USER ${USER}
WORKDIR /home/simulation_system
# Run the python command
CMD ["python3", "predictions.py", "--data-path", "./model_build_data/change_data/rdsap_full/test_data.parquet"]
CMD ["python3", "predictions.py", "--data-path", "s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data_with_id.parquet"]

View file

@ -6,6 +6,7 @@ ARG GID=100
# Install patches
RUN apt-get update && apt-get upgrade -y \
&& apt-get install libgomp1 -y \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists
@ -34,4 +35,4 @@ USER ${USER}
WORKDIR /home/simulation_system
# Run the python command
CMD ["python3", "training.py", "--train-filepath", "./model_build_data/change_data/rdsap_full/train_validation_data.parquet", "--test-filepath", "./model_build_data/change_data/rdsap_full/test_data.parquet"]
CMD ["python3", "training.py", "--train-filepath", "s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/train_validation_data.parquet", "--test-filepath", "s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet"]

View file

@ -8,6 +8,7 @@ Key tasks:
- Generate Inference
"""
import os
from typing import Any
from pathlib import Path
import pandas as pd
@ -15,7 +16,7 @@ from autogluon.tabular import TabularDataset, TabularPredictor
from core.Logger import logger
from core.Metrics import Metrics
from core.Settings import METRIC_FILENAME
from core.CloudClient import S3FSClient
from core.CloudClient import BotoClient
AUTOGLUON_HYPERPARAMETERS = [
"problem_type",
@ -54,34 +55,55 @@ class AutogluonModel:
def load_model(
self,
filepath: str | Path,
s3_client: S3FSClient,
client: BotoClient,
model_folder: str = "local_model",
) -> None:
"""
Providing a path, this function will load the model to be used. Will load to internal variable
"""
filepath = str(filepath)
if s3_client.client is None:
if client.client is None:
logger.info("In local development mode - no need for s3 client")
self.model = TabularPredictor.load(path=filepath)
else:
logger.info(f"Loading model from s3")
s3_client.download_model(filepath=filepath, model_folder=model_folder)
self.model = TabularPredictor.load(path=model_folder)
logger.info(f"Loading model from s3 with filepath: %s and model_folder: %s" % (filepath, model_folder))
client.download_model(filepath=filepath, model_folder=model_folder)
self.model = TabularPredictor.load(path=str(Path(model_folder) / filepath))
def save_model(self, output_filepath: Path, s3fs_client: S3FSClient) -> None:
def save_model(self, output_filepath: Path, client: BotoClient) -> None:
"""
Providing a path, this function will save the model to be used.
"""
if s3fs_client.client is None:
if client.client is None:
logger.info("In local development mode - no need for s3 client")
logger.info("Using AutoGluon Model - Model saving already occured")
else:
logger.info(f"Saving model into s3")
s3_location = s3fs_client.model_bucket + "/" + str(output_filepath)
s3fs_client.client.put(str(output_filepath), s3_location, recursive=True)
self.directory_upload(
client=client,
local_directory=str(output_filepath),
bucket_name=client.model_bucket,
)
logger.info("Save complete")
def directory_upload(self, client, local_directory, bucket_name):
# Iterate through the local directory and upload each file
for root, dirs, files in os.walk(local_directory):
for file in files:
# Determine the local file path and S3 object key
local_file_path = os.path.join(root, file)
s3_object_key = os.path.relpath(local_file_path, local_directory)
# Upload the file to S3
client.client.upload_file(local_file_path, bucket_name, local_file_path)
logger.info(
f"Uploaded {local_file_path} to {bucket_name}/{local_file_path}"
)
def train_model(
self, data: pd.DataFrame, target_column: str, hyperparameters: dict
) -> None:

View file

@ -3,17 +3,105 @@ Set up the client to be used for downloading and uploading model files
"""
import os
import s3fs
import boto3
from core.Logger import logger
class S3FSClient:
# class S3FSClient:
# """
# Set up the correct client to upload files to s3
# """
# def __init__(self, runtime_environment: str = "local") -> None:
# self.client: s3fs.S3FileSystem | None = None
# self.model_bucket: str
# self.client_factory(runtime_environment)
# self.determine_model_bucket(runtime_environment)
# def client_factory(self, runtime_environment: str = "local"):
# """
# Select the correct s3 client to use
# """
# if runtime_environment == "local":
# logger.info("No S3 client setup required")
# elif runtime_environment == "local-mock":
# logger.info(f"S3 settings for {runtime_environment}")
# self.client = s3fs.S3FileSystem(
# key=os.environ.get("AWS_ACCESS_KEY_ID", "admin"),
# secret=os.environ.get("AWS_SECRET_ACCESS_KEY", "password"),
# client_kwargs={
# "endpoint_url": os.environ.get(
# "ENDPOINT_URL", "http://localhost:9000"
# )
# },
# )
# elif runtime_environment in ["dev", "staging", "prod"]:
# logger.info(f"S3 settings for {runtime_environment}")
# # Key/ token should be in session/lambda for this
# self.client = s3fs.S3FileSystem()
# else:
# raise NotImplementedError("No correspnding runtime environment")
# def determine_model_bucket(self, runtime_environment: str) -> None:
# """
# For the given environment, return the correct bucket for models
# """
# if runtime_environment == "local":
# logger.info("In local development - no need for s3")
# elif runtime_environment in ["local-mock", "dev"]:
# # TODO: get from enironment
# self.model_bucket = "retrofit-model-directory-dev"
# elif runtime_environment in ["staging", "prod"]:
# self.model_bucket = f"retrofit-model-directory-{runtime_environment}"
# else:
# raise NotImplementedError("No corresponding runtime environment")
# def download_model(self, filepath: str, model_folder: str):
# """
# For the file path, download the model locally so that we can load the model
# """
# if self.client is None:
# logger.info("No need to download model as local development")
# else:
# def list_files_recursively(folder_path, client):
# all_files = []
# for root, dirs, files in client.walk(folder_path):
# for file in files:
# s3_path = os.path.join(root, file)
# all_files.append(s3_path)
# return all_files
# # List all files in the specified S3 folder and its subfolders
# files = list_files_recursively(
# f"{self.model_bucket}/{filepath}", client=self.client
# )
# # Download each file
# for file in files:
# # Extract the filename from the S3 path
# filename = file.split(filepath)[-1]
# # Define the local path where you want to save the file
# local_path = os.path.join(model_folder, filename)
# # Download the file from S3 to the local directory
# self.client.get(file, local_path)
# print(f"Downloaded {filename} to {local_path}")
# print("Download completed.")
class BotoClient:
"""
Set up the correct client to upload files to s3
Using boto3 to access the different aws storage configurations
"""
def __init__(self, runtime_environment: str = "local") -> None:
self.client: s3fs.S3FileSystem | None = None
self.client = None
self.model_bucket: str
self.client_factory(runtime_environment)
@ -28,19 +116,19 @@ class S3FSClient:
logger.info("No S3 client setup required")
elif runtime_environment == "local-mock":
logger.info(f"S3 settings for {runtime_environment}")
self.client = s3fs.S3FileSystem(
key=os.environ.get("AWS_ACCESS_KEY_ID", "admin"),
secret=os.environ.get("AWS_SECRET_ACCESS_KEY", "password"),
client_kwargs={
"endpoint_url": os.environ.get(
"ENDPOINT_URL", "http://localhost:9000"
)
},
session = boto3.Session()
self.client = session.client(
service_name="s3",
aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID", "admin"),
aws_secret_access_key=os.environ.get(
"AWS_SECRET_ACCESS_KEY", "password"
),
endpoint_url=os.environ.get("ENDPOINT_URL", "http://localhost:9000"),
)
elif runtime_environment in ["dev", "staging", "prod"]:
logger.info(f"S3 settings for {runtime_environment}")
# Key/ token should be in session/lambda for this
self.client = s3fs.S3FileSystem()
self.client = boto3.client("s3")
else:
raise NotImplementedError("No correspnding runtime environment")
@ -62,34 +150,34 @@ class S3FSClient:
"""
For the file path, download the model locally so that we can load the model
"""
# List all objects with the specified prefix in the bucket
if self.client is None:
logger.info("No need to download model as local development")
else:
raise ValueError("SHould not be in here!")
def list_files_recursively(folder_path, client):
all_files = []
for root, dirs, files in client.walk(folder_path):
for file in files:
s3_path = os.path.join(root, file)
all_files.append(s3_path)
return all_files
objects = self.client.list_objects_v2(Bucket=self.model_bucket, Prefix=filepath)
# List all files in the specified S3 folder and its subfolders
files = list_files_recursively(
f"{self.model_bucket}/{filepath}", client=self.client
# Ensure the local directory for downloads exists
if not os.path.exists(model_folder):
os.makedirs(model_folder)
# Download each object with the specified prefix
for obj in objects.get("Contents", []):
# Get the object key (file path)
object_key = obj["Key"]
# Determine the local file path to save the object
local_file_path = os.path.join(
model_folder, object_key.split(f"{filepath}/")[-1]
)
# Download each file
for file in files:
# Extract the filename from the S3 path
filename = file.split(filepath)[-1]
# Create the local directory if it doesn't exist
local_directory = os.path.dirname(local_file_path)
if not os.path.exists(local_directory):
os.makedirs(local_directory)
# Define the local path where you want to save the file
local_path = os.path.join("local_model", filename)
# Download the object from S3 to the local directory
self.client.download_file(self.model_bucket, object_key, local_file_path)
print(f"Downloaded {object_key} to {local_file_path}")
# Download the file from S3 to the local directory
self.client.get(file, local_path)
print(f"Downloaded {filename} to {local_path}")
print("Download completed.")
print("Download completed.")

View file

@ -3,9 +3,10 @@ import os
from typing import Protocol
import boto3
from io import BytesIO, StringIO
from core.CloudClient import BotoClient
def read_parquet_from_s3(bucket_name, file_key):
def read_parquet_from_s3(client, bucket_name, file_key):
"""
Read a CSV file from S3 using boto3 and pandas.
@ -15,11 +16,9 @@ def read_parquet_from_s3(bucket_name, file_key):
:param aws_secret_access_key: AWS Secret Access Key
:return: DataFrame containing the CSV data.
"""
# Initialize the S3 client
s3_client = boto3.client("s3")
# Get the object
s3_object = s3_client.get_object(Bucket=bucket_name, Key=file_key)
s3_object = client.get_object(Bucket=bucket_name, Key=file_key)
# Read the CSV body into a DataFrame
csv_body = s3_object["Body"].read()
@ -28,7 +27,7 @@ def read_parquet_from_s3(bucket_name, file_key):
return df
def read_csv_from_s3(bucket_name, file_key, index_col):
def read_csv_from_s3(client, bucket_name, file_key, index_col):
"""
Read a CSV file from S3 using boto3 and pandas.
@ -38,11 +37,9 @@ def read_csv_from_s3(bucket_name, file_key, index_col):
:param aws_secret_access_key: AWS Secret Access Key
:return: DataFrame containing the CSV data.
"""
# Initialize the S3 client
s3_client = boto3.client("s3")
# Get the object
s3_object = s3_client.get_object(Bucket=bucket_name, Key=file_key)
s3_object = client.get_object(Bucket=bucket_name, Key=file_key)
# Read the CSV body into a DataFrame
csv_body = s3_object["Body"].read().decode("utf-8")
@ -57,7 +54,9 @@ class DataLoader(Protocol):
"""
@staticmethod
def load(filepath: str, index_col: str | None = None) -> pd.DataFrame | None:
def load(
client: BotoClient, filepath: str, index_col: str | None = None
) -> pd.DataFrame | None:
"""
Loading data from the relevant source
"""
@ -69,7 +68,9 @@ class LocalDataLoader:
"""
@staticmethod
def load(filepath: str, index_col: str | None = None) -> pd.DataFrame:
def load(
client: BotoClient, filepath: str, index_col: str | None = None
) -> pd.DataFrame:
if not os.path.exists(filepath):
raise FileNotFoundError(f"File not found: {filepath}")
@ -86,56 +87,36 @@ class LocalDataLoader:
return df
class S3MockDataLoader:
class S3DataLoader:
"""
Implements the DataLoader Protocol for s3 files, hosting locally in a mocked service
"""
@staticmethod
def load(filepath: str, index_col: str | None = None) -> pd.DataFrame:
# TODO: Ingest these as environment variables in the docker compose file
storage_options = {
"key": os.environ.get("AWS_ACCESS_KEY_ID", "admin"),
"secret": os.environ.get("AWS_SECRET_ACCESS_KEY", "password"),
"client_kwargs": {
"endpoint_url": os.environ.get("ENDPOINT_URL", "http://localhost:9000")
},
}
def load(
client: BotoClient, filepath: str, index_col: str | None = None
) -> pd.DataFrame:
if not filepath.startswith("s3://"):
filepath = "s3://" + filepath
if filepath.endswith(".parquet"):
df = pd.read_parquet(filepath, storage_options=storage_options)
if index_col is not None:
df = df.set_index(index_col)
elif filepath.endswith(".csv"):
df = pd.read_csv(
filepath, index_col=index_col, storage_options=storage_options
)
else:
raise ValueError(f"File format not supported for file: {filepath}")
return df
class S3DataLoader:
"""
Implements the DataLoader Protocol for s3 files
"""
@staticmethod
def load(filepath: str, index_col: str | None = None) -> pd.DataFrame:
filepath_split = filepath.split("s3://")[-1].split("/", 1)
bucket = filepath_split[0]
key = filepath_split[1]
if filepath.endswith(".parquet"):
df = read_parquet_from_s3(bucket, key)
df = read_parquet_from_s3(
client=client.client, bucket_name=bucket, file_key=key
)
if index_col is not None:
df = df.set_index(index_col)
elif filepath.endswith(".csv"):
df = read_csv_from_s3(bucket, key, index_col)
df = read_csv_from_s3(
client=client.client,
bucket_name=bucket,
file_key=key,
index_col=index_col,
)
else:
raise ValueError(f"File format not supported for file: {filepath}")
@ -152,7 +133,7 @@ def dataloader_factory(runtime_environment: str | None = None) -> DataLoader:
dataloader_types = {
"local": LocalDataLoader(),
"local-mock": S3MockDataLoader(),
"local-mock": S3DataLoader(),
"dev": S3DataLoader(),
"staging": S3DataLoader(),
"prod": S3DataLoader(),

View file

@ -21,17 +21,24 @@ class DataProcessor:
Handle data loading and data preprocessing
"""
def __init__(self, filepath: Path) -> None:
def __init__(self, filepath: Path | None) -> None:
self.filepath = filepath
self.data: pd.DataFrame
def load_data(self, low_memory=False) -> None:
if not self.filepath:
raise ValueError("No filepath specified")
self.data = pd.read_csv(self.filepath, low_memory=low_memory)
def insert_data(self, data: pd.DataFrame) -> None:
self.data = data
def pre_process(self) -> pd.DataFrame:
"""
Load data and begin initial cleaning
"""
self.load_data(low_memory=DATA_PROCESSOR_SETTINGS["low_memory"])
if not self.data:
self.load_data(low_memory=DATA_PROCESSOR_SETTINGS["low_memory"])
self.confine_data()
# TODO: CLean number of heated rooms and habitable rooms
@ -87,7 +94,7 @@ class DataProcessor:
# Remap certain columns
data["FLOOR_LEVEL"] = data["FLOOR_LEVEL"].replace(FLOOR_LEVEL_MAP)
data["BUILT_FROM"] = data["BUILT_FORM"].replace(BUILT_FORM_REMAP)
data["BUILT_FORM"] = data["BUILT_FORM"].replace(BUILT_FORM_REMAP)
self.data = data
@ -264,3 +271,42 @@ class DataProcessor:
self.data["MULTI_GLAZE_PROPORTION"]
) & (self.data["WINDOWS_DESCRIPTION"].isin(FULLY_GLAZED_DESCRIPTIONS))
self.data.loc[no_multi_glaze_proportion_index, "MULTI_GLAZE_PROPORTION"] = 100
@staticmethod
def apply_averages_cleaning(data_to_clean, cleaning_data, cols_to_merge_on):
"""
Clean the input DataFrame using averages from a cleaning DataFrame.
:param data_to_clean: DataFrame to be cleaned.
:param cleaning_data: DataFrame containing data for cleaning.
:param cols_to_merge_on: Columns on which merging is based. We pass cols_to_merge_on to this function as this
differs depending on where the function is being used.
:return: Cleaned DataFrame.
"""
# Enforce data types
for col in ["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"]:
data_to_clean[col] = data_to_clean[col].astype(float)
# Identify columns with non-NaN values
columns_to_merge_on = data_to_clean[cols_to_merge_on].dropna().columns.tolist()
# Calculate averages
cleaning_averages_to_merge = cleaning_data.groupby(columns_to_merge_on).agg(
{"TOTAL_FLOOR_AREA": "mean", "FLOOR_HEIGHT": "mean"}
)
# Merge with the original data
data_to_clean = pd.merge(
data_to_clean,
cleaning_averages_to_merge,
on=columns_to_merge_on,
suffixes=("", "_AVERAGE"),
how="left",
)
# Fill NaN values with averages
for col in ["TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]:
data_to_clean[col].fillna(data_to_clean[f"{col}_AVERAGE"], inplace=True)
data_to_clean.drop(columns=[f"{col}_AVERAGE"], inplace=True)
return data_to_clean

View file

@ -5,11 +5,12 @@ Key tasks:
- Given a model and test data, produce a suite of all metrics
"""
import os
import pandas as pd
from pathlib import Path
import seaborn as sns
import matplotlib.pyplot as plt
from core.CloudClient import S3FSClient
from core.CloudClient import BotoClient
from core.Logger import logger
from core.Settings import (
RESIDUAL_TRUE_LABEL,
@ -64,18 +65,40 @@ class Metrics:
All metric functions used to generate a dictionary of metrics
"""
def upload_metrics(self, output_filepath: Path, s3fs_client: S3FSClient) -> None:
def upload_metrics(self, output_filepath: Path, client: BotoClient) -> None:
"""
Providing a path, this function will save the metrics folders/files.
"""
if s3fs_client.client is None:
if client.client is None:
logger.info("In local development mode - no need to upload")
else:
logger.info(f"Saving metrics into s3")
s3_location = s3fs_client.model_bucket + "/" + str(output_filepath)
s3fs_client.client.put(str(output_filepath), s3_location, recursive=True)
s3_location = client.model_bucket + "/" + str(output_filepath)
self.directory_upload(
client=client,
local_directory=str(output_filepath),
bucket_name=client.model_bucket,
)
logger.info("Save complete")
def directory_upload(self, client, local_directory, bucket_name):
# Iterate through the local directory and upload each file
for root, dirs, files in os.walk(local_directory):
for file in files:
# Determine the local file path and S3 object key
local_file_path = os.path.join(root, file)
s3_object_key = os.path.relpath(local_file_path, local_directory)
# Upload the file to S3
client.client.upload_file(local_file_path, bucket_name, local_file_path)
logger.info(
f"Uploaded {local_file_path} to {bucket_name}/{local_file_path}"
)
@staticmethod
def list_metric_functions() -> list:
"""

View file

@ -2,38 +2,65 @@
"""
from io import StringIO
import pandas as pd
from pathlib import Path
from core.Logger import logger
from core.CloudClient import S3FSClient
from core.CloudClient import BotoClient
from core.Metrics import Metrics
from core.Settings import BEST_MODEL_COLUMN_NAME
def read_csv_from_s3(client, bucket_name, file_key, index_col):
"""
Read a CSV file from S3 using boto3 and pandas.
:param bucket_name: Name of the S3 bucket.
:param file_key: Key of the file (including directory path within the bucket).
:param aws_access_key_id: AWS Access Key ID
:param aws_secret_access_key: AWS Secret Access Key
:return: DataFrame containing the CSV data.
"""
# Get the object
s3_object = client.get_object(Bucket=bucket_name, Key=file_key)
# Read the CSV body into a DataFrame
csv_body = s3_object["Body"].read().decode("utf-8")
df = pd.read_csv(StringIO(csv_body), index_col=index_col)
return df
class RegistryHandler:
"""
Handles the loading of the registry depending on the environment
"""
def load_registry(
self, registry_path: Path, s3fs_client: S3FSClient, metrics: Metrics
):
def load_registry(self, registry_path: Path, client: BotoClient, metrics: Metrics):
"""
Depening on the environment, we will have to load from locally or s3 (mock/real)
"""
if s3fs_client.client is None:
if client.client is None:
logger.info("Using local development - no need for s3 load")
return self.load_local_registry(
registry_path=registry_path, metrics=metrics
)
s3_location = "s3://" + s3fs_client.model_bucket + "/" + str(registry_path)
logger.info(f"Check if registry exists")
if s3fs_client.client.exists(s3_location):
registry_df = pd.read_csv(
s3fs_client.client.open(s3_location), index_col=None
check_exists = client.client.list_objects_v2(
Bucket=client.model_bucket, Prefix=str(registry_path)
)
if "Contents" in check_exists:
logger.info("Loading existing registry")
registry_df = read_csv_from_s3(
client=client.client,
bucket_name=client.model_bucket,
file_key=str(registry_path),
index_col=None,
)
else:
logger.info("No registry found - creating new one")
@ -70,14 +97,18 @@ class RegistryHandler:
return registry_df
def save_registry(self, output_filepath: Path, s3fs_client: S3FSClient) -> None:
def save_registry(self, output_filepath: Path, client: BotoClient) -> None:
"""
Providing a path, this function will save the model to be used.
"""
if s3fs_client.client is None:
if client.client is None:
logger.info("In local development mode - no need for s3 client")
else:
logger.info(f"Saving registry into s3")
s3_location = s3fs_client.model_bucket + "/" + str(output_filepath)
s3fs_client.client.put(str(output_filepath), s3_location, recursive=True)
s3_location = client.model_bucket + "/" + str(output_filepath)
client.client.upload_file(
str(output_filepath), client.model_bucket, str(output_filepath)
)
logger.info("Save complete")

View file

@ -99,7 +99,6 @@ COMPONENT_FEATURES = [
"WINDOWS_DESCRIPTION",
"GLAZED_TYPE",
"MULTI_GLAZE_PROPORTION",
"LIGHTING_DESCRIPTION",
"LOW_ENERGY_LIGHTING",
"NUMBER_OPEN_FIREPLACES",
"MAINHEATCONT_DESCRIPTION",

View file

@ -18,19 +18,20 @@ services:
timeout: 20s
retries: 3
# simulation_system_training:
# build:
# context: ./
# dockerfile: ./Dockerfiles/Dockerfile.training
# image: simulation_system_training
# environment:
# ENDPOINT_URL: http://minio:9000/
# AWS_ACCESS_KEY_ID: *MINIO_USER
# AWS_SECRET_ACCESS_KEY: *MINIO_PASS
# tty: true
# depends_on:
# minio:
# condition: service_healthy
simulation_system_training:
build:
context: ./
dockerfile: ./Dockerfiles/Dockerfile.training
image: simulation_system_training
environment:
RUNTIME_ENVIRONMENT: local-mock
ENDPOINT_URL: http://minio:9000/
AWS_ACCESS_KEY_ID: *MINIO_USER
AWS_SECRET_ACCESS_KEY: *MINIO_PASS
tty: true
depends_on:
minio:
condition: service_healthy
# command:
# ["bash"]
@ -40,6 +41,7 @@ services:
# dockerfile: ./Dockerfiles/Dockerfile.prediction
# image: simulation_system_prediction
# environment:
# RUNTIME_ENVIRONMENT: local-mock
# ENDPOINT_URL: http://minio:9000/
# AWS_ACCESS_KEY_ID: *MINIO_USER
# AWS_SECRET_ACCESS_KEY: *MINIO_PASS
@ -47,7 +49,7 @@ services:
# depends_on:
# simulation_system_training:
# condition: service_completed_successfully
# command:
# command:
# ["bash"]

View file

@ -3,7 +3,7 @@ import pandas as pd
from tqdm import tqdm
from pathlib import Path
from core.Settings import (
from simulation_system.core.Settings import (
MANDATORY_FIXED_FEATURES,
AVERAGE_FIXED_FEATURES,
LATEST_FIELD,
@ -12,9 +12,10 @@ from core.Settings import (
HEAT_DEMAND_RESPONSE,
COLUMNS_TO_MERGE_ON,
)
from core.DataProcessor import DataProcessor
from simulation_system.core.DataProcessor import DataProcessor
from utils import save_dataframe_to_s3_parquet
DATA_DIRECTORY = Path(__file__).parent / "data" / "all-domestic-certificates"
DATA_DIRECTORY = Path(__file__).parent / "simulation_system" / "data" / "all-domestic-certificates"
# TODO: Have a look at temporal features
@ -30,6 +31,7 @@ def app():
directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()]
dataset = []
cleaning_dataset = []
# 116
# 128048706
# PosixPath('/home/ubuntu/Documents/python/hestia/Model/model_data/simulation_system/data/all-domestic
@ -62,28 +64,10 @@ def app():
# property_data[AVERAGE_FIXED_FEATURES].fillna(value=0).pct_change().iloc[-1] > 0.1
# Extract the columns that are not all None
na_columns = property_data[COLUMNS_TO_MERGE_ON].isna().all()
cleaned_columns_to_merge_on = na_columns.index[~na_columns].to_list()
# Get the corresponding groupby and merge, and fill in NA values
cleaning_averages_to_merge = cleaning_averages.groupby(
cleaned_columns_to_merge_on
)[["TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]].mean()
modified_property_data = pd.merge(
property_data,
cleaning_averages_to_merge,
on=cleaned_columns_to_merge_on,
suffixes=["", "_AVERAGE"],
)
modified_property_data["TOTAL_FLOOR_AREA"] = modified_property_data[
"TOTAL_FLOOR_AREA"
].fillna(modified_property_data["TOTAL_FLOOR_AREA_AVERAGE"])
modified_property_data["FLOOR_HEIGHT"] = modified_property_data[
"FLOOR_HEIGHT"
].fillna(modified_property_data["FLOOR_HEIGHT_AVERAGE"])
modified_property_data = modified_property_data.drop(
columns=["TOTAL_FLOOR_AREA_AVERAGE", "FLOOR_HEIGHT_AVERAGE"]
modified_property_data = DataProcessor.apply_averages_cleaning(
data_to_clean=property_data,
cleaning_data=cleaning_averages,
cols_to_merge_on=COLUMNS_TO_MERGE_ON
)
for field in AVERAGE_FIXED_FEATURES:
@ -107,7 +91,7 @@ def app():
variable_data = modified_property_data[
COMPONENT_FEATURES
+ ["LODGEMENT_DATE", RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE]
]
]
# Note: we look at changes between subsequent EPCS, however we could look at other permutations
# e.g. first vs second, second vs third and also first vs third
@ -133,10 +117,10 @@ def app():
starting_record = starting_record[
COMPONENT_FEATURES + ["LODGEMENT_DATE"]
].add_suffix("_STARTING")
].add_suffix("_STARTING")
ending_record = ending_record[
COMPONENT_FEATURES + ["LODGEMENT_DATE"]
].add_suffix("_ENDING")
].add_suffix("_ENDING")
features = pd.concat([starting_record, ending_record])
@ -150,7 +134,18 @@ def app():
}
)
dataset.extend(property_model_data)
dataset.append(property_model_data)
cleaning_averages["LOCAL_AUTHORITY"] = df["LOCAL_AUTHORITY"].values[0]
cleaning_dataset.append(cleaning_averages)
# Store cleaning dataset in s3 as a parquet file
cleaning_dataset = pd.concat(cleaning_dataset)
save_dataframe_to_s3_parquet(
df=cleaning_dataset,
bucket_name="retrofit-data-dev",
file_key="sap_change_model/cleaning_dataset.parquet",
)
output = pd.DataFrame(dataset)
output.to_parquet("./dataset.parquet")

View file

@ -1,3 +1,7 @@
import boto3
from botocore.exceptions import NoCredentialsError
import json
from io import StringIO
import os
import logging
@ -9,6 +13,33 @@ logger.setLevel(logging.INFO)
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "dev")
def upload_dataframe_to_s3(df, bucket, s3_file_name):
"""
Upload a pandas DataFrame to an S3 bucket as CSV
:param df: DataFrame to upload
:param bucket: Bucket to upload to
:param s3_file_name: S3 object name
:return: True if file was uploaded, else False
"""
# Initialize the S3 client
s3 = boto3.client('s3')
csv_buffer = StringIO()
# Write the DataFrame to the buffer as CSV
df.to_csv(csv_buffer, index=False)
try:
# Upload the CSV from the buffer to S3
s3.put_object(Bucket=bucket, Key=s3_file_name, Body=csv_buffer.getvalue())
print(f"Successfully uploaded DataFrame to {bucket}/{s3_file_name}")
return True
except NoCredentialsError:
print("Credentials not available")
return False
def handler(event, context):
"""
Take in event and trigger the prediction pipeline
@ -19,24 +50,44 @@ def handler(event, context):
# Assuming a file in a bucket landing for now?
# Assuming we have a model to use
data_path = event["file_location"]
property_id = event["property_id"]
portfolio_id = event["portfolio_id"]
created_at = event["created_at"]
body = json.loads(event["body"])
data_path = body["file_location"]
property_id = body["property_id"]
portfolio_id = body["portfolio_id"]
created_at = body["created_at"]
try:
# We could fix the model path but for the moment, we just take the best model path based on the registry
outputs = prediction(model_path=None, data_path=data_path)
# Store into s3, with key of {portfolio_id}-{property_id}
storage_filepath = f"s3://retrofit-sap-predictions-{RUNTIME_ENVIRONMENT}/{portfolio_id}/{property_id}/" \
f"{created_at}.csv"
outputs.to_csv(storage_filepath)
return storage_filepath
storage_filepath = f"{portfolio_id}/{property_id}/{created_at}.csv"
upload_dataframe_to_s3(
df=outputs,
bucket=f"retrofit-sap-predictions-{RUNTIME_ENVIRONMENT}",
s3_file_name=storage_filepath
)
return {
"statusCode": 200,
"body": json.dumps({
"message": "Successfully processed input",
"storage_filepath": storage_filepath
})
}
except (Exception, KeyError, ValueError) as e:
logger.info("Prediction failed")
logger.info(e)
return {
"statusCode": 500,
"body": json.dumps({
"message": "Prediction failed",
"error": str(e)
})
}
if __name__ == "__main__":

View file

@ -5,13 +5,16 @@ Script to load MLModel class and generate predictions
import os
import json
import argparse
from pathlib import Path
import pandas as pd
from typing import Optional
from datetime import datetime
from MLModel.Models import AutogluonModel
from core.Logger import logger
from core.DataLoader import dataloader_factory
from core.CloudClient import S3FSClient
from core.CloudClient import BotoClient
from core.Metrics import Metrics
from core.RegistryHandler import RegistryHandler
from core.Settings import (
BASE_REGISTRY_PATH,
REGISTRY_FILE,
@ -19,13 +22,13 @@ from core.Settings import (
PREDICTION_FILE,
METADATA_FILE,
TIMESTAMP_FORMAT,
MODEL_DIRECTORY,
)
TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT)
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "dev")
CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT)
CLIENT = BotoClient(runtime_environment=RUNTIME_ENVIRONMENT)
# FOR TESTING
# For now just loading data first and then passing into function (i.e. as if we receive json data and convert to
@ -36,7 +39,7 @@ CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT)
# For testing in dev s3
# Data path can be passed as so:
# python3 predictions.py --data-path s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet
# data_path="s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet"
# data_path="s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data_with_id.parquet"
def ingest_arguments() -> argparse.Namespace:
@ -77,17 +80,6 @@ def prediction(
Main pipeline function
"""
if RUNTIME_ENVIRONMENT == "local":
registry_path = BASE_REGISTRY_PATH / target_column / REGISTRY_FILE
if registry_path is None or not registry_path.exists():
logger.error("No registry path provided or registry doesn't exist")
exit(1)
elif RUNTIME_ENVIRONMENT == "dev":
registry_path = "s3://retrofit-model-directory-dev/model_directory/RDSAP_CHANGE/model_registry.csv"
else:
raise NotImplemented("TO be implemented")
if model_path is not None:
logger.info("User specified a model to load - ignoring registry")
model_location = model_path
@ -96,7 +88,17 @@ def prediction(
else:
# TODO: Think about where registry will sit/ type
logger.info("Loading best model from registry")
registry_df = pd.read_csv(registry_path)
metrics = Metrics()
registry_handler = RegistryHandler()
registry_path = Path(MODEL_DIRECTORY) / target_column / REGISTRY_FILE
registry_df = registry_handler.load_registry(
registry_path=registry_path, client=CLIENT, metrics=metrics
)
# registry_df = pd.read_csv(registry_path)
best_model_df = registry_df[registry_df["best_model"]]
model_location = best_model_df["model_location"].values[0]
@ -115,18 +117,15 @@ def prediction(
if data_path and data is None:
logger.info("Loading data from provided path")
dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT)
data = dataloader.load(filepath=data_path, index_col="id")
data = dataloader.load(client=CLIENT, filepath=data_path, index_col=None)
if data is None:
raise ValueError("No data loaded")
# # TODO: DOWNSAMPLING DOWN TO JUST USE ONE FOR PREDICTION
# data = data.sample(1)
else:
logger.info("Using data provided")
data = json.loads(str(data))
data = pd.DataFrame([data])
print(data)
logger.info("--- Loading Model ---")
@ -134,17 +133,19 @@ def prediction(
logger.info("Using an Autogluon model")
model = AutogluonModel()
else:
logger.error("No other model currently")
exit(1)
raise ValueError("No other model currently")
model.load_model(
filepath=model_location, s3_client=CLIENT, model_folder="local_model"
)
# In lambda, only the /tmp folder is writable
model_folder = "/tmp" if RUNTIME_ENVIRONMENT in ["dev", "prod"] else "local_model"
model.load_model(filepath=model_location, client=CLIENT, model_folder=model_folder)
logger.info("--- Generating Predictions ---")
prediction = model.generate_predictions(data=data)
return pd.concat([pd.Series(data.index, name='id'), prediction], axis=1)
# logger.info(pd.concat([data["id"], prediction], axis=1))
return pd.concat([data["id"], prediction], axis=1)
# Save prediction some where?
# prediction.to_csv("s3?")

View file

@ -1,4 +1,6 @@
boto3
autogluon==0.8.2
pandas==1.5.3
s3fs==2023.6.0
seaborn==0.12.2
matplotlib==3.7.2
pre-commit==3.3.3

View file

@ -1,6 +1,5 @@
boto3
autogluon==0.8.2
pandas==1.5.3
s3fs
seaborn==0.12.2
matplotlib==3.7.2
matplotlib==3.7.2

View file

@ -1,7 +1,6 @@
autogluon==0.8.2
pandas==1.5.3
seaborn==0.12.2
s3fs==2023.6.0
pre-commit==3.3.3
dvc
dvc[s3]

View file

@ -10,7 +10,7 @@ from core.Logger import logger
from core.Metrics import Metrics, sort_by_metric
from core.DataLoader import dataloader_factory
from core.FeatureProcessor import FeatureProcessor
from core.CloudClient import S3FSClient
from core.CloudClient import BotoClient
from core.RegistryHandler import RegistryHandler
from core.Settings import (
MODEL_DIRECTORY,
@ -30,7 +30,8 @@ TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT)
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local")
CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT)
CLIENT = BotoClient(runtime_environment=RUNTIME_ENVIRONMENT)
# CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT)
# FOR TESTING
@ -105,8 +106,8 @@ def training(
logger.info("--- Loading data ---")
dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT)
train_df = dataloader.load(filepath=train_filepath)
test_df = dataloader.load(filepath=test_filepath)
train_df = dataloader.load(client=CLIENT, filepath=train_filepath)
test_df = dataloader.load(client=CLIENT, filepath=test_filepath)
if train_df is None or test_df is None:
raise ValueError("No data Loaded - cancelling pipeline")
@ -153,7 +154,7 @@ def training(
)
logger.info("--- Save Model ---")
model.save_model(output_filepath=model.output_filepath, s3fs_client=CLIENT)
model.save_model(output_filepath=model.output_filepath, client=CLIENT)
logger.info("--- Generate evaluation metrics ---")
metrics = Metrics()
@ -166,8 +167,6 @@ def training(
metrics=metrics,
)
metrics.upload_metrics(output_filepath=metric_output_path, s3fs_client=CLIENT)
logger.info("--- Generate metric outputs using predictions ---")
# metrics.generate_plot_suite()
@ -179,7 +178,7 @@ def training(
output_filepath=plot_output_path,
)
metrics.upload_metrics(output_filepath=plot_output_path, s3fs_client=CLIENT)
metrics.upload_metrics(output_filepath=metric_output_path, client=CLIENT)
# TODO: for cml, we might want to have class that outputs all data and plots to add to the report
# If we want residual plot/ any plots, we will need to self host
@ -196,7 +195,7 @@ def training(
f"Optimised version of best model can be found at: {deployment_model_path}"
)
model.save_model(output_filepath=deployment_model_path, s3fs_client=CLIENT)
model.save_model(output_filepath=deployment_model_path, client=CLIENT)
# TODO: Need a model registry - for now have this as a CSV
# Save this in the model directory
@ -208,7 +207,7 @@ def training(
registry_path = Path(MODEL_DIRECTORY) / target_column / REGISTRY_FILE
registry_df = registry_handler.load_registry(
registry_path=registry_path, s3fs_client=CLIENT, metrics=metrics
registry_path=registry_path, client=CLIENT, metrics=metrics
)
model_details_df = pd.DataFrame(
@ -235,7 +234,7 @@ def training(
registry_path.parent.mkdir(parents=True, exist_ok=True)
registry_df.to_csv(registry_path, index=False)
registry_handler.save_registry(output_filepath=registry_path, s3fs_client=CLIENT)
registry_handler.save_registry(output_filepath=registry_path, client=CLIENT)
logger.info("--- Clean up ---")
if RUNTIME_ENVIRONMENT != "local" and Path(MODEL_DIRECTORY).exists():

View file

@ -1,3 +1,6 @@
import boto3
import pandas as pd
from io import BytesIO
import re
from textblob import TextBlob
@ -24,3 +27,23 @@ def correct_spelling(text):
corrected_text = ' '.join(corrected_words)
return corrected_text
def save_dataframe_to_s3_parquet(df, bucket_name, file_key):
"""
Save a pandas DataFrame to S3 as a Parquet file.
:param df: The pandas DataFrame.
:param bucket_name: Name of the S3 bucket.
:param file_key: Key of the file (including directory path within the bucket).
"""
# Convert the DataFrame to a Parquet format in memory
parquet_buffer = BytesIO()
df.to_parquet(parquet_buffer)
# Create the boto3 client
client = boto3.client('s3')
# Upload the Parquet file to S3
client.put_object(Bucket=bucket_name, Key=file_key, Body=parquet_buffer.getvalue())

View file

@ -6,7 +6,7 @@ from backend.Property import Property
from recommendations.rdsap_tables import default_wall_thickness, age_band_data
from recommendations.recommendation_utils import (
r_value_per_mm_to_u_value, calculate_u_value_uplift, is_diminishing_returns, update_lowest_selected_u_value,
get_recommended_part, get_uvalue_estimate, estimate_sap_points
get_recommended_part, get_uvalue_estimate
)
suspended_floor_insulation_parts = [
@ -323,7 +323,7 @@ class FloorRecommendations(Definitions):
"description": self._make_floor_description(part, depth),
"starting_u_value": u_value,
"new_u_value": new_u_value,
"sap_points": estimate_sap_points(),
"sap_points": None,
"cost": estimated_cost,
}
)

View file

@ -6,7 +6,7 @@ from backend.Property import Property
from model_data.BaseUtility import Definitions
from recommendations.recommendation_utils import (
r_value_per_mm_to_u_value, calculate_u_value_uplift, is_diminishing_returns, update_lowest_selected_u_value,
get_recommended_part, get_uvalue_estimate, estimate_sap_points
get_recommended_part, get_uvalue_estimate
)
external_wall_insulation_parts = [
@ -256,6 +256,10 @@ class WallRecommendations(Definitions):
is_solid_brick = self.property.walls["is_solid_brick"]
insulation_thickness = self.property.walls["insulation_thickness"]
# We check if the wall is already insulated and if so, we exit
if insulation_thickness in ["average", "above average"]:
return
if u_value:
if self.property.walls["thermal_transmittance_unit"] != self.U_VALUE_UNIT:
raise NotImplementedError("Haven't handled the case of other u value units yet")
@ -350,7 +354,7 @@ class WallRecommendations(Definitions):
"description": "Install " + self._make_description(part, depth),
"starting_u_value": u_value,
"new_u_value": new_u_value,
"sap_points": estimate_sap_points(),
"sap_points": None,
"cost": estimated_cost,
}
)
@ -432,7 +436,7 @@ class WallRecommendations(Definitions):
),
"starting_u_value": u_value,
"new_u_value": combined_new_u_value,
"sap_points": estimate_sap_points(),
"sap_points": None,
"cost": ewi_esimtated_cost + iwi_esimtated_cost,
}
self.recommendations.append(recommendation)

View file

@ -4,15 +4,6 @@ from statistics import mean
import random
def estimate_sap_points():
"""
This is a placeholder function. We will implement the proper version soon
:return:
"""
return random.sample(range(4, 12), 1)[0]
def r_value_per_mm_to_u_value(depth_mm: int, r_value_per_mm: float):
"""
Converts R-value per mm to U-value in W/m²K.

View file

@ -12,63 +12,50 @@ provider:
DOMAIN_NAME: ${env:DOMAIN_NAME}
ECR_URI: ${env:ECR_URI}
GITHUB_SHA: ${env:GITHUB_SHA}
iam:
role:
name: sapmodel_s3_access
statements:
# Allow reading from MODEL_DIRECTORY_BUCKET and DATA_BUCKET
- Effect: Allow
Action:
- s3:*
# - s3:GetObject
# - s3:ListBucket
Resource:
- arn:aws:s3:::${env:MODEL_DIRECTORY_BUCKET}
- arn:aws:s3:::${env:MODEL_DIRECTORY_BUCKET}/*
- arn:aws:s3:::${env:DATA_BUCKET}
- arn:aws:s3:::${env:DATA_BUCKET}/*
# Allow reading and writing to PREDICTIONS_BUCKET
- Effect: Allow
Action:
- s3:GetObject
- s3:PutObject
- s3:ListBucket
Resource:
- arn:aws:s3:::${env:PREDICTIONS_BUCKET}
- arn:aws:s3:::${env:PREDICTIONS_BUCKET}/*
#plugins:
# - serverless-domain-manager
#
#custom:
# customDomain:
# domainName: api.${self:provider.environment.DOMAIN_NAME}
# basePath: 'sapmodel'
# createRoute53Record: true
# certificateArn: ${ssm:/ssl_certificate_arn}
plugins:
- serverless-domain-manager
custom:
customDomain:
domainName: api.${self:provider.environment.DOMAIN_NAME}
basePath: 'sapmodel'
createRoute53Record: true
certificateArn: ${ssm:/ssl_certificate_arn}
functions:
sap_prediction_lambda:
image:
uri: ${env:ECR_URI}:${env:GITHUB_SHA}
role: sapPredictionLambdaRole
# role: sapPredictionLambdaRole
events:
- http:
path: /predict
method: POST
resources:
Resources:
sapPredictionLambdaRole:
Type: AWS::IAM::Role
Properties:
RoleName: sap-prediction-lambda-role
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: sapPredictionLambdaS3Access
PolicyDocument:
Version: '2012-10-17'
Statement:
# Allow reading from MODEL_DIRECTORY_BUCKET and DATA_BUCKET
- Effect: Allow
Action:
- s3:GetObject
- s3:ListBucket
Resource:
- arn:aws:s3:::${env:MODEL_DIRECTORY_BUCKET}
- arn:aws:s3:::${env:MODEL_DIRECTORY_BUCKET}/*
- arn:aws:s3:::${env:DATA_BUCKET}
- arn:aws:s3:::${env:DATA_BUCKET}/*
# Allow reading and writing to PREDICTIONS_BUCKET
- Effect: Allow
Action:
- s3:GetObject
- s3:PutObject
- s3:ListBucket
Resource:
- arn:aws:s3:::${env:PREDICTIONS_BUCKET}
- arn:aws:s3:::${env:PREDICTIONS_BUCKET}/*
timeout: 120 # Set max run time to 2 minutes - we shouldn't need this much time so this can be reviewed

View file

@ -9,6 +9,7 @@ provider:
ENVIRONMENT: ${env:ENVIRONMENT}
SECRET_KEY: ${env:SECRET_KEY}
PLAN_TRIGGER_BUCKET: ${env:PLAN_TRIGGER_BUCKET}
DATA_BUCKET: ${env:DATA_BUCKET}
DOMAIN_NAME: ${env:DOMAIN_NAME}
EPC_AUTH_TOKEN: ${env:EPC_AUTH_TOKEN}
DB_HOST: ${env:DB_HOST}