From 637ada9cb3b9fdf48371205c368e2b01d9135893 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 5 Sep 2023 15:47:26 +0100 Subject: [PATCH 1/6] Implementing sap model api to backend wip --- backend/app/plan/router.py | 110 +++++++++++++++++- backend/requirements/base.txt | 2 + model_data/simulation_system/core/Settings.py | 1 - recommendations/WallRecommendations.py | 4 + 4 files changed, 115 insertions(+), 2 deletions(-) diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index 15646c00..7db2f31f 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -15,6 +15,8 @@ from backend.app.db.utils import row2dict from starlette.responses import Response from sqlalchemy.orm import sessionmaker from sqlalchemy.exc import IntegrityError, OperationalError +from datetime import datetime +import pandas as pd # database interaction functions from backend.app.db.functions.property_functions import ( @@ -212,6 +214,8 @@ async def trigger_plan(body: PlanTriggerRequest): # TODO: Move this to a class. 
We probably was a Recommender class which takes the injects the optimisers # in as a dependency and then the optimisers can take the input measures in as part of the setup() method recommendations = {} + recommendations_scoring_data = [] + for p in input_properties: property_recommendations = [] @@ -323,7 +327,111 @@ async def trigger_plan(body: PlanTriggerRequest): recommendations[p.id] = property_recommendations - # Once we're done, we'll store: + # Finally, we'll prepare data for predicting the impact on SAP + from model_data.simulation_system.core.Settings import FIXED_FEATURES, COMPONENT_FEATURES + epc_data = p.data.copy() + epc_data = pd.DataFrame([epc_data]) + epc_data.columns = [col.upper().replace("-", "_") for col in epc_data.columns] + starting_epc_data = epc_data[COMPONENT_FEATURES + ["LODGEMENT_DATE"]].copy().add_suffix("_STARTING") + ending_epc_data = epc_data[COMPONENT_FEATURES + ["LODGEMENT_DATE"]].copy().add_suffix("_ENDING") + fixed_data = epc_data[FIXED_FEATURES] + + # We update the ending record with the recommended updates and we set lodgement date to today + ending_epc_data["LODGEMENT_DATE_ENDING"] = datetime.now().strftime("%Y-%m-%d") + + scoring_map = { + 'Solid brick, as built, no insulation (assumed)': 'Solid brick, as built, insulated (assumed)', + 'Suspended, no insulation (assumed)': 'Suspended, insulated (assumed)', + 'Solid, no insulation (assumed)': 'Solid, insulated (assumed)', + } + for rec in property_recommendations: + scoring_dict = { + "UPRN": p.data["uprn"], + "id": "+".join([str(p.id), str(rec["recommendation_id"])]), + **starting_epc_data.to_dict("records")[0], + **ending_epc_data.to_dict("records")[0], + **fixed_data.to_dict("records")[0] + } + + # We update the description to indicate it's insulated + if rec["type"] == "wall_insulation": + scoring_dict["WALLS_DESCRIPTION_ENDING"] = scoring_map[p.walls["clean_description"]] + elif rec["type"] == "floor_insulation": + scoring_dict["FLOOR_DESCRIPTION_ENDING"] = 
scoring_map[p.floor["clean_description"]] + else: + raise NotImplementedError("Implement me") + + recommendations_scoring_data.append(scoring_dict) + + recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data) + # TODO: We need to clean the data + + column_types = data.dtypes.to_dict() + columntypes = {} + for k, v in column_types.items(): + if k not in recommendations_scoring_data.columns: + continue + columntypes[k] = v.name + + columntypes["id"] = "object" + for col in recommendations_scoring_data.columns: + recommendations_scoring_data[col] = recommendations_scoring_data[col].astype(columntypes[col]) + + recommendations_scoring_data = recommendations_scoring_data.astype(columntypes) + + + column_types = {'UPRN': dtype('O'), 'RDSAP_CHANGE': dtype('int64'), 'HEAT_DEMAND_CHANGE': dtype('int64'), + 'TOTAL_FLOOR_AREA': dtype('float64'), 'FLOOR_HEIGHT': dtype('float64'), 'PROPERTY_TYPE': dtype('O'), + 'BUILT_FORM': dtype('O'), 'CONSTITUENCY': dtype('O'), 'NUMBER_HABITABLE_ROOMS': dtype('float64'), + 'NUMBER_HEATED_ROOMS': dtype('float64'), 'FIXED_LIGHTING_OUTLETS_COUNT': dtype('float64'), + 'FLOOR_LEVEL': dtype('float64'), 'CONSTRUCTION_AGE_BAND': dtype('O'), 'TRANSACTION_TYPE_STARTING': dtype('O'), + 'WALLS_DESCRIPTION_STARTING': dtype('O'), 'FLOOR_DESCRIPTION_STARTING': dtype('O'), + 'LIGHTING_DESCRIPTION_STARTING': dtype('O'), 'ROOF_DESCRIPTION_STARTING': dtype('O'), + 'MAINHEAT_DESCRIPTION_STARTING': dtype('O'), 'HOTWATER_DESCRIPTION_STARTING': dtype('O'), + 'MAIN_FUEL_STARTING': dtype('O'), 'MECHANICAL_VENTILATION_STARTING': dtype('O'), + 'SECONDHEAT_DESCRIPTION_STARTING': dtype('O'), 'ENERGY_TARIFF_STARTING': dtype('O'), + 'SOLAR_WATER_HEATING_FLAG_STARTING': dtype('O'), 'PHOTO_SUPPLY_STARTING': dtype('float64'), + 'WINDOWS_DESCRIPTION_STARTING': dtype('O'), 'GLAZED_TYPE_STARTING': dtype('O'), + 'MULTI_GLAZE_PROPORTION_STARTING': dtype('float64'), 'LOW_ENERGY_LIGHTING_STARTING': dtype('float64'), + 'NUMBER_OPEN_FIREPLACES_STARTING': 
dtype('float64'), 'MAINHEATCONT_DESCRIPTION_STARTING': dtype('O'), + 'EXTENSION_COUNT_STARTING': dtype('float64'), 'LODGEMENT_DATE_STARTING': dtype('O'), + 'TRANSACTION_TYPE_ENDING': dtype('O'), 'WALLS_DESCRIPTION_ENDING': dtype('O'), + 'FLOOR_DESCRIPTION_ENDING': dtype('O'), 'LIGHTING_DESCRIPTION_ENDING': dtype('O'), + 'ROOF_DESCRIPTION_ENDING': dtype('O'), 'MAINHEAT_DESCRIPTION_ENDING': dtype('O'), + 'HOTWATER_DESCRIPTION_ENDING': dtype('O'), 'MAIN_FUEL_ENDING': dtype('O'), + 'MECHANICAL_VENTILATION_ENDING': dtype('O'), 'SECONDHEAT_DESCRIPTION_ENDING': dtype('O'), + 'ENERGY_TARIFF_ENDING': dtype('O'), 'SOLAR_WATER_HEATING_FLAG_ENDING': dtype('O'), + 'PHOTO_SUPPLY_ENDING': dtype('float64'), 'WINDOWS_DESCRIPTION_ENDING': dtype('O'), + 'GLAZED_TYPE_ENDING': dtype('O'), 'MULTI_GLAZE_PROPORTION_ENDING': dtype('float64'), + 'LOW_ENERGY_LIGHTING_ENDING': dtype('float64'), 'NUMBER_OPEN_FIREPLACES_ENDING': dtype('float64'), + 'MAINHEATCONT_DESCRIPTION_ENDING': dtype('O'), 'EXTENSION_COUNT_ENDING': dtype('float64'), + 'LODGEMENT_DATE_ENDING': dtype('O'), 'id': dtype('int64')} + + # Example data file + + from io import BytesIO + import boto3 + def read_parquet_from_s3(bucket_name, file_key): + client = boto3.client('s3') + + # Get the object + s3_object = client.get_object(Bucket=bucket_name, Key=file_key) + + # Read the CSV body into a DataFrame + csv_body = s3_object["Body"].read() + df = pd.read_parquet(BytesIO(csv_body)) + + return df + data = read_parquet_from_s3( + bucket_name="retrofit-data-dev", file_key="model_build_data/change_data/rdsap_full/test_data_with_id.parquet" + ) + data = data.head(5) + + + # We query the sap difference model api to get the estimated impact on sap + for property_id, recommendations in recommendations.items(): + + # 1) the property data # 2) the property details (epc) # 3) the recommendations diff --git a/backend/requirements/base.txt b/backend/requirements/base.txt index 0b7337e4..de173db2 100644 --- a/backend/requirements/base.txt +++ 
b/backend/requirements/base.txt @@ -32,3 +32,5 @@ psycopg2-binary pytz==2023.3 mip==1.15.0 boto3==1.28.3 +pandas==1.5.3 +pyarrow==12.0.1 \ No newline at end of file diff --git a/model_data/simulation_system/core/Settings.py b/model_data/simulation_system/core/Settings.py index 7765f64a..9f6c2e12 100644 --- a/model_data/simulation_system/core/Settings.py +++ b/model_data/simulation_system/core/Settings.py @@ -99,7 +99,6 @@ COMPONENT_FEATURES = [ "WINDOWS_DESCRIPTION", "GLAZED_TYPE", "MULTI_GLAZE_PROPORTION", - "LIGHTING_DESCRIPTION", "LOW_ENERGY_LIGHTING", "NUMBER_OPEN_FIREPLACES", "MAINHEATCONT_DESCRIPTION", diff --git a/recommendations/WallRecommendations.py b/recommendations/WallRecommendations.py index fdd271be..570a46d7 100644 --- a/recommendations/WallRecommendations.py +++ b/recommendations/WallRecommendations.py @@ -256,6 +256,10 @@ class WallRecommendations(Definitions): is_solid_brick = self.property.walls["is_solid_brick"] insulation_thickness = self.property.walls["insulation_thickness"] + # We check if the wall is already insulated and if so, we exit + if insulation_thickness in ["average", "above average"]: + return + if u_value: if self.property.walls["thermal_transmittance_unit"] != self.U_VALUE_UNIT: raise NotImplementedError("Haven't handled the case of other u value units yet") From 0692140acff148b3a95e1d482c74aff723fb3e23 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 5 Sep 2023 16:33:50 +0100 Subject: [PATCH 2/6] added creating of cleaning dataset and storage of cleaning data --- model_data/requirements/requirements.txt | 2 ++ .../generate_rdsap_change.py | 30 ++++++++++++++----- model_data/utils.py | 23 ++++++++++++++ 3 files changed, 48 insertions(+), 7 deletions(-) diff --git a/model_data/requirements/requirements.txt b/model_data/requirements/requirements.txt index 28fce331..d4de6b71 100644 --- a/model_data/requirements/requirements.txt +++ b/model_data/requirements/requirements.txt @@ -18,3 +18,5 @@ statsmodels scikit-learn 
pyspellchecker textblob +boto3 +pyarrow diff --git a/model_data/simulation_system/generate_rdsap_change.py b/model_data/simulation_system/generate_rdsap_change.py index c1ca56a6..8962e252 100644 --- a/model_data/simulation_system/generate_rdsap_change.py +++ b/model_data/simulation_system/generate_rdsap_change.py @@ -3,7 +3,7 @@ import pandas as pd from tqdm import tqdm from pathlib import Path -from core.Settings import ( +from simulation_system.core.Settings import ( MANDATORY_FIXED_FEATURES, AVERAGE_FIXED_FEATURES, LATEST_FIELD, @@ -12,9 +12,10 @@ from core.Settings import ( HEAT_DEMAND_RESPONSE, COLUMNS_TO_MERGE_ON, ) -from core.DataProcessor import DataProcessor +from simulation_system.core.DataProcessor import DataProcessor +from utils import save_dataframe_to_s3_parquet -DATA_DIRECTORY = Path(__file__).parent / "data" / "all-domestic-certificates" +DATA_DIRECTORY = Path(__file__).parent / "simulation_system" / "data" / "all-domestic-certificates" # TODO: Have a look at temporal features @@ -28,8 +29,12 @@ def app(): # List all subdirectories directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()] + # temp + needed = {'E09000030', 'E09000028', 'E09000013', 'E09000012', 'E09000014'} + directories = [d for d in directories if any(n in d.name for n in needed)] dataset = [] + cleaning_dataset = [] # 116 # 128048706 # PosixPath('/home/ubuntu/Documents/python/hestia/Model/model_data/simulation_system/data/all-domestic @@ -107,7 +112,7 @@ def app(): variable_data = modified_property_data[ COMPONENT_FEATURES + ["LODGEMENT_DATE", RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE] - ] + ] # Note: we look at changes between subsequent EPCS, however we could look at other permutations # e.g. 
first vs second, second vs third and also first vs third @@ -133,10 +138,10 @@ def app(): starting_record = starting_record[ COMPONENT_FEATURES + ["LODGEMENT_DATE"] - ].add_suffix("_STARTING") + ].add_suffix("_STARTING") ending_record = ending_record[ COMPONENT_FEATURES + ["LODGEMENT_DATE"] - ].add_suffix("_ENDING") + ].add_suffix("_ENDING") features = pd.concat([starting_record, ending_record]) @@ -150,7 +155,18 @@ def app(): } ) - dataset.extend(property_model_data) + dataset.append(property_model_data) + + cleaning_averages["local-authority"] = df["LOCAL_AUTHORITY"].values[0] + cleaning_dataset.append(cleaning_averages) + + # Store cleaning dataset in s3 as a parquet file + cleaning_dataset = pd.concat(cleaning_dataset) + save_dataframe_to_s3_parquet( + df=cleaning_dataset, + bucket_name="retrofit-data-dev", + file_key="sap_change_model/cleaning_dataset.parquet", + ) output = pd.DataFrame(dataset) output.to_parquet("./dataset.parquet") diff --git a/model_data/utils.py b/model_data/utils.py index 744914a4..07642973 100644 --- a/model_data/utils.py +++ b/model_data/utils.py @@ -1,3 +1,6 @@ +import boto3 +import pandas as pd +from io import BytesIO import re from textblob import TextBlob @@ -24,3 +27,23 @@ def correct_spelling(text): corrected_text = ' '.join(corrected_words) return corrected_text + + +def save_dataframe_to_s3_parquet(df, bucket_name, file_key): + """ + Save a pandas DataFrame to S3 as a Parquet file. + + :param df: The pandas DataFrame. + :param bucket_name: Name of the S3 bucket. + :param file_key: Key of the file (including directory path within the bucket). 
+ """ + + # Convert the DataFrame to a Parquet format in memory + parquet_buffer = BytesIO() + df.to_parquet(parquet_buffer) + + # Create the boto3 client + client = boto3.client('s3') + + # Upload the Parquet file to S3 + client.put_object(Bucket=bucket_name, Key=file_key, Body=parquet_buffer.getvalue()) From d14c73ef66cf6409cd64cf3473ca6b5b6e9d4bda Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 5 Sep 2023 16:34:16 +0100 Subject: [PATCH 3/6] removing some temp code --- model_data/simulation_system/generate_rdsap_change.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/model_data/simulation_system/generate_rdsap_change.py b/model_data/simulation_system/generate_rdsap_change.py index 8962e252..cc9ad31c 100644 --- a/model_data/simulation_system/generate_rdsap_change.py +++ b/model_data/simulation_system/generate_rdsap_change.py @@ -29,9 +29,6 @@ def app(): # List all subdirectories directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()] - # temp - needed = {'E09000030', 'E09000028', 'E09000013', 'E09000012', 'E09000014'} - directories = [d for d in directories if any(n in d.name for n in needed)] dataset = [] cleaning_dataset = [] From 02208cbf4ab8ea29757430feb00681b2c248fcd3 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 5 Sep 2023 18:03:25 +0100 Subject: [PATCH 4/6] implementing sap model api call to backend and fixing bug in DataProcessor --- .github/workflows/deploy_fastapi_backend.yml | 1 + backend/app/plan/router.py | 61 ++++++++++++++----- .../simulation_system/core/DataProcessor.py | 53 +++++++++++++++- .../generate_rdsap_change.py | 28 ++------- serverless.yml | 1 + 5 files changed, 102 insertions(+), 42 deletions(-) diff --git a/.github/workflows/deploy_fastapi_backend.yml b/.github/workflows/deploy_fastapi_backend.yml index 502a9eb8..8172e2cb 100644 --- a/.github/workflows/deploy_fastapi_backend.yml +++ b/.github/workflows/deploy_fastapi_backend.yml @@ -89,6 +89,7 @@ jobs: ENVIRONMENT: ${{ 
github.ref_name }} SECRET_KEY: ${{ secrets.NEXTAUTH_SECRET }} PLAN_TRIGGER_BUCKET: 'retrofit-plan-inputs-${{ github.ref_name }}' + DATA_BUCKET: 'retrofit-data-${{ github.ref_name }}' DOMAIN_NAME: ${{ steps.set_domain.outputs.domain }} EPC_AUTH_TOKEN: ${{ steps.set_auth_token.outputs.auth_token }} DB_HOST: ${{ steps.set_db_credentials.outputs.db_host }} diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index 7db2f31f..4669ddfa 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -17,6 +17,8 @@ from sqlalchemy.orm import sessionmaker from sqlalchemy.exc import IntegrityError, OperationalError from datetime import datetime import pandas as pd +from io import BytesIO +import boto3 # database interaction functions from backend.app.db.functions.property_functions import ( @@ -24,8 +26,7 @@ from backend.app.db.functions.property_functions import ( ) from backend.app.db.functions.materials_functions import get_materials from backend.app.db.functions.recommendations_functions import ( - create_plan, create_recommendation, create_recommendation_material, create_plan_recommendations, - upload_recommendations +create_plan, create_plan_recommendations, upload_recommendations ) from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations from backend.app.db.connection import db_engine @@ -34,6 +35,7 @@ from model_data.optimiser.GainOptimiser import GainOptimiser from model_data.optimiser.CostOptimiser import CostOptimiser from backend.app.utils import epc_to_sap_lower_bound from model_data.optimiser.optimiser_functions import prepare_input_measures +from model_data.simulation_system.core.DataProcessor import DataProcessor # TODO: This is placeholder until data is stored in DB from backend.app.plan.uvalue_estimates_walls import uvalue_estimates_walls @@ -131,6 +133,19 @@ def insert_temp_recommendation_id(property_recommendations): return property_recommendations +def read_parquet_from_s3(bucket_name, 
file_key): + client = boto3.client('s3') + + # Get the object + s3_object = client.get_object(Bucket=bucket_name, Key=file_key) + + # Read the CSV body into a DataFrame + csv_body = s3_object["Body"].read() + df = pd.read_parquet(BytesIO(csv_body)) + + return df + + @router.post("/trigger") async def trigger_plan(body: PlanTriggerRequest): logger.info("Connecting to db") @@ -328,7 +343,7 @@ async def trigger_plan(body: PlanTriggerRequest): recommendations[p.id] = property_recommendations # Finally, we'll prepare data for predicting the impact on SAP - from model_data.simulation_system.core.Settings import FIXED_FEATURES, COMPONENT_FEATURES + from model_data.simulation_system.core.Settings import FIXED_FEATURES, COMPONENT_FEATURES, COLUMNS_TO_MERGE_ON epc_data = p.data.copy() epc_data = pd.DataFrame([epc_data]) epc_data.columns = [col.upper().replace("-", "_") for col in epc_data.columns] @@ -348,6 +363,7 @@ async def trigger_plan(body: PlanTriggerRequest): scoring_dict = { "UPRN": p.data["uprn"], "id": "+".join([str(p.id), str(rec["recommendation_id"])]), + "LOCAL_AUTHORITY": p.data["local-authority"], **starting_epc_data.to_dict("records")[0], **ending_epc_data.to_dict("records")[0], **fixed_data.to_dict("records")[0] @@ -364,7 +380,32 @@ async def trigger_plan(body: PlanTriggerRequest): recommendations_scoring_data.append(scoring_dict) recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data) - # TODO: We need to clean the data + + # Clean the data + cleaning_data = read_parquet_from_s3( + bucket_name="retrofit-data-dev", + file_key="sap_change_model/cleaning_dataset.parquet", + ) + cleaning_data = cleaning_data.rename(columns={"local-authority": "LOCAL_AUTHORITY"}) + # Merge the cleaning data onto recommendations_scoring_data + + recommendations_scoring_data[["FLOOR_HEIGHT", "TOTAL_FLOOR_AREA"]] = recommendations_scoring_data[ + ["FLOOR_HEIGHT", "TOTAL_FLOOR_AREA"] + ].replace("", None) + + # Perform the same cleaning as in the model + 
recommendations_scoring_data = DataProcessor.apply_averages_cleaning( + data_to_clean=recommendations_scoring_data, + cleaning_data=cleaning_data, + cols_to_merge_on=COLUMNS_TO_MERGE_ON + ["LOCAL_AUTHORITY"] + ) + recommendations_scoring_data = recommendations_scoring_data.drop(columns=["LOCAL_AUTHORITY"]) + + # Note: We might need to perform the full pre-processing here + data_processor = DataProcessor(filepath=None) + data_processor.insert_data(recommendations_scoring_data) + data_processor.remap_columns() + recommendations_scoring_data = data_processor.data column_types = data.dtypes.to_dict() columntypes = {} @@ -409,19 +450,7 @@ async def trigger_plan(body: PlanTriggerRequest): # Example data file - from io import BytesIO - import boto3 - def read_parquet_from_s3(bucket_name, file_key): - client = boto3.client('s3') - # Get the object - s3_object = client.get_object(Bucket=bucket_name, Key=file_key) - - # Read the CSV body into a DataFrame - csv_body = s3_object["Body"].read() - df = pd.read_parquet(BytesIO(csv_body)) - - return df data = read_parquet_from_s3( bucket_name="retrofit-data-dev", file_key="model_build_data/change_data/rdsap_full/test_data_with_id.parquet" ) diff --git a/model_data/simulation_system/core/DataProcessor.py b/model_data/simulation_system/core/DataProcessor.py index 4c37176d..9863ec8e 100644 --- a/model_data/simulation_system/core/DataProcessor.py +++ b/model_data/simulation_system/core/DataProcessor.py @@ -21,17 +21,24 @@ class DataProcessor: Handle data loading and data preprocessing """ - def __init__(self, filepath: Path) -> None: + def __init__(self, filepath: Path | None) -> None: self.filepath = filepath + self.data = None def load_data(self, low_memory=False) -> None: + if not self.filepath: + raise ValueError("No filepath specified") self.data = pd.read_csv(self.filepath, low_memory=low_memory) + def insert_data(self, data: pd.DataFrame) -> None: + self.data = data + def pre_process(self) -> pd.DataFrame: """ Load data and 
begin initial cleaning """ - self.load_data(low_memory=DATA_PROCESSOR_SETTINGS["low_memory"]) + if not self.data: + self.load_data(low_memory=DATA_PROCESSOR_SETTINGS["low_memory"]) self.confine_data() # TODO: CLean number of heated rooms and habitable rooms @@ -87,7 +94,7 @@ class DataProcessor: # Remap certain columns data["FLOOR_LEVEL"] = data["FLOOR_LEVEL"].replace(FLOOR_LEVEL_MAP) - data["BUILT_FROM"] = data["BUILT_FORM"].replace(BUILT_FORM_REMAP) + data["BUILT_FORM"] = data["BUILT_FORM"].replace(BUILT_FORM_REMAP) self.data = data @@ -264,3 +271,43 @@ class DataProcessor: self.data["MULTI_GLAZE_PROPORTION"] ) & (self.data["WINDOWS_DESCRIPTION"].isin(FULLY_GLAZED_DESCRIPTIONS)) self.data.loc[no_multi_glaze_proportion_index, "MULTI_GLAZE_PROPORTION"] = 100 + + @staticmethod + def apply_averages_cleaning(data_to_clean, cleaning_data, cols_to_merge_on): + """ + Clean the input DataFrame using averages from a cleaning DataFrame. + + :param data_to_clean: DataFrame to be cleaned. + :param cleaning_data: DataFrame containing data for cleaning. + :param cols_to_merge_on: Columns on which merging is based. We pass cols_to_merge_on to this function as this + differs depending on where the function is being used. + :return: Cleaned DataFrame. 
+ """ + # Enforce data types + for col in ["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"]: + data_to_clean[col] = data_to_clean[col].astype(float) + + # Identify columns with non-NaN values + columns_to_merge_on = data_to_clean[cols_to_merge_on].dropna().columns.tolist() + + # Calculate averages + cleaning_averages_to_merge = cleaning_data.groupby(columns_to_merge_on).agg({ + "TOTAL_FLOOR_AREA": "mean", + "FLOOR_HEIGHT": "mean" + }) + + # Merge with the original data + data_to_clean = pd.merge( + data_to_clean, + cleaning_averages_to_merge, + on=columns_to_merge_on, + suffixes=("", "_AVERAGE"), + how='left' + ) + + # Fill NaN values with averages + for col in ["TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]: + data_to_clean[col].fillna(data_to_clean[f"{col}_AVERAGE"], inplace=True) + data_to_clean.drop(columns=[f"{col}_AVERAGE"], inplace=True) + + return data_to_clean diff --git a/model_data/simulation_system/generate_rdsap_change.py b/model_data/simulation_system/generate_rdsap_change.py index cc9ad31c..502f7a06 100644 --- a/model_data/simulation_system/generate_rdsap_change.py +++ b/model_data/simulation_system/generate_rdsap_change.py @@ -64,28 +64,10 @@ def app(): # property_data[AVERAGE_FIXED_FEATURES].fillna(value=0).pct_change().iloc[-1] > 0.1 # Extract the columns that are not all None - na_columns = property_data[COLUMNS_TO_MERGE_ON].isna().all() - cleaned_columns_to_merge_on = na_columns.index[~na_columns].to_list() - - # Get the corresponding groupby and merge, and fill in NA values - cleaning_averages_to_merge = cleaning_averages.groupby( - cleaned_columns_to_merge_on - )[["TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]].mean() - - modified_property_data = pd.merge( - property_data, - cleaning_averages_to_merge, - on=cleaned_columns_to_merge_on, - suffixes=["", "_AVERAGE"], - ) - modified_property_data["TOTAL_FLOOR_AREA"] = modified_property_data[ - "TOTAL_FLOOR_AREA" - ].fillna(modified_property_data["TOTAL_FLOOR_AREA_AVERAGE"]) - modified_property_data["FLOOR_HEIGHT"] = 
modified_property_data[ - "FLOOR_HEIGHT" - ].fillna(modified_property_data["FLOOR_HEIGHT_AVERAGE"]) - modified_property_data = modified_property_data.drop( - columns=["TOTAL_FLOOR_AREA_AVERAGE", "FLOOR_HEIGHT_AVERAGE"] + modified_property_data = DataProcessor.apply_averages_cleaning( + data_to_clean=property_data, + cleaning_data=cleaning_averages, + cols_to_merge_on=COLUMNS_TO_MERGE_ON ) for field in AVERAGE_FIXED_FEATURES: @@ -154,7 +136,7 @@ def app(): dataset.append(property_model_data) - cleaning_averages["local-authority"] = df["LOCAL_AUTHORITY"].values[0] + cleaning_averages["LOCAL_AUTHORITY"] = df["LOCAL_AUTHORITY"].values[0] cleaning_dataset.append(cleaning_averages) # Store cleaning dataset in s3 as a parquet file diff --git a/serverless.yml b/serverless.yml index 89eb666a..1c2a5f73 100644 --- a/serverless.yml +++ b/serverless.yml @@ -9,6 +9,7 @@ provider: ENVIRONMENT: ${env:ENVIRONMENT} SECRET_KEY: ${env:SECRET_KEY} PLAN_TRIGGER_BUCKET: ${env:PLAN_TRIGGER_BUCKET} + DATA_BUCKET: ${env:DATA_BUCKET} DOMAIN_NAME: ${env:DOMAIN_NAME} EPC_AUTH_TOKEN: ${env:EPC_AUTH_TOKEN} DB_HOST: ${env:DB_HOST} From ee15ae84663909728df50b21981987a30ea29ed5 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 5 Sep 2023 18:29:38 +0100 Subject: [PATCH 5/6] Implementing sap model into the backend, almost complete --- backend/app/plan/columntypes.py | 32 +++++ backend/app/plan/router.py | 115 ++++++++---------- backend/app/utils.py | 35 ++++++ backend/docker/Dockerfile | 1 + .../handlers/predictions_app.py | 15 ++- recommendations/FloorRecommendations.py | 4 +- recommendations/WallRecommendations.py | 6 +- recommendations/recommendation_utils.py | 9 -- 8 files changed, 136 insertions(+), 81 deletions(-) create mode 100644 backend/app/plan/columntypes.py diff --git a/backend/app/plan/columntypes.py b/backend/app/plan/columntypes.py new file mode 100644 index 00000000..5c859300 --- /dev/null +++ b/backend/app/plan/columntypes.py @@ -0,0 +1,32 @@ +columntypes = { + 'UPRN': 
'object', 'TOTAL_FLOOR_AREA': 'float64', 'FLOOR_HEIGHT': 'float64', 'PROPERTY_TYPE': 'object', + 'BUILT_FORM': 'object', 'CONSTITUENCY': 'object', 'NUMBER_HABITABLE_ROOMS': 'float64', + 'NUMBER_HEATED_ROOMS': 'float64', 'FIXED_LIGHTING_OUTLETS_COUNT': 'float64', 'FLOOR_LEVEL': 'float64', + 'CONSTRUCTION_AGE_BAND': 'object', 'TRANSACTION_TYPE_STARTING': 'object', + 'WALLS_DESCRIPTION_STARTING': 'object', + 'FLOOR_DESCRIPTION_STARTING': 'object', 'LIGHTING_DESCRIPTION_STARTING': 'object', + 'ROOF_DESCRIPTION_STARTING': 'object', 'MAINHEAT_DESCRIPTION_STARTING': 'object', + 'HOTWATER_DESCRIPTION_STARTING': 'object', 'MAIN_FUEL_STARTING': 'object', + 'MECHANICAL_VENTILATION_STARTING': 'object', + 'SECONDHEAT_DESCRIPTION_STARTING': 'object', 'ENERGY_TARIFF_STARTING': 'object', + 'SOLAR_WATER_HEATING_FLAG_STARTING': 'object', 'PHOTO_SUPPLY_STARTING': 'float64', + 'WINDOWS_DESCRIPTION_STARTING': 'object', 'GLAZED_TYPE_STARTING': 'object', + 'MULTI_GLAZE_PROPORTION_STARTING': 'float64', 'LOW_ENERGY_LIGHTING_STARTING': 'float64', + 'NUMBER_OPEN_FIREPLACES_STARTING': 'float64', 'MAINHEATCONT_DESCRIPTION_STARTING': 'object', + 'EXTENSION_COUNT_STARTING': 'float64', 'LODGEMENT_DATE_STARTING': 'object', + 'TRANSACTION_TYPE_ENDING': 'object', + 'WALLS_DESCRIPTION_ENDING': 'object', 'FLOOR_DESCRIPTION_ENDING': 'object', + 'LIGHTING_DESCRIPTION_ENDING': 'object', + 'ROOF_DESCRIPTION_ENDING': 'object', 'MAINHEAT_DESCRIPTION_ENDING': 'object', + 'HOTWATER_DESCRIPTION_ENDING': 'object', + 'MAIN_FUEL_ENDING': 'object', 'MECHANICAL_VENTILATION_ENDING': 'object', + 'SECONDHEAT_DESCRIPTION_ENDING': 'object', + 'ENERGY_TARIFF_ENDING': 'object', 'SOLAR_WATER_HEATING_FLAG_ENDING': 'object', + 'PHOTO_SUPPLY_ENDING': 'float64', + 'WINDOWS_DESCRIPTION_ENDING': 'object', 'GLAZED_TYPE_ENDING': 'object', + 'MULTI_GLAZE_PROPORTION_ENDING': 'float64', + 'LOW_ENERGY_LIGHTING_ENDING': 'float64', 'NUMBER_OPEN_FIREPLACES_ENDING': 'float64', + 'MAINHEATCONT_DESCRIPTION_ENDING': 'object', 
'EXTENSION_COUNT_ENDING': 'float64', + 'LODGEMENT_DATE_ENDING': 'object', + 'id': 'object' +} diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index 4669ddfa..bd15525d 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -17,8 +17,7 @@ from sqlalchemy.orm import sessionmaker from sqlalchemy.exc import IntegrityError, OperationalError from datetime import datetime import pandas as pd -from io import BytesIO -import boto3 +import requests # database interaction functions from backend.app.db.functions.property_functions import ( @@ -26,16 +25,20 @@ from backend.app.db.functions.property_functions import ( ) from backend.app.db.functions.materials_functions import get_materials from backend.app.db.functions.recommendations_functions import ( -create_plan, create_plan_recommendations, upload_recommendations + create_plan, create_plan_recommendations, upload_recommendations ) from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations from backend.app.db.connection import db_engine +from backend.app.plan.columntypes import columntypes from model_data.optimiser.GainOptimiser import GainOptimiser from model_data.optimiser.CostOptimiser import CostOptimiser -from backend.app.utils import epc_to_sap_lower_bound +from backend.app.utils import epc_to_sap_lower_bound, save_dataframe_to_s3_parquet, read_parquet_from_s3 from model_data.optimiser.optimiser_functions import prepare_input_measures from model_data.simulation_system.core.DataProcessor import DataProcessor +from model_data.simulation_system.core.Settings import ( + FIXED_FEATURES, COMPONENT_FEATURES, COLUMNS_TO_MERGE_ON +) # TODO: This is placeholder until data is stored in DB from backend.app.plan.uvalue_estimates_walls import uvalue_estimates_walls @@ -133,19 +136,6 @@ def insert_temp_recommendation_id(property_recommendations): return property_recommendations -def read_parquet_from_s3(bucket_name, file_key): - client = boto3.client('s3') - - 
# Get the object - s3_object = client.get_object(Bucket=bucket_name, Key=file_key) - - # Read the CSV body into a DataFrame - csv_body = s3_object["Body"].read() - df = pd.read_parquet(BytesIO(csv_body)) - - return df - - @router.post("/trigger") async def trigger_plan(body: PlanTriggerRequest): logger.info("Connecting to db") @@ -343,7 +333,6 @@ async def trigger_plan(body: PlanTriggerRequest): recommendations[p.id] = property_recommendations # Finally, we'll prepare data for predicting the impact on SAP - from model_data.simulation_system.core.Settings import FIXED_FEATURES, COMPONENT_FEATURES, COLUMNS_TO_MERGE_ON epc_data = p.data.copy() epc_data = pd.DataFrame([epc_data]) epc_data.columns = [col.upper().replace("-", "_") for col in epc_data.columns] @@ -407,59 +396,43 @@ async def trigger_plan(body: PlanTriggerRequest): data_processor.remap_columns() recommendations_scoring_data = data_processor.data - column_types = data.dtypes.to_dict() - columntypes = {} - for k, v in column_types.items(): - if k not in recommendations_scoring_data.columns: - continue - columntypes[k] = v.name - - columntypes["id"] = "object" - for col in recommendations_scoring_data.columns: - recommendations_scoring_data[col] = recommendations_scoring_data[col].astype(columntypes[col]) - + # Remap column types recommendations_scoring_data = recommendations_scoring_data.astype(columntypes) - - - column_types = {'UPRN': dtype('O'), 'RDSAP_CHANGE': dtype('int64'), 'HEAT_DEMAND_CHANGE': dtype('int64'), - 'TOTAL_FLOOR_AREA': dtype('float64'), 'FLOOR_HEIGHT': dtype('float64'), 'PROPERTY_TYPE': dtype('O'), - 'BUILT_FORM': dtype('O'), 'CONSTITUENCY': dtype('O'), 'NUMBER_HABITABLE_ROOMS': dtype('float64'), - 'NUMBER_HEATED_ROOMS': dtype('float64'), 'FIXED_LIGHTING_OUTLETS_COUNT': dtype('float64'), - 'FLOOR_LEVEL': dtype('float64'), 'CONSTRUCTION_AGE_BAND': dtype('O'), 'TRANSACTION_TYPE_STARTING': dtype('O'), - 'WALLS_DESCRIPTION_STARTING': dtype('O'), 'FLOOR_DESCRIPTION_STARTING': dtype('O'), - 
'LIGHTING_DESCRIPTION_STARTING': dtype('O'), 'ROOF_DESCRIPTION_STARTING': dtype('O'), - 'MAINHEAT_DESCRIPTION_STARTING': dtype('O'), 'HOTWATER_DESCRIPTION_STARTING': dtype('O'), - 'MAIN_FUEL_STARTING': dtype('O'), 'MECHANICAL_VENTILATION_STARTING': dtype('O'), - 'SECONDHEAT_DESCRIPTION_STARTING': dtype('O'), 'ENERGY_TARIFF_STARTING': dtype('O'), - 'SOLAR_WATER_HEATING_FLAG_STARTING': dtype('O'), 'PHOTO_SUPPLY_STARTING': dtype('float64'), - 'WINDOWS_DESCRIPTION_STARTING': dtype('O'), 'GLAZED_TYPE_STARTING': dtype('O'), - 'MULTI_GLAZE_PROPORTION_STARTING': dtype('float64'), 'LOW_ENERGY_LIGHTING_STARTING': dtype('float64'), - 'NUMBER_OPEN_FIREPLACES_STARTING': dtype('float64'), 'MAINHEATCONT_DESCRIPTION_STARTING': dtype('O'), - 'EXTENSION_COUNT_STARTING': dtype('float64'), 'LODGEMENT_DATE_STARTING': dtype('O'), - 'TRANSACTION_TYPE_ENDING': dtype('O'), 'WALLS_DESCRIPTION_ENDING': dtype('O'), - 'FLOOR_DESCRIPTION_ENDING': dtype('O'), 'LIGHTING_DESCRIPTION_ENDING': dtype('O'), - 'ROOF_DESCRIPTION_ENDING': dtype('O'), 'MAINHEAT_DESCRIPTION_ENDING': dtype('O'), - 'HOTWATER_DESCRIPTION_ENDING': dtype('O'), 'MAIN_FUEL_ENDING': dtype('O'), - 'MECHANICAL_VENTILATION_ENDING': dtype('O'), 'SECONDHEAT_DESCRIPTION_ENDING': dtype('O'), - 'ENERGY_TARIFF_ENDING': dtype('O'), 'SOLAR_WATER_HEATING_FLAG_ENDING': dtype('O'), - 'PHOTO_SUPPLY_ENDING': dtype('float64'), 'WINDOWS_DESCRIPTION_ENDING': dtype('O'), - 'GLAZED_TYPE_ENDING': dtype('O'), 'MULTI_GLAZE_PROPORTION_ENDING': dtype('float64'), - 'LOW_ENERGY_LIGHTING_ENDING': dtype('float64'), 'NUMBER_OPEN_FIREPLACES_ENDING': dtype('float64'), - 'MAINHEATCONT_DESCRIPTION_ENDING': dtype('O'), 'EXTENSION_COUNT_ENDING': dtype('float64'), - 'LODGEMENT_DATE_ENDING': dtype('O'), 'id': dtype('int64')} - - # Example data file - - - data = read_parquet_from_s3( - bucket_name="retrofit-data-dev", file_key="model_build_data/change_data/rdsap_full/test_data_with_id.parquet" + # Store parquet file in s3 for scoring + created_at = 
datetime.now().strftime("%Y-%m-%d-%H-%M-%S") + file_location = "sap_change_predictions/{portfolio_id}/{timestamp}.parquet".format( + portfolio_id=body.portfolio_id, + timestamp=created_at ) - data = data.head(5) + save_dataframe_to_s3_parquet( + df=recommendations_scoring_data, + bucket_name="retrofit-data-dev", + file_key=file_location + ) - # We query the sap difference model api to get the estimated impact on sap - for property_id, recommendations in recommendations.items(): + # Call the sap change model + response = requests.post( + url="https://api.dev.hestia.homes/sapmodel/predict", + json={ + "file_location": "s3://retrofit-data-dev/" + file_location, + "property_id": 999, + "portfolio_id": 4, + "created_at": created_at + } + ) + # TODO: Handle the response depending on response code + # Retrieve the predictions + predictions = read_csv_from_s3( + bucket_name="retrofit-sap-predictions-dev", + filepath=f"{body.portfolio_id}/999/{created_at}.csv" + ) + predictions = pd.DataFrame(predictions) + # We round the predictions + predictions["RDSAP_CHANGE"] = predictions["RDSAP_CHANGE"].astype(float).round(0) + # Extract property_id and recommendation_id + predictions[['property_id', 'recommendation_id']] = predictions['id'].str.split('+', expand=True) # 1) the property data # 2) the property details (epc) @@ -481,6 +454,16 @@ async def trigger_plan(body: PlanTriggerRequest): if not recommendations_to_upload: continue + property_predictions = predictions[predictions["property_id"] == str(p.id)] + for rec in recommendations_to_upload: + # Insert the prediction for sap points + rec["sap_points"] = property_predictions[property_predictions["recommendation_id"] == str( + rec["recommendation_id"] + )]["RDSAP_CHANGE"].values[0] + + if not rec["sap_points"]: + raise ValueError("Sap points missing") + # Create a plan new_plan_id = create_plan( session, diff --git a/backend/app/utils.py b/backend/app/utils.py index bc052e74..7099eba1 100644 --- a/backend/app/utils.py +++ 
b/backend/app/utils.py @@ -4,6 +4,8 @@ from io import StringIO import string import secrets import logging +import pandas as pd +from io import BytesIO def setup_logger(log_file=None, level=logging.INFO, overwrite_handler=False): @@ -117,3 +119,36 @@ def epc_to_sap_lower_bound(epc: str): return 1 else: raise ValueError("EPC rating should be between A and G") + + +def read_parquet_from_s3(bucket_name, file_key): + client = boto3.client('s3') + + # Get the object + s3_object = client.get_object(Bucket=bucket_name, Key=file_key) + + # Read the Parquet body into a DataFrame + csv_body = s3_object["Body"].read() + df = pd.read_parquet(BytesIO(csv_body)) + + return df + + +def save_dataframe_to_s3_parquet(df, bucket_name, file_key): + """ + Save a pandas DataFrame to S3 as a Parquet file. + + :param df: The pandas DataFrame. + :param bucket_name: Name of the S3 bucket. + :param file_key: Key of the file (including directory path within the bucket) + """ + + # Convert the DataFrame to a Parquet format in memory + parquet_buffer = BytesIO() + df.to_parquet(parquet_buffer) + + # Create the boto3 S3 resource + s3 = boto3.resource('s3') + + # Upload the Parquet file to S3 + s3.Object(bucket_name, file_key).put(Body=parquet_buffer.getvalue()) diff --git a/backend/docker/Dockerfile b/backend/docker/Dockerfile index f23435a0..fd498cdb 100644 --- a/backend/docker/Dockerfile +++ b/backend/docker/Dockerfile @@ -40,6 +40,7 @@ COPY ./model_data/config.py ./model_data/config.py COPY ./model_data/optimiser/ ./model_data/optimiser/ COPY ./model_data/__init__.py ./model_data/__init__.py COPY ./model_data/EpcClean.py ./model_data/EpcClean.py +COPY ./model_data/simulation_system/core/ ./model_data/simulation_system/core/ COPY ./model_data/utils.py ./model_data/utils.py COPY ./model_data/epc_attributes/ ./model_data/epc_attributes/ COPY ./datatypes/ ./datatypes/ diff --git a/model_data/simulation_system/handlers/predictions_app.py b/model_data/simulation_system/handlers/predictions_app.py index
5ea0d997..75d2aff6 100644 --- a/model_data/simulation_system/handlers/predictions_app.py +++ b/model_data/simulation_system/handlers/predictions_app.py @@ -70,11 +70,24 @@ def handler(event, context): s3_file_name=storage_filepath ) - return storage_filepath + return { + "statusCode": 200, + "body": json.dumps({ + "message": "Successfully processed input", + "storage_filepath": storage_filepath + }) + } except (Exception, KeyError, ValueError) as e: logger.info("Prediction failed") logger.info(e) + return { + "statusCode": 500, + "body": json.dumps({ + "message": "Prediction failed", + "error": str(e) + }) + } if __name__ == "__main__": diff --git a/recommendations/FloorRecommendations.py b/recommendations/FloorRecommendations.py index 3d53da69..b111090f 100644 --- a/recommendations/FloorRecommendations.py +++ b/recommendations/FloorRecommendations.py @@ -6,7 +6,7 @@ from backend.Property import Property from recommendations.rdsap_tables import default_wall_thickness, age_band_data from recommendations.recommendation_utils import ( r_value_per_mm_to_u_value, calculate_u_value_uplift, is_diminishing_returns, update_lowest_selected_u_value, - get_recommended_part, get_uvalue_estimate, estimate_sap_points + get_recommended_part, get_uvalue_estimate ) suspended_floor_insulation_parts = [ @@ -323,7 +323,7 @@ class FloorRecommendations(Definitions): "description": self._make_floor_description(part, depth), "starting_u_value": u_value, "new_u_value": new_u_value, - "sap_points": estimate_sap_points(), + "sap_points": None, "cost": estimated_cost, } ) diff --git a/recommendations/WallRecommendations.py b/recommendations/WallRecommendations.py index 570a46d7..9edbe969 100644 --- a/recommendations/WallRecommendations.py +++ b/recommendations/WallRecommendations.py @@ -6,7 +6,7 @@ from backend.Property import Property from model_data.BaseUtility import Definitions from recommendations.recommendation_utils import ( r_value_per_mm_to_u_value, calculate_u_value_uplift, 
is_diminishing_returns, update_lowest_selected_u_value, - get_recommended_part, get_uvalue_estimate, estimate_sap_points + get_recommended_part, get_uvalue_estimate ) external_wall_insulation_parts = [ @@ -354,7 +354,7 @@ class WallRecommendations(Definitions): "description": "Install " + self._make_description(part, depth), "starting_u_value": u_value, "new_u_value": new_u_value, - "sap_points": estimate_sap_points(), + "sap_points": None, "cost": estimated_cost, } ) @@ -436,7 +436,7 @@ class WallRecommendations(Definitions): ), "starting_u_value": u_value, "new_u_value": combined_new_u_value, - "sap_points": estimate_sap_points(), + "sap_points": None, "cost": ewi_esimtated_cost + iwi_esimtated_cost, } self.recommendations.append(recommendation) diff --git a/recommendations/recommendation_utils.py b/recommendations/recommendation_utils.py index d35befd7..e53aeb17 100644 --- a/recommendations/recommendation_utils.py +++ b/recommendations/recommendation_utils.py @@ -4,15 +4,6 @@ from statistics import mean import random -def estimate_sap_points(): - """ - This is a placeholder function. We will implement the proper version soon - :return: - """ - - return random.sample(range(4, 12), 1)[0] - - def r_value_per_mm_to_u_value(depth_mm: int, r_value_per_mm: float): """ Converts R-value per mm to U-value in W/m²K. 
From e05e8ff6365af7e2eafa9bab721dcde7fee50d41 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 5 Sep 2023 18:32:30 +0100 Subject: [PATCH 6/6] Added TODO --- backend/app/plan/router.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index bd15525d..4b972a6a 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -333,6 +333,9 @@ async def trigger_plan(body: PlanTriggerRequest): recommendations[p.id] = property_recommendations # Finally, we'll prepare data for predicting the impact on SAP + # TODO: We should use the cleaned data from get_components in the data rather than the raw + # values. We should create a method in Property which takes the EPC data and inserts the cleaned + # data epc_data = p.data.copy() epc_data = pd.DataFrame([epc_data]) epc_data.columns = [col.upper().replace("-", "_") for col in epc_data.columns]