Model/asset_list/app.py
2025-03-08 18:39:25 +00:00

619 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import time
import json
import pandas as pd
import numpy as np
from tqdm import tqdm
from pprint import pprint
import msgpack
from utils.s3 import read_from_s3
from asset_list.AssetList import AssetList
from asset_list.mappings.property_type import PROPERTY_MAPPING
from asset_list.mappings.walls import WALL_CONSTRUCTION_MAPPINGS
from asset_list.mappings.heating_systems import HEATING_MAPPINGS
from asset_list.mappings.exising_pv import EXISTING_PV_MAPPINGS
from dotenv import load_dotenv
from backend.SearchEpc import SearchEpc
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
def get_data(
df, manual_uprn_map, epc_api_only=False, row_id_name="row_id"
):
uprn_column = AssetList.STANDARD_UPRN
fulladdress_column = AssetList.STANDARD_FULL_ADDRESS
address1_column = AssetList.STANDARD_ADDRESS_1
postcode_column = AssetList.STANDARD_POSTCODE
# These re-map the standard property types to forms accepted by the EPC api, so we can predict EPCs
property_type_map = {
"house": "House",
"flat": "Flat",
"maisonette": "Maisonette",
"bungalow": "Bungalow",
"block house": "House",
"coach house": "House",
"bedsit": "Flat"
}
epc_data = []
errors = []
no_epc = []
for _, home in tqdm(df.iterrows(), total=len(df)):
try:
# If we have a block of flats, we cannot retrieve this data
if home[AssetList.STANDARD_PROPERTY_TYPE] == "block of flats":
no_epc.append(home[row_id_name])
continue
postcode = home[postcode_column]
house_number = str(home[address1_column]).strip()
full_address = home[fulladdress_column].strip()
house_no = SearchEpc.get_house_number(address=str(house_number), postcode=postcode)
if house_no is None:
house_no = house_number
uprn = manual_uprn_map.get(full_address, None)
if uprn is None and home.get(uprn_column):
uprn = home[uprn_column]
if pd.isnull(uprn):
uprn = None
property_type = property_type_map.get(home[AssetList.STANDARD_PROPERTY_TYPE], None)
searcher = SearchEpc(
address1=str(house_no),
postcode=postcode,
auth_token=EPC_AUTH_TOKEN,
os_api_key="",
property_type=None,
fast=True,
full_address=full_address,
max_retries=5,
uprn=uprn
)
# Force the skipping of estimating the EPC
searcher.ordnance_survey_client.property_type = None
searcher.ordnance_survey_client.built_form = None
searcher.find_property(skip_os=True)
# Check if we have a flat or appartment
if searcher.newest_epc is None and uprn is None:
# Try again:
if SearchEpc.get_house_number(address=str(house_number), postcode=postcode) is None:
# Backup
add1 = full_address.split(",")
if len(add1) > 1:
add1 = add1[1].strip()
else:
# Try splitting on space
add1 = full_address.split(" ")[0].strip()
else:
add1 = str(house_number)
searcher = SearchEpc(
address1=add1,
postcode=postcode,
auth_token=EPC_AUTH_TOKEN,
os_api_key="",
property_type=None,
fast=True,
full_address=full_address,
max_retries=5
)
if (
"flat" in house_number.lower() or "apartment" in house_number.lower() or "apt" in
house_number.lower()
):
searcher.ordnance_survey_client.property_type = "Flat"
searcher.find_property(skip_os=True)
# As a final resort, we estimate the EPC
if property_type is not None and searcher.newest_epc is None:
searcher.ordnance_survey_client.property_type = property_type
searcher.find_property(skip_os=True)
if searcher.newest_epc is None:
no_epc.append(home[row_id_name])
continue
# Look for EPC recommendatons
try:
property_recommendations = searcher.client.domestic.recommendations(searcher.newest_epc["lmk-key"])
except:
property_recommendations = {"rows": []}
if epc_api_only:
epc = {
row_id_name: home[row_id_name],
**searcher.newest_epc.copy(),
"recommendations": property_recommendations["rows"]
}
epc_data.append(epc)
continue
# Retrieve data from FindMyEPC
try:
find_epc_searcher = RetrieveFindMyEpc(
address=searcher.newest_epc["address"], postcode=searcher.newest_epc["postcode"]
)
find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
except ValueError as e:
if "No EPC found" in str(e) and "address1" in searcher.newest_epc:
try:
find_epc_searcher = RetrieveFindMyEpc(
address=searcher.newest_epc["address1"], postcode=searcher.newest_epc["postcode"]
)
find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
except ValueError as e:
if "No EPC found" in str(e):
find_epc_data = {}
else:
find_epc_data = {}
except Exception as e:
raise Exception(f"Error retrieving FindMyEPC data: {e}")
time.sleep(np.random.uniform(0.1, 1))
epc = {
row_id_name: home[row_id_name],
**searcher.newest_epc.copy(),
"recommendations": property_recommendations["rows"],
"find_my_epc_data": find_epc_data,
}
epc_data.append(epc)
except Exception as e:
errors.append(home[row_id_name])
time.sleep(5)
return epc_data, errors, no_epc
def extract_address1(asset_list, full_address_col, postcode_col, method="first_two_words"):
if method == "first_two_words":
asset_list["address1_extracted"] = asset_list[full_address_col].str.split(" ").str[:2].str.join(" ")
return asset_list
if method == "first_word":
asset_list["address1_extracted"] = asset_list[full_address_col].str.split(" ").str[0]
return asset_list
if method == "house_number_extraction":
asset_list["address1_extracted"] = asset_list.apply(
lambda x: SearchEpc.get_house_number(address=x[full_address_col], postcode=x[postcode_col]),
axis=1
)
return asset_list
raise ValueError(f"Method {method} not recognized")
def app():
"""
This app is EPC pulling data for some properties owned by Livewest
Data request contents:
Date of last EPC
Reason for EPC
SAP score on register
Property Type
Property Area
Property Age
Any Dimensions (HLP,PW,RH)
Property Wall Construction
Heating Type
Secondary Heating
Loft Insulation Depth
Additional if possible:
Heat loss calculations
EPC recommendations
Property UPRN
"""
# TODO:
# For cavity work:
# - Flag any entries that have a different wall type between non-intrusive data against EPC
# - Worth double checking entries that have a difference in wall construction
# - Look at anything that is flagged as an empty cavity but the EPC data says its a filled cavity
# - Look at the current EPC scores - Anything that is C75 or above, especially if its assumed no insulation
# - By postcode, we can try and deduce if all of the addresses are a flats and then estimate if 50% of the flats
# are less than C75
# - Flag anything pre SAP2012
# - Flag anything over 5 years old
# - Look at year built vs age band
#
# For Solar:
# - Discount any that have solar PV - based on non-intrusives and from the inspections team
# - In the heating, discount anything that isnt ashp, ghsp, hhrs, electric storage - possibly homes with
# electric room heaters but it might need to be an EPC E
# - Fabric - check the floor, wall and roof:
# - Filled or empty cavity is good
# - Insulated solid/timber/system built is good
# - SCIS/CEG needs solid floors
# - JJC dont care
# - Anything with a loft 200 or below
# - Anything C75 and above wont qualify
# - Insulated loft = 200mm
# - We want: fully insulated property (all wall types), EPC D or below (floors should be solid)
# - Or the insulation required is loft/cavity (floors should be solid)
# Ealing
# data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Ealing/Programme data - 04032025"
# data_filename = "Ealing BC - Property Plus Tenure 25.02.2025.xlsx"
# sheet_name = "IGNORE - FULL MAIN"
# postcode_column = 'Postcode'
# fulladdress_column = "Address"
# address1_column = None
# address1_method = "first_word"
# address_cols_to_concat = []
# missing_postcodes_method = None
# landlord_year_built = "Year Built"
# landlord_os_uprn = None
# landlord_property_type = "Property Type Code"
# landlord_wall_construction = None
# landlord_heating_system = None
# landlord_existing_pv = None
# landlord_property_id = "Property ref"
# For Westward
# data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Westward"
# data_filename = "WESTWARD - completed list..xlsx"
# sheet_name = "Sheet1"
# postcode_column = "WFT EDIT Postcode"
# fulladdress_column = "Address"
# address1_column = None
# address1_method = "house_number_extraction"
# address_cols_to_concat = []
# missing_postcodes_method = None
# landlord_year_built = "Build date"
# landlord_os_uprn = "UPRN"
# landlord_property_type = "Location type"
# landlord_wall_construction = "Wall Construction (EPC)"
# landlord_heating_system = "Heat Source"
# landlord_existing_pv = "PV (Y/N)"
# landlord_property_id = "Place ref"
# For ACIS - programme re-build
# data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/ACIS/ACIS Full Programme Review March 2025"
# data_filename = "ACIS asset list.xlsx"
# sheet_name = "Assets"
# address1_column = "House No"
# postcode_column = "Postcode"
# landlord_property_id = "UPRN"
# fulladdress_column = None
# address_cols_to_concat = ["House No", "Street", "Town"]
# missing_postcodes_method = None
# address1_method = None
# landlord_year_built = "YEAR BUILT"
# landlord_os_uprn = None
# landlord_property_type = "Property type"
# landlord_wall_construction = "Wall Constuction"
# landlord_heating_system = "Heating"
# landlord_existing_pv = None
# outcomes_filename = "ACIS Group - 25.11.2024 - outcomes.xlsx"
# outcomes_sheetname = "Feedback"
# outcomes_postcode = "Postcode"
# outcomes_houseno = "No"
# master_filepaths = [
# os.path.join(data_folder, "ECO 3 -Table 1.csv"),
# os.path.join(data_folder, "ECO 4 -Table 1.csv"),
# ]
# master_to_asset_list_filepath = None
# For plus dane
data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Plus Dane"
data_filename = "PLUS DANE Asset List - for analysis.xlsx"
sheet_name = "Asset List"
address1_column = " Address"
postcode_column = " Postcode"
landlord_property_id = "UPRN"
fulladdress_column = " Address"
address_cols_to_concat = []
missing_postcodes_method = None
address1_method = None
landlord_year_built = "Property Age"
landlord_os_uprn = None
landlord_property_type = "Property Type"
landlord_wall_construction = "Landlord Wall Full"
landlord_heating_system = "Landlord Heating"
landlord_existing_pv = None
outcomes_filename = "plus dane outcomes.xlsx"
outcomes_sheetname = "EVERYTHING"
outcomes_postcode = "Post Code"
outcomes_houseno = "Numb."
master_filepaths = [
os.path.join(data_folder, "JJC Rolling Master.csv"),
os.path.join(data_folder, "SCIS Rolling Master.csv"),
]
master_to_asset_list_filepath = os.path.join(data_folder, "surveys_to_assets.csv")
# Maps addresses to uprn in problematic cases
manual_uprn_map = {}
asset_list = AssetList(
local_filepath=os.path.join(data_folder, data_filename),
header=0,
sheet_name=sheet_name,
address1_colname=address1_column,
postcode_colname=postcode_column,
landlord_property_id=landlord_property_id,
full_address_colname=fulladdress_column,
full_address_cols_to_concat=address_cols_to_concat,
missing_postcodes_method=missing_postcodes_method,
address1_extraction_method=address1_method,
landlord_year_built=landlord_year_built,
landlord_uprn=landlord_os_uprn,
landlord_property_type=landlord_property_type,
landlord_wall_construction=landlord_wall_construction,
landlord_heating_system=landlord_heating_system,
landlord_existing_pv=landlord_existing_pv
)
asset_list.init_standardise()
# We produce the new maps, which can be saved for future useage
new_property_type_map = {
k: v for k, v in (
asset_list.variable_mappings[asset_list.landlord_property_type] if
asset_list.landlord_property_type else {}
).items()
if k not in PROPERTY_MAPPING
}
new_wall_map = {
k: v for k, v in (
asset_list.variable_mappings[asset_list.landlord_wall_construction] if
asset_list.landlord_wall_construction else {}
).items()
if k not in WALL_CONSTRUCTION_MAPPINGS
}
new_heating_map = {
k: v for k, v in (
asset_list.variable_mappings[asset_list.landlord_heating_system] if
asset_list.landlord_heating_system else {}
).items()
if k not in HEATING_MAPPINGS
}
new_existing_pv_map = {
k: v for k, v in (
asset_list.variable_mappings[asset_list.landlord_existing_pv] if asset_list.landlord_existing_pv else {}
).items()
if k not in EXISTING_PV_MAPPINGS
}
asset_list.apply_standardiation()
# We now flag properties that have been treated under existing programmes
asset_list.flag_outcomes(
outcomes_filepath=os.path.join(data_folder, outcomes_filename),
outcomes_sheetname=outcomes_sheetname,
outcomes_postcode=outcomes_postcode,
outcomes_houseno=outcomes_houseno
)
asset_list.flag_survey_master(
master_filepaths=master_filepaths,
master_to_asset_list_filepath=master_to_asset_list_filepath
)
### We retrieve the EPC data
# We chunk up this data into 5000 rows at a time
# Create the chunks directory
epc_api_only = False
force_retrieve_data = False
skip = None # Used to skip already completed chunks
chunk_size = 5000
filename = "Chunk {i}.csv"
download_folder = os.path.join(data_folder, "Chunks")
if not os.path.exists(download_folder):
os.makedirs(download_folder)
chunk_indexes = list(range(0, len(asset_list.standardised_asset_list), chunk_size))
downloaded_files = {filename.format(i=i) for i in chunk_indexes}
# We check if we have files associated to these files already and if we do, and we do not want to force the
# fetching of the data, we skip
folder_contents = os.listdir(download_folder)
if all(x in folder_contents for x in downloaded_files):
skip = max(chunk_indexes)
for i in range(0, len(asset_list.standardised_asset_list), chunk_size):
print(f"Processing chunk {i} to {i + chunk_size}")
if skip is not None and not force_retrieve_data:
if i <= skip:
continue
chunk = asset_list.standardised_asset_list[i:i + chunk_size]
epc_data_chunk, errors_chunk, no_epc_chunk = get_data(
df=chunk,
row_id_name=asset_list.DOMNA_PROPERTY_ID,
manual_uprn_map=manual_uprn_map,
epc_api_only=epc_api_only
)
# We now retrieve any failed properties
chunk_failed = chunk[chunk[asset_list.DOMNA_PROPERTY_ID].isin(errors_chunk)]
epc_data_failed, _, _ = get_data(
df=chunk_failed,
row_id_name=asset_list.DOMNA_PROPERTY_ID,
manual_uprn_map=manual_uprn_map,
epc_api_only=epc_api_only
)
epc_data_chunk.extend(epc_data_failed)
# Append the failed data to the main data
# Store the chunk locally as a csv
pd.DataFrame(epc_data_chunk).to_csv(os.path.join(data_folder, f"Chunks/Chunk {i}.csv"), index=False)
# Store the errors and no-data locally
with open(os.path.join(data_folder, f"Chunks/Chunk {i} errors.json"), "w") as f:
json.dump(errors_chunk, f)
with open(os.path.join(data_folder, f"Chunks/Chunk {i} nodata.csv"), "w") as f:
json.dump(no_epc_chunk, f)
# We read in and concatenate the created created chunks
# List the contents
epc_data = []
for file in downloaded_files:
csv_data = pd.read_csv(os.path.join(download_folder, file))
# We need to convert the recommendations back to a list
csv_data["recommendations"] = csv_data["recommendations"].apply(eval)
# We don't have this if we didn't run the pulling from find my epc
if "find_my_epc_data" in csv_data.columns:
csv_data["find_my_epc_data"] = csv_data["find_my_epc_data"].apply(eval)
epc_data.append(csv_data)
epc_df = pd.concat(epc_data)
epc_df["estimated"] = epc_df["estimated"].fillna(False)
# We expand out the recommendations
recommendations_df = epc_df[[asset_list.DOMNA_PROPERTY_ID, "recommendations"]]
unique_recommendations = set()
for _, row in recommendations_df.iterrows():
unique_recommendations.update([rec["improvement-summary-text"] for rec in row["recommendations"]])
columns = [asset_list.DOMNA_PROPERTY_ID] + list(unique_recommendations)
transformed_data = []
for _, row in recommendations_df.iterrows():
# Initialize a dictionary for this row with False for all recommendations
row_data = {col: False for col in columns}
row_data[asset_list.DOMNA_PROPERTY_ID] = row[asset_list.DOMNA_PROPERTY_ID]
# Set True for each recommendation present in this row
for rec in row["recommendations"]:
recommendation_text = rec["improvement-summary-text"]
row_data[recommendation_text] = True
# Append the row data to transformed_data
transformed_data.append(row_data)
transformed_df = pd.DataFrame(transformed_data)
transformed_df = transformed_df[
[
asset_list.DOMNA_PROPERTY_ID, "Floor insulation (solid floor)",
"Floor insulation", "Floor insulation (suspended floor)"
]
]
transformed_df["epc_has_floor_recommendation"] = (
transformed_df["Floor insulation (solid floor)"] | transformed_df["Floor insulation"] |
transformed_df["Floor insulation (suspended floor)"]
)
# Get the find my epc data
if "find_my_epc_data" not in epc_df.columns:
epc_df["find_my_epc_data"] = None
find_my_epc_data = epc_df[[asset_list.DOMNA_PROPERTY_ID, "find_my_epc_data"]].drop(
columns=["find_my_epc_data"]).join(
pd.json_normalize(epc_df["find_my_epc_data"])
)
find_my_epc_data = find_my_epc_data.merge(
transformed_df[[asset_list.DOMNA_PROPERTY_ID, "epc_has_floor_recommendation"]],
how="left", on=asset_list.DOMNA_PROPERTY_ID
)
# We check if we get the solar pv column:
if "Solar photovoltaics" not in find_my_epc_data.columns:
find_my_epc_data["Solar photovoltaics"] = False
# Retrieve just the data we need
epc_df = epc_df[
[asset_list.DOMNA_PROPERTY_ID] + list(asset_list.EPC_API_DATA_NAMES.keys())
].rename(
columns=asset_list.EPC_API_DATA_NAMES
)
# Look for columns not in the find my EPC data, which will have happened if we didn't
# retrieve it in the first place
missed_find_epc_cols = [c for c in list(asset_list.FIND_EPC_DATA_NAMES.keys()) if c not in find_my_epc_data.columns]
if missed_find_epc_cols:
for c in missed_find_epc_cols:
find_my_epc_data[c] = None
epc_df = epc_df.merge(
find_my_epc_data[
[asset_list.DOMNA_PROPERTY_ID, "epc_has_floor_recommendation"] + list(asset_list.FIND_EPC_DATA_NAMES.keys())
]
.rename(columns=asset_list.FIND_EPC_DATA_NAMES),
how="left",
on=asset_list.DOMNA_PROPERTY_ID
)
asset_list.merge_data(epc_df)
asset_list.extract_attributes()
cleaned = read_from_s3(
s3_file_name="cleaned_epc_data/cleaned.bson",
bucket_name="retrofit-data-dev"
)
cleaned = msgpack.unpackb(cleaned, raw=False)
# TODO: We should break out the identification of work types to flag blocks of flats specifically
# TODO: Append existing outcomes onto the sheet.
asset_list.identify_worktypes(cleaned)
pprint(asset_list.work_type_figures)
asset_list.flat_analysis()
asset_list.load_contact_details(
local_filepath=os.path.join(data_folder, "Full property list wth D&V report V look up 12.2.25.xlsx"),
sheet_name="Report 1",
landlord_property_id=asset_list.landlord_property_id,
phone_number_column='Property Current Tel. Number',
fullname_column='Proeprty Current Occupant',
firstname_column=None,
lastname_column=None,
email_column=None, # TODO - we need this
)
# Convert to a format suitable for CRM
# TODO: TEMP
assigned_surveyors = pd.DataFrame(
[
{
asset_list.landlord_property_id: "02610001",
"week_commencing": "10/10/2025",
"surveyor_name": "Khalim Conn-Kowlessar",
"surveyor_email": "khalim@domna.homes",
}
]
)
# TODO: Sort the output by postcode
company_domain = "ealing.gov.uk"
crm_pipeline_name = "Survey Management"
first_dealstage = "READY TO BEGIN SCHEDULING"
# TODO - temp, upload to either SharePoint or AWS
asset_list.prepare_for_crm(
assigned_surveyors=assigned_surveyors,
company_domain=company_domain,
crm_pipeline_name=crm_pipeline_name,
first_dealstage=first_dealstage
)
hubspot_data = asset_list.hubspot_data
# Store as an excel
filename = os.path.join(data_folder, ".".join(data_filename.split(".")[:-1])) + " - Standardised.xlsx"
# Store the data in two tabs. One for the asset list with the EPC data and the second with the flat data
with pd.ExcelWriter(filename) as writer:
asset_list.standardised_asset_list.to_excel(writer, sheet_name="Standardised Asset List", index=False)
asset_list.flat_data.to_excel(writer, sheet_name="Flat Data", index=False)
# Store the Hubspot export as a csv
hubspot_data.to_csv(os.path.join(data_folder, "Hubspot Export.csv"), index=False)