diff --git a/.github/workflows/cml.yml b/.github/workflows/cml.yml index be06486b..3a0f4cae 100644 --- a/.github/workflows/cml.yml +++ b/.github/workflows/cml.yml @@ -9,6 +9,11 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 + # TODO: use dvc to pull data, mkdir s3-mock, load data, then use docker compose + # - name: Build docker compose stack + # run: | + # cd model_data/simulation_system + # docker-compose up -d - uses: actions/setup-python@v4 - uses: iterative/setup-cml@v1 - name: Train model diff --git a/.github/workflows/deploy_fastapi_backend.yml b/.github/workflows/deploy_fastapi_backend.yml index 502a9eb8..8172e2cb 100644 --- a/.github/workflows/deploy_fastapi_backend.yml +++ b/.github/workflows/deploy_fastapi_backend.yml @@ -89,6 +89,7 @@ jobs: ENVIRONMENT: ${{ github.ref_name }} SECRET_KEY: ${{ secrets.NEXTAUTH_SECRET }} PLAN_TRIGGER_BUCKET: 'retrofit-plan-inputs-${{ github.ref_name }}' + DATA_BUCKET: 'retrofit-data-${{ github.ref_name }}' DOMAIN_NAME: ${{ steps.set_domain.outputs.domain }} EPC_AUTH_TOKEN: ${{ steps.set_auth_token.outputs.auth_token }} DB_HOST: ${{ steps.set_db_credentials.outputs.db_host }} diff --git a/.github/workflows/deploy_sap_model_lambda.yml b/.github/workflows/deploy_sap_model_lambda.yml index 9e1e7012..fb4b8dde 100644 --- a/.github/workflows/deploy_sap_model_lambda.yml +++ b/.github/workflows/deploy_sap_model_lambda.yml @@ -20,7 +20,7 @@ jobs: - name: Install Serverless and plugins run: | npm install -g serverless - # npm install -g serverless-domain-manager + npm install -g serverless-domain-manager - name: AWS credentials for dev if: github.ref == 'refs/heads/dev' diff --git a/backend/app/plan/columntypes.py b/backend/app/plan/columntypes.py new file mode 100644 index 00000000..5c859300 --- /dev/null +++ b/backend/app/plan/columntypes.py @@ -0,0 +1,32 @@ +columntypes = { + 'UPRN': 'object', 'TOTAL_FLOOR_AREA': 'float64', 'FLOOR_HEIGHT': 'float64', 'PROPERTY_TYPE': 'object', + 'BUILT_FORM': 'object', 
'CONSTITUENCY': 'object', 'NUMBER_HABITABLE_ROOMS': 'float64', + 'NUMBER_HEATED_ROOMS': 'float64', 'FIXED_LIGHTING_OUTLETS_COUNT': 'float64', 'FLOOR_LEVEL': 'float64', + 'CONSTRUCTION_AGE_BAND': 'object', 'TRANSACTION_TYPE_STARTING': 'object', + 'WALLS_DESCRIPTION_STARTING': 'object', + 'FLOOR_DESCRIPTION_STARTING': 'object', 'LIGHTING_DESCRIPTION_STARTING': 'object', + 'ROOF_DESCRIPTION_STARTING': 'object', 'MAINHEAT_DESCRIPTION_STARTING': 'object', + 'HOTWATER_DESCRIPTION_STARTING': 'object', 'MAIN_FUEL_STARTING': 'object', + 'MECHANICAL_VENTILATION_STARTING': 'object', + 'SECONDHEAT_DESCRIPTION_STARTING': 'object', 'ENERGY_TARIFF_STARTING': 'object', + 'SOLAR_WATER_HEATING_FLAG_STARTING': 'object', 'PHOTO_SUPPLY_STARTING': 'float64', + 'WINDOWS_DESCRIPTION_STARTING': 'object', 'GLAZED_TYPE_STARTING': 'object', + 'MULTI_GLAZE_PROPORTION_STARTING': 'float64', 'LOW_ENERGY_LIGHTING_STARTING': 'float64', + 'NUMBER_OPEN_FIREPLACES_STARTING': 'float64', 'MAINHEATCONT_DESCRIPTION_STARTING': 'object', + 'EXTENSION_COUNT_STARTING': 'float64', 'LODGEMENT_DATE_STARTING': 'object', + 'TRANSACTION_TYPE_ENDING': 'object', + 'WALLS_DESCRIPTION_ENDING': 'object', 'FLOOR_DESCRIPTION_ENDING': 'object', + 'LIGHTING_DESCRIPTION_ENDING': 'object', + 'ROOF_DESCRIPTION_ENDING': 'object', 'MAINHEAT_DESCRIPTION_ENDING': 'object', + 'HOTWATER_DESCRIPTION_ENDING': 'object', + 'MAIN_FUEL_ENDING': 'object', 'MECHANICAL_VENTILATION_ENDING': 'object', + 'SECONDHEAT_DESCRIPTION_ENDING': 'object', + 'ENERGY_TARIFF_ENDING': 'object', 'SOLAR_WATER_HEATING_FLAG_ENDING': 'object', + 'PHOTO_SUPPLY_ENDING': 'float64', + 'WINDOWS_DESCRIPTION_ENDING': 'object', 'GLAZED_TYPE_ENDING': 'object', + 'MULTI_GLAZE_PROPORTION_ENDING': 'float64', + 'LOW_ENERGY_LIGHTING_ENDING': 'float64', 'NUMBER_OPEN_FIREPLACES_ENDING': 'float64', + 'MAINHEATCONT_DESCRIPTION_ENDING': 'object', 'EXTENSION_COUNT_ENDING': 'float64', + 'LODGEMENT_DATE_ENDING': 'object', + 'id': 'object' +} diff --git a/backend/app/plan/router.py 
b/backend/app/plan/router.py index 15646c00..4b972a6a 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -15,6 +15,9 @@ from backend.app.db.utils import row2dict from starlette.responses import Response from sqlalchemy.orm import sessionmaker from sqlalchemy.exc import IntegrityError, OperationalError +from datetime import datetime +import pandas as pd +import requests # database interaction functions from backend.app.db.functions.property_functions import ( @@ -22,16 +25,20 @@ from backend.app.db.functions.property_functions import ( ) from backend.app.db.functions.materials_functions import get_materials from backend.app.db.functions.recommendations_functions import ( - create_plan, create_recommendation, create_recommendation_material, create_plan_recommendations, - upload_recommendations + create_plan, create_plan_recommendations, upload_recommendations ) from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations from backend.app.db.connection import db_engine +from backend.app.plan.columntypes import columntypes from model_data.optimiser.GainOptimiser import GainOptimiser from model_data.optimiser.CostOptimiser import CostOptimiser -from backend.app.utils import epc_to_sap_lower_bound +from backend.app.utils import epc_to_sap_lower_bound, save_dataframe_to_s3_parquet, read_parquet_from_s3 from model_data.optimiser.optimiser_functions import prepare_input_measures +from model_data.simulation_system.core.DataProcessor import DataProcessor +from model_data.simulation_system.core.Settings import ( + FIXED_FEATURES, COMPONENT_FEATURES, COLUMNS_TO_MERGE_ON +) # TODO: This is placeholder until data is stored in DB from backend.app.plan.uvalue_estimates_walls import uvalue_estimates_walls @@ -212,6 +219,8 @@ async def trigger_plan(body: PlanTriggerRequest): # TODO: Move this to a class. 
We probably was a Recommender class which takes the injects the optimisers # in as a dependency and then the optimisers can take the input measures in as part of the setup() method recommendations = {} + recommendations_scoring_data = [] + for p in input_properties: property_recommendations = [] @@ -323,7 +332,111 @@ async def trigger_plan(body: PlanTriggerRequest): recommendations[p.id] = property_recommendations - # Once we're done, we'll store: + # Finally, we'll prepare data for predicting the impact on SAP + # TODO: We should use the cleaned data from get_components in the data rather than the raw + # values. We should create a method in Property which takes the EPC data and inserts the cleaned + # data + epc_data = p.data.copy() + epc_data = pd.DataFrame([epc_data]) + epc_data.columns = [col.upper().replace("-", "_") for col in epc_data.columns] + starting_epc_data = epc_data[COMPONENT_FEATURES + ["LODGEMENT_DATE"]].copy().add_suffix("_STARTING") + ending_epc_data = epc_data[COMPONENT_FEATURES + ["LODGEMENT_DATE"]].copy().add_suffix("_ENDING") + fixed_data = epc_data[FIXED_FEATURES] + + # We update the ending record with the recommended updates and we set lodgement date to today + ending_epc_data["LODGEMENT_DATE_ENDING"] = datetime.now().strftime("%Y-%m-%d") + + scoring_map = { + 'Solid brick, as built, no insulation (assumed)': 'Solid brick, as built, insulated (assumed)', + 'Suspended, no insulation (assumed)': 'Suspended, insulated (assumed)', + 'Solid, no insulation (assumed)': 'Solid, insulated (assumed)', + } + for rec in property_recommendations: + scoring_dict = { + "UPRN": p.data["uprn"], + "id": "+".join([str(p.id), str(rec["recommendation_id"])]), + "LOCAL_AUTHORITY": p.data["local-authority"], + **starting_epc_data.to_dict("records")[0], + **ending_epc_data.to_dict("records")[0], + **fixed_data.to_dict("records")[0] + } + + # We update the description to indicate it's insulated + if rec["type"] == "wall_insulation": + 
scoring_dict["WALLS_DESCRIPTION_ENDING"] = scoring_map[p.walls["clean_description"]] + elif rec["type"] == "floor_insulation": + scoring_dict["FLOOR_DESCRIPTION_ENDING"] = scoring_map[p.floor["clean_description"]] + else: + raise NotImplementedError("Implement me") + + recommendations_scoring_data.append(scoring_dict) + + recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data) + + # Clean the data + cleaning_data = read_parquet_from_s3( + bucket_name="retrofit-data-dev", + file_key="sap_change_model/cleaning_dataset.parquet", + ) + cleaning_data = cleaning_data.rename(columns={"local-authority": "LOCAL_AUTHORITY"}) + # Merge the cleaning data onto recommendations_scoring_data + + recommendations_scoring_data[["FLOOR_HEIGHT", "TOTAL_FLOOR_AREA"]] = recommendations_scoring_data[ + ["FLOOR_HEIGHT", "TOTAL_FLOOR_AREA"] + ].replace("", None) + + # Perform the same cleaning as in the model + recommendations_scoring_data = DataProcessor.apply_averages_cleaning( + data_to_clean=recommendations_scoring_data, + cleaning_data=cleaning_data, + cols_to_merge_on=COLUMNS_TO_MERGE_ON + ["LOCAL_AUTHORITY"] + ) + recommendations_scoring_data = recommendations_scoring_data.drop(columns=["LOCAL_AUTHORITY"]) + + # Note: We might need to perform the full pre-processing here + data_processor = DataProcessor(filepath=None) + data_processor.insert_data(recommendations_scoring_data) + data_processor.remap_columns() + recommendations_scoring_data = data_processor.data + + # Remap column types + recommendations_scoring_data = recommendations_scoring_data.astype(columntypes) + # Store parquet file in s3 for scoring + created_at = datetime.now().strftime("%Y-%m-%d-%H-%M-%S") + file_location = "sap_change_predictions/{portfolio_id}/{timestamp}.parquet".format( + portfolio_id=body.portfolio_id, + timestamp=created_at + ) + + save_dataframe_to_s3_parquet( + df=recommendations_scoring_data, + bucket_name="retrofit-data-dev", + file_key=file_location + ) + + # Call the sap change 
model + response = requests.post( + url="https://api.dev.hestia.homes/sapmodel/predict", + json={ + "file_location": "s3://retrofit-data-dev/" + file_location, + "property_id": 999, + "portfolio_id": 4, + "created_at": created_at + } + ) + # TODO: Handle the response depending on response code + + # Retrieve the predictions + predictions = read_csv_from_s3( + bucket_name="retrofit-sap-predictions-dev", + filepath=f"{body.portfolio_id}/999/{created_at}.csv" + ) + predictions = pd.DataFrame(predictions) + # We round the predictions + predictions["RDSAP_CHANGE"] = predictions["RDSAP_CHANGE"].astype(float).round(0) + # Extract property_id and recommendation_id + predictions[['property_id', 'recommendation_id']] = predictions['id'].str.split('+', expand=True) + # 1) the property data # 2) the property details (epc) # 3) the recommendations @@ -344,6 +457,16 @@ async def trigger_plan(body: PlanTriggerRequest): if not recommendations_to_upload: continue + property_predictions = predictions[predictions["property_id"] == str(p.id)] + for rec in recommendations_to_upload: + # Insert the prediction for sap points + rec["sap_points"] = property_predictions[property_predictions["recommendation_id"] == str( + rec["recommendation_id"] + )]["RDSAP_CHANGE"].values[0] + + if not rec["sap_points"]: + raise ValueError("Sap points missing") + # Create a plan new_plan_id = create_plan( session, diff --git a/backend/app/utils.py b/backend/app/utils.py index bc052e74..7099eba1 100644 --- a/backend/app/utils.py +++ b/backend/app/utils.py @@ -4,6 +4,8 @@ from io import StringIO import string import secrets import logging +import pandas as pd +from io import BytesIO def setup_logger(log_file=None, level=logging.INFO, overwrite_handler=False): @@ -117,3 +119,36 @@ def epc_to_sap_lower_bound(epc: str): return 1 else: raise ValueError("EPC rating should be between A and G") + + +def read_parquet_from_s3(bucket_name, file_key): + client = boto3.client('s3') + + # Get the object + s3_object = 
client.get_object(Bucket=bucket_name, Key=file_key) + + # Read the Parquet body into a DataFrame + csv_body = s3_object["Body"].read() + df = pd.read_parquet(BytesIO(csv_body)) + + return df + + +def save_dataframe_to_s3_parquet(df, bucket_name, file_key): + """ + Save a pandas DataFrame to S3 as a Parquet file. + + :param df: The pandas DataFrame. + :param bucket_name: Name of the S3 bucket. + :param file_key: Key of the file (including directory path within the bucket) + """ + + # Convert the DataFrame to a Parquet format in memory + parquet_buffer = BytesIO() + df.to_parquet(parquet_buffer) + + # Create the boto3 S3 resource + s3 = boto3.resource('s3') + + # Upload the Parquet file to S3 + s3.Object(bucket_name, file_key).put(Body=parquet_buffer.getvalue()) diff --git a/backend/docker/Dockerfile b/backend/docker/Dockerfile index f23435a0..fd498cdb 100644 --- a/backend/docker/Dockerfile +++ b/backend/docker/Dockerfile @@ -40,6 +40,7 @@ COPY ./model_data/config.py ./model_data/config.py COPY ./model_data/optimiser/ ./model_data/optimiser/ COPY ./model_data/__init__.py ./model_data/__init__.py COPY ./model_data/EpcClean.py ./model_data/EpcClean.py +COPY ./model_data/simulation_system/core/ ./model_data/simulation_system/core/ COPY ./model_data/utils.py ./model_data/utils.py COPY ./model_data/epc_attributes/ ./model_data/epc_attributes/ COPY ./datatypes/ ./datatypes/ diff --git a/backend/requirements/base.txt b/backend/requirements/base.txt index 0b7337e4..de173db2 100644 --- a/backend/requirements/base.txt +++ b/backend/requirements/base.txt @@ -32,3 +32,5 @@ psycopg2-binary pytz==2023.3 mip==1.15.0 boto3==1.28.3 +pandas==1.5.3 +pyarrow==12.0.1 \ No newline at end of file diff --git a/model_data/requirements/requirements.txt b/model_data/requirements/requirements.txt index 28fce331..d4de6b71 100644 --- a/model_data/requirements/requirements.txt +++ b/model_data/requirements/requirements.txt @@ -18,3 +18,5 @@ statsmodels scikit-learn pyspellchecker textblob +boto3 +pyarrow 
diff --git a/model_data/simulation_system/.gitignore b/model_data/simulation_system/.gitignore new file mode 100644 index 00000000..5ec28995 --- /dev/null +++ b/model_data/simulation_system/.gitignore @@ -0,0 +1 @@ +local_model/* \ No newline at end of file diff --git a/model_data/simulation_system/Dockerfiles/Dockerfile.prediction b/model_data/simulation_system/Dockerfiles/Dockerfile.prediction index c0efdb5f..88cea6c4 100644 --- a/model_data/simulation_system/Dockerfiles/Dockerfile.prediction +++ b/model_data/simulation_system/Dockerfiles/Dockerfile.prediction @@ -6,6 +6,7 @@ ARG GID=100 # Install patches RUN apt-get update && apt-get upgrade -y \ + && apt-get install libgomp1 -y \ && apt-get clean \ && rm -rf /var/lib/apt/lists @@ -34,4 +35,4 @@ USER ${USER} WORKDIR /home/simulation_system # Run the python command -CMD ["python3", "predictions.py", "--data-path", "./model_build_data/change_data/rdsap_full/test_data.parquet"] +CMD ["python3", "predictions.py", "--data-path", "s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data_with_id.parquet"] diff --git a/model_data/simulation_system/Dockerfiles/Dockerfile.training b/model_data/simulation_system/Dockerfiles/Dockerfile.training index dcba0499..1eadee7b 100644 --- a/model_data/simulation_system/Dockerfiles/Dockerfile.training +++ b/model_data/simulation_system/Dockerfiles/Dockerfile.training @@ -6,6 +6,7 @@ ARG GID=100 # Install patches RUN apt-get update && apt-get upgrade -y \ + && apt-get install libgomp1 -y \ && apt-get clean \ && rm -rf /var/lib/apt/lists @@ -34,4 +35,4 @@ USER ${USER} WORKDIR /home/simulation_system # Run the python command -CMD ["python3", "training.py", "--train-filepath", "./model_build_data/change_data/rdsap_full/train_validation_data.parquet", "--test-filepath", "./model_build_data/change_data/rdsap_full/test_data.parquet"] +CMD ["python3", "training.py", "--train-filepath", 
"s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/train_validation_data.parquet", "--test-filepath", "s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet"] diff --git a/model_data/simulation_system/MLModel/Models.py b/model_data/simulation_system/MLModel/Models.py index 4a520e4b..ac8f1409 100644 --- a/model_data/simulation_system/MLModel/Models.py +++ b/model_data/simulation_system/MLModel/Models.py @@ -8,6 +8,7 @@ Key tasks: - Generate Inference """ +import os from typing import Any from pathlib import Path import pandas as pd @@ -15,7 +16,7 @@ from autogluon.tabular import TabularDataset, TabularPredictor from core.Logger import logger from core.Metrics import Metrics from core.Settings import METRIC_FILENAME -from core.CloudClient import S3FSClient +from core.CloudClient import BotoClient AUTOGLUON_HYPERPARAMETERS = [ "problem_type", @@ -54,34 +55,55 @@ class AutogluonModel: def load_model( self, filepath: str | Path, - s3_client: S3FSClient, + client: BotoClient, model_folder: str = "local_model", ) -> None: """ Providing a path, this function will load the model to be used. 
Will load to internal variable """ filepath = str(filepath) - if s3_client.client is None: + if client.client is None: logger.info("In local development mode - no need for s3 client") self.model = TabularPredictor.load(path=filepath) else: - logger.info(f"Loading model from s3") - s3_client.download_model(filepath=filepath, model_folder=model_folder) - self.model = TabularPredictor.load(path=model_folder) + logger.info(f"Loading model from s3 with filepath: %s and model_folder: %s" % (filepath, model_folder)) + client.download_model(filepath=filepath, model_folder=model_folder) + self.model = TabularPredictor.load(path=str(Path(model_folder) / filepath)) - def save_model(self, output_filepath: Path, s3fs_client: S3FSClient) -> None: + def save_model(self, output_filepath: Path, client: BotoClient) -> None: """ Providing a path, this function will save the model to be used. """ - if s3fs_client.client is None: + if client.client is None: logger.info("In local development mode - no need for s3 client") logger.info("Using AutoGluon Model - Model saving already occured") else: logger.info(f"Saving model into s3") - s3_location = s3fs_client.model_bucket + "/" + str(output_filepath) - s3fs_client.client.put(str(output_filepath), s3_location, recursive=True) + + self.directory_upload( + client=client, + local_directory=str(output_filepath), + bucket_name=client.model_bucket, + ) + logger.info("Save complete") + def directory_upload(self, client, local_directory, bucket_name): + + # Iterate through the local directory and upload each file + for root, dirs, files in os.walk(local_directory): + for file in files: + # Determine the local file path and S3 object key + local_file_path = os.path.join(root, file) + s3_object_key = os.path.relpath(local_file_path, local_directory) + + # Upload the file to S3 + client.client.upload_file(local_file_path, bucket_name, local_file_path) + + logger.info( + f"Uploaded {local_file_path} to {bucket_name}/{local_file_path}" + ) + def 
train_model( self, data: pd.DataFrame, target_column: str, hyperparameters: dict ) -> None: diff --git a/model_data/simulation_system/core/CloudClient.py b/model_data/simulation_system/core/CloudClient.py index 5af65da5..8656a22e 100644 --- a/model_data/simulation_system/core/CloudClient.py +++ b/model_data/simulation_system/core/CloudClient.py @@ -3,17 +3,105 @@ Set up the client to be used for downloading and uploading model files """ import os -import s3fs +import boto3 from core.Logger import logger -class S3FSClient: +# class S3FSClient: +# """ +# Set up the correct client to upload files to s3 +# """ + +# def __init__(self, runtime_environment: str = "local") -> None: +# self.client: s3fs.S3FileSystem | None = None +# self.model_bucket: str + +# self.client_factory(runtime_environment) +# self.determine_model_bucket(runtime_environment) + +# def client_factory(self, runtime_environment: str = "local"): +# """ +# Select the correct s3 client to use +# """ + +# if runtime_environment == "local": +# logger.info("No S3 client setup required") +# elif runtime_environment == "local-mock": +# logger.info(f"S3 settings for {runtime_environment}") +# self.client = s3fs.S3FileSystem( +# key=os.environ.get("AWS_ACCESS_KEY_ID", "admin"), +# secret=os.environ.get("AWS_SECRET_ACCESS_KEY", "password"), +# client_kwargs={ +# "endpoint_url": os.environ.get( +# "ENDPOINT_URL", "http://localhost:9000" +# ) +# }, +# ) +# elif runtime_environment in ["dev", "staging", "prod"]: +# logger.info(f"S3 settings for {runtime_environment}") +# # Key/ token should be in session/lambda for this +# self.client = s3fs.S3FileSystem() +# else: +# raise NotImplementedError("No correspnding runtime environment") + +# def determine_model_bucket(self, runtime_environment: str) -> None: +# """ +# For the given environment, return the correct bucket for models +# """ +# if runtime_environment == "local": +# logger.info("In local development - no need for s3") +# elif runtime_environment in 
["local-mock", "dev"]: +# # TODO: get from enironment +# self.model_bucket = "retrofit-model-directory-dev" +# elif runtime_environment in ["staging", "prod"]: +# self.model_bucket = f"retrofit-model-directory-{runtime_environment}" +# else: +# raise NotImplementedError("No corresponding runtime environment") + +# def download_model(self, filepath: str, model_folder: str): +# """ +# For the file path, download the model locally so that we can load the model +# """ + +# if self.client is None: +# logger.info("No need to download model as local development") +# else: + +# def list_files_recursively(folder_path, client): +# all_files = [] +# for root, dirs, files in client.walk(folder_path): +# for file in files: +# s3_path = os.path.join(root, file) +# all_files.append(s3_path) +# return all_files + +# # List all files in the specified S3 folder and its subfolders +# files = list_files_recursively( +# f"{self.model_bucket}/{filepath}", client=self.client +# ) + +# # Download each file +# for file in files: +# # Extract the filename from the S3 path +# filename = file.split(filepath)[-1] + +# # Define the local path where you want to save the file +# local_path = os.path.join(model_folder, filename) + +# # Download the file from S3 to the local directory +# self.client.get(file, local_path) +# print(f"Downloaded {filename} to {local_path}") + +# print("Download completed.") + + +class BotoClient: """ - Set up the correct client to upload files to s3 + Using boto3 to access the different aws storage configurations """ def __init__(self, runtime_environment: str = "local") -> None: - self.client: s3fs.S3FileSystem | None = None + self.client = None self.model_bucket: str self.client_factory(runtime_environment) @@ -28,19 +116,19 @@ class S3FSClient: logger.info("No S3 client setup required") elif runtime_environment == "local-mock": logger.info(f"S3 settings for {runtime_environment}") - self.client = s3fs.S3FileSystem( - key=os.environ.get("AWS_ACCESS_KEY_ID", 
"admin"), - secret=os.environ.get("AWS_SECRET_ACCESS_KEY", "password"), - client_kwargs={ - "endpoint_url": os.environ.get( - "ENDPOINT_URL", "http://localhost:9000" - ) - }, + session = boto3.Session() + self.client = session.client( + service_name="s3", + aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID", "admin"), + aws_secret_access_key=os.environ.get( + "AWS_SECRET_ACCESS_KEY", "password" + ), + endpoint_url=os.environ.get("ENDPOINT_URL", "http://localhost:9000"), ) elif runtime_environment in ["dev", "staging", "prod"]: logger.info(f"S3 settings for {runtime_environment}") # Key/ token should be in session/lambda for this - self.client = s3fs.S3FileSystem() + self.client = boto3.client("s3") else: raise NotImplementedError("No correspnding runtime environment") @@ -62,34 +150,34 @@ class S3FSClient: """ For the file path, download the model locally so that we can load the model """ + # List all objects with the specified prefix in the bucket if self.client is None: - logger.info("No need to download model as local development") - else: + raise ValueError("SHould not be in here!") - def list_files_recursively(folder_path, client): - all_files = [] - for root, dirs, files in client.walk(folder_path): - for file in files: - s3_path = os.path.join(root, file) - all_files.append(s3_path) - return all_files + objects = self.client.list_objects_v2(Bucket=self.model_bucket, Prefix=filepath) - # List all files in the specified S3 folder and its subfolders - files = list_files_recursively( - f"{self.model_bucket}/{filepath}", client=self.client + # Ensure the local directory for downloads exists + if not os.path.exists(model_folder): + os.makedirs(model_folder) + + # Download each object with the specified prefix + for obj in objects.get("Contents", []): + # Get the object key (file path) + object_key = obj["Key"] + + # Determine the local file path to save the object + local_file_path = os.path.join( + model_folder, object_key.split(f"{filepath}/")[-1] ) - # 
Download each file - for file in files: - # Extract the filename from the S3 path - filename = file.split(filepath)[-1] + # Create the local directory if it doesn't exist + local_directory = os.path.dirname(local_file_path) + if not os.path.exists(local_directory): + os.makedirs(local_directory) - # Define the local path where you want to save the file - local_path = os.path.join("local_model", filename) + # Download the object from S3 to the local directory + self.client.download_file(self.model_bucket, object_key, local_file_path) + print(f"Downloaded {object_key} to {local_file_path}") - # Download the file from S3 to the local directory - self.client.get(file, local_path) - print(f"Downloaded {filename} to {local_path}") - - print("Download completed.") + print("Download completed.") diff --git a/model_data/simulation_system/core/DataLoader.py b/model_data/simulation_system/core/DataLoader.py index 9cb82a8a..c5c10b0a 100644 --- a/model_data/simulation_system/core/DataLoader.py +++ b/model_data/simulation_system/core/DataLoader.py @@ -3,9 +3,10 @@ import os from typing import Protocol import boto3 from io import BytesIO, StringIO +from core.CloudClient import BotoClient -def read_parquet_from_s3(bucket_name, file_key): +def read_parquet_from_s3(client, bucket_name, file_key): """ Read a CSV file from S3 using boto3 and pandas. @@ -15,11 +16,9 @@ def read_parquet_from_s3(bucket_name, file_key): :param aws_secret_access_key: AWS Secret Access Key :return: DataFrame containing the CSV data. 
""" - # Initialize the S3 client - s3_client = boto3.client("s3") # Get the object - s3_object = s3_client.get_object(Bucket=bucket_name, Key=file_key) + s3_object = client.get_object(Bucket=bucket_name, Key=file_key) # Read the CSV body into a DataFrame csv_body = s3_object["Body"].read() @@ -28,7 +27,7 @@ def read_parquet_from_s3(bucket_name, file_key): return df -def read_csv_from_s3(bucket_name, file_key, index_col): +def read_csv_from_s3(client, bucket_name, file_key, index_col): """ Read a CSV file from S3 using boto3 and pandas. @@ -38,11 +37,9 @@ def read_csv_from_s3(bucket_name, file_key, index_col): :param aws_secret_access_key: AWS Secret Access Key :return: DataFrame containing the CSV data. """ - # Initialize the S3 client - s3_client = boto3.client("s3") # Get the object - s3_object = s3_client.get_object(Bucket=bucket_name, Key=file_key) + s3_object = client.get_object(Bucket=bucket_name, Key=file_key) # Read the CSV body into a DataFrame csv_body = s3_object["Body"].read().decode("utf-8") @@ -57,7 +54,9 @@ class DataLoader(Protocol): """ @staticmethod - def load(filepath: str, index_col: str | None = None) -> pd.DataFrame | None: + def load( + client: BotoClient, filepath: str, index_col: str | None = None + ) -> pd.DataFrame | None: """ Loading data from the relevant source """ @@ -69,7 +68,9 @@ class LocalDataLoader: """ @staticmethod - def load(filepath: str, index_col: str | None = None) -> pd.DataFrame: + def load( + client: BotoClient, filepath: str, index_col: str | None = None + ) -> pd.DataFrame: if not os.path.exists(filepath): raise FileNotFoundError(f"File not found: {filepath}") @@ -86,56 +87,36 @@ class LocalDataLoader: return df -class S3MockDataLoader: +class S3DataLoader: """ Implements the DataLoader Protocol for s3 files, hosting locally in a mocked service """ @staticmethod - def load(filepath: str, index_col: str | None = None) -> pd.DataFrame: - - # TODO: Ingest these as environment variables in the docker compose file - 
storage_options = { - "key": os.environ.get("AWS_ACCESS_KEY_ID", "admin"), - "secret": os.environ.get("AWS_SECRET_ACCESS_KEY", "password"), - "client_kwargs": { - "endpoint_url": os.environ.get("ENDPOINT_URL", "http://localhost:9000") - }, - } + def load( + client: BotoClient, filepath: str, index_col: str | None = None + ) -> pd.DataFrame: if not filepath.startswith("s3://"): filepath = "s3://" + filepath - if filepath.endswith(".parquet"): - df = pd.read_parquet(filepath, storage_options=storage_options) - if index_col is not None: - df = df.set_index(index_col) - elif filepath.endswith(".csv"): - df = pd.read_csv( - filepath, index_col=index_col, storage_options=storage_options - ) - else: - raise ValueError(f"File format not supported for file: {filepath}") - - return df - - -class S3DataLoader: - """ - Implements the DataLoader Protocol for s3 files - """ - - @staticmethod - def load(filepath: str, index_col: str | None = None) -> pd.DataFrame: filepath_split = filepath.split("s3://")[-1].split("/", 1) bucket = filepath_split[0] key = filepath_split[1] + if filepath.endswith(".parquet"): - df = read_parquet_from_s3(bucket, key) + df = read_parquet_from_s3( + client=client.client, bucket_name=bucket, file_key=key + ) if index_col is not None: df = df.set_index(index_col) elif filepath.endswith(".csv"): - df = read_csv_from_s3(bucket, key, index_col) + df = read_csv_from_s3( + client=client.client, + bucket_name=bucket, + file_key=key, + index_col=index_col, + ) else: raise ValueError(f"File format not supported for file: {filepath}") @@ -152,7 +133,7 @@ def dataloader_factory(runtime_environment: str | None = None) -> DataLoader: dataloader_types = { "local": LocalDataLoader(), - "local-mock": S3MockDataLoader(), + "local-mock": S3DataLoader(), "dev": S3DataLoader(), "staging": S3DataLoader(), "prod": S3DataLoader(), diff --git a/model_data/simulation_system/core/DataProcessor.py b/model_data/simulation_system/core/DataProcessor.py index 4c37176d..313d5a25 
100644 --- a/model_data/simulation_system/core/DataProcessor.py +++ b/model_data/simulation_system/core/DataProcessor.py @@ -21,17 +21,24 @@ class DataProcessor: Handle data loading and data preprocessing """ - def __init__(self, filepath: Path) -> None: + def __init__(self, filepath: Path | None) -> None: self.filepath = filepath + self.data: pd.DataFrame def load_data(self, low_memory=False) -> None: + if not self.filepath: + raise ValueError("No filepath specified") self.data = pd.read_csv(self.filepath, low_memory=low_memory) + def insert_data(self, data: pd.DataFrame) -> None: + self.data = data + def pre_process(self) -> pd.DataFrame: """ Load data and begin initial cleaning """ - self.load_data(low_memory=DATA_PROCESSOR_SETTINGS["low_memory"]) + if not self.data: + self.load_data(low_memory=DATA_PROCESSOR_SETTINGS["low_memory"]) self.confine_data() # TODO: CLean number of heated rooms and habitable rooms @@ -87,7 +94,7 @@ class DataProcessor: # Remap certain columns data["FLOOR_LEVEL"] = data["FLOOR_LEVEL"].replace(FLOOR_LEVEL_MAP) - data["BUILT_FROM"] = data["BUILT_FORM"].replace(BUILT_FORM_REMAP) + data["BUILT_FORM"] = data["BUILT_FORM"].replace(BUILT_FORM_REMAP) self.data = data @@ -264,3 +271,42 @@ class DataProcessor: self.data["MULTI_GLAZE_PROPORTION"] ) & (self.data["WINDOWS_DESCRIPTION"].isin(FULLY_GLAZED_DESCRIPTIONS)) self.data.loc[no_multi_glaze_proportion_index, "MULTI_GLAZE_PROPORTION"] = 100 + + @staticmethod + def apply_averages_cleaning(data_to_clean, cleaning_data, cols_to_merge_on): + """ + Clean the input DataFrame using averages from a cleaning DataFrame. + + :param data_to_clean: DataFrame to be cleaned. + :param cleaning_data: DataFrame containing data for cleaning. + :param cols_to_merge_on: Columns on which merging is based. We pass cols_to_merge_on to this function as this + differs depending on where the function is being used. + :return: Cleaned DataFrame. 
+ """ + # Enforce data types + for col in ["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"]: + data_to_clean[col] = data_to_clean[col].astype(float) + + # Identify columns with non-NaN values + columns_to_merge_on = data_to_clean[cols_to_merge_on].dropna().columns.tolist() + + # Calculate averages + cleaning_averages_to_merge = cleaning_data.groupby(columns_to_merge_on).agg( + {"TOTAL_FLOOR_AREA": "mean", "FLOOR_HEIGHT": "mean"} + ) + + # Merge with the original data + data_to_clean = pd.merge( + data_to_clean, + cleaning_averages_to_merge, + on=columns_to_merge_on, + suffixes=("", "_AVERAGE"), + how="left", + ) + + # Fill NaN values with averages + for col in ["TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]: + data_to_clean[col].fillna(data_to_clean[f"{col}_AVERAGE"], inplace=True) + data_to_clean.drop(columns=[f"{col}_AVERAGE"], inplace=True) + + return data_to_clean diff --git a/model_data/simulation_system/core/Metrics.py b/model_data/simulation_system/core/Metrics.py index d1382d94..9369587d 100644 --- a/model_data/simulation_system/core/Metrics.py +++ b/model_data/simulation_system/core/Metrics.py @@ -5,11 +5,12 @@ Key tasks: - Given a model and test data, produce a suite of all metrics """ +import os import pandas as pd from pathlib import Path import seaborn as sns import matplotlib.pyplot as plt -from core.CloudClient import S3FSClient +from core.CloudClient import BotoClient from core.Logger import logger from core.Settings import ( RESIDUAL_TRUE_LABEL, @@ -64,18 +65,40 @@ class Metrics: All metric functions used to generate a dictionary of metrics """ - def upload_metrics(self, output_filepath: Path, s3fs_client: S3FSClient) -> None: + def upload_metrics(self, output_filepath: Path, client: BotoClient) -> None: """ Providing a path, this function will save the metrics folders/files. 
""" - if s3fs_client.client is None: + if client.client is None: logger.info("In local development mode - no need to upload") else: logger.info(f"Saving metrics into s3") - s3_location = s3fs_client.model_bucket + "/" + str(output_filepath) - s3fs_client.client.put(str(output_filepath), s3_location, recursive=True) + s3_location = client.model_bucket + "/" + str(output_filepath) + + self.directory_upload( + client=client, + local_directory=str(output_filepath), + bucket_name=client.model_bucket, + ) + logger.info("Save complete") + def directory_upload(self, client, local_directory, bucket_name): + + # Iterate through the local directory and upload each file + for root, dirs, files in os.walk(local_directory): + for file in files: + # Determine the local file path and S3 object key + local_file_path = os.path.join(root, file) + s3_object_key = os.path.relpath(local_file_path, local_directory) + + # Upload the file to S3 + client.client.upload_file(local_file_path, bucket_name, local_file_path) + + logger.info( + f"Uploaded {local_file_path} to {bucket_name}/{local_file_path}" + ) + @staticmethod def list_metric_functions() -> list: """ diff --git a/model_data/simulation_system/core/RegistryHandler.py b/model_data/simulation_system/core/RegistryHandler.py index e492b2d2..e7ec641c 100644 --- a/model_data/simulation_system/core/RegistryHandler.py +++ b/model_data/simulation_system/core/RegistryHandler.py @@ -2,38 +2,65 @@ """ +from io import StringIO import pandas as pd from pathlib import Path from core.Logger import logger -from core.CloudClient import S3FSClient +from core.CloudClient import BotoClient from core.Metrics import Metrics from core.Settings import BEST_MODEL_COLUMN_NAME +def read_csv_from_s3(client, bucket_name, file_key, index_col): + """ + Read a CSV file from S3 using boto3 and pandas. + + :param bucket_name: Name of the S3 bucket. + :param file_key: Key of the file (including directory path within the bucket). 
+ :param aws_access_key_id: AWS Access Key ID + :param aws_secret_access_key: AWS Secret Access Key + :return: DataFrame containing the CSV data. + """ + + # Get the object + s3_object = client.get_object(Bucket=bucket_name, Key=file_key) + + # Read the CSV body into a DataFrame + csv_body = s3_object["Body"].read().decode("utf-8") + df = pd.read_csv(StringIO(csv_body), index_col=index_col) + + return df + + class RegistryHandler: """ Handles the loading of the registry depending on the environment """ - def load_registry( - self, registry_path: Path, s3fs_client: S3FSClient, metrics: Metrics - ): + def load_registry(self, registry_path: Path, client: BotoClient, metrics: Metrics): """ Depening on the environment, we will have to load from locally or s3 (mock/real) """ - if s3fs_client.client is None: + if client.client is None: logger.info("Using local development - no need for s3 load") return self.load_local_registry( registry_path=registry_path, metrics=metrics ) - s3_location = "s3://" + s3fs_client.model_bucket + "/" + str(registry_path) - logger.info(f"Check if registry exists") - if s3fs_client.client.exists(s3_location): - registry_df = pd.read_csv( - s3fs_client.client.open(s3_location), index_col=None + + check_exists = client.client.list_objects_v2( + Bucket=client.model_bucket, Prefix=str(registry_path) + ) + + if "Contents" in check_exists: + logger.info("Loading existing registry") + registry_df = read_csv_from_s3( + client=client.client, + bucket_name=client.model_bucket, + file_key=str(registry_path), + index_col=None, ) else: logger.info("No registry found - creating new one") @@ -70,14 +97,18 @@ class RegistryHandler: return registry_df - def save_registry(self, output_filepath: Path, s3fs_client: S3FSClient) -> None: + def save_registry(self, output_filepath: Path, client: BotoClient) -> None: """ Providing a path, this function will save the model to be used. 
""" - if s3fs_client.client is None: + if client.client is None: logger.info("In local development mode - no need for s3 client") else: logger.info(f"Saving registry into s3") - s3_location = s3fs_client.model_bucket + "/" + str(output_filepath) - s3fs_client.client.put(str(output_filepath), s3_location, recursive=True) + s3_location = client.model_bucket + "/" + str(output_filepath) + + client.client.upload_file( + str(output_filepath), client.model_bucket, str(output_filepath) + ) + logger.info("Save complete") diff --git a/model_data/simulation_system/core/Settings.py b/model_data/simulation_system/core/Settings.py index 7765f64a..9f6c2e12 100644 --- a/model_data/simulation_system/core/Settings.py +++ b/model_data/simulation_system/core/Settings.py @@ -99,7 +99,6 @@ COMPONENT_FEATURES = [ "WINDOWS_DESCRIPTION", "GLAZED_TYPE", "MULTI_GLAZE_PROPORTION", - "LIGHTING_DESCRIPTION", "LOW_ENERGY_LIGHTING", "NUMBER_OPEN_FIREPLACES", "MAINHEATCONT_DESCRIPTION", diff --git a/model_data/simulation_system/docker-compose.yml b/model_data/simulation_system/docker-compose.yml index b14efeed..b5d74435 100644 --- a/model_data/simulation_system/docker-compose.yml +++ b/model_data/simulation_system/docker-compose.yml @@ -18,19 +18,20 @@ services: timeout: 20s retries: 3 - # simulation_system_training: - # build: - # context: ./ - # dockerfile: ./Dockerfiles/Dockerfile.training - # image: simulation_system_training - # environment: - # ENDPOINT_URL: http://minio:9000/ - # AWS_ACCESS_KEY_ID: *MINIO_USER - # AWS_SECRET_ACCESS_KEY: *MINIO_PASS - # tty: true - # depends_on: - # minio: - # condition: service_healthy + simulation_system_training: + build: + context: ./ + dockerfile: ./Dockerfiles/Dockerfile.training + image: simulation_system_training + environment: + RUNTIME_ENVIRONMENT: local-mock + ENDPOINT_URL: http://minio:9000/ + AWS_ACCESS_KEY_ID: *MINIO_USER + AWS_SECRET_ACCESS_KEY: *MINIO_PASS + tty: true + depends_on: + minio: + condition: service_healthy # command: # ["bash"] 
@@ -40,6 +41,7 @@ services: # dockerfile: ./Dockerfiles/Dockerfile.prediction # image: simulation_system_prediction # environment: + # RUNTIME_ENVIRONMENT: local-mock # ENDPOINT_URL: http://minio:9000/ # AWS_ACCESS_KEY_ID: *MINIO_USER # AWS_SECRET_ACCESS_KEY: *MINIO_PASS @@ -47,7 +49,7 @@ services: # depends_on: # simulation_system_training: # condition: service_completed_successfully - # command: + # command: # ["bash"] diff --git a/model_data/simulation_system/generate_rdsap_change.py b/model_data/simulation_system/generate_rdsap_change.py index c1ca56a6..502f7a06 100644 --- a/model_data/simulation_system/generate_rdsap_change.py +++ b/model_data/simulation_system/generate_rdsap_change.py @@ -3,7 +3,7 @@ import pandas as pd from tqdm import tqdm from pathlib import Path -from core.Settings import ( +from simulation_system.core.Settings import ( MANDATORY_FIXED_FEATURES, AVERAGE_FIXED_FEATURES, LATEST_FIELD, @@ -12,9 +12,10 @@ from core.Settings import ( HEAT_DEMAND_RESPONSE, COLUMNS_TO_MERGE_ON, ) -from core.DataProcessor import DataProcessor +from simulation_system.core.DataProcessor import DataProcessor +from utils import save_dataframe_to_s3_parquet -DATA_DIRECTORY = Path(__file__).parent / "data" / "all-domestic-certificates" +DATA_DIRECTORY = Path(__file__).parent / "simulation_system" / "data" / "all-domestic-certificates" # TODO: Have a look at temporal features @@ -30,6 +31,7 @@ def app(): directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()] dataset = [] + cleaning_dataset = [] # 116 # 128048706 # PosixPath('/home/ubuntu/Documents/python/hestia/Model/model_data/simulation_system/data/all-domestic @@ -62,28 +64,10 @@ def app(): # property_data[AVERAGE_FIXED_FEATURES].fillna(value=0).pct_change().iloc[-1] > 0.1 # Extract the columns that are not all None - na_columns = property_data[COLUMNS_TO_MERGE_ON].isna().all() - cleaned_columns_to_merge_on = na_columns.index[~na_columns].to_list() - - # Get the corresponding groupby and merge, 
and fill in NA values - cleaning_averages_to_merge = cleaning_averages.groupby( - cleaned_columns_to_merge_on - )[["TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]].mean() - - modified_property_data = pd.merge( - property_data, - cleaning_averages_to_merge, - on=cleaned_columns_to_merge_on, - suffixes=["", "_AVERAGE"], - ) - modified_property_data["TOTAL_FLOOR_AREA"] = modified_property_data[ - "TOTAL_FLOOR_AREA" - ].fillna(modified_property_data["TOTAL_FLOOR_AREA_AVERAGE"]) - modified_property_data["FLOOR_HEIGHT"] = modified_property_data[ - "FLOOR_HEIGHT" - ].fillna(modified_property_data["FLOOR_HEIGHT_AVERAGE"]) - modified_property_data = modified_property_data.drop( - columns=["TOTAL_FLOOR_AREA_AVERAGE", "FLOOR_HEIGHT_AVERAGE"] + modified_property_data = DataProcessor.apply_averages_cleaning( + data_to_clean=property_data, + cleaning_data=cleaning_averages, + cols_to_merge_on=COLUMNS_TO_MERGE_ON ) for field in AVERAGE_FIXED_FEATURES: @@ -107,7 +91,7 @@ def app(): variable_data = modified_property_data[ COMPONENT_FEATURES + ["LODGEMENT_DATE", RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE] - ] + ] # Note: we look at changes between subsequent EPCS, however we could look at other permutations # e.g. 
first vs second, second vs third and also first vs third @@ -133,10 +117,10 @@ def app(): starting_record = starting_record[ COMPONENT_FEATURES + ["LODGEMENT_DATE"] - ].add_suffix("_STARTING") + ].add_suffix("_STARTING") ending_record = ending_record[ COMPONENT_FEATURES + ["LODGEMENT_DATE"] - ].add_suffix("_ENDING") + ].add_suffix("_ENDING") features = pd.concat([starting_record, ending_record]) @@ -150,7 +134,18 @@ def app(): } ) - dataset.extend(property_model_data) + dataset.append(property_model_data) + + cleaning_averages["LOCAL_AUTHORITY"] = df["LOCAL_AUTHORITY"].values[0] + cleaning_dataset.append(cleaning_averages) + + # Store cleaning dataset in s3 as a parquet file + cleaning_dataset = pd.concat(cleaning_dataset) + save_dataframe_to_s3_parquet( + df=cleaning_dataset, + bucket_name="retrofit-data-dev", + file_key="sap_change_model/cleaning_dataset.parquet", + ) output = pd.DataFrame(dataset) output.to_parquet("./dataset.parquet") diff --git a/model_data/simulation_system/handlers/predictions_app.py b/model_data/simulation_system/handlers/predictions_app.py index ea764276..75d2aff6 100644 --- a/model_data/simulation_system/handlers/predictions_app.py +++ b/model_data/simulation_system/handlers/predictions_app.py @@ -1,3 +1,7 @@ +import boto3 +from botocore.exceptions import NoCredentialsError +import json +from io import StringIO import os import logging @@ -9,6 +13,33 @@ logger.setLevel(logging.INFO) RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "dev") +def upload_dataframe_to_s3(df, bucket, s3_file_name): + """ + Upload a pandas DataFrame to an S3 bucket as CSV + + :param df: DataFrame to upload + :param bucket: Bucket to upload to + :param s3_file_name: S3 object name + :return: True if file was uploaded, else False + """ + + # Initialize the S3 client + s3 = boto3.client('s3') + csv_buffer = StringIO() + + # Write the DataFrame to the buffer as CSV + df.to_csv(csv_buffer, index=False) + + try: + # Upload the CSV from the buffer to S3 + 
s3.put_object(Bucket=bucket, Key=s3_file_name, Body=csv_buffer.getvalue()) + print(f"Successfully uploaded DataFrame to {bucket}/{s3_file_name}") + return True + except NoCredentialsError: + print("Credentials not available") + return False + + def handler(event, context): """ Take in event and trigger the prediction pipeline @@ -19,24 +50,44 @@ def handler(event, context): # Assuming a file in a bucket landing for now? # Assuming we have a model to use - data_path = event["file_location"] - property_id = event["property_id"] - portfolio_id = event["portfolio_id"] - created_at = event["created_at"] + body = json.loads(event["body"]) + + data_path = body["file_location"] + property_id = body["property_id"] + portfolio_id = body["portfolio_id"] + created_at = body["created_at"] try: # We could fix the model path but for the moment, we just take the best model path based on the registry outputs = prediction(model_path=None, data_path=data_path) # Store into s3, with key of {portfolio_id}-{property_id} - storage_filepath = f"s3://retrofit-sap-predictions-{RUNTIME_ENVIRONMENT}/{portfolio_id}/{property_id}/" \ - f"{created_at}.csv" - outputs.to_csv(storage_filepath) - return storage_filepath + storage_filepath = f"{portfolio_id}/{property_id}/{created_at}.csv" + + upload_dataframe_to_s3( + df=outputs, + bucket=f"retrofit-sap-predictions-{RUNTIME_ENVIRONMENT}", + s3_file_name=storage_filepath + ) + + return { + "statusCode": 200, + "body": json.dumps({ + "message": "Successfully processed input", + "storage_filepath": storage_filepath + }) + } except (Exception, KeyError, ValueError) as e: logger.info("Prediction failed") logger.info(e) + return { + "statusCode": 500, + "body": json.dumps({ + "message": "Prediction failed", + "error": str(e) + }) + } if __name__ == "__main__": diff --git a/model_data/simulation_system/model_build_data/change_data/rdsap_full/test_data_with_id.parquet 
b/model_data/simulation_system/model_build_data/change_data/rdsap_full/test_data_with_id.parquet new file mode 100644 index 00000000..d41b23da Binary files /dev/null and b/model_data/simulation_system/model_build_data/change_data/rdsap_full/test_data_with_id.parquet differ diff --git a/model_data/simulation_system/predictions.py b/model_data/simulation_system/predictions.py index 1aa3defa..1b3827d8 100644 --- a/model_data/simulation_system/predictions.py +++ b/model_data/simulation_system/predictions.py @@ -5,13 +5,16 @@ Script to load MLModel class and generate predictions import os import json import argparse +from pathlib import Path import pandas as pd from typing import Optional from datetime import datetime from MLModel.Models import AutogluonModel from core.Logger import logger from core.DataLoader import dataloader_factory -from core.CloudClient import S3FSClient +from core.CloudClient import BotoClient +from core.Metrics import Metrics +from core.RegistryHandler import RegistryHandler from core.Settings import ( BASE_REGISTRY_PATH, REGISTRY_FILE, @@ -19,13 +22,13 @@ from core.Settings import ( PREDICTION_FILE, METADATA_FILE, TIMESTAMP_FORMAT, + MODEL_DIRECTORY, ) TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT) RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "dev") -CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT) - +CLIENT = BotoClient(runtime_environment=RUNTIME_ENVIRONMENT) # FOR TESTING # For now just loading data first and then passing into function (i.e. 
as if we receive json data and convert to @@ -36,7 +39,7 @@ CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT) # For testing in dev s3 # Data path can be passed as so: # python3 predictions.py --data-path s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet -# data_path="s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data.parquet" +# data_path="s3://retrofit-data-dev/model_build_data/change_data/rdsap_full/test_data_with_id.parquet" def ingest_arguments() -> argparse.Namespace: @@ -77,17 +80,6 @@ def prediction( Main pipeline function """ - if RUNTIME_ENVIRONMENT == "local": - registry_path = BASE_REGISTRY_PATH / target_column / REGISTRY_FILE - - if registry_path is None or not registry_path.exists(): - logger.error("No registry path provided or registry doesn't exist") - exit(1) - elif RUNTIME_ENVIRONMENT == "dev": - registry_path = "s3://retrofit-model-directory-dev/model_directory/RDSAP_CHANGE/model_registry.csv" - else: - raise NotImplemented("TO be implemented") - if model_path is not None: logger.info("User specified a model to load - ignoring registry") model_location = model_path @@ -96,7 +88,17 @@ def prediction( else: # TODO: Think about where registry will sit/ type logger.info("Loading best model from registry") - registry_df = pd.read_csv(registry_path) + + metrics = Metrics() + registry_handler = RegistryHandler() + + registry_path = Path(MODEL_DIRECTORY) / target_column / REGISTRY_FILE + + registry_df = registry_handler.load_registry( + registry_path=registry_path, client=CLIENT, metrics=metrics + ) + + # registry_df = pd.read_csv(registry_path) best_model_df = registry_df[registry_df["best_model"]] model_location = best_model_df["model_location"].values[0] @@ -115,18 +117,15 @@ def prediction( if data_path and data is None: logger.info("Loading data from provided path") dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT) - data = dataloader.load(filepath=data_path, 
index_col="id") + data = dataloader.load(client=CLIENT, filepath=data_path, index_col=None) if data is None: raise ValueError("No data loaded") - # # TODO: DOWNSAMPLING DOWN TO JUST USE ONE FOR PREDICTION - # data = data.sample(1) else: logger.info("Using data provided") data = json.loads(str(data)) data = pd.DataFrame([data]) - print(data) logger.info("--- Loading Model ---") @@ -134,17 +133,19 @@ def prediction( logger.info("Using an Autogluon model") model = AutogluonModel() else: - logger.error("No other model currently") - exit(1) + raise ValueError("No other model currently") - model.load_model( - filepath=model_location, s3_client=CLIENT, model_folder="local_model" - ) + # In lambda, only the /tmp folder is writable + model_folder = "/tmp" if RUNTIME_ENVIRONMENT in ["dev", "prod"] else "local_model" + + model.load_model(filepath=model_location, client=CLIENT, model_folder=model_folder) logger.info("--- Generating Predictions ---") prediction = model.generate_predictions(data=data) - return pd.concat([pd.Series(data.index, name='id'), prediction], axis=1) + # logger.info(pd.concat([data["id"], prediction], axis=1)) + + return pd.concat([data["id"], prediction], axis=1) # Save prediction some where? 
# prediction.to_csv("s3?") diff --git a/model_data/simulation_system/requirements/predictions/predictions-dev.txt b/model_data/simulation_system/requirements/predictions/predictions-dev.txt index 48bbb3ca..a9e65dc2 100644 --- a/model_data/simulation_system/requirements/predictions/predictions-dev.txt +++ b/model_data/simulation_system/requirements/predictions/predictions-dev.txt @@ -1,4 +1,6 @@ +boto3 autogluon==0.8.2 pandas==1.5.3 -s3fs==2023.6.0 +seaborn==0.12.2 +matplotlib==3.7.2 pre-commit==3.3.3 diff --git a/model_data/simulation_system/requirements/predictions/predictions.txt b/model_data/simulation_system/requirements/predictions/predictions.txt index 6aeeaa45..af2d681e 100644 --- a/model_data/simulation_system/requirements/predictions/predictions.txt +++ b/model_data/simulation_system/requirements/predictions/predictions.txt @@ -1,6 +1,5 @@ boto3 autogluon==0.8.2 pandas==1.5.3 -s3fs seaborn==0.12.2 -matplotlib==3.7.2 \ No newline at end of file +matplotlib==3.7.2 diff --git a/model_data/simulation_system/requirements/training/training-dev.txt b/model_data/simulation_system/requirements/training/training-dev.txt index a8136586..e440dc89 100644 --- a/model_data/simulation_system/requirements/training/training-dev.txt +++ b/model_data/simulation_system/requirements/training/training-dev.txt @@ -1,7 +1,6 @@ autogluon==0.8.2 pandas==1.5.3 seaborn==0.12.2 -s3fs==2023.6.0 pre-commit==3.3.3 dvc dvc[s3] diff --git a/model_data/simulation_system/training.py b/model_data/simulation_system/training.py index 4cc19ed3..b68d1c4d 100644 --- a/model_data/simulation_system/training.py +++ b/model_data/simulation_system/training.py @@ -10,7 +10,7 @@ from core.Logger import logger from core.Metrics import Metrics, sort_by_metric from core.DataLoader import dataloader_factory from core.FeatureProcessor import FeatureProcessor -from core.CloudClient import S3FSClient +from core.CloudClient import BotoClient from core.RegistryHandler import RegistryHandler from core.Settings 
import ( MODEL_DIRECTORY, @@ -30,7 +30,8 @@ TIMESTAMP = datetime.now().strftime(TIMESTAMP_FORMAT) RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "local") -CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT) +CLIENT = BotoClient(runtime_environment=RUNTIME_ENVIRONMENT) +# CLIENT = S3FSClient(runtime_environment=RUNTIME_ENVIRONMENT) # FOR TESTING @@ -105,8 +106,8 @@ def training( logger.info("--- Loading data ---") dataloader = dataloader_factory(runtime_environment=RUNTIME_ENVIRONMENT) - train_df = dataloader.load(filepath=train_filepath) - test_df = dataloader.load(filepath=test_filepath) + train_df = dataloader.load(client=CLIENT, filepath=train_filepath) + test_df = dataloader.load(client=CLIENT, filepath=test_filepath) if train_df is None or test_df is None: raise ValueError("No data Loaded - cancelling pipeline") @@ -153,7 +154,7 @@ def training( ) logger.info("--- Save Model ---") - model.save_model(output_filepath=model.output_filepath, s3fs_client=CLIENT) + model.save_model(output_filepath=model.output_filepath, client=CLIENT) logger.info("--- Generate evaluation metrics ---") metrics = Metrics() @@ -166,8 +167,6 @@ def training( metrics=metrics, ) - metrics.upload_metrics(output_filepath=metric_output_path, s3fs_client=CLIENT) - logger.info("--- Generate metric outputs using predictions ---") # metrics.generate_plot_suite() @@ -179,7 +178,7 @@ def training( output_filepath=plot_output_path, ) - metrics.upload_metrics(output_filepath=plot_output_path, s3fs_client=CLIENT) + metrics.upload_metrics(output_filepath=metric_output_path, client=CLIENT) # TODO: for cml, we might want to have class that outputs all data and plots to add to the report # If we want residual plot/ any plots, we will need to self host @@ -196,7 +195,7 @@ def training( f"Optimised version of best model can be found at: {deployment_model_path}" ) - model.save_model(output_filepath=deployment_model_path, s3fs_client=CLIENT) + 
model.save_model(output_filepath=deployment_model_path, client=CLIENT) # TODO: Need a model registry - for now have this as a CSV # Save this in the model directory @@ -208,7 +207,7 @@ def training( registry_path = Path(MODEL_DIRECTORY) / target_column / REGISTRY_FILE registry_df = registry_handler.load_registry( - registry_path=registry_path, s3fs_client=CLIENT, metrics=metrics + registry_path=registry_path, client=CLIENT, metrics=metrics ) model_details_df = pd.DataFrame( @@ -235,7 +234,7 @@ def training( registry_path.parent.mkdir(parents=True, exist_ok=True) registry_df.to_csv(registry_path, index=False) - registry_handler.save_registry(output_filepath=registry_path, s3fs_client=CLIENT) + registry_handler.save_registry(output_filepath=registry_path, client=CLIENT) logger.info("--- Clean up ---") if RUNTIME_ENVIRONMENT != "local" and Path(MODEL_DIRECTORY).exists(): diff --git a/model_data/utils.py b/model_data/utils.py index 744914a4..07642973 100644 --- a/model_data/utils.py +++ b/model_data/utils.py @@ -1,3 +1,6 @@ +import boto3 +import pandas as pd +from io import BytesIO import re from textblob import TextBlob @@ -24,3 +27,23 @@ def correct_spelling(text): corrected_text = ' '.join(corrected_words) return corrected_text + + +def save_dataframe_to_s3_parquet(df, bucket_name, file_key): + """ + Save a pandas DataFrame to S3 as a Parquet file. + + :param df: The pandas DataFrame. + :param bucket_name: Name of the S3 bucket. + :param file_key: Key of the file (including directory path within the bucket). 
+ """ + + # Convert the DataFrame to a Parquet format in memory + parquet_buffer = BytesIO() + df.to_parquet(parquet_buffer) + + # Create the boto3 client + client = boto3.client('s3') + + # Upload the Parquet file to S3 + client.put_object(Bucket=bucket_name, Key=file_key, Body=parquet_buffer.getvalue()) diff --git a/recommendations/FloorRecommendations.py b/recommendations/FloorRecommendations.py index 3d53da69..b111090f 100644 --- a/recommendations/FloorRecommendations.py +++ b/recommendations/FloorRecommendations.py @@ -6,7 +6,7 @@ from backend.Property import Property from recommendations.rdsap_tables import default_wall_thickness, age_band_data from recommendations.recommendation_utils import ( r_value_per_mm_to_u_value, calculate_u_value_uplift, is_diminishing_returns, update_lowest_selected_u_value, - get_recommended_part, get_uvalue_estimate, estimate_sap_points + get_recommended_part, get_uvalue_estimate ) suspended_floor_insulation_parts = [ @@ -323,7 +323,7 @@ class FloorRecommendations(Definitions): "description": self._make_floor_description(part, depth), "starting_u_value": u_value, "new_u_value": new_u_value, - "sap_points": estimate_sap_points(), + "sap_points": None, "cost": estimated_cost, } ) diff --git a/recommendations/WallRecommendations.py b/recommendations/WallRecommendations.py index fdd271be..9edbe969 100644 --- a/recommendations/WallRecommendations.py +++ b/recommendations/WallRecommendations.py @@ -6,7 +6,7 @@ from backend.Property import Property from model_data.BaseUtility import Definitions from recommendations.recommendation_utils import ( r_value_per_mm_to_u_value, calculate_u_value_uplift, is_diminishing_returns, update_lowest_selected_u_value, - get_recommended_part, get_uvalue_estimate, estimate_sap_points + get_recommended_part, get_uvalue_estimate ) external_wall_insulation_parts = [ @@ -256,6 +256,10 @@ class WallRecommendations(Definitions): is_solid_brick = self.property.walls["is_solid_brick"] insulation_thickness = 
self.property.walls["insulation_thickness"] + # We check if the wall is already insulated and if so, we exit + if insulation_thickness in ["average", "above average"]: + return + if u_value: if self.property.walls["thermal_transmittance_unit"] != self.U_VALUE_UNIT: raise NotImplementedError("Haven't handled the case of other u value units yet") @@ -350,7 +354,7 @@ class WallRecommendations(Definitions): "description": "Install " + self._make_description(part, depth), "starting_u_value": u_value, "new_u_value": new_u_value, - "sap_points": estimate_sap_points(), + "sap_points": None, "cost": estimated_cost, } ) @@ -432,7 +436,7 @@ class WallRecommendations(Definitions): ), "starting_u_value": u_value, "new_u_value": combined_new_u_value, - "sap_points": estimate_sap_points(), + "sap_points": None, "cost": ewi_esimtated_cost + iwi_esimtated_cost, } self.recommendations.append(recommendation) diff --git a/recommendations/recommendation_utils.py b/recommendations/recommendation_utils.py index d35befd7..e53aeb17 100644 --- a/recommendations/recommendation_utils.py +++ b/recommendations/recommendation_utils.py @@ -4,15 +4,6 @@ from statistics import mean import random -def estimate_sap_points(): - """ - This is a placeholder function. We will implement the proper version soon - :return: - """ - - return random.sample(range(4, 12), 1)[0] - - def r_value_per_mm_to_u_value(depth_mm: int, r_value_per_mm: float): """ Converts R-value per mm to U-value in W/m²K. 
diff --git a/sapmodel.serverless.yml b/sapmodel.serverless.yml index c88eb952..77d9fc1f 100644 --- a/sapmodel.serverless.yml +++ b/sapmodel.serverless.yml @@ -12,63 +12,50 @@ provider: DOMAIN_NAME: ${env:DOMAIN_NAME} ECR_URI: ${env:ECR_URI} GITHUB_SHA: ${env:GITHUB_SHA} + iam: + role: + name: sapmodel_s3_access + statements: + # Allow reading from MODEL_DIRECTORY_BUCKET and DATA_BUCKET + - Effect: Allow + Action: + - s3:* + # - s3:GetObject + # - s3:ListBucket + Resource: + - arn:aws:s3:::${env:MODEL_DIRECTORY_BUCKET} + - arn:aws:s3:::${env:MODEL_DIRECTORY_BUCKET}/* + - arn:aws:s3:::${env:DATA_BUCKET} + - arn:aws:s3:::${env:DATA_BUCKET}/* + # Allow reading and writing to PREDICTIONS_BUCKET + - Effect: Allow + Action: + - s3:GetObject + - s3:PutObject + - s3:ListBucket + Resource: + - arn:aws:s3:::${env:PREDICTIONS_BUCKET} + - arn:aws:s3:::${env:PREDICTIONS_BUCKET}/* -#plugins: -# - serverless-domain-manager -# -#custom: -# customDomain: -# domainName: api.${self:provider.environment.DOMAIN_NAME} -# basePath: 'sapmodel' -# createRoute53Record: true -# certificateArn: ${ssm:/ssl_certificate_arn} + +plugins: + - serverless-domain-manager + +custom: + customDomain: + domainName: api.${self:provider.environment.DOMAIN_NAME} + basePath: 'sapmodel' + createRoute53Record: true + certificateArn: ${ssm:/ssl_certificate_arn} functions: sap_prediction_lambda: image: uri: ${env:ECR_URI}:${env:GITHUB_SHA} - role: sapPredictionLambdaRole + # role: sapPredictionLambdaRole events: - http: path: /predict method: POST - -resources: - Resources: - sapPredictionLambdaRole: - Type: AWS::IAM::Role - Properties: - RoleName: sap-prediction-lambda-role - AssumeRolePolicyDocument: - Version: '2012-10-17' - Statement: - - Effect: Allow - Principal: - Service: - - lambda.amazonaws.com - Action: sts:AssumeRole - Policies: - - PolicyName: sapPredictionLambdaS3Access - PolicyDocument: - Version: '2012-10-17' - Statement: - # Allow reading from MODEL_DIRECTORY_BUCKET and DATA_BUCKET - - Effect: 
Allow - Action: - - s3:GetObject - - s3:ListBucket - Resource: - - arn:aws:s3:::${env:MODEL_DIRECTORY_BUCKET} - - arn:aws:s3:::${env:MODEL_DIRECTORY_BUCKET}/* - - arn:aws:s3:::${env:DATA_BUCKET} - - arn:aws:s3:::${env:DATA_BUCKET}/* - # Allow reading and writing to PREDICTIONS_BUCKET - - Effect: Allow - Action: - - s3:GetObject - - s3:PutObject - - s3:ListBucket - Resource: - - arn:aws:s3:::${env:PREDICTIONS_BUCKET} - - arn:aws:s3:::${env:PREDICTIONS_BUCKET}/* + timeout: 120 # Set max run time to 2 minutes - we shouldn't need this much time so this can be reviewed diff --git a/serverless.yml b/serverless.yml index 89eb666a..1c2a5f73 100644 --- a/serverless.yml +++ b/serverless.yml @@ -9,6 +9,7 @@ provider: ENVIRONMENT: ${env:ENVIRONMENT} SECRET_KEY: ${env:SECRET_KEY} PLAN_TRIGGER_BUCKET: ${env:PLAN_TRIGGER_BUCKET} + DATA_BUCKET: ${env:DATA_BUCKET} DOMAIN_NAME: ${env:DOMAIN_NAME} EPC_AUTH_TOKEN: ${env:EPC_AUTH_TOKEN} DB_HOST: ${env:DB_HOST}