From 02208cbf4ab8ea29757430feb00681b2c248fcd3 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 5 Sep 2023 18:03:25 +0100 Subject: [PATCH] implementing sap model api call to backend and fixing bug in DataProcessor --- .github/workflows/deploy_fastapi_backend.yml | 1 + backend/app/plan/router.py | 61 ++++++++++++++----- .../simulation_system/core/DataProcessor.py | 53 +++++++++++++++- .../generate_rdsap_change.py | 28 ++------- serverless.yml | 1 + 5 files changed, 102 insertions(+), 42 deletions(-) diff --git a/.github/workflows/deploy_fastapi_backend.yml b/.github/workflows/deploy_fastapi_backend.yml index 502a9eb8..8172e2cb 100644 --- a/.github/workflows/deploy_fastapi_backend.yml +++ b/.github/workflows/deploy_fastapi_backend.yml @@ -89,6 +89,7 @@ jobs: ENVIRONMENT: ${{ github.ref_name }} SECRET_KEY: ${{ secrets.NEXTAUTH_SECRET }} PLAN_TRIGGER_BUCKET: 'retrofit-plan-inputs-${{ github.ref_name }}' + DATA_BUCKET: 'retrofit-data-${{ github.ref_name }}' DOMAIN_NAME: ${{ steps.set_domain.outputs.domain }} EPC_AUTH_TOKEN: ${{ steps.set_auth_token.outputs.auth_token }} DB_HOST: ${{ steps.set_db_credentials.outputs.db_host }} diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index 7db2f31f..4669ddfa 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -17,6 +17,8 @@ from sqlalchemy.orm import sessionmaker from sqlalchemy.exc import IntegrityError, OperationalError from datetime import datetime import pandas as pd +from io import BytesIO +import boto3 # database interaction functions from backend.app.db.functions.property_functions import ( @@ -24,8 +26,7 @@ from backend.app.db.functions.property_functions import ( ) from backend.app.db.functions.materials_functions import get_materials from backend.app.db.functions.recommendations_functions import ( - create_plan, create_recommendation, create_recommendation_material, create_plan_recommendations, - upload_recommendations +create_plan, create_plan_recommendations, 
upload_recommendations ) from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations from backend.app.db.connection import db_engine @@ -34,6 +35,7 @@ from model_data.optimiser.GainOptimiser import GainOptimiser from model_data.optimiser.CostOptimiser import CostOptimiser from backend.app.utils import epc_to_sap_lower_bound from model_data.optimiser.optimiser_functions import prepare_input_measures +from model_data.simulation_system.core.DataProcessor import DataProcessor # TODO: This is placeholder until data is stored in DB from backend.app.plan.uvalue_estimates_walls import uvalue_estimates_walls @@ -131,6 +133,19 @@ def insert_temp_recommendation_id(property_recommendations): return property_recommendations +def read_parquet_from_s3(bucket_name, file_key): + client = boto3.client('s3') + + # Get the object + s3_object = client.get_object(Bucket=bucket_name, Key=file_key) + + # Read the CSV body into a DataFrame + csv_body = s3_object["Body"].read() + df = pd.read_parquet(BytesIO(csv_body)) + + return df + + @router.post("/trigger") async def trigger_plan(body: PlanTriggerRequest): logger.info("Connecting to db") @@ -328,7 +343,7 @@ async def trigger_plan(body: PlanTriggerRequest): recommendations[p.id] = property_recommendations # Finally, we'll prepare data for predicting the impact on SAP - from model_data.simulation_system.core.Settings import FIXED_FEATURES, COMPONENT_FEATURES + from model_data.simulation_system.core.Settings import FIXED_FEATURES, COMPONENT_FEATURES, COLUMNS_TO_MERGE_ON epc_data = p.data.copy() epc_data = pd.DataFrame([epc_data]) epc_data.columns = [col.upper().replace("-", "_") for col in epc_data.columns] @@ -348,6 +363,7 @@ async def trigger_plan(body: PlanTriggerRequest): scoring_dict = { "UPRN": p.data["uprn"], "id": "+".join([str(p.id), str(rec["recommendation_id"])]), + "LOCAL_AUTHORITY": p.data["local-authority"], **starting_epc_data.to_dict("records")[0], **ending_epc_data.to_dict("records")[0], 
**fixed_data.to_dict("records")[0] @@ -364,7 +380,32 @@ async def trigger_plan(body: PlanTriggerRequest): recommendations_scoring_data.append(scoring_dict) recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data) - # TODO: We need to clean the data + + # Clean the data + cleaning_data = read_parquet_from_s3( + bucket_name="retrofit-data-dev", + file_key="sap_change_model/cleaning_dataset.parquet", + ) + cleaning_data = cleaning_data.rename(columns={"local-authority": "LOCAL_AUTHORITY"}) + # Merge the cleaning data onto recommendations_scoring_data + + recommendations_scoring_data[["FLOOR_HEIGHT", "TOTAL_FLOOR_AREA"]] = recommendations_scoring_data[ + ["FLOOR_HEIGHT", "TOTAL_FLOOR_AREA"] + ].replace("", None) + + # Perform the same cleaning as in the model + recommendations_scoring_data = DataProcessor.apply_averages_cleaning( + data_to_clean=recommendations_scoring_data, + cleaning_data=cleaning_data, + cols_to_merge_on=COLUMNS_TO_MERGE_ON + ["LOCAL_AUTHORITY"] + ) + recommendations_scoring_data = recommendations_scoring_data.drop(columns=["LOCAL_AUTHORITY"]) + + # Note: We might need to perform the full pre-processing here + data_processor = DataProcessor(filepath=None) + data_processor.insert_data(recommendations_scoring_data) + data_processor.remap_columns() + recommendations_scoring_data = data_processor.data column_types = data.dtypes.to_dict() columntypes = {} @@ -409,19 +450,7 @@ async def trigger_plan(body: PlanTriggerRequest): # Example data file - from io import BytesIO - import boto3 - def read_parquet_from_s3(bucket_name, file_key): - client = boto3.client('s3') - # Get the object - s3_object = client.get_object(Bucket=bucket_name, Key=file_key) - - # Read the CSV body into a DataFrame - csv_body = s3_object["Body"].read() - df = pd.read_parquet(BytesIO(csv_body)) - - return df data = read_parquet_from_s3( bucket_name="retrofit-data-dev", file_key="model_build_data/change_data/rdsap_full/test_data_with_id.parquet" ) diff --git 
a/model_data/simulation_system/core/DataProcessor.py b/model_data/simulation_system/core/DataProcessor.py index 4c37176d..9863ec8e 100644 --- a/model_data/simulation_system/core/DataProcessor.py +++ b/model_data/simulation_system/core/DataProcessor.py @@ -21,17 +21,24 @@ class DataProcessor: Handle data loading and data preprocessing """ - def __init__(self, filepath: Path) -> None: + def __init__(self, filepath: Path | None) -> None: self.filepath = filepath + self.data = None def load_data(self, low_memory=False) -> None: + if not self.filepath: + raise ValueError("No filepath specified") self.data = pd.read_csv(self.filepath, low_memory=low_memory) + def insert_data(self, data: pd.DataFrame) -> None: + self.data = data + def pre_process(self) -> pd.DataFrame: """ Load data and begin initial cleaning """ - self.load_data(low_memory=DATA_PROCESSOR_SETTINGS["low_memory"]) + if self.data is None: + self.load_data(low_memory=DATA_PROCESSOR_SETTINGS["low_memory"]) self.confine_data() # TODO: CLean number of heated rooms and habitable rooms @@ -87,7 +94,7 @@ class DataProcessor: # Remap certain columns data["FLOOR_LEVEL"] = data["FLOOR_LEVEL"].replace(FLOOR_LEVEL_MAP) - data["BUILT_FROM"] = data["BUILT_FORM"].replace(BUILT_FORM_REMAP) + data["BUILT_FORM"] = data["BUILT_FORM"].replace(BUILT_FORM_REMAP) self.data = data @@ -264,3 +271,43 @@ class DataProcessor: self.data["MULTI_GLAZE_PROPORTION"] ) & (self.data["WINDOWS_DESCRIPTION"].isin(FULLY_GLAZED_DESCRIPTIONS)) self.data.loc[no_multi_glaze_proportion_index, "MULTI_GLAZE_PROPORTION"] = 100 + + @staticmethod + def apply_averages_cleaning(data_to_clean, cleaning_data, cols_to_merge_on): + """ + Clean the input DataFrame using averages from a cleaning DataFrame. + + :param data_to_clean: DataFrame to be cleaned. + :param cleaning_data: DataFrame containing data for cleaning. + :param cols_to_merge_on: Columns on which merging is based. 
We pass cols_to_merge_on to this function as this + differs depending on where the function is being used. + :return: Cleaned DataFrame. + """ + # Enforce data types + for col in ["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"]: + data_to_clean[col] = data_to_clean[col].astype(float) + + # Identify merge columns that are not entirely NaN + columns_to_merge_on = data_to_clean[cols_to_merge_on].dropna(axis=1, how="all").columns.tolist() + + # Calculate averages + cleaning_averages_to_merge = cleaning_data.groupby(columns_to_merge_on).agg({ + "TOTAL_FLOOR_AREA": "mean", + "FLOOR_HEIGHT": "mean" + }) + + # Merge with the original data + data_to_clean = pd.merge( + data_to_clean, + cleaning_averages_to_merge, + on=columns_to_merge_on, + suffixes=("", "_AVERAGE"), + how='left' + ) + + # Fill NaN values with averages + for col in ["TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]: + data_to_clean[col] = data_to_clean[col].fillna(data_to_clean[f"{col}_AVERAGE"]) + data_to_clean = data_to_clean.drop(columns=[f"{col}_AVERAGE"]) + + return data_to_clean diff --git a/model_data/simulation_system/generate_rdsap_change.py b/model_data/simulation_system/generate_rdsap_change.py index cc9ad31c..502f7a06 100644 --- a/model_data/simulation_system/generate_rdsap_change.py +++ b/model_data/simulation_system/generate_rdsap_change.py @@ -64,28 +64,10 @@ def app(): # property_data[AVERAGE_FIXED_FEATURES].fillna(value=0).pct_change().iloc[-1] > 0.1 # Extract the columns that are not all None - na_columns = property_data[COLUMNS_TO_MERGE_ON].isna().all() - cleaned_columns_to_merge_on = na_columns.index[~na_columns].to_list() - - # Get the corresponding groupby and merge, and fill in NA values - cleaning_averages_to_merge = cleaning_averages.groupby( - cleaned_columns_to_merge_on - )[["TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]].mean() - - modified_property_data = pd.merge( - property_data, - cleaning_averages_to_merge, - on=cleaned_columns_to_merge_on, - suffixes=["", "_AVERAGE"], - ) - modified_property_data["TOTAL_FLOOR_AREA"] = 
modified_property_data[ - "TOTAL_FLOOR_AREA" - ].fillna(modified_property_data["TOTAL_FLOOR_AREA_AVERAGE"]) - modified_property_data["FLOOR_HEIGHT"] = modified_property_data[ - "FLOOR_HEIGHT" - ].fillna(modified_property_data["FLOOR_HEIGHT_AVERAGE"]) - modified_property_data = modified_property_data.drop( - columns=["TOTAL_FLOOR_AREA_AVERAGE", "FLOOR_HEIGHT_AVERAGE"] + modified_property_data = DataProcessor.apply_averages_cleaning( + data_to_clean=property_data, + cleaning_data=cleaning_averages, + cols_to_merge_on=COLUMNS_TO_MERGE_ON ) for field in AVERAGE_FIXED_FEATURES: @@ -154,7 +136,7 @@ def app(): dataset.append(property_model_data) - cleaning_averages["local-authority"] = df["LOCAL_AUTHORITY"].values[0] + cleaning_averages["LOCAL_AUTHORITY"] = df["LOCAL_AUTHORITY"].values[0] cleaning_dataset.append(cleaning_averages) # Store cleaning dataset in s3 as a parquet file diff --git a/serverless.yml b/serverless.yml index 89eb666a..1c2a5f73 100644 --- a/serverless.yml +++ b/serverless.yml @@ -9,6 +9,7 @@ provider: ENVIRONMENT: ${env:ENVIRONMENT} SECRET_KEY: ${env:SECRET_KEY} PLAN_TRIGGER_BUCKET: ${env:PLAN_TRIGGER_BUCKET} + DATA_BUCKET: ${env:DATA_BUCKET} DOMAIN_NAME: ${env:DOMAIN_NAME} EPC_AUTH_TOKEN: ${env:EPC_AUTH_TOKEN} DB_HOST: ${env:DB_HOST}