Added message pack encoding to store data in slightly more optimised format

This commit is contained in:
Khalim Conn-Kowlessar 2023-09-12 13:11:12 +01:00
parent 2e58937337
commit d3e3a72c3d
3 changed files with 38 additions and 9 deletions

View file

@ -1,13 +1,13 @@
from tqdm import tqdm
import os
import pandas as pd
import json
import msgpack
from model_data.EpcClean import EpcClean
from model_data.analysis.UvalueEstimations import UvalueEstimations
from model_data.simulation_system.core.Settings import EARLIEST_EPC_DATE
from pathlib import Path
from model_data.utils import save_json_to_s3
from model_data.utils import save_data_to_s3
LAND_REGISTRY_PATHS = [
os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-monthly-update-new-version.csv",
@ -71,8 +71,19 @@ def app():
# uvalue_estimates.floors
# uvalue_estimates.roofs
save_json_to_s3(
json_data=json.dumps(cleaned_data),
s3_file_name="cleaned_epc_data/cleaned.json",
# Basic check to make sure all descriptions are unique
for _, cleaned in cleaned_data.items():
descriptions = [x["original_description"] for x in cleaned]
if len(descriptions) != len(set(descriptions)):
raise ValueError("Duplicated descriptions found, check me")
# We store a singular file however we could store the data under the following file path:
# cleaned_epc_data/{component}/{original_description}/cleaned.bson
# where component is one of the keys of cleaned_data. If we store it against the original data, this
# data being read in will be extremely small, meaning quicker load times. We'll begin by storing as a single
# file and monitor usage patterns to see if it makes sense to split the data up
save_data_to_s3(
data=msgpack.packb(cleaned_data, use_bin_type=True),
s3_file_name="cleaned_epc_data/cleaned.bson",
bucket_name=f"retrofit-data-{ENVIRONMENT}"
)

View file

@ -20,3 +20,4 @@ pyspellchecker
textblob
boto3
pyarrow
msgpack==1.0.5

View file

@ -50,11 +50,11 @@ def save_dataframe_to_s3_parquet(df, bucket_name, file_key):
client.put_object(Bucket=bucket_name, Key=file_key, Body=parquet_buffer.getvalue())
def save_json_to_s3(json_data, bucket_name, s3_file_name):
def save_data_to_s3(data, bucket_name, s3_file_name):
"""
Save a JSON object to an S3 bucket
Save an object to an S3 bucket
:param json_data: The JSON data to save
:param data: The data to save
:param bucket_name: The name of the S3 bucket
:param s3_file_name: The file name to use for the saved data in S3
"""
@ -69,7 +69,24 @@ def save_json_to_s3(json_data, bucket_name, s3_file_name):
return
try:
s3.put_object(Bucket=bucket_name, Key=s3_file_name, Body=json_data)
s3.put_object(Bucket=bucket_name, Key=s3_file_name, Body=data)
print(f'Successfully uploaded data to {bucket_name}/{s3_file_name}')
except Exception as e:
print(f'Failed to upload data to {bucket_name}/{s3_file_name}: {str(e)}')
def read_from_s3(bucket_name, s3_file_name):
"""
Read an object from s3. Decoding of the data is left for outside of this function
:param bucket_name: The name of the S3 bucket
:param s3_file_name: The file name to use for the saved data in S3
"""
# Initialize a session using Amazon S3
s3 = boto3.resource('s3')
# Get the MessagePack data from S3
obj = s3.Object(bucket_name, s3_file_name)
data = obj.get()['Body'].read()
return data