set up ordnance survey pipeline for stonewater

This commit is contained in:
Khalim Conn-Kowlessar 2024-06-07 11:46:08 +01:00
parent 236aaa1f1c
commit 8e33e8bce4
3 changed files with 238 additions and 11 deletions

View file

@ -191,15 +191,14 @@ class SearchEpc:
self.property_type = property_type
self.fast = fast
@classmethod
def get_house_number(cls, address: str) -> str | None:
@staticmethod
def get_house_number(address: str) -> str | None:
"""
This method uses a regular expression to parse an address and extract the primary house or flat number.
"""
try:
# Custom regex to catch a broad range of cases
pattern = r'(?i)(?:flat|apartment)\s*(\d+)|^\s*(\d+)'
# Updated regex to catch house numbers including alphanumeric ones
pattern = r'(?i)(?:flat|apartment)\s*(\d+\w*)|^\s*(\d+\w*)'
match = re.search(pattern, address)
if match:
return next(g for g in match.groups() if g is not None)

View file

@ -1206,3 +1206,41 @@ def check_mds(results, input_properties, recommendations, optimise_measures):
hhr_check = pd.DataFrame(hhr_check)
return walls_check, hhr_check
# Ad-hoc exploration of the dataset_rooms parquet file: find rows whose heat
# demand changed while the wall, roof and floor thermal transmittance did not,
# then list which other starting/ending columns differ for one example row.
# NOTE(review): this appears to sit at module level after check_mds, which
# would trigger the S3 read on import — confirm that is intended.
from utils.s3 import read_dataframe_from_s3_parquet
z = read_dataframe_from_s3_parquet(
    bucket_name="retrofit-data-dev",
    file_key="sap_change_model/2024-05-28-19-08-25/dataset_rooms.parquet"
)
# Keep only rows where the heat demand differs between start and end ...
k = z[z["heat_demand_ending"] != z["heat_demand_starting"]]
# ... and where each fabric thermal transmittance equals its "_ending" value.
k = k[k["walls_thermal_transmittance"] == k["walls_thermal_transmittance_ending"]]
k = k[k["roof_thermal_transmittance"] == k["roof_thermal_transmittance_ending"]]
k = k[k["floor_thermal_transmittance"] == k["floor_thermal_transmittance_ending"]]
# Every column whose name contains "_ending".
ending_cols = [c for c in k.columns if "_ending" in c]
# Use the last of the first two remaining rows as the example record.
eg = k.head(2).tail(1).squeeze()
# Collect one entry per measure whose starting and ending values differ.
diff = []
for c in ending_cols:
    # Base name of the measure, i.e. the text before "_ending".
    split = c.split("_ending")[0]
    # The starting value is in "<name>_starting" when that column exists,
    # otherwise in the bare "<name>" column.
    if split + "_starting" in k.columns:
        starting_col = split + "_starting"
    else:
        starting_col = split
    b4 = eg[starting_col]
    after = eg[c]
    if b4 != after:
        diff.append(
            {
                "measure": split,
                "starting": b4,
                "ending": after
            }
        )
# Tabulate the differences for easier inspection.
diff = pd.DataFrame(diff)
# Bare expressions: they only display a value in an interactive session and
# have no effect when the file is run as a script.
eg["heat_demand_starting"]
eg["heat_demand_ending"]
eg["uprn"]

View file

@ -1,5 +1,50 @@
import json
from tqdm import tqdm
from fuzzywuzzy import fuzz
import numpy as np
import pandas as pd
import time
from utils.s3 import save_data_to_s3, read_excel_from_s3, read_from_s3
def remove_commas_and_full_stops(input_string: str) -> str:
    """
    Strip every comma and full stop out of a string.

    Args:
        input_string (str): The text to clean.

    Returns:
        str: A copy of the text with all ',' and '.' characters deleted;
            every other character is left untouched.
    """
    # A single translation pass deletes both punctuation characters.
    unwanted_punctuation = str.maketrans("", "", ",.")
    return input_string.translate(unwanted_punctuation)
def get_places_with_retry(searcher, max_retries=5, wait_time=2):
    """
    Call the searcher's Ordnance Survey places lookup, retrying on failure.

    The lookup is attempted at most max_retries times. Any exception raised
    by an attempt is printed; between attempts (but not after the final one)
    the function sleeps for wait_time seconds.

    Args:
        searcher (object): Object exposing ordnance_survey_client.get_places_api().
        max_retries (int): Maximum number of attempts to make.
        wait_time (int): Seconds to pause between consecutive attempts.

    Returns:
        result: Whatever get_places_api() returns on the first successful
            attempt, or None when every attempt raised.
    """
    for attempt_number in range(1, max_retries + 1):
        try:
            # A successful call ends the retry loop straight away.
            return searcher.ordnance_survey_client.get_places_api()
        except Exception as error:
            print(f"Attempt {attempt_number} failed with error: {error}")
            # Only announce and wait when another attempt will follow.
            if attempt_number != max_retries:
                print(f"Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
    print(f"All {max_retries} attempts failed.")
    return None
def app():
@ -16,6 +61,12 @@ def app():
"/Users/khalimconn-kowlessar/Downloads/Stonewater SHDF_3_0_Board Triage 22.05.24.xlsx", header=4
)
# asset_list = read_excel_from_s3(
# file_key="customers/Stonewater/Stonewater SHDF_3_0_Board Triage 22.05.24.xlsx",
# bucket_name="retrofit-data-dev",
# header_row=4
# )
# Drop the bottom 4 rows, which are completely missing
asset_list = asset_list.head(-4)
@ -62,12 +113,12 @@ def app():
asset_list["address1"] + ", " +
asset_list["address2"] + ", " +
asset_list["city_town"].str.title() + ", " +
asset_list["county"] + ", " +
# asset_list["county"] + ", " +
asset_list["postcode"]
),
asset_list["address1"] + ", " +
asset_list["city_town"].str.title() + ", " +
asset_list["county"] + ", " +
# asset_list["county"] + ", " +
asset_list["postcode"]
)
@ -89,13 +140,14 @@ def app():
# Perform an initial pull without ordnance survey data
epc_data = []
older_epc_data = {}
for row_number, asset in asset_list.iterrows():
for row_number, asset in tqdm(asset_list.iterrows(), total=len(asset_list)):
searcher = SearchEpc(
address1=asset["address1"],
postcode=asset["postcode"],
address1=str(asset["address1"]),
postcode=str(asset["postcode"]),
auth_token=EPC_AUTH_TOKEN,
os_api_key="",
full_address=asset["full_address"],
full_address=str(asset["full_address"]),
uprn=asset.get("uprn", None),
)
searcher.find_property(skip_os=True)
@ -112,3 +164,141 @@ def app():
if searcher.older_epcs is not None:
older_epc_data[asset["internal_id"]] = searcher.older_epcs
# # Store to S3
# save_data_to_s3(
# data=json.dumps(epc_data),
# s3_file_name="customers/Stonewater/clustering/epc_data.json",
# bucket_name="retrofit-data-dev"
# )
#
# save_data_to_s3(
# data=json.dumps(older_epc_data),
# s3_file_name="customers/Stonewater/clustering/old_epc_data.json",
# bucket_name="retrofit-data-dev"
# )
# We read this directly from s3
epc_data = json.loads(
read_from_s3(
bucket_name="retrofit-data-dev",
s3_file_name="customers/Stonewater/clustering/epc_data.json"
)
)
older_epc_data = json.loads(
read_from_s3(
bucket_name="retrofit-data-dev",
s3_file_name="customers/Stonewater/clustering/old_epc_data.json"
)
)
# TODO: Perform a comparison between the EPC address and the asset list address, just to double check
epc_data_df = pd.DataFrame(epc_data)
address_comparison = (
asset_list[["internal_id", "full_address", "postcode", "house_number", "address1"]].merge(
epc_data_df[["internal_id", "address", "postcode", "address1"]].rename(
columns={
"address": "epc_address",
"postcode": "epc_postcode",
"address1": "epc_address1"
}
),
how="inner",
on="internal_id"
)
)
# Produce a metric, showing the matching confidence between the two
address_comparison["epc_extracted_house_number"] = address_comparison["epc_address1"].apply(
lambda x: SearchEpc.get_house_number(x)
)
address_comparison["house_numbers_match"] = (
address_comparison["house_number"].str.lower() == address_comparison["epc_extracted_house_number"].str.lower()
)
# We also produce an address similarity metric
# We convert the strings to lower and remove common punctuation
address_comparison["address_similarity_score"] = address_comparison.apply(
lambda x: fuzz.ratio(
remove_commas_and_full_stops(x["address1"].lower()),
remove_commas_and_full_stops(x["epc_address1"].lower())
),
axis=1
)
address_comparison = address_comparison.sort_values("address_similarity_score", ascending=True)
address_comparison = address_comparison[
["internal_id", "full_address", "epc_address", "address_similarity_score", "house_numbers_match"]
]
# Anything with a similarity score of 90 or below, or whose house numbers don't match, we do again
needs_ordnance_survey = address_comparison[
(address_comparison["address_similarity_score"] <= 90) |
(~address_comparison["house_numbers_match"])
].copy()
is_ok = address_comparison[~address_comparison["internal_id"].isin(needs_ordnance_survey["internal_id"])]
is_ok = is_ok.sort_values("address_similarity_score", ascending=True)
os_data_pull_asset_list = asset_list[
~asset_list["internal_id"].isin(is_ok["internal_id"].values)
].copy()
os_data_pull_asset_list = os_data_pull_asset_list.reset_index(drop=True)
# For each of these records, we pull the OS data
ORDNANCE_SURVEY_API_KEY = "" # This API key is a temp key which
os_most_relevant = []
os_all = {}
errors = []
for _, asset in tqdm(os_data_pull_asset_list.iterrows(), total=len(os_data_pull_asset_list)):
# Calls are throttled to 50 per minute in development mode, so let's just slow this down
time.sleep(1.3)
searcher = SearchEpc(
address1=str(asset["address1"]),
postcode=str(asset["postcode"]),
auth_token=EPC_AUTH_TOKEN,
os_api_key=ORDNANCE_SURVEY_API_KEY,
full_address=str(asset["full_address"]),
uprn=asset.get("uprn", None),
)
searcher.ordnance_survey_client.full_address = asset["full_address"]
# Attempt to get places data with retry logic
result = get_places_with_retry(searcher)
if result:
# Get the most relevant response
os_most_relevant.append(
{
"internal_id": asset["internal_id"],
**searcher.ordnance_survey_client.most_relevant_result
}
)
# Also keep the best 100 results
os_all[asset["internal_id"]] = searcher.ordnance_survey_client.results
else:
# Record the internal_id of the asset that failed
errors.append(asset["internal_id"])
# Store to S3
save_data_to_s3(
data=json.dumps(os_most_relevant),
s3_file_name="customers/Stonewater/clustering/os_most_relevant.json",
bucket_name="retrofit-data-dev"
)
save_data_to_s3(
data=json.dumps(os_all),
s3_file_name="customers/Stonewater/clustering/os_all.json",
bucket_name="retrofit-data-dev"
)
save_data_to_s3(
data=json.dumps(errors),
s3_file_name="customers/Stonewater/clustering/errors.json",
bucket_name="retrofit-data-dev"
)