From 9e32b8bf740f96b26f140218c136a08fa98c35df Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Fri, 7 Jun 2024 14:31:22 +0100 Subject: [PATCH] working on collating the data from OS for Stonewater --- etl/customers/stonewater/shdf_3_clustering.py | 251 ++++++++++-------- 1 file changed, 142 insertions(+), 109 deletions(-) diff --git a/etl/customers/stonewater/shdf_3_clustering.py b/etl/customers/stonewater/shdf_3_clustering.py index e72c5000..45b435ed 100644 --- a/etl/customers/stonewater/shdf_3_clustering.py +++ b/etl/customers/stonewater/shdf_3_clustering.py @@ -1,5 +1,8 @@ import json from tqdm import tqdm +import os +from dotenv import load_dotenv +from backend.SearchEpc import SearchEpc from fuzzywuzzy import fuzz import numpy as np @@ -7,6 +10,9 @@ import pandas as pd import time from utils.s3 import save_data_to_s3, read_excel_from_s3, read_from_s3 +load_dotenv(dotenv_path="backend/.env") +EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN") + def remove_commas_and_full_stops(input_string: str) -> str: """ @@ -36,13 +42,19 @@ def get_places_with_retry(searcher, max_retries=5, wait_time=2): """ for attempt in range(max_retries): try: - result = searcher.ordnance_survey_client.get_places_api() - return result # Return the result if successful + response = searcher.ordnance_survey_client.get_places_api() + status = response.get("status") + if status == 200: + return response # Return the result if successful + else: + print(f"Attempt {attempt + 1} failed with status code: {status}") except Exception as e: print(f"Attempt {attempt + 1} failed with error: {e}") - if attempt < max_retries - 1: - print(f"Retrying in {wait_time} seconds...") - time.sleep(wait_time) + + if attempt < max_retries - 1: + print(f"Retrying in {wait_time} seconds...") + time.sleep(wait_time) + print(f"All {max_retries} attempts failed.") return None @@ -57,16 +69,16 @@ def app(): """ # TODO: Temp read from local machine - move to s3 - asset_list = pd.read_excel( - 
"/Users/khalimconn-kowlessar/Downloads/Stonewater SHDF_3_0_Board Triage 22.05.24.xlsx", header=4 - ) - - # asset_list = read_excel_from_s3( - # file_key="customers/Stonewater/Stonewater SHDF_3_0_Board Triage 22.05.24.xlsx", - # bucket_name="retrofit-data-dev", - # header_row=4 + # asset_list = pd.read_excel( + # "/Users/khalimconn-kowlessar/Downloads/Stonewater SHDF_3_0_Board Triage 22.05.24.xlsx", header=4 # ) + asset_list = read_excel_from_s3( + file_key="customers/Stonewater/Stonewater SHDF_3_0_Board Triage 22.05.24.xlsx", + bucket_name="retrofit-data-dev", + header_row=4 + ) + # Drop the bottom 4 rows, which are completely missing asset_list = asset_list.head(-4) @@ -98,15 +110,6 @@ def app(): ) # Create full address - # TODO: handle cases where one of these is null - asset_list["full_address"] = ( - asset_list["address1"] + ", " + - asset_list["address2"] + ", " + - asset_list["city_town"] + ", " + - asset_list["county"] + ", " + - asset_list["postcode"] - ) - asset_list["full_address"] = np.where( ~pd.isnull(asset_list["address2"]), ( @@ -125,46 +128,37 @@ def app(): if pd.isnull(asset_list["full_address"]).sum(): raise ValueError("Missing full addresses") - # TODO: Store in S3 - - # TODO: Move ths # Pull in the data - - import os - from dotenv import load_dotenv - from backend.SearchEpc import SearchEpc - - load_dotenv(dotenv_path="backend/.env") - EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN") + # This data has already been pulled as much as it can be, so we retrieve the existing extraction from S3 # Perform an initial pull without ordnance survey data - epc_data = [] - older_epc_data = {} - - for row_number, asset in tqdm(asset_list.iterrows(), total=len(asset_list)): - searcher = SearchEpc( - address1=str(asset["address1"]), - postcode=str(asset["postcode"]), - auth_token=EPC_AUTH_TOKEN, - os_api_key="", - full_address=str(asset["full_address"]), - uprn=asset.get("uprn", None), - ) - searcher.find_property(skip_os=True) - - if searcher.newest_epc is None: - 
continue - - epc_data.append( - { - "internal_id": asset["internal_id"], - **searcher.newest_epc - } - ) - - if searcher.older_epcs is not None: - older_epc_data[asset["internal_id"]] = searcher.older_epcs - + # epc_data = [] + # older_epc_data = {} + # + # for row_number, asset in tqdm(asset_list.iterrows(), total=len(asset_list)): + # searcher = SearchEpc( + # address1=str(asset["address1"]), + # postcode=str(asset["postcode"]), + # auth_token=EPC_AUTH_TOKEN, + # os_api_key="", + # full_address=str(asset["full_address"]), + # uprn=asset.get("uprn", None), + # ) + # searcher.find_property(skip_os=True) + # + # if searcher.newest_epc is None: + # continue + # + # epc_data.append( + # { + # "internal_id": asset["internal_id"], + # **searcher.newest_epc + # } + # ) + # + # if searcher.older_epcs is not None: + # older_epc_data[asset["internal_id"]] = searcher.older_epcs + # # # Store to S3 # save_data_to_s3( # data=json.dumps(epc_data), @@ -192,7 +186,7 @@ def app(): ) ) - # TODO: Perform a comparison between the EPC address and the asset list address, just to double check + # Perform a comparison between the EPC address and the asset list address, just to double check epc_data_df = pd.DataFrame(epc_data) address_comparison = ( @@ -246,59 +240,98 @@ def app(): os_data_pull_asset_list = asset_list[ ~asset_list["internal_id"].isin(is_ok["internal_id"].values) ].copy() + + # We have already done a partial pull of the Ordnance survey data so we can skip some of the records + # os_most_relevant_1 = json.loads( + # read_from_s3( + # bucket_name="retrofit-data-dev", + # s3_file_name="customers/Stonewater/clustering/os_most_relevant_1.json" + # ) + # ) + # + # os_most_relevant_2 = json.loads( + # read_from_s3( + # bucket_name="retrofit-data-dev", + # s3_file_name="customers/Stonewater/clustering/os_most_relevant_2.json" + # ) + # ) + # + # fetched_internal_ids = ( + # [x["internal_id"] for x in os_most_relevant_1] + [x["internal_id"] for x in os_most_relevant_2] + # ) + # + 
# # We remove any ids we've already fetched + # os_data_pull_asset_list = os_data_pull_asset_list[ + # ~os_data_pull_asset_list["internal_id"].isin(fetched_internal_ids) + # ] + # + # # Our OK EPC data (is_ok) + ordnance survey fetched data + the data we need to fetch should equal the total + # # number of assets + # assert len(is_ok) + len(fetched_internal_ids) + len(os_data_pull_asset_list) == len(asset_list) + os_data_pull_asset_list = os_data_pull_asset_list.reset_index(drop=True) # For each of these records, we pull the OS data - ORDNANCE_SURVEY_API_KEY = "" # This API key is a temp key which - os_most_relevant = [] - os_all = {} - errors = [] - for _, asset in tqdm(os_data_pull_asset_list.iterrows(), total=len(os_data_pull_asset_list)): - # Calls are throttled to 50 per minute in development mode, so lets just slow this down - time.sleep(1.3) - - searcher = SearchEpc( - address1=str(asset["address1"]), - postcode=str(asset["postcode"]), - auth_token=EPC_AUTH_TOKEN, - os_api_key=ORDNANCE_SURVEY_API_KEY, - full_address=str(asset["full_address"]), - uprn=asset.get("uprn", None), - ) - searcher.ordnance_survey_client.full_address = asset["full_address"] - # Attempt to get places data with retry logic - result = get_places_with_retry(searcher) - - if result: - # Get the most relevant response - os_most_relevant.append( - { - "internal_id": asset["internal_id"], - **searcher.ordnance_survey_client.most_relevant_result - } - ) - - # Also keep the best 100 results - os_all[asset["internal_id"]] = searcher.ordnance_survey_client.results - else: - # Record the internal_id of the asset that failed - errors.append(asset["internal_id"]) + # ORDNANCE_SURVEY_API_KEY = "" # This API key is a temp key which I have copied locally + # os_most_relevant = [] + # os_all = {} + # errors = [] + # for _, asset in tqdm(os_data_pull_asset_list.iterrows(), total=len(os_data_pull_asset_list)): + # # Calls are throttled to 50 per minute in development mode, so lets just slow this down + # 
time.sleep(2) + # + # searcher = SearchEpc( + # address1=str(asset["address1"]), + # postcode=str(asset["postcode"]), + # auth_token=EPC_AUTH_TOKEN, + # os_api_key=ORDNANCE_SURVEY_API_KEY, + # full_address=str(asset["full_address"]), + # uprn=asset.get("uprn", None), + # ) + # searcher.ordnance_survey_client.full_address = asset["full_address"] + # # Attempt to get places data with retry logic + # result = get_places_with_retry(searcher) + # + # if result: + # # Get the most relevant response + # os_most_relevant.append( + # { + # "internal_id": asset["internal_id"], + # **searcher.ordnance_survey_client.most_relevant_result + # } + # ) + # + # # Also keep the best 100 results + # os_all[asset["internal_id"]] = searcher.ordnance_survey_client.results + # else: + # # Record the internal_id of the asset that failed + # print("Error for address: " + asset["full_address"]) + # errors.append(asset["internal_id"]) # Store to S3 - save_data_to_s3( - data=json.dumps(os_most_relevant), - s3_file_name="customers/Stonewater/clustering/os_most_relevant.json", - bucket_name="retrofit-data-dev" - ) + # save_data_to_s3( + # data=json.dumps(os_most_relevant), + # s3_file_name="customers/Stonewater/clustering/os_most_relevant_3.json", + # bucket_name="retrofit-data-dev" + # ) + # + # save_data_to_s3( + # data=json.dumps(os_all), + # s3_file_name="customers/Stonewater/clustering/os_all_3.json", + # bucket_name="retrofit-data-dev" + # ) + # + # save_data_to_s3( + # data=json.dumps(errors), + # s3_file_name="customers/Stonewater/clustering/errors_3.json", + # bucket_name="retrofit-data-dev" + # ) - save_data_to_s3( - data=json.dumps(os_all), - s3_file_name="customers/Stonewater/clustering/os_all.json", - bucket_name="retrofit-data-dev" - ) + # We now collate all of the data for the following steps: + # 1) Checking the retrieve ordnance survey data against ordnance survey data + # 2) A second round of querying the EPC api to find the EPC data, in case we retrieve something using uprn + 
# 3) Predicting the EPC data for the properties we have no data for + # 4) Retrieving additional data against the internal_id + # 5) Creation of final dataset for clustering - save_data_to_s3( - data=json.dumps(errors), - s3_file_name="customers/Stonewater/clustering/errors.json", - bucket_name="retrofit-data-dev" - ) + for i in ["1", "2", "3"]: