Model/utils/s3.py
2024-03-01 16:29:19 +00:00

226 lines
6.9 KiB
Python

import pickle
import boto3
from io import BytesIO, StringIO
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
import pandas as pd
from utils.logger import setup_logger
# Module-wide logger from the project's utils.logger helper; several functions below report failures through it.
logger = setup_logger()
def read_from_s3(bucket_name, s3_file_name):
    """
    Fetch the raw contents of a single S3 object.

    The payload is returned exactly as stored; decoding it (e.g. unpickling
    or parsing) is left to the caller.

    :param bucket_name: The name of the S3 bucket
    :param s3_file_name: The key of the object to read
    :return: The object's body as returned by ``StreamingBody.read()``
    """
    s3_resource = boto3.resource('s3')
    s3_object = s3_resource.Object(bucket_name, s3_file_name)
    get_response = s3_object.get()
    return get_response['Body'].read()
def save_data_to_s3(data, bucket_name, s3_file_name):
    """
    Save an object to an S3 bucket.

    This is best-effort: failures are logged rather than raised, and the
    boolean return value tells the caller whether the upload happened.

    :param data: The data to save (passed as ``Body`` to ``put_object``)
    :param bucket_name: The name of the S3 bucket
    :param s3_file_name: The file name to use for the saved data in S3
    :return: True if the upload succeeded, False otherwise
    """
    # AWS credentials must be available via environment variables, the AWS CLI
    # configuration, or an IAM role.
    try:
        # Credential problems may surface while the client is built (e.g. a
        # partial key pair in the environment) or only when the first request
        # is signed (no credentials found at all), so both steps sit inside the
        # same try block for the credential handlers below to see them.
        s3 = boto3.client('s3')
        s3.put_object(Bucket=bucket_name, Key=s3_file_name, Body=data)
    except NoCredentialsError:
        logger.error("Credentials not available.")
        return False
    except PartialCredentialsError:
        logger.error("Incomplete credentials provided.")
        return False
    except Exception as e:
        logger.error(f'Failed to upload data to {bucket_name}/{s3_file_name}: {str(e)}')
        return False
    logger.info(f'Successfully uploaded data to {bucket_name}/{s3_file_name}')
    return True
def read_io_from_s3(bucket_name, file_key):
    """
    Download an S3 object into an in-memory binary buffer.

    The returned buffer is positioned at its start, so it can be handed
    directly to a parser such as ``pd.read_parquet`` or ``pd.read_excel``.

    :param bucket_name: The name of the S3 bucket
    :param file_key: The key of the object in S3
    :return: A BytesIO holding the object's full contents
    """
    object_body = boto3.client('s3').get_object(Bucket=bucket_name, Key=file_key)['Body']
    return BytesIO(object_body.read())
def save_dataframe_to_s3_parquet(df, bucket_name, file_key):
    """
    Serialize a pandas DataFrame to Parquet in memory and upload it to S3.

    Errors raised by serialization or by the upload are not caught; they
    propagate to the caller.

    :param df: The pandas DataFrame to store.
    :param bucket_name: Name of the S3 bucket.
    :param file_key: Key of the file (including directory path within the bucket).
    """
    with BytesIO() as in_memory_file:
        df.to_parquet(in_memory_file)
        parquet_bytes = in_memory_file.getvalue()
    boto3.client('s3').put_object(Bucket=bucket_name, Key=file_key, Body=parquet_bytes)
def read_dataframe_from_s3_parquet(bucket_name, file_key):
    """
    Read a pandas DataFrame from a Parquet file stored in S3.

    :param bucket_name: Name of the S3 bucket.
    :param file_key: Key of the file (including directory path within the bucket);
        must end in ``.parquet``.
    :return: A pandas DataFrame.
    :raises ValueError: If ``bucket_name`` or ``file_key`` is None, or if the key
        does not end in ``.parquet``.
    """
    # Validate arguments up front so a bad call fails with a clear ValueError
    # rather than an AttributeError or an opaque error from boto3/pandas.
    if bucket_name is None:
        raise ValueError("Bucket name is None when trying to read dataframe from parquet")
    if file_key is None:
        raise ValueError("File key is None when trying to read dataframe from parquet")
    if not file_key.endswith(".parquet"):
        raise ValueError("This file doesn't look like a parquet file")
    parquet_buffer = read_io_from_s3(bucket_name=bucket_name, file_key=file_key)
    return pd.read_parquet(parquet_buffer)
def save_csv_to_s3(dataframe, bucket_name, file_name):
    """
    Write a pandas DataFrame to S3 as a CSV object, omitting the index column.

    Parameters:
    dataframe (pd.DataFrame): The DataFrame to upload.
    bucket_name (str): Target S3 bucket.
    file_name (str): Key under which the CSV is stored in the bucket.

    Returns:
    bool: True when the upload succeeds; False if ``put_object`` raised
    (the error is logged).
    """
    client = boto3.client('s3')
    text_buffer = StringIO()
    dataframe.to_csv(text_buffer, index=False)
    csv_text = text_buffer.getvalue()
    try:
        client.put_object(Body=csv_text, Bucket=bucket_name, Key=file_name)
    except Exception as e:
        logger.error(f"An error occurred: {e}")
        return False
    else:
        return True
def save_pickle_to_s3(data, bucket_name, s3_file_name):
    """
    Pickle an object and upload it to an S3 bucket.

    If the object cannot be pickled, the failure is logged and nothing is
    uploaded. Upload errors are handled inside ``save_data_to_s3``.

    :param data: The object to pickle and save
    :param bucket_name: The name of the S3 bucket
    :param s3_file_name: The file name to use for the saved data in S3 (should end in .pkl)
    """
    try:
        serialized_data = pickle.dumps(data)
    except Exception as e:
        # Unpicklable objects can raise several exception types (PicklingError,
        # TypeError, AttributeError, ...), hence the broad catch. Report through
        # the module logger like the other helpers in this file.
        logger.error(f'Failed to serialize data: {str(e)}')
        return
    save_data_to_s3(serialized_data, bucket_name, s3_file_name)
def read_pickle_from_s3(bucket_name, s3_file_name):
    """
    Read a pickle file from an S3 bucket and return the unpickled object.

    Download and deserialization failures are logged and reported by
    returning None. Note that a pickled ``None`` is therefore
    indistinguishable from a failure.

    SECURITY: ``pickle.loads`` can execute arbitrary code embedded in the
    payload. Only use this on objects written by trusted code (e.g.
    ``save_pickle_to_s3``) in buckets whose write access you control.

    :param bucket_name: The name of the S3 bucket
    :param s3_file_name: The file name of the pickle file in S3
    :return: The data read from the pickle file, or None on failure
    """
    try:
        s3 = boto3.client('s3')
        s3_response = s3.get_object(Bucket=bucket_name, Key=s3_file_name)
        serialized_data = s3_response['Body'].read()
    except NoCredentialsError:
        logger.error("Credentials not available.")
        return None
    except PartialCredentialsError:
        logger.error("Incomplete credentials provided.")
        return None
    except Exception as e:
        logger.error(f'Failed to download data from {bucket_name}/{s3_file_name}: {str(e)}')
        return None

    # Deserialize data from pickle format (see SECURITY note above).
    try:
        data = pickle.loads(serialized_data)
    except Exception as e:
        logger.error(f'Failed to deserialize data: {str(e)}')
        return None
    return data
def read_excel_from_s3(bucket_name, file_key, header_row=0):
    """
    Read an Excel file from an S3 bucket and return it as a pandas DataFrame.

    Columns that contain only NaN values are dropped, and the result is given
    a fresh 0..n-1 row index.

    :param bucket_name: Name of the S3 bucket.
    :param file_key: Key of the file (including directory path within the bucket);
        must end in ``.xls`` or ``.xlsx`` (compared case-insensitively).
    :param header_row: The row number to use as the header (0-indexed). Defaults
        to 0, the same default ``pandas.read_excel`` uses.
    :return: A pandas DataFrame containing the data from the Excel file.
    :raises ValueError: If ``file_key`` does not have an Excel extension.
    """
    # Compare the extension case-insensitively so keys like "Report.XLSX" are accepted.
    if not file_key.lower().endswith((".xls", ".xlsx")):
        raise ValueError("The specified file does not appear to be an Excel file.")
    excel_buffer = read_io_from_s3(bucket_name, file_key)
    df = pd.read_excel(excel_buffer, header=header_row)
    # Drop columns that hold no values at all, then renumber the rows from 0.
    return df.dropna(axis=1, how='all').reset_index(drop=True)