slice 15c: stream build_features so 500k+ cert runs fit memory

Previously kept the full list of EpcPropertyData in memory before calling
EpcMlTransform.to_rows. For the 25k slice that's ~30 MB; for the 580k
full-2026 corpus it OOM-killed the process silently. Now: parse cert ->
to_row -> append dict -> drop EpcPropertyData reference, so memory is
O(row-dict * n) instead of O(EpcPropertyData * n). Same end-of-frame
post-processing (categorical casts, column-order pin).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Khalim Conn-Kowlessar 2026-05-17 00:36:53 +00:00
parent 9f6f7608b9
commit a1f89b6033

View file

@ -5,7 +5,9 @@ JSON-encoded `document` payload. This function:
- Filters out non-RdSAP assessments (our mapper only handles RdSAP-Schema-21.x).
- Parses `document` and feeds it to EpcPropertyDataMapper.from_api_response.
- Calls EpcMlTransform.to_rows over the parsed properties.
- Calls EpcMlTransform.to_row per cert (streaming) and discards the heavy
EpcPropertyData immediately so memory stays O(row-dict) per cert rather than
O(EpcPropertyData * n) — critical for the 500k+ cert full-year runs.
- Prepends a `certificate_number` column so every row is traceable to its source.
"""
@ -15,7 +17,7 @@ from typing import Any, cast
import pandas as pd
from datatypes.epc.domain.mapper import EpcPropertyDataMapper
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from domain.ml.schema import TransformSchema
from domain.ml.transform import EpcMlTransform
from ml_training_data.bulk_zip_reader import BulkZipReader
@ -29,7 +31,7 @@ def build_features(
skip_unsupported_schemas: bool = True,
) -> pd.DataFrame:
transform = EpcMlTransform()
properties: list[EpcPropertyData] = []
rows: list[dict[str, Any]] = []
cert_nums: list[str] = []
for record in bulk_reader.iter_certificates_filtered(certificate_numbers):
if record.get("assessment_type") != _RDSAP_ASSESSMENT_TYPE:
@ -47,8 +49,26 @@ def build_features(
if skip_unsupported_schemas:
continue
raise
properties.append(prop)
rows.append(transform.to_row(prop))
cert_nums.append(str(record["certificate_number"]))
df = transform.to_rows(properties)
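# prop and document are rebound on the next iteration, so the previous cert's
# heavy objects become unreferenced and can be freed; only the appended row
# dict and certificate number are kept.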
df = _frame_from_rows(rows, transform.schema())
df["certificate_number"] = cert_nums
return df[["certificate_number", *[c for c in df.columns if c != "certificate_number"]]]
def _frame_from_rows(rows: list[dict[str, Any]], schema: TransformSchema) -> pd.DataFrame:
    """Assemble the typed DataFrame from the per-cert row dicts.

    Intended to mirror the post-processing done by EpcMlTransform.to_rows.

    Args:
        rows: One dict per certificate, keyed by column name.
        schema: Provides ``feature_columns`` and ``target_columns`` mappings of
            column name -> spec; each spec exposes a ``categorical`` flag.

    Returns:
        A frame whose columns are the schema's feature columns followed by its
        target columns (the full column set is present even when ``rows`` is
        empty), with every column flagged categorical cast to the pandas
        ``category`` dtype.
    """
    feature_specs = schema.feature_columns
    target_specs = schema.target_columns

    # Pinning `columns=` fixes both the column order and the empty-input shape.
    frame = pd.DataFrame(rows, columns=[*feature_specs.keys(), *target_specs.keys()])

    # Features first, then targets: the same order the casts are applied in
    # when the two groups are handled by separate loops.
    for spec_map in (feature_specs, target_specs):
        for column, spec in spec_map.items():
            if not spec.categorical:
                continue
            frame[column] = frame[column].astype("category")
    return frame