mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
added testing for integration
This commit is contained in:
parent
5c94ecf3fb
commit
a700ead260
3 changed files with 639 additions and 55 deletions
|
|
@ -21,19 +21,25 @@ class Addresses:
|
|||
@classmethod
|
||||
def from_plan_input(cls, plan_input: list[dict], body) -> "Addresses":
|
||||
addresses = []
|
||||
if body.file_format == "ara_property_list":
|
||||
row_parser = cls.parse_ara_row
|
||||
else:
|
||||
warnings.warn(
|
||||
"_parse_row_deprecated is deprecated and will be removed in a future version. "
|
||||
"Use the parse_ara_row method instead",
|
||||
DeprecationWarning,
|
||||
stacklevel=2
|
||||
)
|
||||
row_parser = cls._parse_row_deprecated
|
||||
|
||||
for row in plan_input:
|
||||
addresses.append(row_parser(row, body))
|
||||
try:
|
||||
if body.file_format == "ara_property_list":
|
||||
addr = cls.parse_ara_row(row, body)
|
||||
else:
|
||||
addr = cls._parse_row_deprecated(row, body)
|
||||
|
||||
# Fallback if new parser fails
|
||||
except Exception:
|
||||
warnings.warn(
|
||||
"Falling back to deprecated parser for row",
|
||||
RuntimeWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
addr = cls._parse_row_deprecated(row, body)
|
||||
|
||||
addresses.append(addr)
|
||||
|
||||
addresses = cls(addresses)
|
||||
addresses.validate_uprns()
|
||||
return addresses
|
||||
|
|
@ -107,77 +113,53 @@ class Addresses:
|
|||
|
||||
@staticmethod
|
||||
def _parse_row_deprecated(row: dict, body) -> Address:
|
||||
"""
|
||||
Is a method to be deprecated in favour of using the new array property list format
|
||||
:param row:
|
||||
:param body:
|
||||
:return:
|
||||
"""
|
||||
|
||||
def clean_uprn(v):
|
||||
if v is None:
|
||||
return None
|
||||
try:
|
||||
return int(float(v))
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
raise ValueError(f"Invalid UPRN value: {v}")
|
||||
|
||||
uprn = clean_uprn(row.get("uprn"))
|
||||
if uprn is None:
|
||||
raise ValueError(f"Invalid or missing UPRN in row: {row}")
|
||||
uprn = clean_uprn(row.get("uprn") or row.get("ordnance_survey_uprn"))
|
||||
|
||||
address = row.get("address")
|
||||
if not address and body.file_format == "domna_asset_list":
|
||||
address = row.get("domna_address_1")
|
||||
address = row.get("address") or row.get("domna_address_1") or ""
|
||||
full_address = row.get("domna_full_address") or address or ""
|
||||
|
||||
full_address = (
|
||||
row.get("domna_full_address")
|
||||
if body.file_format == "domna_asset_list"
|
||||
else None
|
||||
)
|
||||
if not isinstance(full_address, str):
|
||||
full_address = None
|
||||
|
||||
postcode = str(row["postcode"]).strip().upper()
|
||||
|
||||
address_1 = str(address).strip() if address else ""
|
||||
full_address = str(full_address).strip() if full_address else ""
|
||||
landlord_property_id = str(row["landlord_property_id"]) if row.get("landlord_property_id") else ""
|
||||
postcode = str(row.get("postcode", "")).strip().upper()
|
||||
|
||||
return Address(
|
||||
uprn=uprn,
|
||||
landlord_property_id=landlord_property_id,
|
||||
address_1=address_1,
|
||||
full_address=full_address,
|
||||
postcode=postcode,
|
||||
landlord_property_type=row.get("property_type"),
|
||||
landlord_built_form=row.get("built_form"),
|
||||
# estimated=bool(row.get("estimated", False)),
|
||||
landlord_property_id=str(row["landlord_property_id"]) if row.get("landlord_property_id") else None,
|
||||
address_1=str(address).strip(),
|
||||
address_2=None,
|
||||
address_3=None,
|
||||
full_address=str(full_address).strip(),
|
||||
postcode=postcode,
|
||||
|
||||
landlord_total_floor_area_m2=None,
|
||||
|
||||
# Map old to new fields
|
||||
landlord_property_type=row.get("property_type") or row.get("landlord_property_type"),
|
||||
landlord_built_form=row.get("built_form") or row.get("landlord_built_form"),
|
||||
|
||||
landlord_wall_construction=None,
|
||||
landlord_roof_construction=None,
|
||||
landlord_floor_construction=None,
|
||||
landlord_windows_type=None,
|
||||
landlord_heating_system=None,
|
||||
landlord_heating_system=row.get("epc_heating_type"),
|
||||
landlord_fuel_type=None,
|
||||
landlord_heating_controls=None,
|
||||
landlord_hot_water_system=None,
|
||||
|
||||
landlord_wall_efficiency=None,
|
||||
landlord_roof_efficiency=None,
|
||||
landlord_windows_efficiency=None,
|
||||
landlord_heating_efficiency=None,
|
||||
landlord_heating_controls_efficiency=None,
|
||||
landlord_hot_water_efficiency=None,
|
||||
|
||||
landlord_has_sloping_ceiling=None,
|
||||
landlord_multi_glaze_proportion=None,
|
||||
landlord_construction_age_band=None,
|
||||
)
|
||||
|
||||
# def _build_identity_index(self) -> dict:
|
||||
# index = {}
|
||||
# for addr in self._addresses:
|
||||
# key = addr.identity_key()
|
||||
# if key in index:
|
||||
# raise ValueError(f"Duplicate address identity detected: {key}")
|
||||
# index[key] = addr
|
||||
# return index
|
||||
|
|
|
|||
214
backend/tests/test_addresses.py
Normal file
214
backend/tests/test_addresses.py
Normal file
|
|
@ -0,0 +1,214 @@
|
|||
import pytest
|
||||
|
||||
from backend.addresses.Addresses import Addresses
|
||||
|
||||
|
||||
# -------------------------
|
||||
# Helpers
|
||||
# -------------------------
|
||||
|
||||
class AraBody:
    """Request-body stand-in whose ``file_format`` selects the ARA row parser."""

    file_format: str = "ara_property_list"
|
||||
|
||||
|
||||
class LegacyBody:
    """Request-body stand-in with a non-ARA ``file_format`` (legacy parsing path)."""

    file_format: str = "legacy"
|
||||
|
||||
|
||||
# -------------------------
|
||||
# ARA FORMAT TESTS
|
||||
# -------------------------
|
||||
|
||||
def test_parse_ara_row_valid():
    """A complete ARA row produces one address with an integer UPRN."""
    ara_row = dict(
        uprn="123",
        address_1="10 Downing St",
        full_address="10 Downing St, London",
        postcode="SW1A 2AA",
    )

    parsed = Addresses.from_plan_input([ara_row], AraBody())
    assert len(parsed) == 1

    first = parsed[0]
    # The UPRN is supplied as a string but is expected back as an int.
    assert first.uprn == 123
    assert first.address_1 == "10 Downing St"
    assert first.full_address == "10 Downing St, London"
    assert first.postcode == "SW1A 2AA"
|
||||
|
||||
|
||||
def test_parse_ara_row_optional_fields():
    """An optional ``landlord_property_id`` on an ARA row is carried through."""
    ara_row = dict(
        uprn="456",
        address_1="Flat 2",
        full_address="Flat 2, Test House",
        postcode="AB1 2CD",
        landlord_property_id="ABC123",
    )

    first = Addresses.from_plan_input([ara_row], AraBody())[0]

    assert first.uprn == 456
    assert first.landlord_property_id == "ABC123"
|
||||
|
||||
|
||||
# -------------------------
|
||||
# LEGACY FORMAT TESTS
|
||||
# -------------------------
|
||||
|
||||
def test_parse_legacy_basic():
    """A legacy row maps its address, postcode, OS UPRN and property type."""
    legacy_row = dict(
        landlord_property_id=144002000000,
        address="15 Rosebank Hall Angle Terrace",
        postcode="NE28 7BQ",
        ordnance_survey_uprn=47002793,
        property_type="Bungalow",
    )

    first = Addresses.from_plan_input([legacy_row], LegacyBody())[0]

    # No "uprn" key is present, so the UPRN must come from "ordnance_survey_uprn".
    assert first.uprn == 47002793
    assert first.address_1 == "15 Rosebank Hall Angle Terrace"
    assert first.postcode == "NE28 7BQ"
    assert first.landlord_property_type == "Bungalow"
|
||||
|
||||
|
||||
def test_legacy_uses_domna_address_if_missing_address():
    """Without an ``address`` key, ``domna_address_1`` supplies ``address_1``."""
    legacy_row = dict(
        domna_address_1="Domna Address",
        postcode="AA1 1AA",
        ordnance_survey_uprn=123456,
    )

    parsed = Addresses.from_plan_input([legacy_row], LegacyBody())

    assert parsed[0].address_1 == "Domna Address"
|
||||
|
||||
|
||||
def test_legacy_full_address_fallback():
    """Without ``domna_full_address``, ``full_address`` falls back to ``address``."""
    legacy_row = dict(
        address="Fallback Address",
        postcode="ZZ1 1ZZ",
        ordnance_survey_uprn=999,
    )

    parsed = Addresses.from_plan_input([legacy_row], LegacyBody())

    assert parsed[0].full_address == "Fallback Address"
|
||||
|
||||
|
||||
# -------------------------
|
||||
# UPRN HANDLING
|
||||
# -------------------------
|
||||
|
||||
def test_uprn_from_float_string():
    """A float-formatted UPRN string (``"123.0"``) is expected back as ``123``."""
    legacy_row = dict(uprn="123.0", address="Test Address", postcode="AA1 1AA")

    parsed = Addresses.from_plan_input([legacy_row], LegacyBody())

    assert parsed[0].uprn == 123
|
||||
|
||||
|
||||
def test_uprn_fallback_to_os_uprn():
    """A ``None`` ``uprn`` falls back to ``ordnance_survey_uprn``."""
    legacy_row = dict(
        uprn=None,
        ordnance_survey_uprn=555,
        address="Test Address",
        postcode="AA1 1AA",
    )

    parsed = Addresses.from_plan_input([legacy_row], LegacyBody())

    assert parsed[0].uprn == 555
|
||||
|
||||
|
||||
def test_missing_uprn_is_none():
    """A legacy row with no UPRN key at all is expected to parse with ``uprn is None``."""
    legacy_row = dict(address="No UPRN Address", postcode="BB1 1BB")

    parsed = Addresses.from_plan_input([legacy_row], LegacyBody())

    assert parsed[0].uprn is None
|
||||
|
||||
|
||||
# -------------------------
|
||||
# FALLBACK LOGIC
|
||||
# -------------------------
|
||||
|
||||
def test_fallback_to_legacy_when_ara_fails():
    """
    If ARA parser fails (missing required fields),
    system should fallback to legacy parser.

    The row below carries only legacy-style keys. Besides checking the parsed
    address, the test asserts that the fallback ``RuntimeWarning`` is raised,
    so it cannot pass without the fallback branch actually being taken.
    """
    row = {
        "address": "Fallback Address",
        "postcode": "ZZ1 1ZZ",
    }

    # ``from_plan_input`` warns with this message when it drops to the
    # deprecated parser; capturing it proves the fallback path ran.
    with pytest.warns(RuntimeWarning, match="Falling back to deprecated parser"):
        addresses = Addresses.from_plan_input([row], AraBody())

    addr = addresses[0]

    assert addr.address_1 == "Fallback Address"
|
||||
|
||||
|
||||
# -------------------------
|
||||
# VALIDATION
|
||||
# -------------------------
|
||||
|
||||
def test_validate_uprn_rejects_invalid():
    """A non-numeric UPRN on an ARA row is expected to raise ``ValueError``."""
    bad_row = dict(
        uprn="not_a_number",
        address_1="Test",
        full_address="Test",
        postcode="AA1 1AA",
    )
    body = AraBody()

    with pytest.raises(ValueError):
        Addresses.from_plan_input([bad_row], body)
|
||||
|
||||
|
||||
# -------------------------
|
||||
# COLLECTION METHODS
|
||||
# -------------------------
|
||||
|
||||
def test_get_uprns():
    """``get_uprns`` returns the integer UPRNs in input order."""
    first_row = dict(uprn="1", address_1="A", full_address="A", postcode="AA")
    second_row = dict(uprn="2", address_1="B", full_address="B", postcode="BB")

    parsed = Addresses.from_plan_input([first_row, second_row], AraBody())

    assert parsed.get_uprns() == [1, 2]
|
||||
|
||||
|
||||
def test_get_unique_postcodes():
    """Two rows sharing a postcode yield that postcode once."""
    first_row = dict(uprn="1", address_1="A", full_address="A", postcode="AA")
    second_row = dict(uprn="2", address_1="B", full_address="B", postcode="AA")

    parsed = Addresses.from_plan_input([first_row, second_row], AraBody())

    assert parsed.get_unique_postcodes() == ["AA"]
|
||||
|
||||
|
||||
def test_get_property_requests():
    """The first property request exposes the parsed ``uprn`` and ``postcode``."""
    ara_row = dict(
        uprn="123",
        address_1="10 Downing St",
        full_address="10 Downing St",
        postcode="SW1A 2AA",
    )

    parsed = Addresses.from_plan_input([ara_row], AraBody())
    first_request = parsed.get_property_requests()[0]

    assert first_request["uprn"] == 123
    assert first_request["postcode"] == "SW1A 2AA"
|
||||
388
backend/tests/test_rebaselining_pipeline.py
Normal file
388
backend/tests/test_rebaselining_pipeline.py
Normal file
|
|
@ -0,0 +1,388 @@
|
|||
# --- Integration Test with Real Data ---
|
||||
import os
|
||||
|
||||
|
||||
def load_sample_certificates():
    """Load ``sample_certificates.csv`` as an all-string pandas DataFrame.

    The file is looked up under ``backend/tests/test_data`` relative to the
    current working directory, so the tests must be run from the project root.

    Column names are stripped, lower-cased and have underscores replaced with
    hyphens. Rows with a missing ``uprn`` or ``low-energy-fixed-light-count``
    are dropped, remaining gaps become empty strings, and those two columns
    are rendered as integer strings (``"123"`` rather than ``"123.0"``).

    Returns:
        pandas.DataFrame: the normalised certificates, every column as ``str``.

    Raises:
        FileNotFoundError: if the CSV does not exist at the expected path.
    """
    import pandas as pd

    csv_path = os.path.join(os.getcwd(), 'backend', 'tests', 'test_data', 'sample_certificates.csv')
    if not os.path.exists(csv_path):
        raise FileNotFoundError(
            f"sample_certificates.csv not found at {csv_path}. Make sure it exists relative to the project root.")

    integer_columns = ["uprn", "low-energy-fixed-light-count"]

    df = pd.read_csv(csv_path)
    # Normalize columns: lowercase, replace underscores with hyphens, strip spaces
    df.columns = [c.strip().lower().replace('_', '-') for c in df.columns]
    # Rows lacking either integer column cannot be cast below, so drop them first.
    df = df.dropna(subset=integer_columns).fillna("")
    for col in integer_columns:
        # Going through int removes the ".0" pandas adds when a column held NaN.
        df[col] = df[col].astype(int).astype(str)
    return df.astype(str)
|
||||
|
||||
|
||||
def make_property_from_row(row, cleaning_data):
    """Build a ``Property`` backed by an ``EPCRecord`` from one certificate row.

    Args:
        row: one certificate row; the caller in this file passes the pandas
            Series yielded by ``DataFrame.iterrows()``. Its keys are used
            as-is (no renaming is performed here).
        cleaning_data: passed through unchanged to ``EPCRecord``.

    Returns:
        The constructed ``Property``. Its ``id`` is the row's ``uprn`` value,
        and its ``uprn`` is that value cast to ``int`` (``None`` when absent
        or null).
    """
    # Imported locally, like the other helpers in this file, so the function
    # does not depend on a module-level ``pd`` defined further down the file.
    import pandas as pd
    from backend.Property import Property
    from etl.epc.Record import EPCRecord

    row_dict = row.to_dict()

    # The same row serves as both the original and the full-SAP EPC; each gets
    # its own copy of the dict.
    epc_record = EPCRecord(
        epc_records={
            "original_epc": row_dict.copy(),
            "full_sap_epc": row_dict.copy(),
            "old_data": [],
        },
        run_mode="newdata",
        cleaning_data=cleaning_data,
    )

    has_uprn = 'uprn' in row and not pd.isnull(row['uprn'])
    return Property(
        id=row.get('uprn'),
        postcode=row.get('postcode'),
        address=row.get('address') or row.get('address1'),
        epc_record=epc_record,
        uprn=int(row['uprn']) if has_uprn else None,
    )
|
||||
|
||||
|
||||
def load_cleaned():
    """Return the object unpickled from ``recommendations/tests/test_data/cleaned.pkl``.

    The path is relative to the current working directory.
    """
    import pickle

    with open("recommendations/tests/test_data/cleaned.pkl", "rb") as handle:
        return pickle.load(handle)
|
||||
|
||||
|
||||
def load_cleaning_data():
    """Return the object unpickled from ``recommendations/tests/test_data/cleaning_data.pkl``.

    The path is relative to the current working directory.
    """
    import pickle

    with open("recommendations/tests/test_data/cleaning_data.pkl", "rb") as handle:
        return pickle.load(handle)
|
||||
|
||||
|
||||
def test_rebaselining_pipeline_with_real_data(mock_model_api):
    """Run rebaselining on the sample certificates and check prediction accuracy.

    Steps: build properties from ``sample_certificates.csv``, create a
    base-difference record for each, score them with ``ModelApi.predict_all``,
    compare each baseline model's predictions with the original EPC values
    (MAPE per metric), then write the predictions back onto the EPC records.

    NOTE(review): the ``mock_model_api`` fixture is requested but never used;
    a real ``ModelApi`` is constructed below and pointed at the
    "retrofit-data-dev" bucket. Presumably this needs network access and
    credentials - confirm before relying on it in CI.
    """
    import pandas as pd
    from datetime import datetime
    from backend.ml_models.api import ModelApi
    from backend.app.utils import sap_to_epc
    from backend.app.config import get_prediction_buckets

    df = load_sample_certificates()
    cleaning_data = load_cleaning_data()
    input_properties = [make_property_from_row(row, cleaning_data=cleaning_data) for _, row in df.iterrows()]
    cleaned = load_cleaned()

    # Every sample property is rebaselined in this test.
    scoring_frames = []
    for p in input_properties:
        p.create_base_difference_epc_record(cleaned_lookup=cleaned)
        scoring_frames.append(p.base_difference_record.df.copy())
    assert scoring_frames, "No properties required rebaselining in the sample data."
    rebaselining_scoring_data = pd.concat(scoring_frames)
    # Set is_post_sap10_starting after concatenation
    rebaselining_scoring_data["is_post_sap10_starting"] = False

    # Instantiate ModelApi as in engine.py
    model_api = ModelApi(
        portfolio_id="test-portfolio",
        timestamp=datetime.now().isoformat(),
        prediction_buckets=get_prediction_buckets(),
        max_retries=1,
    )

    # Use the real model_api and bucket
    rebaselining_response = model_api.predict_all(
        df=rebaselining_scoring_data,
        bucket="retrofit-data-dev",
        model_prefixes=model_api.BASELINE_MODEL_PREFIXES,
        extract_ids=False,
        extract_uprn=True,
    )
    input_properties_by_uprn = {int(p.uprn): p for p in input_properties if p.uprn is not None}

    # model name -> (metric name, column holding the original EPC value)
    metrics_by_model = {
        "retrofit_sap_baseline_predictions": ("sap", "original_sap"),
        "retrofit_carbon_baseline_predictions": ("carbon", "original_carbon"),
        "retrofit_heat_baseline_predictions": ("heat", "original_heat"),
    }
    original_columns = ("original_sap", "original_carbon", "original_heat")

    # Build a mapping from uprn to original values for easy lookup
    uprn_to_originals = {}
    for p in input_properties:
        if p.uprn is not None and hasattr(p, 'epc_record') and hasattr(p.epc_record, 'original_epc'):
            orig = p.epc_record.original_epc
            uprn_to_originals[int(p.uprn)] = {
                'original_sap': orig.get('current-energy-efficiency'),
                'original_carbon': orig.get('co2-emissions-current'),
                'original_heat': orig.get('energy-consumption-current'),
            }

    def calculate_mape(df, pred_col, actual_col):
        """Return the mean absolute percentage error, or None with no usable rows."""
        df = df.copy()
        df[pred_col] = pd.to_numeric(df[pred_col], errors="coerce")
        df[actual_col] = pd.to_numeric(df[actual_col], errors="coerce")
        # Rows with a missing value or a zero actual cannot give a percentage.
        valid = (
            df[actual_col].notnull() &
            df[pred_col].notnull() &
            (df[actual_col] != 0)
        )
        if valid.sum() == 0:
            return None  # No valid rows
        return (
            (df.loc[valid, pred_col] - df.loc[valid, actual_col]).abs()
            / df.loc[valid, actual_col].abs()
        ).mean() * 100

    predictions_by_model_and_uprn = {}
    mape_results = {}
    for model, (metric_name, actual_col) in metrics_by_model.items():
        df_pred = rebaselining_response[model]
        # Map originals
        for original_col in original_columns:
            df_pred[original_col] = df_pred['uprn'].map(
                lambda u, key=original_col: uprn_to_originals.get(int(u), {}).get(key)
            )
        # Save predictions
        predictions_by_model_and_uprn[model] = dict(
            zip(df_pred["uprn"].astype(int), df_pred["predictions"])
        )
        mape = calculate_mape(df_pred, "predictions", actual_col)
        if mape is not None:
            mape_results[metric_name] = mape
            print(f"MAPE ({metric_name}): {mape:.2f}%")
        else:
            # Best effort: a metric with no comparable rows is reported, not failed.
            print(f"MAPE ({metric_name}): No valid data")

    # --- ASSERT PERFORMANCE ---
    # each model has varying impacts under SAP 10. We see a small SAP movement
    # but much higher carbon and heat changes. We expect this. E.g. we see
    # cases where EPC C properties had 0.2 carbon which should be higher
    MAX_MAPE = {
        "sap": 4.6,  # %
        "carbon": 21.0,  # %
        "heat": 16.0,  # %
    }
    for metric, mape in mape_results.items():
        max_allowed = MAX_MAPE.get(metric, 100.0)
        assert mape < max_allowed, f"{metric.upper()} MAPE too high: {mape:.2f}% > {max_allowed}%"

    for uprn_int in rebaselining_scoring_data["uprn"].unique().astype(int):
        property_instance = input_properties_by_uprn.get(uprn_int)
        if property_instance is None:
            continue
        # Direct indexing: a scored UPRN with no prediction raises KeyError here.
        new_sap = predictions_by_model_and_uprn["retrofit_sap_baseline_predictions"][uprn_int]
        new_carbon = predictions_by_model_and_uprn["retrofit_carbon_baseline_predictions"][uprn_int]
        new_heat_demand = predictions_by_model_and_uprn["retrofit_heat_baseline_predictions"][uprn_int]
        property_instance.epc_record.insert_new_performance_values(
            new_sap=new_sap,
            new_epc=sap_to_epc(new_sap),
            new_carbon=new_carbon,
            new_heat_demand=new_heat_demand,
        )

    # Assert that at least one EPC record was updated
    updated = sum(1 for p in input_properties if p.epc_record.has_been_remodelled)
    assert updated > 0, "No EPC records were updated."

    print(f"Updated {updated} EPC records with new predictions.")
|
||||
|
||||
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, patch
|
||||
import pandas as pd
|
||||
|
||||
|
||||
# Import the relevant classes and functions
|
||||
# from backend.Property import Property # Uncomment and adjust as needed
|
||||
# from etl.epc.Record import EpcRecord # Uncomment and adjust as needed
|
||||
# from backend.engine.engine import sap_to_epc # Uncomment and adjust as needed
|
||||
|
||||
# --- Fixtures ---
|
||||
@pytest.fixture
def sample_input_properties():
    """Provide three stand-in properties exposing the attributes the rebaselining flow touches."""

    class MockEpcRecord:
        """EPC record stub that stores whatever performance values it is given."""

        def __init__(self):
            self.landlord_differences = {'wall_insulation': 'yes'}
            self.current_energy_efficiency = 60
            self.lodgement_date = '2020-01-01'
            self.original_epc = {'wall-insulation': 'no'}

        def insert_new_performance_values(self, new_sap, new_epc, new_carbon, new_heat_demand):
            self.new_sap, self.new_epc = new_sap, new_epc
            self.new_carbon, self.new_heat_demand = new_carbon, new_heat_demand

    class MockProperty:
        """Property stub identified by its UPRN, with expiry/estimate flags."""

        def __init__(self, uprn, expired=False, estimated=False):
            self.uprn = uprn
            self.epc_is_expired = expired
            self.epc_is_estimated = estimated
            self.epc_record = MockEpcRecord()

        def create_base_difference_epc_record(self, cleaned_lookup=None):
            # Simulate creation of base_difference_record with a one-row frame
            scoring_frame = pd.DataFrame({
                'uprn': [self.uprn],
                'feature1': [1],
                'feature2': [2],
            })
            self.base_difference_record = MagicMock()
            self.base_difference_record.df = scoring_frame

    expired_property = MockProperty(1001, expired=True)
    estimated_property = MockProperty(1002, estimated=True)
    plain_property = MockProperty(1003)
    return [expired_property, estimated_property, plain_property]
|
||||
|
||||
|
||||
@pytest.fixture
def mock_model_api():
    """Provide a ``MagicMock`` model API with canned baseline predictions for UPRNs 1001 and 1002."""
    canned_predictions = {
        'retrofit_sap_baseline_predictions': [70, 65],
        'retrofit_carbon_baseline_predictions': [1.2, 1.1],
        'retrofit_heat_baseline_predictions': [10000, 9500],
    }

    api = MagicMock()
    # Simulate model_api.predict_all returning a dict of DataFrames
    api.predict_all.return_value = {
        model_name: pd.DataFrame({'uprn': [1001, 1002], 'predictions': values})
        for model_name, values in canned_predictions.items()
    }
    api.BASELINE_MODEL_PREFIXES = list(canned_predictions)
    return api
|
||||
|
||||
|
||||
# --- Integration Test ---
|
||||
def test_rebaselining_pipeline(sample_input_properties, mock_model_api):
    """Replay the rebaselining write-back against stub properties and a mocked model API.

    Every property is scored (rebaselining is forced for all of them), the
    mocked ``predict_all`` response is indexed by UPRN, and the resulting
    values are written to each EPC record. The test then checks the values
    actually stored on every record, including ``None`` for a UPRN that the
    response does not contain.
    """
    input_properties = sample_input_properties

    # Rebaselining is forced for all properties; no cleaned lookup is needed
    # because the stub ignores it.
    scoring_frames = []
    for p in input_properties:
        p.create_base_difference_epc_record(cleaned_lookup=None)
        scoring_frames.append(p.base_difference_record.df.copy())
    rebaselining_scoring_data = pd.concat(scoring_frames)
    if not rebaselining_scoring_data.empty:
        rebaselining_scoring_data["is_post_sap10_starting"] = True

    rebaselining_response = mock_model_api.predict_all(
        df=rebaselining_scoring_data,
        bucket='dummy-bucket',
        model_prefixes=mock_model_api.BASELINE_MODEL_PREFIXES,
        extract_ids=False,
        extract_uprn=True
    )
    mock_model_api.predict_all.assert_called_once()

    input_properties_by_uprn = {int(p.uprn): p for p in input_properties if p.uprn is not None}
    model_names = [
        "retrofit_sap_baseline_predictions",
        "retrofit_carbon_baseline_predictions",
        "retrofit_heat_baseline_predictions",
    ]
    predictions_by_model_and_uprn = {}
    for model in model_names:
        df = rebaselining_response[model]
        predictions_by_model_and_uprn[model] = dict(zip(df["uprn"].astype(int), df["predictions"]))
    sap_by_uprn, carbon_by_uprn, heat_by_uprn = (
        predictions_by_model_and_uprn[model] for model in model_names
    )

    for uprn_int in rebaselining_scoring_data["uprn"].unique().astype(int):
        property_instance = input_properties_by_uprn.get(uprn_int)
        if property_instance is None:
            continue
        # ``.get`` so a UPRN missing from the response is written as None
        # instead of raising.
        property_instance.epc_record.insert_new_performance_values(
            new_sap=sap_by_uprn.get(uprn_int),
            new_epc='C',
            new_carbon=carbon_by_uprn.get(uprn_int),
            new_heat_demand=heat_by_uprn.get(uprn_int),
        )

    # Every property was scored, so every record must hold exactly the value
    # the response carried for its UPRN (None when the response had no row).
    for p in input_properties:
        record = p.epc_record
        uprn_int = int(p.uprn)
        assert hasattr(record, 'new_sap')
        assert record.new_sap == sap_by_uprn.get(uprn_int)
        assert record.new_epc == 'C'
        assert record.new_carbon == carbon_by_uprn.get(uprn_int)
        assert record.new_heat_demand == heat_by_uprn.get(uprn_int)
|
||||
|
||||
|
||||
# --- Unit Test Example ---
|
||||
def test_insert_new_performance_values():
    """A local dummy record stores the four performance values it is handed."""

    class DummyEpcRecord:
        def insert_new_performance_values(self, new_sap, new_epc, new_carbon, new_heat_demand):
            self.new_sap, self.new_epc = new_sap, new_epc
            self.new_carbon, self.new_heat_demand = new_carbon, new_heat_demand

    dummy = DummyEpcRecord()
    dummy.insert_new_performance_values(80, 'B', 1.0, 9000)

    assert dummy.new_sap == 80
    assert dummy.new_epc == 'B'
    assert dummy.new_carbon == 1.0
    assert dummy.new_heat_demand == 9000
|
||||
Loading…
Add table
Reference in a new issue