preparing programme for mhs

This commit is contained in:
Khalim Conn-Kowlessar 2025-05-12 15:58:41 +01:00
parent 5848cb5314
commit f1b9ee2920
11 changed files with 1063 additions and 184 deletions

View file

@ -693,6 +693,9 @@ class AssetList:
c for c in self.OLD_FORMAT_NON_INTRUSIVE_COLNAMES if c in self.standardised_asset_list.columns
]
if "Warmfront Finding" in self.standardised_asset_list.columns:
non_intrusive_columns.append("Warmfront Finding")
self.keep_variables += non_intrusive_columns
self.rename_map = {
@ -931,7 +934,10 @@ class AssetList:
raise ValueError(f"Dataframe must contain the column {self.DOMNA_PROPERTY_ID}")
if df[self.DOMNA_PROPERTY_ID].duplicated().sum():
raise ValueError(f"{self.DOMNA_PROPERTY_ID} contains duplicated IDs")
df = df.drop_duplicates(
subset=[self.DOMNA_PROPERTY_ID],
keep="first"
)
self.standardised_asset_list = self.standardised_asset_list.merge(
df, how="left", on=self.DOMNA_PROPERTY_ID
@ -1260,7 +1266,7 @@ class AssetList:
)
self.standardised_asset_list["non_intrusive_indicates_empty_cavity_has_solar"] = (
pd.isnull(self.standardised_asset_list["non_intrusive_indicates_empty_cavity"]) &
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
(~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"])) &
non_intrusives_wall_filter &
year_built_filter &
@ -1272,23 +1278,35 @@ class AssetList:
# We also add a filter on anything that was generally identified by the non-intrusives
self.standardised_asset_list["non_intrusive_indicates_empty_cavity_no_year_filter"] = (
pd.isnull(self.standardised_asset_list["non_intrusive_indicates_empty_cavity"]) &
pd.isnull(self.standardised_asset_list["non_intrusive_indicates_empty_cavity_has_solar"]) &
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity"] &
~self.standardised_asset_list["non_intrusive_indicates_empty_cavity_has_solar"] &
(~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"])) &
non_intrusives_wall_filter
)
self.standardised_asset_list["epc_indicates_empty_cavity"] = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["walls-description"]].str.lower().isin(
self.EPC_NO_WALL_INSULATION_DESCRIPTIONS
) & (
self.standardised_asset_list["epc_year_upper_bound"] <= self.EMPTY_CAVITY_YEAR_THRESHOLD
) & (
~self.standardised_asset_list[self.ATTRIBUTE_EPC_PRE_YEAR_THRESHOLD]
) & (
~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"])
if (not self.non_intrusives_eligibility) and (not self.old_format_non_intrusives_present):
# If we have NO inspections data, we capture all of the wall types and don't filter on age of the EPC
self.standardised_asset_list["epc_indicates_empty_cavity"] = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["walls-description"]].str.lower().isin(
self.EPC_NO_WALL_INSULATION_DESCRIPTIONS
) & (
self.standardised_asset_list["epc_year_upper_bound"] <= self.EMPTY_CAVITY_YEAR_THRESHOLD
) & (
~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"])
)
)
else:
self.standardised_asset_list["epc_indicates_empty_cavity"] = (
self.standardised_asset_list[self.EPC_API_DATA_NAMES["walls-description"]].str.lower().isin(
self.EPC_NO_WALL_INSULATION_DESCRIPTIONS
) & (
self.standardised_asset_list["epc_year_upper_bound"] <= self.EMPTY_CAVITY_YEAR_THRESHOLD
) & (
~self.standardised_asset_list[self.ATTRIBUTE_EPC_PRE_YEAR_THRESHOLD]
) & (
~self.standardised_asset_list[self.STANDARD_PROPERTY_TYPE].isin(["bedsit"])
)
)
)
self.standardised_asset_list["landlord_data_indicates_empty_cavity"] = (
self.standardised_asset_list[self.STANDARD_WALL_CONSTRUCTION].isin(["uninsulated cavity"]) &
@ -1336,6 +1354,9 @@ class AssetList:
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] = (
extraction_wall_filter & year_built_filter
)
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction_no_year_filter"] = (
extraction_wall_filter & ~year_built_filter
)
elif self.old_format_non_intrusives_present:
print("Review these categories!!!!")
@ -1349,10 +1370,11 @@ class AssetList:
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] = (
extraction_wall_filter
)
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction_no_year_filter"] = False
else:
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction"] = False
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction_no_sap_filter"] = False
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction_no_year_filter"] = False
######################################################
# Solar
@ -1480,7 +1502,7 @@ class AssetList:
)
# We merge on the u-value for average thermal transmittance
roof_roof_data = pd.DataFrame(cleaned["roof-description"])[
roof_data = pd.DataFrame(cleaned["roof-description"])[
["original_description", "thermal_transmittance", "is_pitched", "is_loft"]
].rename(
columns={
@ -1490,7 +1512,7 @@ class AssetList:
)
self.standardised_asset_list = self.standardised_asset_list.merge(
roof_roof_data, how="left", on=self.EPC_API_DATA_NAMES["roof-description"]
roof_data, how="left", on=self.EPC_API_DATA_NAMES["roof-description"]
)
# If the u-value of a roof is less than 0.7 we consider it insulated
@ -1749,6 +1771,16 @@ class AssetList:
self.standardised_asset_list["cavity_reason"]
)
self.standardised_asset_list["cavity_reason"] = np.where(
(
self.standardised_asset_list["non_intrusive_indicates_cavity_extraction_no_year_filter"] &
pd.isnull(self.standardised_asset_list["cavity_reason"])
),
f"Non-Intrusive Data Shows Cavity Extraction, built after {self.EMPTY_CAVITY_YEAR_THRESHOLD}: " +
self.standardised_asset_list["SAP Category"],
self.standardised_asset_list["cavity_reason"]
)
######################################################
# Flag solar
######################################################
@ -1771,6 +1803,16 @@ class AssetList:
self.standardised_asset_list["solar_reason"]
)
# Finally, anything flagged for solar should not be flagged for cavity - make them None
self.standardised_asset_list["cavity_reason"] = np.where(
(
~pd.isnull(self.standardised_asset_list["solar_reason"]) &
~pd.isnull(self.standardised_asset_list["cavity_reason"])
),
None,
self.standardised_asset_list["cavity_reason"]
)
# Flag anything that has existing outcomes
if (self.outcomes is not None) and ("surveyed" in self.standardised_asset_list.columns):
@ -2170,7 +2212,7 @@ class AssetList:
self.hubspot_data = programme_data
def flag_ecosurv(self, ecosurv_landlords=None):
def flag_ecosurv(self, ecosurv_landlords=None, landlords_to_ignore=None):
"""
This method will match ecosurv data to the asset list
@ -2193,6 +2235,11 @@ class AssetList:
self.ecosurv["Landlord"].isin(landlord_references["Landlord"].values)
]
if landlords_to_ignore is not None:
landlord_ecosurv_data = landlord_ecosurv_data[
~landlord_ecosurv_data["Landlord"].isin(landlords_to_ignore)
]
# Try and match to asset list
matched = []
unmatched = []
@ -2254,6 +2301,11 @@ class AssetList:
# We now match
matched = pd.DataFrame(matched)
# We'll possibly have duplicates here, where properties have been sold twice. We de-dupe
if matched[self.STANDARD_LANDLORD_PROPERTY_ID].duplicated().sum():
# It doesn't matter too much which record we take
matched = matched.drop_duplicates(subset=[self.STANDARD_LANDLORD_PROPERTY_ID])
self.standardised_asset_list = self.standardised_asset_list.merge(
matched,
how="left",
@ -2407,7 +2459,7 @@ class AssetList:
self.outcomes.append(outcomes)
lookup = pd.concat(lookup)
outcomes_no_match = pd.concat(outcomes_no_match)
self.outcomes_no_match = pd.concat(outcomes_no_match)
self.outcomes = pd.concat(self.outcomes)
if lookup.empty:
@ -2425,6 +2477,8 @@ class AssetList:
date_col = "Survey Date"
elif "Date letters sent" in self.outcomes.columns:
date_col = "Date letters sent"
elif "Date Letter sent" in self.outcomes.columns:
date_col = "Date Letter sent"
else:
raise NotImplementedError("Invalid date in outcomes - implement me")
@ -2564,8 +2618,18 @@ class AssetList:
axis=1
)
scheme_col = (
"AFFORDABLE WARMTH OR EPC FOR HOUSING ASSOCIATION" if
"AFFORDABLE WARMTH OR EPC FOR HOUSING ASSOCIATION" in master_data.columns else "AFFORDABLE WARMTH"
)
postcode_col = "POSTCODE" if "POSTCODE" in master_data.columns else "Post Code"
house_no_col = 'NO.' if 'NO.' in master_data.columns else "NO"
property_type_col = (
"PROPERTY TYPE As per table emailed" if
"PROPERTY TYPE As per table emailed" in
master_data.columns else "PROPERTY TYPE As per table emailed"
)
measure_mix_col = "MEASURE COMBO"
# Otherwise, we need to match algorithmically
has_property_id = "UPRN" in master_data.columns
@ -2574,6 +2638,10 @@ class AssetList:
unmatched = []
for _, row in tqdm(master_data.iterrows(), total=len(master_data)):
original_house_no = row[house_no_col]
original_street = row["Street / Block Name"]
original_postcode = row[postcode_col]
if pd.isnull(row[postcode_col]):
continue
@ -2595,9 +2663,40 @@ class AssetList:
]
house_no = row[house_no_col]
if isinstance(house_no, float):
if isinstance(house_no, (float, int)):
house_no = str(int(house_no))
if house_no not in df["house_no"].values:
# Handle postcode errors
postal_region = row[postcode_col].split(" ")[0].lower()
df = self.standardised_asset_list[
(
self.standardised_asset_list[self.STANDARD_POSTCODE]
.str.strip().str.lower().str.startswith(postal_region)
)
]
if house_no not in df["house_no"].values:
unmatched.append(row["row_id"])
continue
df = df[df["house_no"] == house_no]
if df.shape[0] > 1:
df = df[
df[self.STANDARD_FULL_ADDRESS].str.lower().str.contains(row["Street / Block Name"].lower())
]
if df.shape[0] == 0:
unmatched.append(row["row_id"])
continue
matched.append(
{
"row_id": row["row_id"],
"original_house_no": original_house_no,
"original_street": original_street,
"original_postcode": original_postcode,
self.STANDARD_LANDLORD_PROPERTY_ID: df[self.STANDARD_LANDLORD_PROPERTY_ID].values[0],
}
)
if house_no in df["house_no"].values:
df = df[df["house_no"] == house_no]
if df.shape[0] != 1:
@ -2632,14 +2731,12 @@ class AssetList:
]
if any(
df[self.STANDARD_PROPERTY_TYPE].str.contains(
row["PROPERTY TYPE As per table emailed"].split(" ")[-1].lower()
)
df[self.STANDARD_PROPERTY_TYPE].str.contains(row[property_type_col].split(" ")[-1].lower())
):
# We ignore "block of flats" entries
df = df[
df[self.STANDARD_PROPERTY_TYPE].str.contains(
row["PROPERTY TYPE As per table emailed"].split(" ")[-1].lower()
row[property_type_col].split(" ")[-1].lower()
) & (df[self.STANDARD_PROPERTY_TYPE] != "block of flats")
]
@ -2649,6 +2746,9 @@ class AssetList:
matched.append(
{
"row_id": row["row_id"],
"original_house_no": original_house_no,
"original_street": original_street,
"original_postcode": original_postcode,
self.STANDARD_LANDLORD_PROPERTY_ID: df[self.STANDARD_LANDLORD_PROPERTY_ID].values[0],
}
)
@ -2657,10 +2757,12 @@ class AssetList:
# We match the "UPRN" which is the landlords ID, onto the master sheet
matched = pd.DataFrame(matched)
master_to_append = master_data[["row_id", install_col, submission_col]].merge(
master_to_append = master_data[[scheme_col, "row_id", install_col, submission_col, measure_mix_col]].merge(
matched, how="left", on="row_id"
).rename(
columns={
scheme_col: "funding_scheme",
measure_mix_col: "measure_mix",
install_col: "survey_status",
submission_col: "submission_date"
}
@ -2671,10 +2773,6 @@ class AssetList:
master_data["row_id"].isin(unmatched)
]
scheme_col = (
"AFFORDABLE WARMTH OR EPC FOR HOUSING ASSOCIATION" if
"AFFORDABLE WARMTH OR EPC FOR HOUSING ASSOCIATION" in master_data.columns else "AFFORDABLE WARMTH"
)
# The columns are massively different - we take just a few
unmatched_df = unmatched_df[
[

View file

@ -62,10 +62,42 @@ def app():
Property UPRN
"""
# Community Housing new list
data_folder = ("/Users/khalimconn-kowlessar/Documents/hestia/Customers/Community Housing/Programme "
"Reconciliation")
data_filename = "SUB EPC C to Domna.xlsx"
sheet_name = "Sheet1"
postcode_column = 'POSTCODE'
fulladdress_column = None
address1_column = "ADDRESS"
address1_method = None
address_cols_to_concat = ["ADDRESS", "ESTATE", "TOWN"]
missing_postcodes_method = None
landlord_year_built = "BUILD DATE"
landlord_os_uprn = None
landlord_property_type = "PROPERTY TYPE"
landlord_built_form = "PROPERTY TYPE"
landlord_wall_construction = "CONSTRUCTION TYPE"
landlord_roof_construction = None
landlord_heating_system = None
landlord_existing_pv = None
landlord_property_id = "UPRN"
landlord_sap = None
outcomes_filename = []
outcomes_sheetname = []
outcomes_postcode = []
outcomes_houseno = []
outcomes_id = []
outcomes_address = []
master_filepaths = []
master_to_asset_list_filepath = None
phase = False
ecosurv_landlords = None
# Unitas
data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Unitas"
data_filename = "UNITAS - Asset List.xlsx"
sheet_name = "Asset List"
data_filename = "unitas_asset_list_for_analysis.xlsx"
sheet_name = "Sheet1"
postcode_column = 'Post Code'
fulladdress_column = "Address Line 1"
address1_column = "Address Line 1"
@ -611,7 +643,7 @@ def app():
epc_api_only = False
force_retrieve_data = False
skip = None # Used to skip already completed chunks
chunk_size = 1000
chunk_size = 5000
filename = "Chunk {i}.csv"
download_folder = os.path.join(data_folder, "Chunks")
if not os.path.exists(download_folder):

View file

@ -250,5 +250,20 @@ BUILT_FORM_MAPPINGS = {
'House Homeless Unit': 'unknown',
'Flat ELEVENTH FLOOR': 'mid-floor',
'Flat TENTH FLOOR': 'mid-floor',
'House. MT': 'mid-terrace'
'House. MT': 'mid-terrace',
'Ground Floor Bedsit': 'ground floor',
'Mid Terrace With Passage': 'mid-terrace',
'End Of Terrace': 'end-terrace',
'Ground Floor Maisonette': 'ground floor',
'First Floor Bedsit': 'mid-floor',
'GROUND FLOOR BEDSIT': 'ground floor',
'GROUND FLOOR FLAT': 'ground floor',
'BUNGALOW': 'unknown',
'HOUSE 1 LIVING ROOM': 'unknown',
'MAISONETTE OVER SHOP': 'unknown',
'SECOND FLOOR FLAT': 'mid-floor',
'FIRST FLOOR FLAT': 'ground floor',
'GROUND FL MAISONETTE': 'ground floor',
'HOUSE 2 LIVING ROOMS': 'unknown',
'FLAT OVER SHOP': 'unknown'
}

View file

@ -5,7 +5,7 @@ STANDARD_HEATING_SYSTEMS = {
"gas boiler, radiators",
"electric storage heaters",
"district heating",
"communal heating"
"communal heating",
"gas condensing boiler",
"oil boiler",
"gas condensing combi",
@ -32,7 +32,7 @@ STANDARD_HEATING_SYSTEMS = {
HEATING_MAPPINGS = {
"Combi - GAS": "gas combi boiler",
"E7 Storage Heaters": "high heat retention storage heaters",
"E7 Storage Heaters": "electric storage heaters",
"District heating system": "district heating",
"Condensing Boiler - GAS": "gas condensing boiler",
"Boiler Oil/other": "oil boiler",
@ -50,7 +50,7 @@ HEATING_MAPPINGS = {
"Gas fire": "other",
"Backboiler - Solid fuel": "other",
'combi - gas': 'gas combi boiler',
'e7 storage heaters': 'high heat retention storage heaters',
'e7 storage heaters': 'electric storage heaters',
'district heating system': 'district heating',
'condensing boiler - gas': 'gas condensing boiler',
'boiler oil/other': 'oil boiler',
@ -275,6 +275,18 @@ HEATING_MAPPINGS = {
'POTTERTON': 'gas combi boiler',
'BAXI SOLO': 'gas combi boiler',
'BAXI BERMUDA': 'gas combi boiler',
'BAXI': 'gas combi boiler'
'BAXI': 'gas combi boiler',
'Combi Boiler': 'gas combi boiler',
'Air Source Heat Pump': 'air source heat pump',
'Dual Fuel': 'other',
'Regular Boiler': 'gas condensing boiler',
'No Main Heating': 'no heating',
'None (via Communal System)': 'communal heating',
'No Mains Heating': 'no heating',
'Open Fire with Back Boiler': 'solid fuel',
'No Gas Boiler': 'no heating',
'Back Boiler': 'solid fuel',
"This cell has an external reference that can't be shown or edited. Editing this cell will remove the external "
"reference.": 'unknown'
}

View file

@ -218,6 +218,15 @@ PROPERTY_MAPPING = {
'Bungalow MT': 'bungalow',
'House MT': 'house',
'House. MT': 'house',
'': 'unknown'
'': 'unknown',
'GROUND FLOOR BEDSIT': 'bedsit',
'HOUSE 1 LIVING ROOM': 'house',
'MAISONETTE OVER SHOP': 'maisonette',
'GROUND FLOOR FLAT': 'flat',
'SECOND FLOOR FLAT': 'flat',
'FIRST FLOOR FLAT': 'flat',
'GROUND FL MAISONETTE': 'maisonette',
'HOUSE 2 LIVING ROOMS': 'house',
'FLAT OVER SHOP': 'flat'
}

View file

@ -212,12 +212,17 @@ WALL_CONSTRUCTION_MAPPINGS = {
'Cornish': 'system built',
'Rwate': 'system built',
'Hill Presweld Steel': 'system built',
'Cavity Filled Cavity': 'filled cavity',
'Cavity Unknown': 'cavity unknown insulation',
'Cavity Filled Cavity (internal)': 'filled cavity',
'': 'unknown',
'Cavity Internal Insulation': 'filled cavity',
'Cavity As Built': "uninsulated cavity"
'Cavity As Built': "uninsulated cavity",
'Non Trad Large Panel System': 'system built',
'Non Trad Cornish': 'system built',
'Non Trad Reema': 'system built',
'Traditional Cavity Brickwork': 'cavity unknown insulation',
'System build (undefined)': 'system built',
'Non Trad Wimpey': 'system built',
'Non Trad Wates': 'system built'
}

View file

@ -0,0 +1,708 @@
"""
This script is used to reconcile the data from the Community Housing project, to understand the differences in
the various asset lists, and the work that has been conducted
"""
import os
import pandas as pd
import numpy as np
from tqdm import tqdm
from asset_list.AssetList import AssetList
from backend.SearchEpc import SearchEpc
# Data preparation
# Both outcome sheets live in one workbook: build its path once and parse the workbook a single time
# (passing a list of sheet names makes read_excel return a dict of DataFrames keyed by sheet name).
reconciliation_folder = (
    "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Community Housing/Programme Reconciliation"
)
outcome_sheets = pd.read_excel(
    os.path.join(reconciliation_folder, "Outcomes Community Housing.xlsx"),
    sheet_name=["Sheet1", "ECO4 + PV"],
)
outcomes_1 = outcome_sheets["Sheet1"]
outcomes_2 = outcome_sheets["ECO4 + PV"]
# Every row of the "ECO4 + PV" sheet is tagged with the same funding type
outcomes_2["Type of Funding"] = "ECO4 Solar"
combined_outcomes = pd.concat([outcomes_1, outcomes_2], ignore_index=True)
# Positional rename: assumes the concatenated frame has exactly these 11 columns in this order (pandas raises a
# length-mismatch ValueError otherwise) - verify against the workbook layout
combined_outcomes.columns = [
    'Surveyor', 'Housing Association', 'No.', 'Address', 'Postcode', 'Outcome', 'Type of Funding', "Notes",
    'Previous letter sent Date:', 'Date Letter sent', 'Installer'
]
# Store
combined_outcomes.to_excel(
    os.path.join(reconciliation_folder, "combined_outcomes.xlsx"),
)
################################################################################################
# Config for asset list standardisation
################################################################################################
# Location of the asset list workbook and the sheet to read; passed to AssetList below
data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Community Housing/Programme Reconciliation"
data_filename = "Community Housing - Original Asset List Copy for Reconciliation.xlsx"
sheet_name = "Assets"
# Names of the landlord's columns, handed to the AssetList constructor. None means no column is supplied
postcode_column = 'Postcode'
fulladdress_column = "Full Address"
address1_column = None
address1_method = "house_number_extraction"
address_cols_to_concat = []
missing_postcodes_method = None
landlord_year_built = "Build_Date"
landlord_os_uprn = None
landlord_property_type = "Asset_Type1"
landlord_built_form = "Asset_Classification"
landlord_wall_construction = None
landlord_roof_construction = None
landlord_heating_system = "Heat Source Static"
landlord_existing_pv = None
landlord_property_id = "Asset_Reference"
landlord_sap = None
# Outcomes inputs for asset_list.flag_outcomes: parallel lists with one entry per outcomes file.
# combined_outcomes.xlsx is the file written by the data preparation step of this script
outcomes_filename = [
os.path.join(data_folder, "combined_outcomes.xlsx"),
]
outcomes_sheetname = ["Sheet1"]
outcomes_postcode = ["Postcode"]
outcomes_houseno = ["No."]
outcomes_id = [None]
outcomes_address = ["Address"]
# Submission master sheets for asset_list.flag_survey_master
master_filepaths = [
os.path.join(data_folder, "Submissions - for analysis.csv"),
]
master_to_asset_list_filepath = None
phase = False
# Passed to asset_list.flag_ecosurv; presumably a "|"-separated pattern of landlord names - confirm against
# flag_ecosurv
ecosurv_landlords = "community community|community housing|mr community|david lindwood"
# NOTE(review): not referenced in the visible part of this script - confirm it is still needed
manual_uprn_map = {}
# Build the AssetList from the workbook and column names configured above
asset_list = AssetList(
local_filepath=os.path.join(data_folder, data_filename),
header=0,
sheet_name=sheet_name,
address1_colname=address1_column,
postcode_colname=postcode_column,
landlord_property_id=landlord_property_id,
full_address_colname=fulladdress_column,
full_address_cols_to_concat=address_cols_to_concat,
missing_postcodes_method=missing_postcodes_method,
address1_extraction_method=address1_method,
landlord_year_built=landlord_year_built,
landlord_uprn=landlord_os_uprn,
landlord_property_type=landlord_property_type,
landlord_built_form=landlord_built_form,
landlord_wall_construction=landlord_wall_construction,
landlord_roof_construction=landlord_roof_construction,
landlord_heating_system=landlord_heating_system,
landlord_existing_pv=landlord_existing_pv,
landlord_sap=landlord_sap,
phase=phase
)
# Run the two standardisation steps in order; later sections of this script read
# asset_list.standardised_asset_list
asset_list.init_standardise()
asset_list.apply_standardiation()
# We now flag properties that have been treated under existing programmes
asset_list.flag_outcomes(
    outcomes_filepaths=outcomes_filename,
    outcomes_sheetname=outcomes_sheetname,
    outcomes_address=outcomes_address,
    outcomes_postcode=outcomes_postcode,
    outcomes_houseno=outcomes_houseno,
    outcomes_id=outcomes_id
)

# Exactly one outcome is expected to be left without a domna_property_id; it is patched by hand below. Any other
# count means the matching has changed, so we stop and report how many are unmatched.
unmatched_outcome_count = int(pd.isnull(asset_list.outcomes["domna_property_id"]).sum())
if unmatched_outcome_count != 1:
    raise Exception(
        f"Something went wrong: expected exactly 1 outcome without a domna_property_id, "
        f"found {unmatched_outcome_count}"
    )
# We fix this one manually
asset_list.outcomes["domna_property_id"] = asset_list.outcomes["domna_property_id"].fillna(
    "29walternashroadeastbirchencoppicekidderminsterdy117ea-caa3a8d92ea9"
)

asset_list.flag_survey_master(
    master_filepaths=master_filepaths,
    master_to_asset_list_filepath=master_to_asset_list_filepath
)

master_surveyed = asset_list.master_surveyed
# Collapse the spellings found in the master sheets onto one label per scheme.
# NOTE(review): Series.map leaves any scheme missing from this dict as NaN - confirm every scheme is covered
scheme_map = {
    "ECO4 A/W": "ECO4",
    'ECO4 GBIS': "GBIS",
    'ECO4 - REMEDIAL CWI ONLY': "ECO4 Remedial",
    "ECO4 GBIS REMEDIAL": "GBIS Remedial",
    'ECO4 - Remedial CWI Only': "ECO4 Remedial",
    'ECO4 GBIS Remedial': "GBIS Remedial"
}
master_surveyed["funding_scheme"] = master_surveyed["funding_scheme"].map(scheme_map)
master_surveyed["survey_reference"] = master_surveyed["funding_scheme"] + ": " + master_surveyed["measure_mix"]

# Attach the domna_property_id via the landlord's own property ID
master_surveyed = master_surveyed.merge(
    asset_list.standardised_asset_list[["domna_property_id", "landlord_property_id"]],
    how="left",
    on="landlord_property_id",
)

if pd.isnull(master_surveyed["domna_property_id"]).sum():
    raise ValueError("Some of the master surveyed properties do not have a domna_property_id")
# Flag anything in outcomes that has been listed as surveyed, that is NOT in the master_surveyed sheet
# NOTE(review): the original list held the literal "surveyed" twice, which suggests a second spelling
# (e.g. different casing) was intended - confirm against the values in the "Outcome" column
surveyed_outcomes = asset_list.outcomes[
    asset_list.outcomes["Outcome"].isin(["surveyed"])
]
# .copy() gives an independent frame, so the column assignment below does not write to a slice of
# asset_list.outcomes (which pandas flags with SettingWithCopyWarning)
outcomes_not_in_master = surveyed_outcomes[
    ~surveyed_outcomes["domna_property_id"].isin(master_surveyed["domna_property_id"])
].copy()
outcomes_not_in_master["Type of Funding"] = outcomes_not_in_master["Type of Funding"].fillna("Work Type Not Filled In")
# Match ecosurv data to the asset list, excluding the listed landlords from the ecosurv data considered
asset_list.flag_ecosurv(
ecosurv_landlords=ecosurv_landlords,
landlords_to_ignore=[
"Watford Community housing", "Eastlight Community housing", "Mr Tower Hamlets Community Housing"
]
)
# These are properties NOT on the Community Housing asset list that were sold under the wrong HA
# asset_list.ecosurv_no_match.to_csv(
# "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Community Housing/Programme "
# "Reconciliation/Ecosurv - properties sold to Community Housing, not belonging to them.csv",
# index=False
# )
# We read in the works, split by sold to SGEC and on-hold
billed_to_installer = pd.read_csv(
os.path.join(
data_folder, "Community Housing Deck of works", "SGEC BILLED -Table 1.csv"
),
)
# "billed" records which of the two files a row came from
billed_to_installer["billed"] = True
not_billed_to_installer = pd.read_csv(
os.path.join(
data_folder, "Community Housing Deck of works", "ON HOLD -Table 1.csv"
),
)
not_billed_to_installer["billed"] = False
# Stack both files, then reset the index so that row_id is unique across the combined frame
sgec_billings = pd.concat(
[billed_to_installer, not_billed_to_installer],
)
sgec_billings = sgec_billings.reset_index(drop=True)
sgec_billings["row_id"] = sgec_billings.index
# We match these two lists back to the domna_property_id. They SHOULD match to submissions
# Column names differ between exports, so we detect which variant the billed sheet uses
scheme_col = (
    "AFFORDABLE WARMTH OR EPC FOR HOUSING ASSOCIATION" if
    "AFFORDABLE WARMTH OR EPC FOR HOUSING ASSOCIATION" in billed_to_installer.columns else "AFFORDABLE WARMTH"
)
postcode_col = "POSTCODE" if "POSTCODE" in billed_to_installer.columns else "Post Code"
house_no_col = 'NO.' if 'NO.' in billed_to_installer.columns else "NO"
# Both branches of the previous conditional named the same column, so this is a plain constant
property_type_col = "PROPERTY TYPE As per table emailed"
measure_mix_col = "MEASURE COMBO"

# Keyed by "<house number>+<postcode>" exactly as written in the billing sheets; values are landlord property IDs
manual_corrections = {
    "30+DY12 1EB": "73440300",
    "32+DY12 1EB": "73440320",
    "1+DY11 7ES": "20150010",
    "12+DY11 7EP": "9460120",
    "72+DY11 7PA": "88520720",
    "39+DY13 0DR": "44250390",
    "43+DY11 7EF": "2460430",
    "45+DY11 7EG": "2460450",
    "47+DY11 7EG": "2460470",
    "49+DY11 7EG": "2460490",
    "11+DY13 0HB": "87320110",
    "4+DY130HA": "87320040"
}

billed_lookup = []
for _, row in tqdm(sgec_billings.iterrows(), total=len(sgec_billings)):
    # NOTE(review): the primary lookup reads the literal "Post Code" / "NO." columns while the fallback below uses
    # postcode_col / house_no_col - confirm these always resolve to the same columns
    postcode = row["Post Code"]
    houseno = row["NO."]
    # Formatting (rather than str.join) means a numeric or missing cell cannot raise a TypeError
    lookup_key = f"{houseno}+{postcode}"

    # We need to correct some records
    landlord_pid = manual_corrections.get(lookup_key)
    if landlord_pid:
        df = asset_list.standardised_asset_list[
            (asset_list.standardised_asset_list["landlord_property_id"] == landlord_pid)
        ]
        if df.shape[0] != 1:
            raise ValueError(
                f"Expected exactly one match for {landlord_pid} in the standardised asset list, "
                f"found {df.shape[0]}"
            )
    else:
        df = master_surveyed[
            (master_surveyed["original_house_no"] == houseno) &
            (master_surveyed["original_postcode"] == postcode)
        ]
        if df.shape[0] != 1:
            # Try a search on the asset list
            postcode_no_space = str(row[postcode_col]).strip().replace(" ", "").lower()
            df = asset_list.standardised_asset_list[
                (
                    asset_list.standardised_asset_list[asset_list.STANDARD_POSTCODE]
                    .str.strip().str.lower().str.replace(" ", "") == postcode_no_space
                )
            ].copy()

            house_no = row[house_no_col]
            # Numeric house numbers arrive as floats (e.g. 30.0); a missing value is also a float, so it is
            # excluded here because int(NaN) raises
            if isinstance(house_no, float) and not pd.isnull(house_no):
                house_no = str(int(house_no)).lower()
            else:
                house_no = str(house_no).lower()

            # A row-wise apply on an empty frame returns a DataFrame, which cannot be assigned to one column, so
            # we only derive house numbers when the postcode matched something
            if not df.empty:
                df["house_no"] = df.apply(
                    lambda x: SearchEpc.get_house_number(
                        str(x[asset_list.STANDARD_ADDRESS_1]), str(x[asset_list.STANDARD_POSTCODE])
                    ),
                    axis=1
                )
                df = df[df["house_no"].str.lower() == house_no].copy()

            if df.shape[0] != 1:
                raise ValueError(
                    f"Expected exactly one match for {lookup_key} in the master surveyed list or the asset list, "
                    f"found {df.shape[0]}"
                )

    billed_lookup.append(
        {
            "domna_property_id": df["domna_property_id"].values[0],
            "row_id": row["row_id"],
        }
    )

billed_lookup = pd.DataFrame(billed_lookup)
sgec_billings = sgec_billings.merge(
    billed_lookup,
    how="left",
    on="row_id"
)
# We get the asset list that Community Housing thinks they sent Warmfront
master_data_sheet = pd.read_excel(
    os.path.join(
        data_folder, "Warmfront.xlsx"
    ),
    sheet_name="Asset Stock List (3)",
)
# IDs are compared as strings on both sides, so they are cast once here and reused below
master_data_sheet["Asset_Reference"] = master_data_sheet["Asset_Reference"].astype(str)
asset_list_property_ids = asset_list.standardised_asset_list["landlord_property_id"].astype(str)

# 1) We check that all of the properties in the asset list we have on file are in the asset list that Community Housing
# believe they sent Warmfront
if not asset_list_property_ids.isin(master_data_sheet["Asset_Reference"].values).all():
    raise ValueError("Some of the properties in the asset list are not in the Warmfront asset list")

# This column documents whether or not the property is in the asset list that the WFT were sent
# There are 189 properties that were never sent to WFT, but all properties are accounted for in the asset list
master_data_sheet["Is Property in WFT Asset List"] = master_data_sheet["Asset_Reference"].isin(
    asset_list_property_ids.values
)

# We now merge on the Warmfront findings
master_data_sheet = master_data_sheet.merge(
    asset_list.standardised_asset_list[["landlord_property_id", "non-intrusives: ECO Eligibility"]],
    how="left",
    left_on="Asset_Reference",
    right_on="landlord_property_id"
)
# Rows with no finding after the left merge are labelled explicitly
master_data_sheet["non-intrusives: ECO Eligibility"] = master_data_sheet["non-intrusives: ECO Eligibility"].fillna(
    "Not in original asset list"
)
# SGEC did a number of CIGA checks. We match these onto the master data sheet
# TODO: Need to split the programme into historical 2023 and 2024 (there was a cutoff date in late 2024 which seemed
# to be the start of the new programme)
# Seems like there were 2 main checks - it also seems like this was a 2 phase programme, where these CIGA checks
# correspond to phase 2
ciga_checks_1 = pd.read_excel(
os.path.join(
data_folder, "CIGA Checks", "2 CIGA Check WFT 14102024 x1073.xlsx"
),
sheet_name="Worksheet"
)
# Drop rows without a postcode, then label the batch so it can be told apart after concatenation
ciga_checks_1 = ciga_checks_1[~pd.isnull(ciga_checks_1["Postcode"])]
ciga_checks_1["request"] = "1073 properties"
ciga_checks_2 = pd.read_excel(
os.path.join(
data_folder, "CIGA Checks", "2 CIGA Check 01112024 x125.xlsx"
),
sheet_name="Worksheet"
)
ciga_checks_2 = ciga_checks_2[~pd.isnull(ciga_checks_2["Postcode"])]
ciga_checks_2["request"] = "125 flats"
# ignore_index gives a fresh 0..n-1 index, so row_id is unique across both batches
cigas = pd.concat([ciga_checks_1, ciga_checks_2], ignore_index=True)
cigas["row_id"] = cigas.index
# We add some temp columns to allow for easier matching
# house_no is derived per row from the full address and postcode via SearchEpc.get_house_number
asset_list.standardised_asset_list["house_no"] = asset_list.standardised_asset_list.apply(
lambda x: SearchEpc.get_house_number(
str(x["domna_full_address"]), str(x["domna_postcode"])
),
axis=1
)
# Manual overrides keyed by the CIGA "Matched Address"; values are landlord property IDs
manual_fixes = {
    "2 Austcliffe Road Cookley, Kidderminster": "2250020",
    '5 Brett Young Close, Kidderminster': "9800050"
}
# CIGA rows that are skipped outright, identified by "Matched Address" or by "<Address1> <Address2>"
incorrect_ciga_return = [
    "19 Wood Street, Kidderminster",
    "nan Charles Street",
    "53 Harold Evers Way, Kidderminster",
    '63 Harold Evers way'
]

asset_df = asset_list.standardised_asset_list
# Loop-invariant: lower-cased, comma-free addresses used by the substring fallbacks below
searchable_addresses = asset_df["domna_full_address"].str.lower().str.replace(",", "", regex=False)

ciga_lookup = []
for _, row in tqdm(cigas.iterrows(), total=len(cigas)):
    matched_address = row["Matched Address"]
    address2 = row["Address2"]
    house_no = str(row["Address1"])

    ll_pid = manual_fixes.get(matched_address)
    if ll_pid:
        df = asset_df[asset_df["landlord_property_id"] == ll_pid]
        if df.empty:
            raise Exception(f"Manual fix {ll_pid} for {matched_address} was not found in the asset list")
        ciga_lookup.append(
            {
                "domna_property_id": df["domna_property_id"].values[0],
                "row_id": row["row_id"],
            }
        )
        continue

    # str() guards against a missing Address2, which pandas reads as a float NaN and str.join rejects
    if (matched_address in incorrect_ciga_return) or (
        " ".join([house_no, str(address2)]) in incorrect_ciga_return
    ):
        continue

    # 1) Postcode as supplied + house number
    df = asset_df[asset_df["domna_postcode"] == row["Postcode"]]
    df = df[df["house_no"].astype(str) == house_no]

    # 2) Postcode as matched by CIGA + house number
    if df.empty:
        df = asset_df[asset_df["domna_postcode"] == row["Matched Postcode"]]
        df = df[df["house_no"].astype(str) == house_no]

    # 3) Several candidates: disambiguate on the matched address. regex=False because the address is literal
    # text, not a pattern
    if df.shape[0] > 1 and isinstance(matched_address, str):
        df = asset_df[
            searchable_addresses.str.contains(matched_address.lower().replace(",", ""), na=False, regex=False)
        ]

    # 4) Still nothing: fall back to the street text + house number
    if df.empty and isinstance(address2, str):
        df = asset_df[
            searchable_addresses.str.contains(address2.lower().replace(",", ""), na=False, regex=False)
        ]
        df = df[df["house_no"].astype(str) == house_no]

    if df.shape[0] != 1:
        raise Exception(
            f"Expected exactly one match for {row['Address1']} in the asset list, found {df.shape[0]}"
        )

    ciga_lookup.append(
        {
            "domna_property_id": df["domna_property_id"].values[0],
            "row_id": row["row_id"],
        }
    )

ciga_lookup = pd.DataFrame(ciga_lookup)
cigas = cigas.merge(
    ciga_lookup,
    how="left",
    on="row_id"
)
# Rows skipped in the loop have no domna_property_id after the left merge; drop them
cigas = cigas[~pd.isnull(cigas["domna_property_id"])]
cigas = cigas.merge(
    asset_list.standardised_asset_list[["domna_property_id", "landlord_property_id"]],
    how="left",
    on="domna_property_id"
)
# Note 4 entries in the CIGA checks did NOT match to the asset list (were for properties not owned by Community Housing)
master_data_sheet = master_data_sheet.merge(
    cigas[["landlord_property_id", "Guarantee", "request"]].rename(
        columns={"request": "CIGA request batch"}
    ),
    how="left",
    on="landlord_property_id"
)

# Fill missing survey_reference with funding_scheme
master_surveyed["survey_reference"] = master_surveyed["survey_reference"].fillna(
    master_surveyed["funding_scheme"]
)

# Only surveys that are absent from the SGEC billing sheets are merged here.
# .copy() gives an independent frame, so adding the status column does not write to a slice of master_surveyed
# (which pandas flags with SettingWithCopyWarning)
master_surveyed_to_merge = master_surveyed[
    ~master_surveyed["domna_property_id"].isin(sgec_billings["domna_property_id"].values)
].copy()
master_surveyed_to_merge["Survey Status"] = "Surveyed, Submitted, not on SGEC Deck of Works"

# We now merge on what we've surveyed and submitted
master_data_sheet = master_data_sheet.merge(
    master_surveyed_to_merge[
        ["landlord_property_id", "survey_reference", "submission_date", "cancelled", "Survey Status"]
    ].rename(
        columns={
            "survey_reference": "Survey Type", "submission_date": "Survey Date",
            "cancelled": "Was the Install Cancelled?"
        }
    ),
    how="left",
    on="landlord_property_id"
)
# Derive the work status from sgec_billings. First bring the landlord id onto each billing row.
sgec_billings = sgec_billings.merge(
    asset_list.standardised_asset_list[["landlord_property_id", "domna_property_id"]],
    how="left",
    on="domna_property_id"
)
# Property ids that occur more than once on the billing sheet
dupe_ids = sgec_billings.loc[sgec_billings["domna_property_id"].duplicated(), "domna_property_id"]
# One row per property: order by id with billed=True ahead of billed=False, then keep the first row
sgec_billings = (
    sgec_billings
    .sort_values(["domna_property_id", "billed"], ascending=[True, False])
    .drop_duplicates(subset=["domna_property_id"], keep="first")
)

# "<scheme>: <measure combo>", falling back to the scheme alone when the combined label is null
billing_scheme = sgec_billings["SUBMISSION TYPE - ECO4,GBIS,SHDF,EPC or OTHER"].map(scheme_map)
sgec_billings["Survey Type"] = (billing_scheme + ": " + sgec_billings["MEASURE COMBO"]).fillna(billing_scheme)
sgec_billings["Survey Date"] = sgec_billings['SUBMISSION DATE']
# Any INSTALLED value containing "cancel" (case-insensitive) counts as a cancelled install
sgec_billings["Was the Install Cancelled?"] = (
    sgec_billings["INSTALLED"].astype(str).str.lower().str.contains("cancel")
)
sgec_billings['Survey Status'] = np.where(
    sgec_billings["billed"].eq(True),
    "Surveyed, Submitted, on SGEC Deck of Works",
    "Surveyed, not submitted to SGEC, on SGEC Deck of Works"
)

billing_fields = ["Survey Type", "Survey Date", "Was the Install Cancelled?", "Survey Status"]
master_data_sheet = master_data_sheet.merge(
    sgec_billings[["landlord_property_id"] + billing_fields],
    how="left",
    on="landlord_property_id",
    suffixes=("", "_y")
)
# Values already on the master sheet win; the billing value ("_y") only fills gaps
for field in billing_fields:
    incoming = field + "_y"
    use_incoming = master_data_sheet[field].isna() & master_data_sheet[incoming].notna()
    master_data_sheet[field] = np.where(
        use_incoming,
        master_data_sheet[incoming],
        master_data_sheet[field]
    )
    master_data_sheet = master_data_sheet.drop(columns=[incoming])
# Attach the landlord id to the outcomes that were not found in the master list
outcome_id_pairs = asset_list.standardised_asset_list[["landlord_property_id", "domna_property_id"]]
outcomes_not_in_master = outcomes_not_in_master.merge(
    outcome_id_pairs,
    how="left",
    left_on="domna_property_id",
    right_on="domna_property_id"
)
# We also filter out any that were in the SGEC billings
already_billed = outcomes_not_in_master["domna_property_id"].isin(sgec_billings["domna_property_id"].values)
outcomes_not_in_master = outcomes_not_in_master[~already_billed]
# We now merge on outcomes. There are a small number of surveyed outcomes that were not submitted
master_data_sheet = master_data_sheet.merge(
    outcomes_not_in_master[["landlord_property_id", 'Type of Funding', "Date Letter sent"]],
    how="left",
    on="landlord_property_id",
)

# Rows with no survey type so far but with an outcome record take their status and type from the outcome
only_on_outcomes = master_data_sheet["Survey Type"].isna() & master_data_sheet["Type of Funding"].notna()
master_data_sheet["Survey Status"] = np.where(
    only_on_outcomes,
    "Surveyed, On Outcomes, not submitted",
    master_data_sheet["Survey Status"]
)
master_data_sheet["Survey Type"] = np.where(
    only_on_outcomes,
    master_data_sheet["Type of Funding"],
    master_data_sheet["Survey Type"]
)
# The survey date is back-filled independently, from the date the letter was sent
date_from_letter = master_data_sheet["Survey Date"].isna() & master_data_sheet["Date Letter sent"].notna()
master_data_sheet["Survey Date"] = np.where(
    date_from_letter,
    master_data_sheet["Date Letter sent"],
    master_data_sheet["Survey Date"]
)
# The helper columns from the outcomes sheet are no longer needed
master_data_sheet = master_data_sheet.drop(columns=["Type of Funding", "Date Letter sent"])
# We now need to compare against the submissions that SGEC have sent us, because the deck of works is
# likely incorrect given the number of properties that have been received by SGEC.
# We have submissions from the following dates (as given in the file names):
# - 18/11/2024
# - 10/03/2025
# - A sheet that claims to be 25/11/2024 but has 18/11/2024 as the submission date
# - 16/12/2024
# - 02/12/2024
# - 10/02/2025
sgec_submission_workbooks = (
    "4x108 18.11.24 - RT MASTERS SGEC INVOICE.xlsx",
    "4x144 COMMUNITY HOUSING TOTAL PROJECT INV 10032025.xlsx",
    "4x19 25.11.2024 - RT Master SGEC.xlsx",
    "4x37 16.12.2024 - SGEC INVOICED.xlsx",
    "4x60 02.12.2024 - RT SGEC INV.xlsx",
    "4x78 10.02.2025 MASTERS - SGEC INVOICED-CORRECT.xlsx",
)
# Read every workbook, tag each row with the workbook it came from, and stack them into one frame
sgec_received_submissions = pd.concat(
    [
        pd.read_excel(
            os.path.join(data_folder, "SGEC Received Submissions", workbook),
        ).assign(filename=workbook)
        for workbook in sgec_submission_workbooks
    ]
).reset_index(drop=True)
# Stable per-row key, used to join the property lookup back on
sgec_received_submissions["row_id"] = sgec_received_submissions.index
# Hand-resolved landlord property ids, keyed on "<house no>+<postcode>". These take
# precedence over any automatic match below.
manual_fix = {
    "5a+DY10 3JR": "6856005A",
    '12+DY10 3JR': "78900120",
    "9+DY10 3JR": "86280090",
    '10+DY10 3JL': "86280100",
    "66+DY10 3JS": "68560660",
    "70+DY10 3JS": "68560700",
    "72+DY10 3JS": "68560720",
    "12+DY10 3JP": "86280120",
    "2A+DY11 5TZ": "6872002A",
    "3A+DY11 5TZ": "6872003A",
    "4A+DY11 5TZ": "6872004A"
}

# The string form of the house-number columns does not change between iterations, so it is
# computed once here instead of being rebuilt for every submission row.
billing_house_no = sgec_billings['NO.'].astype(str)
surveyed_house_no = master_surveyed['original_house_no'].astype(str)

# Resolve every received submission to a landlord property id. Resolution order:
#   1. the manual_fix table,
#   2. a unique house number + postcode match in sgec_billings,
#   3. a unique house number + postcode match in master_surveyed.
# An ambiguous match at either stage, or no match at all, raises an Exception.
sgec_received_submissions_lookup = []
for _, row in tqdm(sgec_received_submissions.iterrows(), total=len(sgec_received_submissions)):
    house_no = str(row["NO."])
    postcode = row["Post Code"]
    _key = "+".join([house_no, str(postcode)])

    ll_pid = manual_fix.get(_key)
    if ll_pid is None:
        match = sgec_billings[
            (billing_house_no == house_no) &
            (sgec_billings['Post Code'] == postcode)
        ]
        if match.shape[0] > 1:
            raise Exception(f"something went wrong {_key} {row['Street / Block Name']}")
        if match.shape[0] == 0:
            # Not on the billing sheet: fall back to the surveyed master list
            match = master_surveyed[
                (surveyed_house_no == house_no) &
                (master_surveyed['original_postcode'] == postcode)
            ]
            if match.shape[0] > 1:
                raise Exception(f"something went wrong 2 {_key} {row['Street / Block Name']}")
            if match.shape[0] == 0:
                raise Exception(f"No match {_key} {row['Street / Block Name']}")
        ll_pid = match["landlord_property_id"].values[0]

    sgec_received_submissions_lookup.append(
        {
            "row_id": row["row_id"],
            "landlord_property_id": ll_pid,
        }
    )

sgec_received_submissions_lookup = pd.DataFrame(sgec_received_submissions_lookup)
sgec_received_submissions = sgec_received_submissions.merge(
    sgec_received_submissions_lookup[["row_id", "landlord_property_id"]],
    how="left",
    on="row_id"
)
# Build the same reporting columns for the received submissions as for the other sources.
# "<scheme>: <measure combo>", falling back to the scheme alone when the combined label is null.
submission_scheme = sgec_received_submissions["SUBMISSION TYPE - ECO4,GBIS,SHDF,EPC or OTHER"].map(scheme_map)
sgec_received_submissions["Survey Type"] = (
    submission_scheme + ": " + sgec_received_submissions["MEASURE COMBO"]
).fillna(submission_scheme)
sgec_received_submissions["Survey Date"] = sgec_received_submissions['SUBMISSION DATE']
# Any INSTALLED value containing "cancel" (case-insensitive) counts as a cancelled install
sgec_received_submissions["Was the Install Cancelled?"] = (
    sgec_received_submissions["INSTALLED"].astype(str).str.lower().str.contains("cancel")
)
sgec_received_submissions['Survey Status'] = "Submission sent to SGEC, Confirmed by SGEC"
sgec_received_submissions["Survey Received by SGEC"] = True

# We now merge on the submissions that SGEC have sent us
gap_fill_fields = ["Survey Type", "Survey Date", "Was the Install Cancelled?", "Survey Status"]
received_columns = ["landlord_property_id"] + gap_fill_fields + ["Survey Received by SGEC"]
master_data_sheet = master_data_sheet.merge(
    sgec_received_submissions[received_columns],
    how="left",
    on="landlord_property_id",
    suffixes=("", "_y")
)
# Fill in the gaps: existing master values are kept, the received value ("_y") is used only where missing
for field in gap_fill_fields:
    incoming = field + "_y"
    use_incoming = master_data_sheet[field].isna() & master_data_sheet[incoming].notna()
    master_data_sheet[field] = np.where(
        use_incoming,
        master_data_sheet[incoming],
        master_data_sheet[field]
    )
    master_data_sheet = master_data_sheet.drop(columns=[incoming])

# The merges above must not have multiplied any asset row
if master_data_sheet["Asset_Reference"].duplicated().any():
    raise ValueError("There are duplicates in the asset reference column")

# Drop this at the end
master_data_sheet = master_data_sheet.drop(columns=["landlord_property_id"])
draft_results_path = os.path.join(data_folder, "Draft Results.xlsx")
master_data_sheet.to_excel(draft_results_path)

View file

@ -0,0 +1,51 @@
import pandas as pd
# Compare the original Route March programme against the client's revised list, tab by tab,
# and write a per-property "Client Review" decision to client_revisions.csv.
WESTWARD_DIR = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Westward"
ROUTE_MARCH_DIR = f"{WESTWARD_DIR}/Route March"

tabs = [
    "Straight Fill", "Solar PV - Straight Fill", "RDF CIGA checks", "Solar PV - RDF CIGA Checks",
    "AT BUILD", "Solar PV - AT BUILD"
]

# Passing the list of tabs makes read_excel return a {tab name: DataFrame} dict, so each
# workbook is opened and parsed once rather than once per tab.
original_lists = pd.read_excel(
    f"{ROUTE_MARCH_DIR}/WESTWARD - Route March Prep.xlsx",
    sheet_name=tabs,
)
revised_lists = pd.read_excel(
    f"{ROUTE_MARCH_DIR}/WESTWARD - GBIS List revised for Domna.xlsx",
    sheet_name=tabs,
)

programme_revisions = []
for tab in tabs:
    # Anything present on the revised tab has been retained by the client
    revised_list = revised_lists[tab]
    revised_list["Client Review"] = "Retain in programme"

    df = original_lists[tab][["Place ref"]].copy()
    df["Tab"] = tab
    df = df.merge(revised_list[["Place ref", "Client Review"]], how="left", on="Place ref")
    # On the original tab but absent from the revised one: the client removed it
    df["Client Review"] = df["Client Review"].fillna("Remove from programme")
    programme_revisions.append(df)

programme_revisions = pd.concat(programme_revisions)

# Read in the standardised asset list and create the column to append to that
al = pd.read_excel(
    f"{WESTWARD_DIR}/WESTWARD - completed list - 08.05.2025 - Standardised - Client Review.xlsx",
    sheet_name="Standardised Asset List",
)
# NOTE(review): a "Place ref" that appears on more than one tab produces more than one row
# per property in this merge - confirm that is intended.
client_revisions = al[["landlord_property_id"]].merge(
    programme_revisions[["Place ref", "Client Review"]],
    how="left",
    left_on="landlord_property_id",
    right_on="Place ref",
)
# Properties that were never on any programme tab have no decision yet
client_revisions["Client Review"] = client_revisions["Client Review"].fillna("Needs Review")
client_revisions["Client Review Date"] = "08/05/2025"
client_revisions.to_csv(f"{ROUTE_MARCH_DIR}/client_revisions.csv", index=False)

View file

@ -20,9 +20,9 @@ def app():
"ventilation": 350,
"Room Roof Insulation": 210,
"Loft insulation": 15,
"Internal wall insulation": 215,
"Internal wall insulation": 131,
"External wall insulation": 298.35,
"Solid wall insulation": 215,
"Solid wall insulation": 131,
"LEDs": 35, # per light
"Flat Roof Insulation": 195,
"Double Glazing": 1140,
@ -71,149 +71,10 @@ def app():
"Ground Floor Flat": 10
}
# If we have a flat, we won't use the 199m2 floor area
floor_areas = [73, 97, 199]
# We remove age bracket, as we ended up with 360 combinations
# age_brackets = ["1945-1970", "1971-2002", "Post 2002"]
wall_type = ["cavity", "non-cavity"]
roof_type = ["pitched", "other"]
planning_constraints = [True, False]
# This is the list of all combinations of the above variables
combinations_untrimmed = product(
*[
dwelling_types, floor_areas, wall_type, roof_type, planning_constraints
]
)
# TODO: Possibly need to add an additional cost for immersion hot water
combinations = []
for comb in combinations_untrimmed:
if "Flat" in comb[0] and comb[1] == 199:
continue
# If we have a flat, not too much difference if it's in a conservation area or not
if "Flat" in comb[0] and comb[4] is True:
continue
combinations.append(comb)
risk_matrix = []
for combination in combinations:
n_floors = num_floors_map[combination[0]]
bf = built_form_map[combination[0]]
pt = "House" if "Flat" not in combination[0] else "Flat"
# Model the home as a box
ground_floor_area = combination[1] / n_floors
perimeter = np.sqrt(ground_floor_area) * 4
# This is the amount of insulation required
external_wall_area = estimate_external_wall_area(
num_floors=n_floors,
floor_height=2.5,
perimeter=perimeter,
built_form=bf
)
n_rooms = np.floor(combination[1] / 15)
n_windows = estimate_windows(
property_type=pt,
built_form=bf,
construction_age_band="",
floor_area=combination[1],
number_habitable_rooms=n_rooms
)
# We determine the exact upgrade pathway for this combination, guided by the generic upgrade pathway
combination_upgrade_pathway = []
for upgrade in upgrade_path:
if upgrade == "wall_insulation":
if combination[2] == "cavity":
combination_upgrade_pathway.append("cavity_wall_insulation")
else:
combination_upgrade_pathway.append("solid_wall_insulation")
continue
if upgrade == "roof_insulation":
if combination[3] == "pitched":
combination_upgrade_pathway.append("loft_insulation")
else:
combination_upgrade_pathway.append("non_pitched_roof_insualtion")
continue
if upgrade == "ventilation":
combination_upgrade_pathway.append("ventilation")
continue
if upgrade == "low_energy_lighting":
combination_upgrade_pathway.append("low_energy_lighting")
continue
if upgrade == "windows":
if not combination[4]:
combination_upgrade_pathway.append("double_glazing")
else:
combination_upgrade_pathway.append("secondary_glazing")
continue
if upgrade == "heating":
if combination[0] in ["Semi Detached House", "Detached House"]:
combination_upgrade_pathway.append("high_heat_retention_storage")
else:
combination_upgrade_pathway.append("air_source_heat_pump")
continue
if upgrade == "solar":
if combination[0] in ["Semi Detached House", "Detached House", "Mid Terrace House"]:
combination_upgrade_pathway.append("solar_pv")
continue
combination_costs = []
for measure in combination_upgrade_pathway:
unit_cost = pricing_matrix[measure]
# Wall insulation
if measure in ["cavity_wall_insulation", "internal_wall_insulation", "external_wall_insulation"]:
cost = unit_cost * external_wall_area
elif measure in ["loft_insulation"]:
cost = unit_cost * ground_floor_area
elif measure == "ventilation":
if combination[1] == 73:
cost = unit_cost * 2
elif combination[1] == 97:
cost = unit_cost * 3
else:
cost = unit_cost * 4
elif measure == "low_energy_lighting":
n_lights = lighting_count[combination[0]]
if combination[1] == 73:
inflation = 1
elif combination[1] == 97:
inflation = 1.2
else:
inflation = 1.5
cost = unit_cost * n_lights * inflation
elif measure in ["double_glazing", "secondary_glazing"]:
cost = unit_cost * n_windows
elif measure == "high_heat_retention_storage":
cost = unit_cost * n_rooms
elif measure in ["air_source_heat_pump", "solar_pv"]:
cost = unit_cost
else:
raise NotImplementedError("Implement: %s" % measure)
combination_costs.append(
{
"measure": measure,
"cost": cost
}
)
combination_costs = pd.DataFrame(combination_costs)
contingency = 0.26
epr_data = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/L&G/Risk Matrix/EPR Data.xlsx", header=1
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/L&G/Risk Matrix/EPR Data V2.xlsx", header=1
)
epr_data["Measure added"].value_counts()
epr_data["row_id"] = epr_data.index
@ -318,6 +179,6 @@ def app():
)
with pd.ExcelWriter(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/L&G/Risk Matrix/risk_matrix.xlsx") as writer:
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/L&G/Risk Matrix/risk_matrix V2.xlsx") as writer:
risk_matrix.to_excel(writer, sheet_name="Risk Matrix", index=False)
pricing_df.to_excel(writer, sheet_name="Pricing Assumptions", index=False)

View file

@ -0,0 +1,60 @@
"""
The data held on file for MHS is fairly incomplete, where not every single property has an observation
"""
from tqdm import tqdm
import pandas as pd
from docutils.utils.math.tex2mathml_extern import blahtexml
MHS_DIR = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/MHS"
NOT_INSPECTED = "Property not inspected"


def _extrapolate_findings(findings):
    """Spread the recorded findings of one postcode over all of its properties.

    ``findings`` is the "WFT Findings" column for a single postcode, in asset-list order.

    * If the first property has no finding, nothing is extrapolated: existing findings are
      kept and every gap becomes "Property not inspected".
    * Otherwise each gap takes the most recent finding above it (forward fill), so a later
      finding in the same postcode replaces the earlier one from that row onwards.

    Returns a Series with the same index as ``findings`` and no missing values.
    """
    if pd.isnull(findings.iloc[0]):
        return findings.fillna(NOT_INSPECTED)
    return findings.ffill()


asset_list = pd.read_excel(
    f"{MHS_DIR}/MHS HOMES (Full Asset List) - for analysis.xlsx",
    sheet_name="Data"
)

# When this list was checked, an observation was made per postcode, and so we need to extrapolate those findings.
# The result is written back by index rather than merged on UPRN, so a repeated UPRN cannot multiply rows.
# astype(object) gives an independent copy that can hold the text labels even if the column is entirely empty.
wft_findings = asset_list["WFT Findings"].astype(object)
findings_by_postcode = asset_list.groupby("Postcode")["WFT Findings"]
for _, postcode_findings in tqdm(findings_by_postcode, total=findings_by_postcode.ngroups):
    wft_findings.loc[postcode_findings.index] = _extrapolate_findings(postcode_findings)

# groupby skips rows whose Postcode is null, so those rows are still untouched here: they keep
# any finding they already had and are otherwise marked as not inspected.
wft_findings = wft_findings.fillna(NOT_INSPECTED)

# Re-attach the column at the end of the frame
asset_list = asset_list.drop(columns=["WFT Findings"]).assign(**{"WFT Findings": wft_findings})

# Store the data
asset_list.to_excel(
    f"{MHS_DIR}/MHS HOMES (Full Asset List) - for programme build.xlsx"
)

View file

@ -0,0 +1,28 @@
"""
Simple script to tidy up the unitas asset list
"""
import pandas as pd
UNITAS_DIR = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Unitas"

# Raw asset list as supplied
df = pd.read_excel(f"{UNITAS_DIR}/UNITAS - Asset List.xlsx", sheet_name="Asset List")
# Lower-case and trim the free-text finding before it is used as the join key
df["Warmfront Finding"] = df["Warmfront Finding"].str.lower().str.strip()

# Lookup from the raw finding text to its mapped category; the category is exposed as "WFT Findings"
mapping = pd.read_csv(f"{UNITAS_DIR}/unitas-mapped-categories.csv")
category_lookup = mapping[["non-intrusives: WFT Findings", 'mapped_category']].rename(
    columns={"mapped_category": "WFT Findings"}
)

# Left join keeps every asset row; findings without a mapping get a null category
al = df.merge(
    category_lookup,
    how="left",
    left_on="Warmfront Finding",
    right_on="non-intrusives: WFT Findings"
)
al.to_excel(f"{UNITAS_DIR}/unitas_asset_list_for_analysis.xlsx", index=False)