Merge pull request #616 from Hestia-Homes/eco-eligiblity-bug

Eco eligibility bug
This commit is contained in:
KhalimCK 2025-12-14 23:15:05 +08:00 committed by GitHub
commit 873ca92948
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 225 additions and 227 deletions

View file

@ -611,7 +611,10 @@ class Property:
if self.data[description] in self.DATA_ANOMALY_MATCHES:
template = cleaned[description][0]
fill_dict = dict(zip(template.keys(), [None] * len(template)))
# Handling edge case for walls
fill_with = False if description == "walls-description" else None
fill_dict = dict(zip(template.keys(), [fill_with] * len(template)))
fill_dict.update(
{
"original_description": self.data[description],
@ -1238,6 +1241,9 @@ class Property:
"electricity": "Electricity",
"biogas": "Smokeless Fuel",
"heat network": "Natural Gas (Community Scheme)",
"lpg": 'LPG',
"biodiesel": "Smokeless Fuel",
"b30d": "B30K Biofuel"
}
self.heating_energy_source = list({

View file

@ -12,7 +12,17 @@ db_string = connection_string.format(
dbname=get_settings().DB_NAME,
)
db_engine = create_engine(db_string, pool_size=5, max_overflow=5)
# db_engine = create_engine(db_string, pool_size=5, max_overflow=5)
# Adjusted database connection to decrease pool size for serverless environments (from lambda) so that
# each lambda doesn't hog all connections
db_engine = create_engine(
db_string,
pool_size=1,
max_overflow=0, # Limit the number of extra connections. With this and pool size, we allow 1 connection per lambda
pool_pre_ping=True,
pool_recycle=300, # Forces SQLAlchemy to close and reopen any connection older than 300 seconds
)
def get_db_session():

View file

@ -238,7 +238,7 @@ def build_cloudwatch_log_url(start_ms: int) -> str:
)
def handle_error(session, msg, e, subtask_id, status=500, start_ms=None):
def handle_error(msg, e, subtask_id, status=500, start_ms=None):
# When the pipeline fails, handles error process
cloud_logs_url = build_cloudwatch_log_url(start_ms)
@ -249,5 +249,4 @@ def handle_error(session, msg, e, subtask_id, status=500, start_ms=None):
cloud_logs_url=cloud_logs_url
)
logger.error(msg, exc_info=True)
session.rollback()
return Response(status_code=status, content=msg)

View file

@ -10,6 +10,8 @@ from uuid import UUID
from backend.Funding import Funding
from backend.SearchEpc import SearchEpc
from contextlib import contextmanager
from sqlmodel import Session
from etl.epc.Record import EPCRecord
from sqlalchemy.exc import IntegrityError, OperationalError
@ -516,16 +518,59 @@ def averages_cleaning(prepared_epc: EPCRecord, cleaning_data: pd.DataFrame):
return prepared_epc
def extract_address_data(config, body):
    """
    Pull the UPRN and address fields for one input record out of its config.

    NOTE(review): the nesting below was reconstructed from a flattened diff;
    the address string conversion is assumed to belong to the domna fallback
    branch - confirm against the real file.

    :param config: mapping for a single input row; read via ``.get`` for the keys
        "uprn", "address", "domna_address_1" and "domna_full_address"
    :param body: request object; only ``body.file_format`` is read
    :return: tuple ``(uprn, address1, full_address)`` where
        - ``uprn`` is None when null, an int when truthy, otherwise unchanged
        - ``address1`` is the "address" value, or for the "domna_asset_list" format
          the stringified "domna_address_1" fallback (None when that is missing too)
        - ``full_address`` is the "domna_full_address" string for the
          "domna_asset_list" format, otherwise None
    :raises ValueError: if a truthy uprn cannot be parsed as a number
    """
    uprn = config.get("uprn", None)
    if pd.isnull(uprn):
        uprn = None
    if uprn:
        # UPRNs can arrive as floats or float-like strings (e.g. "100023.0")
        uprn = int(float(uprn))

    is_domna = body.file_format == "domna_asset_list"

    address1 = config.get("address", None)
    # Handle domna address list format
    if pd.isnull(address1) and is_domna:
        address1 = config.get("domna_address_1", None)
        if pd.isnull(address1):
            # A missing fallback previously became the string "None" (or raised on NaN);
            # report it as missing instead
            address1 = None
        elif isinstance(address1, float):
            # Numeric house numbers are read as floats (12.0) - drop the decimal part
            address1 = str(int(address1))
        else:
            address1 = str(address1)

    full_address = config.get("domna_full_address", "") if is_domna else None
    if not isinstance(full_address, str):  # Catch for when the full address is nan
        full_address = None

    return uprn, address1, full_address
@contextmanager
def db_session():
    """
    Context manager yielding a Session bound to ``db_engine`` for a unit of work.

    The session is committed when the ``with`` body finishes without error. If the
    body (or the commit) raises an Exception, the session is rolled back and the
    exception is re-raised. The session is closed on exit in every case.
    """
    # The Session's own context manager takes care of closing it on the way out
    with Session(db_engine) as write_session:
        try:
            yield write_session
            write_session.commit()
        except Exception:
            write_session.rollback()
            raise
@contextmanager
def db_read_session():
    """
    Context manager yielding a Session bound to ``db_engine`` for fetching data.

    The session is created with ``expire_on_commit=False``. This helper never
    commits or rolls back itself; it only guarantees the session is closed on exit.
    """
    # Leaving the with-block closes the session, whether or not the caller raised
    with Session(db_engine, expire_on_commit=False) as read_session:
        yield read_session
async def model_engine(body: PlanTriggerRequest):
logger.info("Model Engine triggered with body: %s", json.loads(body.model_dump_json()))
logger.info("Connecting to db")
session = sessionmaker(bind=db_engine)()
created_at = datetime.now().isoformat()
start_ms = int(time.time() * 1000)
try:
session.begin()
logger.info("Getting the inputs")
if body.file_type == "xlsx":
@ -641,40 +686,32 @@ async def model_engine(body: PlanTriggerRequest):
input_properties, inspections_map, eco_packages = [], {}, {}
for config in tqdm(plan_input):
# We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
uprn = config.get("uprn", None)
if pd.isnull(uprn):
uprn = None
if uprn:
uprn = int(float(uprn))
epc_api_data, epc_page, rrn, epc_cache = None, None, None, {}
if uprn:
# if we have a UPRN, we check if we already have EPC data associated with this UPRN
epc_cache = db_funcs.epc_functions.EpcStoreService.get_epc_for_uprn(session, uprn)
if epc_cache["status"] == db_funcs.epc_functions.EpcStoreService.FRESH:
epc_api_data, epc_page, rrn = epc_cache["epc_api"], epc_cache["epc_page"], epc_cache["epc_page_rrn"]
address1 = config.get("address", None)
# Handle domna address list format
if pd.isnull(address1) and body.file_format == "domna_asset_list":
address1 = config.get("domna_address_1", None)
address1 = str(int(address1)) if isinstance(address1, float) else str(address1)
full_address = config.get("domna_full_address", "") if body.file_format == "domna_asset_list" else None
if not isinstance(full_address, str): # Catch for when the full address is nan
full_address = None
uprn, address1, full_address = extract_address_data(config, body)
heating_system = parse_heating_system(config)
associated_uprns = []
if (body.event_type == "remote_assessment") and config.get("property_type") == "Flat":
# We're running a remote assessment for a flat - we go and grab the associated
# UPRNS for other units in the same building
associated_uprns = db_funcs.address_functions.get_associated_uprns(
session, postcode=config["postcode"], uprn=uprn
# ---------- 1) fetch data ----------
epc_api_data, epc_page, rrn, epc_cache = None, None, None, {}
with db_read_session() as session:
epc_cache = {}
if uprn:
epc_cache = db_funcs.epc_functions.EpcStoreService.get_epc_for_uprn(session, uprn)
# For remote assessments of flats, we get associated UPRNs
associated_uprns = []
if body.event_type == "remote_assessment" and config.get("property_type") == "Flat":
associated_uprns = db_funcs.address_functions.get_associated_uprns(
session, postcode=config["postcode"], uprn=uprn
)
# We check for an energy assessment we have performed on this property:
energy_assessment = db_funcs.energy_assessment_functions.get_latest_assessment_by_uprn(
session, uprn
)
# Extract from EPC cache
if epc_cache.get("status") == db_funcs.epc_functions.EpcStoreService.FRESH:
epc_api_data, epc_page, rrn = epc_cache["epc_api"], epc_cache["epc_page"], epc_cache["epc_page_rrn"]
epc_searcher = SearchEpc(
address1=address1,
postcode=config["postcode"],
@ -692,28 +729,25 @@ async def model_engine(body: PlanTriggerRequest):
epc_searcher.find_property(skip_os=True, api_data=epc_api_data, overwrite_sap05=True)
epc_searcher.set_uprn_source(file_format=body.file_format)
# We check for an energy assessment we have performed on this property:
energy_assessment = db_funcs.energy_assessment_functions.get_latest_assessment_by_uprn(
session, uprn if uprn is not None else epc_searcher.uprn
)
# ---------- 2) ensure property exists ----------
with db_session() as session:
property_id, is_new = db_funcs.property_functions.ensure_property_exists(
session, body, epc_searcher, energy_assessment,
landlord_property_id=config.get("landlord_property_id")
)
property_id, is_new = db_funcs.property_functions.ensure_property_exists(
session, body, epc_searcher, energy_assessment, landlord_property_id=config.get("landlord_property_id")
)
if not property_id:
continue
if not is_new and not body.multi_plan:
if not property_id or (not is_new and not body.multi_plan):
continue
if is_new:
db_funcs.property_functions.create_property_targets(
session,
property_id=property_id,
portfolio_id=body.portfolio_id,
epc_target=body.goal_value,
heat_demand_target=None
)
with db_session() as session:
db_funcs.property_functions.create_property_targets(
session,
property_id=property_id,
portfolio_id=body.portfolio_id,
epc_target=body.goal_value,
heat_demand_target=None
)
# If we have an energy assessment in place, that is newer than all of the previous EPCs, we use that.
# Otherwise, we use the newest EPC
@ -789,18 +823,18 @@ async def model_engine(body: PlanTriggerRequest):
# 2) A real EPC
# 3) A UPRN (meaning that a UPRN could be fetched against that property)
# We store this data
if db_funcs.epc_functions.EpcStoreService.check_insert_needed(
epc_cache, epc_searcher.newest_epc.get("estimated"), epc_searcher.uprn
):
# We store the EPC data we have found for this property
db_funcs.epc_functions.EpcStoreService.upsert_epc_data(
session=session,
uprn=epc_searcher.uprn,
epc_api=epc_searcher.data,
epc_page=epc_page_source.get("page_source"),
epc_page_rrn=epc_page_source.get("rrn"),
)
with db_session() as session:
if db_funcs.epc_functions.EpcStoreService.check_insert_needed(
epc_cache, epc_searcher.newest_epc.get("estimated"), epc_searcher.uprn
):
# We store the EPC data we have found for this property
db_funcs.epc_functions.EpcStoreService.upsert_epc_data(
session=session,
uprn=epc_searcher.uprn,
epc_api=epc_searcher.data,
epc_page=epc_page_source.get("page_source"),
epc_page_rrn=epc_page_source.get("rrn"),
)
if not input_properties:
return Response(status_code=204)
@ -811,7 +845,8 @@ async def model_engine(body: PlanTriggerRequest):
# against each property if
if inspections_map:
logger.info("Inserting inspections data")
db_funcs.inspections_functions.bulk_upsert_inspections_pg(session, inspections_map)
with db_session() as session:
db_funcs.inspections_functions.bulk_upsert_inspections_pg(session, inspections_map)
# Set up model api and warm up the lambdas
model_api = ModelApi(
@ -828,7 +863,8 @@ async def model_engine(body: PlanTriggerRequest):
# consistent requests to the backend for
# the same data
logger.info("Reading in materials and cleaned datasets")
materials = db_funcs.materials_functions.get_materials(session)
with db_read_session() as session:
materials = db_funcs.materials_functions.get_materials(session)
cleaned = get_cleaned()
project_scores_matrix, partial_project_scores_matrix, whlg_eligible_postcodes = get_funding_data()
@ -861,24 +897,24 @@ async def model_engine(body: PlanTriggerRequest):
ofgem_consumption_averages=ofgem_consumption_averages,
body=body
)
input_properties = GoogleSolarApi.building_solar_analysis(
building_solar_config=building_solar_config,
input_properties=input_properties,
session=session,
google_solar_api_key=get_settings().GOOGLE_SOLAR_API_KEY,
solar_materials=[m for m in materials if m["type"] == "solar_pv"],
)
input_properties = GoogleSolarApi.unit_solar_analysis(
unit_solar_config=unit_solar_config,
input_properties=input_properties,
session=session,
body=body,
solar_materials=[m for m in materials if m["type"] == "solar_pv"],
google_solar_api_key=get_settings().GOOGLE_SOLAR_API_KEY,
inspections_map=inspections_map
)
with db_session() as session:
input_properties = GoogleSolarApi.building_solar_analysis(
building_solar_config=building_solar_config,
input_properties=input_properties,
session=session,
google_solar_api_key=get_settings().GOOGLE_SOLAR_API_KEY,
solar_materials=[m for m in materials if m["type"] == "solar_pv"],
)
with db_session() as session:
input_properties = GoogleSolarApi.unit_solar_analysis(
unit_solar_config=unit_solar_config,
input_properties=input_properties,
session=session,
body=body,
solar_materials=[m for m in materials if m["type"] == "solar_pv"],
google_solar_api_key=get_settings().GOOGLE_SOLAR_API_KEY,
inspections_map=inspections_map
)
# We also make a tweak - if the property has been flagged for solar but doesn't contain
# any panel performance, we ensure that we have a 3kWp and 4kWp option for the property
@ -1272,139 +1308,96 @@ async def model_engine(body: PlanTriggerRequest):
# We don't need to create a new scenario, we just use the existing one
scenario_id = body.scenario_id
else:
engine_scenario = db_funcs.recommendations_functions.create_scenario(
session=session,
scenario={
"name": body.scenario_name,
"created_at": created_at,
"budget": body.budget,
"portfolio_id": body.portfolio_id,
"housing_type": body.housing_type,
"goal": body.goal,
"goal_value": body.goal_value,
"trigger_file_path": body.trigger_file_path,
"already_installed_file_path": body.already_installed_file_path,
"patches_file_path": body.patches_file_path,
"non_invasive_recommendations_file_path": body.non_invasive_recommendations_file_path,
"exclusions": body.exclusions,
"multi_plan": body.multi_plan
}
)
with db_session() as session:
engine_scenario = db_funcs.recommendations_functions.create_scenario(
session=session,
scenario={
"name": body.scenario_name,
"created_at": created_at,
"budget": body.budget,
"portfolio_id": body.portfolio_id,
"housing_type": body.housing_type,
"goal": body.goal,
"goal_value": body.goal_value,
"trigger_file_path": body.trigger_file_path,
"already_installed_file_path": body.already_installed_file_path,
"patches_file_path": body.patches_file_path,
"non_invasive_recommendations_file_path": body.non_invasive_recommendations_file_path,
"exclusions": body.exclusions,
"multi_plan": body.multi_plan
}
)
scenario_id = engine_scenario.id
property_valuation_increases = []
session.commit()
new_epc_bands = {}
property_value_increase_ranges = {}
for i in range(0, len(input_properties), BATCH_SIZE):
for i in tqdm(
range(0, len(input_properties), BATCH_SIZE), total=int(np.ceil(len(input_properties) / BATCH_SIZE))
):
try:
# Take a slice of the input_properties list to make a batch
batch_properties = input_properties[i:i + BATCH_SIZE]
with db_session() as session:
for p in batch_properties:
recommendations_to_upload = recommendations.get(p.id, [])
default_recommendations = [r for r in recommendations_to_upload if r["default"]]
total_sap_points = sum([r["sap_points"] for r in default_recommendations])
new_sap_points = float(p.data["current-energy-efficiency"]) + total_sap_points
new_epc = sap_to_epc(new_sap_points)
for p in batch_properties:
recommendations_to_upload = recommendations.get(p.id, [])
default_recommendations = [r for r in recommendations_to_upload if r["default"]]
total_sap_points = sum([r["sap_points"] for r in default_recommendations])
new_sap_points = float(p.data["current-energy-efficiency"]) + total_sap_points
new_epc = sap_to_epc(new_sap_points)
new_epc_bands[p.id] = new_epc
total_cost = sum([r["total"] for r in default_recommendations])
total_cost = sum([r["total"] for r in default_recommendations])
valuations = PropertyValuation.estimate(
property_instance=p, target_epc=new_epc, total_cost=total_cost
)
valuations = PropertyValuation.estimate(
property_instance=p, target_epc=new_epc, total_cost=total_cost
)
property_value_increase_ranges[p.id] = valuations
property_plan_data = db_funcs.recommendations_functions.prepare_plan_data(
p, body, scenario_id, eco_packages, valuations, new_sap_points, new_epc,
default_recommendations
)
property_plan_data = db_funcs.recommendations_functions.prepare_plan_data(
p, body, scenario_id, eco_packages, valuations, new_sap_points, new_epc, default_recommendations
)
# TODO - this is not right, especially if the existing run failed
if p.is_new:
property_details_epc = p.get_property_details_epc(
portfolio_id=body.portfolio_id, rating_lookup=rating_lookup,
)
property_data = p.get_full_property_data(current_valuation=valuations["current_value"])
db_funcs.property_functions.create_property_details_epc(session, property_details_epc)
db_funcs.property_functions.update_or_create_property_spatial_details(
session, p.uprn, p.spatial
)
property_data = p.get_full_property_data(current_valuation=valuations["current_value"])
db_funcs.property_functions.update_property_data(
session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data
)
if not recommendations_to_upload:
continue
if not recommendations_to_upload:
continue
new_plan_id = db_funcs.recommendations_functions.create_plan(session, plan=property_plan_data)
db_funcs.recommendations_functions.upload_recommendations(
session, recommendations_to_upload, p.id, new_plan_id
)
db_funcs.funding_functions.upload_funding(session, p, new_plan_id, recommendations_to_upload)
if valuations["current_value"] > 0:
property_valuation_increases.append(
valuations["average_increased_value"] - valuations["current_value"]
new_plan_id = db_funcs.recommendations_functions.create_plan(
session, plan=property_plan_data
)
# Commit the session after each batch
session.commit()
db_funcs.recommendations_functions.upload_recommendations(
session, recommendations_to_upload, p.id, new_plan_id
)
db_funcs.funding_functions.upload_funding(
session, p, new_plan_id, recommendations_to_upload
)
except Exception as e:
# Rollback the session if an error occurs
session.rollback()
logger.warning("Failed i = %s" % str(i))
logger.error(f"An error occurred during batch starting at index {i}: {e}")
logger.error(f"property is uprn {p.uprn} id {p.id} address {p.address}")
logger.info("Creating portfolio aggregations")
# We implement this in the simplest way possible which will be just to query the database for all
# recommendations associated to the portfolio and then aggregate them. This is not the most efficient
# way to do this, but it's the simplest and will be a process that we can re-use since when we change a
# recommendation from being default to not default, we'll need to re-run this process to re-calculate the
# the portfolion level impact
logger.info("Work completed, updating log status")
total_valuation_increase = sum(property_valuation_increases)
labour_days = round(max(
[sum(r["labour_days"] for r in rec_group if r["default"]) for p_id, rec_group in recommendations.items()]
))
# TODO - This code only pulls in the properties that have been updated in this run, but we need to
# aggregate all properties in the portfolio. We likely need to trigger a re-aggregation
aggregated_data = extract_portfolio_aggregation_data(
input_properties=input_properties,
total_valuation_increase=total_valuation_increase,
recommendations=recommendations,
new_epc_bands=new_epc_bands,
property_value_increase_ranges=property_value_increase_ranges
)
db_funcs.portfolio_functions.aggregate_portfolio_recommendations(
session,
portfolio_id=body.portfolio_id,
scenario_id=scenario_id,
total_valuation_increase=total_valuation_increase,
labour_days=labour_days,
aggregated_data=aggregated_data
)
# Commit final changes
session.commit()
except IntegrityError as e:
return handle_error(session, "Database integrity error.", e, body.subtask_id, 500, start_ms)
return handle_error("Database integrity error.", e, body.subtask_id, 500, start_ms)
except OperationalError as e:
return handle_error(session, "Database operational error.", e, body.subtask_id, 500, start_ms)
return handle_error("Database operational error.", e, body.subtask_id, 500, start_ms)
except ValueError as e:
return handle_error(session, "Bad request: malformed data.", e, body.subtask_id, 400, start_ms)
return handle_error("Bad request: malformed data.", e, body.subtask_id, 400, start_ms)
except Exception as e: # General exception handling
return handle_error(session, "An unexpected error occurred.", e, body.subtask_id, 500, start_ms)
finally:
session.close()
return handle_error("An unexpected error occurred.", e, body.subtask_id, 500, start_ms)
cloud_logs_url = build_cloudwatch_log_url(start_ms)
# Mark the subtask as successful

View file

@ -498,56 +498,43 @@ class TrainingDataset(BaseDataset):
Drop properties that have inconsistent data, i.e. changing material types
"""
starting_and_finishing_null = (
expanded_df["original_description"].isin([None, ""]) &
expanded_df["original_description_ending"].isin([None, ""])
)
if component == "walls":
expanded_df = expanded_df[
(expanded_df["is_cavity_wall"] == expanded_df["is_cavity_wall_ending"])
& (
expanded_df["is_solid_brick"]
== expanded_df["is_solid_brick_ending"]
)
& (
expanded_df["is_timber_frame"]
== expanded_df["is_timber_frame_ending"]
)
& (
expanded_df["is_granite_or_whinstone"]
== expanded_df["is_granite_or_whinstone_ending"]
)
& (expanded_df["is_cob"] == expanded_df["is_cob_ending"])
& (
expanded_df["is_sandstone_or_limestone"]
== expanded_df["is_sandstone_or_limestone_ending"]
starting_and_finishing_null | (
(expanded_df["is_cavity_wall"] == expanded_df["is_cavity_wall_ending"])
& (expanded_df["is_solid_brick"] == expanded_df["is_solid_brick_ending"])
& (expanded_df["is_timber_frame"] == expanded_df["is_timber_frame_ending"])
& (expanded_df["is_granite_or_whinstone"] == expanded_df["is_granite_or_whinstone_ending"])
& (expanded_df["is_cob"] == expanded_df["is_cob_ending"])
& (expanded_df["is_sandstone_or_limestone"] == expanded_df["is_sandstone_or_limestone_ending"])
)
]
elif component == "floor":
expanded_df = expanded_df[
(expanded_df["is_suspended"] == expanded_df["is_suspended_ending"])
& (expanded_df["is_solid"] == expanded_df["is_solid_ending"])
& (
expanded_df["another_property_below"]
== expanded_df["another_property_below_ending"]
)
& (
expanded_df["is_to_unheated_space"]
== expanded_df["is_to_unheated_space_ending"]
)
& (
expanded_df["is_to_external_air"]
== expanded_df["is_to_external_air_ending"]
starting_and_finishing_null | (
(expanded_df["is_suspended"] == expanded_df["is_suspended_ending"])
& (expanded_df["is_solid"] == expanded_df["is_solid_ending"])
& (expanded_df["another_property_below"] == expanded_df["another_property_below_ending"])
& (expanded_df["is_to_unheated_space"] == expanded_df["is_to_unheated_space_ending"])
& (expanded_df["is_to_external_air"] == expanded_df["is_to_external_air_ending"])
)
]
elif component == "roof":
expanded_df = expanded_df[
(expanded_df["is_pitched"] == expanded_df["is_pitched_ending"])
& (expanded_df["is_roof_room"] == expanded_df["is_roof_room_ending"])
& (expanded_df["is_loft"] == expanded_df["is_loft_ending"])
& (expanded_df["is_flat"] == expanded_df["is_flat_ending"])
& (expanded_df["is_thatched"] == expanded_df["is_thatched_ending"])
& (expanded_df["is_at_rafters"] == expanded_df["is_at_rafters_ending"])
& (
expanded_df["has_dwelling_above"]
== expanded_df["has_dwelling_above_ending"]
starting_and_finishing_null | (
(expanded_df["is_pitched"] == expanded_df["is_pitched_ending"])
& (expanded_df["is_roof_room"] == expanded_df["is_roof_room_ending"])
& (expanded_df["is_loft"] == expanded_df["is_loft_ending"])
& (expanded_df["is_flat"] == expanded_df["is_flat_ending"])
& (expanded_df["is_thatched"] == expanded_df["is_thatched_ending"])
& (expanded_df["is_at_rafters"] == expanded_df["is_at_rafters_ending"])
& (expanded_df["has_dwelling_above"] == expanded_df["has_dwelling_above_ending"])
)
]
@ -677,7 +664,6 @@ class TrainingDataset(BaseDataset):
}
for component in components_to_expand:
# TODO: change cleaned dataframe to have underscores instead of dashes
if component == "main-fuel":
cleaned_key = "main-fuel"
left_on_starting = "main_fuel_starting"

View file

@ -150,7 +150,7 @@ class RoofRecommendations:
return
# If we have a u-value and we don't have a non-invasive recommendation, we can't recommend anything
if u_value and not any(
if (u_value is not None) and not any(
x in MEASURE_MAP["roof_insulation"] for x in [r["type"] for r in self.property.non_invasive_recommendations]
):
# We don't have enough information to provide a recommendation

View file

@ -163,9 +163,8 @@ class WallRecommendations(Definitions):
if (
(insulation_thickness in ["average", "above average"])
or self.property.walls["is_filled_cavity"]
) and (
"cavity_extract_and_refill"
not in measures
or self.property.walls["clean_description"] is None
) and ("cavity_extract_and_refill" not in measures
):
return

View file

@ -201,6 +201,11 @@ def get_wall_u_value(
)
)
else:
# Handle rare edge case
if clean_description == "":
return 0
mapped_description = epc_wall_description_map[clean_description]
mapped_value = wall_uvalues_df[