diff --git a/backend/Property.py b/backend/Property.py index a8fd925b..82c60439 100644 --- a/backend/Property.py +++ b/backend/Property.py @@ -292,9 +292,7 @@ class Property: self.epc_record, fixed_data ) - self.base_difference_record = TrainingDataset( - datasets=[difference_record], cleaned_lookup=cleaned_lookup - ) + self.base_difference_record = TrainingDataset(datasets=[difference_record], cleaned_lookup=cleaned_lookup) # If we have variables that have been given to us by the landlord that we know are correct, whereas the EPC # may not be, we use them diff --git a/backend/app/assumptions.py b/backend/app/assumptions.py index d36266d3..d813e1a9 100644 --- a/backend/app/assumptions.py +++ b/backend/app/assumptions.py @@ -71,6 +71,8 @@ DESCRIPTIONS_TO_FUEL_TYPES = { }, 'Electric instantaneous at point of use, plus solar': {"fuel": "Electricity + Solar Thermal", "cop": 1}, "Electric storage heaters, Room heaters, electric": {"fuel": "Electricity", "cop": 1}, + 'Boiler and underfloor heating, oil': {"fuel": "Oil", "cop": 0.85}, + "Boiler and radiators, smokeless fuel": {"fuel": "Smokeless Fuel", "cop": 0.85}, } # These are the measure types where if there is a ventilation recommendation, we force the inclusion of it diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index a9979e31..cd73cce3 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -1,9 +1,19 @@ import boto3 +import json +import math +from datetime import datetime + +import pandas as pd from fastapi import APIRouter, Depends from backend.app.dependencies import validate_token from backend.app.plan.schemas import PlanTriggerRequest from backend.app.config import get_settings +from sqlalchemy.orm import sessionmaker from utils.logger import setup_logger +from utils.s3 import read_excel_from_s3 +from backend.app.db.connection import db_engine + +from backend.app.db.functions.recommendations_functions import create_scenario logger = setup_logger() @@ -26,21 +36,86 @@ 
async def trigger_plan_entrypoint(body: PlanTriggerRequest): settings = get_settings() - # Serialize the PlanTriggerRequest into JSON try: - message_body = body.model_dump_json() + data = body.model_dump() except Exception as e: - logger.error("Failed to serialize request body: %s", e) + logger.error("Failed to parse request body: %s", e) return {"message": "Invalid request"}, 400 - try: - response = sqs_client.send_message( - QueueUrl=settings.ENGINE_SQS_URL, - MessageBody=message_body - ) - logger.info(f"SQS message sent. Message ID: {response.get('MessageId')}") - except Exception as e: - logger.error("Failed to send SQS message: %s", e) - return {"message": "Failed to trigger engine"}, 500 + # If file_format is domna_asset_list and type is xlsx, read and chunk it + if data.get("file_format") == "domna_asset_list" and data.get("file_type") == "xlsx": + try: + input_data: pd.DataFrame = read_excel_from_s3( + bucket_name=settings.PLAN_TRIGGER_BUCKET, + file_key=data.get("trigger_file_path"), + sheet_name=data.get("sheet_name"), + header_row=0 + ) + + total_rows = len(input_data) + chunk_size = 30 + total_chunks = math.ceil(total_rows / chunk_size) + + # We also need to create a new scenario and pass it to the SQS messages, if one doesn't + # exist + scenario_id = data.get("scenario_id") + if not scenario_id: + created_at = datetime.now().isoformat() + session = sessionmaker(bind=db_engine)() + + # Create a new scenario + new_scenario = create_scenario( + session=session, + scenario={ + "name": body.scenario_name, + "created_at": created_at, + "budget": body.budget, + "portfolio_id": body.portfolio_id, + "housing_type": body.housing_type, + "goal": body.goal, + "goal_value": body.goal_value, + "trigger_file_path": body.trigger_file_path, + "already_installed_file_path": body.already_installed_file_path, + "patches_file_path": body.patches_file_path, + "non_invasive_recommendations_file_path": body.non_invasive_recommendations_file_path, + "exclusions": 
body.exclusions, + "multi_plan": body.multi_plan + } + ) + scenario_id = new_scenario.id + # Insert the scenario ID into the data payload + data["scenario_id"] = scenario_id + + for i in range(total_chunks): + index_start = i * chunk_size + index_end = min((i + 1) * chunk_size, total_rows) + + message_payload = {**data, "index_start": index_start, "index_end": index_end} + + message_body = json.dumps(message_payload) + + response = sqs_client.send_message( + QueueUrl=settings.ENGINE_SQS_URL, + MessageBody=message_body + ) + logger.info( + f"Chunk {i} sent to SQS. Rows {index_start}–{index_end}. Message ID: {response.get('MessageId')}") + + except Exception as e: + logger.error("Error during Excel file handling: %s", e) + return {"message": "Failed to process asset list"}, 500 + + else: + # Fallback: Just send a single message + try: + message_body = json.dumps(data) + response = sqs_client.send_message( + QueueUrl=settings.ENGINE_SQS_URL, + MessageBody=message_body + ) + logger.info(f"SQS message sent. 
Message ID: {response.get('MessageId')}") + except Exception as e: + logger.error("Failed to send SQS message: %s", e) + return {"message": "Failed to trigger engine"}, 500 return {"message": "Plan job accepted"} diff --git a/backend/app/plan/schemas.py b/backend/app/plan/schemas.py index 2a388b2f..a6d21ae7 100644 --- a/backend/app/plan/schemas.py +++ b/backend/app/plan/schemas.py @@ -1,4 +1,4 @@ -from pydantic import BaseModel, Field, BeforeValidator +from pydantic import BaseModel, Field, BeforeValidator, model_validator from typing import Annotated, List, Optional, Literal # Example constants for validation @@ -104,7 +104,16 @@ class PlanTriggerRequest(BaseModel): simulate_sap_10: Optional[bool] = False # Add in optional fields which describe the format of the asset list being used - - file_type: Optional[Literal["csv", "xlsx"]] = None, - file_format: Optional[Literal["domna_asset_list"]] = None, + + file_type: Optional[Literal["csv", "xlsx"]] = None + file_format: Optional[Literal["domna_asset_list"]] = None sheet_name: Optional[str] = None + # If one of index_start or index_end is set, the other must be set too + index_start: Optional[int] = None + index_end: Optional[int] = None + + @model_validator(mode="after") + def check_indexes(self): + if (self.index_start is None) != (self.index_end is None): + raise ValueError("Both index_start and index_end must be set or both must be None") + return self diff --git a/backend/engine/engine.py b/backend/engine/engine.py index 318f4a0e..0591eed6 100644 --- a/backend/engine/engine.py +++ b/backend/engine/engine.py @@ -4,6 +4,7 @@ from datetime import datetime from tqdm import tqdm import pandas as pd +import numpy as np from etl.epc.Record import EPCRecord from backend.SearchEpc import SearchEpc from sqlalchemy.exc import IntegrityError, OperationalError @@ -37,7 +38,7 @@ from recommendations.optimiser.GainOptimiser import GainOptimiser from recommendations.optimiser.optimiser_functions import prepare_input_measures 
from recommendations.Recommendations import Recommendations from utils.logger import setup_logger -from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3 +from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3, read_excel_from_s3 from backend.ml_models.Valuation import PropertyValuation from etl.bill_savings.KwhData import KwhData @@ -435,7 +436,69 @@ async def model_engine(body: PlanTriggerRequest): try: session.begin() logger.info("Getting the inputs") - plan_input = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path) + + if body.file_type == "xlsx": + plan_input = read_excel_from_s3( + bucket_name=get_settings().PLAN_TRIGGER_BUCKET, + file_key=body.trigger_file_path, + sheet_name=body.sheet_name, + header_row=0, + ) + + # We now handle the case where the input data is a Domna standardised asset list + if body.file_format == "domna_asset_list": + # We rename the columns to match the expected format + plan_input = plan_input.rename( + columns={"domna_address_1": "address", "domna_postcode": "postcode", "epc_os_uprn": "uprn"} + ) + # Where the EPC has been estimated, that is because a UPRN wasn't available and so we remove the UPRN + plan_input["uprn"] = np.where(plan_input["estimated"].isin([1, True]), None, plan_input["uprn"]) + # We handle the landlord property type and built form + plan_input["property_type"] = plan_input["landlord_property_type"].copy() + plan_input["built_form"] = plan_input["landlord_built_form"].copy() + plan_input["property_type"] = np.where( + plan_input["property_type"] == "unknown", + plan_input["epc_property_type"], + plan_input["property_type"] + ) + plan_input["built_form"] = np.where( + plan_input["built_form"] == "unknown", plan_input["epc_archetype"], plan_input["built_form"] + ) + property_type_map = { + "house": "House", + "flat": "Flat", + "maisonette": "Maisonette", + "bungalow": "Bungalow", + "block house": "House", + "coach house": "House", + "bedsit": 
"Flat" + } + + built_form_map = { + "mid-terrace": "Mid-Terrace", + "end-terrace": "End-Terrace", + "semi-detached": "Semi-Detached", + "detached": "Detached", + "enclosed end-terrace": "Enclosed End-Terrace", + "enclosed mid-terrace": "Enclosed Mid-Terrace", + } + # We remap the values to match the EPC expected formats + plan_input["property_type"] = plan_input["property_type"].map(property_type_map) + plan_input["built_form"] = plan_input["built_form"].map(built_form_map) + + plan_input = plan_input.to_dict("records") + else: + raise ValueError("Other formats not yet supported") + + else: + plan_input = read_csv_from_s3( + bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path + ) + + # We then slice it on the indexes if they are provided + if body.index_start is not None and body.index_end is not None: + plan_input = plan_input[body.index_start:body.index_end] + # Check for duplicate UPRNS input_uprns = [x.get("uprn") for x in plan_input if "uprn" in x and x.get("uprn")] @@ -453,8 +516,18 @@ async def model_engine(body: PlanTriggerRequest): input_properties = [] for config in tqdm(plan_input): + + if config["landlord_property_id"] in ["LE113NWIC95", "NG241FBCT", "NG51BNIC"]: + continue + + if not pd.isnull(config.get("uprn")): + if int(float(config.get("uprn"))) < 0: + continue + # We validate each record in the file. 
If the record is NOT valid, we need to handle this accordingly uprn = config.get("uprn", None) + if pd.isnull(uprn): + uprn = None if uprn: uprn = int(float(uprn)) @@ -469,6 +542,9 @@ async def model_engine(body: PlanTriggerRequest): epc_searcher.ordnance_survey_client.property_type = config.get("property_type", None) # For the moment, our OS API access is unavailable, so we skip and interpolate epc_searcher.find_property(skip_os=True) + # TODO: Placeholder + if epc_searcher.newest_epc.get("estimated") and body.file_format == "domna_asset_list": + epc_searcher.newest_epc["uprn-source"] = epc_searcher.UPRN_SOURCE_SIMULATED # We check for an energy assessment we have performed on this property: energy_assessment = get_latest_assessment_by_uprn(session, uprn if uprn is not None else epc_searcher.uprn) diff --git a/etl/epc/Dataset.py b/etl/epc/Dataset.py index 83a85b78..5d3720fc 100644 --- a/etl/epc/Dataset.py +++ b/etl/epc/Dataset.py @@ -1,9 +1,16 @@ -import numpy as np import pandas as pd from typing import List from etl.epc.Record import EPCDifferenceRecord from etl.epc.ValidationConfiguration import DatasetValidationConfiguration from etl.epc.settings import EARLIEST_EPC_DATE +from etl.epc_clean.epc_attributes.WallAttributes import WallAttributes +from etl.epc_clean.epc_attributes.FloorAttributes import FloorAttributes +from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes +from etl.epc_clean.epc_attributes.HotWaterAttributes import HotWaterAttributes +from etl.epc_clean.epc_attributes.MainheatAttributes import MainHeatAttributes +from etl.epc_clean.epc_attributes.MainheatControlAttributes import MainheatControlAttributes +from etl.epc_clean.epc_attributes.WindowAttributes import WindowAttributes +from etl.epc_clean.epc_attributes.MainFuelAttributes import MainFuelAttributes from recommendations.rdsap_tables import england_wales_age_band_lookup from recommendations.recommendation_utils import ( @@ -492,6 +499,7 @@ class 
TrainingDataset(BaseDataset): """ if component == "walls": + expanded_df = expanded_df[ (expanded_df["is_cavity_wall"] == expanded_df["is_cavity_wall_ending"]) & ( @@ -657,6 +665,17 @@ class TrainingDataset(BaseDataset): components_to_expand = cols_to_drop.keys() + cleaning_lookup = { + "walls": WallAttributes, + "floor": FloorAttributes, + "roof": RoofAttributes, + "hotwater": HotWaterAttributes, + "mainheat": MainHeatAttributes, + "mainheatcont": MainheatControlAttributes, + "windows": WindowAttributes, + "main-fuel": MainFuelAttributes, + } + for component in components_to_expand: # TODO: change cleaned dataframe to have underscores instead of dashes if component == "main-fuel": @@ -675,6 +694,35 @@ class TrainingDataset(BaseDataset): cleaned_lookup_df_for_key = pd.DataFrame(cleaned_lookup[cleaned_key]) + # We handle a specific edge case where we're missing information for the original description + descriptions = [x for x in self.df[left_on_starting].unique() if pd.notnull(x)] + # take any not in the cleaned lookup + missing_descriptions = [ + x for x in descriptions if x not in cleaned_lookup_df_for_key["original_description"].values + ] + if missing_descriptions: + # We handle them here + cleaner = cleaning_lookup[component] + cleaned_data = [] + for x in missing_descriptions: + desc_cleaner = cleaner(x) + cleaned = desc_cleaner.process() + cleaned_data.append( + { + "original_description": x, + "clean_description": desc_cleaner.description.replace("(assumed)", + "").rstrip().capitalize(), + **cleaned + } + ) + cleaned_lookup_df_for_key = pd.concat( + [ + cleaned_lookup_df_for_key, + pd.DataFrame(cleaned_data), + ], + ignore_index=True, + ) + expanded_df = self.df.merge( cleaned_lookup_df_for_key, how="left", diff --git a/etl/find_my_epc/RetrieveFindMyEpc.py b/etl/find_my_epc/RetrieveFindMyEpc.py index 766de840..21794284 100644 --- a/etl/find_my_epc/RetrieveFindMyEpc.py +++ b/etl/find_my_epc/RetrieveFindMyEpc.py @@ -684,6 +684,7 @@ class RetrieveFindMyEpc: ], 
"Increase loft insulation to 250mm": ["loft_insulation"], "Solar photovoltaics panels, 25% of roof area": ["solar_pv"], + 'Air or ground source heat pump': ["air_source_heat_pump"], } survey = True diff --git a/etl/spatial/OpenUprnClient.py b/etl/spatial/OpenUprnClient.py index c0cd3992..36cf2d7b 100644 --- a/etl/spatial/OpenUprnClient.py +++ b/etl/spatial/OpenUprnClient.py @@ -139,7 +139,7 @@ class OpenUprnClient: uprn_filenames = read_dataframe_from_s3_parquet( bucket_name=bucket_name, file_key="spatial/filename_meta.parquet" ) - + # If we have a domna asset list, we uprns = [p.uprn for p in input_properties if p.uprn_source != SearchEpc.UPRN_SOURCE_SIMULATED] uprn_map = cls.make_uprn_map(uprns, uprn_filenames) diff --git a/recommendations/WallRecommendations.py b/recommendations/WallRecommendations.py index dbb7d674..3a2815bc 100644 --- a/recommendations/WallRecommendations.py +++ b/recommendations/WallRecommendations.py @@ -498,24 +498,33 @@ class WallRecommendations(Definitions): Helper function to set the starting simulation config """ - simulation_config = {} - if self.property.data["walls-energy-eff"] not in ["Good", "Very Good"]: - if wall_ending_config["is_cavity_wall"]: - efficiency_data = [ - x for x in cavity_wall_energy_eff if - x["construction-age-band"] == self.property.construction_age_band - ][0] - elif wall_ending_config["internal_insulation"]: - efficiency_data = [ - x for x in iwi_energy_eff if - x["construction-age-band"] == self.property.construction_age_band - ][0] - else: - efficiency_data = [ - x for x in ewi_energy_eff if - x["construction-age-band"] == self.property.construction_age_band - ][0] + if wall_ending_config["is_cavity_wall"]: + efficiency_data = [ + x for x in cavity_wall_energy_eff if + x["construction-age-band"] == self.property.construction_age_band + ][0] + elif wall_ending_config["internal_insulation"]: + efficiency_data = [ + x for x in iwi_energy_eff if + x["construction-age-band"] == self.property.construction_age_band 
+ ][0] + else: + efficiency_data = [ + x for x in ewi_energy_eff if + x["construction-age-band"] == self.property.construction_age_band + ][0] + if self.property.data["walls-energy-eff"] == "Good" and efficiency_data["walls-energy-eff"] not in [ + "Good", "Very Good" + ]: + simulation_config = { + "walls_energy_eff_ending": self.property.data["walls-energy-eff"] + } + elif self.property.data["walls-energy-eff"] == "Very Good": + simulation_config = { + "walls_energy_eff_ending": "Very Good" + } + else: simulation_config = { "walls_energy_eff_ending": efficiency_data["walls-energy-eff"] } diff --git a/utils/s3.py b/utils/s3.py index 1a686b55..e70669d0 100644 --- a/utils/s3.py +++ b/utils/s3.py @@ -198,7 +198,7 @@ def read_pickle_from_s3(bucket_name, s3_file_name): return data -def read_excel_from_s3(bucket_name, file_key, header_row, drop_all_na=True): +def read_excel_from_s3(bucket_name, file_key, header_row, drop_all_na=True, sheet_name=None): """ Read an Excel file from an S3 bucket and return it as a pandas DataFrame. @@ -206,6 +206,7 @@ def read_excel_from_s3(bucket_name, file_key, header_row, drop_all_na=True): :param file_key: Key of the file (including directory path within the bucket). :param header_row: The row number to use as the header (0-indexed). :param drop_all_na: Whether to drop columns where all values are NaN. + :param sheet_name: The name of the sheet to read from the Excel file. If None, reads the first sheet. :return: A pandas DataFrame containing the data from the Excel file. """ @@ -217,7 +218,7 @@ def read_excel_from_s3(bucket_name, file_key, header_row, drop_all_na=True): excel_buffer = read_io_from_s3(bucket_name, file_key) # Read the Excel file into a pandas DataFrame - df = pd.read_excel(excel_buffer, header=header_row) + df = pd.read_excel(excel_buffer, header=header_row, sheet_name=sheet_name) # Drop columns where all values are NaN if drop_all_na: