slice 14b: Storage protocol + LocalStorage impl

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Khalim Conn-Kowlessar 2026-05-16 17:52:54 +00:00
parent eb42cb88a1
commit 7a6c8b4f24
2 changed files with 121 additions and 0 deletions

View file

@ -0,0 +1,45 @@
"""Storage protocol + LocalStorage impl for the training-data pipeline.
The protocol is the seam between local-dev (filesystem, gitignored ./data/) and a
future S3-backed implementation. Downstream stages depend only on the protocol so
the swap is a constructor change, not a callsite rewrite.
"""
from collections.abc import Iterator
from pathlib import Path
from typing import Protocol
class Storage(Protocol):
def write_bytes(self, key: str, data: bytes) -> None: ...
def read_bytes(self, key: str) -> bytes: ...
def exists(self, key: str) -> bool: ...
def iter_keys(self, prefix: str = "") -> Iterator[str]: ...
class LocalStorage:
def __init__(self, root: Path) -> None:
self._root = root
def _path(self, key: str) -> Path:
return self._root / key
def write_bytes(self, key: str, data: bytes) -> None:
target = self._path(key)
target.parent.mkdir(parents=True, exist_ok=True)
target.write_bytes(data)
def read_bytes(self, key: str) -> bytes:
return self._path(key).read_bytes()
def exists(self, key: str) -> bool:
return self._path(key).exists()
def iter_keys(self, prefix: str = "") -> Iterator[str]:
if not self._root.exists():
return
for p in self._root.rglob("*"):
if p.is_file():
key = p.relative_to(self._root).as_posix()
if key.startswith(prefix):
yield key

View file

@ -0,0 +1,76 @@
"""Tests for LocalStorage — fs-backed Storage protocol for the training pipeline.
Storage is the swap-point between local-dev (LocalStorage rooted at ./data/) and the
eventual S3-backed impl. Downstream stages (bulk_fetch, write_parquet) talk to the
Storage protocol only, not Path.
"""
from pathlib import Path
import pytest
from ml_training_data.storage import LocalStorage
def test_write_bytes_then_read_bytes_returns_same_data(tmp_path: Path) -> None:
# Arrange
storage = LocalStorage(root=tmp_path)
payload = b"hello world"
# Act
storage.write_bytes("greetings/hello.txt", payload)
out = storage.read_bytes("greetings/hello.txt")
# Assert
assert out == payload
def test_exists_is_false_before_write_and_true_after(tmp_path: Path) -> None:
# Arrange
storage = LocalStorage(root=tmp_path)
# Act
before = storage.exists("a/b.bin")
storage.write_bytes("a/b.bin", b"x")
after = storage.exists("a/b.bin")
# Assert
assert before is False
assert after is True
def test_iter_keys_yields_every_written_key(tmp_path: Path) -> None:
# Arrange
storage = LocalStorage(root=tmp_path)
storage.write_bytes("certs/a.json", b"1")
storage.write_bytes("certs/b.json", b"2")
storage.write_bytes("manifest.json", b"3")
# Act
keys = sorted(storage.iter_keys())
# Assert
assert keys == ["certs/a.json", "certs/b.json", "manifest.json"]
def test_iter_keys_filters_by_prefix(tmp_path: Path) -> None:
# Arrange
storage = LocalStorage(root=tmp_path)
storage.write_bytes("certs/a.json", b"1")
storage.write_bytes("certs/b.json", b"2")
storage.write_bytes("manifest.json", b"3")
# Act
keys = sorted(storage.iter_keys(prefix="certs/"))
# Assert
assert keys == ["certs/a.json", "certs/b.json"]
def test_read_bytes_raises_filenotfound_for_missing_key(tmp_path: Path) -> None:
# Arrange
storage = LocalStorage(root=tmp_path)
# Act / Assert
with pytest.raises(FileNotFoundError):
storage.read_bytes("nope.bin")