allow double glazing if no restrictions but confirmed performance of secondary glazing

This commit is contained in:
Khalim Conn-Kowlessar 2026-01-10 11:40:49 +00:00
parent ef350df6d8
commit ef942ef18a
16 changed files with 1147 additions and 167 deletions

View file

@ -10,3 +10,4 @@ from .materials_functions import *
from .inspections_functions import *
from .non_intrusive_surveys import *
from .whlg_functions import *
from .already_installed_functions import *

View file

@ -0,0 +1,40 @@
from backend.app.db.models.recommendations import InstalledMeasure
from typing import Dict, List, Set
from collections import defaultdict
def get_installed_measure_types_by_uprns(
session,
uprns: List[int],
) -> Dict[int, Set[str]]:
"""
Returns installed measure types per UPRN.
{
uprn: {"cavity_wall_insulation", "mechanical_ventilation", ...}
}
"""
if not uprns:
return {}
rows = (
session.query(
InstalledMeasure.uprn,
InstalledMeasure.measure_type,
)
.filter(InstalledMeasure.is_active.is_(True))
.filter(InstalledMeasure.uprn.in_(uprns))
.all()
)
out: Dict[int, Set[str]] = defaultdict(set)
for uprn, measure_type in rows:
out[uprn].add(
measure_type.value
if hasattr(measure_type, "value")
else measure_type
)
return out

View file

@ -27,7 +27,8 @@ def prepare_plan_data(
"""
# Plan carbon savings
co2_savings = sum([r["co2_equivalent_savings"] for r in default_recommendations])
post_co2_emissions = p.data["co2-emissions-current"] - co2_savings
raise Exception("CHECK ME")
post_co2_emissions = p.energy["co2_emissions"] - co2_savings
# Plan bill savings
energy_bill_savings = sum([r["energy_cost_savings"] for r in default_recommendations])

View file

@ -5,6 +5,5 @@ from typing import Any, Optional
@dataclass
class PropertyRequestData:
patch: dict
already_installed: list
non_invasive_recommendations: dict
valuation: Optional[float]

View file

@ -52,7 +52,7 @@ def patch_epc(patch, epc_records):
def extract_property_request_data(
address: Address, patches, already_installed, non_invasive_recommendations, valuation_data, uprn
address: Address, patches, non_invasive_recommendations, valuation_data, uprn
):
patch_has_uprn = "uprn" in patches[0] if patches else True
if patch_has_uprn:
@ -64,10 +64,6 @@ def extract_property_request_data(
x for x in patches if (x["address"] == address.address) and (x["postcode"] == address.postcode)
), {})
property_already_installed = next((
x for x in already_installed if (x["address"] == address.address) and (x["postcode"] == address.postcode)
), [])
# Because we have some non-invasive recommendations that match on address and postcode, but not UPRN
# we need to check existence of uprn
has_uprn = "uprn" in non_invasive_recommendations[0] if non_invasive_recommendations else False
@ -119,7 +115,6 @@ def extract_property_request_data(
# Return data class to give a structured format
return PropertyRequestData(
patch=patch,
already_installed=property_already_installed,
non_invasive_recommendations=property_non_invasive_recommendations,
valuation=property_valuation
)

View file

@ -684,6 +684,9 @@ async def model_engine(body: PlanTriggerRequest):
energy_assessments_by_uprn = db_funcs.energy_assessment_functions.get_latest_assessments_for_uprns(
session, uprns
)
already_installed_by_uprn = db_funcs.already_installed_functions.get_installed_measure_types_by_uprns(
session, uprns
)
# If we have properties that need to be created, we cerate them in bulk
logger.info("Determine new properties to be created")
@ -703,7 +706,7 @@ async def model_engine(body: PlanTriggerRequest):
property_lookup[("uprn", uprn)] = prop_id
if landlord_property_id:
property_lookup[("landlord_property_id", landlord_property_id)] = prop_id
logger.info("Processing each property for model input preparation")
input_properties, inspections_map, eco_packages, epc_upserts = [], {}, {}, []
for addr, config in tqdm(
@ -725,6 +728,8 @@ async def model_engine(body: PlanTriggerRequest):
energy_assessment = energy_assessments_by_uprn.get(addr.uprn)
property_already_installed = list(already_installed_by_uprn[addr.uprn])
epc_searcher = SearchEpc(
address1=addr.address1,
postcode=addr.postcode,
@ -767,7 +772,6 @@ async def model_engine(body: PlanTriggerRequest):
req_data = extract_property_request_data(
address=addr,
patches=patches,
already_installed=already_installed,
non_invasive_recommendations=non_invasive_recommendations,
valuation_data=valuation_data,
uprn=addr.uprn,
@ -813,7 +817,7 @@ async def model_engine(body: PlanTriggerRequest):
address=epc_searcher.address_clean,
postcode=epc_searcher.postcode_clean,
epc_record=prepared_epc,
already_installed=req_data.already_installed + eco_packages.get(property_id)[3],
already_installed=property_already_installed + eco_packages.get(property_id)[3],
property_valuation=req_data.valuation,
non_invasive_recommendations=property_non_invasive_recommendations,
energy_assessment=energy_assessment,
@ -965,6 +969,8 @@ async def model_engine(body: PlanTriggerRequest):
# Temp putting this here
recommendations_scoring_data["is_post_sap10_ending"] = True
recommendations_scoring_data["sap_starting"] = 77
recommendations_scoring_data = recommendations_scoring_data.drop(
columns=["rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending",
"carbon_ending"]
@ -1189,7 +1195,8 @@ async def model_engine(body: PlanTriggerRequest):
property_updates, property_epc_details, property_spatial_updates = [], [], []
plans_to_create, recommendations_to_create = [], []
# TODO: Check the update to carbon
print("NEED TO CHECK THE UPDATE TO CARBON")
# Prepare the data that will need to be uploaded in bulk
for p in input_properties:
recommendations_for_property = recommendations.get(p.id, [])

View file

@ -0,0 +1,14 @@
party_map = {
"Before 1900": 'England and Wales: before 1900',
"1900-1929": 'England and Wales: 1900-1929',
"1930-1949": 'England and Wales: 1930-1949',
"1950-1966": 'England and Wales: 1950-1966',
"1967-1975": 'England and Wales: 1967-1975',
"1976-1982": 'England and Wales: 1976-1982',
"1983-1990": 'England and Wales: 1983-1990',
"1991-1995": 'England and Wales: 1991-1995',
"1996-2002": 'England and Wales: 1996-2002',
"2003-2006": 'England and Wales: 2003-2006',
"2007-2011": 'England and Wales: 2007-2011',
"2012 onwards": 'England and Wales: 2012-2021',
}

View file

@ -0,0 +1,15 @@
parity_map = {
"MidTerrace": "Mid-Terrace",
"EndTerrace": "End-Terrace",
"Detached": "Detached",
"SemiDetached": "Semi-Detached",
"EnclosedMidTerrace": "Enclosed Mid-Terrace",
"EnclosedEndTerrace": "Enclosed End-Terrace",
}
# MidTerrace 41462
# EndTerrace 20910
# Detached 16875
# SemiDetached 14725
# EnclosedMidTerrace 3176
# EnclosedEndTerrace 2393

View file

@ -0,0 +1,6 @@
parity_map = {
"Flat": "Flat",
"Maisonette": "Maisonette",
"Bungalow": "Bungalow",
"House": "House",
}

View file

@ -0,0 +1,3 @@
parity_map = {
}

View file

@ -0,0 +1,95 @@
import pandas as pd
from etl.epc.DataProcessor import construction_age_bounds_map
from backend.onboarders.mappings.property_type import parity_map as property_map
from backend.onboarders.mappings.age_band import party_map as age_band_map
from backend.onboarders.mappings.built_form import parity_map as built_form_map
def check_nulls(data, original_column, mapped_column):
# We only allow nulls if the oroginal value was null
null_vals = data[pd.isnull(data[mapped_column])]
if null_vals.empty:
return True
# We make sure all original values were null
assert pd.isnull(null_vals[original_column]).all(), (
f"Some values in {mapped_column} were not mapped, but original values were not null"
)
# Sample input data
data = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting Project/2025_11_11 - Peabody "
"- Data Extracts for Domna.xlsx",
sheet_name="Sustainability"
)
# We want to map the parity fields to standard EPC references. This will allow us to
# 1) Estimate EPCs, more accurately
# 2) Patch incorrect EPCs with ease
# 3) Indicate already installed measures
# ------------ construction_age_band ------------
# Map to EPC age bands
# def construction_date_to_band(year):
# if pd.isnull(year):
# return None
# # Get the year from the date which is numpy datetime format
# for label, ranges in construction_age_bounds_map.items():
# if ranges["l"] <= year <= ranges["u"]:
# return label
# raise NotImplementedError("year out of bounds")
#
#
# data["construction_age_band"] = pd.to_datetime(data["Construction Date"]).dt.year.apply(construction_date_to_band)
data["construction_age_band"] = data["Construction Years"].map(age_band_map)
check_nulls(data, "Construction Years", "construction_age_band")
# ------------ property_type ------------
data["property_type"] = data["Type"].map(property_map)
assert pd.isnull(data["property_type"]).sum() == 0, "Some property types were not mapped"
# ------------ built_form ------------
data["built_form"] = data["Attachment"].map(built_form_map)
assert pd.isnull(data["built_form"]).sum() == 0, "Some built forms were not mapped"
# ------------ Wall Construction ------------
data["walls_combined"] = data["Wall Construction"] + "+" + data["Wall Insulation"].fillna("Unknown Insulation")
data["Wall Insulation"].value_counts()
data["Wall Construction"].value_counts()
as_built_map = {
"Cavity": {"insulated_age_bands":[], "partial_insulated_age_bands": []},
"Solid Brick": {"insulated_age_bands": [], "partial_insulated_age_bands": []},
"System": {"insulated_age_bands": [], "partial_insulated_age_bands": []},
"Timber Frame": {"insulated_age_bands": [], "partial_insulated_age_bands": []},
"Sandstone": {"insulated_age_bands": [], "partial_insulated_age_bands": []},
"Granite": {"insulated_age_bands": [], "partial_insulated_age_bands": []},
"Cob": {"insulated_age_bands": [], "partial_insulated_age_bands": []},
}
def map_wall_construction(wall_constuction, wall_insulation, construction_age_band):
if wall_insulation == "AsBuilt":
# Deduce based on wall construction and age band
bands = as_built_map.get(wall_constuction, None)
if bands is None:
raise NotImplementedError(f"Wall construction {wall_constuction} not in as built map")
# We check if the age band is in insulated or partial insulated, and if neither, we assume uninsulated
# Variables we want to map
'Org Ref', 'Address 1', 'Address 2', 'Address 3', 'Postcode', 'Type',
'Attachment', 'Construction Years', 'Wall Construction',
'Wall Insulation', 'Roof Construction', 'Roof Insulation',
'Floor Construction', 'Floor Insulation', 'Glazing', 'Heating',
'Boiler Efficiency', 'Main Fuel', 'Controls Adequacy', 'UPRN',
'Total Floor Area (m2)'

View file

@ -75,6 +75,10 @@ df = df.sort_values("property_id", ascending=True)
agg = df.groupby("property_id").size().reset_index(name="n_plans")
agg = agg.sort_values("n_plans", ascending=True)
agg[agg["n_plans"] == 3]
agg[agg["n_plans"] == 2].shape
agg[agg["n_plans"] != 3]
assert all(agg["n_plans"] == 3)
@ -153,4 +157,54 @@ with pd.ExcelWriter(filename) as writer:
sal.iloc[41000:61000, :].to_excel(writer, sheet_name="batch 4", index=False)
sal.iloc[61000:81000, :].to_excel(writer, sheet_name="batch 5", index=False)
sal.iloc[81000:, :].to_excel(writer, sheet_name="batch 5", index=False)
sal.iloc[81000:, :].to_excel(writer, sheet_name="batch 6", index=False)
# TODO - mistake was made when creating the final SAL
b1 = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting Project/Final SAL/20260101 "
"sal.xlsx",
sheet_name="batch 1"
)
b2 = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting Project/Final SAL/20260101 "
"sal.xlsx",
sheet_name="batch 2"
)
b3 = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting Project/Final SAL/20260101 "
"sal.xlsx",
sheet_name="batch 3"
)
b4 = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting Project/Final SAL/20260101 "
"sal.xlsx",
sheet_name="batch 4"
)
b5 = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting Project/Final SAL/20260101 "
"sal.xlsx",
sheet_name="batch 5"
)
# Batch 6 should be the remaining
total = pd.concat([b1, b2, b3, b4, b5])
remaining = sal[~sal["epc_os_uprn"].isin(total["epc_os_uprn"].values)]
# Create new output
filename = ("/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting Project/Final SAL/"
"20260107 corrected batch 6 sal.xlsx")
with pd.ExcelWriter(filename) as writer:
sal.to_excel(writer, sheet_name="Standardised Asset List", index=False)
# Top 1000 for testing
b1.to_excel(writer, sheet_name="batch 1", index=False)
# Batch 2 is the next 20,000
b2.to_excel(writer, sheet_name="batch 2", index=False)
# Batch 3 is the next 20,000
b3.to_excel(writer, sheet_name="batch 3", index=False)
b4.to_excel(writer, sheet_name="batch 4", index=False)
b5.to_excel(writer, sheet_name="batch 5", index=False)
remaining.to_excel(writer, sheet_name="batch 6", index=False)
all_together = pd.concat(
[b1, b2, b3, b4, b5, remaining]
)

View file

@ -0,0 +1,21 @@
import pandas as pd
df = pd.read_excel(
"/Users/khalimconn-kowlessar/Downloads/Parity Data 08012026.xlsx"
)
df['SAP Score'].mean()
df[~pd.isnull(df["Lodged EPC Score"])]["Lodged EPC Score"].mean()
df[~pd.isnull(df["Lodged EPC Score"])]["SAP Score"].mean()
df['Difference'] = abs(df['SAP Score'] - df['Lodged EPC Score'])
df[~pd.isnull(df["Lodged EPC Score"])]["Difference"].mean()
df["Lodged EPC Band"].value_counts(normalize=True)
df["SAP Band"].value_counts(normalize=True)
z = df[df["SAP Band"] != df["Lodged EPC Band"]]
agg = z.groupby(["Lodged EPC Band", "SAP Band"]).size().reset_index(name="count")
zz = z[z["Lodged EPC Band"] == "A"]

View file

@ -0,0 +1,7 @@
import pandas as pd
sustainability_data = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting Project/2025_11_11 - Peabody "
"- Data Extracts for Domna.xlsx",
sheet_name="Sustainability"
)

View file

@ -72,9 +72,12 @@ class WindowsRecommendations:
elif "secondary_glazing" in measures and "double_glazing" not in measures:
is_secondary_glazing = True
else:
is_secondary_glazing = self.property.restricted_measures or (
self.property.windows["glazing_type"] == "secondary"
# If the property currently has some secondary glazing but isn't in a conservation area
#
is_secondary_glazing = self.property.restricted_measures and (
self.property.data["windows-energy-eff"] in ["Poor", "Very Poor"]
)
windows_area = self.property.windows_area
if not number_of_windows:
@ -200,6 +203,8 @@ class WindowsRecommendations:
else:
glazed_type_ending = "secondary glazing"
new_windows_description = "Multiple glazing throughout"
# Windows only end up with an average efficiency
windows_energy_eff = "Average"
else:
raise ValueError("Invalid glazing type - implement me")
@ -208,7 +213,6 @@ class WindowsRecommendations:
windows_energy_eff = "Very Good"
# For post 2002 windows, the energy efficiency is "Good" and so for the simulation, we simulate with "Good"
windows_ending_config = WindowAttributes(new_windows_description).process()
windows_simulation_config = check_simulation_difference(