mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
Bulk entries are NDJSON of wrapper records, not a JSON array. Each wrapper carries certificate_number, assessment_type, and a stringified document with the actual EPC schema payload. Filter to RdSAP, unwrap document, then map. remote_bulk_fetcher: per-entry presigned-URL refresh (30s S3 TTL). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
110 lines
3.9 KiB
Python
110 lines
3.9 KiB
Python
"""Tests for build_features() — wrapper record → EpcPropertyData → feature row.
|
|
|
|
build_features wires three existing pieces: BulkZipReader yields wrapper records
|
|
out of the bulk ZIP, EpcPropertyDataMapper.from_api_response parses the
|
|
JSON-encoded `document` payload into EpcPropertyData, and EpcMlTransform.to_rows
|
|
produces the feature+target DataFrame. The function adds a leading
|
|
`certificate_number` column so each row stays traceable to its source cert.
|
|
"""
|
|
|
|
import io
|
|
import json
|
|
import zipfile
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from ml_training_data.bulk_zip_reader import BulkZipReader
|
|
from ml_training_data.build_features import build_features
|
|
from ml_training_data.storage import LocalStorage
|
|
|
|
# JSON fixture used as the default `document` payload for wrapper records.
# NOTE(review): the path is relative, so it resolves against the current
# working directory at read time — presumably the repo root; confirm how the
# test runner is invoked.
_FIXTURE_PATH = Path("datatypes/epc/schema/tests/fixtures/21_0_0.json")
|
|
|
|
|
|
def _wrapper(cert_number: str, *, assessment_type: str = "RdSAP", document: str | None = None) -> dict[str, object]:
    """Build one wrapper record as it appears in a bulk NDJSON entry.

    Args:
        cert_number: Value stored under ``certificate_number``.
        assessment_type: Value stored under ``assessment_type``.
        document: Stringified payload stored under ``document``. When
            ``None``, the text of the fixture at ``_FIXTURE_PATH`` is used.

    Returns:
        A dict with the keys ``certificate_number``, ``assessment_type`` and
        ``document``.
    """
    if document is None:
        # Decode explicitly as UTF-8: without `encoding`, read_text() falls
        # back to the platform locale encoding, which can corrupt (or fail
        # on) non-ASCII fixture content on non-UTF-8 systems.
        document = _FIXTURE_PATH.read_text(encoding="utf-8")
    return {
        "certificate_number": cert_number,
        "assessment_type": assessment_type,
        "document": document,
    }
|
|
|
|
|
|
def _write_zip(storage: LocalStorage, key: str, entries: dict[str, list[dict[str, object]]]) -> None:
    """Assemble a ZIP archive in memory and store it under ``key``.

    Each item of ``entries`` maps an archive member name to a list of records.
    A member's content is its records serialised with ``json.dumps`` and joined
    by newlines (NDJSON, no trailing newline). The finished archive bytes are
    handed to ``storage.write_bytes``.
    """
    archive = io.BytesIO()
    with zipfile.ZipFile(archive, mode="w", compression=zipfile.ZIP_DEFLATED) as bundle:
        for member_name, member_records in entries.items():
            lines = [json.dumps(item) for item in member_records]
            bundle.writestr(member_name, "\n".join(lines))
    payload = archive.getvalue()
    storage.write_bytes(key, payload)
|
|
|
|
|
|
def test_build_features_returns_one_row_per_supported_cert(tmp_path: Path) -> None:
    """A single default wrapper record yields exactly one row, keyed by its cert number."""
    # Arrange: a bulk ZIP holding one entry with one wrapper record.
    store = LocalStorage(root=tmp_path)
    _write_zip(store, "bulk.zip", {"certificates-2024.json": [_wrapper("CERT-001")]})
    bulk_reader = BulkZipReader(storage=store, zip_key="bulk.zip")

    # Act
    features = build_features(bulk_reader, certificate_numbers={"CERT-001"})

    # Assert: one row, traceable via the leading `certificate_number` column.
    assert len(features) == 1
    first_row = features.iloc[0]
    assert first_row["certificate_number"] == "CERT-001"
    assert features.columns[0] == "certificate_number"
|
|
|
|
|
|
def test_build_features_skips_non_rdsap_assessment_types(tmp_path: Path) -> None:
    """Of an RdSAP and a SAP wrapper in the same entry, only the RdSAP one produces a row."""
    # Arrange: two wrappers that differ only in cert number and assessment type.
    store = LocalStorage(root=tmp_path)
    records = [
        _wrapper("OK-1", assessment_type="RdSAP"),
        _wrapper("SAP-1", assessment_type="SAP"),
    ]
    _write_zip(store, "bulk.zip", {"certificates-2024.json": records})
    bulk_reader = BulkZipReader(storage=store, zip_key="bulk.zip")

    # Act: both cert numbers are requested, so any filtering is by assessment type.
    features = build_features(bulk_reader, certificate_numbers={"OK-1", "SAP-1"})

    # Assert
    kept_certs = features["certificate_number"].tolist()
    assert kept_certs == ["OK-1"]
|
|
|
|
|
|
def test_build_features_skips_unsupported_schemas_by_default(tmp_path: Path) -> None:
    """With default arguments, a wrapper whose document has an unsupported schema is dropped.

    The ZIP entry holds one wrapper with the unmodified fixture document and one
    whose ``schema_type`` has been rewritten; only the former should yield a row.
    """
    # Arrange
    storage = LocalStorage(root=tmp_path)
    # Read the fixture as UTF-8 explicitly; the default for read_text() is the
    # platform locale encoding, which is not guaranteed to match the JSON file.
    bad_doc = json.loads(_FIXTURE_PATH.read_text(encoding="utf-8"))
    bad_doc["schema_type"] = "RdSAP-Schema-19.0.0"  # not in mapper dispatch
    supported = _wrapper("OK-1")
    unsupported = _wrapper("BAD-1", document=json.dumps(bad_doc))
    _write_zip(
        storage,
        "bulk.zip",
        {"certificates-2024.json": [supported, unsupported]},
    )
    reader = BulkZipReader(storage=storage, zip_key="bulk.zip")

    # Act
    df = build_features(reader, certificate_numbers={"OK-1", "BAD-1"})

    # Assert
    assert df["certificate_number"].tolist() == ["OK-1"]
|
|
|
|
|
|
def test_build_features_raises_when_skip_unsupported_schemas_is_false(tmp_path: Path) -> None:
    """With ``skip_unsupported_schemas=False``, an unsupported schema raises ``ValueError``.

    The only wrapper in the ZIP carries a document whose ``schema_type`` has been
    rewritten, and the error message is expected to match "Unsupported EPC schema".
    """
    # Arrange
    storage = LocalStorage(root=tmp_path)
    # Read the fixture as UTF-8 explicitly; the default for read_text() is the
    # platform locale encoding, which is not guaranteed to match the JSON file.
    bad_doc = json.loads(_FIXTURE_PATH.read_text(encoding="utf-8"))
    bad_doc["schema_type"] = "RdSAP-Schema-19.0.0"
    unsupported = _wrapper("BAD-1", document=json.dumps(bad_doc))
    _write_zip(storage, "bulk.zip", {"certificates-2024.json": [unsupported]})
    reader = BulkZipReader(storage=storage, zip_key="bulk.zip")

    # Act / Assert
    with pytest.raises(ValueError, match="Unsupported EPC schema"):
        build_features(
            reader,
            certificate_numbers={"BAD-1"},
            skip_unsupported_schemas=False,
        )
|