diff --git a/backend/app/plan/columntypes.py b/backend/app/plan/columntypes.py deleted file mode 100644 index 5c859300..00000000 --- a/backend/app/plan/columntypes.py +++ /dev/null @@ -1,32 +0,0 @@ -columntypes = { - 'UPRN': 'object', 'TOTAL_FLOOR_AREA': 'float64', 'FLOOR_HEIGHT': 'float64', 'PROPERTY_TYPE': 'object', - 'BUILT_FORM': 'object', 'CONSTITUENCY': 'object', 'NUMBER_HABITABLE_ROOMS': 'float64', - 'NUMBER_HEATED_ROOMS': 'float64', 'FIXED_LIGHTING_OUTLETS_COUNT': 'float64', 'FLOOR_LEVEL': 'float64', - 'CONSTRUCTION_AGE_BAND': 'object', 'TRANSACTION_TYPE_STARTING': 'object', - 'WALLS_DESCRIPTION_STARTING': 'object', - 'FLOOR_DESCRIPTION_STARTING': 'object', 'LIGHTING_DESCRIPTION_STARTING': 'object', - 'ROOF_DESCRIPTION_STARTING': 'object', 'MAINHEAT_DESCRIPTION_STARTING': 'object', - 'HOTWATER_DESCRIPTION_STARTING': 'object', 'MAIN_FUEL_STARTING': 'object', - 'MECHANICAL_VENTILATION_STARTING': 'object', - 'SECONDHEAT_DESCRIPTION_STARTING': 'object', 'ENERGY_TARIFF_STARTING': 'object', - 'SOLAR_WATER_HEATING_FLAG_STARTING': 'object', 'PHOTO_SUPPLY_STARTING': 'float64', - 'WINDOWS_DESCRIPTION_STARTING': 'object', 'GLAZED_TYPE_STARTING': 'object', - 'MULTI_GLAZE_PROPORTION_STARTING': 'float64', 'LOW_ENERGY_LIGHTING_STARTING': 'float64', - 'NUMBER_OPEN_FIREPLACES_STARTING': 'float64', 'MAINHEATCONT_DESCRIPTION_STARTING': 'object', - 'EXTENSION_COUNT_STARTING': 'float64', 'LODGEMENT_DATE_STARTING': 'object', - 'TRANSACTION_TYPE_ENDING': 'object', - 'WALLS_DESCRIPTION_ENDING': 'object', 'FLOOR_DESCRIPTION_ENDING': 'object', - 'LIGHTING_DESCRIPTION_ENDING': 'object', - 'ROOF_DESCRIPTION_ENDING': 'object', 'MAINHEAT_DESCRIPTION_ENDING': 'object', - 'HOTWATER_DESCRIPTION_ENDING': 'object', - 'MAIN_FUEL_ENDING': 'object', 'MECHANICAL_VENTILATION_ENDING': 'object', - 'SECONDHEAT_DESCRIPTION_ENDING': 'object', - 'ENERGY_TARIFF_ENDING': 'object', 'SOLAR_WATER_HEATING_FLAG_ENDING': 'object', - 'PHOTO_SUPPLY_ENDING': 'float64', - 'WINDOWS_DESCRIPTION_ENDING': 'object', 'GLAZED_TYPE_ENDING': 'object', - 'MULTI_GLAZE_PROPORTION_ENDING': 'float64', - 'LOW_ENERGY_LIGHTING_ENDING': 'float64', 'NUMBER_OPEN_FIREPLACES_ENDING': 'float64', - 'MAINHEATCONT_DESCRIPTION_ENDING': 'object', 'EXTENSION_COUNT_ENDING': 'float64', - 'LODGEMENT_DATE_ENDING': 'object', - 'id': 'object' -} diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index bed1f4b9..17a5fc88 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -11,6 +11,7 @@ from utils.logger import setup_logger from utils.s3 import read_from_s3 from recommendations.FloorRecommendations import FloorRecommendations from recommendations.WallRecommendations import WallRecommendations +from recommendations.config import UPGRADES_MAP from utils.uvalue_estimates import classify_decile_newvalues from backend.app.db.utils import row2dict from starlette.responses import Response @@ -33,16 +34,13 @@ from backend.app.db.functions.recommendations_functions import ( ) from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations from backend.app.db.connection import db_engine -from backend.app.plan.columntypes import columntypes from model_data.optimiser.GainOptimiser import GainOptimiser from model_data.optimiser.CostOptimiser import CostOptimiser -from backend.app.utils import epc_to_sap_lower_bound, save_dataframe_to_s3_parquet, read_parquet_from_s3 +from backend.app.utils import epc_to_sap_lower_bound, read_parquet_from_s3 from model_data.optimiser.optimiser_functions import prepare_input_measures from model_data.simulation_system.core.DataProcessor import DataProcessor -from model_data.simulation_system.core.Settings import ( - FIXED_FEATURES, COMPONENT_FEATURES, COLUMNS_TO_MERGE_ON -) +from model_data.simulation_system.core.Settings import COLUMNS_TO_MERGE_ON # TODO: This is placeholder until data is stored in DB from backend.app.plan.uvalue_estimates_walls import uvalue_estimates_walls @@ -152,18 +150,42 @@ def get_cleaned(): return cleaned -def score_measures(): +def create_recommendation_scoring_data( + property: Property, + recommendation: dict, + starting_epc_data: pd.DataFrame, + ending_epc_data: pd.DataFrame, + fixed_data: pd.DataFrame, +): """ This wrapper function prepares data to be passed to the sap model api :return: """ + scoring_dict = { + "UPRN": property.data["uprn"], + "id": "+".join([str(property.id), str(recommendation["recommendation_id"])]), + "LOCAL_AUTHORITY": property.data["local-authority"], + **starting_epc_data.to_dict("records")[0], + **ending_epc_data.to_dict("records")[0], + **fixed_data.to_dict("records")[0] + } + + # We update the description to indicate it's insulated + if recommendation["type"] == "wall_insulation": + scoring_dict["WALLS_DESCRIPTION_ENDING"] = UPGRADES_MAP[property.walls["clean_description"]] + elif recommendation["type"] == "floor_insulation": + scoring_dict["FLOOR_DESCRIPTION_ENDING"] = UPGRADES_MAP[property.floor["clean_description"]] + else: + raise NotImplementedError("Implement me") + + return scoring_dict + @router.post("/trigger") async def trigger_plan(body: PlanTriggerRequest): logger.info("Connecting to db") - Session = sessionmaker(bind=db_engine) - session = Session() + session = sessionmaker(bind=db_engine)() created_at = datetime.now().strftime("%Y-%m-%d-%H-%M-%S") try: @@ -237,7 +259,7 @@ async def trigger_plan(body: PlanTriggerRequest): logger.info("Getting components and properties recommendations") - # TODO: Move this to a class. We probably was a Recommender class which takes the injects the optimisers + # TODO: Move this to a class. We probably want a Recommender class which takes the injects the optimisers # in as a dependency and then the optimisers can take the input measures in as part of the setup() method recommendations = {} recommendations_scoring_data = [] @@ -334,29 +356,15 @@ async def trigger_plan(body: PlanTriggerRequest): # We update the ending record with the recommended updates and we set lodgement date to today ending_epc_data["LODGEMENT_DATE_ENDING"] = created_at - scoring_map = { - 'Solid brick, as built, no insulation (assumed)': 'Solid brick, as built, insulated (assumed)', - 'Suspended, no insulation (assumed)': 'Suspended, insulated (assumed)', - 'Solid, no insulation (assumed)': 'Solid, insulated (assumed)', - } for recommendations_by_type in property_recommendations: for rec in recommendations_by_type: - scoring_dict = { - "UPRN": p.data["uprn"], - "id": "+".join([str(p.id), str(rec["recommendation_id"])]), - "LOCAL_AUTHORITY": p.data["local-authority"], - **starting_epc_data.to_dict("records")[0], - **ending_epc_data.to_dict("records")[0], - **fixed_data.to_dict("records")[0] - } - - # We update the description to indicate it's insulated - if rec["type"] == "wall_insulation": - scoring_dict["WALLS_DESCRIPTION_ENDING"] = scoring_map[p.walls["clean_description"]] - elif rec["type"] == "floor_insulation": - scoring_dict["FLOOR_DESCRIPTION_ENDING"] = scoring_map[p.floor["clean_description"]] - else: - raise NotImplementedError("Implement me") + scoring_dict = create_recommendation_scoring_data( + property=p, + recommendation=rec, + starting_epc_data=starting_epc_data, + ending_epc_data=ending_epc_data, + fixed_data=fixed_data, + ) recommendations_scoring_data.append(scoring_dict) @@ -382,39 +390,20 @@ async def trigger_plan(body: PlanTriggerRequest): cols_to_merge_on=COLUMNS_TO_MERGE_ON + ["LOCAL_AUTHORITY"] ).drop(columns=["LOCAL_AUTHORITY"]) - # Remap column types - recommendations_scoring_data = recommendations_scoring_data.astype(columntypes) - # Store parquet file in s3 for scoring - - file_location = "sap_change_predictions/{portfolio_id}/{timestamp}.parquet".format( - portfolio_id=body.portfolio_id, - timestamp=created_at + sap_change_model_api = SAPChangeModelAPI(portfolio_id=body.portfolio_id, timestamp=created_at) + file_location = sap_change_model_api.upload_scoring_data( + df=recommendations_scoring_data, bucket=get_settings().DATA_BUCKET ) - - logger.info("Storing scoring data to s3") - save_dataframe_to_s3_parquet( - df=recommendations_scoring_data, - bucket_name=get_settings().DATA_BUCKET, - file_key=file_location - ) - - logger.info("Making request to sap change api") - sap_change_model_api = SAPChangeModelAPI() response = sap_change_model_api.predict( file_location="s3://{DATA_BUCKET}/".format(DATA_BUCKET=get_settings().DATA_BUCKET) + file_location, - created_at=created_at, - portfolio_id=body.portfolio_id ) # Retrieve the predictions - predictions = pd.DataFrame(read_csv_from_s3( - bucket_name=get_settings().PREDICTIONS_BUCKET, - filepath=response["storage_filepath"] - )) + predictions = pd.DataFrame( + read_csv_from_s3(bucket_name=get_settings().PREDICTIONS_BUCKET, filepath=response["storage_filepath"]) + ) - # We round the predictions predictions["RDSAP_CHANGE"] = predictions["RDSAP_CHANGE"].astype(float).round(0) - # Extract property_id and recommendation_id predictions[['property_id', 'recommendation_id']] = predictions['id'].str.split('+', expand=True) # Insert the predictions into the recommendations and run the optimiser diff --git a/backend/ml_models/sap_change_model/api.py b/backend/ml_models/sap_change_model/api.py index 4ef4dc23..fd15ccd1 100644 --- a/backend/ml_models/sap_change_model/api.py +++ b/backend/ml_models/sap_change_model/api.py @@ -1,32 +1,71 @@ +import pandas as pd import requests from requests.exceptions import RequestException from utils.logger import setup_logger +from utils.s3 import save_dataframe_to_s3_parquet logger = setup_logger() class SAPChangeModelAPI: - def __init__(self, base_url="https://api.dev.hestia.homes"): + def __init__( + self, + portfolio_id, + timestamp, + base_url="https://api.dev.hestia.homes", + ): + """ + property_id (int, optional): : + :param portfolio_id: The portfolio ID to be passed in the request payload. Defaults to 4. + :param timestamp: The creation timestamp to be passed in the request payload. Defaults to None. + :param base_url: + """ self.base_url = base_url + self.portfolio_id = portfolio_id + self.timestamp = timestamp - def predict(self, file_location, property_id="", portfolio_id=4, created_at=None): + def upload_scoring_data(self, df: pd.DataFrame, bucket: str) -> str: + """ + The sap model api needs a scoring data that is sitting in s3 to use as a dataset to score on + This method allows the user to upload a table as a parquet file. This method will return the file + location, which can be used as the file location in the predict() method + + :param df: Pandas dataframe with scoring data to be uploaded to s3 + :param bucket: Name of the bucket in s3 to upload to + :return: + """ + + # Store parquet file in s3 for scoring + file_location = "sap_change_predictions/{portfolio_id}/{timestamp}.parquet".format( + portfolio_id=self.portfolio_id, + timestamp=self.timestamp + ) + + logger.info("Storing scoring data to s3") + save_dataframe_to_s3_parquet( + df=df, + bucket_name=bucket, + file_key=file_location + ) + + return file_location + + def predict(self, file_location): """Makes a POST request to the SAP Change Model API with the provided parameters. Args: file_location (str): The file location to be passed in the request payload. - property_id (int, optional): The property ID to be passed in the request payload. Defaults to 999. - portfolio_id (int, optional): The portfolio ID to be passed in the request payload. Defaults to 4. - created_at (str, optional): The creation timestamp to be passed in the request payload. Defaults to None. Returns: dict: The API response as a dictionary if the request was successful, None otherwise. """ + logger.info("Making request to sap change api") url = f"{self.base_url}/sapmodel/predict" payload = { "file_location": f"s3://retrofit-data-dev/{file_location}", - "property_id": property_id, - "portfolio_id": portfolio_id, - "created_at": created_at + "property_id": "", # This should get removed + "portfolio_id": self.portfolio_id, + "created_at": self.timestamp } try: diff --git a/model_data/simulation_system/core/DataProcessor.py b/model_data/simulation_system/core/DataProcessor.py index db333eac..c02e6ed5 100644 --- a/model_data/simulation_system/core/DataProcessor.py +++ b/model_data/simulation_system/core/DataProcessor.py @@ -13,7 +13,8 @@ from model_data.simulation_system.core.Settings import ( BUILT_FORM_REMAP, COLUMNS_TO_MERGE_ON, COMPONENT_FEATURES, - FIXED_FEATURES + FIXED_FEATURES, + COLUMNTYPES ) from typing import List @@ -195,6 +196,8 @@ class DataProcessor: self.fill_na_fields() self.data = self.data.sort_values(["UPRN", "LODGEMENT_DATE"], ascending=True) + # Final re-casting after data transformed and prepared + self.data = self.data.astype(COLUMNTYPES) return self.data diff --git a/model_data/simulation_system/core/Settings.py b/model_data/simulation_system/core/Settings.py index 01d4151e..8a03b553 100644 --- a/model_data/simulation_system/core/Settings.py +++ b/model_data/simulation_system/core/Settings.py @@ -167,3 +167,29 @@ DATA_PROCESSOR_SETTINGS = { "epc_minimum_count": 1, "column_mappings": {"UPRN": [int, str]}, } + +# This has a manual mapping of the column types required +COLUMNTYPES = { + 'UPRN': 'object', 'TOTAL_FLOOR_AREA': 'float64', 'FLOOR_HEIGHT': 'float64', 'PROPERTY_TYPE': 'object', + 'BUILT_FORM': 'object', 'CONSTITUENCY': 'object', 'NUMBER_HABITABLE_ROOMS': 'float64', + 'NUMBER_HEATED_ROOMS': 'float64', 'FIXED_LIGHTING_OUTLETS_COUNT': 'float64', 'FLOOR_LEVEL': 'float64', + 'CONSTRUCTION_AGE_BAND': 'object', + 'TRANSACTION_TYPE': 'object', + 'WALLS_DESCRIPTION': 'object', + 'FLOOR_DESCRIPTION': 'object', + 'LIGHTING_DESCRIPTION': 'object', + 'ROOF_DESCRIPTION': 'object', + 'MAINHEAT_DESCRIPTION': 'object', + 'HOTWATER_DESCRIPTION': 'object', 'MAIN_FUEL': 'object', + 'MECHANICAL_VENTILATION': 'object', + 'SECONDHEAT_DESCRIPTION': 'object', 'ENERGY_TARIFF': 'object', + 'SOLAR_WATER_HEATING_FLAG': 'object', 'PHOTO_SUPPLY': 'float64', + 'WINDOWS_DESCRIPTION': 'object', + 'GLAZED_TYPE': 'object', + 'MULTI_GLAZE_PROPORTION': 'float64', + 'LOW_ENERGY_LIGHTING': 'float64', + 'NUMBER_OPEN_FIREPLACES': 'float64', + 'MAINHEATCONT_DESCRIPTION': 'object', + 'EXTENSION_COUNT': 'float64', + 'LODGEMENT_DATE': 'object', +} diff --git a/model_data/simulation_system/generate_rdsap_change.py b/model_data/simulation_system/generate_rdsap_change.py index 9aa16438..42317edd 100644 --- a/model_data/simulation_system/generate_rdsap_change.py +++ b/model_data/simulation_system/generate_rdsap_change.py @@ -2,7 +2,7 @@ import pandas as pd from tqdm import tqdm from pathlib import Path -from simulation_system.core.Settings import ( +from model_data.simulation_system.core.Settings import ( MANDATORY_FIXED_FEATURES, LATEST_FIELD, COMPONENT_FEATURES, @@ -11,10 +11,10 @@ from simulation_system.core.Settings import ( COLUMNS_TO_MERGE_ON, EARLIEST_EPC_DATE ) -from simulation_system.core.DataProcessor import DataProcessor -from utils import save_dataframe_to_s3_parquet +from model_data.simulation_system.core.DataProcessor import DataProcessor +from utils.s3 import save_dataframe_to_s3_parquet -DATA_DIRECTORY = Path(__file__).parent / "simulation_system" / "data" / "all-domestic-certificates" +DATA_DIRECTORY = Path(__file__).parent / "model_data" / "simulation_system" / "data" / "all-domestic-certificates" def app(): diff --git a/model_data/utils.py b/model_data/utils.py index 49bad31b..744914a4 100644 --- a/model_data/utils.py +++ b/model_data/utils.py @@ -1,5 +1,3 @@ -import boto3 -from io import BytesIO import re from textblob import TextBlob @@ -26,23 +24,3 @@ def correct_spelling(text): corrected_text = ' '.join(corrected_words) return corrected_text - - -def save_dataframe_to_s3_parquet(df, bucket_name, file_key): - """ - Save a pandas DataFrame to S3 as a Parquet file. - - :param df: The pandas DataFrame. - :param bucket_name: Name of the S3 bucket. - :param file_key: Key of the file (including directory path within the bucket). - """ - - # Convert the DataFrame to a Parquet format in memory - parquet_buffer = BytesIO() - df.to_parquet(parquet_buffer) - - # Create the boto3 client - client = boto3.client('s3') - - # Upload the Parquet file to S3 - client.put_object(Bucket=bucket_name, Key=file_key, Body=parquet_buffer.getvalue()) diff --git a/recommendations/config.py b/recommendations/config.py new file mode 100644 index 00000000..750453b0 --- /dev/null +++ b/recommendations/config.py @@ -0,0 +1,8 @@ +# This map defines the upgrades that are possible to be recommended by the recommendation engine +# For example, +# TODO: once we use cleaned descriptions, this should be updated using the cleaned descriptions +UPGRADES_MAP = { + 'Solid brick, as built, no insulation (assumed)': 'Solid brick, as built, insulated (assumed)', + 'Suspended, no insulation (assumed)': 'Suspended, insulated (assumed)', + 'Solid, no insulation (assumed)': 'Solid, insulated (assumed)', +} diff --git a/utils/s3.py b/utils/s3.py index b92f3132..c31f1520 100644 --- a/utils/s3.py +++ b/utils/s3.py @@ -1,4 +1,5 @@ import boto3 +from io import BytesIO from botocore.exceptions import NoCredentialsError, PartialCredentialsError @@ -42,3 +43,23 @@ def save_data_to_s3(data, bucket_name, s3_file_name): print(f'Successfully uploaded data to {bucket_name}/{s3_file_name}') except Exception as e: print(f'Failed to upload data to {bucket_name}/{s3_file_name}: {str(e)}') + + +def save_dataframe_to_s3_parquet(df, bucket_name, file_key): + """ + Save a pandas DataFrame to S3 as a Parquet file. + + :param df: The pandas DataFrame. + :param bucket_name: Name of the S3 bucket. + :param file_key: Key of the file (including directory path within the bucket). + """ + + # Convert the DataFrame to a Parquet format in memory + parquet_buffer = BytesIO() + df.to_parquet(parquet_buffer) + + # Create the boto3 client + client = boto3.client('s3') + + # Upload the Parquet file to S3 + client.put_object(Bucket=bucket_name, Key=file_key, Body=parquet_buffer.getvalue())