mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
250 lines
7.3 KiB
Python
250 lines
7.3 KiB
Python
import pandas as pd
|
|
import numpy as np
|
|
from pathlib import Path
|
|
import time
|
|
|
|
from backend.export.property_scenarios.main import process_export
|
|
from backend.export.property_scenarios.input_schema import ExportRequest
|
|
from backend.app.db.models.portfolio import PropertyModel, Epc, Portfolio, PortfolioStatus, PortfolioGoal, \
|
|
PropertyCreationStatus, PropertyDetailsEpcModel
|
|
from backend.app.db.models.recommendations import PlanModel, Recommendation, PlanRecommendations
|
|
from utils.logger import setup_logger
|
|
|
|
# Directory (relative path) holding the CSV fixtures that load_csv reads.
FIXTURE_PATH = Path("backend/export/tests/fixtures")

# Module-wide logger obtained from the project's setup_logger helper.
logger = setup_logger()
|
|
|
|
|
|
def load_csv(name: str) -> pd.DataFrame:
    """Read the fixture CSV called ``name`` and swap NaN cells for ``None``.

    Args:
        name: File name of the CSV, resolved relative to ``FIXTURE_PATH``.

    Returns:
        The parsed DataFrame with every ``np.nan`` replaced by ``None``.
    """
    csv_path = FIXTURE_PATH / name
    return pd.read_csv(csv_path).replace({np.nan: None})
|
|
|
|
|
|
def _enum_member(enum_cls, raw):
    """Return the member of ``enum_cls`` named by the last "."-separated part of ``raw``.

    Only the final segment of ``raw`` is used for the lookup, so a value such
    as "Epc.C" resolves to ``Epc["C"]``. Raises ``KeyError`` for unknown names.
    """
    return enum_cls[raw.split(".")[-1]]


def _int_or_none(value):
    """Cast a truthy ``value`` to ``int``; any falsy value becomes ``None``."""
    return int(value) if value else None


def _model_kwargs(model, values):
    """Select from ``values`` only the entries whose key is a column key of ``model``."""
    return {
        col: values[col]
        for col in model.__table__.columns.keys()
        if col in values
    }


def test_default_export_integration(db_session):
    """Load the portfolio-569 fixtures into the DB and check the default-plan export.

    The test reads six fixture CSVs, inserts portfolios, properties, EPC
    details, plans, recommendations and plan/recommendation links through
    ``db_session``, runs ``process_export`` with ``default_plans_only=True``
    and asserts on the ``"default_plans"`` frame of the result.

    Args:
        db_session: Session-like fixture exposing ``bulk_save_objects``,
            ``flush``, ``commit`` and ``query`` (presumably a SQLAlchemy
            session supplied by conftest -- not visible in this file).
    """
    # ----------------------------------------
    # 1) Load csvs
    # ----------------------------------------
    t0 = time.perf_counter()
    portfolio_df = load_csv("portfolio_569.csv")
    properties_df = load_csv("properties_569.csv")
    property_details_epc_df = load_csv("property_details_epc_569.csv")
    plans_df = load_csv("plans_569.csv")
    plan_recs_df = load_csv("plan_recs_569.csv")
    recommendations_df = load_csv("recommendations_569.csv")

    logger.info(
        "Loaded CSVs in %.2f seconds | properties=%s plans=%s recs=%s",
        time.perf_counter() - t0,
        len(properties_df),
        len(plans_df),
        len(recommendations_df),
    )

    logger.info("Starting database load")
    db_load_t0 = time.perf_counter()

    # ----------------------------------------
    # 2) Insert test portfolio
    # ----------------------------------------
    portfolios = [
        Portfolio(
            id=row.id,
            name=row.name,
            status=_enum_member(PortfolioStatus, row.status),
            goal=_enum_member(PortfolioGoal, row.goal) if row.goal else None,
        )
        for row in portfolio_df.itertuples(index=False)
    ]

    db_session.bulk_save_objects(portfolios)
    db_session.flush()

    # ----------------------------------------
    # 3) Insert test property
    # ----------------------------------------
    properties = []

    for row in properties_df.itertuples(index=False):
        row_dict = row._asdict()

        # Identifiers are cast to int when present; falsy values become None.
        row_dict["uprn"] = _int_or_none(row_dict.get("uprn"))
        row_dict["building_reference_number"] = _int_or_none(
            row_dict.get("building_reference_number")
        )

        prop = PropertyModel(**_model_kwargs(PropertyModel, row_dict))

        # Enum-valued attributes are overwritten after construction with the
        # members parsed from the fixture strings.
        prop.creation_status = _enum_member(
            PropertyCreationStatus, row_dict["creation_status"]
        )
        prop.status = _enum_member(PortfolioStatus, row_dict["status"])

        if row_dict.get("current_epc_rating"):
            prop.current_epc_rating = _enum_member(
                Epc, row_dict["current_epc_rating"]
            )

        properties.append(prop)

    db_session.bulk_save_objects(properties)
    db_session.flush()

    # ----------------------------------------
    # 4) Insert property details - EPC
    # ----------------------------------------
    # These three are either passed explicitly below or left out entirely.
    epc_excluded = ("id", "property_id", "portfolio_id")
    epc_rows = []

    for row in property_details_epc_df.itertuples(index=False):
        row_dict = row._asdict()

        # Build only fields that exist on the model
        epc_data = {
            col.name: row_dict[col.name]
            for col in PropertyDetailsEpcModel.__table__.columns.values()
            if col.name in row_dict and col.name not in epc_excluded
        }

        epc_rows.append(
            PropertyDetailsEpcModel(
                property_id=row.property_id,
                portfolio_id=row.portfolio_id,
                **epc_data,
            )
        )

    db_session.bulk_save_objects(epc_rows)
    db_session.flush()

    # ----------------------------------------
    # 5) Insert default plan
    # ----------------------------------------
    plans = []

    for row in plans_df.itertuples(index=False):
        row_dict = row._asdict()

        if row_dict.get("post_epc_rating"):
            row_dict["post_epc_rating"] = _enum_member(
                Epc, row_dict["post_epc_rating"]
            )

        # Every plan is inserted with scenario_id forced to None, whatever
        # the fixture holds.
        row_dict["scenario_id"] = None

        plans.append(PlanModel(**_model_kwargs(PlanModel, row_dict)))

    db_session.bulk_save_objects(plans)
    db_session.flush()

    # ----------------------------------------
    # 6) Insert recommendation
    # ----------------------------------------
    # to_dict("records") keeps each column's own dtype, whereas iterrows()
    # coerces every row into a single-dtype Series.
    recs = [
        Recommendation(**_model_kwargs(Recommendation, rec))
        for rec in recommendations_df.to_dict(orient="records")
    ]

    db_session.bulk_save_objects(recs)
    db_session.flush()

    # ----------------------------------------
    # 7) Insert PlanRecommendations
    # ----------------------------------------
    links = [
        PlanRecommendations(
            plan_id=row.plan_id,
            recommendation_id=row.recommendation_id,
        )
        for row in plan_recs_df.itertuples(index=False)
    ]

    db_session.bulk_save_objects(links)
    db_session.commit()
    logger.info("Inserted all data in %.2f seconds", time.perf_counter() - db_load_t0)

    # ----------------------------------------
    # 8) Build payload
    # ----------------------------------------
    body_dict = {
        "task_id": "test",
        "subtask_id": "test",
        "portfolio_id": 569,
        "scenario_ids": [],
        "default_plans_only": True,
    }

    payload = ExportRequest.model_validate(body_dict)

    # ----------------------------------------
    # 9) Call process_export
    # ----------------------------------------
    # Log row counts per table before exporting. The last entry now counts
    # PlanRecommendations; it previously counted PlanModel a second time.
    for message, model in (
        ("Recommendation count in DB: %s", Recommendation),
        ("Property count in DB: %s", PropertyModel),
        ("Property EPC in DB: %s", PropertyDetailsEpcModel),
        ("Plan count in DB: %s", PlanModel),
        ("PlanRecommendations count in DB: %s", PlanRecommendations),
    ):
        logger.info(message, db_session.query(model).count())

    logger.info("Starting process_export")
    process_t0 = time.perf_counter()

    result = process_export(payload, session=db_session)

    logger.info("process_export finished in %.2f seconds", time.perf_counter() - process_t0)

    # ----------------------------------------
    # 10) Assertions
    # ----------------------------------------
    assert "default_plans" in result, "Expected 'default_plans' in export result, got {}".format(result.keys())

    df = result["default_plans"]

    assert df.shape[0] == 10, "Expected 10 properties in the export, got {}".format(df.shape[0])

    # Rows whose predicted post-works SAP is below 69 are treated as failed.
    failed = df[df["predicted_post_works_sap"] < 69]
    failed_property_types = failed["property_type"].value_counts().to_dict()

    # .get() turns a missing "Flat" key into a readable assertion failure
    # instead of a KeyError.
    assert failed_property_types.get("Flat", 0) == 2, (
        "Expected 2 failed flats, got {}".format(failed_property_types)
    )

    # Check the houses
    # TODO(review): no house-specific expectation is asserted here; only that
    # at least one property failed. Confirm the intended house count.
    assert failed.shape[0], "Expected at least one failed property in the export"
|