setting up cache

This commit is contained in:
Khalim Conn-Kowlessar 2024-01-22 18:46:43 +00:00
parent 4ed4c15480
commit 709a50f02e
2 changed files with 129 additions and 7 deletions

View file

@ -6,7 +6,7 @@ from tqdm import tqdm
from datetime import datetime
import pandas as pd
import numpy as np
from utils.s3 import read_from_s3, read_dataframe_from_s3_parquet
from utils.s3 import read_from_s3, read_dataframe_from_s3_parquet, save_pickle_to_s3, read_pickle_from_s3
from utils.logger import setup_logger
from dotenv import load_dotenv
from tqdm import tqdm
@ -39,8 +39,11 @@ class DataLoader:
},
}
def __init__(self, files):
def __init__(self, files, use_cache):
self.files = files
self.use_cache = use_cache
self.data = {}
def load_asset_list(self, file_path, ha_name, sheet_name=None):
workbook = openpyxl.load_workbook(file_path)
@ -149,7 +152,8 @@ class DataLoader:
return survey_list, matched_lookup
def merge_ha_6(self, asset_list, survey_list):
@staticmethod
def merge_ha_6(asset_list, survey_list):
# Correct the asset list
asset_list["propertyaddress"] = asset_list["propertyaddress"].str.replace("Baggott Place", "Baggotts Place")
@ -268,8 +272,39 @@ class DataLoader:
"Eastdale Place", "Easdale Place"
)
survey_list["Street / Block Name"] = survey_list["Street / Block Name"].str.replace(
"Wedgewood Road", "Wedgwood Road"
)
survey_list["Street / Block Name"] = survey_list["Street / Block Name"].str.replace(
"Droitwich Drive", "Droitwich Close"
)
survey_list["Street / Block Name"] = survey_list["Street / Block Name"].str.replace(
"Longdale Road", "Langdale Road"
)
# We have 2 addresses in the survey list that don't have postcodes. We'll manually add them in
survey_list.loc[
(survey_list["Street / Block Name"] == "Rogers Avenue") &
pd.isnull(survey_list["Post Code"]),
"Post Code"
] = "ST5 9AT"
survey_list.loc[
(survey_list["Street / Block Name"] == "Cedar Road") &
pd.isnull(survey_list["Post Code"]),
"Post Code"
] = "ST5 7BY"
missed_postcodes = [
postcode.lower() for postcode in survey_list["Post Code"] if
postcode.lower() not in asset_list["matching_postcode"].values
]
matching_lookup = []
for _, row in tqdm(survey_list.iterrows(), total=len(survey_list)):
house_number = row["NO."]
if isinstance(house_number, str):
house_number = house_number.lower().strip()
@ -285,6 +320,16 @@ class DataLoader:
if df.shape[0] != 1:
df = df[df["matching_postcode"].str.lower().str.contains(row["Post Code"].lower())]
if df.shape[0] != 1:
postcode_lower = row["Post Code"].lower()
if postcode_lower in missed_postcodes:
matching_lookup.append(
{
"survey_list_row_id": row["survey_list_row_id"],
"asset_list_row_id": None,
}
)
continue
print(row["Street / Block Name"])
print(house_number)
print(row["Post Code"].lower())
@ -297,8 +342,19 @@ class DataLoader:
}
)
matching_lookup = pd.DataFrame(matching_lookup)
return matching_lookup
def load(self):
if self.use_cache:
self.data = read_pickle_from_s3(
bucket_name="retrofit-datalake-dev",
s3_file_name="ha-analysis/batch3-inputs.pickle",
)
return
data = {}
for ha_name, file_config in self.files.items():
# Load asset list
@ -311,19 +367,31 @@ class DataLoader:
if file_config.get("survey_list"):
logger.info("Loading survey list for {}".format(ha_name))
survey_list = self.load_survey_list(
survey_list, matched_lookup = self.load_survey_list(
file_path=file_config["survey_list"]["filepath"],
ha_name=ha_name,
sheet_name=file_config["survey_list"]["sheetname"]
)
else:
survey_list = None
matched_lookup = None
data[ha_name] = {
"asset_list": asset_list,
"survey_list": survey_list
"survey_list": survey_list,
"matched_lookup": matched_lookup
}
self.data = data
# Cache the data in s3
# We need to pickle the data and store in s3
save_pickle_to_s3(
data=self.data,
bucket_name="retrofit-datalake-dev",
s3_file_name="ha-analysis/batch3-inputs.pickle",
)
def app():
"""
@ -332,6 +400,8 @@ def app():
:return:
"""
use_cache = False
files = {
"ha_1": {
"asset_list": {
@ -354,5 +424,5 @@ def app():
"ha_107": {"asset_list": "etl/eligibility/ha_15_32/HA 107 - ASSET LIST.xlsx"}
}
loader = DataLoader(files)
loader = DataLoader(files, use_cache)
loader.load()

View file

@ -1,3 +1,4 @@
import pickle
import boto3
from io import BytesIO, StringIO
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
@ -141,5 +142,56 @@ def save_csv_to_s3(dataframe, bucket_name, file_name):
s3.put_object(Body=csv_buffer.getvalue(), Bucket=bucket_name, Key=file_name)
return True
except Exception as e:
print(f"An error occurred: {e}")
logger.error(f"An error occurred: {e}")
return False
def save_pickle_to_s3(data, bucket_name, s3_file_name):
"""
Save an object to an S3 bucket as a pickle file.
:param data: The data to save
:param bucket_name: The name of the S3 bucket
:param s3_file_name: The file name to use for the saved data in S3 (should end in .pkl)
"""
# Serialize data to a pickle format
try:
serialized_data = pickle.dumps(data)
except Exception as e:
print(f'Failed to serialize data: {str(e)}')
return
# Use save_data_to_s3 function to upload the serialized data to S3
save_data_to_s3(serialized_data, bucket_name, s3_file_name)
def read_pickle_from_s3(bucket_name, s3_file_name):
"""
Read a pickle file from an S3 bucket and return the data.
:param bucket_name: The name of the S3 bucket
:param s3_file_name: The file name of the pickle file in S3
:return: The data read from the pickle file
"""
try:
s3 = boto3.client('s3')
s3_response = s3.get_object(Bucket=bucket_name, Key=s3_file_name)
serialized_data = s3_response['Body'].read()
except NoCredentialsError:
logger.errpr("Credentials not available.")
return None
except PartialCredentialsError:
logger.errpr("Incomplete credentials provided.")
return None
except Exception as e:
logger.errpr(f'Failed to download data from {bucket_name}/{s3_file_name}: {str(e)}')
return None
# Deserialize data from pickle format
try:
data = pickle.loads(serialized_data)
except Exception as e:
logger.errpr(f'Failed to deserialize data: {str(e)}')
return None
return data