Model/asset_list/app.py
2025-02-21 22:57:56 +00:00

506 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import time
import json
import pandas as pd
import numpy as np
from tqdm import tqdm
import msgpack
from utils.s3 import read_from_s3
from asset_list.AssetList import AssetList
from asset_list.mappings.property_type import PROPERTY_MAPPING
from asset_list.mappings.walls import WALL_CONSTRUCTION_MAPPINGS
from asset_list.mappings.heating_systems import HEATING_MAPPINGS
from asset_list.mappings.exising_pv import EXISTING_PV_MAPPINGS
from dotenv import load_dotenv
from backend.SearchEpc import SearchEpc
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
# Load environment variables from the backend .env file (the path is relative
# to the current working directory).
load_dotenv(dotenv_path="backend/.env")
# Token passed as `auth_token` to SearchEpc below; None if the variable is unset.
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
def _retrieve_find_my_epc_data(newest_epc):
    """Fetch the FindMyEPC record that matches an EPC API record.

    The lookup is first attempted with ``newest_epc["address"]``. If that
    raises a ``ValueError`` mentioning "No EPC found" and the record has an
    "address1" key, the lookup is retried with ``newest_epc["address1"]``.

    Always returns a value (``{}`` when nothing could be retrieved), so a
    caller looping over rows can never pick up the previous row's data.

    Raises:
        Exception: if the first lookup fails with anything other than a
            ``ValueError``. Non-``ValueError`` failures of the fallback
            lookup propagate unchanged.
    """
    try:
        return RetrieveFindMyEpc(
            address=newest_epc["address"], postcode=newest_epc["postcode"]
        ).retrieve_newest_find_my_epc_data()
    except ValueError as e:
        # Only a "No EPC found" miss is worth retrying, and only when there is
        # an alternative address to retry with.
        if "No EPC found" not in str(e) or "address1" not in newest_epc:
            return {}
    except Exception as e:
        raise Exception(f"Error retrieving FindMyEPC data: {e}") from e

    try:
        return RetrieveFindMyEpc(
            address=newest_epc["address1"], postcode=newest_epc["postcode"]
        ).retrieve_newest_find_my_epc_data()
    except ValueError:
        # Any ValueError on the fallback is treated as "no data", matching how
        # ValueErrors from the first attempt are handled.
        return {}


def get_data(
    df, manual_uprn_map, epc_api_only=False, row_id_name="row_id"
):
    """Look up the newest EPC for every row of a standardised asset list.

    Args:
        df: DataFrame holding the standardised AssetList columns
            (``AssetList.STANDARD_*``) plus the ``row_id_name`` column.
        manual_uprn_map: Mapping of full address -> UPRN, used in preference
            to the UPRN column of ``df``.
        epc_api_only: When True, only the EPC API record is stored; the
            recommendations and FindMyEPC lookups are skipped.
        row_id_name: Name of the column holding the row identifier.

    Returns:
        A tuple ``(epc_data, errors, no_epc)``:
        ``epc_data`` is a list of dicts (one per property with an EPC),
        ``errors`` is a list of row ids whose processing raised, and
        ``no_epc`` is a list of row ids for which no EPC was found.
    """
    uprn_column = AssetList.STANDARD_UPRN
    fulladdress_column = AssetList.STANDARD_FULL_ADDRESS
    address1_column = AssetList.STANDARD_ADDRESS_1
    postcode_column = AssetList.STANDARD_POSTCODE

    # These re-map the standard property types to forms accepted by the EPC api, so we can predict EPCs
    property_type_map = {
        "house": "House",
        "flat": "Flat",
        "maisonette": "Maisonette",
        "bungalow": "Bungalow",
        "block house": "House",
        "coach house": "House",
        "bedsit": "Flat"
    }

    epc_data = []
    errors = []
    no_epc = []

    for _, home in tqdm(df.iterrows(), total=len(df)):
        try:
            postcode = home[postcode_column]
            house_number = str(home[address1_column]).strip()
            full_address = home[fulladdress_column].strip()

            house_no = SearchEpc.get_house_number(address=str(house_number), postcode=postcode)
            if house_no is None:
                house_no = house_number

            # The manual map wins over the UPRN supplied in the asset list
            uprn = manual_uprn_map.get(full_address, None)
            if uprn is None and home.get(uprn_column):
                uprn = home[uprn_column]
            if pd.isnull(uprn):
                uprn = None

            property_type = property_type_map.get(home[AssetList.STANDARD_PROPERTY_TYPE], None)

            searcher = SearchEpc(
                address1=str(house_no),
                postcode=postcode,
                auth_token=EPC_AUTH_TOKEN,
                os_api_key="",
                property_type=None,
                fast=True,
                full_address=full_address,
                max_retries=5,
                uprn=uprn
            )
            # Force the skipping of estimating the EPC
            searcher.ordnance_survey_client.property_type = None
            searcher.ordnance_survey_client.built_form = None
            searcher.find_property(skip_os=True)

            # Check if we have a flat or apartment
            if searcher.newest_epc is None and uprn is None:
                # Try again with a different first address line
                if SearchEpc.get_house_number(address=str(house_number), postcode=postcode) is None:
                    # Backup: second comma-separated part, else the first word
                    add1 = full_address.split(",")
                    if len(add1) > 1:
                        add1 = add1[1].strip()
                    else:
                        # Try splitting on space
                        add1 = full_address.split(" ")[0].strip()
                else:
                    add1 = str(house_number)

                searcher = SearchEpc(
                    address1=add1,
                    postcode=postcode,
                    auth_token=EPC_AUTH_TOKEN,
                    os_api_key="",
                    property_type=None,
                    fast=True,
                    full_address=full_address,
                    max_retries=5
                )
                if (
                    "flat" in house_number.lower() or "apartment" in house_number.lower() or "apt" in
                    house_number.lower()
                ):
                    searcher.ordnance_survey_client.property_type = "Flat"
                searcher.find_property(skip_os=True)

            # As a final resort, we estimate the EPC
            if property_type is not None and searcher.newest_epc is None:
                searcher.ordnance_survey_client.property_type = property_type
                searcher.find_property(skip_os=True)

            if searcher.newest_epc is None:
                no_epc.append(home[row_id_name])
                continue

            if epc_api_only:
                epc = {
                    row_id_name: home[row_id_name],
                    **searcher.newest_epc.copy()
                }
                epc_data.append(epc)
                continue

            # Look for EPC recommendations; a failure here is not fatal for the row
            try:
                property_recommendations = searcher.client.domestic.recommendations(searcher.newest_epc["lmk-key"])
            except Exception:
                property_recommendations = {"rows": []}

            # Retrieve data from FindMyEPC
            find_epc_data = _retrieve_find_my_epc_data(searcher.newest_epc)

            # Random pause between properties
            time.sleep(np.random.uniform(0.1, 1))

            epc = {
                row_id_name: home[row_id_name],
                **searcher.newest_epc.copy(),
                "recommendations": property_recommendations["rows"],
                "find_my_epc_data": find_epc_data,
            }
            epc_data.append(epc)
        except Exception:
            # Best effort: record the row as failed so the caller can retry it,
            # then pause before moving on to the next row.
            errors.append(home[row_id_name])
            time.sleep(5)

    return epc_data, errors, no_epc
def extract_address1(asset_list, full_address_col, postcode_col, method="first_two_words"):
    """Add an ``address1_extracted`` column derived from the full address.

    Args:
        asset_list: DataFrame to extend; it is modified in place and returned.
        full_address_col: Name of the column holding the full address.
        postcode_col: Name of the postcode column (only read by the
            "house_number_extraction" method).
        method: One of "first_two_words", "first_word" or
            "house_number_extraction".

    Returns:
        The same DataFrame with the ``address1_extracted`` column set.

    Raises:
        ValueError: if ``method`` is not one of the supported names.
    """
    target_col = "address1_extracted"

    if method == "first_two_words":
        words = asset_list[full_address_col].str.split(" ")
        asset_list[target_col] = words.str[:2].str.join(" ")
    elif method == "first_word":
        words = asset_list[full_address_col].str.split(" ")
        asset_list[target_col] = words.str[0]
    elif method == "house_number_extraction":
        def _house_number(row):
            # Delegate to the EPC search helper, one row at a time
            return SearchEpc.get_house_number(address=row[full_address_col], postcode=row[postcode_col])

        asset_list[target_col] = asset_list.apply(_house_number, axis=1)
    else:
        raise ValueError(f"Method {method} not recognized")

    return asset_list
def app():
    """
    Pull EPC data for a landlord asset list and export the result to Excel.

    Originally written for properties owned by Livewest; the configuration
    below currently points at the Westward asset list.

    Steps:
        1. Load and standardise the asset list.
        2. Retrieve EPC data in chunks (via get_data), caching each chunk as
           a CSV in a "Chunks" folder next to the input file.
        3. Expand EPC recommendations and FindMyEPC data into columns and
           merge them onto the asset list.
        4. Identify work types and write the output workbook.

    Data request contents:
    Date of last EPC
    Reason for EPC
    SAP score on register
    Property Type
    Property Area
    Property Age
    Any Dimensions (HLP,PW,RH)
    Property Wall Construction
    Heating Type
    Secondary Heating
    Loft Insulation Depth
    Additional if possible:
    Heat loss calculations
    EPC recommendations
    Property UPRN
    """
    # TODO:
    # For cavity work:
    # - Flag any entries that have a different wall type between non-intrusive data against EPC
    # - Worth double checking entries that have a difference in wall construction
    # - Look at anything that is flagged as an empty cavity but the EPC data says its a filled cavity
    # - Look at the current EPC scores - Anything that is C75 or above, especially if its assumed no insulation
    # - By postcode, we can try and deduce if all of the addresses are a flats and then estimate if 50% of the flats
    # are less than C75
    # - Flag anything pre SAP2012
    # - Flag anything over 5 years old
    # - Look at year built vs age band
    #
    # For Solar:
    # - Discount any that have solar PV - based on non-intrusives and from the inspections team
    # - In the heating, discount anything that isnt ashp, ghsp, hhrs, electric storage - possibly homes with
    # electric room heaters but it might need to be an EPC E
    # - Fabric - check the floor, wall and roof:
    # - Filled or empty cavity is good
    # - Insulated solid/timber/system built is good
    # - SCIS/CEG needs solid floors
    # - JJC dont care
    # - Anything with a loft 200 or below
    # - Anything C75 and above wont qualify
    # - Insulated loft = 200mm
    # - We want: fully insulated property (all wall types), EPC D or below (floors should be solid)
    # - Or the insulation required is loft/cavity (floors should be solid)

    # For Westward
    DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Westward"
    DATA_FILENAME = "WESTWARD - completed list..xlsx"
    SHEET_NAME = "Sheet1"
    POSTCODE_COLUMN = "WFT EDIT Postcode"
    FULLADDRESS_COLUMN = "Address"
    ADDRESS1_COLUMN = None
    ADDRESS1_METHOD = "house_number_extraction"
    ADDRESS_COLS_TO_CONCAT = []
    MISSING_POSTCODES_METHOD = None
    PROPERTY_YEAR_BUILT = "Build date"
    UPRN_COLUMN = "UPRN"
    # If we have the non-intrusives data, this should be true
    HAS_NON_INTRUSIVES = True
    PROPERTY_TYPE_COLUMN = "Location type"  # This will be used to identify and remove bedsits
    # Maps addresses to uprn in problematic cases
    MANUAL_UPRN_MAP = {}

    asset_list = AssetList(
        local_filepath=os.path.join(DATA_FOLDER, DATA_FILENAME),
        header=0,
        sheet_name=SHEET_NAME,
        address1_colname=ADDRESS1_COLUMN,
        postcode_colname=POSTCODE_COLUMN,
        landlord_property_id="UPRN",
        full_address_colname=FULLADDRESS_COLUMN,
        full_address_cols_to_concat=ADDRESS_COLS_TO_CONCAT,
        missing_postcodes_method=MISSING_POSTCODES_METHOD,
        address1_extraction_method=ADDRESS1_METHOD,
        landlord_year_built=PROPERTY_YEAR_BUILT,
        landlord_uprn=UPRN_COLUMN,
        landlord_property_type=PROPERTY_TYPE_COLUMN,
        landlord_wall_construction="Wall Construction (EPC)",
        landlord_heating_system="Heat Source",
        landlord_existing_pv="PV (Y/N)"
    )
    asset_list.init_standardise()

    # We produce the new maps, which can be saved for future usage.
    # NOTE: update() mutates in place and returns None, so the copy has to be
    # bound to a name first; chaining .copy().update(...) yields None.
    new_property_type_map = PROPERTY_MAPPING.copy()
    new_property_type_map.update(
        asset_list.variable_mappings[asset_list.landlord_property_type]
    )
    new_wall_map = WALL_CONSTRUCTION_MAPPINGS.copy()
    new_wall_map.update(
        asset_list.variable_mappings[asset_list.landlord_wall_construction]
    )
    new_heating_map = HEATING_MAPPINGS.copy()
    new_heating_map.update(
        asset_list.variable_mappings[asset_list.landlord_heating_system]
    )
    new_existing_pv_map = EXISTING_PV_MAPPINGS.copy()
    new_existing_pv_map.update(
        asset_list.variable_mappings[asset_list.landlord_existing_pv]
    )

    asset_list.apply_standardiation()

    # DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Colchester"
    # DATA_FILENAME = "Warmfront data- Colchester Borough Homes (Complete).xlsx"
    # SHEET_NAME = "Sheet1"
    # POSTCODE_COLUMN = 'Full Address.1'
    # FULLADDRESS_COLUMN = "Full Address"
    # ADDRESS1_COLUMN = None
    # ADDRESS1_METHOD = "first_word"
    # ADDRESS_COLS_TO_CONCAT = []
    # MISSING_POSTCODES_METHOD = None
    # PROPERTY_YEAR_BUILT = "Build Date"
    # UPRN_COLUMN = None
    # # If we have the non-intrusives data, this should be true
    # HAS_NON_INTRUSIVES = True

    ### We retrieve the EPC data
    # We chunk up this data into 5000 rows at a time
    property_id = asset_list.DOMNA_PROPERTY_ID
    force_retrieve_data = False
    skip = None  # Used to skip already completed chunks
    chunk_size = 5000
    chunk_filename = "Chunk {i}.csv"

    # Create the chunks directory
    download_folder = os.path.join(DATA_FOLDER, "Chunks")
    os.makedirs(download_folder, exist_ok=True)

    chunk_indexes = list(range(0, len(asset_list.standardised_asset_list), chunk_size))
    # A list (not a set) so the chunks are read back in a deterministic order
    downloaded_files = [chunk_filename.format(i=i) for i in chunk_indexes]

    # We check if we have files associated to these chunks already and if we do, and we do not want to force the
    # fetching of the data, we skip. The emptiness guard avoids max() of an empty list.
    folder_contents = set(os.listdir(download_folder))
    if chunk_indexes and all(x in folder_contents for x in downloaded_files):
        skip = max(chunk_indexes)

    for i in chunk_indexes:
        print(f"Processing chunk {i} to {i + chunk_size}")
        if skip is not None and not force_retrieve_data:
            if i <= skip:
                continue

        chunk = asset_list.standardised_asset_list[i:i + chunk_size]
        epc_data_chunk, errors_chunk, no_epc_chunk = get_data(
            df=chunk,
            row_id_name=property_id,
            manual_uprn_map=MANUAL_UPRN_MAP,
        )

        # We now retry any failed properties and append what we recover.
        # NOTE(review): errors_chunk is not updated after the retry, so the
        # errors file lists every row that failed on the first pass.
        chunk_failed = chunk[chunk[property_id].isin(errors_chunk)]
        epc_data_failed, _, _ = get_data(
            df=chunk_failed,
            row_id_name=property_id,
            manual_uprn_map=MANUAL_UPRN_MAP,
            epc_api_only=False
        )
        epc_data_chunk.extend(epc_data_failed)

        # Store the chunk locally as a csv
        pd.DataFrame(epc_data_chunk).to_csv(
            os.path.join(download_folder, chunk_filename.format(i=i)), index=False
        )
        # Store the errors and no-data locally
        with open(os.path.join(download_folder, f"Chunk {i} errors.json"), "w") as f:
            json.dump(errors_chunk, f)
        # NOTE(review): this file holds JSON despite its .csv extension; the
        # name is kept in case something already reads it.
        with open(os.path.join(download_folder, f"Chunk {i} nodata.csv"), "w") as f:
            json.dump(no_epc_chunk, f)

    # We read in and concatenate the created chunks
    epc_data = []
    for file in downloaded_files:
        csv_data = pd.read_csv(os.path.join(download_folder, file))
        # We need to convert the recommendations back to a list.
        # SECURITY NOTE(review): eval executes whatever is in the CSV cell. The
        # files are written by this script, but consider ast.literal_eval once
        # it is confirmed that the cells only ever contain plain literals.
        csv_data["recommendations"] = csv_data["recommendations"].apply(eval)
        csv_data["find_my_epc_data"] = csv_data["find_my_epc_data"].apply(eval)
        epc_data.append(csv_data)

    # ignore_index gives a unique 0..N-1 index; without it every chunk restarts
    # at 0 and the index-based join with the normalised FindMyEPC data below
    # pairs rows from different properties.
    epc_df = pd.concat(epc_data, ignore_index=True)

    # We expand out the recommendations
    recommendations_df = epc_df[[property_id, "recommendations"]]
    unique_recommendations = set()
    for _, row in recommendations_df.iterrows():
        unique_recommendations.update([rec["improvement-summary-text"] for rec in row["recommendations"]])

    columns = [property_id] + list(unique_recommendations)
    transformed_data = []
    for _, row in recommendations_df.iterrows():
        # Initialize a dictionary for this row with False for all recommendations
        row_data = {col: False for col in columns}
        row_data[property_id] = row[property_id]
        # Set True for each recommendation present in this row
        for rec in row["recommendations"]:
            recommendation_text = rec["improvement-summary-text"]
            row_data[recommendation_text] = True
        # Append the row data to transformed_data
        transformed_data.append(row_data)

    floor_recommendations = [
        "Floor insulation (solid floor)",
        "Floor insulation",
        "Floor insulation (suspended floor)",
    ]
    # reindex (rather than plain column selection) so that a floor
    # recommendation no property received becomes an all-False column instead
    # of raising a KeyError.
    transformed_df = pd.DataFrame(transformed_data).reindex(
        columns=[property_id] + floor_recommendations, fill_value=False
    )
    transformed_df["epc_has_floor_recommendation"] = (
        transformed_df["Floor insulation (solid floor)"] | transformed_df["Floor insulation"] |
        transformed_df["Floor insulation (suspended floor)"]
    )

    # Get the find my epc data: one row per EPC record, aligned by position
    find_my_epc_data = epc_df[[property_id]].join(
        pd.json_normalize(epc_df["find_my_epc_data"].tolist())
    )
    find_my_epc_data = find_my_epc_data.merge(
        transformed_df[[property_id, "epc_has_floor_recommendation"]],
        how="left", on=property_id
    )

    # We check if we get the solar pv column:
    if "Solar photovoltaics" not in find_my_epc_data.columns:
        find_my_epc_data["Solar photovoltaics"] = False

    # Retrieve just the data we need
    epc_df = epc_df[
        [property_id] + list(asset_list.EPC_API_DATA_NAMES.keys())
    ].rename(
        columns=asset_list.EPC_API_DATA_NAMES
    )
    epc_df = epc_df.merge(
        find_my_epc_data[
            [property_id, "epc_has_floor_recommendation"] + list(asset_list.FIND_EPC_DATA_NAMES.keys())
        ]
        .rename(columns=asset_list.FIND_EPC_DATA_NAMES),
        how="left",
        on=property_id
    )

    asset_list.merge_data(epc_df)
    asset_list.extract_attributes()

    cleaned = read_from_s3(
        s3_file_name="cleaned_epc_data/cleaned.bson",
        bucket_name="retrofit-data-dev"
    )
    cleaned = msgpack.unpackb(cleaned, raw=False)
    asset_list.identify_worktypes(cleaned)

    # TODO: We should do this breakdown for flats
    def flat_analysis(asset_list):
        """Summarise, per postcode, how many flats have a SAP score below 75.

        NOTE(review): the body uses DataFrame operations (groupby, column
        access); confirm the object passed in supports them.
        """
        # We need to deduce the building name - we strip out the house number
        def extract_building_name(x):
            # TODO: This doesn't really work
            if pd.isnull(x):
                return None
            house_no = SearchEpc.get_house_number(address=x, postcode=None)
            if house_no:
                return x.replace(house_no, "").strip()
            return x.split(",")[0].strip()

        # We want to deduce if flats have 50% of the properties below C75
        # We group by postcode and property type
        grouped = asset_list.groupby([POSTCODE_COLUMN, "Property Type"])
        flat_data = []
        for _, group in grouped:
            if "flat" in group["Property Type"].str.lower().values:
                num_flats = group["Property Type"].str.lower().value_counts().get("flat", 0)
                num_below_c75 = group["SAP score on register"].lt(75).sum()
                flat_data.append(
                    {
                        "Postcode": group[POSTCODE_COLUMN].iloc[0],
                        "Property Type": "Flat",
                        "Number of Flats with EPC": num_flats,
                        "Number of Flats below C75": num_below_c75,
                        "Proportion of Flat EPCs below C75": round(100 * num_below_c75 / num_flats)
                    }
                )
        flat_data = pd.DataFrame(flat_data)
        return flat_data

    # NOTE(review): from here on asset_list is used like a pandas DataFrame
    # (groupby / to_excel / column indexing) although it was created as an
    # AssetList above - confirm AssetList supports these operations.
    flat_data = flat_analysis(asset_list)

    # Store as an excel
    output_filename = os.path.join(DATA_FOLDER, ".".join(DATA_FILENAME.split(".")[:-1])) + " EPC Data Pull.xlsx"
    # Store the data in two tabs. One for the asset list with the EPC data and the second with the flat data
    with pd.ExcelWriter(output_filename) as writer:
        asset_list.to_excel(writer, sheet_name="EPC Data", index=False)
        flat_data.to_excel(writer, sheet_name="Flat Data", index=False)

    # NOTE(review): ADDRESS1_COLUMN is None in the current configuration, so
    # this selection includes a None column label - verify before relying on it.
    matches_review = asset_list[
        [FULLADDRESS_COLUMN, ADDRESS1_COLUMN, POSTCODE_COLUMN, "Address on EPC", "Postcode on EPC"]
    ]