allowing carbon and energy optimisation by removing slack

This commit is contained in:
Khalim Conn-Kowlessar 2025-07-31 19:13:16 +01:00
parent b05c17bcab
commit 2c19b89c77
13 changed files with 1035 additions and 54 deletions

View file

@ -887,6 +887,9 @@ class AssetList:
self.landlord_year_built
].apply(extract_year)
for x in self.standardised_asset_list[self.landlord_year_built].values:
extract_year(x)
# We now create standard lookups
to_remap = {
self.landlord_property_type: {
@ -1099,6 +1102,13 @@ class AssetList:
)
# Estimate the perimeter
# Handle funky edge case
self.standardised_asset_list[self.EPC_API_DATA_NAMES["total-floor-area"]] = np.where(
(self.standardised_asset_list[self.EPC_API_DATA_NAMES["total-floor-area"]] == 0),
self.standardised_asset_list[self.EPC_API_DATA_NAMES["total-floor-area"]].mean(),
self.standardised_asset_list[self.EPC_API_DATA_NAMES["total-floor-area"]]
)
self.standardised_asset_list[self.ATTRIBUTE_ESTIMATED_PERIMETER] = self.standardised_asset_list.apply(
lambda x: estimate_perimeter(
floor_area=x[self.EPC_API_DATA_NAMES["total-floor-area"]] / x[self.ATTRIBUTE_NUMBER_OF_FLOORS],
@ -1753,7 +1763,9 @@ class AssetList:
# It's empty cavity
self.standardised_asset_list["cavity_is_empty"] |
# It's a cavity wall
(self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].str.contains("cavity"))
self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin(
["filled cavity", "partial insulated cavity"]
)
)
not_a_flat = (
@ -2097,6 +2109,7 @@ class AssetList:
RANGE_RE = re.compile(r'\b(\d+[A-Za-z]?)\s*[-]\s*(\d+[A-Za-z]?)\b')
NUM_RE = re.compile(r'\b\d+[A-Za-z]?\b') # captures 12, 12A, etc.
TO_RANGE_RE = re.compile(r'\b(\d+[A-Za-z]?)\s+(?:to|To|TO)\s+(\d+[A-Za-z]?)\b') # captures "13 to 15"
expanded_rows = []
@ -2121,11 +2134,12 @@ class AssetList:
# 1 ─ Range (e.g. 1-7)
m_range = RANGE_RE.search(addr)
if m_range:
to_range = TO_RANGE_RE.search(addr)
start, end = m_range.groups()
if m_range or to_range:
start, end = m_range.groups() if m_range else to_range.groups()
start, end = int(re.match(r'\d+', start)[0]), int(re.match(r'\d+', end)[0])
if start > end or (end - start) > 100:
if start > end or (end - start) > 200:
raise ValueError(f"Suspicious range '{addr}'")
# We define the looping range on whether we have odd, even or all numbers
@ -2137,10 +2151,12 @@ class AssetList:
for n in house_number_range:
new = row.copy()
new_addr = RANGE_RE.sub(str(n), addr, count=1)
range_text = m_range.group(0) if m_range else to_range.group(0)
new_addr = addr.replace(range_text, str(n))
# Build the new full address by also swapping out the range_text
original_full_address = new[self.STANDARD_FULL_ADDRESS]
new_full_address = original_full_address.replace(addr, new_addr)
new[self.STANDARD_ADDRESS_1] = new_addr
new_full_address = original_full_address.replace(range_text, str(n))
new[self.STANDARD_ADDRESS_1] = str(n)
new[self.STANDARD_FULL_ADDRESS] = new_full_address
new[self.STANDARD_PROPERTY_TYPE] = "flat"
# Keep a record of the previous address 1
@ -2155,7 +2171,7 @@ class AssetList:
# 2 ─ Explicit list (e.g. 1, 2, 5 Block) or split by an ampersand (e.g. 1 & 2 Block)
nums = NUM_RE.findall(addr)
if len(nums) > 1 and (',' in addr or '&' in addr):
if len(nums) > 1 and (',' in addr or '&' in addr or ' and ' in addr.lower()):
for n in nums:
new = row.copy()
new_addr = re.sub(NUM_RE, n, addr, count=1) # replace the first number only
@ -2174,6 +2190,10 @@ class AssetList:
expanded_blocks = pd.DataFrame(expanded_rows)
# Check for duplicated domna ids
if expanded_blocks[self.DOMNA_PROPERTY_ID].duplicated().sum():
raise ValueError("expanded blocks has duplicated IDs")
# We drop the blocks from the standardised asset list and append on the expanded blocks
self.standardised_asset_list = self.standardised_asset_list[
self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE] != "block of flats"
@ -2318,18 +2338,37 @@ class AssetList:
(~group["cavity_reason"].str.contains("(unlikely to quality)", case=False, na=False, regex=False))
).sum()
n_empties_high_confidence = (
(group["identified_empty_cavity"] == True) &
(~group["SAP Category"].isin(["SAP Rating 69-75", "SAP Rating 76 or more"])) &
(~pd.isnull(group["cavity_reason"])) &
(~group["cavity_reason"].str.contains("(unlikely to quality)", case=False, na=False, regex=False))
).sum()
# Average age of the EPCs
group["time_since_epc"] = (
pd.to_datetime("now") - pd.to_datetime(
group[self.EPC_API_DATA_NAMES["inspection-date"]])
).dt.days
average_age_of_epc = group["time_since_epc"].mean()
works = group["hubspot_status"]
above_threshold = works.map(LABEL_TO_ENUM.get).dropna()
count_above = (above_threshold >= threshold).sum()
proportion_surveyed = count_above / len(works)
proportion_empty = n_empties / len(works)
proportion_empty_high_confidence = n_empties_high_confidence / len(works)
# We auto-populate any blocks that have greater than 50% proportion empty
block_analysis.append(
{
"Block Reference": block_reference,
"Block Size": len(group),
"average_age_of_epc": average_age_of_epc,
"Proportion of properties suryeyed": proportion_surveyed,
"Percentage of Empties": proportion_empty,
"Percentage of Empties (high confidence)": proportion_empty_high_confidence,
**cavity_breakdown.to_dict(),
}
)
@ -3345,6 +3384,8 @@ class AssetList:
property_type_col = "PROPERTY TYPE As per table emailed"
elif "PROPERTY TYPE" in master_data.columns:
property_type_col = "PROPERTY TYPE"
elif 'Property Type' in master_data.columns:
property_type_col = 'Property Type'
else:
property_type_col = "PROPERTY TYPE (SEE DEEMED SCORES SHEET) Eg. 3W_Flat_1 (As per Matrix)"
@ -3496,8 +3537,20 @@ class AssetList:
]
if df.shape[0] != 1:
# We have multiple matches
raise NotImplementedError("FIX ME")
# We have multiple matches - it's likely because the landlord has a duplicate
# that has been referenced in totally different ways so we just match to both
for _, x in df.iterrows():
matched.append(
{
"row_id": row["row_id"],
"original_house_no": original_house_no,
"original_street": original_street,
"original_postcode": original_postcode,
self.STANDARD_LANDLORD_PROPERTY_ID: x[self.STANDARD_LANDLORD_PROPERTY_ID],
}
)
continue
matched.append(
{
"row_id": row["row_id"],
@ -3594,6 +3647,10 @@ class AssetList:
self.master_surveyed, how="left", on=self.STANDARD_LANDLORD_PROPERTY_ID
)
# Make sure no dupes
if self.standardised_asset_list[self.DOMNA_PROPERTY_ID].duplicated().sum():
raise ValueError("duplicated ids!")
# Finally, we keep a record of the unmatched
if unmatched_submissions:
self.unmatched_submissions = pd.concat(

View file

@ -59,6 +59,110 @@ def app():
Property UPRN
"""
data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Broadlands"
data_filename = "Broadlands Asset List.xlsx"
sheet_name = "Assets"
postcode_column = 'POSTCODE'
fulladdress_column = None
address1_column = "Address1"
address1_method = None
address_cols_to_concat = ["Address1"]
missing_postcodes_method = None
landlord_year_built = "DATEBUILT"
landlord_os_uprn = None
landlord_property_type = "PropertyType"
landlord_built_form = "PropertyType"
landlord_wall_construction = None
landlord_heating_system = "Heating Fuel"
landlord_existing_pv = None
landlord_property_id = "Row ID"
outcomes_filename = [os.path.join(data_folder, "outcomes.xlsx")]
outcomes_sheetname = ["Sheet1"]
outcomes_postcode = ["Postcode"]
outcomes_houseno = ["No."]
outcomes_address = ["Address"]
outcomes_id = [None]
master_filepaths = [
os.path.join(data_folder, "eco3 submissions.csv"),
os.path.join(data_folder, "eco4 submissions.csv"),
]
master_to_asset_list_filepath = None
asset_list_header = 0
landlord_block_reference = None
master_id_colnames = [None, None]
landlord_roof_construction = None
phase = False
landlord_sap = None
ecosurv_landlords = "broadland"
#
# Community:
data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Community Housing/New Programme"
data_filename = "SUB EPC C to DOMNA - 24.07.25.xlsx"
sheet_name = "Sheet1"
postcode_column = 'POSTCODE'
fulladdress_column = "ADDRESS"
address1_column = None
address1_method = "house_number_extraction"
address_cols_to_concat = []
missing_postcodes_method = None
landlord_year_built = "BUILD DATE"
landlord_os_uprn = None
landlord_property_type = "PROPERTY TYPE"
landlord_built_form = "Archetype" # Using the inspections archetype
landlord_wall_construction = "CONSTRUCTION TYPE"
landlord_roof_construction = None
landlord_heating_system = None
landlord_existing_pv = None
landlord_property_id = "UPRN"
landlord_sap = None
outcomes_filename = []
outcomes_sheetname = []
outcomes_postcode = []
outcomes_houseno = []
outcomes_id = []
outcomes_address = []
master_filepaths = []
master_to_asset_list_filepath = None
phase = False
ecosurv_landlords = None
asset_list_header = 1
landlord_block_reference = None
master_id_colnames = []
data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Ealing/Programme Analysis"
data_filename = "EalingProjectRebuildJW210725.xlsx"
sheet_name = "Refine & Houses"
postcode_column = 'Postcode'
fulladdress_column = "Address"
address1_column = None
address1_method = "house_number_extraction"
address_cols_to_concat = []
missing_postcodes_method = None
landlord_year_built = None
landlord_os_uprn = None
landlord_property_type = None # Using the inspections property type
landlord_built_form = None
landlord_wall_construction = None
landlord_roof_construction = None
landlord_heating_system = None
landlord_existing_pv = None
landlord_property_id = "Property ref"
landlord_sap = None
outcomes_filename = []
outcomes_sheetname = []
outcomes_postcode = []
outcomes_houseno = []
outcomes_id = []
outcomes_address = []
master_filepaths = []
master_to_asset_list_filepath = None
phase = False
ecosurv_landlords = None
asset_list_header = 0
landlord_block_reference = "Block Reference"
master_id_colnames = []
# TODO: Delete me
data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/NRLA/"
data_filename = "20250716 Asset List.xlsx"
@ -148,7 +252,7 @@ def app():
landlord_existing_pv = None
landlord_property_id = "PropertyCode"
outcomes_filename = [os.path.join(data_folder, "Rooftop_Outcomes.xlsx")]
outcomes_sheetname = ["OUTCOMESs"]
outcomes_sheetname = ["OUTCOMES"]
outcomes_postcode = ["POSTCODE"]
outcomes_houseno = ["NO"]
outcomes_address = ["ADDRESS"]
@ -221,15 +325,15 @@ def app():
outcomes_houseno = []
outcomes_address = []
outcomes_id = []
master_filepaths = []
master_filepaths = [os.path.join(data_folder, "submissions.csv")]
master_to_asset_list_filepath = None
asset_list_header = 0
landlord_block_reference = None
master_id_colnames = []
master_id_colnames = [None]
landlord_roof_construction = None
phase = False
landlord_sap = None
ecosurv_landlords = None
ecosurv_landlords = "cds"
# Plus Dane
data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Plus Dane/New Programme July 2025/"

View file

@ -385,6 +385,59 @@ BUILT_FORM_MAPPINGS = {
'Maisonette Over Shop': 'mid-floor',
'Medium Rise Flat': 'mid-floor',
'Maisonette Medium Rise': 'unknown',
'End-terraced house': 'end-terrace'
'End-terraced house': 'end-terrace',
'Ground floor study bedroom': 'ground floor',
'End terrace bungalow': 'end-terrace',
'End terrace house': 'end-terrace',
'Ground floor bedsit': 'ground floor',
'Detached bungalow': 'detached',
'Lower ground floor flat': 'ground floor',
'Mid terrace bungalow': 'mid-terrace',
'Mid terrace house': 'mid-terrace',
'Basement bedsit': 'basement',
'Ground floor flat': 'ground floor',
'Ground floor flat with study': 'ground floor',
'Basement flat': 'basement',
'Semi bungalow': 'semi-detached',
'2nd floor flat': 'mid-floor',
'General/Communal': 'unknown',
'Semi house': 'semi-detached',
'2nd floor flat with study': 'mid-floor',
'1st floor flat with study room': 'mid-floor',
'Cluster House': 'detached',
'Utility pod': 'unknown',
'3rd floor flat': 'mid-floor',
'4th floor flat': 'mid-floor',
'2nd floor study bedroom': 'mid-floor',
'1st floor study bedroom': 'mid-floor',
'Dormer bungalow': 'detached',
'1st floor flat': 'mid-floor',
'Block property': 'unknown',
'Utility pod - DDA compliant': 'unknown',
'2nd floor bedsit': 'mid-floor',
'1st floor bedsit': 'mid-floor',
'2nd/3rd floor duplex flat': 'mid-floor',
'Bungalow - Detached': 'detached',
'Maisonette - Detached': 'detached',
'Bedsit - Mid Terrace': 'mid-terrace',
'House - End Terrace': 'end-terrace',
'House - Mid Terrace': 'mid-terrace',
'Bungalow - End Terrace': 'end-terrace',
'Maisonette - End Terrace': 'end-terrace',
'Maisonette - Semi Detached': 'semi-detached',
'House - Detached': 'detached',
'Bedsit - End Terrace': 'end-terrace',
'House - Semi detached': 'semi-detached',
'Studio Flat - Mid Terrace': 'mid-terrace',
'Bungalow - Semi detached': 'semi-detached',
'Amenity Block - Detached': 'detached',
'Bungalow - Mid Terrace': 'mid-terrace',
'Amenity Block - Semi detached': 'semi-detached',
'Maisonette - Mid Terrace': 'mid-terrace',
'Chalet - Wheelchair': 'unknown',
'Studio Flat': 'unknown',
'Bungalow - Attached': 'semi-detached'
}

View file

@ -377,6 +377,60 @@ HEATING_MAPPINGS = {
'Warm air Electricity': 'warm air heating',
'None': 'no heating',
'Boiler None': 'unknown',
'Storage heaters Electricity': 'electric storage heaters'
'Storage heaters Electricity': 'electric storage heaters',
'Unknown when old solid fuel system was removed': 'solid fuel',
'Storage Heater': 'electric storage heaters',
'Combi': 'gas condensing combi',
'Combi condensing': 'gas condensing combi',
'Combi Condensing': 'gas condensing combi',
'Tenant Burner': 'unknown',
'Wall Mounted Condens': 'gas condensing boiler',
'Gas Pipework': 'unknown',
'Open Fire Bck Boiler': 'solid fuel',
'Back Boiler Unit': 'solid fuel',
'Sharedgasboiler': 'communal gas boiler',
'Wall Mntd Condensing': 'gas condensing boiler',
'Flr Standing Combi': 'gas combi boiler',
'Oil - Tenant': 'oil boiler',
'Open Flue Fire': 'solid fuel',
'Wall Mounted Fire': 'room heaters',
'Gas - Unvented Cylinder': 'gas boiler, radiators',
'Commercial Pipework': 'unknown',
'Wall Mntd Condensin': 'gas condensing boiler',
'Offpeakelectric': 'electric storage heaters',
'Closed Burner': 'unknown',
'Domesticgasboiler': 'gas boiler, radiators',
'Elec - Storage': 'electric storage heaters',
'Share Common Boiler': 'communal heating',
'Down Flow Heater': 'electric radiators',
'Inset Flame Effect': 'electric radiators',
'Closedmulti': 'unknown',
'Open Fire': 'solid fuel',
'Lpgas - Domesticgasboiler': 'gas boiler, radiators',
'Solarpvpanels': 'other',
'Renew - Ashp': 'air source heat pump',
'Room Sealed App': 'unknown',
'5 Year Periodic Insp': 'unknown',
'Solarthermal': 'other',
'Wall Mounted Combi': 'gas combi boiler',
'Woodburner': 'solid fuel',
'Sealed System Wl Mtd': 'unknown',
'Room Seal App': 'unknown',
'Shared Gas Boiler': 'communal gas boiler',
'Heating Distribution': 'unknown',
'Flr Standing Boiler': 'boiler - other fuel',
'Multifuel Burner': 'solid fuel',
'Gas - Shared': 'communal gas boiler',
'Wall Mounted Boiler': 'gas boiler, radiators',
'Tenant Boiler': 'gas boiler, radiators',
'Gas - Domesticgasboiler': 'gas boiler, radiators',
'Domestic gas boiler': 'gas boiler, radiators',
'Combination': 'unknown',
'Mains Electric': 'electric fuel',
'Unvented cylinder': 'other',
'MVHR & Heat Recovery': 'other',
'Solar': 'other'
}

View file

@ -283,6 +283,59 @@ PROPERTY_MAPPING = {
'Flat Over Shop': 'flat',
'Medium Rise Flat': 'flat',
'End Terraced Town House': 'house',
'Maisonette Medium Rise': 'maisonette'
'Maisonette Medium Rise': 'maisonette',
'Semi bungalow': 'bungalow',
'2nd floor flat': 'flat',
'End terrace bungalow': 'bungalow',
'End terrace house': 'house',
'Ground floor bedsit': 'bedsit',
'Detached bungalow': 'bungalow',
'Semi house': 'house',
'2nd floor flat with study': 'flat',
'1st floor flat with study room': 'flat',
'Lower ground floor flat': 'flat',
'Cluster House': 'house',
'Mid terrace bungalow': 'bungalow',
'Mid terrace house': 'house',
'Basement bedsit': 'bedsit',
'Detached house': 'house',
'3rd floor flat': 'flat',
'4th floor flat': 'flat',
'Dormer bungalow': 'bungalow',
'1st floor flat': 'flat',
'Ground floor flat': 'flat',
'Ground floor flat with study': 'flat',
'Basement flat': 'flat',
'2nd floor bedsit': 'bedsit',
'1st floor bedsit': 'bedsit',
'2nd/3rd floor duplex flat': 'flat',
'Ground floor study bedroom': 'other',
'General/Communal': 'other',
'Utility pod': 'other',
'2nd floor study bedroom': 'other',
'1st floor study bedroom': 'other',
'Block property': 'block of flats',
'Utility pod - DDA compliant': 'other',
'Bungalow - Detached': 'bungalow',
'Maisonette - Detached': 'maisonette',
'Bedsit - Mid Terrace': 'bedsit',
'Studio Flat': 'flat',
'House - End Terrace': 'house',
'House - Mid Terrace': 'house',
'Bungalow - End Terrace': 'bungalow',
'Bungalow - Attached': 'bungalow',
'Maisonette - End Terrace': 'maisonette',
'Maisonette - Semi Detached': 'maisonette',
'House - Detached': 'house',
'Bedsit - End Terrace': 'bedsit',
'House - Semi detached': 'house',
'Studio Flat - Mid Terrace': 'flat',
'Bungalow - Semi detached': 'bungalow',
'Bungalow - Mid Terrace': 'bungalow',
'Maisonette - Mid Terrace': 'maisonette',
'Chalet - Wheelchair': 'other',
'Amenity Block - Detached': 'other',
'Amenity Block - Semi detached': 'other'
}

View file

@ -1,3 +1,4 @@
from enum import Enum
import pandas as pd
import numpy as np
from typing import List
@ -413,6 +414,10 @@ class FundingOld:
self.whlg()
class EligibilityCaveats(Enum):
TENANT_ON_BENEFITS_OR_LOW_INCOME = "tenant_on_benefits_or_low_income"
class Funding:
"""
New class to handle funding calculation
@ -440,6 +445,9 @@ class Funding:
self.project_scores_matrix = project_scores_matrix
self.whlg_eligible_postcodes = whlg_eligible_postcodes
self.eco4_eligible = False
self.eligbility_caveat = None
@staticmethod
def get_sap_band(sap_score_number):
bands = [
@ -478,9 +486,8 @@ class Funding:
return "200"
@staticmethod
def eco4_prs_eligibility(
starting_sap: int, measures: List, mainheat_description: str, heating_control_description: str
self, starting_sap: int, measures: List, mainheat_description: str, heating_control_description: str
):
"""
Handles the eligibility criteria for private rental properties under eco
@ -509,11 +516,19 @@ class Funding:
# Is a renewable heating
ashp = "air_source_heat_pump" in measures
# Meets the EPC criteria, has the measure requirement and tenant must be on benefits
if meets_epc & (solar_renweable_heating or ashp or has_solid_wall):
return True
self.eco4_eligible = True
self.eligbility_caveat = EligibilityCaveats.TENANT_ON_BENEFITS_OR_LOW_INCOME
return
return False
def gbis_prs_eligibiltiy(self):
"""
Determines if a project is eligible for GBIS funding for private rental properties
"""
def calculate_full_project_abs(self):
# Filter the project scores matrix
@ -568,7 +583,7 @@ class Funding:
# 2) GBIS
if self.tenure == "Private":
is_eco4_eligible = self.eco4_prs_eligibility(
self.eco4_prs_eligibility(
starting_sap=starting_sap,
measures=measures,
mainheat_description=mainheat_description,
@ -578,7 +593,8 @@ class Funding:
# Need to implement
# 1) Package has to include an insulation measure
# 2) We should use the funding for the measure that has the largest partial project score
is_gbis_eligible = ()
# TODO: check the rules around GBIS eligibility and heating controls
self.gbis_prs_eligibiltiy()
if not is_eco4_eligible:
return

View file

@ -18,6 +18,12 @@ SPECIFIC_MEASURES = [
"cylinder_thermostat"
]
INSULATION_MEASURES = [
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation",
"loft_insulation", "flat_roof_insulation", "room_roof_insulation",
"suspended_floor_insulation", "solid_floor_insulation",
]
NON_INVASIVE_SPECIFIC_MEASURES = [
"trickle_vents", "draught_proofing", "mixed_glazing", "cavity_extract_and_refill",
"extension_cavity_wall_insulation"
@ -36,7 +42,7 @@ MEASURE_MAP = {
"heating_controls": ["roomstat_programmer_trvs", "time_temperature_zone_control"]
}
VALID_GOALS = ["Increasing EPC"]
VALID_GOALS = ["Increasing EPC", "Energy Savings", "Reducing CO2 emissions"]
VALID_HOUSING_TYPES = ["Social", "Private"]
VALID_EVENT_TYPES = ["remote_assessment"]
@ -74,7 +80,7 @@ class PlanTriggerRequest(BaseModel):
budget: Optional[float] = None
goal: Goal
housing_type: HousingType
goal_value: str
goal_value: Optional[str] = None
portfolio_id: int
trigger_file_path: str
already_installed_file_path: Optional[str] = None
@ -118,3 +124,10 @@ class PlanTriggerRequest(BaseModel):
if (self.index_start is None) != (self.index_end is None):
raise ValueError("Both index_start and index_end must be set or both must be None")
return self
@model_validator(mode="after")
def check_goal_value_requirement(self):
# Make sure that goal_value is set when goal is "Increasing EPC"
if self.goal == "Increasing EPC" and not self.goal_value:
raise ValueError("goal_value is required when goal is 'Increasing EPC'")
return self

View file

@ -811,7 +811,8 @@ async def model_engine(body: PlanTriggerRequest):
# we can discount the number of points required to get to the target SAP band (or increase)
# in the case of ventilation
needs_ventilation = any(
x in property_measure_types for x in assumptions.measures_needing_ventilation) and not p.has_ventilation
x in property_measure_types for x in assumptions.measures_needing_ventilation
) and not p.has_ventilation
input_measures = prepare_input_measures(measures_to_optimise, body.goal, needs_ventilation)
@ -849,15 +850,21 @@ async def model_engine(body: PlanTriggerRequest):
0
)
current_sap_points = int(p.data["current-energy-efficiency"])
if body.goal == "Increasing EPC":
current_sap_points = int(p.data["current-energy-efficiency"])
gain = CostOptimiser.calculate_sap_gain_with_slack(
epc_to_sap_lower_bound(body.goal_value) - current_sap_points
) - fixed_gain
if body.simulate_sap_10:
# We add 3 additional SAP points to the required gain to account for SAP 10
gain += 3
sap_gain = CostOptimiser.calculate_sap_gain_with_slack(
epc_to_sap_lower_bound(body.goal_value) - current_sap_points
) - fixed_gain
if body.simulate_sap_10:
# We add 3 additional SAP points to the required gain to account for SAP 10
sap_gain += 3
gain = gain if gain > 0 else 0
elif body.goal in ["Energy Savings", "Reducing CO2 emissions"]:
# We will aim to maximise these goals, while constaining by budget
gain = None
else:
raise NotImplementedError(f"Goal {body.goal} is not supported")
if not body.optimise:
if body.goal != "Increasing EPC":
@ -870,15 +877,13 @@ async def model_engine(body: PlanTriggerRequest):
else:
if body.budget:
optimiser = GainOptimiser(
input_measures, max_cost=body.budget, max_gain=sap_gain if sap_gain > 0 else 0
)
optimiser = GainOptimiser(input_measures, max_cost=body.budget, max_gain=gain)
else:
# The minimum gain is the minimum number of SAP points required to get to the target SAP band
# If the gain is negative, the optimiser will return an empty solution
optimiser = CostOptimiser(
input_measures,
min_gain=sap_gain
min_gain=gain
)
optimiser.setup()
@ -1111,6 +1116,8 @@ async def model_engine(body: PlanTriggerRequest):
[sum(r["labour_days"] for r in rec_group if r["default"]) for p_id, rec_group in recommendations.items()]
))
# TODO - This code only pulls in the properties that have been updated in this run, but we need to
# aggregate all properties in the portfolio. We likely need to trigger a re-aggregation
aggregated_data = extract_portfolio_aggregation_data(
input_properties=input_properties,
total_valuation_increase=total_valuation_increase,

567
epr_data_exports/app.py Normal file
View file

@ -0,0 +1,567 @@
"""
This is a placeholder script to extract epr data from files, where we can
"""
"""
July 2025 LiveWest Heating Upgrades
"""
import os
import re
import PyPDF2
import pandas as pd
from tqdm import tqdm
from collections import Counter
def extract_window_age_description(windows_text):
    """
    Extracts the most common window age description and its proportion.

    Parameters:
    windows_text (str): The text section containing window data.

    Returns:
    dict: The most common window age description and its proportion, the
        second most common description and its proportion (None and 0 when
        every window shares one description), and the number of windows found.

    Raises:
    ValueError: If none of the known descriptions is found in the text.
    """
    # Clean up windows_text by removing line breaks for better pattern matching
    windows_text = windows_text.replace("\n", "")

    # Define possible window age descriptions
    window_descriptions = [
        "Double post or during 2002",
        "Double pre 2002",
        "Double with unknown install date",
        "Secondary glazing",
        "Triple glazing",
        "Single glazing",
        "Double between 2002 and 2021",
    ]

    # Count occurrences of each description. Once the line breaks are removed,
    # a description that wrapped without a trailing space has its two words
    # fused together ("2002\nand" -> "2002and"), so the whitespace between
    # words is treated as optional when matching.
    description_counts = Counter()
    for description in window_descriptions:
        pattern = r"\s*".join(re.escape(word) for word in description.split())
        description_counts[description] = len(re.findall(pattern, windows_text))

    total_windows = sum(description_counts.values())
    if not total_windows:
        raise ValueError("Failed to extract window data.")

    # Determine the most common description and calculate its proportion
    ranked = description_counts.most_common(2)
    most_common_description, window_count = ranked[0]
    window_proportion = window_count / total_windows * 100

    # Get the second most common and the proportion
    if window_count == total_windows:
        second_most_common_description = None
        second_most_common_proportion = 0
    else:
        second_most_common_description, second_window_count = ranked[1]
        second_most_common_proportion = second_window_count / total_windows * 100

    return {
        "Window Age Description": most_common_description,
        "Window Age Description Proportion (%)": window_proportion,
        "Secondary Window Age Description": second_most_common_description,
        "Secondary Window Age Description Proportion (%)": second_most_common_proportion,
        "Number of Windows": total_windows,
    }
def extract_building_parts_summary(text):
    """
    Summarise the building dimensions found in the summary report text.

    The Dimensions section is read for the Main Property and any numbered
    extensions; each part contributes its floor rows and, if present, a
    "Room(s) in Roof" floor area.

    Parameters:
    text (str): The text of the summary report.

    Returns:
    dict: Total, ground-floor and room-in-roof floor areas, plus wall areas
        (perimeter x room height) for the main property and the 1st extension.

    Raises:
    ValueError: If no Dimensions section (internal or external) is found.
    """
    # Internal dimensions are preferred; external ones are the fallback
    section_patterns = (
        r"Dimensions:\s*Dimension type: Internal\n(.*?)\n5\.0 Conservatory:",
        r"Dimensions:\s*Dimension type: External\n(.*?)\n5\.0 Conservatory:",
    )
    section_match = None
    for section_pattern in section_patterns:
        section_match = re.search(section_pattern, text, re.DOTALL)
        if section_match is not None:
            break
    if section_match is None:
        raise ValueError("Failed to locate dimensions section in the text.")

    # One match per building part: its label, then everything up to the next part
    part_re = re.compile(
        r"(Main Property|\d+(?:st|nd|rd|th) Extension)\s*"
        r"(.*?)(?=\d+(?:st|nd|rd|th) Extension|5\.0 Conservatory|$)",
        re.DOTALL
    )
    # Floor Level, Floor Area, Room Height, Perimeter, Party Wall Length
    floor_re = re.compile(
        r"(1st Floor|Lowest Floor|Second floor):\s*([\d.]+)\s+([\d.]+)\s+([\d.]+)\s+([\d.]+)"
    )
    # "Room(s) in Roof" rows only carry a floor area
    roof_room_re = re.compile(r"Room\(s\) in Roof:\s*([\d.]+)")

    rows = []
    for part in part_re.finditer(section_match.group(1)):
        label, body = part.group(1), part.group(2)

        for floor in floor_re.finditer(body):
            level, area, height, perimeter, party_wall = floor.groups()
            rows.append({
                "Building Part": label,
                "Floor Level": level,
                "Floor Area (m2)": float(area),
                "Room Height (m)": float(height),
                "Perimeter (m)": float(perimeter),
                "Party Wall Length (m)": float(party_wall)
            })

        roof_room = roof_room_re.search(body)
        if roof_room:
            rows.append({
                "Building Part": label,
                "Floor Level": "Room in Roof",
                "Floor Area (m2)": float(roof_room.group(1)),
                "Room Height (m)": None,  # Not reported for rooms in roof
                "Perimeter (m)": None,  # Not reported for rooms in roof
                "Party Wall Length (m)": None  # Not reported for rooms in roof
            })

    def _floor_area(level=None):
        # Sum floor areas, optionally restricted to rows whose level contains `level`
        return sum(
            row["Floor Area (m2)"]
            for row in rows
            if level is None or level in row["Floor Level"]
        )

    def _wall_area(part_label):
        # Perimeter x height for the rows of one building part; rows missing
        # (or with zero) perimeter or height are left out
        return sum(
            row["Perimeter (m)"] * row["Room Height (m)"]
            for row in rows
            if part_label in row["Building Part"]
            and row["Perimeter (m)"] and row["Room Height (m)"]
        )

    return {
        "Total Floor Area (m2)": _floor_area(),
        "Total Ground Floor Area (m2)": _floor_area("Lowest Floor"),
        "RIR Floor Area": _floor_area("Room in Roof"),
        "Main Building Wall Area (m2)": _wall_area("Main Property"),
        "First Extension Wall Area (m2)": _wall_area("1st Extension"),
    }
def extract_roof_details_summary(text):
    """
    Pull the roof entries out of the 8.0 Roofs section of the summary report.

    Parameters:
    text (str): The text of the summary report.

    Returns:
    list: One dict per matched building part with its label, roof type,
        roof insulation and insulation thickness (the last two are None when
        absent). An empty list is returned when there is no 8.0 Roofs section.
    """
    # Locate the entire 8.0 Roofs section
    section = re.search(r"8\.0 Roofs:\n(.*?)(?=\n9\.0 Floors:|$)", text, re.DOTALL)
    if section is None:
        return []

    # Re-attach "9.0 Floors:" so the entry pattern always has a closing boundary
    bounded_section = section.group(1).strip() + "\n9.0 Floors:"

    entry_re = re.compile(
        r"(Main Property|1st Extension|2nd Extension|[\w\s]+)\n"  # Matches each building part label
        r"Type\s+(.*?)(?=\n(?:Insulation|9\.0 Floors:|[A-Z]))"  # Matches Roof Type until the next field, label, or end
        r"(?:\nInsulation\s+(.*?)(?=\n(?:Insulation Thickness|9\.0 Floors:|[A-Z])))?"  # Optional Insulation
        r"(?:\nInsulation Thickness\s+(.*?)(?=\n(?:9\.0 Floors:|[A-Z])))?",  # Optional Insulation Thickness
        re.DOTALL
    )

    def _optional(value):
        # Optional fields that did not match (or matched nothing) become None
        return value.strip() if value else None

    roofs = []
    for entry in entry_re.finditer(bounded_section):
        label, kind, insulation, thickness = entry.groups()

        kind = kind.strip()
        # The type capture can run on into the next part's label, e.g.
        # 'A Another dwelling above\n1st Extension' - keep only the type itself
        if kind.startswith("A Another dwelling above"):
            kind = "A Another dwelling above"

        roofs.append({
            "Building Part": label.strip(),
            "Roof Type": kind,
            "Roof Insulation": _optional(insulation),
            "Roof Insulation Thickness": _optional(thickness),
        })

    return roofs
def extract_wall_details_summary(text):
    """
    Extract wall type and insulation for each building part, plus any alternative wall
    details, from the "7.0 Walls" section of the summary PDF text.

    :param text: Full text of the summary report.
    :return: List of dictionaries, one per building part found. Each has "Building Part",
        "Wall Type", "Wall Insulation" and the "Alternative Wall ..." keys. The alternative
        keys stay at None ("N/A" for dry-lining) when no alternative wall block is found
        after the part. An empty list is returned when the text has no "7.0 Walls:" section
        terminated by "8.0 Roofs:", matching how the roof extractor treats a missing section.
    """
    wall_data = []

    # Locate the entire 7.0 Walls section; bail out instead of failing on a None match
    wall_section_match = re.search(r"7\.0 Walls:\n(.*?)\n8\.0 Roofs:", text, re.DOTALL)
    if wall_section_match is None:
        return wall_data
    wall_section = wall_section_match.group(1)

    # Pattern for each building part's main wall entry within the section
    building_part_pattern = re.compile(
        r"(Main Property|1st Extension|2nd Extension|[\w\s]+)\n"  # Building part label
        r"Type\s+(.*?)\n"  # Main wall type
        r"Insulation\s+(.*?)\n",  # Main wall insulation
        # r"(Dry-lining\s+(.*?)\n)?"  # Optional main wall Dry-lining
        # r"Wall Thickness Unknown\s+(.*?)\n"  # Matches main wall Thickness Unknown
        # r"Wall Thickness \[mm\]\s+(\d+)",  # Matches main wall Thickness
        re.DOTALL
    )

    # Pattern for the alternative wall details, if present
    alternative_wall_pattern = re.compile(
        r"Alternative Wall Area.*?\n"  # Start of the alternative wall block
        r"Alternative Type\s+(.*?)\n"  # Alternative wall type
        r"Alternative Insulation\s+(.*?)\n"  # Alternative wall insulation
        r"(Alternative Dry-lining\s+(.*?)\n)?"  # Optional alternative dry-lining
        r"Alternative Wall Thickness Unknown\s+(.*?)\n"  # Alternative thickness unknown flag
        r"Alternative Wall Thickness\s+(\d+)",  # Alternative wall thickness
        re.DOTALL
    )

    for match in building_part_pattern.finditer(wall_section):
        wall_entry = {
            "Building Part": match.group(1).strip(),
            "Wall Type": match.group(2).strip(),
            "Wall Insulation": match.group(3).strip(),
            # "Wall Dry-lining": main_wall_dry_lining,
            # "Wall Thickness Unknown": main_wall_thickness_unknown,
            # "Wall Thickness (mm)": main_wall_thickness,
            "Alternative Wall Type": None,
            "Alternative Wall Insulation": None,
            "Alternative Wall Dry-lining": "N/A",
            "Alternative Wall Thickness Unknown": None,
            "Alternative Wall Thickness (mm)": None,
        }

        # Look for an alternative wall block anywhere after this entry.
        # Group 3 is the whole optional dry-lining line; group 4 is just its value.
        alt_match = alternative_wall_pattern.search(wall_section, match.end())
        if alt_match:
            dry_lining = alt_match.group(4)
            wall_entry["Alternative Wall Type"] = alt_match.group(1).strip()
            wall_entry["Alternative Wall Insulation"] = alt_match.group(2).strip()
            wall_entry["Alternative Wall Dry-lining"] = dry_lining.strip() if dry_lining else "N/A"
            wall_entry["Alternative Wall Thickness Unknown"] = alt_match.group(5).strip()
            wall_entry["Alternative Wall Thickness (mm)"] = int(alt_match.group(6))

        wall_data.append(wall_entry)

    return wall_data
def extract_summary_report(pdf_path):
    """
    Extract property data from a summary report PDF.

    The text of every page is concatenated and then mined with regular expressions for,
    among others: the current SAP rating, property type, age band, storeys, room counts,
    fuel bill, address and postcode, door counts, primary and secondary heating details,
    hot water system, building dimensions, and the main building's roof and wall details.

    :param pdf_path: Path of the PDF file to read.
    :return: Dictionary of extracted values. Keys that are declared up front but never
        populated by this function are left as None.
    :raises AttributeError: When a field that is read without a guard is not found in the
        text (the corresponding `re.search` returns None).
    :raises IndexError: When no "Main" building part is found in the roof or wall details.
    """
    data = {
        "Address": None,
        "Postcode": None,
        "Current SAP Rating": None,
        "Current EPC Band": None,
        "Fuel Bill": None,
        "Main Building Age Band": None,
        "Number of Storeys": None,
        "Window Age Description": None,
        "Window Age Description Proportion (%)": None,
        "Secondary Window Age Description": None,
        "Secondary Window Age Description Proportion (%)": None,
        "Number of Windows": None,
        "Total Number of Doors": None,
        "Number of Insulated Doors": None,
        "Existing Primary Heating System": None,
        "Existing Primary Heating PCDF Reference": None,
        "Existing Primary Heating Controls": None,
        "Existing Primary Heating % of Heat": None,
        "Existing Secondary Heating System": None,
        "Existing Secondary Heating PCDF Reference": None,
        "Existing Secondary Heating Controls": None,
        "Existing Secondary Heating % of Heat": None,
        "Secondary Heating Code": None,
        "Water Heating Code": None,
        "Total Floor Area (m2)": None,
        "Total Ground Floor Area (m2)": None,
        "RIR Floor Area": None,
        "Main Building Wall Area (m2)": None,
        "First Extension Wall Area (m2)": None,
        "Number of Light Fittings": None,
        "Number of LEL Fittings": None,
        "Number of fittings needing LEL": None,
        "Main Roof Type": None,
        "Main Roof Insulation": None,
        "Main Roof Insulation Thickness": None,
        "Main Wall Type": None,
        "Main Wall Insulation": None,
        "Main Wall Dry-lining": None,
        "Main Wall Thickness": None,
        "Main Building Alternative Wall Type": None,
        "Main Building Alternative Wall Insulation": None,
        "Main Building Alternative Wall Dry-lining": None,
        "Main Building Alternative Wall Thickness": None,
    }

    # Read all page text while the file is open. Joining avoids repeated string
    # concatenation, and `or ""` guards against a page that yields no text.
    with open(pdf_path, "rb") as file:
        reader = PyPDF2.PdfReader(file)
        text = "".join(page.extract_text() or "" for page in reader.pages)

    # Extract Current SAP rating; the match looks like "<band letter> <number>", keep the number
    sap_match = re.search(r"Current SAP rating:\s*([A-Z] \d+)", text)
    data["Current SAP Rating"] = sap_match.group(1).split(" ")[1]

    # NOTE(review): as written the final replace() swaps a single space for a single space,
    # i.e. it is a no-op; presumably it was meant to collapse double spaces - confirm.
    data["Property Type"] = (
        re.search(r"Property type:\s*(.*?)\n2\.0", text, re.DOTALL)
        .group(1).replace('\n', ' ').strip().replace(" ", " ")
    )

    # Extract age
    age_band_match = re.search(
        r"3\.0 Date Built:\s*Main Property\s*[A-Z]?\s*(\d{4}-\d{4}|before \d{4}|\d{4} onwards)",
        text
    )
    data["Main Building Age Band"] = age_band_match.group(1)

    # Number of storeys
    storeys_match = re.search(r"Number of Storeys:\s*(\d+)", text)
    data["Number of Storeys"] = int(storeys_match.group(1))

    # Grab number of heated rooms, number of habitable rooms
    data["Number of Heated Rooms"] = int(re.search(r"Heated Habitable Rooms:\s*(\d+)", text).group(1))
    # "Habitable Rooms:" is also the tail of "Heated Habitable Rooms:", so a plain search can
    # pick up the heated count when that label comes first. Prefer an occurrence that is not
    # preceded by "Heated ", and fall back to the plain search to keep the old behaviour.
    habitable_match = (
        re.search(r"(?<!Heated )Habitable Rooms:\s*(\d+)", text)
        or re.search(r"Habitable Rooms:\s*(\d+)", text)
    )
    data["Number of Habitable rooms"] = int(habitable_match.group(1))

    # Extract Carbon Emissions
    # carbon_match = re.search(r"Emissions \(t/year\):\s*([\d.]+)\s*tonnes", text)
    # data["Carbon Emissions (t/year)"] = float(carbon_match.group(1))

    # Extract Fuel Bill
    fuel_bill_match = re.search(r"Fuel Bill:\s*£(\d+)", text)
    data["Fuel Bill"] = f"£{fuel_bill_match.group(1)}"

    # Extract individual address components
    postcode = re.search(r"Postcode:\s*(.*?)\nRegion:", text)
    # region = re.search(r"Region:\s*(.*?)\nHouse Name:", text)
    house_name = re.search(r"House Name:\s*(.*?)\nHouse No:", text)
    house_no = re.search(r"House No:\s*(.*?)\nStreet:", text)
    street = re.search(r"Street:\s*(.*?)\nLocality:", text)
    locality = re.search(r"Locality:\s*(.*?)\nTown:", text)
    town = re.search(r"Town:\s*(.*?)\nCounty:", text)
    county = re.search(r"County:\s*(.*?)\nProperty Tenure:", text)

    # Clean extracted values; a component that was not found contributes an empty string
    address_parts = [
        component.group(1).strip() if component else ""
        for component in (house_no, house_name, street, locality, town, county, postcode)
    ]

    # Join non-empty parts with a comma
    data["Address"] = ", ".join([part for part in address_parts if part])
    # Guarded like the address parts above, so a missing postcode leaves the value as None
    data["Postcode"] = postcode.group(1).strip() if postcode else None

    # windows_section = re.search(r"Windows\s*(.*?)\s*Draught Proofing", text, re.DOTALL)
    # windows_text = windows_section.group(1)
    # window_data = extract_window_age_description(windows_text)
    # data.update(window_data)

    # Extract Total Number of Doors
    total_doors_match = re.search(r"Total Number of Doors\s*(\d+)", text)
    data["Total Number of Doors"] = int(total_doors_match.group(1))

    # Extract Number of Insulated Doors
    insulated_doors_match = re.search(r"Number of Insulated Doors\s*(\d+)", text)
    data["Number of Insulated Doors"] = int(insulated_doors_match.group(1))

    # Extract Primary Heating Section: it runs up to "Main Heating2" when that heading
    # exists, otherwise up to "Water Heating"
    primary_heating_section1 = re.search(r"Main\s*Heating1\s*(.*?)\s*Main\s*Heating2", text, re.DOTALL)
    primary_heating_section2 = re.search(r"Main\s*Heating1\s*(.*?)\s*Water\s*Heating", text, re.DOTALL)
    primary_heating_section = primary_heating_section1 if primary_heating_section1 else primary_heating_section2
    primary_text = primary_heating_section.group(1)

    # Handle extracting main heating code, trying each label in order of preference
    mainheat_search = re.search(r"Main Heating Code\s*(.*?)\n", primary_text)
    if mainheat_search is None:
        mainheat_search = re.search(r"Main Heating EES Code\s*(.*?)\n", primary_text)
    if mainheat_search is None:
        mainheat_search = re.search(r"PCDF boiler Reference\s*(.*?)\n", primary_text)
    data["Existing Primary Heating System"] = mainheat_search.group(1).strip()

    data["Existing Primary Heating PCDF Reference"] = re.search(
        r"PCDF boiler Reference\s*(\d+)", primary_text
    ).group(1)

    controls_search = re.search(
        r"Main Heating Controls Sap\s*(.*?)\n", primary_text
    )
    if controls_search is None:
        controls_search = re.search(
            r"Main Heating Controls\s*(.*?)\n", primary_text
        )
    data["Existing Primary Heating Controls"] = controls_search.group(1).strip()

    data["Existing Primary Heating % of Heat"] = int(
        re.search(r"Percentage of Heat\s*(\d+)\s*%", primary_text).group(1)
    )

    # Extract Secondary Heating Section
    secondary_heating_section = re.search(r"Main\s*Heating2\s*(.*?)\s*Water\s*Heating", text, re.DOTALL)
    if secondary_heating_section is None:
        # No second main heating system in the report
        data["Existing Secondary Heating System"] = ""
        data["Existing Secondary Heating PCDF Reference"] = ""
        data["Existing Secondary Heating Controls"] = ""
        data["Existing Secondary Heating % of Heat"] = 0
    else:
        secondary_text = secondary_heating_section.group(1)

        main_heating_code_match_secondary = re.search(
            r"Main Heating Code\s*(.*?)(?=\n|Percentage of Heat)", secondary_text
        )
        if main_heating_code_match_secondary is None:
            main_heating_code_match_secondary = re.search(
                r"Main Heating EES Code\s*(.*?)(?=\n|Percentage of Heat)", secondary_text
            )
        data["Existing Secondary Heating System"] = main_heating_code_match_secondary.group(1).strip()

        data["Existing Secondary Heating PCDF Reference"] = re.search(
            r"PCDF boiler Reference\s*(\d+)", secondary_text
        ).group(1)

        second_heating_controls_match = re.search(r"Main Heating Controls\s*(.*?)\n", secondary_text)
        data["Existing Secondary Heating Controls"] = (
            second_heating_controls_match.group(1).strip() if second_heating_controls_match else ""
        )

        data["Existing Secondary Heating % of Heat"] = int(
            re.search(r"Percentage of Heat\s*(\d+)\s*%", secondary_text).group(1)
        )

    # Extract Secondary Heating and Water Heating Codes
    secondary_heating_code_match = re.search(r"Secondary Heating Code\s*(.*?)\n", text)
    water_heating_code_match = re.search(r"Water Heating Code\s*(.*?)\n", text)

    if data["Existing Secondary Heating System"] == "":
        data["Secondary Heating Code"] = ""
    else:
        data["Secondary Heating Code"] = (
            secondary_heating_code_match.group(1).strip() if secondary_heating_code_match else ""
        )
    data["Water Heating Code"] = water_heating_code_match.group(1).strip()

    dimensions = extract_building_parts_summary(text)
    data.update(dimensions)

    # Need to get the hot water: work inside the text between the "15.0" and "15.1" headings
    section_match = re.search(r"15\.0.*?\n(.*?)15\.1", text, re.DOTALL)
    section_text = section_match.group(1)

    # Extract Water Heating Code and (optional) fuel type
    code_match = re.search(r"Water Heating Code\s+(\S+)", section_text)
    fuel_match = re.search(r"Water Heating Fuel Type\s+(.+)", section_text)
    fuel_type = None if fuel_match is None else fuel_match.group(1).strip()

    data["Hot Water System"] = code_match.group(1)
    data["Hot Water Fuel"] = fuel_type

    # data["Number of Light Fittings"] = int(re.search(r"Total number of light fittings\s*(\d+)", text).group(1))
    # data["Number of LEL Fittings"] = int(re.search(r"Total number of L.E.L. fittings\s*(\d+)", text).group(1))
    # data["Number of fittings needing LEL"] = data["Number of Light Fittings"] - data["Number of LEL Fittings"]

    # Roof details: keep the first building part whose label contains "Main"
    extracted_roof_data = extract_roof_details_summary(text)
    main_roof_data = [roof for roof in extracted_roof_data if "Main" in roof["Building Part"]][0]
    data["Main Roof Type"] = main_roof_data["Roof Type"]
    data["Main Roof Insulation"] = main_roof_data["Roof Insulation"]
    data["Main Roof Insulation Thickness"] = main_roof_data["Roof Insulation Thickness"]

    # Wall details: keep the first building part whose label contains "Main"
    walls_data = extract_wall_details_summary(text)
    main_building_walls = [wall for wall in walls_data if "Main" in wall["Building Part"]][0]
    data["Main Wall Type"] = main_building_walls["Wall Type"]
    data["Main Wall Insulation"] = main_building_walls["Wall Insulation"]
    # data["Main Wall Dry-lining"] = main_building_walls["Wall Dry-lining"]
    # data["Main Wall Thickness"] = main_building_walls["Wall Thickness (mm)"]
    # data["Main Building Alternative Wall Type"] = main_building_walls["Alternative Wall Type"]
    # data["Main Building Alternative Wall Insulation"] = main_building_walls["Alternative Wall Insulation"]
    # data["Main Building Alternative Wall Dry-lining"] = main_building_walls["Alternative Wall Dry-lining"]
    # data["Main Building Alternative Wall Thickness"] = main_building_walls["Alternative Wall Thickness (mm)"]

    return data
# Folder that receives the exported tables
folder_location = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Livewest/July 2025 Heating Upgrades"
df = pd.read_csv("/Users/khalimconn-kowlessar/Documents/hestia/July 2025 Surveys/export_summary_table.csv")

# Parse the summary PDF of every row that has no recorded error and has a summary file
property_data = []
for _, x in tqdm(df.iterrows(), total=len(df)):
    if not pd.isnull(x["error"]):
        continue

    filepath = x["filepath"]
    if filepath in ["No summary file found"]:
        continue

    summary_data = extract_summary_report(pdf_path=filepath)
    # Keep the original row's columns alongside the extracted values
    property_data.append(
        {
            **x.to_dict(),
            **summary_data
        }
    )

property_data = pd.DataFrame(property_data)

# Store as excel (paths are built from folder_location instead of repeating the folder)
property_data.to_excel(f"{folder_location}/property_table_24th_july.xlsx")

# The target file has an .xlsx extension, so write a real Excel workbook rather than
# CSV text under an Excel file name
sandwell_data = property_data[property_data["company"] == "sandwell.gov.uk"]
sandwell_data.to_excel(f"{folder_location}/Sandwell EPR data (WIP).xlsx")

View file

@ -9,7 +9,7 @@ class GainOptimiser:
This class is used to maximise gain, given a constrained cost
"""
def __init__(self, components, max_cost, max_gain):
def __init__(self, components, max_cost, max_gain, allow_slack=True):
"""
This function will try and maximise the gain, given a constrained cost. If we specify a max_gain, then the
optimisation routine is constrained to try not to exceed a maximum increase
@ -21,6 +21,8 @@ class GainOptimiser:
:param components: List of components, where each component is a dictionary with keys "id", "cost" and "gain"
:param max_cost: Maximum cost constraint
:param max_gain: Maximum gain constraint
:param allow_slack: If True, allows the model to use slack variables to relax the cost constraint if the model
is infeasible. Defaults to True.
"""
self.components = components
self.max_cost = max_cost
@ -32,6 +34,7 @@ class GainOptimiser:
self.solution = []
self.solution_gain = None
self.solution_cost = None
self.allow_slack = allow_slack
def setup(self):
# Initialize Model
@ -124,15 +127,18 @@ class GainOptimiser:
if (self.m.status == OptimizationStatus.INFEASIBLE) or (
(self.m.status == OptimizationStatus.OPTIMAL) and not len(solution)
):
logger.info("We have an infeasible model, setting up slack model")
self.setup_slack()
self.m.optimize()
solution = [
item for group, group_vars in zip(self.components, self.variables) for item, var in
zip(group, group_vars)
if
var.x >= 0.99
]
if self.allow_slack:
logger.info("We have an infeasible model, setting up slack model")
self.setup_slack()
self.m.optimize()
solution = [
item for group, group_vars in zip(self.components, self.variables) for item, var in
zip(group, group_vars)
if
var.x >= 0.99
]
else:
logger.info("Infeasible but slack disabled - returning empty solution")
self.solution = solution

View file

@ -13,7 +13,9 @@ def prepare_input_measures(property_recommendations, goal, needs_ventilation):
"""
goal_map = {
"Increasing EPC": "sap_points"
"Increasing EPC": "sap_points",
"Energy Savings": "kwh_savings",
"Reducing CO2 emissions": "co2_equivalent_savings",
}
goal_key = goal_map[goal]

View file

@ -66,7 +66,7 @@ functions:
- sqs:
arn: arn:aws:sqs:${self:provider.region}:${aws:accountId}:model-engine-queue
batchSize: 1
maximumConcurrency: 2
maximumConcurrency: 2 # Heavily restricts concurrency to avoid overwhelming the Lambda limits
resources:

View file

@ -7,10 +7,12 @@ from backend.app.utils import sap_to_epc
from sqlalchemy.orm import sessionmaker
from backend.app.db.connection import db_engine
from backend.app.db.models.recommendations import Recommendation, Plan, PlanRecommendations
from backend.app.db.models.portfolio import PropertyModel, PropertyDetailsEpcModel
from backend.app.db.models.portfolio import PropertyModel, PropertyDetailsEpcModel, PropertyDetailsSpatial
PORTFOLIO_ID = 206
SCENARIOS = [389]
# PORTFOLIO_ID = 206
# SCENARIOS = [389]
PORTFOLIO_ID = 221
SCENARIOS = [427]
def get_data(portfolio_id, scenario_ids):
@ -125,17 +127,64 @@ df["predicted_post_works_sap"] = df["predicted_post_works_sap"].round()
df["predicted_post_works_epc"] = df["predicted_post_works_sap"].apply(lambda x: sap_to_epc(x))
# We merge this back to the main dataframe, which will contain the bathrooms
from utils.s3 import read_csv_from_s3
from utils.s3 import read_csv_from_s3, read_excel_from_s3
asset_list = read_csv_from_s3(bucket_name="retrofit-plan-inputs-dev", filepath='8/206/asset_list.csv')
# asset_list = read_csv_from_s3(bucket_name="retrofit-plan-inputs-dev", filepath='8/206/asset_list.csv')
asset_list = read_excel_from_s3(
bucket_name="retrofit-plan-inputs-dev", file_key='8/221/20250722T202328736Z/asset_list.xlsx',
header_row=0, sheet_name="320 - edited"
)
asset_list = pd.DataFrame(asset_list)
asset_list = asset_list[["domna_full_address", "domna_postcode", "epc_os_uprn", ]].copy()
asset_list = asset_list.rename(columns={"epc_os_uprn": "uprn"})
df["uprn"] = df["uprn"].astype(str)
asset_list["uprn"] = asset_list["uprn"].astype("Int64").astype(str)
asset_list = asset_list.merge(
df.drop(columns=["address", "postcode", "property_type", "total_floor_area"]),
how="left",
on="uprn"
)
# Get conservation area data from property details spatial. based on the UPRNs
def get_conservation_area_data(uprns):
    """
    Fetch the PropertyDetailsSpatial rows for the given UPRNs as a DataFrame.

    :param uprns: Collection of UPRN values used in the `IN` filter.
    :return: DataFrame with one row per matching record and one column per column of the
        PropertyDetailsSpatial table. The columns are present even when nothing matches,
        so callers can still select columns such as "uprn" from an empty result.
    """
    # Resolve the column names once rather than once per returned row
    column_names = [col.name for col in PropertyDetailsSpatial.__table__.columns]

    session = sessionmaker(bind=db_engine)()
    try:
        session.begin()

        spatial_query = session.query(
            PropertyDetailsSpatial
        ).filter(
            PropertyDetailsSpatial.uprn.in_(uprns)  # Filter by UPRNs
        ).all()

        # Copy every column value into plain dicts while the session is still open
        spatial_data = [
            {name: getattr(spatial, name) for name in column_names}
            for spatial in spatial_query
        ]
    finally:
        # Always release the session, including when the query raises
        session.close()

    return pd.DataFrame(spatial_data, columns=column_names)
uprns = asset_list[
~pd.isna(asset_list["uprn"]) & (asset_list["uprn"] != "<NA>")
]["uprn"].astype(int).unique().tolist()
conservation_area_data = get_conservation_area_data(uprns)
conservation_area_data["uprn"] = conservation_area_data["uprn"].astype(str)
asset_list = asset_list.merge(
conservation_area_data[["uprn", "conservation_status", "is_listed_building", "is_heritage_building"]],
how="left",
on="uprn"
)
# For exporting NCHA
asset_list.to_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/NCHA/320 Portfolio/asset_list_epc_b.xlsx",
index=False
)
condition_costs = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/sfr/Spring JV/Condition costs.xlsx",
sheet_name="Prices - Khalim",