mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
180 lines
7.1 KiB
Python
180 lines
7.1 KiB
Python
import os
|
|
import pickle
|
|
import pandas as pd
|
|
import pytest
|
|
|
|
|
|
def load_sample_certificates():
    """Read the sample certificate CSV into an all-string DataFrame.

    The file is looked up at ``backend/tests/test_data/sample_certificates.csv``
    under the current working directory. Column names are stripped,
    lower-cased and have underscores replaced by hyphens. Rows with no
    ``uprn`` or no ``low-energy-fixed-light-count`` are dropped, remaining
    missing values become empty strings, and those two columns are rendered
    as integer strings.

    Returns:
        A DataFrame in which every column has been cast to ``str``.

    Raises:
        FileNotFoundError: If the CSV is not present at the expected path.
    """
    relative_parts = ('backend', 'tests', 'test_data', 'sample_certificates.csv')
    csv_path = os.path.join(os.getcwd(), *relative_parts)
    if not os.path.exists(csv_path):
        raise FileNotFoundError(
            f"sample_certificates.csv not found at {csv_path}. Make sure it exists relative to the project root.")

    certificates = pd.read_csv(csv_path)
    certificates.columns = [
        name.strip().lower().replace('_', '-') for name in certificates.columns
    ]

    # These two columns must be populated; they are cast to int below.
    integer_columns = ["uprn", "low-energy-fixed-light-count"]
    for name in integer_columns:
        certificates = certificates[certificates[name].notna()]

    certificates = certificates.fillna("")
    # Go through int first so the later string cast yields "123" rather than "123.0".
    certificates = certificates.astype({name: int for name in integer_columns})
    return certificates.astype(str)
|
|
|
|
|
|
def make_property_from_row(row, cleaning_data):
    """Build a ``Property`` with an attached ``EPCRecord`` from one certificate row.

    Args:
        row: A single certificate row; it must support ``to_dict()``,
            ``get()``, ``in`` and item access (the test passes rows from
            ``DataFrame.iterrows()``).
        cleaning_data: Passed through unchanged to ``EPCRecord``.

    Returns:
        The constructed ``Property``. Its ``uprn`` is ``None`` when the row
        has no usable ``uprn`` value.
    """
    # Project imports stay function-local, as in the rest of this module.
    from etl.epc.Record import EPCRecord, InputEpcRecords
    from backend.Property import Property

    certificate = row.to_dict()

    # The same row is used for both EPC variants; each gets its own copy.
    epc_inputs = InputEpcRecords(
        original_epc=certificate.copy(),
        full_sap_epc=certificate.copy(),
        old_data=[]
    )
    record = EPCRecord(
        epc_records=epc_inputs,
        run_mode="newdata",
        cleaning_data=cleaning_data
    )

    property_id = row.get('uprn')
    postcode = row.get('postcode')
    # Fall back to 'address1' when 'address' is absent or empty.
    address = row.get('address') or row.get('address1')

    numeric_uprn = None
    if 'uprn' in row and not pd.isnull(row['uprn']):
        numeric_uprn = int(row['uprn'])

    return Property(
        id=property_id,
        postcode=postcode,
        address=address,
        epc_record=record,
        uprn=numeric_uprn,
    )
|
|
|
|
|
|
def load_cleaned():
    """Unpickle and return the ``cleaned.pkl`` test fixture.

    The path is relative to the current working directory.
    """
    fixture_path = "recommendations/tests/test_data/cleaned.pkl"
    # NOTE: pickle executes code on load; only use with trusted, repo-owned fixtures.
    with open(fixture_path, "rb") as fixture:
        payload = pickle.load(fixture)
    return payload
|
|
|
|
|
|
def load_cleaning_data():
    """Unpickle and return the ``cleaning_data.pkl`` test fixture.

    The path is relative to the current working directory.
    """
    fixture_path = "recommendations/tests/test_data/cleaning_data.pkl"
    # NOTE: pickle executes code on load; only use with trusted, repo-owned fixtures.
    with open(fixture_path, "rb") as fixture:
        payload = pickle.load(fixture)
    return payload
|
|
|
|
|
|
@pytest.mark.integration
def test_rebaselining_pipeline_with_real_data():
    """Run the rebaselining flow on the sample certificates and check its output.

    Steps:
      1. Build a ``Property`` per sample certificate and collect each one's
         base-difference scoring frame.
      2. Score the combined frame with ``ModelApi.predict_all`` using the
         baseline model prefixes.
      3. Compute the MAPE of each model's predictions against the values on
         the original certificate, and assert each stays under its threshold.
      4. Write the predictions back onto the EPC records and assert at least
         one record reports ``has_been_remodelled``.
    """
    import pandas as pd
    from datetime import datetime
    from backend.ml_models.api import ModelApi
    from backend.app.utils import sap_to_epc
    from backend.app.config import get_prediction_buckets

    # --- 1. Build properties and their scoring frames -----------------------
    certificates = load_sample_certificates()
    cleaning_data = load_cleaning_data()
    input_properties = [
        make_property_from_row(row, cleaning_data=cleaning_data)
        for _, row in certificates.iterrows()
    ]
    cleaned = load_cleaned()

    scoring_frames = []
    for prop in input_properties:
        prop.create_base_difference_epc_record(cleaned_lookup=cleaned)
        scoring_frames.append(prop.base_difference_record.df.copy())
    assert scoring_frames, "No properties required rebaselining in the sample data."

    rebaselining_scoring_data = pd.concat(scoring_frames)
    rebaselining_scoring_data["is_post_sap10_starting"] = False

    # --- 2. Score with the baseline models -----------------------------------
    model_api = ModelApi(
        portfolio_id="test-portfolio",
        timestamp=datetime.now().isoformat(),
        prediction_buckets=get_prediction_buckets(),
        max_retries=1,
    )
    rebaselining_response = model_api.predict_all(
        df=rebaselining_scoring_data,
        bucket="retrofit-data-dev",
        model_prefixes=model_api.BASELINE_MODEL_PREFIXES,
        extract_ids=False,
        extract_uprn=True,
    )

    # --- 3. Compare predictions with the original certificate values --------
    input_properties_by_uprn = {}
    uprn_to_originals = {}
    for prop in input_properties:
        if prop.uprn is None:
            continue
        uprn_key = int(prop.uprn)
        input_properties_by_uprn[uprn_key] = prop
        if hasattr(prop, 'epc_record') and hasattr(prop.epc_record, 'original_epc'):
            original = prop.epc_record.original_epc
            uprn_to_originals[uprn_key] = {
                'original_sap': original.get('current-energy-efficiency'),
                'original_carbon': original.get('co2-emissions-current'),
                'original_heat': original.get('energy-consumption-current'),
            }

    def calculate_mape(df, pred_col, actual_col):
        """Return the mean absolute percentage error, or None if no row is usable.

        Both columns are coerced to numeric. Rows where either value is
        missing, or the actual value is zero, are excluded.
        """
        predicted = pd.to_numeric(df[pred_col], errors="coerce")
        actual = pd.to_numeric(df[actual_col], errors="coerce")
        usable = actual.notnull() & predicted.notnull() & (actual != 0)
        if not usable.any():
            return None
        relative_error = (predicted[usable] - actual[usable]).abs() / actual[usable].abs()
        return relative_error.mean() * 100

    # Model name -> (column holding the certificate value, short metric label).
    model_targets = {
        "retrofit_sap_baseline_predictions": ("original_sap", "sap"),
        "retrofit_carbon_baseline_predictions": ("original_carbon", "carbon"),
        "retrofit_heat_baseline_predictions": ("original_heat", "heat"),
    }
    original_columns = ('original_sap', 'original_carbon', 'original_heat')

    predictions_by_model_and_uprn = {}
    mape_results = {}
    for model, (actual_col, metric_name) in model_targets.items():
        df_pred = rebaselining_response[model]
        # Every response frame gets all three original-value columns.
        for column in original_columns:
            df_pred[column] = df_pred['uprn'].map(
                lambda u, key=column: uprn_to_originals.get(int(u), {}).get(key)
            )
        predictions_by_model_and_uprn[model] = dict(
            zip(df_pred["uprn"].astype(int), df_pred["predictions"])
        )

        mape = calculate_mape(df_pred, "predictions", actual_col)
        if mape is None:
            print(f"MAPE ({metric_name}): No valid data")
        else:
            mape_results[metric_name] = mape
            print(f"MAPE ({metric_name}): {mape:.2f}%")

    # Upper bounds, in percent, for each metric's MAPE.
    MAX_MAPE = {
        "sap": 4.6,
        "carbon": 21.0,
        "heat": 16.0,
    }
    for metric, mape in mape_results.items():
        max_allowed = MAX_MAPE.get(metric, 100.0)
        assert mape < max_allowed, f"{metric.upper()} MAPE too high: {mape:.2f}% > {max_allowed}%"

    # --- 4. Write predictions back onto the EPC records ----------------------
    sap_by_uprn = predictions_by_model_and_uprn["retrofit_sap_baseline_predictions"]
    carbon_by_uprn = predictions_by_model_and_uprn["retrofit_carbon_baseline_predictions"]
    heat_by_uprn = predictions_by_model_and_uprn["retrofit_heat_baseline_predictions"]

    for uprn_int in rebaselining_scoring_data["uprn"].unique().astype(int):
        property_instance = input_properties_by_uprn.get(uprn_int)
        if property_instance is None:
            continue
        new_sap = sap_by_uprn[uprn_int]
        new_carbon = carbon_by_uprn[uprn_int]
        new_heat_demand = heat_by_uprn[uprn_int]
        property_instance.epc_record.insert_new_performance_values(
            new_sap=new_sap,
            new_epc=sap_to_epc(new_sap),
            new_carbon=new_carbon,
            new_heat_demand=new_heat_demand,
        )

    updated = sum(
        1
        for prop in input_properties
        if getattr(prop.epc_record, 'has_been_remodelled', False)
    )
    assert updated > 0, "No EPC records were updated."
    print(f"Updated {updated} EPC records with new predictions.")
|