slice 14d: build_features wires bulk reader -> mapper -> EpcMlTransform

ijson use_float fixes Decimal/float coercion when streaming JSON.
pyright extraPaths so the new pkg type-checks against domna-domain.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Khalim Conn-Kowlessar 2026-05-16 18:38:41 +00:00
parent 0ff9d546b8
commit 20fd55d5a1
5 changed files with 136 additions and 1 deletions

View file

@ -2,6 +2,10 @@
"typeCheckingMode": "strict",
"venvPath": "/Users/khalimconn-kowlessar/opt/anaconda3/envs/",
"venv": "Fastapi-backend",
"extraPaths": [
"packages/domain/src",
"services/ml_training_data/src"
],
"include": [
"."
]

View file

@ -4,11 +4,15 @@ version = "0.1.0"
description = "Pipeline that turns the EPC open-data CSV into ML training parquet + baseline models."
requires-python = ">=3.11"
dependencies = [
"domna-domain",
"pandas>=2.0",
"pandas-stubs",
"ijson>=3.2",
]
[tool.uv.sources]
domna-domain = { workspace = true }
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

View file

@ -0,0 +1,40 @@
"""JSON certs -> EpcPropertyData -> feature+target DataFrame.
Wires three pieces from elsewhere in the repo:
- BulkZipReader streams raw cert dicts out of the gov bulk ZIP.
- EpcPropertyDataMapper.from_api_response parses each cert into EpcPropertyData.
- EpcMlTransform.to_rows produces the ML feature+target DataFrame.
A leading `certificate_number` column is prepended so every row stays traceable
back to its source cert through the rest of the pipeline (parquet, training).
"""
import pandas as pd
from datatypes.epc.domain.mapper import EpcPropertyDataMapper
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from domain.ml.transform import EpcMlTransform
from ml_training_data.bulk_zip_reader import BulkZipReader
def build_features(
bulk_reader: BulkZipReader,
certificate_numbers: set[str],
*,
skip_unsupported_schemas: bool = True,
) -> pd.DataFrame:
transform = EpcMlTransform()
properties: list[EpcPropertyData] = []
cert_nums: list[str] = []
for cert in bulk_reader.iter_certificates_filtered(certificate_numbers):
try:
prop = EpcPropertyDataMapper.from_api_response(cert)
except ValueError:
if skip_unsupported_schemas:
continue
raise
properties.append(prop)
cert_nums.append(str(cert["certificate_number"]))
df = transform.to_rows(properties)
df["certificate_number"] = cert_nums
return df[["certificate_number", *[c for c in df.columns if c != "certificate_number"]]]

View file

@ -27,7 +27,7 @@ class BulkZipReader:
def iter_certificates(self, entry: str) -> Iterator[dict[str, Any]]:
with zipfile.ZipFile(self._storage.open_read(self._zip_key)) as zf:
with zf.open(entry) as f:
for item in ijson.items(f, "item"):
for item in ijson.items(f, "item", use_float=True):
yield cast(dict[str, Any], item)
def iter_certificates_filtered(

View file

@ -0,0 +1,87 @@
"""Tests for build_features() — JSON cert → EpcPropertyData → feature row.
build_features wires three existing pieces: BulkZipReader yields JSON cert dicts,
EpcPropertyDataMapper.from_api_response parses them into EpcPropertyData, and
EpcMlTransform.to_rows produces the feature+target DataFrame. The function adds a
leading `certificate_number` column so each row stays traceable to its source cert.
"""
import io
import json
import zipfile
from pathlib import Path
import pytest
from ml_training_data.bulk_zip_reader import BulkZipReader
from ml_training_data.build_features import build_features
from ml_training_data.storage import LocalStorage
_FIXTURE_PATH = Path("datatypes/epc/schema/tests/fixtures/21_0_0.json")
def _load_fixture_cert(cert_number: str) -> dict[str, object]:
cert = json.loads(_FIXTURE_PATH.read_text())
cert["certificate_number"] = cert_number
return cert
def _write_zip(storage: LocalStorage, key: str, entries: dict[str, list[dict[str, object]]]) -> None:
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
for entry_name, payload in entries.items():
zf.writestr(entry_name, json.dumps(payload))
storage.write_bytes(key, buf.getvalue())
def test_build_features_returns_one_row_per_supported_cert(tmp_path: Path) -> None:
# Arrange
storage = LocalStorage(root=tmp_path)
cert = _load_fixture_cert("CERT-001")
_write_zip(storage, "bulk.zip", {"certificates-2024.json": [cert]})
reader = BulkZipReader(storage=storage, zip_key="bulk.zip")
# Act
df = build_features(reader, certificate_numbers={"CERT-001"})
# Assert
assert len(df) == 1
assert df.iloc[0]["certificate_number"] == "CERT-001"
assert "certificate_number" == df.columns[0]
def test_build_features_skips_unsupported_schemas_by_default(tmp_path: Path) -> None:
# Arrange
storage = LocalStorage(root=tmp_path)
supported = _load_fixture_cert("OK-1")
unsupported = _load_fixture_cert("BAD-1")
unsupported["schema_type"] = "RdSAP-Schema-19.0.0" # not in mapper dispatch
_write_zip(
storage,
"bulk.zip",
{"certificates-2024.json": [supported, unsupported]},
)
reader = BulkZipReader(storage=storage, zip_key="bulk.zip")
# Act
df = build_features(reader, certificate_numbers={"OK-1", "BAD-1"})
# Assert
assert df["certificate_number"].tolist() == ["OK-1"]
def test_build_features_raises_when_skip_unsupported_schemas_is_false(tmp_path: Path) -> None:
# Arrange
storage = LocalStorage(root=tmp_path)
unsupported = _load_fixture_cert("BAD-1")
unsupported["schema_type"] = "RdSAP-Schema-19.0.0"
_write_zip(storage, "bulk.zip", {"certificates-2024.json": [unsupported]})
reader = BulkZipReader(storage=storage, zip_key="bulk.zip")
# Act / Assert
with pytest.raises(ValueError, match="Unsupported EPC schema"):
build_features(
reader,
certificate_numbers={"BAD-1"},
skip_unsupported_schemas=False,
)