diff --git a/etl/eligibility/ha_15_32/ha_analysis_batch_3.py b/etl/eligibility/ha_15_32/ha_analysis_batch_3.py
index 52117d17..bf91d8b6 100644
--- a/etl/eligibility/ha_15_32/ha_analysis_batch_3.py
+++ b/etl/eligibility/ha_15_32/ha_analysis_batch_3.py
@@ -6,7 +6,7 @@ from tqdm import tqdm
 from datetime import datetime
 import pandas as pd
 import numpy as np
-from utils.s3 import read_from_s3, read_dataframe_from_s3_parquet
+from utils.s3 import read_from_s3, read_dataframe_from_s3_parquet, save_pickle_to_s3, read_pickle_from_s3
 from utils.logger import setup_logger
 from dotenv import load_dotenv
 from tqdm import tqdm
@@ -39,8 +39,11 @@ class DataLoader:
         },
     }
 
-    def __init__(self, files):
+    def __init__(self, files, use_cache):
         self.files = files
+        self.use_cache = use_cache
+
+        self.data = {}
 
     def load_asset_list(self, file_path, ha_name, sheet_name=None):
         workbook = openpyxl.load_workbook(file_path)
@@ -149,7 +152,8 @@ class DataLoader:
 
         return survey_list, matched_lookup
 
-    def merge_ha_6(self, asset_list, survey_list):
+    @staticmethod
+    def merge_ha_6(asset_list, survey_list):
         # Correct the asset list
         asset_list["propertyaddress"] = asset_list["propertyaddress"].str.replace("Baggott Place", "Baggotts Place")
 
@@ -268,8 +272,39 @@ class DataLoader:
             "Eastdale Place", "Easdale Place"
         )
 
+        survey_list["Street / Block Name"] = survey_list["Street / Block Name"].str.replace(
+            "Wedgewood Road", "Wedgwood Road"
+        )
+
+        survey_list["Street / Block Name"] = survey_list["Street / Block Name"].str.replace(
+            "Droitwich Drive", "Droitwich Close"
+        )
+
+        survey_list["Street / Block Name"] = survey_list["Street / Block Name"].str.replace(
+            "Longdale Road", "Langdale Road"
+        )
+
+        # We have 2 addresses in the survey list that don't have postcodes. We'll manually add them in
+        survey_list.loc[
+            (survey_list["Street / Block Name"] == "Rogers Avenue") &
+            pd.isnull(survey_list["Post Code"]),
+            "Post Code"
+        ] = "ST5 9AT"
+
+        survey_list.loc[
+            (survey_list["Street / Block Name"] == "Cedar Road") &
+            pd.isnull(survey_list["Post Code"]),
+            "Post Code"
+        ] = "ST5 7BY"
+
+        missed_postcodes = [
+            postcode.lower() for postcode in survey_list["Post Code"] if
+            postcode.lower() not in asset_list["matching_postcode"].values
+        ]
+
         matching_lookup = []
         for _, row in tqdm(survey_list.iterrows(), total=len(survey_list)):
+
             house_number = row["NO."]
             if isinstance(house_number, str):
                 house_number = house_number.lower().strip()
@@ -285,6 +320,16 @@ class DataLoader:
             if df.shape[0] != 1:
                 df = df[df["matching_postcode"].str.lower().str.contains(row["Post Code"].lower())]
                 if df.shape[0] != 1:
+                    postcode_lower = row["Post Code"].lower()
+                    if postcode_lower in missed_postcodes:
+                        matching_lookup.append(
+                            {
+                                "survey_list_row_id": row["survey_list_row_id"],
+                                "asset_list_row_id": None,
+                            }
+                        )
+                        continue
+
                     print(row["Street / Block Name"])
                     print(house_number)
                     print(row["Post Code"].lower())
@@ -297,8 +342,19 @@ class DataLoader:
                 }
             )
 
+        matching_lookup = pd.DataFrame(matching_lookup)
+
+        return matching_lookup
+
     def load(self):
 
+        if self.use_cache:
+            self.data = read_pickle_from_s3(
+                bucket_name="retrofit-datalake-dev",
+                s3_file_name="ha-analysis/batch3-inputs.pickle",
+            )
+            return
+
         data = {}
         for ha_name, file_config in self.files.items():
             # Load asset list
@@ -311,19 +367,31 @@ class DataLoader:
 
             if file_config.get("survey_list"):
                 logger.info("Loading survey list for {}".format(ha_name))
-                survey_list = self.load_survey_list(
+                survey_list, matched_lookup = self.load_survey_list(
                     file_path=file_config["survey_list"]["filepath"],
                     ha_name=ha_name,
                     sheet_name=file_config["survey_list"]["sheetname"]
                 )
             else:
                 survey_list = None
+                matched_lookup = None
 
             data[ha_name] = {
                 "asset_list": asset_list,
-                "survey_list": survey_list
+                "survey_list": survey_list,
+                "matched_lookup": matched_lookup
             }
 
+        self.data = data
+
+        # Cache the data in s3
+        # We need to pickle the data and store in s3
+        save_pickle_to_s3(
+            data=self.data,
+            bucket_name="retrofit-datalake-dev",
+            s3_file_name="ha-analysis/batch3-inputs.pickle",
+        )
+
 
 def app():
     """
@@ -332,6 +400,8 @@ def app():
     :return:
     """
 
+    use_cache = False
+
     files = {
         "ha_1": {
             "asset_list": {
@@ -354,5 +424,5 @@ def app():
         "ha_107": {"asset_list": "etl/eligibility/ha_15_32/HA 107 - ASSET LIST.xlsx"}
     }
 
-    loader = DataLoader(files)
+    loader = DataLoader(files, use_cache)
     loader.load()
diff --git a/utils/s3.py b/utils/s3.py
index e63b7192..3d6cf038 100644
--- a/utils/s3.py
+++ b/utils/s3.py
@@ -1,3 +1,4 @@
+import pickle
 import boto3
 from io import BytesIO, StringIO
 from botocore.exceptions import NoCredentialsError, PartialCredentialsError
@@ -141,5 +142,56 @@ def save_csv_to_s3(dataframe, bucket_name, file_name):
         s3.put_object(Body=csv_buffer.getvalue(), Bucket=bucket_name, Key=file_name)
         return True
     except Exception as e:
-        print(f"An error occurred: {e}")
+        logger.error(f"An error occurred: {e}")
         return False
+
+
+def save_pickle_to_s3(data, bucket_name, s3_file_name):
+    """
+    Save an object to an S3 bucket as a pickle file.
+
+    :param data: The data to save
+    :param bucket_name: The name of the S3 bucket
+    :param s3_file_name: The file name to use for the saved data in S3 (should end in .pkl)
+    """
+    # Serialize data to a pickle format
+    try:
+        serialized_data = pickle.dumps(data)
+    except Exception as e:
+        logger.error(f'Failed to serialize data: {str(e)}')
+        return
+
+    # Use save_data_to_s3 function to upload the serialized data to S3
+    save_data_to_s3(serialized_data, bucket_name, s3_file_name)
+
+
+def read_pickle_from_s3(bucket_name, s3_file_name):
+    """
+    Read a pickle file from an S3 bucket and return the data.
+
+    :param bucket_name: The name of the S3 bucket
+    :param s3_file_name: The file name of the pickle file in S3
+    :return: The unpickled data, or None if the download or deserialization fails
+    """
+    try:
+        s3 = boto3.client('s3')
+        s3_response = s3.get_object(Bucket=bucket_name, Key=s3_file_name)
+        serialized_data = s3_response['Body'].read()
+    except NoCredentialsError:
+        logger.error("Credentials not available.")
+        return None
+    except PartialCredentialsError:
+        logger.error("Incomplete credentials provided.")
+        return None
+    except Exception as e:
+        logger.error(f'Failed to download data from {bucket_name}/{s3_file_name}: {str(e)}')
+        return None
+
+    # Deserialize data from pickle format (unpickling can run arbitrary code - trusted buckets only)
+    try:
+        data = pickle.loads(serialized_data)
+    except Exception as e:
+        logger.error(f'Failed to deserialize data: {str(e)}')
+        return None
+
+    return data