diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index d28e6518..1a499d27 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -29,6 +29,7 @@ from backend.ml_models.api import ModelApi from backend.Property import Property from etl.epc.DataProcessor import DataProcessor from etl.epc.settings import COLUMNS_TO_MERGE_ON +from etl.solar.SolarPhotoSupply import SolarPhotoSupply from recommendations.optimiser.CostOptimiser import CostOptimiser from recommendations.optimiser.GainOptimiser import GainOptimiser from recommendations.optimiser.optimiser_functions import prepare_input_measures @@ -115,12 +116,7 @@ async def trigger_plan(body: PlanTriggerRequest): cleaning_data = read_dataframe_from_s3_parquet( bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet", ) - photo_supply_lookup = read_dataframe_from_s3_parquet( - bucket_name=get_settings().DATA_BUCKET, file_key="solar_pv_supply/photo_supply_lookup.parquet", - ) - floor_area_decile_thresholds = read_dataframe_from_s3_parquet( - bucket_name=get_settings().DATA_BUCKET, file_key="solar_pv_supply/floor_area_decile_thresholds.parquet", - ) + photo_supply_lookup, floor_area_decile_thresholds = SolarPhotoSupply.load(bucket=get_settings().DATA_BUCKET) logger.info("Getting spatial data") for p in input_properties: diff --git a/backend/ml_models/api.py b/backend/ml_models/api.py index e6947906..bc09f26c 100644 --- a/backend/ml_models/api.py +++ b/backend/ml_models/api.py @@ -2,8 +2,7 @@ import pandas as pd import requests from requests.exceptions import RequestException from utils.logger import setup_logger -from utils.s3 import save_dataframe_to_s3_parquet -from backend.app.utils import read_parquet_from_s3 +from utils.s3 import save_dataframe_to_s3_parquet, read_dataframe_from_s3_parquet logger = setup_logger() @@ -125,7 +124,7 @@ class ModelApi: # Retrieve the predictions predictions_df = pd.DataFrame( - read_parquet_from_s3( + 
read_dataframe_from_s3_parquet( bucket_name=predictions_bucket, file_key=response["storage_filepath"].split(predictions_bucket + "/")[1] ) diff --git a/etl/solar/SolarPhotoSupply.py b/etl/solar/SolarPhotoSupply.py index dadb71f0..9fad1831 100644 --- a/etl/solar/SolarPhotoSupply.py +++ b/etl/solar/SolarPhotoSupply.py @@ -13,6 +13,13 @@ class SolarPhotoSupply: ] def __init__(self, file_directories, cleaned_lookup): + """ + Initialize the SolarPhotoSupply class with file directories and a cleaned lookup. Currently, this class + just works with locally stored data, but this could be extended to work with data stored in S3. + + :param file_directories: A list of directories where files are stored. + :param cleaned_lookup: A dictionary containing cleaned lookup data. + """ self.file_directories = file_directories self.results = [] @@ -24,7 +31,10 @@ class SolarPhotoSupply: self.floor_area_decile_thresholds = pd.DataFrame() def create_dataset(self): - + """ + Create a dataset from the provided file directories. This method processes the data files, + applies transformations, and aggregates data into a useful format. + """ results = [] logger.info("Creating solar photo supply dataset") @@ -100,6 +110,13 @@ class SolarPhotoSupply: @staticmethod def classify_floor_area(new_area, thresholds): + """ + Classify a given floor area into a decile based on provided thresholds. + + :param new_area: The new floor area to be classified. + :param thresholds: A list of thresholds used for classification. + :return: An integer representing the decile index. + """ for i, threshold in enumerate(thresholds): if new_area <= threshold: @@ -107,6 +124,10 @@ class SolarPhotoSupply: return len(thresholds) def save(self): + """ + Save the processed data to an S3 bucket in the parquet format. This method also handles + logging and validation to ensure data is present before saving. 
@classmethod
def filter_photo_supply_lookup(
    cls,
    photo_supply_lookup: pd.DataFrame,
    floor_area_decile_thresholds: pd.DataFrame,
    tenure: str,
    built_form: str,
    property_type: str,
    construction_age_band: str,
    is_flat: bool,
    is_pitched: bool,
    is_roof_room: bool,
    floor_area: float
):
    """
    Select the rows of the photo supply lookup that describe a given property.

    Rows must equal the property on every categorical/boolean attribute. The result is then
    narrowed to the property's floor area decile, but only when that decile is present among
    the matched rows; otherwise every matched row is returned unchanged.

    :param photo_supply_lookup: The photo supply lookup dataframe. It is read through the columns
        "tenure", "built_form", "property_type", "construction_age_band", "is_flat", "is_pitched",
        "is_roof_room" and "floor_area_decile".
    :param floor_area_decile_thresholds: Dataframe whose "floor_area_decile_thresholds" column holds
        the thresholds handed to ``classify_floor_area``.
    :param tenure: The tenure of the property; compared in lower case.
    :param built_form: The built form of the property.
    :param property_type: The property type of the property.
    :param construction_age_band: The construction age band of the property.
    :param is_flat: Whether the property has a flat roof.
    :param is_pitched: Whether the property has a pitched roof.
    :param is_roof_room: Whether the property has a roof room.
    :param floor_area: The floor area of the property.
    :return: The matching rows of ``photo_supply_lookup``, restricted to the property's floor area
        decile when that decile occurs in the matched rows.
    :raises ValueError: If no row matches the categorical/boolean attributes.
    """

    # Tenure is lower-cased to mirror what is done when the dataset is created.
    criteria = (
        ("tenure", tenure.lower()),
        ("built_form", built_form),
        ("property_type", property_type),
        ("construction_age_band", construction_age_band),
        ("is_flat", is_flat),
        ("is_pitched", is_pitched),
        ("is_roof_room", is_roof_room),
    )

    # Fold the per-column equality tests into a single row mask, left to right.
    row_mask = None
    for column, wanted in criteria:
        column_hits = photo_supply_lookup[column] == wanted
        row_mask = column_hits if row_mask is None else row_mask & column_hits

    candidates = photo_supply_lookup[row_mask]
    if candidates.empty:
        raise ValueError("No photo supply matched")

    decile_thresholds = floor_area_decile_thresholds["floor_area_decile_thresholds"].values
    decile = cls.classify_floor_area(floor_area, decile_thresholds)

    # Fall back to every attribute match when the property's decile is not represented.
    if decile not in candidates["floor_area_decile"].values:
        return candidates

    return candidates[candidates["floor_area_decile"] == decile]