mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
restructuring app location
This commit is contained in:
parent
5df47a86ae
commit
d86ab5ff8d
3 changed files with 497 additions and 502 deletions
|
|
@ -1 +1,498 @@
|
|||
import os
|
||||
import time
|
||||
import json
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
from tqdm import tqdm
|
||||
import msgpack
|
||||
from utils.s3 import read_from_s3
|
||||
from asset_list.AssetList import AssetList
|
||||
from asset_list.mappings.property_type import PROPERTY_MAPPING
|
||||
from asset_list.mappings.walls import WALL_CONSTRUCTION_MAPPINGS
|
||||
from asset_list.mappings.heating_systems import HEATING_MAPPINGS
|
||||
from asset_list.mappings.exising_pv import EXISTING_PV_MAPPINGS
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from backend.SearchEpc import SearchEpc
|
||||
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
|
||||
|
||||
load_dotenv(dotenv_path="backend/.env")
|
||||
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
|
||||
|
||||
|
||||
def get_data(
    asset_list, fulladdress_column, address1_column, postcode_column, manual_uprn_map,
    uprn_column=None, epc_api_only=False, row_id_name="row_id"
):
    """
    Look up the newest EPC for every row of ``asset_list``.

    For each row a ``SearchEpc`` lookup is made; if nothing is found and no
    UPRN is known, a second lookup is attempted with an alternative first
    address line. Unless ``epc_api_only`` is set, the EPC recommendations and
    the FindMyEPC data are fetched as well.

    :param asset_list: DataFrame with one property per row.
    :param fulladdress_column: name of the column holding the full address.
    :param address1_column: name of the column holding the first address line / house number.
    :param postcode_column: name of the column holding the postcode.
    :param manual_uprn_map: mapping of full address -> UPRN that overrides the UPRN column.
    :param uprn_column: optional name of the column holding the UPRN.
    :param epc_api_only: when True, only the EPC API record is returned for each row.
    :param row_id_name: name of the column that identifies a row; also used as the key in the output.
    :return: tuple ``(epc_data, errors, no_epc)`` - a list of EPC dicts, the row ids that raised
        an exception, and the row ids for which no EPC was found.
    """
    epc_data = []
    errors = []
    no_epc = []
    for _, home in tqdm(asset_list.iterrows(), total=len(asset_list)):
        try:
            postcode = home[postcode_column]
            house_number = str(home[address1_column]).strip()
            full_address = home[fulladdress_column].strip()

            # Parsed once and reused for the retry decision further down
            parsed_house_no = SearchEpc.get_house_number(address=house_number, postcode=postcode)
            house_no = house_number if parsed_house_no is None else parsed_house_no

            # A manually mapped UPRN takes priority over the one in the asset list
            uprn = manual_uprn_map.get(full_address, None)
            if uprn is None and home.get(uprn_column):
                uprn = home[uprn_column]

            if pd.isnull(uprn):
                uprn = None

            searcher = SearchEpc(
                address1=str(house_no),
                postcode=postcode,
                auth_token=EPC_AUTH_TOKEN,
                os_api_key="",
                property_type=None,
                fast=True,
                full_address=full_address,
                max_retries=5,
                uprn=uprn
            )
            # Force the skipping of estimating the EPC
            searcher.ordnance_survey_client.property_type = None
            searcher.ordnance_survey_client.built_form = None

            searcher.find_property(skip_os=True)

            # Check if we have a flat or apartment
            if searcher.newest_epc is None and uprn is None:
                # Try again with a different first address line
                if parsed_house_no is None:
                    # Backup: second comma-separated part, else the first word
                    address_parts = full_address.split(",")
                    if len(address_parts) > 1:
                        add1 = address_parts[1].strip()
                    else:
                        add1 = full_address.split(" ")[0].strip()
                else:
                    add1 = house_number

                searcher = SearchEpc(
                    address1=add1,
                    postcode=postcode,
                    auth_token=EPC_AUTH_TOKEN,
                    os_api_key="",
                    property_type=None,
                    fast=True,
                    full_address=full_address,
                    max_retries=5
                )

                lowered_house_number = house_number.lower()
                if any(marker in lowered_house_number for marker in ("flat", "apartment", "apt")):
                    searcher.ordnance_survey_client.property_type = "Flat"

                searcher.find_property(skip_os=True)

            if searcher.newest_epc is None:
                no_epc.append(home[row_id_name])
                continue

            if epc_api_only:
                epc_data.append({row_id_name: home[row_id_name], **searcher.newest_epc})
                continue

            # Look for EPC recommendations; a failure here is tolerated (best effort)
            try:
                property_recommendations = searcher.client.domestic.recommendations(searcher.newest_epc["lmk-key"])
            except Exception:
                property_recommendations = {"rows": []}

            # Retrieve data from FindMyEPC. The default is reset for every row so that a failed
            # lookup can never carry over the data of the previously processed property.
            find_epc_data = {}
            try:
                find_epc_searcher = RetrieveFindMyEpc(
                    address=searcher.newest_epc["address"], postcode=searcher.newest_epc["postcode"]
                )
                find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
            except ValueError as e:
                if "No EPC found" in str(e) and "address1" in searcher.newest_epc:
                    # Second attempt using only the first address line
                    try:
                        find_epc_searcher = RetrieveFindMyEpc(
                            address=searcher.newest_epc["address1"], postcode=searcher.newest_epc["postcode"]
                        )
                        find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
                    except ValueError:
                        find_epc_data = {}
            except Exception as e:
                raise Exception(f"Error retrieving FindMyEPC data: {e}") from e
            # Random pause between properties
            time.sleep(np.random.uniform(0.1, 1))

            epc = {
                row_id_name: home[row_id_name],
                **searcher.newest_epc,
                "recommendations": property_recommendations["rows"],
                "find_my_epc_data": find_epc_data,
            }

            epc_data.append(epc)
        except Exception as e:
            # Best effort: record the failing row, report why, and pause before the next property
            tqdm.write(f"Failed to retrieve EPC data for {home[row_id_name]}: {e}")
            errors.append(home[row_id_name])
            time.sleep(5)

    return epc_data, errors, no_epc
|
||||
|
||||
|
||||
def extract_address1(asset_list, full_address_col, postcode_col, method="first_two_words"):
    """
    Add an ``address1_extracted`` column derived from the full address.

    :param asset_list: DataFrame to extend; it is modified in place and returned.
    :param full_address_col: name of the column holding the full address.
    :param postcode_col: name of the column holding the postcode (only read by
        the ``house_number_extraction`` method).
    :param method: ``first_two_words``, ``first_word`` or ``house_number_extraction``.
    :return: the same DataFrame with the new column.
    :raises ValueError: if ``method`` is not one of the supported values.
    """
    if method in ("first_two_words", "first_word"):
        # Both word-based methods work on the space-separated address
        words = asset_list[full_address_col].str.split(" ")
        if method == "first_word":
            extracted = words.str[0]
        else:
            extracted = words.str[:2].str.join(" ")
    elif method == "house_number_extraction":
        def _house_number(row):
            return SearchEpc.get_house_number(address=row[full_address_col], postcode=row[postcode_col])

        extracted = asset_list.apply(_house_number, axis=1)
    else:
        raise ValueError(f"Method {method} not recognized")

    asset_list["address1_extracted"] = extracted
    return asset_list
|
||||
|
||||
|
||||
def app():
    """
    This app pulls EPC data for some properties owned by Livewest.

    NOTE(review): the active configuration below points at the Westward asset
    list - confirm which customer this docstring should name.

    Steps performed:
      1. Load and standardise the asset list.
      2. Retrieve the EPC data in chunks, storing every chunk on disk (all chunks
         are skipped when every chunk file already exists).
      3. Read the chunks back, flag floor-insulation recommendations and flatten
         the FindMyEPC data.
      4. Merge the EPC data into the asset list, identify work types and write
         the result to Excel.

    Data request contents:
    Date of last EPC
    Reason for EPC
    SAP score on register
    Property Type
    Property Area
    Property Age
    Any Dimensions (HLP,PW,RH)
    Property Wall Construction
    Heating Type
    Secondary Heating
    Loft Insulation Depth

    Additional if possible:
    Heat loss calculations
    EPC recommendations
    Property UPRN
    """
    # Local import: only needed to parse the literals stored in the chunk CSVs
    import ast

    # TODO:
    # For cavity work:
    # - Flag any entries that have a different wall type between non-intrusive data against EPC
    # - Worth double checking entries that have a difference in wall construction
    # - Look at anything that is flagged as an empty cavity but the EPC data says it’s a filled cavity
    # - Look at the current EPC scores - Anything that is C75 or above, especially if it’s assumed no insulation
    # - By postcode, we can try and deduce if all of the addresses are a flats and then estimate if 50% of the flats
    # are less than C75
    # - Flag anything pre SAP2012
    # - Flag anything over 5 years old
    # - Look at year built vs age band
    #
    # For Solar:
    # - Discount any that have solar PV - based on non-intrusives and from the inspections team
    # - In the heating, discount anything that isn’t ashp, ghsp, hhrs, electric storage - possibly homes with
    # electric room heaters but it might need to be an EPC E
    # - Fabric - check the floor, wall and roof:
    # - Filled or empty cavity is good
    # - Insulated solid/timber/system built is good
    # - SCIS/CEG needs solid floors
    # - JJC don’t care
    # - Anything with a loft 200 or below
    # - Anything C75 and above won’t qualify
    # - Insulated loft = 200mm
    # - We want: fully insulated property (all wall types), EPC D or below (floors should be solid)
    # - Or the insulation required is loft/cavity (floors should be solid)

    # For Westward
    DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Westward"
    DATA_FILENAME = "WESTWARD - completed list..xlsx"
    SHEET_NAME = "Sheet1"

    POSTCODE_COLUMN = "WFT EDIT Postcode"
    FULLADDRESS_COLUMN = "Address"
    ADDRESS1_COLUMN = None
    ADDRESS1_METHOD = "house_number_extraction"

    ADDRESS_COLS_TO_CONCAT = []
    MISSING_POSTCODES_METHOD = None
    PROPERTY_YEAR_BUILT = "Build date"
    UPRN_COLUMN = "UPRN"
    # If we have the non-intrusives data, this should be true
    HAS_NON_INTRUSIVES = True
    PROPERTY_TYPE_COLUMN = "Location type"  # This will be used to identify and remove bedsits

    # Maps addresses to uprn in problematic cases
    MANUAL_UPRN_MAP = {}

    asset_list = AssetList(
        local_filepath=os.path.join(DATA_FOLDER, DATA_FILENAME),
        header=0,
        sheet_name=SHEET_NAME,
        address1_colname=ADDRESS1_COLUMN,
        postcode_colname=POSTCODE_COLUMN,
        landlord_property_id="UPRN",
        full_address_colname=FULLADDRESS_COLUMN,
        full_address_cols_to_concat=ADDRESS_COLS_TO_CONCAT,
        missing_postcodes_method=MISSING_POSTCODES_METHOD,
        address1_extraction_method=ADDRESS1_METHOD,
        landlord_year_built=PROPERTY_YEAR_BUILT,
        landlord_uprn=UPRN_COLUMN,
        landlord_property_type=PROPERTY_TYPE_COLUMN,
        landlord_wall_construction="Wall Construction (EPC)",
        landlord_heating_system="Heat Source",
        landlord_existing_pv="PV (Y/N)"
    )
    asset_list.init_standardise()

    # We produce the new maps, which can be saved for future usage.
    # dict.update() returns None, so the merge is done on an explicit copy.
    def merge_mappings(base, extra):
        merged = base.copy()
        merged.update(extra)
        return merged

    new_property_type_map = merge_mappings(
        PROPERTY_MAPPING, asset_list.variable_mappings[asset_list.landlord_property_type]
    )
    new_wall_map = merge_mappings(
        WALL_CONSTRUCTION_MAPPINGS, asset_list.variable_mappings[asset_list.landlord_wall_construction]
    )
    new_heating_map = merge_mappings(
        HEATING_MAPPINGS, asset_list.variable_mappings[asset_list.landlord_heating_system]
    )
    new_existing_pv_map = merge_mappings(
        EXISTING_PV_MAPPINGS, asset_list.variable_mappings[asset_list.landlord_existing_pv]
    )

    asset_list.apply_standardiation()

    # DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Colchester"
    # DATA_FILENAME = "Warmfront data- Colchester Borough Homes (Complete).xlsx"
    # SHEET_NAME = "Sheet1"
    # POSTCODE_COLUMN = 'Full Address.1'
    # FULLADDRESS_COLUMN = "Full Address"
    # ADDRESS1_COLUMN = None
    # ADDRESS1_METHOD = "first_word"
    # ADDRESS_COLS_TO_CONCAT = []
    # MISSING_POSTCODES_METHOD = None
    # PROPERTY_YEAR_BUILT = "Build Date"
    # UPRN_COLUMN = None
    # # If we have the non-intrusives data, this should be true
    # HAS_NON_INTRUSIVES = True

    ### We retrieve the EPC data

    # We chunk up this data into 5000 rows at a time
    # Create the chunks directory
    force_retrieve_data = False
    chunk_size = 5000
    chunk_filename = "Chunk {i}.csv"
    download_folder = os.path.join(DATA_FOLDER, "Chunks")
    os.makedirs(download_folder, exist_ok=True)

    chunk_indexes = list(range(0, len(asset_list.standardised_asset_list), chunk_size))
    if not chunk_indexes:
        raise ValueError("The standardised asset list is empty, there is nothing to retrieve")
    # Kept as a list (in chunk order) so the chunks are read back in a deterministic order
    downloaded_files = [chunk_filename.format(i=i) for i in chunk_indexes]

    # We check if we have files associated to these chunks already and if we do, and we do not want to force
    # the fetching of the data, we skip the retrieval of every chunk
    folder_contents = set(os.listdir(download_folder))
    all_chunks_present = all(name in folder_contents for name in downloaded_files)
    skip_retrieval = all_chunks_present and not force_retrieve_data

    for i in chunk_indexes:
        print(f"Processing chunk {i} to {i + chunk_size}")
        if skip_retrieval:
            continue
        chunk = asset_list.standardised_asset_list[i:i + chunk_size]
        epc_data_chunk, errors_chunk, no_epc_chunk = get_data(
            asset_list=chunk,
            row_id_name=asset_list.DOMNA_PROPERTY_ID,
            fulladdress_column=asset_list.STANDARD_FULL_ADDRESS,
            address1_column=asset_list.STANDARD_ADDRESS_1,
            postcode_column=asset_list.STANDARD_POSTCODE,
            manual_uprn_map=MANUAL_UPRN_MAP,
            uprn_column=asset_list.STANDARD_UPRN
        )

        # We now retry any failed properties. The retry uses the same standardised columns as the
        # first pass (the raw landlord column names include ADDRESS1_COLUMN = None, which cannot
        # be looked up), and its outcome replaces the first-pass error list.
        chunk_failed = chunk[chunk[asset_list.DOMNA_PROPERTY_ID].isin(errors_chunk)]
        if not chunk_failed.empty:
            epc_data_failed, errors_chunk, no_epc_retry = get_data(
                asset_list=chunk_failed,
                row_id_name=asset_list.DOMNA_PROPERTY_ID,
                fulladdress_column=asset_list.STANDARD_FULL_ADDRESS,
                address1_column=asset_list.STANDARD_ADDRESS_1,
                postcode_column=asset_list.STANDARD_POSTCODE,
                manual_uprn_map=MANUAL_UPRN_MAP,
                uprn_column=asset_list.STANDARD_UPRN,
                epc_api_only=False
            )
            # Append the recovered data to the main data
            epc_data_chunk.extend(epc_data_failed)
            no_epc_chunk.extend(no_epc_retry)

        # Store the chunk locally as a csv
        pd.DataFrame(epc_data_chunk).to_csv(
            os.path.join(download_folder, chunk_filename.format(i=i)), index=False
        )
        # Store the errors and no-data locally
        with open(os.path.join(download_folder, f"Chunk {i} errors.json"), "w") as f:
            json.dump(errors_chunk, f)

        # NOTE: the content is JSON even though the (historic) file name ends in .csv
        with open(os.path.join(download_folder, f"Chunk {i} nodata.csv"), "w") as f:
            json.dump(no_epc_chunk, f)

    # We read in and concatenate the created chunks
    epc_data = []
    for file in downloaded_files:
        try:
            csv_data = pd.read_csv(os.path.join(download_folder, file))
        except pd.errors.EmptyDataError:
            # A chunk for which no EPC was found is stored as an empty file
            continue
        # We need to convert the recommendations / FindMyEPC data back to Python objects.
        # literal_eval only accepts literals, so nothing read from the file is executed.
        csv_data["recommendations"] = csv_data["recommendations"].apply(ast.literal_eval)
        csv_data["find_my_epc_data"] = csv_data["find_my_epc_data"].apply(ast.literal_eval)
        epc_data.append(csv_data)

    # ignore_index gives every row a unique label; the FindMyEPC join below is index based and
    # would mis-align with the repeated per-chunk indexes otherwise
    epc_df = pd.concat(epc_data, ignore_index=True)
    # TODO: TEMP!!!
    epc_df = epc_df.rename(columns={"row_id": asset_list.DOMNA_PROPERTY_ID})

    # We flag the properties whose EPC recommends any kind of floor insulation
    floor_recommendations = {
        "Floor insulation (solid floor)", "Floor insulation", "Floor insulation (suspended floor)"
    }
    floor_flags = pd.DataFrame(
        {
            asset_list.DOMNA_PROPERTY_ID: epc_df[asset_list.DOMNA_PROPERTY_ID],
            "epc_has_floor_recommendation": [
                any(rec["improvement-summary-text"] in floor_recommendations for rec in recommendations)
                for recommendations in epc_df["recommendations"]
            ],
        }
    )

    # Get the find my epc data (one column per key of the stored dictionaries)
    find_my_epc_data = epc_df[[asset_list.DOMNA_PROPERTY_ID]].join(
        pd.json_normalize(epc_df["find_my_epc_data"].tolist())
    )
    find_my_epc_data = find_my_epc_data.merge(
        floor_flags, how="left", on=asset_list.DOMNA_PROPERTY_ID
    )

    # We check if we get the solar pv column:
    if "Solar photovoltaics" not in find_my_epc_data.columns:
        find_my_epc_data["Solar photovoltaics"] = False

    # Retrieve just the data we need
    epc_df = epc_df[
        [asset_list.DOMNA_PROPERTY_ID] + list(asset_list.EPC_API_DATA_NAMES.keys())
    ].rename(
        columns=asset_list.EPC_API_DATA_NAMES
    )

    epc_df = epc_df.merge(
        find_my_epc_data[
            [asset_list.DOMNA_PROPERTY_ID, "epc_has_floor_recommendation"] + list(asset_list.FIND_EPC_DATA_NAMES.keys())
        ]
        .rename(columns=asset_list.FIND_EPC_DATA_NAMES),
        how="left",
        on=asset_list.DOMNA_PROPERTY_ID
    )

    asset_list.merge_data(epc_df)
    # TODO: TEMP!!!
    epc_df["epc_os_uprn"] = epc_df["epc_os_uprn"].astype("Int64").astype(str)
    asset_list.standardised_asset_list = asset_list.standardised_asset_list.merge(
        epc_df.drop(columns=["domna_property_id"]), how="left", left_on="ordnance_survey_uprn", right_on="epc_os_uprn"
    )

    asset_list.extract_attributes()

    cleaned = read_from_s3(
        s3_file_name="cleaned_epc_data/cleaned.bson",
        bucket_name="retrofit-data-dev"
    )
    cleaned = msgpack.unpackb(cleaned, raw=False)

    asset_list.identify_worktypes(cleaned)

    # TODO: We should do this breakdown for flats
    def flat_analysis(asset_list):

        # We need to deduce the building name - we strip out the house number
        def extract_building_name(x):
            # TODO: This doesn't really work
            if pd.isnull(x):
                return None
            house_no = SearchEpc.get_house_number(address=x, postcode=None)
            if house_no:
                return x.replace(house_no, "").strip()
            return x.split(",")[0].strip()

        # We want to deduce if flats have 50% of the properties below C75
        # We group by postcode and property type
        grouped = asset_list.groupby([POSTCODE_COLUMN, "Property Type"])

        flat_data = []
        for _, group in grouped:
            if "flat" in group["Property Type"].str.lower().values:
                num_flats = group["Property Type"].str.lower().value_counts().get("flat", 0)
                num_below_c75 = group["SAP score on register"].lt(75).sum()

                flat_data.append(
                    {
                        "Postcode": group[POSTCODE_COLUMN].iloc[0],
                        "Property Type": "Flat",
                        "Number of Flats with EPC": num_flats,
                        "Number of Flats below C75": num_below_c75,
                        "Proportion of Flat EPCs below C75": round(100 * num_below_c75 / num_flats)
                    }
                )

        flat_data = pd.DataFrame(flat_data)

        return flat_data

    # NOTE(review): from here on the AssetList object is used like a DataFrame (groupby, to_excel,
    # column selection) with the raw landlord column names - confirm AssetList supports this or
    # switch to asset_list.standardised_asset_list and the standardised column names.
    flat_data = flat_analysis(asset_list)

    # Store as an excel
    filename = os.path.join(DATA_FOLDER, ".".join(DATA_FILENAME.split(".")[:-1])) + " EPC Data Pull.xlsx"
    # Store the data in two tabs. One for the asset list with the EPC data and the second with the flat data

    with pd.ExcelWriter(filename) as writer:
        asset_list.to_excel(writer, sheet_name="EPC Data", index=False)
        flat_data.to_excel(writer, sheet_name="Flat Data", index=False)

    # Columns that are not configured (None) cannot be selected, so they are left out
    review_columns = [
        column
        for column in (FULLADDRESS_COLUMN, ADDRESS1_COLUMN, POSTCODE_COLUMN, "Address on EPC", "Postcode on EPC")
        if column is not None
    ]
    matches_review = asset_list[review_columns]
|
||||
|
|
|
|||
|
|
@ -1,502 +0,0 @@
|
|||
import os
|
||||
import time
|
||||
import json
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
from tqdm import tqdm
|
||||
import msgpack
|
||||
from utils.s3 import read_from_s3
|
||||
from asset_list.AssetList import AssetList
|
||||
from asset_list.mappings.property_type import PROPERTY_MAPPING
|
||||
from asset_list.mappings.walls import WALL_CONSTRUCTION_MAPPINGS
|
||||
from asset_list.mappings.heating_systems import HEATING_MAPPINGS
|
||||
from asset_list.mappings.exising_pv import EXISTING_PV_MAPPINGS
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from backend.SearchEpc import SearchEpc
|
||||
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
|
||||
|
||||
from etl.epc_clean.epc_attributes.attribute_utils import (
|
||||
extract_thermal_transmittance
|
||||
)
|
||||
|
||||
load_dotenv(dotenv_path="backend/.env")
|
||||
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
|
||||
|
||||
|
||||
def get_data(
    asset_list, fulladdress_column, address1_column, postcode_column, manual_uprn_map,
    uprn_column=None, epc_api_only=False, row_id_name="row_id"
):
    """
    Look up the newest EPC for every row of ``asset_list``.

    For each row a ``SearchEpc`` lookup is made; if nothing is found and no
    UPRN is known, a second lookup is attempted with an alternative first
    address line. Unless ``epc_api_only`` is set, the EPC recommendations and
    the FindMyEPC data are fetched as well.

    :param asset_list: DataFrame with one property per row.
    :param fulladdress_column: name of the column holding the full address.
    :param address1_column: name of the column holding the first address line / house number.
    :param postcode_column: name of the column holding the postcode.
    :param manual_uprn_map: mapping of full address -> UPRN that overrides the UPRN column.
    :param uprn_column: optional name of the column holding the UPRN.
    :param epc_api_only: when True, only the EPC API record is returned for each row.
    :param row_id_name: name of the column that identifies a row; also used as the key in the output.
    :return: tuple ``(epc_data, errors, no_epc)`` - a list of EPC dicts, the row ids that raised
        an exception, and the row ids for which no EPC was found.
    """
    epc_data = []
    errors = []
    no_epc = []
    for _, home in tqdm(asset_list.iterrows(), total=len(asset_list)):
        try:
            postcode = home[postcode_column]
            house_number = str(home[address1_column]).strip()
            full_address = home[fulladdress_column].strip()

            # Parsed once and reused for the retry decision further down
            parsed_house_no = SearchEpc.get_house_number(address=house_number, postcode=postcode)
            house_no = house_number if parsed_house_no is None else parsed_house_no

            # A manually mapped UPRN takes priority over the one in the asset list
            uprn = manual_uprn_map.get(full_address, None)
            if uprn is None and home.get(uprn_column):
                uprn = home[uprn_column]

            if pd.isnull(uprn):
                uprn = None

            searcher = SearchEpc(
                address1=str(house_no),
                postcode=postcode,
                auth_token=EPC_AUTH_TOKEN,
                os_api_key="",
                property_type=None,
                fast=True,
                full_address=full_address,
                max_retries=5,
                uprn=uprn
            )
            # Force the skipping of estimating the EPC
            searcher.ordnance_survey_client.property_type = None
            searcher.ordnance_survey_client.built_form = None

            searcher.find_property(skip_os=True)

            # Check if we have a flat or apartment
            if searcher.newest_epc is None and uprn is None:
                # Try again with a different first address line
                if parsed_house_no is None:
                    # Backup: second comma-separated part, else the first word
                    address_parts = full_address.split(",")
                    if len(address_parts) > 1:
                        add1 = address_parts[1].strip()
                    else:
                        add1 = full_address.split(" ")[0].strip()
                else:
                    add1 = house_number

                searcher = SearchEpc(
                    address1=add1,
                    postcode=postcode,
                    auth_token=EPC_AUTH_TOKEN,
                    os_api_key="",
                    property_type=None,
                    fast=True,
                    full_address=full_address,
                    max_retries=5
                )

                lowered_house_number = house_number.lower()
                if any(marker in lowered_house_number for marker in ("flat", "apartment", "apt")):
                    searcher.ordnance_survey_client.property_type = "Flat"

                searcher.find_property(skip_os=True)

            if searcher.newest_epc is None:
                no_epc.append(home[row_id_name])
                continue

            if epc_api_only:
                epc_data.append({row_id_name: home[row_id_name], **searcher.newest_epc})
                continue

            # Look for EPC recommendations; a failure here is tolerated (best effort)
            try:
                property_recommendations = searcher.client.domestic.recommendations(searcher.newest_epc["lmk-key"])
            except Exception:
                property_recommendations = {"rows": []}

            # Retrieve data from FindMyEPC. The default is reset for every row so that a failed
            # lookup can never carry over the data of the previously processed property.
            find_epc_data = {}
            try:
                find_epc_searcher = RetrieveFindMyEpc(
                    address=searcher.newest_epc["address"], postcode=searcher.newest_epc["postcode"]
                )
                find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
            except ValueError as e:
                if "No EPC found" in str(e) and "address1" in searcher.newest_epc:
                    # Second attempt using only the first address line
                    try:
                        find_epc_searcher = RetrieveFindMyEpc(
                            address=searcher.newest_epc["address1"], postcode=searcher.newest_epc["postcode"]
                        )
                        find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
                    except ValueError:
                        find_epc_data = {}
            except Exception as e:
                raise Exception(f"Error retrieving FindMyEPC data: {e}") from e
            # Random pause between properties
            time.sleep(np.random.uniform(0.1, 1))

            epc = {
                row_id_name: home[row_id_name],
                **searcher.newest_epc,
                "recommendations": property_recommendations["rows"],
                "find_my_epc_data": find_epc_data,
            }

            epc_data.append(epc)
        except Exception as e:
            # Best effort: record the failing row, report why, and pause before the next property
            tqdm.write(f"Failed to retrieve EPC data for {home[row_id_name]}: {e}")
            errors.append(home[row_id_name])
            time.sleep(5)

    return epc_data, errors, no_epc
|
||||
|
||||
|
||||
def extract_address1(asset_list, full_address_col, postcode_col, method="first_two_words"):
    """
    Add an ``address1_extracted`` column derived from the full address.

    :param asset_list: DataFrame to extend; it is modified in place and returned.
    :param full_address_col: name of the column holding the full address.
    :param postcode_col: name of the column holding the postcode (only read by
        the ``house_number_extraction`` method).
    :param method: ``first_two_words``, ``first_word`` or ``house_number_extraction``.
    :return: the same DataFrame with the new column.
    :raises ValueError: if ``method`` is not one of the supported values.
    """
    if method in ("first_two_words", "first_word"):
        # Both word-based methods work on the space-separated address
        words = asset_list[full_address_col].str.split(" ")
        if method == "first_word":
            extracted = words.str[0]
        else:
            extracted = words.str[:2].str.join(" ")
    elif method == "house_number_extraction":
        def _house_number(row):
            return SearchEpc.get_house_number(address=row[full_address_col], postcode=row[postcode_col])

        extracted = asset_list.apply(_house_number, axis=1)
    else:
        raise ValueError(f"Method {method} not recognized")

    asset_list["address1_extracted"] = extracted
    return asset_list
|
||||
|
||||
|
||||
def app():
|
||||
"""
|
||||
This app is EPC pulling data for some properties owned by Livewest
|
||||
|
||||
Data request contents:
|
||||
Date of last EPC
|
||||
Reason for EPC
|
||||
SAP score on register
|
||||
Property Type
|
||||
Property Area
|
||||
Property Age
|
||||
Any Dimensions (HLP,PW,RH)
|
||||
Property Wall Construction
|
||||
Heating Type
|
||||
Secondary Heating
|
||||
Loft Insulation Depth
|
||||
|
||||
Additional if possible:
|
||||
Heat loss calculations
|
||||
EPC recommendations
|
||||
Property UPRN
|
||||
"""
|
||||
|
||||
# TODO:
|
||||
# For cavity work:
|
||||
# - Flag any entries that have a different wall type between non-intrusive data against EPC
|
||||
# - Worth double checking entries that have a difference in wall construction
|
||||
# - Look at anything that is flagged as an empty cavity but the EPC data says it’s a filled cavity
|
||||
# - Look at the current EPC scores - Anything that is C75 or above, especially if it’s assumed no insulation
|
||||
# - By postcode, we can try and deduce if all of the addresses are a flats and then estimate if 50% of the flats
|
||||
# are less than C75
|
||||
# - Flag anything pre SAP2012
|
||||
# - Flag anything over 5 years old
|
||||
# - Look at year built vs age band
|
||||
#
|
||||
# For Solar:
|
||||
# - Discount any that have solar PV - based on non-intrusives and from the inspections team
|
||||
# - In the heating, discount anything that isn’t ashp, ghsp, hhrs, electric storage - possibly homes with
|
||||
# electric room heaters but it might need to be an EPC E
|
||||
# - Fabric - check the floor, wall and roof:
|
||||
# - Filled or empty cavity is good
|
||||
# - Insulated solid/timber/system built is good
|
||||
# - SCIS/CEG needs solid floors
|
||||
# - JJC don’t care
|
||||
# - Anything with a loft 200 or below
|
||||
# - Anything C75 and above won’t qualify
|
||||
# - Insulated loft = 200mm
|
||||
# - We want: fully insulated property (all wall types), EPC D or below (floors should be solid)
|
||||
# - Or the insulation required is loft/cavity (floors should be solid)
|
||||
|
||||
# For Westward
|
||||
DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Westward"
|
||||
DATA_FILENAME = "WESTWARD - completed list..xlsx"
|
||||
SHEET_NAME = "Sheet1"
|
||||
|
||||
POSTCODE_COLUMN = "WFT EDIT Postcode"
|
||||
FULLADDRESS_COLUMN = "Address"
|
||||
ADDRESS1_COLUMN = None
|
||||
ADDRESS1_METHOD = "house_number_extraction"
|
||||
|
||||
ADDRESS_COLS_TO_CONCAT = []
|
||||
MISSING_POSTCODES_METHOD = None
|
||||
PROPERTY_YEAR_BUILT = "Build date"
|
||||
UPRN_COLUMN = "UPRN"
|
||||
# If we have the non-intrusives data, this should be true
|
||||
HAS_NON_INTRUSIVES = True
|
||||
PROPERTY_TYPE_COLUMN = "Location type" # This will be used to identify and remove bedsits
|
||||
|
||||
# Maps addresses to uprn in problematic cases
|
||||
MANUAL_UPRN_MAP = {}
|
||||
|
||||
asset_list = AssetList(
|
||||
local_filepath=os.path.join(DATA_FOLDER, DATA_FILENAME),
|
||||
header=0,
|
||||
sheet_name=SHEET_NAME,
|
||||
address1_colname=ADDRESS1_COLUMN,
|
||||
postcode_colname=POSTCODE_COLUMN,
|
||||
landlord_property_id="UPRN",
|
||||
full_address_colname=FULLADDRESS_COLUMN,
|
||||
full_address_cols_to_concat=ADDRESS_COLS_TO_CONCAT,
|
||||
missing_postcodes_method=MISSING_POSTCODES_METHOD,
|
||||
address1_extraction_method=ADDRESS1_METHOD,
|
||||
landlord_year_built=PROPERTY_YEAR_BUILT,
|
||||
landlord_uprn=UPRN_COLUMN,
|
||||
landlord_property_type=PROPERTY_TYPE_COLUMN,
|
||||
landlord_wall_construction="Wall Construction (EPC)",
|
||||
landlord_heating_system="Heat Source",
|
||||
landlord_existing_pv="PV (Y/N)"
|
||||
)
|
||||
asset_list.init_standardise()
|
||||
|
||||
# We produce the new maps, which can be saved for future useage
|
||||
|
||||
new_property_type_map = PROPERTY_MAPPING.copy().update(
|
||||
asset_list.variable_mappings[asset_list.landlord_property_type]
|
||||
)
|
||||
new_wall_map = WALL_CONSTRUCTION_MAPPINGS.copy().update(
|
||||
asset_list.variable_mappings[asset_list.landlord_wall_construction]
|
||||
)
|
||||
new_heating_map = HEATING_MAPPINGS.copy().update(
|
||||
asset_list.variable_mappings[asset_list.landlord_heating_system]
|
||||
)
|
||||
new_existing_pv_map = EXISTING_PV_MAPPINGS.copy().update(
|
||||
asset_list.variable_mappings[asset_list.landlord_existing_pv]
|
||||
)
|
||||
|
||||
asset_list.apply_standardiation()
|
||||
|
||||
# DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Colchester"
|
||||
# DATA_FILENAME = "Warmfront data- Colchester Borough Homes (Complete).xlsx"
|
||||
# SHEET_NAME = "Sheet1"
|
||||
# POSTCODE_COLUMN = 'Full Address.1'
|
||||
# FULLADDRESS_COLUMN = "Full Address"
|
||||
# ADDRESS1_COLUMN = None
|
||||
# ADDRESS1_METHOD = "first_word"
|
||||
# ADDRESS_COLS_TO_CONCAT = []
|
||||
# MISSING_POSTCODES_METHOD = None
|
||||
# PROPERTY_YEAR_BUILT = "Build Date"
|
||||
# UPRN_COLUMN = None
|
||||
# # If we have the non-intrusives data, this should be true
|
||||
# HAS_NON_INTRUSIVES = True
|
||||
|
||||
### We retrieve the EPC data
# The standardised asset list is processed `chunk_size` rows at a time and every
# chunk's results are written to `download_folder`, so an interrupted run can be
# resumed without re-fetching chunks that already completed.
force_retrieve_data = False
skip = None  # Used to skip already completed chunks
chunk_size = 5000
filename = "Chunk {i}.csv"
download_folder = os.path.join(DATA_FOLDER, "Chunks")
# Create the chunks directory (no-op when it already exists)
os.makedirs(download_folder, exist_ok=True)

chunk_indexes = list(range(0, len(asset_list.standardised_asset_list), chunk_size))
downloaded_files = {filename.format(i=i) for i in chunk_indexes}

# If every expected chunk file is already on disk, and we are not forcing a
# re-fetch, skip all chunks. The `chunk_indexes` guard avoids calling max() on an
# empty list when the asset list has no rows.
folder_contents = set(os.listdir(download_folder))
if chunk_indexes and downloaded_files.issubset(folder_contents):
    skip = max(chunk_indexes)

for i in chunk_indexes:
    if skip is not None and not force_retrieve_data and i <= skip:
        continue

    print(f"Processing chunk {i} to {i + chunk_size}")
    chunk = asset_list.standardised_asset_list.iloc[i:i + chunk_size]
    epc_data_chunk, errors_chunk, no_epc_chunk = get_data(
        asset_list=chunk,
        row_id_name=asset_list.DOMNA_PROPERTY_ID,
        fulladdress_column=asset_list.STANDARD_FULL_ADDRESS,
        address1_column=asset_list.STANDARD_ADDRESS_1,
        postcode_column=asset_list.STANDARD_POSTCODE,
        manual_uprn_map=MANUAL_UPRN_MAP,
        uprn_column=asset_list.STANDARD_UPRN,
    )

    # We now retry any failed properties. The retry runs against the same
    # standardised frame as the first pass, so it must use the same standardised
    # column names (the raw input column names are not guaranteed to exist here).
    chunk_failed = chunk[chunk[asset_list.DOMNA_PROPERTY_ID].isin(errors_chunk)]
    if not chunk_failed.empty:
        epc_data_failed, _, _ = get_data(
            asset_list=chunk_failed,
            row_id_name=asset_list.DOMNA_PROPERTY_ID,
            fulladdress_column=asset_list.STANDARD_FULL_ADDRESS,
            address1_column=asset_list.STANDARD_ADDRESS_1,
            postcode_column=asset_list.STANDARD_POSTCODE,
            manual_uprn_map=MANUAL_UPRN_MAP,
            uprn_column=asset_list.STANDARD_UPRN,
            epc_api_only=False,
        )
        # Append the recovered data to the chunk's main data
        epc_data_chunk.extend(epc_data_failed)

    # Store the chunk locally as a csv
    pd.DataFrame(epc_data_chunk).to_csv(os.path.join(download_folder, filename.format(i=i)), index=False)

    # Store the errors and no-data locally.
    # NOTE(review): `errors_chunk` is the first-pass error list; rows recovered by
    # the retry above are still listed in it.
    with open(os.path.join(download_folder, f"Chunk {i} errors.json"), "w") as f:
        json.dump(errors_chunk, f)

    # NOTE(review): the content is JSON even though the file is named ".csv"; the
    # name is kept unchanged in case other tooling reads it.
    with open(os.path.join(download_folder, f"Chunk {i} nodata.csv"), "w") as f:
        json.dump(no_epc_chunk, f)
|
||||
|
||||
# We read in and concatenate the created chunks.
# Iterate the chunk indexes rather than the `downloaded_files` set so that the
# row order of the combined frame is the same on every run.
epc_data = []
for i in chunk_indexes:
    csv_data = pd.read_csv(os.path.join(download_folder, filename.format(i=i)))
    # The list/dict columns were stringified when written to CSV, so we convert
    # them back to Python objects.
    # NOTE(review): eval() executes whatever is in the file. These CSVs are
    # produced by this script, but if they can ever come from elsewhere this
    # should be replaced with a literal-only parser (e.g. ast.literal_eval).
    csv_data["recommendations"] = csv_data["recommendations"].apply(eval)
    csv_data["find_my_epc_data"] = csv_data["find_my_epc_data"].apply(eval)
    epc_data.append(csv_data)

# ignore_index=True gives the combined frame a unique 0..N-1 index. Without it
# every chunk contributes its own 0..len-1 labels, and any later index-based join
# against this frame would pair rows from different chunks.
epc_df = pd.concat(epc_data, ignore_index=True)
# TODO: TEMP!!!
epc_df = epc_df.rename(columns={"row_id": asset_list.DOMNA_PROPERTY_ID})
|
||||
|
||||
# We expand out the recommendations: one boolean column per distinct
# "improvement-summary-text", True where the property's EPC lists it.
recommendations_df = epc_df[[asset_list.DOMNA_PROPERTY_ID, "recommendations"]]

unique_recommendations = set()
for recs in recommendations_df["recommendations"]:
    unique_recommendations.update(rec["improvement-summary-text"] for rec in recs)

columns = [asset_list.DOMNA_PROPERTY_ID] + list(unique_recommendations)
transformed_data = []
for property_id, recs in zip(
    recommendations_df[asset_list.DOMNA_PROPERTY_ID], recommendations_df["recommendations"]
):
    # Start with False for every known recommendation, then flip the ones present
    row_data = dict.fromkeys(columns, False)
    row_data[asset_list.DOMNA_PROPERTY_ID] = property_id
    for rec in recs:
        row_data[rec["improvement-summary-text"]] = True
    transformed_data.append(row_data)

transformed_df = pd.DataFrame(transformed_data)

floor_recommendation_columns = [
    "Floor insulation (solid floor)",
    "Floor insulation",
    "Floor insulation (suspended floor)",
]
# reindex (rather than a plain column selection) so that a floor recommendation
# which never occurs in this data set becomes an all-False column instead of
# raising a KeyError.
transformed_df = transformed_df.reindex(
    columns=[asset_list.DOMNA_PROPERTY_ID] + floor_recommendation_columns,
    fill_value=False,
)

# A property has a floor recommendation if any of the three variants is present
transformed_df["epc_has_floor_recommendation"] = transformed_df[floor_recommendation_columns].any(axis=1)
|
||||
|
||||
# Get the find my epc data: flatten each row's "find_my_epc_data" dict into columns.
# The property ids and the flattened columns are aligned by position, so the index
# is reset first and json_normalize is fed a plain list. Joining on epc_df's own
# index would mismatch or duplicate rows whenever that index is not a unique
# 0..N-1 range (e.g. after concatenating several chunks).
find_my_epc_source = epc_df[[asset_list.DOMNA_PROPERTY_ID, "find_my_epc_data"]].reset_index(drop=True)
find_my_epc_data = find_my_epc_source[[asset_list.DOMNA_PROPERTY_ID]].join(
    pd.json_normalize(find_my_epc_source["find_my_epc_data"].tolist())
)

# Attach the floor-recommendation flag derived from the EPC recommendations
find_my_epc_data = find_my_epc_data.merge(
    transformed_df[[asset_list.DOMNA_PROPERTY_ID, "epc_has_floor_recommendation"]],
    how="left",
    on=asset_list.DOMNA_PROPERTY_ID,
)

# We check if we get the solar pv column; default it to False when absent
if "Solar photovoltaics" not in find_my_epc_data.columns:
    find_my_epc_data["Solar photovoltaics"] = False
|
||||
|
||||
# Keep only the EPC API fields we need, renamed via EPC_API_DATA_NAMES
epc_api_columns = [asset_list.DOMNA_PROPERTY_ID, *asset_list.EPC_API_DATA_NAMES.keys()]
epc_df = epc_df[epc_api_columns].rename(columns=asset_list.EPC_API_DATA_NAMES)

# Same for the find-my-epc fields (plus the floor recommendation flag), then
# left-merge them onto the EPC API data by property id.
find_epc_columns = [
    asset_list.DOMNA_PROPERTY_ID,
    "epc_has_floor_recommendation",
    *asset_list.FIND_EPC_DATA_NAMES.keys(),
]
find_epc_subset = find_my_epc_data[find_epc_columns].rename(columns=asset_list.FIND_EPC_DATA_NAMES)
epc_df = epc_df.merge(find_epc_subset, how="left", on=asset_list.DOMNA_PROPERTY_ID)
|
||||
|
||||
# Hand the combined EPC data to the asset list
asset_list.merge_data(epc_df)

# TODO: TEMP!!!
# Temporary second merge keyed on UPRN instead of the property id.
# NOTE(review): rows with a missing UPRN end up as the string "<NA>" after the
# Int64 -> str conversion - confirm that can never match a real asset-list value.
uprn_as_text = epc_df["epc_os_uprn"].astype("Int64").astype(str)
epc_df["epc_os_uprn"] = uprn_as_text
epc_without_property_id = epc_df.drop(columns=["domna_property_id"])
asset_list.standardised_asset_list = asset_list.standardised_asset_list.merge(
    epc_without_property_id,
    how="left",
    left_on="ordnance_survey_uprn",
    right_on="epc_os_uprn",
)

asset_list.extract_attributes()
|
||||
|
||||
# Fetch the cleaned EPC data from S3. The object is decoded with msgpack (despite
# the ".bson" suffix in its key) and passed to the work-type identification.
cleaned_payload = read_from_s3(s3_file_name="cleaned_epc_data/cleaned.bson", bucket_name="retrofit-data-dev")
cleaned = msgpack.unpackb(cleaned_payload, raw=False)

asset_list.identify_worktypes(cleaned)
|
||||
|
||||
# TODO: We should do this breakdown for flats
|
||||
def flat_analysis(asset_list, postcode_column=None):
    """Summarise, per postcode, how many flats have a SAP score below 75.

    Property types are matched case-insensitively, so "Flat" and "flat" in the
    same postcode are counted together and produce a single row.

    Args:
        asset_list: DataFrame containing the postcode column, a "Property Type"
            column and a "SAP score on register" column.
        postcode_column: Name of the postcode column. Defaults to the
            module-level ``POSTCODE_COLUMN`` when not given.

    Returns:
        DataFrame with one row per postcode that contains at least one flat,
        with the columns "Postcode", "Property Type", "Number of Flats with EPC",
        "Number of Flats below C75" and "Proportion of Flat EPCs below C75"
        (a whole-number percentage). The columns are present even when no flats
        are found.
    """
    if postcode_column is None:
        postcode_column = POSTCODE_COLUMN

    # We need to deduce the building name - we strip out the house number
    # (helper is not used by the summary below yet)
    def extract_building_name(x):
        # TODO: This doesn't really work
        if pd.isnull(x):
            return None
        house_no = SearchEpc.get_house_number(address=x, postcode=None)
        if house_no:
            return x.replace(house_no, "").strip()
        return x.split(",")[0].strip()

    output_columns = [
        "Postcode",
        "Property Type",
        "Number of Flats with EPC",
        "Number of Flats below C75",
        "Proportion of Flat EPCs below C75",
    ]

    # We want to deduce if flats have 50% of the properties below C75.
    # Select the flats first (ignoring case), then group by postcode only, so
    # differently-cased property types do not split one postcode into two rows.
    is_flat = asset_list["Property Type"].str.lower() == "flat"
    flats = asset_list[is_flat]

    flat_data = []
    for postcode, group in flats.groupby(postcode_column):
        num_flats = len(group)
        # NOTE(review): a missing SAP score compares as not-below-75, but the row
        # still counts towards num_flats - confirm that is the intended denominator.
        num_below_c75 = int(group["SAP score on register"].lt(75).sum())
        flat_data.append(
            {
                "Postcode": postcode,
                "Property Type": "Flat",
                "Number of Flats with EPC": num_flats,
                "Number of Flats below C75": num_below_c75,
                "Proportion of Flat EPCs below C75": round(100 * num_below_c75 / num_flats),
            }
        )

    # Passing the columns explicitly keeps the schema stable for an empty result
    return pd.DataFrame(flat_data, columns=output_columns)
|
||||
|
||||
# Build the per-postcode flat summary
flat_data = flat_analysis(asset_list)

# Store as an excel: the output name is the input file name without its last
# extension, followed by " EPC Data Pull.xlsx".
output_stem = ".".join(DATA_FILENAME.split(".")[:-1])
filename = os.path.join(DATA_FOLDER, output_stem) + " EPC Data Pull.xlsx"

# Store the data in two tabs. One for the asset list with the EPC data and the second with the flat data
# NOTE(review): `asset_list` is used here like a DataFrame (to_excel / column
# indexing) - confirm the object supports these.
with pd.ExcelWriter(filename) as writer:
    asset_list.to_excel(writer, sheet_name="EPC Data", index=False)
    flat_data.to_excel(writer, sheet_name="Flat Data", index=False)

# Side-by-side view of the input address fields and the address matched on the EPC
review_columns = [FULLADDRESS_COLUMN, ADDRESS1_COLUMN, POSTCODE_COLUMN, "Address on EPC", "Postcode on EPC"]
matches_review = asset_list[review_columns]
|
||||
Loading…
Add table
Reference in a new issue