mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
189 lines
6.8 KiB
Python
189 lines
6.8 KiB
Python
import time
|
|
import numpy as np
|
|
import pandas as pd
|
|
from backend.SearchEpc import SearchEpc
|
|
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
|
|
from tqdm import tqdm
|
|
from utils.logger import setup_logger
|
|
|
|
# Module-level logger, built by the project's `utils.logger.setup_logger` helper and
# used by `get_data` to report FindMyEPC retrieval failures.
logger = setup_logger()
|
|
|
|
|
|
# Re-map the standard property types to the forms accepted by the EPC API, so EPCs can be predicted.
_PROPERTY_TYPE_MAP = {
    "house": "House",
    "flat": "Flat",
    "maisonette": "Maisonette",
    "bungalow": "Bungalow",
    "block house": "House",
    "coach house": "House",
    "bedsit": "Flat",
}

# Same idea for the built form of the property.
_BUILT_FORM_MAP = {
    "mid-terrace": "Mid-Terrace",
    "end-terrace": "End-Terrace",
    "semi-detached": "Semi-Detached",
    "detached": "Detached",
}

# Substrings of the first address line that mark the home as a flat.
_FLAT_MARKERS = ("flat", "apartment", "apt")


def _fallback_address1(full_address):
    """Derive a first address line from ``full_address`` when no house number was detected."""
    parts = full_address.split(",")
    if len(parts) > 1:
        return parts[1].strip()
    # No comma in the address: fall back to the first space-separated token.
    return full_address.split(" ")[0].strip()


def _retrieve_find_my_epc_data(newest_epc):
    """Fetch the newest FindMyEPC data for ``newest_epc``; return ``{}`` when none is found.

    The lookup is first attempted with the record's ``"address"`` and, when that
    reports "No EPC found" and the record has an ``"address1"`` key, retried with
    ``"address1"``.

    Raises:
        Exception: if the first lookup fails with anything other than a
            ``ValueError``, or the retry fails with a ``ValueError`` that is not
            a "No EPC found" error.
    """
    try:
        finder = RetrieveFindMyEpc(address=newest_epc["address"], postcode=newest_epc["postcode"])
        return finder.retrieve_newest_find_my_epc_data()
    except ValueError as e:
        if "No EPC found" not in str(e) or "address1" not in newest_epc:
            return {}
    except Exception as e:
        raise Exception(f"Error retrieving FindMyEPC data: {e}") from e

    # First lookup found nothing: retry with the first address line only.
    try:
        finder = RetrieveFindMyEpc(address=newest_epc["address1"], postcode=newest_epc["postcode"])
        return finder.retrieve_newest_find_my_epc_data()
    except ValueError as e:
        if "No EPC found" in str(e):
            return {}
        logger.error(f"Error retrieving FindMyEPC data: {e}")
        raise Exception(f"Error retrieving FindMyEPC data: {e}") from e


def get_data(
    df,
    manual_uprn_map,
    epc_auth_token,
    uprn_column,
    fulladdress_column,
    address1_column,
    postcode_column,
    property_type_column,
    built_form_column,
    epc_api_only=False,
    row_id_name="row_id",
):
    """Look up the newest EPC (and optionally FindMyEPC data) for every row of ``df``.

    Args:
        df: DataFrame with one home per row. Besides the columns named by the
            ``*_column`` arguments and ``row_id_name``, each row must provide an
            ``"is_expended_block"`` value.
        manual_uprn_map: Mapping from stripped full address to a UPRN; it takes
            precedence over the value in ``uprn_column``.
        epc_auth_token: Passed to ``SearchEpc`` as ``auth_token``.
        uprn_column: Column holding the UPRN (may be absent or null).
        fulladdress_column: Column holding the full address string.
        address1_column: Column holding the first address line / house number.
        postcode_column: Column holding the postcode.
        property_type_column: Column holding the property type; rows equal to
            ``"block of flats"`` are skipped and reported in ``no_epc``.
        built_form_column: Column holding the built form.
        epc_api_only: When true, skip the FindMyEPC lookup and return only the
            EPC record plus its recommendations.
        row_id_name: Column holding the row identifier used in all outputs.

    Returns:
        A tuple ``(epc_data, errors, no_epc)``: ``epc_data`` is a list of dicts
        (row id, the newest EPC fields, ``"recommendations"`` and, unless
        ``epc_api_only``, ``"find_my_epc_data"``); ``errors`` lists the ids of
        rows whose processing raised; ``no_epc`` lists the ids of rows for which
        no EPC could be found.
    """
    epc_data = []
    errors = []
    no_epc = []

    for _, home in tqdm(df.iterrows(), total=len(df)):
        try:
            # If we have a block of flats, we cannot retrieve this data.
            if home.get(property_type_column) == "block of flats":
                no_epc.append(home[row_id_name])
                continue

            postcode = home[postcode_column]
            house_number = str(home[address1_column]).strip()
            full_address = home[fulladdress_column].strip()

            # Detected once and reused below for the retry decision.
            detected_house_no = SearchEpc.get_house_number(address=house_number, postcode=postcode)
            house_no = house_number if detected_house_no is None else detected_house_no

            uprn = manual_uprn_map.get(full_address, None)
            if uprn is None and home.get(uprn_column):
                uprn = home[uprn_column]
            if pd.isnull(uprn):
                uprn = None

            property_type = _PROPERTY_TYPE_MAP.get(home.get(property_type_column), None)
            built_form = _BUILT_FORM_MAP.get(home.get(built_form_column))

            # Arguments shared by the initial search and the retry.
            searcher_kwargs = dict(
                postcode=postcode,
                auth_token=epc_auth_token,
                os_api_key="",
                property_type=None,
                fast=True,
                full_address=full_address,
                max_retries=5,
            )
            searcher = SearchEpc(address1=str(house_no), uprn=uprn, **searcher_kwargs)

            # Force the skipping of estimating the EPC.
            # We check if the property was split.
            if home["is_expended_block"]:
                searcher.ordnance_survey_client.property_type = "Flat"
                searcher.property_type = "Flat"
                searcher.set_strict_property_type_search()
            else:
                searcher.ordnance_survey_client.property_type = None
                searcher.ordnance_survey_client.built_form = None

            searcher.find_property(skip_os=True)

            # Nothing found and no UPRN to rely on: try again with a different
            # first address line, checking whether we have a flat or apartment.
            if searcher.newest_epc is None and uprn is None:
                if detected_house_no is None:
                    add1 = _fallback_address1(full_address)
                else:
                    add1 = house_number

                searcher = SearchEpc(address1=add1, **searcher_kwargs)

                lowered_house_number = house_number.lower()
                if any(marker in lowered_house_number for marker in _FLAT_MARKERS):
                    searcher.ordnance_survey_client.property_type = "Flat"

                searcher.find_property(skip_os=True)

            # As a final resort, we estimate the EPC.
            # NOTE(review): the nesting of this step was reconstructed; it is assumed to
            # run for every row that still has no EPC, not only for rows that went
            # through the retry above - confirm against the upstream file.
            if property_type is not None and searcher.newest_epc is None:
                searcher.ordnance_survey_client.property_type = property_type
                searcher.ordnance_survey_client.built_form = built_form
                searcher.find_property(skip_os=True)

            if searcher.newest_epc is None:
                no_epc.append(home[row_id_name])
                continue

            # Look for EPC recommendations; a failure here is deliberately non-fatal.
            try:
                property_recommendations = searcher.client.domestic.recommendations(
                    searcher.newest_epc["lmk-key"]
                )
            except Exception:
                # `Exception` rather than a bare `except`, so Ctrl+C / SystemExit still
                # interrupt the run.
                property_recommendations = {"rows": []}

            if epc_api_only:
                epc_data.append(
                    {
                        row_id_name: home[row_id_name],
                        **searcher.newest_epc.copy(),
                        "recommendations": property_recommendations["rows"],
                    }
                )
                continue

            # Retrieve data from FindMyEPC.
            find_epc_data = _retrieve_find_my_epc_data(searcher.newest_epc)
            # Random pause between FindMyEPC lookups.
            time.sleep(np.random.uniform(0.1, 1))

            epc_data.append(
                {
                    row_id_name: home[row_id_name],
                    **searcher.newest_epc.copy(),
                    "recommendations": property_recommendations["rows"],
                    "find_my_epc_data": find_epc_data,
                }
            )
        except Exception as e:
            # Record the failing row and keep going, but do not lose the cause.
            errors.append(home[row_id_name])
            logger.error(f"Error retrieving EPC data for row {home[row_id_name]}: {e}")
            time.sleep(5)

    return epc_data, errors, no_epc