Merge pull request #612 from Hestia-Homes/eco-eligiblity-bug

minor debugging
This commit is contained in:
KhalimCK 2025-12-13 22:16:34 +08:00 committed by GitHub
commit 6fdde5ee40
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 722 additions and 217 deletions

View file

@ -59,25 +59,26 @@ def app():
Property UPRN
"""
# Lambeth:
data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Lambeth/December 10th"
data_filename = "lambeth_sw2_leigham court estate.xlsx"
# Peabody data for cleaning
data_folder = ("/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting "
"Project/data_validation")
data_filename = "to_standardise_uprns.xlsx"
sheet_name = "Sheet1"
postcode_column = 'Postcode'
address1_column = "Address"
address1_column = "Address 1"
address1_method = None
fulladdress_column = None
address_cols_to_concat = ["Address"]
address_cols_to_concat = ["Address 1", "Address 2", "Address 3"]
missing_postcodes_method = None
landlord_year_built = None
landlord_os_uprn = None
landlord_property_type = None
landlord_built_form = None
landlord_property_type = "Type"
landlord_built_form = "Attachment"
landlord_wall_construction = None
landlord_roof_construction = None
landlord_heating_system = None
landlord_existing_pv = None
landlord_property_id = "row_id"
landlord_property_id = "Org Ref"
landlord_sap = None
outcomes_filename = None
outcomes_sheetname = None
@ -93,6 +94,40 @@ def app():
asset_list_header = 0
landlord_block_reference = None
# Lambeth:
# data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Lambeth/December 10th"
# data_filename = "lambeth_sw2_leigham court estate.xlsx"
# sheet_name = "Sheet1"
# postcode_column = 'Postcode'
# address1_column = "Address"
# address1_method = None
# fulladdress_column = None
# address_cols_to_concat = ["Address"]
# missing_postcodes_method = None
# landlord_year_built = None
# landlord_os_uprn = None
# landlord_property_type = None
# landlord_built_form = None
# landlord_wall_construction = None
# landlord_roof_construction = None
# landlord_heating_system = None
# landlord_existing_pv = None
# landlord_property_id = "row_id"
# landlord_sap = None
# outcomes_filename = None
# outcomes_sheetname = None
# outcomes_postcode = None
# outcomes_houseno = None
# outcomes_id = None
# outcomes_address = None
# master_filepaths = []
# master_id_colnames = []
# master_to_asset_list_filepath = None
# phase = False
# ecosurv_landlords = None
# asset_list_header = 0
# landlord_block_reference = None
# Maps addresses to uprn in problematic cases
manual_uprn_map = {}
@ -230,22 +265,22 @@ def app():
)
# We now retrieve any failed properties
chunk_failed = chunk[chunk[asset_list.DOMNA_PROPERTY_ID].isin(errors_chunk)]
epc_data_failed, _, _ = get_data(
df=chunk_failed,
row_id_name=asset_list.DOMNA_PROPERTY_ID,
uprn_column=AssetList.STANDARD_UPRN,
fulladdress_column=AssetList.STANDARD_FULL_ADDRESS,
address1_column=AssetList.STANDARD_ADDRESS_1,
postcode_column=AssetList.STANDARD_POSTCODE,
property_type_column=AssetList.STANDARD_PROPERTY_TYPE,
built_form_column=AssetList.STANDARD_BUILT_FORM,
manual_uprn_map=manual_uprn_map,
epc_api_only=epc_api_only,
epc_auth_token=EPC_AUTH_TOKEN
)
epc_data_chunk.extend(epc_data_failed)
# chunk_failed = chunk[chunk[asset_list.DOMNA_PROPERTY_ID].isin(errors_chunk)]
# epc_data_failed, _, _ = get_data(
# df=chunk_failed,
# row_id_name=asset_list.DOMNA_PROPERTY_ID,
# uprn_column=AssetList.STANDARD_UPRN,
# fulladdress_column=AssetList.STANDARD_FULL_ADDRESS,
# address1_column=AssetList.STANDARD_ADDRESS_1,
# postcode_column=AssetList.STANDARD_POSTCODE,
# property_type_column=AssetList.STANDARD_PROPERTY_TYPE,
# built_form_column=AssetList.STANDARD_BUILT_FORM,
# manual_uprn_map=manual_uprn_map,
# epc_api_only=epc_api_only,
# epc_auth_token=EPC_AUTH_TOKEN
# )
#
# epc_data_chunk.extend(epc_data_failed)
# Append the failed data to the main data
# Store the chunk locally as a csv
@ -422,3 +457,7 @@ def app():
if not asset_list.geographical_areas.empty:
asset_list.geographical_areas.to_excel(writer, sheet_name="Geographical Areas", index=False)
# Store dupes
if not asset_list.duplicated_addresses.empty:
asset_list.duplicated_addresses.to_excel(writer, sheet_name="Duplicate Properties", index=False)

View file

@ -458,6 +458,12 @@ BUILT_FORM_MAPPINGS = {
'Maisonette: Detached: Mid Floor': 'detached',
'Bungalow: EnclosedMidTerrace': 'enclosed mid-terrace',
'House: EnclosedMidTerrace': 'enclosed mid-terrace'
'House: EnclosedMidTerrace': 'enclosed mid-terrace',
'EnclosedMidTerrace': 'enclosed mid-terrace',
'EnclosedEndTerrace': 'enclosed end-terrace',
'EndTerrace': 'end-terrace',
'SemiDetached': 'semi-detached',
'MidTerrace': 'mid-terrace'
}

View file

@ -1,4 +1,4 @@
from sqlalchemy import insert, delete, text
from sqlalchemy import insert, delete, select
from sqlalchemy.orm import Session
from sqlalchemy.exc import SQLAlchemyError
from backend.app.db.models.recommendations import (
@ -242,20 +242,26 @@ def chunked(iterable, size=100):
yield iterable[i:i + size]
# def fast_delete_recommendations(session, chunk):
# placeholders = ",".join(["(:p{})".format(i) for i in range(len(chunk))])
# params = {f"p{i}": chunk[i] for i in range(len(chunk))}
#
# sql = text(f"""
# WITH ids(property_id) AS (
# VALUES {placeholders}
# )
# DELETE FROM recommendation r
# USING ids
# WHERE r.property_id = ids.property_id;
# """)
#
# session.execute(sql, params, execution_options={"synchronize_session": False})
def fast_delete_recommendations(session, chunk):
placeholders = ",".join(["(:p{})".format(i) for i in range(len(chunk))])
params = {f"p{i}": chunk[i] for i in range(len(chunk))}
sql = text(f"""
WITH ids(property_id) AS (
VALUES {placeholders}
)
DELETE FROM recommendation r
USING ids
WHERE r.property_id = ids.property_id;
""")
session.execute(sql, params, execution_options={"synchronize_session": False})
session.execute(
delete(Recommendation)
.where(Recommendation.property_id.in_(chunk))
)
def clear_portfolio(session: Session, portfolio_id: int, batch_size=100):
@ -362,11 +368,19 @@ def clear_portfolio(session: Session, portfolio_id: int, batch_size=100):
# --------------------------
# Recommendations (fast delete)
# --------------------------
rec_chunks = list(chunked(property_ids, batch_size))
# rec_chunks = list(chunked(property_ids, batch_size * 5)) # larger chunks for fast delete
# total = len(rec_chunks)
# for i, chunk in enumerate(rec_chunks, start=1):
# print_progress("Deleting Recommendations", i, total)
# fast_delete_recommendations(session, chunk)
rec_chunks = list(chunked(recommendation_ids, batch_size))
total = len(rec_chunks)
for i, chunk in enumerate(rec_chunks, start=1):
print_progress("Deleting Recommendations", i, total)
fast_delete_recommendations(session, chunk)
session.execute(
delete(Recommendation)
.where(Recommendation.id.in_(chunk))
)
# --------------------------
# Inspections
@ -412,3 +426,114 @@ def clear_portfolio(session: Session, portfolio_id: int, batch_size=100):
session.commit()
print("Portfolio cleared.")
def clear_portfolio_in_batches(
session: Session,
portfolio_id: int,
property_batch_size: int = 10
):
# Fetch all property IDs once
property_ids = [
pid for (pid,) in
session.query(PropertyModel.id)
.filter(PropertyModel.portfolio_id == portfolio_id)
.all()
]
def delete_for_property_batch(prop_ids):
# ----------------------------
# Recommendations → PlanRecommendations
# ----------------------------
rec_subq = (
select(Recommendation.id)
.where(Recommendation.property_id.in_(prop_ids))
)
session.execute(
delete(PlanRecommendations)
.where(PlanRecommendations.recommendation_id.in_(rec_subq))
)
session.execute(
delete(RecommendationMaterials)
.where(RecommendationMaterials.recommendation_id.in_(rec_subq))
)
session.execute(
delete(Recommendation)
.where(Recommendation.property_id.in_(prop_ids))
)
# ----------------------------
# Inspections
# ----------------------------
session.execute(
delete(InspectionModel)
.where(InspectionModel.property_id.in_(prop_ids))
)
# ----------------------------
# Plans (scoped to these properties)
# ----------------------------
plan_subq = (
select(Plan.id)
.where(Plan.property_id.in_(prop_ids))
)
session.execute(
delete(PlanRecommendations)
.where(PlanRecommendations.plan_id.in_(plan_subq))
)
session.execute(
delete(FundingPackageMeasures)
.where(
FundingPackageMeasures.funding_package_id.in_(
select(FundingPackage.id)
.where(FundingPackage.plan_id.in_(plan_subq))
)
)
)
session.execute(
delete(FundingPackage)
.where(FundingPackage.plan_id.in_(plan_subq))
)
session.execute(
delete(Plan)
.where(Plan.id.in_(plan_subq))
)
# ----------------------------
# Property-scoped auxiliary tables
# ----------------------------
session.execute(
delete(PropertyDetailsEpcModel)
.where(PropertyDetailsEpcModel.property_id.in_(prop_ids))
)
session.execute(
delete(PropertyTargetsModel)
.where(PropertyTargetsModel.property_id.in_(prop_ids))
)
# ----------------------------
# Properties (last)
# ----------------------------
session.execute(
delete(PropertyModel)
.where(PropertyModel.id.in_(prop_ids))
)
# -------- BATCH DELETE LOOP --------
property_chunks = list(chunked(property_ids, property_batch_size))
total_batches = len(property_chunks)
for i, prop_ids in enumerate(property_chunks, start=1):
print(f"Deleting batch {i}/{total_batches} ({len(prop_ids)} properties)")
delete_for_property_batch(prop_ids)
session.commit()
print("Portfolio cleared in batches.")

View file

@ -662,7 +662,9 @@ async def model_engine(body: PlanTriggerRequest):
address1 = config.get("domna_address_1", None)
address1 = str(int(address1)) if isinstance(address1, float) else str(address1)
full_address = config.get("domna_full_address") if body.file_format == "domna_asset_list" else None
full_address = config.get("domna_full_address", "") if body.file_format == "domna_asset_list" else None
if not isinstance(full_address, str): # Catch for when the full address is nan
full_address = None
heating_system = parse_heating_system(config)
associated_uprns = []

View file

@ -290,6 +290,14 @@ class AnnualBillSavings:
# The solar thermal covers a % of the heating kwh, so we need to adjust the cost
return (kwh / cop) * assumptions.SOLAR_CONSUMPTION_PROPORTION * cls.ELECTRICITY_PRICE_CAP
if fuel in ['Oil + Solar Thermal']:
# The solar thermal covers a % of the heating kwh, so we need to adjust the cost
price_data = cls.FUEL_DATA[cls.FUEL_DATA["Fuel"] == "Kerosene"].squeeze()
cost_per_kwh = cls.cost_per_kwh(
price_data["Price (p)"], price_data["Energy Content, Net Calorific value (kWh/unit)"]
)
return (kwh / cop) * cost_per_kwh * assumptions.SOLAR_CONSUMPTION_PROPORTION
if fuel == "LPG + Solar Thermal":
# The solar thermal covers a % of the heating kwh, so we need to adjust the cost
price_data = cls.FUEL_DATA[cls.FUEL_DATA["Fuel"] == "LPG"].squeeze()

View file

@ -82,6 +82,12 @@ costs_by_floor_area = epc_data[
][["TOTAL_FLOOR_AREA", "CURRENT_ENERGY_EFFICIENCY", "LIGHTING_COST_CURRENT", "HEATING_COST_CURRENT",
"HOT_WATER_COST_CURRENT"]].copy()
epc_data = epc_data[
(epc_data["MAINHEAT_DESCRIPTION"].str.contains("SAP05:") == False) &
(~epc_data["LIGHTING_COST_CURRENT"].isin([None, ""])) &
(~pd.isnull(epc_data["LIGHTING_COST_CURRENT"]))
]
costs_by_floor_area.columns = [c.lower().replace("_", "-") for c in costs_by_floor_area.columns]
for c in ["lighting-cost-current", "heating-cost-current", "hot-water-cost-current"]:
costs_by_floor_area[c + "_scaled"] = costs_by_floor_area[c] / costs_by_floor_area["total-floor-area"]
@ -92,8 +98,8 @@ costs_by_floor_area = costs_by_floor_area.groupby("current-energy-efficiency")[
epc_data = epc_data[~pd.isnull(epc_data["UPRN"])]
sample_epc_data = epc_data[pd.to_datetime(epc_data["LODGEMENT_DATE"]) >= "2015-01-01"].drop_duplicates("UPRN").sample(
10000).reset_index(drop=True)
sample_epc_data = epc_data[pd.to_datetime(epc_data["LODGEMENT_DATE"]) >= "2008-01-01"].drop_duplicates("UPRN").sample(
50000).reset_index(drop=True)
# TODO: In Property find_energy_sources, sort out biomass community heating - what fuel type
# TODO: We might be able to remove find_energy_sources entirely and remove estimate_electrical_consumption. It's used
@ -163,6 +169,8 @@ mocked_kwh_predictions["heating_kwh_predictions"] = pd.DataFrame(mocked_kwh_pred
mocked_kwh_predictions["hotwater_kwh_predictions"] = pd.DataFrame(mocked_kwh_predictions["hotwater_kwh_predictions"])
# TODO: We might want to implement this generally, via an ETL process
for x in cleaned["mainheat-description"]:
x["has_wood_chips"] = False
for p in input_properties:
for col in ["lighting-cost-current", "heating-cost-current", "hot-water-cost-current"]:
if pd.isnull(p.data[col]):
@ -313,6 +321,10 @@ for p in tqdm(input_properties):
if not recommendations.get(p.id):
continue
# Temp allow to skip
if not isinstance(recommendations.get(p.id)[0], list):
continue
# we need to double unlist because we have a list of lists
property_measure_types = {rec["type"] for recs in recommendations[p.id] for rec in recs}
property_required_measures = [m for m in recommendations[p.id] if m[0]["type"] in body.required_measures]
@ -336,32 +348,32 @@ for p in tqdm(input_properties):
)
gain = optimiser_functions.calculate_gain(body=body, p=p, fixed_gain=fixed_gain, eco_packages=eco_packages)
funding = Funding(
tenure=body.housing_type,
project_scores_matrix=project_scores_matrix,
partial_project_scores_matrix=partial_project_scores_matrix,
whlg_eligible_postcodes=whlg_eligible_postcodes,
eco4_social_cavity_abs_rate=13,
eco4_social_solid_abs_rate=17,
eco4_private_cavity_abs_rate=13,
eco4_private_solid_abs_rate=17,
gbis_social_cavity_abs_rate=21,
gbis_social_solid_abs_rate=25,
gbis_private_cavity_abs_rate=21,
gbis_private_solid_abs_rate=28,
)
li_thickness = convert_thickness_to_numeric(
p.roof["insulation_thickness"], p.roof["is_pitched"], p.roof["is_flat"]
)
current_wall_u_value = p.walls["thermal_transmittance"]
if current_wall_u_value is None:
current_wall_u_value = get_wall_u_value(
clean_description=p.walls["clean_description"],
age_band=p.age_band,
is_granite_or_whinstone=p.walls["is_granite_or_whinstone"],
is_sandstone_or_limestone=p.walls["is_sandstone_or_limestone"],
)
# funding = Funding(
# tenure=body.housing_type,
# project_scores_matrix=project_scores_matrix,
# partial_project_scores_matrix=partial_project_scores_matrix,
# whlg_eligible_postcodes=whlg_eligible_postcodes,
# eco4_social_cavity_abs_rate=13,
# eco4_social_solid_abs_rate=17,
# eco4_private_cavity_abs_rate=13,
# eco4_private_solid_abs_rate=17,
# gbis_social_cavity_abs_rate=21,
# gbis_social_solid_abs_rate=25,
# gbis_private_cavity_abs_rate=21,
# gbis_private_solid_abs_rate=28,
# )
#
# li_thickness = convert_thickness_to_numeric(
# p.roof["insulation_thickness"], p.roof["is_pitched"], p.roof["is_flat"]
# )
# current_wall_u_value = p.walls["thermal_transmittance"]
# if current_wall_u_value is None:
# current_wall_u_value = get_wall_u_value(
# clean_description=p.walls["clean_description"],
# age_band=p.age_band,
# is_granite_or_whinstone=p.walls["is_granite_or_whinstone"],
# is_sandstone_or_limestone=p.walls["is_sandstone_or_limestone"],
# )
# We insert the innovation uplift
measures_to_optimise_with_uplift = deepcopy(measures_to_optimise)
@ -369,35 +381,39 @@ for p in tqdm(input_properties):
# TODO: Turn this into a function and store the innovaiton uplift
for group in measures_to_optimise_with_uplift:
for r in group:
if r["type"] in ["mechanical_ventilation", "low_energy_lighting", "secondary_heating",
"extension_cavity_wall_insulation", "draught_proofing", "sealing_open_fireplace"]:
(
r["partial_project_score"],
r["partial_project_funding"],
r["innovation_uplift"],
r["uplift_project_score"],
) = (
0, 0, 0, 0
)
continue
(
r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"],
r["uplift_project_score"]
) = funding.get_innovation_uplift(
measure=r,
starting_sap=int(p.data["current-energy-efficiency"]),
floor_area=p.floor_area,
is_cavity=p.walls["is_cavity_wall"],
current_wall_uvalue=current_wall_u_value,
is_partial="partial" in p.walls["clean_description"].lower(),
existing_li_thickness=li_thickness,
mainheating=p.main_heating,
main_fuel=p.main_fuel,
mainheat_energy_eff=p.data["mainheat-energy-eff"],
(r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"],
r["uplift_project_score"]) = (
0, 0, 0, 0
)
# if r["type"] in ["mechanical_ventilation", "low_energy_lighting", "secondary_heating",
# "extension_cavity_wall_insulation", "draught_proofing", "sealing_open_fireplace"]:
# (
# r["partial_project_score"],
# r["partial_project_funding"],
# r["innovation_uplift"],
# r["uplift_project_score"],
# ) = (
# 0, 0, 0, 0
# )
# continue
#
# (
# r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"],
# r["uplift_project_score"]
# ) = funding.get_innovation_uplift(
# measure=r,
# starting_sap=int(p.data["current-energy-efficiency"]),
# floor_area=p.floor_area,
# is_cavity=p.walls["is_cavity_wall"],
# current_wall_uvalue=current_wall_u_value,
# is_partial="partial" in p.walls["clean_description"].lower(),
# existing_li_thickness=li_thickness,
# mainheating=p.main_heating,
# main_fuel=p.main_fuel,
# mainheat_energy_eff=p.data["mainheat-energy-eff"],
# )
if r["already_installed"]:
# if already installed, we zero out the uplift and funding
(r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"],
@ -411,7 +427,7 @@ for p in tqdm(input_properties):
)
# When the goal is Increasing EPC, we can run the funding optimiser
if body.goal == "Increasing EPC":
if body.goal == "Switch off":
solutions = optimise_with_funding_paths(
p=p,
@ -481,37 +497,43 @@ for p in tqdm(input_properties):
ROOF_INSULATION_MEASURES
)
funding.check_funding(
measures=solution,
starting_sap=int(p.data["current-energy-efficiency"]),
ending_sap=int(p.data["current-energy-efficiency"]) + sum([x["gain"] for x in solution]),
floor_area=p.floor_area,
mainheat_description=p.main_heating["clean_description"],
heating_control_description=p.main_heating_controls["clean_description"],
is_cavity=p.walls["is_cavity_wall"],
current_wall_uvalue=current_wall_u_value,
is_partial="partial" in p.walls["clean_description"].lower(),
existing_li_thickness=li_thickness,
mainheating=p.main_heating,
main_fuel=p.main_fuel,
mainheat_energy_eff=p.data["mainheat-energy-eff"],
has_wall_insulation_recommendation=has_wall_insulation_recommendation,
has_roof_insulation_recommendation=has_roof_insulation_recommendation,
)
# funding.check_funding(
# measures=solution,
# starting_sap=int(p.data["current-energy-efficiency"]),
# ending_sap=int(p.data["current-energy-efficiency"]) + sum([x["gain"] for x in solution]),
# floor_area=p.floor_area,
# mainheat_description=p.main_heating["clean_description"],
# heating_control_description=p.main_heating_controls["clean_description"],
# is_cavity=p.walls["is_cavity_wall"],
# current_wall_uvalue=current_wall_u_value,
# is_partial="partial" in p.walls["clean_description"].lower(),
# existing_li_thickness=li_thickness,
# mainheating=p.main_heating,
# main_fuel=p.main_fuel,
# mainheat_energy_eff=p.data["mainheat-energy-eff"],
# has_wall_insulation_recommendation=has_wall_insulation_recommendation,
# has_roof_insulation_recommendation=has_roof_insulation_recommendation,
# )
# Determine the scheme
scheme = "none"
if funding.eco4_eligible:
scheme = "eco4"
if scheme == "none" and funding.gbis_eligible:
scheme = "gbis"
# if funding.eco4_eligible:
# scheme = "eco4"
# if scheme == "none" and funding.gbis_eligible:
# scheme = "gbis"
funded_measures = solution if scheme in ["gbis", "eco4"] else []
project_funding = 0 if funding.full_project_abs is not None else funding.full_project_abs
total_uplift = funding.eco4_uplift
full_project_score = 0 if funding.full_project_abs is not None else funding.full_project_abs
partial_project_score = funding.partial_project_abs
uplift_project_score = funding.eco4_uplift if scheme == "eco4" else funding.gbis_uplift
funded_measures = []
# funded_measures = solution if scheme in ["gbis", "eco4"] else []
# project_funding = 0 if funding.full_project_abs is not None else funding.full_project_abs
project_funding = 0
# total_uplift = funding.eco4_uplift
total_uplift = 0
# full_project_score = 0 if funding.full_project_abs is not None else funding.full_project_abs
full_project_score = 0
# partial_project_score = funding.partial_project_abs
partial_project_score = 0
# uplift_project_score = funding.eco4_uplift if scheme == "eco4" else funding.gbis_uplift
uplift_project_score = 0
selected = {r["id"] for r in solution}

View file

@ -0,0 +1,47 @@
# After going back to Lincs rural, they gave us some additional data that we can use to try to fetch missed UPRNs again
import pandas as pd
# missed = pd.read_excel(
# "/Users/khalimconn-kowlessar/Downloads/lincs_rural_missed_nov_2025.xlsx",
# sheet_name="Missed Properties"
# )
# missed = missed[~pd.isnull(missed["rrn"])]
prepared = pd.read_excel(
"/Users/khalimconn-kowlessar/Downloads/lincs_rural_standardised_ara_nov_2025.xlsx",
sheet_name="Standardised Asset List"
)
updated_data = pd.read_excel(
"/Users/khalimconn-kowlessar/Downloads/MASTER LIST EPCS UPDATED November 2025 Domna Homes - Copy.xlsx",
sheet_name="PROPERTY EPC RATINGS"
)
updated_data = updated_data[~pd.isnull(updated_data["Property Ref."])]
missed = updated_data[~updated_data["Property Ref."].isin(prepared["landlord_property_id"].values.tolist())].copy()
# missed.to_csv("/Users/khalimconn-kowlessar/Downloads/lincs_rural_missed_uprn.csv")
# We'll grab the UPRNs manually and then pull them in, and prepare for ARA
missing_uprns = pd.read_csv("/Users/khalimconn-kowlessar/Downloads/lincs_rural_missed_uprn.csv")
missing_uprns["landlord_property_id"] = missing_uprns["Property Ref."].copy()
missing_uprns["domna_property_id"] = missing_uprns["Property Ref."].copy()
missing_uprns["domna_address_1"] = missing_uprns['Unnamed: 1'].str.split(",").str[0].str.strip()
missing_uprns["postcode"] = missing_uprns['Unnamed: 1'].str.split(",").str[-1].str.strip()
missing_uprns["landlord_property_type"] = "unknown"
missing_uprns["landlord_built_form"] = "unknown"
missing_uprns["domna_full_address"] = missing_uprns['Unnamed: 1'].copy()
missed_standardised_for_ara = missing_uprns[
['landlord_property_id', 'domna_address_1', 'landlord_property_type', 'landlord_built_form', 'postcode',
'domna_property_id', 'UPRN']
].rename(
columns={"UPRN": "epc_os_uprn"}
)
# Store
missed_standardised_for_ara.to_excel(
"/Users/khalimconn-kowlessar/Downloads/lincs_rural_missed_standardised_ara_nov_2025.xlsx",
index=False,
sheet_name="Standardised Asset List"
)

View file

@ -0,0 +1,147 @@
"""
We have found, within the Peabody data, a large volume of properties with missing and incorrects
UPRNS and incorrect address data. We want to flag these records and also find missings where we can
We also have duplicate UPRNS that should be flagged
"""
import json
import time
import os
import pandas as pd
import numpy as np
from tqdm import tqdm
from dotenv import load_dotenv
from asset_list.utils import get_data_for_property
from utils.logger import setup_logger
from utils.s3 import read_io_from_s3, save_dataframe_to_s3_parquet, read_dataframe_from_s3_parquet
logger = setup_logger()
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
sustainability_data = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting Project/2025_11_11 - Peabody "
"- Data Extracts for Domna.xlsx",
sheet_name="Sustainability"
)
property_data = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting Project/2025_11_11 - Peabody "
"- Data Extracts for Domna.xlsx",
sheet_name="Properties"
)
missing_uprns = sustainability_data[pd.isnull(sustainability_data['UPRN'])].copy()
# Any non-numeric UPRNS or leading with 0s are invalid
non_numeric_uprns = sustainability_data[
~sustainability_data['UPRN'].astype(str).str.match(r'^[1-9][0-9]*$') & ~pd.isnull(sustainability_data['UPRN'])
].copy()
# 70 properties
leading_zero_uprns = sustainability_data[
sustainability_data['UPRN'].astype(str).str.startswith('0')
].copy()
# Flag duplicates
duplicate_uprns = sustainability_data[
sustainability_data.duplicated(subset=['UPRN'], keep=False) & ~pd.isnull(sustainability_data['UPRN'])
].copy()
# Store this data
# missing_uprns.to_csv("/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting
# Project/data_validation/missing_uprns.csv", index=False)
# non_numeric_uprns.to_csv("/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting
# Project/data_validation/non_numeric_uprns.csv", index=False)
# leading_zero_uprns.to_csv("/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting
# Project/data_validation/leading_zero_uprns.csv", index=False)
# duplicate_uprns.to_csv("/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting
# Project/data_validation/duplicate_uprns.csv", index=False)
# Take everything remaining
data_needing_validation = sustainability_data[
~sustainability_data["Org Ref"].isin(
missing_uprns["Org Ref"].values.tolist() + non_numeric_uprns["Org Ref"].values.tolist() +
leading_zero_uprns["Org Ref"].values.tolist() + duplicate_uprns["Org Ref"].values.tolist()
)
].copy()
# TODO: We should build a SAL for UPRNS that are missing, invalid or duplicated
# We check UPRN validity against our OS data
uprn_filenames = read_dataframe_from_s3_parquet(
bucket_name="retrofit-data-dev", file_key="spatial/filename_meta.parquet"
)
# We're going to:
# 1) Grab a filename
# 2) Read it in
# 3) Check which UPRNS from our data are in that file
# 4) Keep a record of which UPRNS were found where
for uprn_file in tqdm(uprn_filenames['filenames'].values, total=len(uprn_filenames)):
spatial_data = read_dataframe_from_s3_parquet(
bucket_name="retrofit-data-dev", file_key=f"spatial/{uprn_file}"
)
uprns_in_file = data_needing_validation[
data_needing_validation['UPRN'].astype('Int64').isin(spatial_data['UPRN'].astype('Int64').values)
].copy()
print("Found {} UPRNS in file {}".format(len(uprns_in_file), uprn_file))
if len(uprns_in_file) > 0:
# Store the found UPRNS in the validation cache
data_to_store = uprns_in_file[["Org Ref", "UPRN"]].copy()
data_to_store["Source File"] = uprn_file
# Store
data_to_store.to_csv(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting "
f"Project/data_validation/validation_cache/{uprn_file.split('.parquet')[0]}_found_uprns.csv",
index=False
)
# Get all of the files:
storage_locations = ("/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting "
"Project/data_validation/validation_cache")
# List contents
folder_contents = os.listdir(storage_locations)
# Grab files and concatenate
all_found_uprns = []
for file in folder_contents:
if file.endswith("_found_uprns.csv"):
df = pd.read_csv(os.path.join(storage_locations, file))
all_found_uprns.append(df)
all_found_uprns = pd.concat(all_found_uprns)
# We now flag any UPRNS that were not found in any of the OS datasets
os_missed_uprns = data_needing_validation[
~data_needing_validation['Org Ref'].isin(all_found_uprns['Org Ref'].values.tolist())
].copy()
# store
os_missed_uprns.to_csv(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting "
"Project/data_validation/os_missed_uprns.csv",
index=False
)
# Now build a larger table for standardisation
to_standardised = pd.concat(
[missing_uprns, non_numeric_uprns, leading_zero_uprns, duplicate_uprns, os_missed_uprns]
)
to_standardised.to_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting "
"Project/data_validation/to_standardise_uprns.xlsx",
index=False)
# We prepare a finalised dataset to work with, that excludes all problematic properties and leaves us with
# properties for which we have the data we need
finalised_data = sustainability_data[
~sustainability_data["Org Ref"].isin(
to_standardised["Org Ref"].values.tolist()
)
].copy()
# Prepare with the column formats we need, as analogous to a_data_prep where we defined an initial working sample

View file

@ -0,0 +1,114 @@
import pandas as pd
# import pandas as pd
#
# sal = pd.read_excel(
# "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting "
# "Project/data_validation/to_standardise_uprns - Standardised.xlsx",
# sheet_name="Standardised Asset List"
# )
#
# # Quick breadown of missingness
# missing = sal[
# pd.isnull(sal["estimated"]) | (sal["estimated"] == True) | pd.isnull(sal["epc_os_uprn"])
# ]
#
# fetched = sal[(sal["estimated"] == False) | ~pd.isnull(sal["epc_os_uprn"])].copy()
# fetched = fetched[
# ["landlord_property_id", "domna_address_1", "domna_postcode", "domna_full_address", "epc_address1",
# "epc_postcode", "epc_address", "landlord_property_type", "epc_property_type"]
# ]
#
# known_issues = [
#
# ]
#
# # Missed postcodes
# missed_postcode_agg = missing.groupby("domna_postcode").size().reset_index(name="count")
# missed_postcode_agg = missed_postcode_agg.sort_values("count", ascending=False)
#
# multi_missed_postcode = missed_postcode_agg[missed_postcode_agg["count"] > 1]
### Prepare
sustainability_data = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting Project/2025_11_11 - Peabody "
"- Data Extracts for Domna.xlsx",
sheet_name="Sustainability"
)
# Data we want to remove:
missing_uprns = sustainability_data[pd.isnull(sustainability_data['UPRN'])].copy()
# Any non-numeric UPRNS or leading with 0s are invalid
non_numeric_uprns = sustainability_data[
~sustainability_data['UPRN'].astype(str).str.match(r'^[1-9][0-9]*$') & ~pd.isnull(sustainability_data['UPRN'])
].copy()
# 70 properties
leading_zero_uprns = sustainability_data[
sustainability_data['UPRN'].astype(str).str.startswith('0')
].copy()
# Flag duplicates
duplicate_uprns = sustainability_data[
sustainability_data.duplicated(subset=['UPRN'], keep=False) & ~pd.isnull(sustainability_data['UPRN'])
].copy()
# Read in the UPRNs that were not valid based on the OS data
os_missed_uprns = pd.read_csv(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting "
"Project/data_validation/os_missed_uprns.csv",
)
modelling_data = sustainability_data[
~sustainability_data["Org Ref"].isin(
missing_uprns["Org Ref"].unique().tolist() + non_numeric_uprns["Org Ref"].unique().tolist() +
leading_zero_uprns["Org Ref"].unique().tolist() + duplicate_uprns["Org Ref"].unique().tolist() +
os_missed_uprns["Org Ref"].unique().tolist()
)
].copy()
# Need to prepare for upload
# Variables:
modelling_data["landlord_property_id"] = sustainability_data["Org Ref"].copy()
modelling_data["domna_property_id"] = sustainability_data["Org Ref"].copy()
modelling_data = modelling_data.rename(
{
"Address 1": "domna_address_1",
"Postcode": "postcode",
"Type": "landlord_property_type",
"Attachment": "landlord_built_form",
"Heating": "landlord_heating_system",
"UPRN": "epc_os_uprn"
}
)
modelling_data = modelling_data[
[
"domna_address_1", "Address 2", "Address 3", "postcode", "landlord_property_type",
"landlord_built_form", "landlord_heating_system", "epc_os_uprn", "Total Floor Area (m2)",
"domna_property_id", "domna_full_address"
]
]
modelling_data["landlord_built_form"] = modelling_data["landlord_built_form"].map(
{
"MidTerrace": "Mid-Terrace",
"EndTerrace": "End-Terrace",
"SemiDetached": "Semi-Detached",
"Detached": "Detached",
"EnclosedEndTerrace": "Enclosed End-Terrace",
"EnclosedMidTerrace": "Enclosed Mid-Terrace",
}
)
def make_full_address(x):
to_join = [x['domna_address_1'], x['Address 2'], x['Address 3']]
to_join = [x for x in to_join if not pd.isnull(x) and x != '']
return ", ".join(to_join)
modelling_data["domna_full_address"] = modelling_data.apply(lambda x: make_full_address(x), axis=1)

View file

@ -1,6 +0,0 @@
"""
We have found, within the Peabody data, a large volume of properties with missing and incorrects
UPRNS and incorrect address data. We want to flag these records and also find missings where we can
We also have duplicate UPRNS that should be flagged
"""

View file

@ -844,7 +844,7 @@ class TrainingDataset(BaseDataset):
# Make sure they are all efficiency columns
if any(~missings.index.str.contains("energy_eff")):
raise ValueError("Non efficiency columns are missing")
raise ValueError(f"Non efficiency columns are missing {missings.index}")
for m in missings.index:
self.df[m] = self.df[m].fillna("NO_RATING")

View file

@ -15,25 +15,10 @@ os.makedirs(CACHE_DIR, exist_ok=True)
def random_delay():
"""Pause randomly between requests (0.52 s)."""
time.sleep(random.uniform(0.5, 2))
def extract_feature(soup, icon_id):
tag = soup.find("use", href=f"#{icon_id}")
if tag:
parent = tag.find_parent("div", class_="_1pbf8i53")
if parent:
text = parent.get_text(strip=True)
return text
return None
def extract_embedded_json(text):
"""
Extract embedded property JSON containing attributes, energy, estimates, and sales history.
"""
# Try to grab everything after "attributes"
match = re.search(
r'"attributes"\s*:\s*\{.*?\}\s*,.*?"historicSales".*?\]',
text,
@ -48,13 +33,16 @@ def extract_embedded_json(text):
except json.JSONDecodeError:
pass
# fallback for independent keys
result = {}
for key in [
"attributes", "energy", "rentEstimate",
"saleEstimate", "saleHistory", "historicSales"
]:
key_match = re.search(rf'"{key}"\s*:\s*(\{{.*?\}}|\[.*?\])', text, re.DOTALL)
key_match = re.search(
rf'"{key}"\s*:\s*(\{{.*?\}}|\[.*?\])',
text,
re.DOTALL
)
if key_match:
try:
result[key] = json.loads(key_match.group(1))
@ -64,28 +52,23 @@ def extract_embedded_json(text):
def scrape_all_estimates(session, url):
"""Scrape valuation estimates for one Zoopla property URL."""
resp = session.get(url, impersonate=random.choice(ENGINES))
html = resp.text
page_source = BeautifulSoup(resp.text, "html.parser")
estimates = page_source.find_all("div", {"data-testid": "sale-estimate"})
soup = BeautifulSoup(html, "html.parser")
estimates = soup.find_all("div", {"data-testid": "sale-estimate"})
data = extract_embedded_json(html)
is_blocked = len(estimates) == 0
return {
"estimates": estimates,
"is_blocked": is_blocked,
"is_blocked": len(estimates) == 0,
"response_html": html,
"attributes": data.get("attributes"),
"rent": data.get("rentEstimate"),
"historicSales": data.get("historicSales"),
"attributes": data.get("attributes", {}),
"rentEstimate": data.get("rentEstimate", {}),
"historicSales": data.get("historicSales", []),
}
def extract_estimates(estimates):
"""Extract low, mid, and high estimates from parsed HTML."""
est = estimates[0]
low = est.find("span", {"data-testid": "low-estimate-blurred"}).text
mid = est.find("p", {"data-testid": "estimate-blurred"}).text
@ -94,110 +77,123 @@ def extract_estimates(estimates):
def cache_path_for_url(url):
"""Return a deterministic local cache path for a URL."""
uprn = url.split("/")[-2]
return os.path.join(CACHE_DIR, f"{uprn}.html")
def parse_cached_html(url, html):
soup = BeautifulSoup(html, "html.parser")
estimates = soup.find_all("div", {"data-testid": "sale-estimate"})
data = extract_embedded_json(html)
history = data.get("historicSales") or [{}]
if not estimates:
return None
low, mid, high = extract_estimates(estimates)
return {
"URL": url,
"Low Estimate": low,
"Middle Estimate": mid,
"High Estimate": high,
**data.get("attributes", {}),
**data.get("rentEstimate", {}),
**history[0],
}
def parallel_task(url):
"""Main worker function executed in each process."""
cache_path = cache_path_for_url(url)
# Use cached file if it exists
if os.path.exists(cache_path):
html = open(cache_path, "r").read()
page_source = BeautifulSoup(html, "html.parser")
estimates = page_source.find_all("div", {"data-testid": "sale-estimate"})
data = extract_embedded_json(html)
history_sales = data.get("historicSales", [{}])
if len(history_sales) == 0:
history_sales = [{}]
with open(cache_path, "r", encoding="utf-8") as f:
html = f.read()
cached = parse_cached_html(url, html)
if cached:
return cached
if estimates:
low, mid, high = extract_estimates(estimates)
return {
"URL": url, "Low Estimate": low, "Middle Estimate": mid, "High Estimate": high,
**data.get("attributes", {}), **data.get("rentEstimate", {}),
**history_sales[0]
}
# Otherwise scrape live
with StealthSession() as session:
attempts = 0
while attempts < 5:
for attempt in range(5):
output = scrape_all_estimates(session, url)
if not output["is_blocked"] and output["estimates"]:
open(cache_path, "w").write(output["html"])
html = output.get("response_html")
if html:
with open(cache_path, "w", encoding="utf-8") as f:
f.write(html)
history = output.get("historicSales") or [{}]
low, mid, high = extract_estimates(output["estimates"])
history_sales = output.get("historicSales", [{}])
if len(history_sales) == 0:
history_sales = [{}]
return {
"URL": url, "Low Estimate": low, "Middle Estimate": mid, "High Estimate": high,
"URL": url,
"Low Estimate": low,
"Middle Estimate": mid,
"High Estimate": high,
**output.get("attributes", {}),
**output.get("rent", {}),
**history_sales[0]
**output.get("rentEstimate", {}),
**history[0],
}
attempts += 1
print(f"[Attempt {attempts}] Blocked or empty for {url}")
random_delay()
# If still blocked, return placeholders
return {"URL": url, "Low Estimate": None, "Middle Estimate": None, "High Estimate": None}
return {
"URL": url,
"Low Estimate": None,
"Middle Estimate": None,
"High Estimate": None,
}
def parse_price(p):
if p is None:
if not p:
return None
p = p.replace("£", "").strip().lower()
if not p:
return None
if p.endswith("k"):
return float(p[:-1]) * 1_000
elif p.endswith("m"):
if p.endswith("m"):
return float(p[:-1]) * 1_000_000
else:
try:
return float(p.replace(",", ""))
except ValueError:
return None
try:
return float(p.replace(",", ""))
except ValueError:
return None
if __name__ == "__main__":
# Load portfolio
asset_list = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/sfr/October 2025 AL portfolio/22.10 AL Portfolio - "
"Standardised - partial UPRN fill.xlsx",
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting "
"Project/modelling_sample.xlsx",
sheet_name="Standardised Asset List"
)
asset_list = asset_list[~pd.isnull(asset_list["epc_os_uprn"])]
asset_list = asset_list.drop_duplicates("epc_os_uprn")
asset_list["epc_os_uprn"] = asset_list["epc_os_uprn"].astype(int).astype(str)
uprns = asset_list["epc_os_uprn"].tolist()
urls = [f"https://www.zoopla.co.uk/property/uprn/{uprn}/" for uprn in uprns]
# Limit concurrency to avoid blocks
with Pool(processes=2) as pool: # fewer processes = fewer fingerprints
with Pool(processes=2) as pool:
estimates_list = list(
tqdm(pool.imap(parallel_task, urls), total=len(urls))
)
df = pd.DataFrame(estimates_list)
print(df.head())
df["uprn"] = df["URL"].str.extract(r"uprn/(\d+)/")
df["valuation"] = df["Middle Estimate"].apply(parse_price)
df.to_csv("zoopla_estimates.csv", index=False)
# Merge with asset list
merged = asset_list.merge(
df[["uprn", "valuation"]],
left_on="epc_os_uprn",
right_on="uprn",
how="left"
)
merged.to_excel(
"20251029 AL Portfolio - Standardised - with valuations.xlsx",
index=False

View file

@ -11,8 +11,8 @@ from backend.app.db.models.portfolio import PropertyModel, PropertyDetailsEpcMod
# PORTFOLIO_ID = 206
# SCENARIOS = [389]
PORTFOLIO_ID = 388
SCENARIOS = [803]
PORTFOLIO_ID = 404
SCENARIOS = [829]
def get_data(portfolio_id, scenario_ids):
@ -121,7 +121,8 @@ recommendations_measures_pivot["total_retrofit_cost"] = recommendations_measures
df = properties_df[
[
"property_id", "uprn", "address", "postcode", "property_type", "walls", "roof", "heating", "windows",
"landlord_property_id", "property_id", "uprn", "address", "postcode", "property_type", "walls", "roof",
"heating", "windows",
"current_epc_rating",
"current_sap_points", "total_floor_area", "number_of_rooms",
]
@ -143,7 +144,7 @@ from utils.s3 import read_csv_from_s3, read_excel_from_s3
# asset_list = read_csv_from_s3(bucket_name="retrofit-plan-inputs-dev", filepath='8/206/asset_list.csv')
asset_list = read_excel_from_s3(
bucket_name="retrofit-plan-inputs-dev", file_key='2/388/20251208T203603925Z/asset_list.xlsx',
bucket_name="retrofit-plan-inputs-dev", file_key="2/404/20251211T163200754Z/asset_list.xlsx",
header_row=0, sheet_name="Standardised Asset List"
)
asset_list = pd.DataFrame(asset_list)
@ -201,11 +202,15 @@ asset_list = asset_list.merge(
)
# For exporting
asset_list.to_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting "
"Project/20251209_sample_package_data.xlsx",
df.to_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Lincs Rural/EPC C -without floors proposed measures - "
"with ID.xlsx",
index=False
)
# asset_list.to_excel(
# "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Lincs Rural/epc_measures.xlsx",
# index=False
# )
condition_costs = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/sfr/Spring JV/Condition costs.xlsx",