Model/infrastructure/csv_s3_client.py
2026-05-20 13:21:11 +00:00

28 lines
1 KiB
Python

import csv
from io import StringIO
from infrastructure.s3_client import S3Client
from infrastructure.s3_uri import parse_s3_uri
class CsvS3Client(S3Client):
def read_rows(self, s3_uri: str) -> list[dict[str, str]]:
bucket, key = parse_s3_uri(s3_uri)
if bucket != self.bucket:
raise ValueError(
f"s3_uri bucket {bucket!r} does not match client bucket {self.bucket!r}"
)
raw = self.get_object(key)
text = raw.decode("utf-8-sig")
reader = csv.DictReader(StringIO(text))
return [dict(row) for row in reader]
def save_rows(self, rows: list[dict[str, str]], key: str) -> str:
if not rows:
raise ValueError("Cannot save an empty rows list: header is unknown")
buffer = StringIO()
fieldnames = list(rows[0].keys())
writer = csv.DictWriter(buffer, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(rows)
return self.put_object(key, buffer.getvalue().encode("utf-8"))