mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
112 lines
3.2 KiB
Python
112 lines
3.2 KiB
Python
import boto3
|
|
from io import BytesIO
|
|
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
|
|
import pandas as pd
|
|
from utils.logger import setup_logger
|
|
|
|
# Module-level logger, created once at import time via the project's setup_logger helper.
logger = setup_logger()
|
|
|
|
|
|
def read_from_s3(bucket_name, s3_file_name):
    """
    Fetch the raw contents of an object stored in S3.

    The payload is returned exactly as read from the response body; decoding
    it is the caller's responsibility.

    :param bucket_name: The name of the S3 bucket
    :param s3_file_name: The key of the object to read from the bucket
    :return: The object's contents as returned by the response body's ``read()``
    """
    # Use the resource-level API to address the object directly
    s3_object = boto3.resource('s3').Object(bucket_name, s3_file_name)

    # Issue the GET and read the whole body in one call
    response = s3_object.get()
    return response['Body'].read()
|
|
|
|
|
|
def save_data_to_s3(data, bucket_name, s3_file_name):
    """
    Save an object to an S3 bucket.

    The upload is best-effort: any failure is logged and the function returns
    ``None`` either way instead of raising.

    :param data: The data to save
    :param bucket_name: The name of the S3 bucket
    :param s3_file_name: The key to store the data under in S3
    :return: None
    """
    # AWS credentials must be available - via environment variables, the AWS
    # CLI configuration, or an IAM role.
    try:
        # Client creation and the upload share one try block: botocore reports
        # missing credentials when the request is signed (i.e. on put_object),
        # not when the client is built, so guarding only the constructor would
        # never reach the credential-specific handlers below.
        s3 = boto3.client('s3')
        s3.put_object(Bucket=bucket_name, Key=s3_file_name, Body=data)
    except NoCredentialsError:
        logger.error("Credentials not available.")
    except PartialCredentialsError:
        logger.error("Incomplete credentials provided.")
    except Exception as e:
        # Deliberate catch-all: callers rely on this helper never raising.
        logger.error(f'Failed to upload data to {bucket_name}/{s3_file_name}: {str(e)}')
    else:
        logger.info(f'Successfully uploaded data to {bucket_name}/{s3_file_name}')
|
|
|
|
|
|
def read_io_from_s3(bucket_name, file_key):
    """
    Download an S3 object and wrap its bytes in an in-memory BytesIO.

    The returned buffer can be handed to any parser that accepts a
    file-like object.

    :param bucket_name: The name of the S3 bucket
    :param file_key: The key of the object in S3
    :return: BytesIO holding the full contents of the object
    """
    # Fetch the object through the low-level client API
    s3_response = boto3.client('s3').get_object(Bucket=bucket_name, Key=file_key)

    # Read the entire body, then expose it as a seekable in-memory file
    payload = s3_response['Body'].read()
    return BytesIO(payload)
|
|
|
|
|
|
def save_dataframe_to_s3_parquet(df, bucket_name, file_key):
    """
    Write a pandas DataFrame to S3 in Parquet format.

    The DataFrame is serialised into an in-memory buffer and uploaded with a
    single ``put_object`` call.

    :param df: The pandas DataFrame.
    :param bucket_name: Name of the S3 bucket.
    :param file_key: Key of the file (including directory path within the bucket).
    """
    # Serialise the DataFrame to Parquet bytes held in memory
    in_memory_file = BytesIO()
    df.to_parquet(in_memory_file)

    # Build the client, then push the serialised bytes to the target key
    s3_client = boto3.client('s3')
    s3_client.put_object(
        Body=in_memory_file.getvalue(),
        Bucket=bucket_name,
        Key=file_key,
    )
|
|
|
|
|
|
def read_dataframe_from_s3_parquet(bucket_name, file_key):
    """
    Read a pandas DataFrame from a Parquet file stored in S3.

    :param bucket_name: Name of the S3 bucket.
    :param file_key: Key of the file (including directory path within the bucket).
    :return: A pandas DataFrame.
    :raises ValueError: If ``file_key`` does not end with ``.parquet``.
    """
    # Fail fast, before any S3 request, with a real exception. Raising the
    # return value of a logging call is itself a TypeError, which would hide
    # the actual problem from the caller.
    if not file_key.endswith(".parquet"):
        raise ValueError(
            f"This file doesn't look like a parquet file: {file_key!r}"
        )

    # Download the object into memory, then let pandas parse it
    parquet_buffer = read_io_from_s3(
        bucket_name=bucket_name,
        file_key=file_key,
    )
    return pd.read_parquet(parquet_buffer)
|