From d1fb1a6d39a9457f3944442b981b77fd4fccc2c0 Mon Sep 17 00:00:00 2001 From: Daniel Roth Date: Mon, 16 Feb 2026 09:45:26 +0000 Subject: [PATCH] typehint read_io_from_s3 signature to remove pylance problems in calling modules --- utils/s3.py | 119 ++++++++++++++++++++++++++++++++-------------------- 1 file changed, 73 insertions(+), 46 deletions(-) diff --git a/utils/s3.py b/utils/s3.py index 2e67d4f0..b243b2ab 100644 --- a/utils/s3.py +++ b/utils/s3.py @@ -17,11 +17,11 @@ def read_from_s3(bucket_name, s3_file_name): :param s3_file_name: The file name to use for the saved data in S3 """ # Initialize a session using Amazon S3 - s3 = boto3.resource('s3') + s3 = boto3.resource("s3") # Get the MessagePack data from S3 obj = s3.Object(bucket_name, s3_file_name) - data = obj.get()['Body'].read() + data = obj.get()["Body"].read() return data @@ -36,7 +36,7 @@ def save_data_to_s3(data, bucket_name, s3_file_name): """ # Ensure you have AWS credentials set up - either via environment variables, AWS CLI, or IAM roles try: - s3 = boto3.client('s3') + s3 = boto3.client("s3") except NoCredentialsError: print("Credentials not available.") return @@ -46,12 +46,12 @@ def save_data_to_s3(data, bucket_name, s3_file_name): try: s3.put_object(Bucket=bucket_name, Key=s3_file_name, Body=data) - print(f'Successfully uploaded data to {bucket_name}/{s3_file_name}') + print(f"Successfully uploaded data to {bucket_name}/{s3_file_name}") except Exception as e: - print(f'Failed to upload data to {bucket_name}/{s3_file_name}: {str(e)}') + print(f"Failed to upload data to {bucket_name}/{s3_file_name}: {str(e)}") -def read_io_from_s3(bucket_name, file_key): +def read_io_from_s3(bucket_name: str, file_key: str) -> BytesIO: """ Read a file from S3 into a BytesIO object. This can be used by other methods to parse the response @@ -61,13 +61,13 @@ def read_io_from_s3(bucket_name, file_key): :param file_key: The file name of the shapefile in S3 :return: Io file to be parsed by another method """ - client = boto3.client('s3') + client = boto3.client("s3") # Get the Parquet file from S3 response = client.get_object(Bucket=bucket_name, Key=file_key) # Read the file into an io object - buffer = BytesIO(response['Body'].read()) + buffer = BytesIO(response["Body"].read()) return buffer @@ -86,7 +86,7 @@ def save_dataframe_to_s3_parquet(df, bucket_name, file_key): df.to_parquet(parquet_buffer) # Create the boto3 client - client = boto3.client('s3') + client = boto3.client("s3") # Upload the Parquet file to S3 client.put_object(Bucket=bucket_name, Key=file_key, Body=parquet_buffer.getvalue()) @@ -102,15 +102,14 @@ def read_dataframe_from_s3_parquet(bucket_name, file_key): """ if bucket_name is None: - raise ValueError("Bucket name is None when trying to read dataframe from parquet") + raise ValueError( + "Bucket name is None when trying to read dataframe from parquet" + ) if not file_key.endswith(".parquet"): raise ValueError("This file doesn't look like a parquet file") - parquet_buffer = read_io_from_s3( - bucket_name=bucket_name, - file_key=file_key - ) + parquet_buffer = read_io_from_s3(bucket_name=bucket_name, file_key=file_key) df = pd.read_parquet(parquet_buffer) @@ -130,7 +129,7 @@ def save_csv_to_s3(dataframe, bucket_name, file_name): bool: True if the file was successfully saved, False otherwise. """ # Initialize S3 client - s3 = boto3.client('s3') + s3 = boto3.client("s3") # Create an in-memory text stream csv_buffer = StringIO() @@ -159,7 +158,7 @@ def save_pickle_to_s3(data, bucket_name, s3_file_name): try: serialized_data = pickle.dumps(data) except Exception as e: - print(f'Failed to serialize data: {str(e)}') + print(f"Failed to serialize data: {str(e)}") return # Use save_data_to_s3 function to upload the serialized data to S3 @@ -175,9 +174,9 @@ def read_pickle_from_s3(bucket_name, s3_file_name): :return: The data read from the pickle file """ try: - s3 = boto3.client('s3') + s3 = boto3.client("s3") s3_response = s3.get_object(Bucket=bucket_name, Key=s3_file_name) - serialized_data = s3_response['Body'].read() + serialized_data = s3_response["Body"].read() except NoCredentialsError: logger.errpr("Credentials not available.") return None @@ -185,20 +184,24 @@ def read_pickle_from_s3(bucket_name, s3_file_name): logger.errpr("Incomplete credentials provided.") return None except Exception as e: - logger.error(f'Failed to download data from {bucket_name}/{s3_file_name}: {str(e)}') + logger.error( + f"Failed to download data from {bucket_name}/{s3_file_name}: {str(e)}" + ) return None # Deserialize data from pickle format try: data = pickle.loads(serialized_data) except Exception as e: - logger.error(f'Failed to deserialize data: {str(e)}') + logger.error(f"Failed to deserialize data: {str(e)}") return None return data -def read_excel_from_s3(bucket_name, file_key, header_row, drop_all_na=True, sheet_name=None): +def read_excel_from_s3( + bucket_name, file_key, header_row, drop_all_na=True, sheet_name=None +): """ Read an Excel file from an S3 bucket and return it as a pandas DataFrame. @@ -222,7 +225,7 @@ def read_excel_from_s3(bucket_name, file_key, header_row, drop_all_na=True, shee # Drop columns where all values are NaN if drop_all_na: - df.dropna(axis=1, how='all', inplace=True) + df.dropna(axis=1, how="all", inplace=True) # Reset index if the first column is just an index or entirely NaN df.reset_index(drop=True, inplace=True) @@ -254,7 +257,7 @@ def save_excel_to_s3(df, bucket_name, file_key): # Initialize a session using boto3 session = boto3.session.Session() - s3 = session.resource('s3') + s3 = session.resource("s3") # Upload the Excel file from the buffer to S3 bucket = s3.Bucket(bucket_name) @@ -264,17 +267,19 @@ def save_excel_to_s3(df, bucket_name, file_key): def read_csv_from_s3(bucket_name, filepath): - logger.info(f"Reading CSV file from S3 bucket '{bucket_name}' with key '{filepath}'") - s3 = boto3.client('s3') + logger.info( + f"Reading CSV file from S3 bucket '{bucket_name}' with key '{filepath}'" + ) + s3 = boto3.client("s3") # Get the object from s3 s3_object = s3.get_object(Bucket=bucket_name, Key=filepath) # Read the CSV body from the s3 object - body = s3_object['Body'].read() + body = s3_object["Body"].read() # Use StringIO to create a file-like object from the string - csv_data = StringIO(body.decode('utf-8')) + csv_data = StringIO(body.decode("utf-8")) # Use csv library to read it into a list of dictionaries reader = csv.DictReader(csv_data) @@ -292,14 +297,16 @@ def list_files_in_s3_folder(bucket_name, folder_name): :return: A list of file keys in the specified S3 folder. """ try: - s3 = boto3.client('s3') + s3 = boto3.client("s3") response = s3.list_objects_v2(Bucket=bucket_name, Prefix=folder_name) - if 'Contents' not in response: - logger.info(f"No files found in folder {folder_name} in bucket {bucket_name}.") + if "Contents" not in response: + logger.info( + f"No files found in folder {folder_name} in bucket {bucket_name}." + ) return [] - file_keys = [content['Key'] for content in response['Contents']] + file_keys = [content["Key"] for content in response["Contents"]] return file_keys except NoCredentialsError: @@ -309,7 +316,9 @@ def list_files_in_s3_folder(bucket_name, folder_name): logger.error("Incomplete credentials provided.") return [] except Exception as e: - logger.error(f'Failed to list files in folder {folder_name} in bucket {bucket_name}: {str(e)}') + logger.error( + f"Failed to list files in folder {folder_name} in bucket {bucket_name}: {str(e)}" + ) return [] @@ -335,22 +344,30 @@ def list_files_and_subfolders_in_s3_folder(bucket_name, folder_name): """ # For this function, folder_name should end with a forward slash - if not folder_name.endswith('/'): - folder_name += '/' + if not folder_name.endswith("/"): + folder_name += "/" try: - s3 = boto3.client('s3') - response = s3.list_objects_v2(Bucket=bucket_name, Prefix=folder_name, Delimiter='/') + s3 = boto3.client("s3") + response = s3.list_objects_v2( + Bucket=bucket_name, Prefix=folder_name, Delimiter="/" + ) items = [] # Add files to the list - if 'Contents' in response: - items.extend([content['Key'] for content in response['Contents'] if content['Key'] != folder_name]) + if "Contents" in response: + items.extend( + [ + content["Key"] + for content in response["Contents"] + if content["Key"] != folder_name + ] + ) # Add immediate subfolders to the list - if 'CommonPrefixes' in response: - items.extend([prefix['Prefix'] for prefix in response['CommonPrefixes']]) + if "CommonPrefixes" in response: + items.extend([prefix["Prefix"] for prefix in response["CommonPrefixes"]]) return items @@ -361,7 +378,9 @@ def list_files_and_subfolders_in_s3_folder(bucket_name, folder_name): logger.error("Incomplete credentials provided.") return [] except Exception as e: - logger.error(f'Failed to list files and subfolders in folder {folder_name} in bucket {bucket_name}: {str(e)}') + logger.error( + f"Failed to list files and subfolders in folder {folder_name} in bucket {bucket_name}: {str(e)}" + ) return [] @@ -374,15 +393,21 @@ def list_xmls_in_s3_folder(bucket_name, folder_name): :return: A list of XML file keys in the specified S3 folder. """ try: - s3 = boto3.client('s3') + s3 = boto3.client("s3") response = s3.list_objects_v2(Bucket=bucket_name, Prefix=folder_name) - if 'Contents' not in response: - logger.info(f"No files found in folder {folder_name} in bucket {bucket_name}.") + if "Contents" not in response: + logger.info( + f"No files found in folder {folder_name} in bucket {bucket_name}." + ) return [] # Filter XML files - xml_files = [content['Key'] for content in response['Contents'] if content['Key'].endswith('.xml')] + xml_files = [ + content["Key"] + for content in response["Contents"] + if content["Key"].endswith(".xml") + ] return xml_files except NoCredentialsError: @@ -392,5 +417,7 @@ def list_xmls_in_s3_folder(bucket_name, folder_name): logger.error("Incomplete credentials provided.") return [] except Exception as e: - logger.error(f'Failed to list XML files in folder {folder_name} in bucket {bucket_name}: {str(e)}') + logger.error( + f"Failed to list XML files in folder {folder_name} in bucket {bucket_name}: {str(e)}" + ) return []