From d3e3a72c3ddd42e95578f53ea5e635cf9dc1af7d Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Tue, 12 Sep 2023 13:11:12 +0100 Subject: [PATCH] Added message pack encoding to store data in slightly more optimised format --- model_data/cleaner_app.py | 21 +++++++++++++++----- model_data/requirements/requirements.txt | 1 + model_data/utils.py | 25 ++++++++++++++++++++---- 3 files changed, 38 insertions(+), 9 deletions(-) diff --git a/model_data/cleaner_app.py b/model_data/cleaner_app.py index 9a7d25b4..1ff27a35 100644 --- a/model_data/cleaner_app.py +++ b/model_data/cleaner_app.py @@ -1,13 +1,13 @@ from tqdm import tqdm import os import pandas as pd -import json +import msgpack from model_data.EpcClean import EpcClean from model_data.analysis.UvalueEstimations import UvalueEstimations from model_data.simulation_system.core.Settings import EARLIEST_EPC_DATE from pathlib import Path -from model_data.utils import save_json_to_s3 +from model_data.utils import save_data_to_s3 LAND_REGISTRY_PATHS = [ os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-monthly-update-new-version.csv", @@ -71,8 +71,19 @@ def app(): # uvalue_estimates.floors # uvalue_estimates.roofs - save_json_to_s3( - json_data=json.dumps(cleaned_data), - s3_file_name="cleaned_epc_data/cleaned.json", + # Basic check to make sure all descriptions are unique + for _, cleaned in cleaned_data.items(): + descriptions = [x["original_description"] for x in cleaned] + if len(descriptions) != len(set(descriptions)): + raise ValueError("Duplicated descriptions found, check me") + + # We store a singular file however we could store the data under the following file path: + # cleaned_epc_data/{component}/{original_description}/cleaned.bson + # where component is one of the keys of cleaned_data. If we store it against the original data, this + # data being read in will be extremely small, meaning quicker load times. 
We'll begin by storing as a single + # file and monitor usage patterns to see if it makes sense to split the data up + save_data_to_s3( + data=msgpack.packb(cleaned_data, use_bin_type=True), + s3_file_name="cleaned_epc_data/cleaned.bson", bucket_name=f"retrofit-data-{ENVIRONMENT}" ) diff --git a/model_data/requirements/requirements.txt b/model_data/requirements/requirements.txt index d4de6b71..1d84fc3d 100644 --- a/model_data/requirements/requirements.txt +++ b/model_data/requirements/requirements.txt @@ -20,3 +20,4 @@ pyspellchecker textblob boto3 pyarrow +msgpack==1.0.5 diff --git a/model_data/utils.py b/model_data/utils.py index d0c2f330..f2012691 100644 --- a/model_data/utils.py +++ b/model_data/utils.py @@ -50,11 +50,11 @@ def save_dataframe_to_s3_parquet(df, bucket_name, file_key): client.put_object(Bucket=bucket_name, Key=file_key, Body=parquet_buffer.getvalue()) -def save_json_to_s3(json_data, bucket_name, s3_file_name): +def save_data_to_s3(data, bucket_name, s3_file_name): """ - Save a JSON object to an S3 bucket + Save an object to an S3 bucket - :param json_data: The JSON data to save + :param data: The data to save :param bucket_name: The name of the S3 bucket :param s3_file_name: The file name to use for the saved data in S3 """ @@ -69,7 +69,24 @@ def save_json_to_s3(json_data, bucket_name, s3_file_name): return try: - s3.put_object(Bucket=bucket_name, Key=s3_file_name, Body=json_data) + s3.put_object(Bucket=bucket_name, Key=s3_file_name, Body=data) print(f'Successfully uploaded data to {bucket_name}/{s3_file_name}') except Exception as e: print(f'Failed to upload data to {bucket_name}/{s3_file_name}: {str(e)}') + + +def read_from_s3(bucket_name, s3_file_name): + """ + Read an object from s3. 
Decoding of the data is left to the caller + + :param bucket_name: The name of the S3 bucket + :param s3_file_name: The key (file name) of the object to read from S3 + """ + # Create an S3 resource handle + s3 = boto3.resource('s3') + + # Fetch the object's raw bytes from S3 (decoding, e.g. MessagePack, is up to the caller) + obj = s3.Object(bucket_name, s3_file_name) + data = obj.get()['Body'].read() + + return data