implementing sap model api call to backend and fixing bug in DataProcessor

This commit is contained in:
Khalim Conn-Kowlessar 2023-09-05 18:03:25 +01:00
parent d14c73ef66
commit 02208cbf4a
5 changed files with 102 additions and 42 deletions

View file

@ -89,6 +89,7 @@ jobs:
ENVIRONMENT: ${{ github.ref_name }}
SECRET_KEY: ${{ secrets.NEXTAUTH_SECRET }}
PLAN_TRIGGER_BUCKET: 'retrofit-plan-inputs-${{ github.ref_name }}'
DATA_BUCKET: 'retrofit-data-${{ github.ref_name }}'
DOMAIN_NAME: ${{ steps.set_domain.outputs.domain }}
EPC_AUTH_TOKEN: ${{ steps.set_auth_token.outputs.auth_token }}
DB_HOST: ${{ steps.set_db_credentials.outputs.db_host }}

View file

@ -17,6 +17,8 @@ from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import IntegrityError, OperationalError
from datetime import datetime
import pandas as pd
from io import BytesIO
import boto3
# database interaction functions
from backend.app.db.functions.property_functions import (
@ -24,8 +26,7 @@ from backend.app.db.functions.property_functions import (
)
from backend.app.db.functions.materials_functions import get_materials
from backend.app.db.functions.recommendations_functions import (
create_plan, create_recommendation, create_recommendation_material, create_plan_recommendations,
upload_recommendations
create_plan, create_plan_recommendations, upload_recommendations
)
from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations
from backend.app.db.connection import db_engine
@ -34,6 +35,7 @@ from model_data.optimiser.GainOptimiser import GainOptimiser
from model_data.optimiser.CostOptimiser import CostOptimiser
from backend.app.utils import epc_to_sap_lower_bound
from model_data.optimiser.optimiser_functions import prepare_input_measures
from model_data.simulation_system.core.DataProcessor import DataProcessor
# TODO: This is placeholder until data is stored in DB
from backend.app.plan.uvalue_estimates_walls import uvalue_estimates_walls
@ -131,6 +133,19 @@ def insert_temp_recommendation_id(property_recommendations):
return property_recommendations
def read_parquet_from_s3(bucket_name, file_key):
client = boto3.client('s3')
# Get the object
s3_object = client.get_object(Bucket=bucket_name, Key=file_key)
# Read the CSV body into a DataFrame
csv_body = s3_object["Body"].read()
df = pd.read_parquet(BytesIO(csv_body))
return df
@router.post("/trigger")
async def trigger_plan(body: PlanTriggerRequest):
logger.info("Connecting to db")
@ -328,7 +343,7 @@ async def trigger_plan(body: PlanTriggerRequest):
recommendations[p.id] = property_recommendations
# Finally, we'll prepare data for predicting the impact on SAP
from model_data.simulation_system.core.Settings import FIXED_FEATURES, COMPONENT_FEATURES
from model_data.simulation_system.core.Settings import FIXED_FEATURES, COMPONENT_FEATURES, COLUMNS_TO_MERGE_ON
epc_data = p.data.copy()
epc_data = pd.DataFrame([epc_data])
epc_data.columns = [col.upper().replace("-", "_") for col in epc_data.columns]
@ -348,6 +363,7 @@ async def trigger_plan(body: PlanTriggerRequest):
scoring_dict = {
"UPRN": p.data["uprn"],
"id": "+".join([str(p.id), str(rec["recommendation_id"])]),
"LOCAL_AUTHORITY": p.data["local-authority"],
**starting_epc_data.to_dict("records")[0],
**ending_epc_data.to_dict("records")[0],
**fixed_data.to_dict("records")[0]
@ -364,7 +380,32 @@ async def trigger_plan(body: PlanTriggerRequest):
recommendations_scoring_data.append(scoring_dict)
recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data)
# TODO: We need to clean the data
# Clean the data
cleaning_data = read_parquet_from_s3(
bucket_name="retrofit-data-dev",
file_key="sap_change_model/cleaning_dataset.parquet",
)
cleaning_data = cleaning_data.rename(columns={"local-authority": "LOCAL_AUTHORITY"})
# Merge the cleaning data onto recommendations_scoring_data
recommendations_scoring_data[["FLOOR_HEIGHT", "TOTAL_FLOOR_AREA"]] = recommendations_scoring_data[
["FLOOR_HEIGHT", "TOTAL_FLOOR_AREA"]
].replace("", None)
# Perform the same cleaning as in the model
recommendations_scoring_data = DataProcessor.apply_averages_cleaning(
data_to_clean=recommendations_scoring_data,
cleaning_data=cleaning_data,
cols_to_merge_on=COLUMNS_TO_MERGE_ON + ["LOCAL_AUTHORITY"]
)
recommendations_scoring_data = recommendations_scoring_data.drop(columns=["LOCAL_AUTHORITY"])
# Note: We might need to perform the full pre-processing here
data_processor = DataProcessor(filepath=None)
data_processor.insert_data(recommendations_scoring_data)
data_processor.remap_columns()
recommendations_scoring_data = data_processor.data
column_types = data.dtypes.to_dict()
columntypes = {}
@ -409,19 +450,7 @@ async def trigger_plan(body: PlanTriggerRequest):
# Example data file
from io import BytesIO
import boto3
def read_parquet_from_s3(bucket_name, file_key):
client = boto3.client('s3')
# Get the object
s3_object = client.get_object(Bucket=bucket_name, Key=file_key)
# Read the CSV body into a DataFrame
csv_body = s3_object["Body"].read()
df = pd.read_parquet(BytesIO(csv_body))
return df
data = read_parquet_from_s3(
bucket_name="retrofit-data-dev", file_key="model_build_data/change_data/rdsap_full/test_data_with_id.parquet"
)

View file

@ -21,17 +21,24 @@ class DataProcessor:
Handle data loading and data preprocessing
"""
def __init__(self, filepath: Path) -> None:
def __init__(self, filepath: Path | None) -> None:
self.filepath = filepath
self.data = None
def load_data(self, low_memory=False) -> None:
if not self.filepath:
raise ValueError("No filepath specified")
self.data = pd.read_csv(self.filepath, low_memory=low_memory)
def insert_data(self, data: pd.DataFrame) -> None:
self.data = data
def pre_process(self) -> pd.DataFrame:
"""
Load data and begin initial cleaning
"""
self.load_data(low_memory=DATA_PROCESSOR_SETTINGS["low_memory"])
if not self.data:
self.load_data(low_memory=DATA_PROCESSOR_SETTINGS["low_memory"])
self.confine_data()
# TODO: CLean number of heated rooms and habitable rooms
@ -87,7 +94,7 @@ class DataProcessor:
# Remap certain columns
data["FLOOR_LEVEL"] = data["FLOOR_LEVEL"].replace(FLOOR_LEVEL_MAP)
data["BUILT_FROM"] = data["BUILT_FORM"].replace(BUILT_FORM_REMAP)
data["BUILT_FORM"] = data["BUILT_FORM"].replace(BUILT_FORM_REMAP)
self.data = data
@ -264,3 +271,43 @@ class DataProcessor:
self.data["MULTI_GLAZE_PROPORTION"]
) & (self.data["WINDOWS_DESCRIPTION"].isin(FULLY_GLAZED_DESCRIPTIONS))
self.data.loc[no_multi_glaze_proportion_index, "MULTI_GLAZE_PROPORTION"] = 100
@staticmethod
def apply_averages_cleaning(data_to_clean, cleaning_data, cols_to_merge_on):
"""
Clean the input DataFrame using averages from a cleaning DataFrame.
:param data_to_clean: DataFrame to be cleaned.
:param cleaning_data: DataFrame containing data for cleaning.
:param cols_to_merge_on: Columns on which merging is based. We pass cols_to_merge_on to this function as this
differs depending on where the function is being used.
:return: Cleaned DataFrame.
"""
# Enforce data types
for col in ["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"]:
data_to_clean[col] = data_to_clean[col].astype(float)
# Identify columns with non-NaN values
columns_to_merge_on = data_to_clean[cols_to_merge_on].dropna().columns.tolist()
# Calculate averages
cleaning_averages_to_merge = cleaning_data.groupby(columns_to_merge_on).agg({
"TOTAL_FLOOR_AREA": "mean",
"FLOOR_HEIGHT": "mean"
})
# Merge with the original data
data_to_clean = pd.merge(
data_to_clean,
cleaning_averages_to_merge,
on=columns_to_merge_on,
suffixes=("", "_AVERAGE"),
how='left'
)
# Fill NaN values with averages
for col in ["TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]:
data_to_clean[col].fillna(data_to_clean[f"{col}_AVERAGE"], inplace=True)
data_to_clean.drop(columns=[f"{col}_AVERAGE"], inplace=True)
return data_to_clean

View file

@ -64,28 +64,10 @@ def app():
# property_data[AVERAGE_FIXED_FEATURES].fillna(value=0).pct_change().iloc[-1] > 0.1
# Extract the columns that are not all None
na_columns = property_data[COLUMNS_TO_MERGE_ON].isna().all()
cleaned_columns_to_merge_on = na_columns.index[~na_columns].to_list()
# Get the corresponding groupby and merge, and fill in NA values
cleaning_averages_to_merge = cleaning_averages.groupby(
cleaned_columns_to_merge_on
)[["TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]].mean()
modified_property_data = pd.merge(
property_data,
cleaning_averages_to_merge,
on=cleaned_columns_to_merge_on,
suffixes=["", "_AVERAGE"],
)
modified_property_data["TOTAL_FLOOR_AREA"] = modified_property_data[
"TOTAL_FLOOR_AREA"
].fillna(modified_property_data["TOTAL_FLOOR_AREA_AVERAGE"])
modified_property_data["FLOOR_HEIGHT"] = modified_property_data[
"FLOOR_HEIGHT"
].fillna(modified_property_data["FLOOR_HEIGHT_AVERAGE"])
modified_property_data = modified_property_data.drop(
columns=["TOTAL_FLOOR_AREA_AVERAGE", "FLOOR_HEIGHT_AVERAGE"]
modified_property_data = DataProcessor.apply_averages_cleaning(
data_to_clean=property_data,
cleaning_data=cleaning_averages,
cols_to_merge_on=COLUMNS_TO_MERGE_ON
)
for field in AVERAGE_FIXED_FEATURES:
@ -154,7 +136,7 @@ def app():
dataset.append(property_model_data)
cleaning_averages["local-authority"] = df["LOCAL_AUTHORITY"].values[0]
cleaning_averages["LOCAL_AUTHORITY"] = df["LOCAL_AUTHORITY"].values[0]
cleaning_dataset.append(cleaning_averages)
# Store cleaning dataset in s3 as a parquet file

View file

@ -9,6 +9,7 @@ provider:
ENVIRONMENT: ${env:ENVIRONMENT}
SECRET_KEY: ${env:SECRET_KEY}
PLAN_TRIGGER_BUCKET: ${env:PLAN_TRIGGER_BUCKET}
DATA_BUCKET: ${env:DATA_BUCKET}
DOMAIN_NAME: ${env:DOMAIN_NAME}
EPC_AUTH_TOKEN: ${env:EPC_AUTH_TOKEN}
DB_HOST: ${env:DB_HOST}