# Model/backend/export/tests/test_export.py
# 2026-02-23 12:13:59 +00:00
#
# 274 lines
# 8.3 KiB
# Python

import pandas as pd
import numpy as np
from pathlib import Path
import time
from backend.export.property_scenarios.main import process_export
from backend.export.property_scenarios.input_schema import ExportRequest
from backend.app.db.models.portfolio import PropertyModel, Epc, Portfolio, PortfolioStatus, PortfolioGoal, \
PropertyCreationStatus, PropertyDetailsEpcModel
from backend.app.db.models.recommendations import PlanModel, Recommendation, PlanRecommendations
from utils.logger import setup_logger
# Directory holding the CSV fixtures read by load_csv(). The path is relative,
# so it is resolved against the working directory the tests are started from.
# NOTE(review): presumably the suite is always launched from the directory that
# contains `backend/` - confirm, or anchor this to the test file's location.
FIXTURE_PATH = Path("backend/export/tests/fixtures")
# Logger used for the timing and row-count messages emitted by the test below.
logger = setup_logger()
def load_csv(name: str) -> pd.DataFrame:
    """Read the fixture CSV called ``name`` and return it as a DataFrame.

    The file is looked up under ``FIXTURE_PATH``. Every NaN cell is replaced
    with ``None`` before the frame is returned.

    Args:
        name: File name of the fixture, relative to ``FIXTURE_PATH``.

    Returns:
        The parsed DataFrame with NaN values swapped for ``None``.
    """
    csv_path = FIXTURE_PATH / name
    frame = pd.read_csv(csv_path)
    return frame.replace({np.nan: None})
def _enum_member(enum_cls, raw):
    """Look up the member of ``enum_cls`` named by the fixture value ``raw``.

    Only the text after the last ``"."`` is used as the member name, so both
    ``"Prefix.NAME"`` and plain ``"NAME"`` resolve to ``enum_cls["NAME"]``.

    Raises:
        KeyError: If ``enum_cls`` has no member with that name.
    """
    return enum_cls[raw.split(".")[-1]]


def _int_or_none(value):
    """Return ``int(value)`` for a truthy ``value``, otherwise ``None``."""
    return int(value) if value else None


def _insert_portfolios(db_session, portfolio_df):
    """Insert one ``Portfolio`` per row of ``portfolio_df`` and flush."""
    portfolios = [
        Portfolio(
            id=row.id,
            name=row.name,
            status=_enum_member(PortfolioStatus, row.status),
            goal=_enum_member(PortfolioGoal, row.goal) if row.goal else None,
        )
        for row in portfolio_df.itertuples(index=False)
    ]
    db_session.bulk_save_objects(portfolios)
    db_session.flush()


def _insert_properties(db_session, properties_df):
    """Insert one ``PropertyModel`` per row of ``properties_df`` and flush.

    Only CSV columns that are also columns of the model's table are passed to
    the constructor. ``uprn`` and ``building_reference_number`` are coerced to
    ``int`` (or ``None``), and the enum-typed fields are converted from their
    text form to enum members.
    """
    model_columns = PropertyModel.__table__.columns.keys()
    properties = []
    for row in properties_df.itertuples(index=False):
        row_dict = row._asdict()
        row_dict["uprn"] = _int_or_none(row_dict.get("uprn"))
        row_dict["building_reference_number"] = _int_or_none(
            row_dict.get("building_reference_number")
        )

        prop = PropertyModel(**{
            col: row_dict[col] for col in model_columns if col in row_dict
        })
        # The constructor received the raw text for these columns; replace it
        # with real enum members.
        prop.creation_status = _enum_member(
            PropertyCreationStatus, row_dict["creation_status"]
        )
        prop.status = _enum_member(PortfolioStatus, row_dict["status"])
        if row_dict.get("current_epc_rating"):
            prop.current_epc_rating = _enum_member(
                Epc, row_dict["current_epc_rating"]
            )
        properties.append(prop)

    db_session.bulk_save_objects(properties)
    db_session.flush()


def _insert_property_epc_details(db_session, property_details_epc_df):
    """Insert ``PropertyDetailsEpcModel`` rows linked to stored properties.

    Each CSV row is matched to a property by ``uprn``; rows with no matching
    property are skipped. ``id``, ``property_id`` and ``portfolio_id`` from
    the CSV are ignored; the latter two are taken from the matched property.
    """
    # Properties are read back from the session rather than reused from the
    # insert step - presumably because bulk_save_objects() does not populate
    # primary keys on the objects it is given; confirm against SQLAlchemy docs.
    # NOTE(review): properties without a uprn all share the key None, so an
    # EPC row without a uprn would be attached to one of them - confirm the
    # fixtures never hit that case.
    property_lookup = {
        prop.uprn: prop for prop in db_session.query(PropertyModel).all()
    }
    excluded = ("id", "property_id", "portfolio_id")
    epc_columns = [
        col.name
        for col in PropertyDetailsEpcModel.__table__.columns
        if col.name not in excluded
    ]

    epc_rows = []
    for row in property_details_epc_df.itertuples(index=False):
        row_dict = row._asdict()
        property_obj = property_lookup.get(_int_or_none(row_dict.get("uprn")))
        if not property_obj:
            continue  # skip if property not found

        # Build only fields that exist on the model
        epc_data = {
            name: row_dict[name] for name in epc_columns if name in row_dict
        }
        epc_rows.append(
            PropertyDetailsEpcModel(
                property_id=property_obj.id,
                portfolio_id=property_obj.portfolio_id,
                **epc_data,
            )
        )

    db_session.bulk_save_objects(epc_rows)
    db_session.flush()


def _insert_plans(db_session, plans_df):
    """Insert one ``PlanModel`` per row of ``plans_df`` and flush.

    ``post_epc_rating`` is converted to an ``Epc`` member when present, and
    ``scenario_id`` is always forced to ``None``.
    """
    model_columns = PlanModel.__table__.columns.keys()
    plans = []
    for row in plans_df.itertuples(index=False):
        row_dict = row._asdict()
        if row_dict.get("post_epc_rating"):
            row_dict["post_epc_rating"] = _enum_member(
                Epc, row_dict["post_epc_rating"]
            )
        row_dict["scenario_id"] = None
        plans.append(PlanModel(**{
            col: row_dict[col] for col in model_columns if col in row_dict
        }))

    db_session.bulk_save_objects(plans)
    db_session.flush()


def _insert_recommendations(db_session, recommendations_df):
    """Insert one ``Recommendation`` per row of ``recommendations_df`` and flush."""
    model_columns = Recommendation.__table__.columns.keys()
    recs = [
        Recommendation(**{
            col: row[col] for col in model_columns if col in row
        })
        for _, row in recommendations_df.iterrows()
    ]
    db_session.bulk_save_objects(recs)
    db_session.flush()


def _insert_plan_recommendations(db_session, plan_recs_df):
    """Insert the plan/recommendation link rows; the caller commits."""
    links = [
        PlanRecommendations(
            plan_id=row.plan_id,
            recommendation_id=row.recommendation_id,
        )
        for row in plan_recs_df.itertuples(index=False)
    ]
    db_session.bulk_save_objects(links)


def test_default_export_integration(db_session):
    """Run ``process_export`` for default plans against the portfolio 569 fixtures.

    The test loads the CSV fixtures, inserts them through ``db_session``,
    calls ``process_export`` with ``default_plans_only=True`` and then pins
    the number of rows, per property type, whose predicted post-works SAP
    score is below 69.
    """
    # ----------------------------------------
    # 1) Load csvs
    # ----------------------------------------
    t0 = time.perf_counter()
    portfolio_df = load_csv("portfolio_569.csv")
    properties_df = load_csv("properties_569.csv")
    property_details_epc_df = load_csv("property_details_epc_569.csv")
    plans_df = load_csv("plans_569.csv")
    plan_recs_df = load_csv("plan_recs_569.csv")
    recommendations_df = load_csv("recommendations_569.csv")

    # Shrink down recommendations_df to speed up the data load. For this test
    # we only need default recommendations, so keep the rows where "default"
    # is true and drop the plan links that point at anything else.
    recommendations_df = recommendations_df[recommendations_df["default"]]
    valid_rec_ids = recommendations_df["id"].unique()
    plan_recs_df = plan_recs_df[
        plan_recs_df["recommendation_id"].isin(valid_rec_ids)
    ]

    logger.info(
        "Loaded CSVs in %.2f seconds | properties=%s plans=%s recs=%s",
        time.perf_counter() - t0,
        len(properties_df),
        len(plans_df),
        len(recommendations_df),
    )

    # ----------------------------------------
    # 2) Insert the fixtures. Order matters: EPC details look up the
    #    properties that were inserted just before them.
    # ----------------------------------------
    logger.info("Starting database load")
    db_load_t0 = time.perf_counter()
    _insert_portfolios(db_session, portfolio_df)
    _insert_properties(db_session, properties_df)
    _insert_property_epc_details(db_session, property_details_epc_df)
    _insert_plans(db_session, plans_df)
    _insert_recommendations(db_session, recommendations_df)
    _insert_plan_recommendations(db_session, plan_recs_df)
    db_session.commit()
    logger.info(
        "Inserted all data in %.2f seconds", time.perf_counter() - db_load_t0
    )

    # ----------------------------------------
    # 3) Build payload
    # ----------------------------------------
    body_dict = {
        "task_id": "test",
        "subtask_id": "test",
        "portfolio_id": 569,
        "scenario_ids": [],
        "default_plans_only": True,
    }
    payload = ExportRequest.model_validate(body_dict)

    # ----------------------------------------
    # 4) Call process_export
    # ----------------------------------------
    logger.info(
        "Recommendation count in DB: %s",
        db_session.query(Recommendation).count(),
    )
    logger.info(
        "Default + not installed count: %s",
        db_session.query(Recommendation)
        .filter(
            Recommendation.default.is_(True),
            Recommendation.already_installed.is_(False),
        )
        .count(),
    )
    logger.info("Starting process_export")
    process_t0 = time.perf_counter()
    result = process_export(payload, session=db_session)
    logger.info(
        "process_export finished in %.2f seconds",
        time.perf_counter() - process_t0,
    )

    # ----------------------------------------
    # 5) Assertions
    # ----------------------------------------
    assert "default_plans" in result
    df = result["default_plans"]
    assert not df.empty

    # This test was generated on a real portfolio. The goal for it is that
    # properties get to EPC C; the rows below are the ones whose predicted
    # post-works SAP is under 69, and their per-type counts are pinned so any
    # change in the export shows up here.
    failed = df[df["predicted_post_works_sap"] < 69]
    assert failed.shape[0]

    failed_property_types = failed["property_type"].value_counts().to_dict()
    expected_failed_counts = {
        "Flat": 113,
        "House": 8,
        "Bungalow": 4,
        "Maisonette": 1,
    }
    # Compare as dicts (missing types count as 0) so a mismatch is reported as
    # a readable assertion diff instead of a KeyError.
    actual_failed_counts = {
        property_type: failed_property_types.get(property_type, 0)
        for property_type in expected_failed_counts
    }
    assert actual_failed_counts == expected_failed_counts

    # Open questions noted by the original author:
    # - should get to EPC C: https://ara.domna.homes/portfolio/569/building-passport/661051/plans
    # - Why doesn't this get to a C, under the plan?:
    #   https://ara.domna.homes/portfolio/569/building-passport/660447/plans/1603913