mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
commit
324fe04e40
10 changed files with 261 additions and 42 deletions
|
|
@ -292,9 +292,7 @@ class Property:
|
|||
self.epc_record, fixed_data
|
||||
)
|
||||
|
||||
self.base_difference_record = TrainingDataset(
|
||||
datasets=[difference_record], cleaned_lookup=cleaned_lookup
|
||||
)
|
||||
self.base_difference_record = TrainingDataset(datasets=[difference_record], cleaned_lookup=cleaned_lookup)
|
||||
|
||||
# If we have variables that have been given to us by the landlord that we know are correct, whereas the EPC
|
||||
# may not be, we use them
|
||||
|
|
|
|||
|
|
@ -71,6 +71,8 @@ DESCRIPTIONS_TO_FUEL_TYPES = {
|
|||
},
|
||||
'Electric instantaneous at point of use, plus solar': {"fuel": "Electricity + Solar Thermal", "cop": 1},
|
||||
"Electric storage heaters, Room heaters, electric": {"fuel": "Electricity", "cop": 1},
|
||||
'Boiler and underfloor heating, oil': {"fuel": "Oil", "cop": 0.85},
|
||||
"Boiler and radiators, smokeless fuel": {"fuel": "Smokeless Fuel", "cop": 0.85},
|
||||
}
|
||||
|
||||
# These are the measure types where if there is a ventilation recommendation, we force the inclusion of it
|
||||
|
|
|
|||
|
|
@ -1,9 +1,19 @@
|
|||
import boto3
|
||||
import json
|
||||
import math
|
||||
from datetime import datetime
|
||||
|
||||
import pandas as pd
|
||||
from fastapi import APIRouter, Depends
|
||||
from backend.app.dependencies import validate_token
|
||||
from backend.app.plan.schemas import PlanTriggerRequest
|
||||
from backend.app.config import get_settings
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
from utils.logger import setup_logger
|
||||
from utils.s3 import read_excel_from_s3
|
||||
from backend.app.db.connection import db_engine
|
||||
|
||||
from backend.app.db.functions.recommendations_functions import create_scenario
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
|
@ -26,21 +36,86 @@ async def trigger_plan_entrypoint(body: PlanTriggerRequest):
|
|||
|
||||
settings = get_settings()
|
||||
|
||||
# Serialize the PlanTriggerRequest into JSON
|
||||
try:
|
||||
message_body = body.model_dump_json()
|
||||
data = body.model_dump()
|
||||
except Exception as e:
|
||||
logger.error("Failed to serialize request body: %s", e)
|
||||
logger.error("Failed to parse request body: %s", e)
|
||||
return {"message": "Invalid request"}, 400
|
||||
|
||||
try:
|
||||
response = sqs_client.send_message(
|
||||
QueueUrl=settings.ENGINE_SQS_URL,
|
||||
MessageBody=message_body
|
||||
)
|
||||
logger.info(f"SQS message sent. Message ID: {response.get('MessageId')}")
|
||||
except Exception as e:
|
||||
logger.error("Failed to send SQS message: %s", e)
|
||||
return {"message": "Failed to trigger engine"}, 500
|
||||
# If file_format is domna_asset_list and type is xlsx, read and chunk it
|
||||
if data.get("file_format") == "domna_asset_list" and data.get("file_type") == "xlsx":
|
||||
try:
|
||||
input_data: pd.DataFrame = read_excel_from_s3(
|
||||
bucket_name=settings.PLAN_TRIGGER_BUCKET,
|
||||
file_key=data.get("trigger_file_path"),
|
||||
sheet_name=data.get("sheet_name"),
|
||||
header_row=0
|
||||
)
|
||||
|
||||
total_rows = len(input_data)
|
||||
chunk_size = 30
|
||||
total_chunks = math.ceil(total_rows / chunk_size)
|
||||
|
||||
# We also need to create a new scenario and pass it to the SQS messages, if one doesn't
|
||||
# exist
|
||||
scenario_id = data.get("scenario_id")
|
||||
if not scenario_id:
|
||||
created_at = datetime.now().isoformat()
|
||||
session = sessionmaker(bind=db_engine)()
|
||||
|
||||
# Create a new scenario
|
||||
new_scenario = create_scenario(
|
||||
session=session,
|
||||
scenario={
|
||||
"name": body.scenario_name,
|
||||
"created_at": created_at,
|
||||
"budget": body.budget,
|
||||
"portfolio_id": body.portfolio_id,
|
||||
"housing_type": body.housing_type,
|
||||
"goal": body.goal,
|
||||
"goal_value": body.goal_value,
|
||||
"trigger_file_path": body.trigger_file_path,
|
||||
"already_installed_file_path": body.already_installed_file_path,
|
||||
"patches_file_path": body.patches_file_path,
|
||||
"non_invasive_recommendations_file_path": body.non_invasive_recommendations_file_path,
|
||||
"exclusions": body.exclusions,
|
||||
"multi_plan": body.multi_plan
|
||||
}
|
||||
)
|
||||
scenario_id = new_scenario.id
|
||||
# Insert the scenario ID into the data payload
|
||||
data["scenario_id"] = scenario_id
|
||||
|
||||
for i in range(total_chunks):
|
||||
index_start = i * chunk_size
|
||||
index_end = min((i + 1) * chunk_size, total_rows)
|
||||
|
||||
message_payload = {**data, "index_start": index_start, "index_end": index_end}
|
||||
|
||||
message_body = json.dumps(message_payload)
|
||||
|
||||
response = sqs_client.send_message(
|
||||
QueueUrl=settings.ENGINE_SQS_URL,
|
||||
MessageBody=message_body
|
||||
)
|
||||
logger.info(
|
||||
f"Chunk {i} sent to SQS. Rows {index_start}–{index_end}. Message ID: {response.get('MessageId')}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Error during Excel file handling: %s", e)
|
||||
return {"message": "Failed to process asset list"}, 500
|
||||
|
||||
else:
|
||||
# Fallback: Just send a single message
|
||||
try:
|
||||
message_body = json.dumps(data)
|
||||
response = sqs_client.send_message(
|
||||
QueueUrl=settings.ENGINE_SQS_URL,
|
||||
MessageBody=message_body
|
||||
)
|
||||
logger.info(f"SQS message sent. Message ID: {response.get('MessageId')}")
|
||||
except Exception as e:
|
||||
logger.error("Failed to send SQS message: %s", e)
|
||||
return {"message": "Failed to trigger engine"}, 500
|
||||
|
||||
return {"message": "Plan job accepted"}
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
from pydantic import BaseModel, Field, BeforeValidator
|
||||
from pydantic import BaseModel, Field, BeforeValidator, model_validator
|
||||
from typing import Annotated, List, Optional, Literal
|
||||
|
||||
# Example constants for validation
|
||||
|
|
@ -104,7 +104,16 @@ class PlanTriggerRequest(BaseModel):
|
|||
simulate_sap_10: Optional[bool] = False
|
||||
|
||||
# Add in optional fields which describe the format of the asset list being used
|
||||
|
||||
file_type: Optional[Literal["csv", "xlsx"]] = None,
|
||||
file_format: Optional[Literal["domna_asset_list"]] = None,
|
||||
|
||||
file_type: Optional[Literal["csv", "xlsx"]] = None
|
||||
file_format: Optional[Literal["domna_asset_list"]] = None
|
||||
sheet_name: Optional[str] = None
|
||||
# If one of index_start or index_end is set, the other must be set too
|
||||
index_start: Optional[int] = None
|
||||
index_end: Optional[int] = None
|
||||
|
||||
@model_validator(mode="after")
|
||||
def check_indexes(self):
|
||||
if (self.index_start is None) != (self.index_end is None):
|
||||
raise ValueError("Both index_start and index_end must be set or both must be None")
|
||||
return self
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ from datetime import datetime
|
|||
|
||||
from tqdm import tqdm
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
from etl.epc.Record import EPCRecord
|
||||
from backend.SearchEpc import SearchEpc
|
||||
from sqlalchemy.exc import IntegrityError, OperationalError
|
||||
|
|
@ -37,7 +38,7 @@ from recommendations.optimiser.GainOptimiser import GainOptimiser
|
|||
from recommendations.optimiser.optimiser_functions import prepare_input_measures
|
||||
from recommendations.Recommendations import Recommendations
|
||||
from utils.logger import setup_logger
|
||||
from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3
|
||||
from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3, read_excel_from_s3
|
||||
from backend.ml_models.Valuation import PropertyValuation
|
||||
|
||||
from etl.bill_savings.KwhData import KwhData
|
||||
|
|
@ -435,7 +436,69 @@ async def model_engine(body: PlanTriggerRequest):
|
|||
try:
|
||||
session.begin()
|
||||
logger.info("Getting the inputs")
|
||||
plan_input = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path)
|
||||
|
||||
if body.file_type == "xlsx":
|
||||
plan_input = read_excel_from_s3(
|
||||
bucket_name=get_settings().PLAN_TRIGGER_BUCKET,
|
||||
file_key=body.trigger_file_path,
|
||||
sheet_name=body.sheet_name,
|
||||
header_row=0,
|
||||
)
|
||||
|
||||
# We now handle the case where the input data is a Domna standardised assset list
|
||||
if body.file_format == "domna_asset_list":
|
||||
# We rename the columns to match the expected format
|
||||
plan_input = plan_input.rename(
|
||||
columns={"domna_address_1": "address", "domna_postcode": "postcode", "epc_os_uprn": "uprn"}
|
||||
)
|
||||
# Where the EPC has been estimated, that is because a UPRN wasn't avaialble and so we remote UPRN
|
||||
plan_input["uprn"] = np.where(plan_input["estimated"].isin([1, True]), None, plan_input["uprn"])
|
||||
# We handle the landlord property type and built form
|
||||
plan_input["property_type"] = plan_input["landlord_property_type"].copy()
|
||||
plan_input["built_form"] = plan_input["landlord_built_form"].copy()
|
||||
plan_input["property_type"] = np.where(
|
||||
plan_input["property_type"] == "unknown",
|
||||
plan_input["epc_property_type"],
|
||||
plan_input["property_type"]
|
||||
)
|
||||
plan_input["built_form"] = np.where(
|
||||
plan_input["built_form"] == "unknown", plan_input["epc_archetype"], plan_input["built_form"]
|
||||
)
|
||||
property_type_map = {
|
||||
"house": "House",
|
||||
"flat": "Flat",
|
||||
"maisonette": "Maisonette",
|
||||
"bungalow": "Bungalow",
|
||||
"block house": "House",
|
||||
"coach house": "House",
|
||||
"bedsit": "Flat"
|
||||
}
|
||||
|
||||
built_form_map = {
|
||||
"mid-terrace": "Mid-Terrace",
|
||||
"end-terrace": "End-Terrace",
|
||||
"semi-detached": "Semi-Detached",
|
||||
"detached": "Detached",
|
||||
"enclosed end-terrace": "Enclosed End-Terrace",
|
||||
"enclosed mid-terrace": "Enclosed Mid-Terrace",
|
||||
}
|
||||
# We remap the values to match the EPC expected formats
|
||||
plan_input["property_type"] = plan_input["property_type"].map(property_type_map)
|
||||
plan_input["built_form"] = plan_input["built_form"].map(built_form_map)
|
||||
|
||||
plan_input = plan_input.to_dict("records")
|
||||
else:
|
||||
raise ValueError("Other formats not yet supported")
|
||||
|
||||
else:
|
||||
plan_input = read_csv_from_s3(
|
||||
bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path
|
||||
)
|
||||
|
||||
# We then slide it on the indexes if they are provided
|
||||
if body.index_start is not None and body.index_end is not None:
|
||||
plan_input = plan_input[body.index_start:body.index_end]
|
||||
|
||||
# Check for duplicate UPRNS
|
||||
input_uprns = [x.get("uprn") for x in plan_input if "uprn" in x and x.get("uprn")]
|
||||
|
||||
|
|
@ -453,8 +516,18 @@ async def model_engine(body: PlanTriggerRequest):
|
|||
|
||||
input_properties = []
|
||||
for config in tqdm(plan_input):
|
||||
|
||||
if config["landlord_property_id"] in ["LE113NWIC95", "NG241FBCT", "NG51BNIC"]:
|
||||
continue
|
||||
|
||||
if not pd.isnull(config.get("uprn")):
|
||||
if int(float(config.get("uprn"))) < 0:
|
||||
continue
|
||||
|
||||
# We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
|
||||
uprn = config.get("uprn", None)
|
||||
if pd.isnull(uprn):
|
||||
uprn = None
|
||||
if uprn:
|
||||
uprn = int(float(uprn))
|
||||
|
||||
|
|
@ -469,6 +542,9 @@ async def model_engine(body: PlanTriggerRequest):
|
|||
epc_searcher.ordnance_survey_client.property_type = config.get("property_type", None)
|
||||
# For the moment, our OS API access is unavailable, so we skip and interpolate
|
||||
epc_searcher.find_property(skip_os=True)
|
||||
# TODO: Placeholder
|
||||
if epc_searcher.newest_epc.get("estimated") and body.file_format == "domna_asset_list":
|
||||
epc_searcher.newest_epc["uprn-source"] = epc_searcher.UPRN_SOURCE_SIMULATED
|
||||
|
||||
# We check for an energy assessment we have performed on this property:
|
||||
energy_assessment = get_latest_assessment_by_uprn(session, uprn if uprn is not None else epc_searcher.uprn)
|
||||
|
|
|
|||
|
|
@ -1,9 +1,16 @@
|
|||
import numpy as np
|
||||
import pandas as pd
|
||||
from typing import List
|
||||
from etl.epc.Record import EPCDifferenceRecord
|
||||
from etl.epc.ValidationConfiguration import DatasetValidationConfiguration
|
||||
from etl.epc.settings import EARLIEST_EPC_DATE
|
||||
from etl.epc_clean.epc_attributes.WallAttributes import WallAttributes
|
||||
from etl.epc_clean.epc_attributes.FloorAttributes import FloorAttributes
|
||||
from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes
|
||||
from etl.epc_clean.epc_attributes.HotWaterAttributes import HotWaterAttributes
|
||||
from etl.epc_clean.epc_attributes.MainheatAttributes import MainHeatAttributes
|
||||
from etl.epc_clean.epc_attributes.MainheatControlAttributes import MainheatControlAttributes
|
||||
from etl.epc_clean.epc_attributes.WindowAttributes import WindowAttributes
|
||||
from etl.epc_clean.epc_attributes.MainFuelAttributes import MainFuelAttributes
|
||||
|
||||
from recommendations.rdsap_tables import england_wales_age_band_lookup
|
||||
from recommendations.recommendation_utils import (
|
||||
|
|
@ -492,6 +499,7 @@ class TrainingDataset(BaseDataset):
|
|||
"""
|
||||
|
||||
if component == "walls":
|
||||
|
||||
expanded_df = expanded_df[
|
||||
(expanded_df["is_cavity_wall"] == expanded_df["is_cavity_wall_ending"])
|
||||
& (
|
||||
|
|
@ -657,6 +665,17 @@ class TrainingDataset(BaseDataset):
|
|||
|
||||
components_to_expand = cols_to_drop.keys()
|
||||
|
||||
cleaning_lookup = {
|
||||
"walls": WallAttributes,
|
||||
"floor": FloorAttributes,
|
||||
"roof": RoofAttributes,
|
||||
"hotwater": HotWaterAttributes,
|
||||
"mainheat": MainHeatAttributes,
|
||||
"mainheatcont": MainheatControlAttributes,
|
||||
"windows": WindowAttributes,
|
||||
"main-fuel": MainFuelAttributes,
|
||||
}
|
||||
|
||||
for component in components_to_expand:
|
||||
# TODO: change cleaned dataframe to have underscores instead of dashes
|
||||
if component == "main-fuel":
|
||||
|
|
@ -675,6 +694,35 @@ class TrainingDataset(BaseDataset):
|
|||
|
||||
cleaned_lookup_df_for_key = pd.DataFrame(cleaned_lookup[cleaned_key])
|
||||
|
||||
# We handle a specific edge case where we're missing information for the original description
|
||||
descriptions = [x for x in self.df[left_on_starting].unique() if pd.notnull(x)]
|
||||
# take any not in the cleaned lookup
|
||||
missing_descriptions = [
|
||||
x for x in descriptions if x not in cleaned_lookup_df_for_key["original_description"].values
|
||||
]
|
||||
if missing_descriptions:
|
||||
# We handle them here
|
||||
cleaner = cleaning_lookup[component]
|
||||
cleaned_data = []
|
||||
for x in missing_descriptions:
|
||||
desc_cleaner = cleaner(x)
|
||||
cleaned = desc_cleaner.process()
|
||||
cleaned_data.append(
|
||||
{
|
||||
"original_description": x,
|
||||
"clean_description": desc_cleaner.description.replace("(assumed)",
|
||||
"").rstrip().capitalize(),
|
||||
**cleaned
|
||||
}
|
||||
)
|
||||
cleaned_lookup_df_for_key = pd.concat(
|
||||
[
|
||||
cleaned_lookup_df_for_key,
|
||||
pd.DataFrame(cleaned_data),
|
||||
],
|
||||
ignore_index=True,
|
||||
)
|
||||
|
||||
expanded_df = self.df.merge(
|
||||
cleaned_lookup_df_for_key,
|
||||
how="left",
|
||||
|
|
|
|||
|
|
@ -684,6 +684,7 @@ class RetrieveFindMyEpc:
|
|||
],
|
||||
"Increase loft insulation to 250mm": ["loft_insulation"],
|
||||
"Solar photovoltaics panels, 25% of roof area": ["solar_pv"],
|
||||
'Air or ground source heat pump': ["air_source_heat_pump"],
|
||||
}
|
||||
|
||||
survey = True
|
||||
|
|
|
|||
|
|
@ -139,7 +139,7 @@ class OpenUprnClient:
|
|||
uprn_filenames = read_dataframe_from_s3_parquet(
|
||||
bucket_name=bucket_name, file_key="spatial/filename_meta.parquet"
|
||||
)
|
||||
|
||||
# If we have a domna asset list, we
|
||||
uprns = [p.uprn for p in input_properties if p.uprn_source != SearchEpc.UPRN_SOURCE_SIMULATED]
|
||||
uprn_map = cls.make_uprn_map(uprns, uprn_filenames)
|
||||
|
||||
|
|
|
|||
|
|
@ -498,24 +498,33 @@ class WallRecommendations(Definitions):
|
|||
Helper function to set the starting simulation config
|
||||
"""
|
||||
|
||||
simulation_config = {}
|
||||
if self.property.data["walls-energy-eff"] not in ["Good", "Very Good"]:
|
||||
if wall_ending_config["is_cavity_wall"]:
|
||||
efficiency_data = [
|
||||
x for x in cavity_wall_energy_eff if
|
||||
x["construction-age-band"] == self.property.construction_age_band
|
||||
][0]
|
||||
elif wall_ending_config["internal_insulation"]:
|
||||
efficiency_data = [
|
||||
x for x in iwi_energy_eff if
|
||||
x["construction-age-band"] == self.property.construction_age_band
|
||||
][0]
|
||||
else:
|
||||
efficiency_data = [
|
||||
x for x in ewi_energy_eff if
|
||||
x["construction-age-band"] == self.property.construction_age_band
|
||||
][0]
|
||||
if wall_ending_config["is_cavity_wall"]:
|
||||
efficiency_data = [
|
||||
x for x in cavity_wall_energy_eff if
|
||||
x["construction-age-band"] == self.property.construction_age_band
|
||||
][0]
|
||||
elif wall_ending_config["internal_insulation"]:
|
||||
efficiency_data = [
|
||||
x for x in iwi_energy_eff if
|
||||
x["construction-age-band"] == self.property.construction_age_band
|
||||
][0]
|
||||
else:
|
||||
efficiency_data = [
|
||||
x for x in ewi_energy_eff if
|
||||
x["construction-age-band"] == self.property.construction_age_band
|
||||
][0]
|
||||
|
||||
if self.property.data["walls-energy-eff"] == "Good" and efficiency_data["walls-energy-eff"] not in [
|
||||
"Good", "Very Good"
|
||||
]:
|
||||
simulation_config = {
|
||||
"walls_energy_eff_ending": self.property.data["walls-energy-eff"]
|
||||
}
|
||||
elif self.property.data["walls-energy-eff"] == "Very Good":
|
||||
simulation_config = {
|
||||
"walls_energy_eff_ending": "Very Good"
|
||||
}
|
||||
else:
|
||||
simulation_config = {
|
||||
"walls_energy_eff_ending": efficiency_data["walls-energy-eff"]
|
||||
}
|
||||
|
|
|
|||
|
|
@ -198,7 +198,7 @@ def read_pickle_from_s3(bucket_name, s3_file_name):
|
|||
return data
|
||||
|
||||
|
||||
def read_excel_from_s3(bucket_name, file_key, header_row, drop_all_na=True):
|
||||
def read_excel_from_s3(bucket_name, file_key, header_row, drop_all_na=True, sheet_name=None):
|
||||
"""
|
||||
Read an Excel file from an S3 bucket and return it as a pandas DataFrame.
|
||||
|
||||
|
|
@ -206,6 +206,7 @@ def read_excel_from_s3(bucket_name, file_key, header_row, drop_all_na=True):
|
|||
:param file_key: Key of the file (including directory path within the bucket).
|
||||
:param header_row: The row number to use as the header (0-indexed).
|
||||
:param drop_all_na: Whether to drop columns where all values are NaN.
|
||||
:param sheet_name: The name of the sheet to read from the Excel file. If None, reads the first sheet.
|
||||
:return: A pandas DataFrame containing the data from the Excel file.
|
||||
"""
|
||||
|
||||
|
|
@ -217,7 +218,7 @@ def read_excel_from_s3(bucket_name, file_key, header_row, drop_all_na=True):
|
|||
excel_buffer = read_io_from_s3(bucket_name, file_key)
|
||||
|
||||
# Read the Excel file into a pandas DataFrame
|
||||
df = pd.read_excel(excel_buffer, header=header_row)
|
||||
df = pd.read_excel(excel_buffer, header=header_row, sheet_name=sheet_name)
|
||||
|
||||
# Drop columns where all values are NaN
|
||||
if drop_all_na:
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue