mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
typehint read_io_from_s3 signature to remove pylance problems in calling modules
This commit is contained in:
parent
e6c0feaf1c
commit
d1fb1a6d39
1 changed file with 73 additions and 46 deletions
119
utils/s3.py
119
utils/s3.py
|
|
@ -17,11 +17,11 @@ def read_from_s3(bucket_name, s3_file_name):
|
|||
:param s3_file_name: The file name to use for the saved data in S3
|
||||
"""
|
||||
# Initialize a session using Amazon S3
|
||||
s3 = boto3.resource('s3')
|
||||
s3 = boto3.resource("s3")
|
||||
|
||||
# Get the MessagePack data from S3
|
||||
obj = s3.Object(bucket_name, s3_file_name)
|
||||
data = obj.get()['Body'].read()
|
||||
data = obj.get()["Body"].read()
|
||||
|
||||
return data
|
||||
|
||||
|
|
@ -36,7 +36,7 @@ def save_data_to_s3(data, bucket_name, s3_file_name):
|
|||
"""
|
||||
# Ensure you have AWS credentials set up - either via environment variables, AWS CLI, or IAM roles
|
||||
try:
|
||||
s3 = boto3.client('s3')
|
||||
s3 = boto3.client("s3")
|
||||
except NoCredentialsError:
|
||||
print("Credentials not available.")
|
||||
return
|
||||
|
|
@ -46,12 +46,12 @@ def save_data_to_s3(data, bucket_name, s3_file_name):
|
|||
|
||||
try:
|
||||
s3.put_object(Bucket=bucket_name, Key=s3_file_name, Body=data)
|
||||
print(f'Successfully uploaded data to {bucket_name}/{s3_file_name}')
|
||||
print(f"Successfully uploaded data to {bucket_name}/{s3_file_name}")
|
||||
except Exception as e:
|
||||
print(f'Failed to upload data to {bucket_name}/{s3_file_name}: {str(e)}')
|
||||
print(f"Failed to upload data to {bucket_name}/{s3_file_name}: {str(e)}")
|
||||
|
||||
|
||||
def read_io_from_s3(bucket_name, file_key):
|
||||
def read_io_from_s3(bucket_name: str, file_key: str) -> BytesIO:
|
||||
"""
|
||||
Read a file from S3 into a BytesIO object. This can be used by other methods to parse the response
|
||||
|
||||
|
|
@ -61,13 +61,13 @@ def read_io_from_s3(bucket_name, file_key):
|
|||
:param file_key: The file name of the shapefile in S3
|
||||
:return: Io file to be parsed by another method
|
||||
"""
|
||||
client = boto3.client('s3')
|
||||
client = boto3.client("s3")
|
||||
|
||||
# Get the Parquet file from S3
|
||||
response = client.get_object(Bucket=bucket_name, Key=file_key)
|
||||
|
||||
# Read the file into an io object
|
||||
buffer = BytesIO(response['Body'].read())
|
||||
buffer = BytesIO(response["Body"].read())
|
||||
|
||||
return buffer
|
||||
|
||||
|
|
@ -86,7 +86,7 @@ def save_dataframe_to_s3_parquet(df, bucket_name, file_key):
|
|||
df.to_parquet(parquet_buffer)
|
||||
|
||||
# Create the boto3 client
|
||||
client = boto3.client('s3')
|
||||
client = boto3.client("s3")
|
||||
|
||||
# Upload the Parquet file to S3
|
||||
client.put_object(Bucket=bucket_name, Key=file_key, Body=parquet_buffer.getvalue())
|
||||
|
|
@ -102,15 +102,14 @@ def read_dataframe_from_s3_parquet(bucket_name, file_key):
|
|||
"""
|
||||
|
||||
if bucket_name is None:
|
||||
raise ValueError("Bucket name is None when trying to read dataframe from parquet")
|
||||
raise ValueError(
|
||||
"Bucket name is None when trying to read dataframe from parquet"
|
||||
)
|
||||
|
||||
if not file_key.endswith(".parquet"):
|
||||
raise ValueError("This file doesn't look like a parquet file")
|
||||
|
||||
parquet_buffer = read_io_from_s3(
|
||||
bucket_name=bucket_name,
|
||||
file_key=file_key
|
||||
)
|
||||
parquet_buffer = read_io_from_s3(bucket_name=bucket_name, file_key=file_key)
|
||||
|
||||
df = pd.read_parquet(parquet_buffer)
|
||||
|
||||
|
|
@ -130,7 +129,7 @@ def save_csv_to_s3(dataframe, bucket_name, file_name):
|
|||
bool: True if the file was successfully saved, False otherwise.
|
||||
"""
|
||||
# Initialize S3 client
|
||||
s3 = boto3.client('s3')
|
||||
s3 = boto3.client("s3")
|
||||
|
||||
# Create an in-memory text stream
|
||||
csv_buffer = StringIO()
|
||||
|
|
@ -159,7 +158,7 @@ def save_pickle_to_s3(data, bucket_name, s3_file_name):
|
|||
try:
|
||||
serialized_data = pickle.dumps(data)
|
||||
except Exception as e:
|
||||
print(f'Failed to serialize data: {str(e)}')
|
||||
print(f"Failed to serialize data: {str(e)}")
|
||||
return
|
||||
|
||||
# Use save_data_to_s3 function to upload the serialized data to S3
|
||||
|
|
@ -175,9 +174,9 @@ def read_pickle_from_s3(bucket_name, s3_file_name):
|
|||
:return: The data read from the pickle file
|
||||
"""
|
||||
try:
|
||||
s3 = boto3.client('s3')
|
||||
s3 = boto3.client("s3")
|
||||
s3_response = s3.get_object(Bucket=bucket_name, Key=s3_file_name)
|
||||
serialized_data = s3_response['Body'].read()
|
||||
serialized_data = s3_response["Body"].read()
|
||||
except NoCredentialsError:
|
||||
logger.errpr("Credentials not available.")
|
||||
return None
|
||||
|
|
@ -185,20 +184,24 @@ def read_pickle_from_s3(bucket_name, s3_file_name):
|
|||
logger.errpr("Incomplete credentials provided.")
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f'Failed to download data from {bucket_name}/{s3_file_name}: {str(e)}')
|
||||
logger.error(
|
||||
f"Failed to download data from {bucket_name}/{s3_file_name}: {str(e)}"
|
||||
)
|
||||
return None
|
||||
|
||||
# Deserialize data from pickle format
|
||||
try:
|
||||
data = pickle.loads(serialized_data)
|
||||
except Exception as e:
|
||||
logger.error(f'Failed to deserialize data: {str(e)}')
|
||||
logger.error(f"Failed to deserialize data: {str(e)}")
|
||||
return None
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def read_excel_from_s3(bucket_name, file_key, header_row, drop_all_na=True, sheet_name=None):
|
||||
def read_excel_from_s3(
|
||||
bucket_name, file_key, header_row, drop_all_na=True, sheet_name=None
|
||||
):
|
||||
"""
|
||||
Read an Excel file from an S3 bucket and return it as a pandas DataFrame.
|
||||
|
||||
|
|
@ -222,7 +225,7 @@ def read_excel_from_s3(bucket_name, file_key, header_row, drop_all_na=True, shee
|
|||
|
||||
# Drop columns where all values are NaN
|
||||
if drop_all_na:
|
||||
df.dropna(axis=1, how='all', inplace=True)
|
||||
df.dropna(axis=1, how="all", inplace=True)
|
||||
|
||||
# Reset index if the first column is just an index or entirely NaN
|
||||
df.reset_index(drop=True, inplace=True)
|
||||
|
|
@ -254,7 +257,7 @@ def save_excel_to_s3(df, bucket_name, file_key):
|
|||
|
||||
# Initialize a session using boto3
|
||||
session = boto3.session.Session()
|
||||
s3 = session.resource('s3')
|
||||
s3 = session.resource("s3")
|
||||
|
||||
# Upload the Excel file from the buffer to S3
|
||||
bucket = s3.Bucket(bucket_name)
|
||||
|
|
@ -264,17 +267,19 @@ def save_excel_to_s3(df, bucket_name, file_key):
|
|||
|
||||
|
||||
def read_csv_from_s3(bucket_name, filepath):
|
||||
logger.info(f"Reading CSV file from S3 bucket '{bucket_name}' with key '{filepath}'")
|
||||
s3 = boto3.client('s3')
|
||||
logger.info(
|
||||
f"Reading CSV file from S3 bucket '{bucket_name}' with key '{filepath}'"
|
||||
)
|
||||
s3 = boto3.client("s3")
|
||||
|
||||
# Get the object from s3
|
||||
s3_object = s3.get_object(Bucket=bucket_name, Key=filepath)
|
||||
|
||||
# Read the CSV body from the s3 object
|
||||
body = s3_object['Body'].read()
|
||||
body = s3_object["Body"].read()
|
||||
|
||||
# Use StringIO to create a file-like object from the string
|
||||
csv_data = StringIO(body.decode('utf-8'))
|
||||
csv_data = StringIO(body.decode("utf-8"))
|
||||
|
||||
# Use csv library to read it into a list of dictionaries
|
||||
reader = csv.DictReader(csv_data)
|
||||
|
|
@ -292,14 +297,16 @@ def list_files_in_s3_folder(bucket_name, folder_name):
|
|||
:return: A list of file keys in the specified S3 folder.
|
||||
"""
|
||||
try:
|
||||
s3 = boto3.client('s3')
|
||||
s3 = boto3.client("s3")
|
||||
response = s3.list_objects_v2(Bucket=bucket_name, Prefix=folder_name)
|
||||
|
||||
if 'Contents' not in response:
|
||||
logger.info(f"No files found in folder {folder_name} in bucket {bucket_name}.")
|
||||
if "Contents" not in response:
|
||||
logger.info(
|
||||
f"No files found in folder {folder_name} in bucket {bucket_name}."
|
||||
)
|
||||
return []
|
||||
|
||||
file_keys = [content['Key'] for content in response['Contents']]
|
||||
file_keys = [content["Key"] for content in response["Contents"]]
|
||||
return file_keys
|
||||
|
||||
except NoCredentialsError:
|
||||
|
|
@ -309,7 +316,9 @@ def list_files_in_s3_folder(bucket_name, folder_name):
|
|||
logger.error("Incomplete credentials provided.")
|
||||
return []
|
||||
except Exception as e:
|
||||
logger.error(f'Failed to list files in folder {folder_name} in bucket {bucket_name}: {str(e)}')
|
||||
logger.error(
|
||||
f"Failed to list files in folder {folder_name} in bucket {bucket_name}: {str(e)}"
|
||||
)
|
||||
return []
|
||||
|
||||
|
||||
|
|
@ -335,22 +344,30 @@ def list_files_and_subfolders_in_s3_folder(bucket_name, folder_name):
|
|||
"""
|
||||
|
||||
# For this function, folder_name should end with a forward slash
|
||||
if not folder_name.endswith('/'):
|
||||
folder_name += '/'
|
||||
if not folder_name.endswith("/"):
|
||||
folder_name += "/"
|
||||
|
||||
try:
|
||||
s3 = boto3.client('s3')
|
||||
response = s3.list_objects_v2(Bucket=bucket_name, Prefix=folder_name, Delimiter='/')
|
||||
s3 = boto3.client("s3")
|
||||
response = s3.list_objects_v2(
|
||||
Bucket=bucket_name, Prefix=folder_name, Delimiter="/"
|
||||
)
|
||||
|
||||
items = []
|
||||
|
||||
# Add files to the list
|
||||
if 'Contents' in response:
|
||||
items.extend([content['Key'] for content in response['Contents'] if content['Key'] != folder_name])
|
||||
if "Contents" in response:
|
||||
items.extend(
|
||||
[
|
||||
content["Key"]
|
||||
for content in response["Contents"]
|
||||
if content["Key"] != folder_name
|
||||
]
|
||||
)
|
||||
|
||||
# Add immediate subfolders to the list
|
||||
if 'CommonPrefixes' in response:
|
||||
items.extend([prefix['Prefix'] for prefix in response['CommonPrefixes']])
|
||||
if "CommonPrefixes" in response:
|
||||
items.extend([prefix["Prefix"] for prefix in response["CommonPrefixes"]])
|
||||
|
||||
return items
|
||||
|
||||
|
|
@ -361,7 +378,9 @@ def list_files_and_subfolders_in_s3_folder(bucket_name, folder_name):
|
|||
logger.error("Incomplete credentials provided.")
|
||||
return []
|
||||
except Exception as e:
|
||||
logger.error(f'Failed to list files and subfolders in folder {folder_name} in bucket {bucket_name}: {str(e)}')
|
||||
logger.error(
|
||||
f"Failed to list files and subfolders in folder {folder_name} in bucket {bucket_name}: {str(e)}"
|
||||
)
|
||||
return []
|
||||
|
||||
|
||||
|
|
@ -374,15 +393,21 @@ def list_xmls_in_s3_folder(bucket_name, folder_name):
|
|||
:return: A list of XML file keys in the specified S3 folder.
|
||||
"""
|
||||
try:
|
||||
s3 = boto3.client('s3')
|
||||
s3 = boto3.client("s3")
|
||||
response = s3.list_objects_v2(Bucket=bucket_name, Prefix=folder_name)
|
||||
|
||||
if 'Contents' not in response:
|
||||
logger.info(f"No files found in folder {folder_name} in bucket {bucket_name}.")
|
||||
if "Contents" not in response:
|
||||
logger.info(
|
||||
f"No files found in folder {folder_name} in bucket {bucket_name}."
|
||||
)
|
||||
return []
|
||||
|
||||
# Filter XML files
|
||||
xml_files = [content['Key'] for content in response['Contents'] if content['Key'].endswith('.xml')]
|
||||
xml_files = [
|
||||
content["Key"]
|
||||
for content in response["Contents"]
|
||||
if content["Key"].endswith(".xml")
|
||||
]
|
||||
return xml_files
|
||||
|
||||
except NoCredentialsError:
|
||||
|
|
@ -392,5 +417,7 @@ def list_xmls_in_s3_folder(bucket_name, folder_name):
|
|||
logger.error("Incomplete credentials provided.")
|
||||
return []
|
||||
except Exception as e:
|
||||
logger.error(f'Failed to list XML files in folder {folder_name} in bucket {bucket_name}: {str(e)}')
|
||||
logger.error(
|
||||
f"Failed to list XML files in folder {folder_name} in bucket {bucket_name}: {str(e)}"
|
||||
)
|
||||
return []
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue