Added in pre-processor

This commit is contained in:
Khalim Conn-Kowlessar 2023-09-13 18:06:48 +01:00
parent d5bb04a091
commit cde303a0c0
9 changed files with 154 additions and 122 deletions

View file

@ -1,32 +0,0 @@
columntypes = {
'UPRN': 'object', 'TOTAL_FLOOR_AREA': 'float64', 'FLOOR_HEIGHT': 'float64', 'PROPERTY_TYPE': 'object',
'BUILT_FORM': 'object', 'CONSTITUENCY': 'object', 'NUMBER_HABITABLE_ROOMS': 'float64',
'NUMBER_HEATED_ROOMS': 'float64', 'FIXED_LIGHTING_OUTLETS_COUNT': 'float64', 'FLOOR_LEVEL': 'float64',
'CONSTRUCTION_AGE_BAND': 'object', 'TRANSACTION_TYPE_STARTING': 'object',
'WALLS_DESCRIPTION_STARTING': 'object',
'FLOOR_DESCRIPTION_STARTING': 'object', 'LIGHTING_DESCRIPTION_STARTING': 'object',
'ROOF_DESCRIPTION_STARTING': 'object', 'MAINHEAT_DESCRIPTION_STARTING': 'object',
'HOTWATER_DESCRIPTION_STARTING': 'object', 'MAIN_FUEL_STARTING': 'object',
'MECHANICAL_VENTILATION_STARTING': 'object',
'SECONDHEAT_DESCRIPTION_STARTING': 'object', 'ENERGY_TARIFF_STARTING': 'object',
'SOLAR_WATER_HEATING_FLAG_STARTING': 'object', 'PHOTO_SUPPLY_STARTING': 'float64',
'WINDOWS_DESCRIPTION_STARTING': 'object', 'GLAZED_TYPE_STARTING': 'object',
'MULTI_GLAZE_PROPORTION_STARTING': 'float64', 'LOW_ENERGY_LIGHTING_STARTING': 'float64',
'NUMBER_OPEN_FIREPLACES_STARTING': 'float64', 'MAINHEATCONT_DESCRIPTION_STARTING': 'object',
'EXTENSION_COUNT_STARTING': 'float64', 'LODGEMENT_DATE_STARTING': 'object',
'TRANSACTION_TYPE_ENDING': 'object',
'WALLS_DESCRIPTION_ENDING': 'object', 'FLOOR_DESCRIPTION_ENDING': 'object',
'LIGHTING_DESCRIPTION_ENDING': 'object',
'ROOF_DESCRIPTION_ENDING': 'object', 'MAINHEAT_DESCRIPTION_ENDING': 'object',
'HOTWATER_DESCRIPTION_ENDING': 'object',
'MAIN_FUEL_ENDING': 'object', 'MECHANICAL_VENTILATION_ENDING': 'object',
'SECONDHEAT_DESCRIPTION_ENDING': 'object',
'ENERGY_TARIFF_ENDING': 'object', 'SOLAR_WATER_HEATING_FLAG_ENDING': 'object',
'PHOTO_SUPPLY_ENDING': 'float64',
'WINDOWS_DESCRIPTION_ENDING': 'object', 'GLAZED_TYPE_ENDING': 'object',
'MULTI_GLAZE_PROPORTION_ENDING': 'float64',
'LOW_ENERGY_LIGHTING_ENDING': 'float64', 'NUMBER_OPEN_FIREPLACES_ENDING': 'float64',
'MAINHEATCONT_DESCRIPTION_ENDING': 'object', 'EXTENSION_COUNT_ENDING': 'float64',
'LODGEMENT_DATE_ENDING': 'object',
'id': 'object'
}

View file

@ -11,6 +11,7 @@ from utils.logger import setup_logger
from utils.s3 import read_from_s3
from recommendations.FloorRecommendations import FloorRecommendations
from recommendations.WallRecommendations import WallRecommendations
from recommendations.config import UPGRADES_MAP
from utils.uvalue_estimates import classify_decile_newvalues
from backend.app.db.utils import row2dict
from starlette.responses import Response
@ -33,16 +34,13 @@ from backend.app.db.functions.recommendations_functions import (
)
from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations
from backend.app.db.connection import db_engine
from backend.app.plan.columntypes import columntypes
from model_data.optimiser.GainOptimiser import GainOptimiser
from model_data.optimiser.CostOptimiser import CostOptimiser
from backend.app.utils import epc_to_sap_lower_bound, save_dataframe_to_s3_parquet, read_parquet_from_s3
from backend.app.utils import epc_to_sap_lower_bound, read_parquet_from_s3
from model_data.optimiser.optimiser_functions import prepare_input_measures
from model_data.simulation_system.core.DataProcessor import DataProcessor
from model_data.simulation_system.core.Settings import (
FIXED_FEATURES, COMPONENT_FEATURES, COLUMNS_TO_MERGE_ON
)
from model_data.simulation_system.core.Settings import COLUMNS_TO_MERGE_ON
# TODO: This is placeholder until data is stored in DB
from backend.app.plan.uvalue_estimates_walls import uvalue_estimates_walls
@ -152,18 +150,42 @@ def get_cleaned():
return cleaned
def score_measures():
def create_recommendation_scoring_data(
property: Property,
recommendation: dict,
starting_epc_data: pd.DataFrame,
ending_epc_data: pd.DataFrame,
fixed_data: pd.DataFrame,
):
"""
This wrapper function prepares data to be passed to the sap model api
:return:
"""
scoring_dict = {
"UPRN": property.data["uprn"],
"id": "+".join([str(property.id), str(recommendation["recommendation_id"])]),
"LOCAL_AUTHORITY": property.data["local-authority"],
**starting_epc_data.to_dict("records")[0],
**ending_epc_data.to_dict("records")[0],
**fixed_data.to_dict("records")[0]
}
# We update the description to indicate it's insulated
if recommendation["type"] == "wall_insulation":
scoring_dict["WALLS_DESCRIPTION_ENDING"] = UPGRADES_MAP[property.walls["clean_description"]]
elif recommendation["type"] == "floor_insulation":
scoring_dict["FLOOR_DESCRIPTION_ENDING"] = UPGRADES_MAP[property.floor["clean_description"]]
else:
raise NotImplementedError("Implement me")
return scoring_dict
@router.post("/trigger")
async def trigger_plan(body: PlanTriggerRequest):
logger.info("Connecting to db")
Session = sessionmaker(bind=db_engine)
session = Session()
session = sessionmaker(bind=db_engine)()
created_at = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
try:
@ -237,7 +259,7 @@ async def trigger_plan(body: PlanTriggerRequest):
logger.info("Getting components and properties recommendations")
# TODO: Move this to a class. We probably was a Recommender class which takes the injects the optimisers
# TODO: Move this to a class. We probably want a Recommender class which takes the injects the optimisers
# in as a dependency and then the optimisers can take the input measures in as part of the setup() method
recommendations = {}
recommendations_scoring_data = []
@ -334,29 +356,15 @@ async def trigger_plan(body: PlanTriggerRequest):
# We update the ending record with the recommended updates and we set lodgement date to today
ending_epc_data["LODGEMENT_DATE_ENDING"] = created_at
scoring_map = {
'Solid brick, as built, no insulation (assumed)': 'Solid brick, as built, insulated (assumed)',
'Suspended, no insulation (assumed)': 'Suspended, insulated (assumed)',
'Solid, no insulation (assumed)': 'Solid, insulated (assumed)',
}
for recommendations_by_type in property_recommendations:
for rec in recommendations_by_type:
scoring_dict = {
"UPRN": p.data["uprn"],
"id": "+".join([str(p.id), str(rec["recommendation_id"])]),
"LOCAL_AUTHORITY": p.data["local-authority"],
**starting_epc_data.to_dict("records")[0],
**ending_epc_data.to_dict("records")[0],
**fixed_data.to_dict("records")[0]
}
# We update the description to indicate it's insulated
if rec["type"] == "wall_insulation":
scoring_dict["WALLS_DESCRIPTION_ENDING"] = scoring_map[p.walls["clean_description"]]
elif rec["type"] == "floor_insulation":
scoring_dict["FLOOR_DESCRIPTION_ENDING"] = scoring_map[p.floor["clean_description"]]
else:
raise NotImplementedError("Implement me")
scoring_dict = create_recommendation_scoring_data(
property=p,
recommendation=rec,
starting_epc_data=starting_epc_data,
ending_epc_data=ending_epc_data,
fixed_data=fixed_data,
)
recommendations_scoring_data.append(scoring_dict)
@ -382,39 +390,20 @@ async def trigger_plan(body: PlanTriggerRequest):
cols_to_merge_on=COLUMNS_TO_MERGE_ON + ["LOCAL_AUTHORITY"]
).drop(columns=["LOCAL_AUTHORITY"])
# Remap column types
recommendations_scoring_data = recommendations_scoring_data.astype(columntypes)
# Store parquet file in s3 for scoring
file_location = "sap_change_predictions/{portfolio_id}/{timestamp}.parquet".format(
portfolio_id=body.portfolio_id,
timestamp=created_at
sap_change_model_api = SAPChangeModelAPI(portfolio_id=body.portfolio_id, timestamp=created_at)
file_location = sap_change_model_api.upload_scoring_data(
df=recommendations_scoring_data, bucket=get_settings().DATA_BUCKET
)
logger.info("Storing scoring data to s3")
save_dataframe_to_s3_parquet(
df=recommendations_scoring_data,
bucket_name=get_settings().DATA_BUCKET,
file_key=file_location
)
logger.info("Making request to sap change api")
sap_change_model_api = SAPChangeModelAPI()
response = sap_change_model_api.predict(
file_location="s3://{DATA_BUCKET}/".format(DATA_BUCKET=get_settings().DATA_BUCKET) + file_location,
created_at=created_at,
portfolio_id=body.portfolio_id
)
# Retrieve the predictions
predictions = pd.DataFrame(read_csv_from_s3(
bucket_name=get_settings().PREDICTIONS_BUCKET,
filepath=response["storage_filepath"]
))
predictions = pd.DataFrame(
read_csv_from_s3(bucket_name=get_settings().PREDICTIONS_BUCKET, filepath=response["storage_filepath"])
)
# We round the predictions
predictions["RDSAP_CHANGE"] = predictions["RDSAP_CHANGE"].astype(float).round(0)
# Extract property_id and recommendation_id
predictions[['property_id', 'recommendation_id']] = predictions['id'].str.split('+', expand=True)
# Insert the predictions into the recommendations and run the optimiser

View file

@ -1,32 +1,71 @@
import pandas as pd
import requests
from requests.exceptions import RequestException
from utils.logger import setup_logger
from utils.s3 import save_dataframe_to_s3_parquet
logger = setup_logger()
class SAPChangeModelAPI:
def __init__(self, base_url="https://api.dev.hestia.homes"):
def __init__(
self,
portfolio_id,
timestamp,
base_url="https://api.dev.hestia.homes",
):
"""
property_id (int, optional): :
:param portfolio_id: The portfolio ID to be passed in the request payload. Defaults to 4.
:param timestamp: The creation timestamp to be passed in the request payload. Defaults to None.
:param base_url:
"""
self.base_url = base_url
self.portfolio_id = portfolio_id
self.timestamp = timestamp
def predict(self, file_location, property_id="", portfolio_id=4, created_at=None):
def upload_scoring_data(self, df: pd.DataFrame, bucket: str) -> str:
"""
The sap model api needs a scoring data that is sitting in s3 to use as a dataset to score on
This method allows the user to upload a table as a parquet file. This method will return the file
location, which can be used as the file location in the predict() method
:param df: Pandas dataframe with scoring data to be uploaded to s3
:param bucket: Name of the bucket in s3 to upload to
:return:
"""
# Store parquet file in s3 for scoring
file_location = "sap_change_predictions/{portfolio_id}/{timestamp}.parquet".format(
portfolio_id=self.portfolio_id,
timestamp=self.timestamp
)
logger.info("Storing scoring data to s3")
save_dataframe_to_s3_parquet(
df=df,
bucket_name=bucket,
file_key=file_location
)
return file_location
def predict(self, file_location):
"""Makes a POST request to the SAP Change Model API with the provided parameters.
Args:
file_location (str): The file location to be passed in the request payload.
property_id (int, optional): The property ID to be passed in the request payload. Defaults to 999.
portfolio_id (int, optional): The portfolio ID to be passed in the request payload. Defaults to 4.
created_at (str, optional): The creation timestamp to be passed in the request payload. Defaults to None.
Returns:
dict: The API response as a dictionary if the request was successful, None otherwise.
"""
logger.info("Making request to sap change api")
url = f"{self.base_url}/sapmodel/predict"
payload = {
"file_location": f"s3://retrofit-data-dev/{file_location}",
"property_id": property_id,
"portfolio_id": portfolio_id,
"created_at": created_at
"property_id": "", # This should get removed
"portfolio_id": self.portfolio_id,
"created_at": self.timestamp
}
try:

View file

@ -13,7 +13,8 @@ from model_data.simulation_system.core.Settings import (
BUILT_FORM_REMAP,
COLUMNS_TO_MERGE_ON,
COMPONENT_FEATURES,
FIXED_FEATURES
FIXED_FEATURES,
COLUMNTYPES
)
from typing import List
@ -195,6 +196,8 @@ class DataProcessor:
self.fill_na_fields()
self.data = self.data.sort_values(["UPRN", "LODGEMENT_DATE"], ascending=True)
# Final re-casting after data transformed and prepared
self.data = self.data.astype(COLUMNTYPES)
return self.data

View file

@ -167,3 +167,29 @@ DATA_PROCESSOR_SETTINGS = {
"epc_minimum_count": 1,
"column_mappings": {"UPRN": [int, str]},
}
# This has a manual mapping of the column types required
COLUMNTYPES = {
'UPRN': 'object', 'TOTAL_FLOOR_AREA': 'float64', 'FLOOR_HEIGHT': 'float64', 'PROPERTY_TYPE': 'object',
'BUILT_FORM': 'object', 'CONSTITUENCY': 'object', 'NUMBER_HABITABLE_ROOMS': 'float64',
'NUMBER_HEATED_ROOMS': 'float64', 'FIXED_LIGHTING_OUTLETS_COUNT': 'float64', 'FLOOR_LEVEL': 'float64',
'CONSTRUCTION_AGE_BAND': 'object',
'TRANSACTION_TYPE': 'object',
'WALLS_DESCRIPTION': 'object',
'FLOOR_DESCRIPTION': 'object',
'LIGHTING_DESCRIPTION': 'object',
'ROOF_DESCRIPTION': 'object',
'MAINHEAT_DESCRIPTION': 'object',
'HOTWATER_DESCRIPTION': 'object', 'MAIN_FUEL': 'object',
'MECHANICAL_VENTILATION': 'object',
'SECONDHEAT_DESCRIPTION': 'object', 'ENERGY_TARIFF': 'object',
'SOLAR_WATER_HEATING_FLAG': 'object', 'PHOTO_SUPPLY': 'float64',
'WINDOWS_DESCRIPTION': 'object',
'GLAZED_TYPE': 'object',
'MULTI_GLAZE_PROPORTION': 'float64',
'LOW_ENERGY_LIGHTING': 'float64',
'NUMBER_OPEN_FIREPLACES': 'float64',
'MAINHEATCONT_DESCRIPTION': 'object',
'EXTENSION_COUNT': 'float64',
'LODGEMENT_DATE': 'object',
}

View file

@ -2,7 +2,7 @@ import pandas as pd
from tqdm import tqdm
from pathlib import Path
from simulation_system.core.Settings import (
from model_data.simulation_system.core.Settings import (
MANDATORY_FIXED_FEATURES,
LATEST_FIELD,
COMPONENT_FEATURES,
@ -11,10 +11,10 @@ from simulation_system.core.Settings import (
COLUMNS_TO_MERGE_ON,
EARLIEST_EPC_DATE
)
from simulation_system.core.DataProcessor import DataProcessor
from utils import save_dataframe_to_s3_parquet
from model_data.simulation_system.core.DataProcessor import DataProcessor
from utils.s3 import save_dataframe_to_s3_parquet
DATA_DIRECTORY = Path(__file__).parent / "simulation_system" / "data" / "all-domestic-certificates"
DATA_DIRECTORY = Path(__file__).parent / "model_data" / "simulation_system" / "data" / "all-domestic-certificates"
def app():

View file

@ -1,5 +1,3 @@
import boto3
from io import BytesIO
import re
from textblob import TextBlob
@ -26,23 +24,3 @@ def correct_spelling(text):
corrected_text = ' '.join(corrected_words)
return corrected_text
def save_dataframe_to_s3_parquet(df, bucket_name, file_key):
"""
Save a pandas DataFrame to S3 as a Parquet file.
:param df: The pandas DataFrame.
:param bucket_name: Name of the S3 bucket.
:param file_key: Key of the file (including directory path within the bucket).
"""
# Convert the DataFrame to a Parquet format in memory
parquet_buffer = BytesIO()
df.to_parquet(parquet_buffer)
# Create the boto3 client
client = boto3.client('s3')
# Upload the Parquet file to S3
client.put_object(Bucket=bucket_name, Key=file_key, Body=parquet_buffer.getvalue())

View file

@ -0,0 +1,8 @@
# This map defines the upgrades that are possible to be recommended by the recommendation engine
# For example,
# TODO: once we use cleaned descriptions, this should be updated using the cleaned descriptions
UPGRADES_MAP = {
'Solid brick, as built, no insulation (assumed)': 'Solid brick, as built, insulated (assumed)',
'Suspended, no insulation (assumed)': 'Suspended, insulated (assumed)',
'Solid, no insulation (assumed)': 'Solid, insulated (assumed)',
}

View file

@ -1,4 +1,5 @@
import boto3
from io import BytesIO
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
@ -42,3 +43,23 @@ def save_data_to_s3(data, bucket_name, s3_file_name):
print(f'Successfully uploaded data to {bucket_name}/{s3_file_name}')
except Exception as e:
print(f'Failed to upload data to {bucket_name}/{s3_file_name}: {str(e)}')
def save_dataframe_to_s3_parquet(df, bucket_name, file_key):
"""
Save a pandas DataFrame to S3 as a Parquet file.
:param df: The pandas DataFrame.
:param bucket_name: Name of the S3 bucket.
:param file_key: Key of the file (including directory path within the bucket).
"""
# Convert the DataFrame to a Parquet format in memory
parquet_buffer = BytesIO()
df.to_parquet(parquet_buffer)
# Create the boto3 client
client = boto3.client('s3')
# Upload the Parquet file to S3
client.put_object(Bucket=bucket_name, Key=file_key, Body=parquet_buffer.getvalue())