mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
485 lines
15 KiB
Python
485 lines
15 KiB
Python
import pickle
|
|
import boto3
|
|
import csv
|
|
import pandas as pd
|
|
from io import BytesIO, StringIO
|
|
from urllib.parse import unquote
|
|
from utils.logger import setup_logger
|
|
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
|
|
from typing import Any
|
|
|
|
# Module-level logger used by the S3 helper functions defined in this file.
logger = setup_logger()
|
|
|
|
|
|
def parse_s3_uri(s3_uri: str) -> tuple[str, str]:
    """
    Parse an S3 location string into its bucket and key.

    Supports two formats:
    1. S3 URI format: s3://bucket/key
    2. AWS console URL format, where the bucket follows ``/s3/object/`` in the
       path and the key is carried URL-encoded in the ``prefix`` query parameter

    :param s3_uri: The S3 URI or AWS console URL to parse.
    :return: A ``(bucket, key)`` tuple. For a console URL without a ``prefix``
        parameter the key is an empty string.
    :raises ValueError: If the input cannot be parsed as either format. The
        underlying error is chained as ``__cause__``.
    """
    logger.info("Parsing S3 URI")

    try:
        # Check if it's an S3 URI format
        if s3_uri.startswith("s3://"):
            bucket, separator, key = s3_uri[5:].partition("/")
            if not separator:
                raise ValueError("S3 URI must include both bucket and key")
            logger.info(f"Extracted bucket: {bucket}, key: {key}")
            return bucket, key

        # Otherwise, treat as AWS console URL
        logger.info("Parsing as AWS console URL")

        # Split base URL and query string
        if "?" not in s3_uri:
            raise ValueError("No query string found")
        base, query = s3_uri.split("?", 1)

        # Extract bucket from base URL
        if "/s3/object/" not in base:
            raise ValueError("No '/s3/object/' found in URL path")
        bucket = base.split("/s3/object/")[1]
        logger.info(f"Extracted bucket: {bucket}")

        # Extract prefix from query parameters. Split on the FIRST "=" only:
        # a value may itself contain "=" (e.g. "prefix=run=1/file.csv"), and a
        # three-way split would make dict() reject the item.
        params = dict(
            item.split("=", 1) for item in query.split("&") if "=" in item
        )
        key = unquote(params.get("prefix", ""))
        logger.info(f"Extracted key: {key}")

        return bucket, key
    except Exception as e:
        # Any failure (including a non-string input) is reported as ValueError
        logger.error(f"Error parsing S3 URI: {type(e).__name__}: {e}")
        raise ValueError("Could not parse S3 URI") from e
|
|
|
|
|
|
def read_from_s3(bucket_name, s3_file_name):
    """
    Fetch the raw contents of an object stored in S3.

    The payload is returned exactly as read; decoding it is the caller's job.

    :param bucket_name: The name of the S3 bucket
    :param s3_file_name: The key of the object to read from S3
    :return: Whatever the response body's ``read()`` yields for the object
    """
    # Address the object through the resource API, then drain its body
    s3_object = boto3.resource("s3").Object(bucket_name, s3_file_name)
    response = s3_object.get()
    return response["Body"].read()
|
|
|
|
|
|
def save_data_to_s3(data, bucket_name, s3_file_name):
    """
    Save an object to an S3 bucket (best effort).

    Failures are logged rather than raised, so the caller is never interrupted
    by an upload problem.

    :param data: The data to save
    :param bucket_name: The name of the S3 bucket
    :param s3_file_name: The file name to use for the saved data in S3
    :return: None, whether or not the upload succeeded
    """
    # AWS credentials must be available via environment variables, the AWS
    # CLI configuration, or an IAM role. Client creation and the upload share
    # one try block so the credential handlers also cover the request itself.
    try:
        s3 = boto3.client("s3")
        s3.put_object(Bucket=bucket_name, Key=s3_file_name, Body=data)
    except NoCredentialsError:
        logger.error("Credentials not available.")
        return
    except PartialCredentialsError:
        logger.error("Incomplete credentials provided.")
        return
    except Exception as e:
        logger.error(
            f"Failed to upload data to {bucket_name}/{s3_file_name}: {str(e)}"
        )
        return

    logger.info(f"Successfully uploaded data to {bucket_name}/{s3_file_name}")
|
|
|
|
|
|
def read_io_from_s3(bucket_name: str, file_key: str) -> BytesIO:
    """
    Download an S3 object and wrap its bytes in an in-memory BytesIO buffer.

    The buffer can be handed to any parser that accepts a binary file-like
    object.

    :param bucket_name: The name of the S3 bucket
    :param file_key: The key of the object in S3
    :return: BytesIO buffer holding the object's contents
    """
    s3_client = boto3.client("s3")

    # Fetch the object and read its whole body
    s3_response = s3_client.get_object(Bucket=bucket_name, Key=file_key)
    payload = s3_response["Body"].read()

    return BytesIO(payload)
|
|
|
|
|
|
def save_dataframe_to_s3_parquet(df, bucket_name, file_key):
    """
    Write a pandas DataFrame to S3 in Parquet format.

    :param df: The pandas DataFrame.
    :param bucket_name: Name of the S3 bucket.
    :param file_key: Key of the file (including directory path within the bucket).
    """
    # Serialize the frame into an in-memory buffer
    buffer = BytesIO()
    df.to_parquet(buffer)

    # Push the serialized bytes to S3
    s3_client = boto3.client("s3")
    payload = buffer.getvalue()
    s3_client.put_object(Bucket=bucket_name, Key=file_key, Body=payload)
|
|
|
|
|
|
def read_dataframe_from_s3_parquet(bucket_name, file_key):
    """
    Load a Parquet file stored in S3 into a pandas DataFrame.

    :param bucket_name: Name of the S3 bucket.
    :param file_key: Key of the file (including directory path within the bucket).
    :return: A pandas DataFrame.
    :raises ValueError: If ``bucket_name`` is None or ``file_key`` does not end
        with ``.parquet``.
    """
    # Guard clauses: reject unusable arguments before touching S3
    if bucket_name is None:
        raise ValueError(
            "Bucket name is None when trying to read dataframe from parquet"
        )

    looks_like_parquet = file_key.endswith(".parquet")
    if not looks_like_parquet:
        raise ValueError("This file doesn't look like a parquet file")

    # Download into memory and let pandas parse the buffer
    return pd.read_parquet(
        read_io_from_s3(bucket_name=bucket_name, file_key=file_key)
    )
|
|
|
|
|
|
def save_csv_to_s3(dataframe, bucket_name, file_name):
    """
    Upload a Pandas DataFrame to an S3 bucket as a CSV file.

    :param dataframe: The Pandas DataFrame to save.
    :param bucket_name: The name of the S3 bucket.
    :param file_name: The name of the file to save in the S3 bucket.
    :return: True if the upload succeeded, False if it raised.
    """
    s3_client = boto3.client("s3")

    # Render the frame as CSV text in memory (index column omitted)
    text_stream = StringIO()
    dataframe.to_csv(text_stream, index=False)

    # Only the upload itself is guarded; its failures are logged, not raised
    try:
        s3_client.put_object(
            Body=text_stream.getvalue(), Bucket=bucket_name, Key=file_name
        )
    except Exception as e:
        logger.error(f"An error occurred: {e}")
        return False
    return True
|
|
|
|
|
|
def save_pickle_to_s3(data, bucket_name, s3_file_name):
    """
    Save an object to an S3 bucket as a pickle file.

    If the object cannot be pickled, the failure is logged and nothing is
    uploaded.

    :param data: The data to save
    :param bucket_name: The name of the S3 bucket
    :param s3_file_name: The file name to use for the saved data in S3 (should end in .pkl)
    :return: None
    """
    # Serialize data to a pickle format
    try:
        serialized_data = pickle.dumps(data)
    except Exception as e:
        # Report through the module logger, like the other helpers in this file
        logger.error(f"Failed to serialize data: {str(e)}")
        return

    # Use save_data_to_s3 function to upload the serialized data to S3
    save_data_to_s3(serialized_data, bucket_name, s3_file_name)
|
|
|
|
|
|
def read_pickle_from_s3(bucket_name, s3_file_name):
    """
    Read a pickle file from an S3 bucket and return the data.

    Every failure (credentials, download, deserialization) is logged and
    reported to the caller as ``None``.

    :param bucket_name: The name of the S3 bucket
    :param s3_file_name: The file name of the pickle file in S3
    :return: The data read from the pickle file, or None on any failure
    """
    try:
        s3 = boto3.client("s3")
        s3_response = s3.get_object(Bucket=bucket_name, Key=s3_file_name)
        serialized_data = s3_response["Body"].read()
    except NoCredentialsError:
        logger.error("Credentials not available.")
        return None
    except PartialCredentialsError:
        logger.error("Incomplete credentials provided.")
        return None
    except Exception as e:
        logger.error(
            f"Failed to download data from {bucket_name}/{s3_file_name}: {str(e)}"
        )
        return None

    # Deserialize data from pickle format.
    # NOTE(review): pickle.loads can execute arbitrary code embedded in the
    # payload; only call this on objects from buckets whose writers are trusted.
    try:
        data = pickle.loads(serialized_data)
    except Exception as e:
        logger.error(f"Failed to deserialize data: {str(e)}")
        return None

    return data
|
|
|
|
|
|
def read_excel_from_s3(
    bucket_name, file_key, header_row, drop_all_na=True, sheet_name=None
):
    """
    Read an Excel file from an S3 bucket and return it as a pandas DataFrame.

    :param bucket_name: Name of the S3 bucket.
    :param file_key: Key of the file (including directory path within the bucket).
    :param header_row: The row number to use as the header (0-indexed).
    :param drop_all_na: Whether to drop columns where all values are NaN.
    :param sheet_name: The name of the sheet to read from the Excel file. If None, reads the first sheet.
    :return: A pandas DataFrame containing the data from the Excel file.
    :raises ValueError: If ``file_key`` does not end with ``.xls`` or ``.xlsx``.
    """
    # Ensure the file_key is an Excel file
    if not file_key.endswith((".xls", ".xlsx")):
        raise ValueError("The specified file does not appear to be an Excel file.")

    # Use the read_io_from_s3 function to get the data as a BytesIO object
    excel_buffer = read_io_from_s3(bucket_name, file_key)

    # pandas treats sheet_name=None as "load every sheet" and returns a dict
    # of DataFrames, which breaks the DataFrame calls below. Map None to
    # index 0 so the documented "first sheet" default holds.
    target_sheet = 0 if sheet_name is None else sheet_name

    # Read the Excel file into a pandas DataFrame
    df = pd.read_excel(excel_buffer, header=header_row, sheet_name=target_sheet)

    # Drop columns where all values are NaN
    if drop_all_na:
        df.dropna(axis=1, how="all", inplace=True)

    # Replace the row index with a fresh 0..n-1 range
    df.reset_index(drop=True, inplace=True)

    return df
|
|
|
|
|
|
def save_excel_to_s3(df, bucket_name, file_key):
    """
    Write a pandas DataFrame to S3 as an Excel workbook.

    :param df: DataFrame to save.
    :param bucket_name: S3 bucket name.
    :param file_key: S3 file key. This includes the file name and path.
    :raises ValueError: If the DataFrame is empty, or if ``file_key`` does not
        end with ``.xls`` or ``.xlsx``.
    """
    # Refuse to upload an empty workbook
    if df.empty:
        raise ValueError("The DataFrame is empty. Nothing to save to Excel.")

    # The key must carry an Excel extension
    has_excel_extension = file_key.endswith((".xls", ".xlsx"))
    if not has_excel_extension:
        raise ValueError("The specified file key does not appear to be an Excel file.")

    # Render the workbook into memory, then rewind so the upload starts at byte 0
    workbook_stream = BytesIO()
    df.to_excel(workbook_stream, index=False)
    workbook_stream.seek(0)

    # Upload through a session-scoped S3 resource
    s3_resource = boto3.session.Session().resource("s3")
    target_bucket = s3_resource.Bucket(bucket_name)
    target_bucket.put_object(Body=workbook_stream, Key=file_key)

    logger.info(f"Excel file saved to S3 bucket '{bucket_name}' with key '{file_key}'")
|
|
|
|
|
|
def read_csv_from_s3(bucket_name: str, filepath: str) -> list[dict[str, str]]:
    """
    Read a CSV file from S3 into a list of row dictionaries.

    The first CSV row supplies the dictionary keys.

    :param bucket_name: The name of the S3 bucket.
    :param filepath: The key of the CSV file within the bucket.
    :return: One dictionary per data row, keyed by the header names.
    """
    logger.info(
        f"Reading CSV file from S3 bucket '{bucket_name}' with key '{filepath}'"
    )
    s3 = boto3.client("s3")

    # Get the object from s3
    s3_object = s3.get_object(Bucket=bucket_name, Key=filepath)

    # Read the CSV body from the s3 object
    body = s3_object["Body"].read()

    # Decode with utf-8-sig so a leading byte-order mark is dropped instead of
    # being glued onto the first header name; input without a BOM decodes
    # exactly as plain UTF-8.
    csv_data = StringIO(body.decode("utf-8-sig"))

    # Use csv library to read it into a list of dictionaries
    return list(csv.DictReader(csv_data))
|
|
|
|
|
|
def list_files_in_s3_folder(bucket_name, folder_name):
    """
    List all files in a given folder in an S3 bucket.

    Results are gathered across every page of the listing, so folders holding
    more objects than a single ``list_objects_v2`` response returns are listed
    in full. Failures are logged and reported as an empty list.

    :param bucket_name: The name of the S3 bucket.
    :param folder_name: The folder name within the S3 bucket.
    :return: A list of file keys in the specified S3 folder.
    """
    try:
        s3 = boto3.client("s3")
        # A single list_objects_v2 call is capped per response; the paginator
        # follows continuation tokens until the listing is exhausted.
        paginator = s3.get_paginator("list_objects_v2")

        file_keys = []
        for page in paginator.paginate(Bucket=bucket_name, Prefix=folder_name):
            file_keys.extend(content["Key"] for content in page.get("Contents", []))

        if not file_keys:
            logger.info(
                f"No files found in folder {folder_name} in bucket {bucket_name}."
            )
        return file_keys

    except NoCredentialsError:
        logger.error("Credentials not available.")
        return []
    except PartialCredentialsError:
        logger.error("Incomplete credentials provided.")
        return []
    except Exception as e:
        logger.error(
            f"Failed to list files in folder {folder_name} in bucket {bucket_name}: {str(e)}"
        )
        return []
|
|
|
|
|
|
def list_files_and_subfolders_in_s3_folder(bucket_name, folder_name):
    """
    List all files and immediate subfolders in a given folder in an S3 bucket.

    E.g. if we have a folder structure in S3 like this:
    - folder1/
        - file1.csv
        - file2.csv
        - subfolder1/
            - file3.csv

    Then calling list_files_and_subfolders_in_s3_folder(bucket_name='my-bucket', folder_name='folder1/')
    would return ['folder1/file1.csv', 'folder1/file2.csv', 'folder1/subfolder1/'].

    Namely, the nested files are not included in the list, only the immediate files and subfolders.

    Results are gathered across every page of the listing. Failures are logged
    and reported as an empty list.

    :param bucket_name: The name of the S3 bucket.
    :param folder_name: The folder name within the S3 bucket. An empty string
        lists the top level of the bucket.
    :return: A list of file keys followed by subfolder prefixes in the specified S3 folder.
    """
    # The prefix must end with a forward slash so only this folder matches.
    # An empty name is left alone: appending "/" would search under a literal
    # "/" prefix instead of the top level of the bucket.
    if folder_name != "" and not folder_name.endswith("/"):
        folder_name += "/"

    try:
        s3 = boto3.client("s3")
        # Paginate: one list_objects_v2 response is capped, so a single call
        # would silently truncate large folders.
        paginator = s3.get_paginator("list_objects_v2")
        pages = paginator.paginate(
            Bucket=bucket_name, Prefix=folder_name, Delimiter="/"
        )

        files = []
        subfolders = []
        for page in pages:
            # Skip the key equal to the folder prefix itself
            files.extend(
                content["Key"]
                for content in page.get("Contents", [])
                if content["Key"] != folder_name
            )
            # The delimiter groups deeper keys into immediate subfolder prefixes
            subfolders.extend(
                prefix["Prefix"] for prefix in page.get("CommonPrefixes", [])
            )

        # Files first, then subfolders
        return files + subfolders

    except NoCredentialsError:
        logger.error("Credentials not available.")
        return []
    except PartialCredentialsError:
        logger.error("Incomplete credentials provided.")
        return []
    except Exception as e:
        logger.error(
            f"Failed to list files and subfolders in folder {folder_name} in bucket {bucket_name}: {str(e)}"
        )
        return []
|
|
|
|
|
|
def list_xmls_in_s3_folder(bucket_name, folder_name):
    """
    List all XML files in a given folder in an S3 bucket.

    Results are gathered across every page of the listing, so XML files beyond
    the first ``list_objects_v2`` response are not missed. Failures are logged
    and reported as an empty list.

    :param bucket_name: The name of the S3 bucket.
    :param folder_name: The folder name within the S3 bucket.
    :return: A list of XML file keys (keys ending in ``.xml``) in the specified S3 folder.
    """
    try:
        s3 = boto3.client("s3")
        # A single list_objects_v2 call is capped per response; the paginator
        # follows continuation tokens until the listing is exhausted.
        paginator = s3.get_paginator("list_objects_v2")

        found_any_object = False
        xml_files = []
        for page in paginator.paginate(Bucket=bucket_name, Prefix=folder_name):
            contents = page.get("Contents", [])
            if contents:
                found_any_object = True
            # Filter XML files
            xml_files.extend(
                content["Key"]
                for content in contents
                if content["Key"].endswith(".xml")
            )

        # Logged only when the folder holds no objects at all, not merely no XML
        if not found_any_object:
            logger.info(
                f"No files found in folder {folder_name} in bucket {bucket_name}."
            )
            return []

        return xml_files

    except NoCredentialsError:
        logger.error("Credentials not available.")
        return []
    except PartialCredentialsError:
        logger.error("Incomplete credentials provided.")
        return []
    except Exception as e:
        logger.error(
            f"Failed to list XML files in folder {folder_name} in bucket {bucket_name}: {str(e)}"
        )
        return []
|
|
|
|
|
|
def upload_file_to_s3(file_path: str, bucket_name: str, file_key: str) -> None:
    """
    Upload a local file to an S3 bucket.

    Any failure is logged and then re-raised to the caller.

    :param file_path: Path of the local file to upload.
    :param bucket_name: The name of the destination S3 bucket.
    :param file_key: The key to store the file under in the bucket.
    """
    try:
        target_bucket = boto3.resource("s3").Bucket(bucket_name)
        target_bucket.upload_file(file_path, file_key)
        logger.info(f"Uploaded {file_path} to s3://{bucket_name}/{file_key}")
    except Exception as e:
        # Record the failure, then let the original exception propagate
        logger.error(f"Failed to upload {file_path} to S3: {e}")
        raise
|