From 20fd55d5a16effe86ec3e40269038778951f5f15 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Sat, 16 May 2026 18:38:41 +0000 Subject: [PATCH] slice 14d: build_features wires bulk reader -> mapper -> EpcMlTransform ijson use_float fixes Decimal/float coercion when streaming JSON. pyright extraPaths so the new pkg type-checks against domna-domain. Co-Authored-By: Claude Opus 4.7 --- pyrightconfig.json | 4 + services/ml_training_data/pyproject.toml | 4 + .../src/ml_training_data/build_features.py | 40 +++++++++ .../src/ml_training_data/bulk_zip_reader.py | 2 +- .../tests/unit/test_build_features.py | 87 +++++++++++++++++++ 5 files changed, 136 insertions(+), 1 deletion(-) create mode 100644 services/ml_training_data/src/ml_training_data/build_features.py create mode 100644 services/ml_training_data/tests/unit/test_build_features.py diff --git a/pyrightconfig.json b/pyrightconfig.json index d4e0e2a4..13f63df3 100644 --- a/pyrightconfig.json +++ b/pyrightconfig.json @@ -2,6 +2,10 @@ "typeCheckingMode": "strict", "venvPath": "/Users/khalimconn-kowlessar/opt/anaconda3/envs/", "venv": "Fastapi-backend", + "extraPaths": [ + "packages/domain/src", + "services/ml_training_data/src" + ], "include": [ "." ] diff --git a/services/ml_training_data/pyproject.toml b/services/ml_training_data/pyproject.toml index 4519fd93..c1de1b5e 100644 --- a/services/ml_training_data/pyproject.toml +++ b/services/ml_training_data/pyproject.toml @@ -4,11 +4,15 @@ version = "0.1.0" description = "Pipeline that turns the EPC open-data CSV into ML training parquet + baseline models." requires-python = ">=3.11" dependencies = [ + "domna-domain", "pandas>=2.0", "pandas-stubs", "ijson>=3.2", ] +[tool.uv.sources] +domna-domain = { workspace = true } + [build-system] requires = ["hatchling"] build-backend = "hatchling.build" diff --git a/services/ml_training_data/src/ml_training_data/build_features.py b/services/ml_training_data/src/ml_training_data/build_features.py new file mode 100644 index 00000000..98678c75 --- /dev/null +++ b/services/ml_training_data/src/ml_training_data/build_features.py @@ -0,0 +1,40 @@ +"""JSON certs -> EpcPropertyData -> feature+target DataFrame. + +Wires three pieces from elsewhere in the repo: + - BulkZipReader streams raw cert dicts out of the gov bulk ZIP. + - EpcPropertyDataMapper.from_api_response parses each cert into EpcPropertyData. + - EpcMlTransform.to_rows produces the ML feature+target DataFrame. + +A leading `certificate_number` column is prepended so every row stays traceable +back to its source cert through the rest of the pipeline (parquet, training). +""" + +import pandas as pd + +from datatypes.epc.domain.mapper import EpcPropertyDataMapper +from datatypes.epc.domain.epc_property_data import EpcPropertyData +from domain.ml.transform import EpcMlTransform +from ml_training_data.bulk_zip_reader import BulkZipReader + + +def build_features( + bulk_reader: BulkZipReader, + certificate_numbers: set[str], + *, + skip_unsupported_schemas: bool = True, +) -> pd.DataFrame: + transform = EpcMlTransform() + properties: list[EpcPropertyData] = [] + cert_nums: list[str] = [] + for cert in bulk_reader.iter_certificates_filtered(certificate_numbers): + try: + prop = EpcPropertyDataMapper.from_api_response(cert) + except ValueError: + if skip_unsupported_schemas: + continue + raise + properties.append(prop) + cert_nums.append(str(cert["certificate_number"])) + df = transform.to_rows(properties) + df["certificate_number"] = cert_nums + return df[["certificate_number", *[c for c in df.columns if c != "certificate_number"]]] diff --git a/services/ml_training_data/src/ml_training_data/bulk_zip_reader.py b/services/ml_training_data/src/ml_training_data/bulk_zip_reader.py index 0790d172..267f77e5 100644 --- a/services/ml_training_data/src/ml_training_data/bulk_zip_reader.py +++ b/services/ml_training_data/src/ml_training_data/bulk_zip_reader.py @@ -27,7 +27,7 @@ class BulkZipReader: def iter_certificates(self, entry: str) -> Iterator[dict[str, Any]]: with zipfile.ZipFile(self._storage.open_read(self._zip_key)) as zf: with zf.open(entry) as f: - for item in ijson.items(f, "item"): + for item in ijson.items(f, "item", use_float=True): yield cast(dict[str, Any], item) def iter_certificates_filtered( diff --git a/services/ml_training_data/tests/unit/test_build_features.py b/services/ml_training_data/tests/unit/test_build_features.py new file mode 100644 index 00000000..1806666b --- /dev/null +++ b/services/ml_training_data/tests/unit/test_build_features.py @@ -0,0 +1,87 @@ +"""Tests for build_features() — JSON cert → EpcPropertyData → feature row. + +build_features wires three existing pieces: BulkZipReader yields JSON cert dicts, +EpcPropertyDataMapper.from_api_response parses them into EpcPropertyData, and +EpcMlTransform.to_rows produces the feature+target DataFrame. The function adds a +leading `certificate_number` column so each row stays traceable to its source cert. +""" + +import io +import json +import zipfile +from pathlib import Path + +import pytest + +from ml_training_data.bulk_zip_reader import BulkZipReader +from ml_training_data.build_features import build_features +from ml_training_data.storage import LocalStorage + +_FIXTURE_PATH = Path("datatypes/epc/schema/tests/fixtures/21_0_0.json") + + +def _load_fixture_cert(cert_number: str) -> dict[str, object]: + cert = json.loads(_FIXTURE_PATH.read_text()) + cert["certificate_number"] = cert_number + return cert + + +def _write_zip(storage: LocalStorage, key: str, entries: dict[str, list[dict[str, object]]]) -> None: + buf = io.BytesIO() + with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf: + for entry_name, payload in entries.items(): + zf.writestr(entry_name, json.dumps(payload)) + storage.write_bytes(key, buf.getvalue()) + + +def test_build_features_returns_one_row_per_supported_cert(tmp_path: Path) -> None: + # Arrange + storage = LocalStorage(root=tmp_path) + cert = _load_fixture_cert("CERT-001") + _write_zip(storage, "bulk.zip", {"certificates-2024.json": [cert]}) + reader = BulkZipReader(storage=storage, zip_key="bulk.zip") + + # Act + df = build_features(reader, certificate_numbers={"CERT-001"}) + + # Assert + assert len(df) == 1 + assert df.iloc[0]["certificate_number"] == "CERT-001" + assert "certificate_number" == df.columns[0] + + +def test_build_features_skips_unsupported_schemas_by_default(tmp_path: Path) -> None: + # Arrange + storage = LocalStorage(root=tmp_path) + supported = _load_fixture_cert("OK-1") + unsupported = _load_fixture_cert("BAD-1") + unsupported["schema_type"] = "RdSAP-Schema-19.0.0" # not in mapper dispatch + _write_zip( + storage, + "bulk.zip", + {"certificates-2024.json": [supported, unsupported]}, + ) + reader = BulkZipReader(storage=storage, zip_key="bulk.zip") + + # Act + df = build_features(reader, certificate_numbers={"OK-1", "BAD-1"}) + + # Assert + assert df["certificate_number"].tolist() == ["OK-1"] + + +def test_build_features_raises_when_skip_unsupported_schemas_is_false(tmp_path: Path) -> None: + # Arrange + storage = LocalStorage(root=tmp_path) + unsupported = _load_fixture_cert("BAD-1") + unsupported["schema_type"] = "RdSAP-Schema-19.0.0" + _write_zip(storage, "bulk.zip", {"certificates-2024.json": [unsupported]}) + reader = BulkZipReader(storage=storage, zip_key="bulk.zip") + + # Act / Assert + with pytest.raises(ValueError, match="Unsupported EPC schema"): + build_features( + reader, + certificate_numbers={"BAD-1"}, + skip_unsupported_schemas=False, + )