Model/etl/customers/peabody/Nov 2025 Consulting Project/data_prep.py
2025-12-03 22:07:52 +00:00

369 lines
14 KiB
Python

"""
This scipt prepares the raw data that was sent over by Peabody for production of
a standardised asset list
They have sent over just short of 100,000 properties and so, to make this easier, we will do the following
1) Break the data up into subsets of 25,000
2) Combine the data provided into a single list
"""
import json
import time
import os
import pandas as pd
import numpy as np
from tqdm import tqdm
from dotenv import load_dotenv
from asset_list.utils import get_data_for_property
from utils.logger import setup_logger
logger = setup_logger()
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
property_list = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting Project/2025_11_11 - Peabody "
"- Data Extracts for Domna.xlsx",
sheet_name="Properties"
)
sustainability_data = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting Project/2025_11_11 - Peabody "
"- Data Extracts for Domna.xlsx",
sheet_name="Sustainability"
)
# Basic overview:
# 1) We have 10,634 postcodes. If we needed to make requests to the ordnance survey API for
# all of these postcodes, it would cost at least £106, not accounting for double requests for postcodes
# where we have more than 100 properties (WE DONT!)
# 2) This is on average 9.36 properties per postcode
# 3) The UPRN in the property_list matches to the Org Ref in the sustainability data. These
# is an additional UPRN column in sustainability data which appears to be the ordnance survey UPRN
# 4) There appears to be some anomalous records, e.g. a flat with 543 m2 floor area and another flat
# with 6m2 floor area
# 5) Based on the residential indicator, all properties appear to be resi
# 6) We should do some quick calcs on how much it might cost to fetch all of the solar API data
# 7) We have 8785 missing UPRNS, which we should potentially try and fill
# 8) In the backend, we should probably start storing the raw EPC input data to allow for much quicker
# re-runs. All we really need to do is store the find my EPC data, perhaps against UPRN and RRN, as well
# as the raw EPC data, against uprn. This will be useful for scenario re-builds and will be much much
# quicker, as a starting point. Do we store in the database vs s3? TBC
n_postcodes = property_list["Post Code"].nunique()
postcode_summary = property_list.groupby("Post Code")["UPRN"].count().reset_index()
postcode_summary["UPRN"].mean()
def classify_floor_area(x):
if x <= 72:
return "0-72"
if x <= 97:
return "73-97"
if x <= 199:
return "98-199"
return "200+"
sustainability_data["Postal Region"] = sustainability_data["Postcode"].str.split(" ").str[0]
sustainability_data["Floor Area Band"] = sustainability_data["Total Floor Area (m2)"].apply(
lambda x: classify_floor_area(x)
)
# Archetype reductions
# Roof insulation category
# 1) Split roof insulation into > 100mm loft and <= 100mm loft
sustainability_data["Roof Insulation Category"] = sustainability_data["Roof Insulation"].copy()
# sustainability_data["Roof Insulation Category"] = np.where(
# sustainability_data["Roof Insulation Category"].isin(
# ['mm200', 'mm300', 'mm250', 'mm150', 'mm270', 'mm400', 'mm350'],
# ),
# "LI > 100mm",
# sustainability_data["Roof Insulation Category"],
# )
# sustainability_data["Roof Insulation Category"] = np.where(
# sustainability_data["Roof Insulation Category"].isin(
# ['mm100', 'mm50', 'mm75', 'mm25'],
# ),
# "LI <= 100mm",
# sustainability_data["Roof Insulation Category"],
# )
# 2) Group all of the glazed together (e.g. double glazed, secondary glazed, triple glazed)
sustainability_data["Glazing Type"] = sustainability_data["Glazing"].copy()
# sustainability_data["Glazing Type"] = np.where(
# sustainability_data["Glazing Type"].isin(
# ['Double 2002 or later', 'Double before 2002', 'Double but age unknown', 'DoubleKnownData']
# ),
# "Double Glazed",
# sustainability_data["Glazing Type"],
# )
# sustainability_data["Glazing Type"] = np.where(
# sustainability_data["Glazing Type"].isin(['Triple', 'TripleKnownData']),
# "Triple Glazed",
# sustainability_data["Glazing Type"],
# )
# 3) Group up boiler efficiency A, B-D, E - G? or someting like this
sustainability_data["Boiler Efficiency Group"] = sustainability_data["Boiler Efficiency"].copy()
# sustainability_data["Boiler Efficiency Group"] = np.where(
# sustainability_data["Boiler Efficiency Group"].isin(['B', 'C', 'D']),
# "B-D",
# sustainability_data["Boiler Efficiency Group"],
# )
# sustainability_data["Boiler Efficiency Group"] = np.where(
# sustainability_data["Boiler Efficiency Group"].isin(['E', 'F', 'G']),
# "E-G",
# sustainability_data["Boiler Efficiency Group"],
# )
# 4) Group up main fuel into gas, electric, oil, other?
sustainability_data["Main Fuel Group"] = sustainability_data["Main Fuel"].copy()
# sustainability_data["Main Fuel Group"] = np.where(
# sustainability_data["Main Fuel Group"].isin(
# ["SmokelessCoal", "BiomassCommunity", "B30DCommunity"]
# ),
# "Other Fuel",
# sustainability_data["Main Fuel Group"],
# )
# 5) Wall Construction - group up Sandstone and Granite into one category
# sustainability_data["Wall Construction"] = np.where(
# sustainability_data["Wall Construction"].isin(["Sandstone", "Granite"]),
# "Sandstone/Granite",
# sustainability_data["Wall Construction"]
# )
# sustainability_data["Wall Construction"] = np.where(
# sustainability_data["Wall Construction"].isin(["Timber Frame", "System"]),
# "Timber/System",
# sustainability_data["Wall Construction"]
# )
# 6) Reduce or remove floor construction
# sustainability_data["Floor Construction"] = np.where(
# sustainability_data["Floor Construction"].isin(["SuspendedTimber", "SuspendedNotTimber"]),
# "Suspended Floor",
# sustainability_data["Floor Construction"]
# )
# 7) Reduce wall insulation
# sustainability_data["Wall Insulation"] = np.where(
# sustainability_data["Wall Insulation"].isin(
# ["FilledCavityPlusInternal", "FilledCavityPlusExternal", "FilledCavity", "External", "Internal"]
# ),
# "Insulated",
# sustainability_data["Wall Insulation"]
# )
# 8) Fill floor insulation
sustainability_data["Floor Insulation"] = sustainability_data["Floor Insulation"].fillna("Unknown")
# 9) Reduce Age bands
# sustainability_data["Construction Years"] = np.where(
# sustainability_data["Construction Years"].isin(["2003-2006", "2007-2011", "2012 onwards"]),
# "2003 onwards",
# sustainability_data["Construction Years"],
# )
# sustainability_data["Construction Years"] = np.where(
# sustainability_data["Construction Years"].isin(["Before 1900", "1900-1929"]),
# "Before 1929",
# sustainability_data["Construction Years"],
# )
# sustainability_data["Construction Years"] = np.where(
# sustainability_data["Construction Years"].isin(["1983-1990", "1991-1995"]),
# "1983-1995",
# sustainability_data["Construction Years"],
# )
# sustainability_data["Construction Years"] = np.where(
# sustainability_data["Construction Years"].isin(["1950-1966", "1967-1975", "1976-1982"]),
# "1950-1982",
# sustainability_data["Construction Years"],
# )
# Roof
# sustainability_data["Roof Construction"] = np.where(
# sustainability_data["Roof Construction"].isin(
# ["PitchedNormalLoftAccess", "PitchedThatched", "PitchedNormalNoLoftAccess", "PitchedWithSlopingCeiling"]
# ),
# "Pitched Roof",
# sustainability_data["Roof Construction"]
# )
archetype_variables = [
"Type", "Attachment", "Construction Years", "Wall Construction", "Wall Insulation",
"Roof Construction", "Roof Insulation Category", "Floor Construction", "Floor Insulation",
"Glazing Type", "Heating", "Boiler Efficiency Group", "Main Fuel Group", "Controls Adequacy",
"Floor Area Band"
]
archetypes = sustainability_data[archetype_variables + ["UPRN"]].dropna().groupby(archetype_variables)[
"UPRN"].nunique().reset_index().rename(columns={"UPRN": "Count"}).sort_values(by="Count",
ascending=False).reset_index(
drop=True)
# We take a sample that represents 95% of the properties
archetypes["Cumulative Count"] = archetypes["Count"].cumsum()
archetypes["Cumulative Proportion"] = archetypes["Cumulative Count"] / archetypes["Count"].sum()
archetypes_95 = archetypes.copy()
archetypes_95["Archetypes_95_reference"] = archetypes_95.index + 1
archetypes_95["Archetypes_95_reference"] = "Archetype_Sample_" + archetypes_95["Archetypes_95_reference"].astype(str)
# For the sample, look for invalid looking UPRNs and remove them.
sample_from = sustainability_data.copy()
# 1) Check for UPRNs that are not numeric or begin with a Zero
sample_from["uprn_not_numeric"] = ~sample_from["UPRN"].apply(lambda x: str(x).isnumeric())
sample_from = sample_from[~sample_from["uprn_not_numeric"]]
sample_from["uprn_has_leading_zero"] = sample_from["UPRN"].apply(lambda x: str(x).startswith("0"))
sample_from = sample_from[~sample_from["uprn_has_leading_zero"]]
sample_from = sample_from[~pd.isnull(sample_from["UPRN"])]
# We now take a sample of the properties that represent 85% of the total properties
sample_from = sample_from.merge(
archetypes_95,
on=archetype_variables,
how="inner"
)
# We take 1 random property, by archetype reference
modelling_sample = sample_from.groupby("Archetypes_95_reference").apply(
lambda x: x.sample(1, random_state=42)
).reset_index(drop=True)
# Checking distributions
def compare_distributions(full_df, sample_df, column):
full_dist = full_df[column].value_counts(normalize=True)
sample_dist = sample_df[column].value_counts(normalize=True)
comparison = pd.concat([full_dist, sample_dist], axis=1, keys=['Full', 'Sample']).fillna(0)
return comparison
for col in archetype_variables:
print(f"--- {col} ---")
print(compare_distributions(sustainability_data, sample_from, col))
# prepare
modelling_sample["domna_property_id"] = modelling_sample.index + 1
# Rename
modelling_sample = modelling_sample.rename(
columns={
"Org Ref": "landlord_property_id", "Address 1": "domna_address_1",
"Postcode": "postcode", "Type": "landlord_property_type",
"Attachment": "landlord_built_form",
"Heating": "landlord_heating_system",
"UPRN": "epc_os_uprn"
}
)
modelling_sample["landlord_built_form"] = modelling_sample["landlord_built_form"].map(
{
"MidTerrace": "Mid-Terrace",
"EndTerrace": "End-Terrace",
"SemiDetached": "Semi-Detached",
"Detached": "Detached",
"EnclosedEndTerrace": "Enclosed End-Terrace",
"EnclosedMidTerrace": "Enclosed Mid-Terrace",
}
)
if pd.isnull(modelling_sample["landlord_built_form"]).sum():
raise ValueError("Some built forms are null after mapping")
# Placeholder copies
def make_full_address(x):
to_join = [x['domna_address_1'], x['Address 2'], x['Address 3']]
to_join = [x for x in to_join if not pd.isnull(x) and x != '']
return ", ".join(to_join)
modelling_sample["domna_full_address"] = modelling_sample.apply(lambda x: make_full_address(x), axis=1)
# Save this CSV as input
modelling_sample.to_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting Project/modelling_sample.xlsx",
sheet_name="Standardised Asset List"
)
# Save the archetype definitions
archetypes_95.to_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting Project/archetypes_85.xlsx",
)
# Save the full archetypes
archetypes.to_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting Project/full_archetypes.xlsx",
)
# Maps the property types to the format recognised by the EPC api
property_type_map = {}
# Maps the build form to the format recognised by the OS api
built_form_map = {}
# Proposed data fetching
# 1) grab propeties with UPRN and fetch the assocated EPC data & find my EPC data
# Some thoughts:
# S3 is quite cheap to query however we may incur some cost if we're making hundreds of thousands of calls
# to S3 to fetch data out of it. It's cheap to fetch data, if we aren't taking data out of S3, but we
# should consider this. This may influence whether or not we want to store each record individually
# against UPRN, or store against the 10,641 postcodes. We can fetch the data and store in a single
# large dump and then determine later if we want to split it up
# TODO: Handle properties without uprn
# TODO: I think we can json dump all of this, but check if we can load and re-use the page source
# TODO: Create batches?
batch_size = 500
batch_indexes = list(range(0, len(sustainability_data), batch_size))
# TODO: SET
working_directory = ""
download_contents = os.listdir(working_directory)
for i in range(0, len(sustainability_data.standardised_asset_list), batch_size):
batch_name = f"batch_{i}_to_{i + batch_size}"
# TODO: Check this
if batch_name in download_contents:
# Means we already have the data downloaded
continue
batch_data = {}
for _, property_data in tqdm(sustainability_data.iterrows(), total=len(sustainability_data)):
os_uprn = property_data["UPRN"]
address1 = property_data["Address 1"]
postcode = property_data["Postcode"]
full_address_components = [
x for x in [property_data["Address 1"], property_data["Address 2"], property_data["Address 3"]]
if not pd.isnull(x)
]
full_address = ", ".join(full_address_components)
fetched_data = get_data_for_property(
address1=address1,
postcode=postcode,
full_address=full_address,
property_type=property_type_map[property_data["Type"]],
built_form=built_form_map[property_data["Attachment"]],
uprn=property_data["UPRN"],
epc_auth_token=EPC_AUTH_TOKEN,
find_my_epc_return_page=True
)
batch_data[property_data["Org Ref"]] = fetched_data
# TODO: We likely want to do something like this: to slow down
# TODO: We also perhaps store the data in batches
if len(batch_data) % 50 == 0 and len(batch_data) > 0:
logger.info("Sleeping for 10 seconds to avoid hitting API rate limit")
time.sleep(10)
# Store the batch data in the wd
with open(os.path.join(working_directory, batch_name), "wb") as f:
json.dump(batch_data, f)