mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
46 lines
1.8 KiB
Python
46 lines
1.8 KiB
Python
import csv
|
|
from io import StringIO
|
|
|
|
from infrastructure.s3_client import S3Client
|
|
from infrastructure.s3_uri import parse_s3_uri
|
|
|
|
|
|
class CsvS3Client(S3Client):
    """:class:`S3Client` subclass that round-trips CSV row dictionaries.

    Rows are represented as ``list[dict[str, str]]`` — the same shape used by
    :func:`csv.DictReader`/``DictWriter`` — which keeps the API trivially
    compatible with existing CSV helpers in ``utils/s3.py``.
    """

    def read_rows(self, s3_uri: str) -> list[dict[str, str]]:
        """Fetch the object at ``s3_uri`` and decode it as a CSV.

        The bucket portion of the URI is validated against this client's
        configured bucket so cross-bucket reads fail loudly rather than
        silently fetching from the wrong place.

        Args:
            s3_uri: URI of the object to read; it is split into a bucket and
                a key by ``parse_s3_uri``.

        Returns:
            One dict per data row, keyed by the column names taken from the
            first (header) line of the CSV.

        Raises:
            ValueError: If the bucket in ``s3_uri`` differs from
                ``self.bucket``.
        """
        bucket, key = parse_s3_uri(s3_uri)
        if bucket != self.bucket:
            raise ValueError(
                f"s3_uri bucket {bucket!r} does not match client bucket {self.bucket!r}"
            )
        raw = self.get_object(key)
        # "utf-8-sig" drops a leading byte-order mark when one is present, so
        # the first header name is not polluted by it.
        text = raw.decode("utf-8-sig")
        # newline="" hands line endings to the csv parser untranslated while
        # still splitting on "\r", "\n" and "\r\n". With StringIO's default
        # newline="\n", a file using bare "\r" line endings arrives as one
        # chunk and the parser raises "new-line character seen in unquoted
        # field".
        reader = csv.DictReader(StringIO(text, newline=""))
        return [dict(row) for row in reader]

    def save_rows(self, rows: list[dict[str, str]], key: str) -> str:
        """Serialise ``rows`` to CSV under ``key`` and return the ``s3://`` URI.

        An empty ``rows`` list is rejected because we cannot otherwise infer
        a header row.

        Args:
            rows: Row dictionaries to write. The keys of the first row, in
                their insertion order, become the CSV header.
            key: Object key passed to ``put_object``.

        Returns:
            The value returned by ``put_object`` for the uploaded object.

        Raises:
            ValueError: If ``rows`` is empty, or (raised by
                ``csv.DictWriter``) if a later row contains a key that is not
                in the first row. Keys missing from a later row are written
                as empty fields.
        """
        if not rows:
            raise ValueError("Cannot save an empty rows list: header is unknown")
        # newline="" leaves the csv writer's own "\r\n" terminators untouched.
        buffer = StringIO(newline="")
        fieldnames = list(rows[0].keys())
        writer = csv.DictWriter(buffer, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)
        # The whole document is built in memory first, so a serialisation
        # error above never results in a partial upload.
        return self.put_object(key, buffer.getvalue().encode("utf-8"))
|