diff --git a/services/ml_training_data/pyproject.toml b/services/ml_training_data/pyproject.toml index c1de1b5e..15e00384 100644 --- a/services/ml_training_data/pyproject.toml +++ b/services/ml_training_data/pyproject.toml @@ -8,6 +8,9 @@ dependencies = [ "pandas>=2.0", "pandas-stubs", "ijson>=3.2", + "pyarrow>=15", + "lightgbm>=4.0", + "scikit-learn>=1.4", ] [tool.uv.sources] diff --git a/services/ml_training_data/src/ml_training_data/train_baseline.py b/services/ml_training_data/src/ml_training_data/train_baseline.py new file mode 100644 index 00000000..412e922f --- /dev/null +++ b/services/ml_training_data/src/ml_training_data/train_baseline.py @@ -0,0 +1,72 @@ +"""Fit one LightGBM regressor per target; emit metrics + feature importance. + +This is the final stage of the training-data pipeline. Inputs: a feature DataFrame +(produced by build_features + persisted by write_training_dataset) plus a list of +target columns to fit. Output: a metrics dict (MAPE + R^2 per target) and a +per-target JSON file of feature importances written via Storage. + +The certificate_number column, if present, is dropped before fitting so it never +leaks into the model as a feature. 
def _write_json(storage: Storage, key: str, payload: dict[str, Any]) -> None:
    """Serialise *payload* as indented UTF-8 JSON and write it to *storage* under *key*."""
    storage.write_bytes(key, json.dumps(payload, indent=2).encode("utf-8"))


def train_baseline(
    df: pd.DataFrame,
    targets: list[str],
    storage: Storage,
    run_key: str,
    *,
    test_size: float = 0.2,
    seed: int = 42,
    n_estimators: int = 200,
) -> dict[str, dict[str, float]]:
    """Fit one LightGBM regressor per target and persist metrics + importances.

    Every column of ``df`` that is neither a target nor ``certificate_number``
    is used as a feature. For each target the rows are split with
    ``train_test_split`` (same ``test_size`` and ``seed`` every time), an
    ``LGBMRegressor`` is fitted on the training part, and MAPE and R^2 are
    computed on the held-out part.

    Side effects (all through ``storage.write_bytes``):

    * ``f"{run_key}importance_{target}.json"`` - one file per target, mapping
      feature column name to its importance score.
    * ``f"{run_key}metrics.json"`` - the returned metrics dict.

    ``run_key`` is concatenated verbatim in front of the file names, so it must
    already end with whatever separator the storage layout expects
    (e.g. ``"runs/2026-05-16/"``).

    Args:
        df: Feature frame that also contains every column named in ``targets``.
        targets: Names of the columns to fit, one model per entry.
        storage: Destination for the JSON artefacts.
        run_key: Key prefix for every artefact written by this call.
        test_size: Forwarded to ``train_test_split``.
        seed: Random state for both the split and the regressor.
        n_estimators: Forwarded to ``LGBMRegressor``.

    Returns:
        ``{target: {"mape": ..., "r2": ...}}`` for every requested target.

    Raises:
        KeyError: If any requested target is not a column of ``df``. Raised
            before anything is fitted or written, so no partial artefacts are
            left behind.
        ValueError: If targets were requested but no feature columns remain
            after excluding the targets and ``certificate_number``.
    """
    # Fail fast: a missing target would otherwise only surface mid-loop, after
    # earlier targets had already written their importance files.
    missing = [t for t in targets if t not in df.columns]
    if missing:
        raise KeyError(f"target column(s) not present in df: {missing}")

    excluded = {*targets, _CERT_NUM_COLUMN}
    feature_cols = [c for c in df.columns if c not in excluded]
    if targets and not feature_cols:
        raise ValueError(
            "df has no feature columns left after excluding the targets and "
            f"{_CERT_NUM_COLUMN!r}"
        )

    # The feature matrix is identical for every target; build it once.
    x = df[feature_cols]
    metrics: dict[str, dict[str, float]] = {}

    for target in targets:
        y = df[target]
        split = cast(
            tuple[pd.DataFrame, pd.DataFrame, "pd.Series[Any]", "pd.Series[Any]"],
            train_test_split(x, y, test_size=test_size, random_state=seed),
        )
        x_train, x_test, y_train, y_test = split

        model: Any = lgb.LGBMRegressor(n_estimators=n_estimators, random_state=seed, verbose=-1)
        model.fit(x_train, y_train)
        preds: np.ndarray[Any, Any] = np.asarray(model.predict(x_test))

        metrics[target] = {
            "mape": float(cast(float, mean_absolute_percentage_error(y_test, preds))),
            "r2": float(cast(float, r2_score(y_test, preds))),
        }

        # feature_importances_ is positionally aligned with the fitted columns.
        importance_arr = np.asarray(model.feature_importances_, dtype=float)
        importance = {col: float(score) for col, score in zip(feature_cols, importance_arr)}
        _write_json(storage, f"{run_key}importance_{target}.json", importance)

    _write_json(storage, f"{run_key}metrics.json", metrics)
    return metrics
def _synthetic_dataset(n: int = 200, seed: int = 0) -> pd.DataFrame:
    """Return a deterministic ``n``-row frame with one id, two features and one target.

    ``sap_score`` is a noisy linear function of the two feature columns,
    truncated to an integer, so a regressor has a real signal to learn.
    """
    rng = np.random.default_rng(seed)
    # Draw order (uniform, integers, normal) fixes the generated values for a seed.
    area = rng.uniform(40, 200, size=n)
    wall_counts = rng.integers(1, 5, size=n)
    noise = rng.normal(0, 2, size=n)
    sap = (100 - 0.2 * area + 3 * wall_counts + noise).astype(int)

    columns = {
        "certificate_number": [f"CN-{i:04d}" for i in range(n)],
        "total_floor_area_m2": area,
        "wall_count": wall_counts,
        "sap_score": sap,
    }
    return pd.DataFrame(columns)


def test_train_baseline_returns_mape_and_r2_per_target(tmp_path: Path) -> None:
    """A single target yields both metrics, and R^2 is positive on a correlated signal."""
    store = LocalStorage(root=tmp_path)
    frame = _synthetic_dataset()

    result = train_baseline(
        df=frame,
        targets=["sap_score"],
        storage=store,
        run_key="runs/2026-05-16/",
        seed=42,
    )

    assert "sap_score" in result
    scores = result["sap_score"]
    for metric_name in ("mape", "r2"):
        assert metric_name in scores
    assert scores["r2"] > 0.0


def test_train_baseline_writes_feature_importance_per_target(tmp_path: Path) -> None:
    """The importance file lists exactly the feature columns, each with a numeric score."""
    store = LocalStorage(root=tmp_path)
    frame = _synthetic_dataset()

    train_baseline(
        df=frame,
        targets=["sap_score"],
        storage=store,
        run_key="runs/2026-05-16/",
        seed=42,
    )

    raw = store.read_bytes("runs/2026-05-16/importance_sap_score.json")
    scores_by_feature = json.loads(raw)
    # The id column and the target itself must not show up as features.
    assert set(scores_by_feature) == {"total_floor_area_m2", "wall_count"}
    for score in scores_by_feature.values():
        assert isinstance(score, (int, float))


def test_train_baseline_handles_multiple_targets_independently(tmp_path: Path) -> None:
    """Two targets produce two metric entries, two importance files and one metrics file."""
    store = LocalStorage(root=tmp_path)
    frame = _synthetic_dataset()
    # Second target, linearly derived from the first so it is learnable too.
    frame["co2_emissions"] = frame["sap_score"] * 0.1 + 1.0

    result = train_baseline(
        df=frame,
        targets=["sap_score", "co2_emissions"],
        storage=store,
        run_key="runs/2026-05-16/",
        seed=42,
    )

    assert set(result) == {"sap_score", "co2_emissions"}
    expected_keys = (
        "runs/2026-05-16/importance_sap_score.json",
        "runs/2026-05-16/importance_co2_emissions.json",
        "runs/2026-05-16/metrics.json",
    )
    for key in expected_keys:
        assert store.exists(key)