diff --git a/services/ml_training_data/src/ml_training_data/sap_parity_probe.py b/services/ml_training_data/src/ml_training_data/sap_parity_probe.py index a03e6e2c..6a331f82 100644 --- a/services/ml_training_data/src/ml_training_data/sap_parity_probe.py +++ b/services/ml_training_data/src/ml_training_data/sap_parity_probe.py @@ -75,6 +75,20 @@ def main(argv: list[str] | None = None) -> None: inputs = cert_to_inputs(epc, prices=prices) result = calculate_sap_from_inputs(inputs) cert_primary = epc.energy_consumption_current + tfa = epc.total_floor_area_m2 or 1.0 + pf_space = inputs.space_heating_primary_factor + pf_hw = inputs.hot_water_primary_factor + pf_other = inputs.other_primary_factor + main_detail = ( + epc.sap_heating.main_heating_details[0] + if epc.sap_heating and epc.sap_heating.main_heating_details + else None + ) + age_band = ( + epc.sap_building_parts[0].construction_age_band + if epc.sap_building_parts + else None + ) results.append({ "cert": cn, "actual": actual, @@ -91,6 +105,23 @@ def main(argv: list[str] | None = None) -> None: if cert_primary is not None else None ), + # End-use primary-energy split, kWh/m² of TFA (calculator side). + "space_pe_m2": round( + (result.main_heating_fuel_kwh_per_yr + result.secondary_heating_fuel_kwh_per_yr) + * pf_space / tfa, 1 + ), + "hw_pe_m2": round(result.hot_water_kwh_per_yr * pf_hw / tfa, 1), + "lighting_pe_m2": round(result.lighting_kwh_per_yr * pf_other / tfa, 1), + "pumps_pe_m2": round(result.pumps_fans_kwh_per_yr * pf_other / tfa, 1), + "pv_pe_m2": round( + -inputs.pv_generation_kwh_per_yr * pf_other / tfa, 1 + ), + # Strata for residual decomposition. + "main_cat": main_detail.main_heating_category if main_detail else None, + "main_fuel": main_detail.main_fuel_type if main_detail else None, + "age": age_band, + "water_code": epc.sap_heating.water_heating_code if epc.sap_heating else None, + "has_cyl": epc.sap_heating.cylinder_size is not None if epc.sap_heating else False, }) except Exception as e: # noqa: BLE001 — exploratory probe errors.append({"cert": cn, "actual": actual, "error": f"{type(e).__name__}: {e}"}) @@ -108,10 +139,77 @@ def main(argv: list[str] | None = None) -> None: print(f"within ±{thr}: {pct:.1f}%") print("\nresidual distribution:") print(df["residual"].describe(percentiles=[0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95])) - print("\nworst 15 by |residual|:") + print("\nworst 15 by |residual| (SAP):") print(df.nlargest(15, "abs_resid")[ ["cert", "actual", "predicted", "residual", "ecf", "tfa", "ext", "dwelling"] ].to_string(index=False)) + + # ------------------------------------------------------------------ + # Primary-energy decomposition. The cert reports a single PEUI + # (`energy_consumption_current`, kWh/m² TFA), so we can't see the + # cert's per-end-use mix — but we can see ours, and stratify the + # residual by cert attributes to localise which population drives + # the bias. + # ------------------------------------------------------------------ + pe_df = df.dropna(subset=["primary_resid"]).copy() + if not pe_df.empty: + pe_df["abs_pe_resid"] = pe_df["primary_resid"].abs() + print(f"\n=== Primary energy (kWh/m² TFA) ===") + print(f"PE MAE: {pe_df['abs_pe_resid'].mean():.2f}") + print(f"PE bias: {pe_df['primary_resid'].mean():.2f}") + print(f"PE RMSE: {((pe_df['primary_resid'] ** 2).mean()) ** 0.5:.2f}") + print("PE residual distribution:") + print(pe_df["primary_resid"].describe( + percentiles=[0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95] + )) + print("\nCorpus mean OUR primary energy by end-use (kWh/m² TFA):") + mix_cols = ["space_pe_m2", "hw_pe_m2", "lighting_pe_m2", "pumps_pe_m2", "pv_pe_m2"] + for col in mix_cols: + print(f" {col:14s} {pe_df[col].mean():6.1f}") + print(f" {'TOTAL (ours)':14s} {pe_df['our_primary_kwh_m2'].mean():6.1f}") + print(f" {'TOTAL (cert)':14s} {pe_df['cert_primary_kwh_m2'].mean():6.1f}") + + # Stratified bias by main heating category. + print("\nPE residual stratified by main_heating_category:") + by_cat = pe_df.groupby("main_cat", dropna=False).agg( + n=("cert", "count"), + pe_mae=("abs_pe_resid", "mean"), + pe_bias=("primary_resid", "mean"), + our_pe=("our_primary_kwh_m2", "mean"), + cert_pe=("cert_primary_kwh_m2", "mean"), + space_pe=("space_pe_m2", "mean"), + hw_pe=("hw_pe_m2", "mean"), + light_pe=("lighting_pe_m2", "mean"), + ).round(1) + print(by_cat.to_string()) + + # Stratified bias by age band — narrows the envelope-side hypothesis. + print("\nPE residual stratified by construction_age_band:") + by_age = pe_df.groupby("age", dropna=False).agg( + n=("cert", "count"), + pe_mae=("abs_pe_resid", "mean"), + pe_bias=("primary_resid", "mean"), + space_pe=("space_pe_m2", "mean"), + ).round(1).sort_index() + print(by_age.to_string()) + + # Stratified bias by dwelling type — flat vs detached should split + # if envelope-side (party-wall surfaces) is the dominant residual. + print("\nPE residual stratified by dwelling_type:") + by_dwel = pe_df.groupby("dwelling", dropna=False).agg( + n=("cert", "count"), + pe_mae=("abs_pe_resid", "mean"), + pe_bias=("primary_resid", "mean"), + space_pe=("space_pe_m2", "mean"), + ).round(1) + print(by_dwel.to_string()) + + print("\nworst 15 by |PE residual|:") + print(pe_df.nlargest(15, "abs_pe_resid")[ + ["cert", "actual", "primary_resid", + "our_primary_kwh_m2", "cert_primary_kwh_m2", + "space_pe_m2", "hw_pe_m2", "main_cat", "dwelling"] + ].to_string(index=False)) if errors: print("\nerrors:") for e in errors[:10]: