import time
from pathlib import Path

import numpy as np
import pandas as pd

from backend.export.property_scenarios.main import process_export
from backend.export.property_scenarios.input_schema import ExportRequest
from backend.app.db.models.portfolio import (
    PropertyModel,
    Epc,
    Portfolio,
    PortfolioStatus,
    PortfolioGoal,
    PropertyCreationStatus,
    PropertyDetailsEpcModel,
)
from backend.app.db.models.recommendations import PlanModel, Recommendation, PlanRecommendations
from utils.logger import setup_logger

FIXTURE_PATH = Path("backend/export/tests/fixtures")

logger = setup_logger()


def load_csv(name: str) -> pd.DataFrame:
    """Read the fixture CSV *name* from FIXTURE_PATH, replacing NaN cells with None."""
    df = pd.read_csv(FIXTURE_PATH / name)
    df = df.replace({np.nan: None})
    return df


def _enum_member(enum_cls, raw: str):
    """Look up the member of *enum_cls* named by the text after the last "." in *raw*."""
    return enum_cls[raw.split(".")[-1]]


def _model_kwargs(model, values) -> dict:
    """Return the entries of *values* whose keys are column keys of *model*.

    *values* may be a dict or a pandas Series; both support ``in`` on their
    labels and ``[]`` lookup.
    """
    return {
        col: values[col]
        for col in model.__table__.columns.keys()
        if col in values
    }


def _build_portfolios(portfolio_df: pd.DataFrame) -> list:
    """Build one Portfolio per row of *portfolio_df*."""
    return [
        Portfolio(
            id=row.id,
            name=row.name,
            status=_enum_member(PortfolioStatus, row.status),
            goal=_enum_member(PortfolioGoal, row.goal) if row.goal else None,
        )
        for row in portfolio_df.itertuples(index=False)
    ]


def _build_properties(properties_df: pd.DataFrame) -> list:
    """Build one PropertyModel per row of *properties_df*."""
    properties = []
    for row in properties_df.itertuples(index=False):
        row_dict = row._asdict()

        # Identifiers are coerced to int; falsy values (including None) become None.
        row_dict["uprn"] = int(row_dict["uprn"]) if row_dict.get("uprn") else None
        row_dict["building_reference_number"] = (
            int(row_dict["building_reference_number"])
            if row_dict.get("building_reference_number")
            else None
        )

        prop = PropertyModel(**_model_kwargs(PropertyModel, row_dict))

        # The CSV holds enum values as strings, so overwrite them with real members.
        prop.creation_status = _enum_member(
            PropertyCreationStatus, row_dict["creation_status"]
        )
        prop.status = _enum_member(PortfolioStatus, row_dict["status"])
        if row_dict.get("current_epc_rating"):
            prop.current_epc_rating = _enum_member(Epc, row_dict["current_epc_rating"])

        properties.append(prop)
    return properties


def _build_epc_rows(property_details_epc_df: pd.DataFrame, property_lookup: dict) -> list:
    """Build PropertyDetailsEpcModel rows for every EPC row whose uprn is in *property_lookup*."""
    epc_rows = []
    for row in property_details_epc_df.itertuples(index=False):
        row_dict = row._asdict()

        uprn = int(row_dict["uprn"]) if row_dict.get("uprn") else None
        property_obj = property_lookup.get(uprn)
        if not property_obj:
            continue  # skip if property not found

        # Build only fields that exist on the model; the keys are taken from
        # the matched property instead of the CSV.
        epc_data = {
            col.name: row_dict[col.name]
            for col in PropertyDetailsEpcModel.__table__.columns
            if col.name in row_dict
            and col.name not in ["id", "property_id", "portfolio_id"]
        }

        epc_rows.append(
            PropertyDetailsEpcModel(
                property_id=property_obj.id,
                portfolio_id=property_obj.portfolio_id,
                **epc_data,
            )
        )
    return epc_rows


def _build_plans(plans_df: pd.DataFrame) -> list:
    """Build one PlanModel per row of *plans_df*, with scenario_id cleared."""
    plans = []
    for row in plans_df.itertuples(index=False):
        row_dict = row._asdict()

        if row_dict.get("post_epc_rating"):
            row_dict["post_epc_rating"] = _enum_member(Epc, row_dict["post_epc_rating"])

        # NOTE(review): scenario_id is cleared for every plan row here; confirm
        # it was not meant to apply only when post_epc_rating is set.
        row_dict["scenario_id"] = None

        plans.append(PlanModel(**_model_kwargs(PlanModel, row_dict)))
    return plans


def _build_recommendations(recommendations_df: pd.DataFrame) -> list:
    """Build one Recommendation per row of *recommendations_df*."""
    return [
        Recommendation(**_model_kwargs(Recommendation, row))
        for _, row in recommendations_df.iterrows()
    ]


def _build_plan_recommendations(plan_recs_df: pd.DataFrame) -> list:
    """Build one PlanRecommendations link per row of *plan_recs_df*."""
    return [
        PlanRecommendations(
            plan_id=row.plan_id,
            recommendation_id=row.recommendation_id,
        )
        for row in plan_recs_df.itertuples(index=False)
    ]


def test_default_export_integration(db_session):
    """Load the portfolio 569 fixtures into the database and check the default-plans export.

    The fixture CSVs are inserted through *db_session*, ``process_export`` is
    called with ``default_plans_only`` set, and the rows of the
    ``default_plans`` result whose ``predicted_post_works_sap`` is below 69
    are counted per property type against pinned values.
    """
    # ----------------------------------------
    # 1) Load csvs
    # ----------------------------------------
    t0 = time.perf_counter()

    portfolio_df = load_csv("portfolio_569.csv")
    properties_df = load_csv("properties_569.csv")
    property_details_epc_df = load_csv("property_details_epc_569.csv")
    plans_df = load_csv("plans_569.csv")
    plan_recs_df = load_csv("plan_recs_569.csv")
    recommendations_df = load_csv("recommendations_569.csv")

    # Shrink down recommendations_df to speed up the data load. For this test, we only need
    # default recommendations so let's focus on those. We filter on where default is true.
    # NOTE(review): this mask assumes the "default" column holds only booleans
    # (no missing values) - confirm against the fixture.
    recommendations_df = recommendations_df[
        recommendations_df["default"]
    ]

    # Keep only the plan links that point at a recommendation we kept.
    valid_rec_ids = recommendations_df["id"].unique()
    plan_recs_df = plan_recs_df[
        plan_recs_df["recommendation_id"].isin(valid_rec_ids)
    ]

    logger.info(
        "Loaded CSVs in %.2f seconds | properties=%s plans=%s recs=%s",
        time.perf_counter() - t0,
        len(properties_df),
        len(plans_df),
        len(recommendations_df),
    )

    logger.info("Starting database load")
    db_load_t0 = time.perf_counter()

    # ----------------------------------------
    # 2) Insert test portfolio
    # ----------------------------------------
    db_session.bulk_save_objects(_build_portfolios(portfolio_df))
    db_session.flush()

    # ----------------------------------------
    # 3) Insert test property
    # ----------------------------------------
    db_session.bulk_save_objects(_build_properties(properties_df))
    db_session.flush()

    # ----------------------------------------
    # 4) Insert property details - EPC
    # ----------------------------------------
    # EPC rows are matched to the properties just inserted via their uprn.
    property_lookup = {
        prop.uprn: prop
        for prop in db_session.query(PropertyModel).all()
    }
    db_session.bulk_save_objects(
        _build_epc_rows(property_details_epc_df, property_lookup)
    )
    db_session.flush()

    # ----------------------------------------
    # 5) Insert default plan
    # ----------------------------------------
    db_session.bulk_save_objects(_build_plans(plans_df))
    db_session.flush()

    # ----------------------------------------
    # 6) Insert recommendation
    # ----------------------------------------
    db_session.bulk_save_objects(_build_recommendations(recommendations_df))
    db_session.flush()

    # ----------------------------------------
    # 7) Insert PlanRecommendations
    # ----------------------------------------
    db_session.bulk_save_objects(_build_plan_recommendations(plan_recs_df))
    db_session.commit()

    logger.info("Inserted all data in %.2f seconds", time.perf_counter() - db_load_t0)

    # ----------------------------------------
    # 8) Build payload and call process_export
    # ----------------------------------------
    body_dict = {
        "task_id": "test",
        "subtask_id": "test",
        "portfolio_id": 569,
        "scenario_ids": [],
        "default_plans_only": True,
    }
    payload = ExportRequest.model_validate(body_dict)

    logger.info(
        "Recommendation count in DB: %s",
        db_session.query(Recommendation).count()
    )
    logger.info(
        "Default + not installed count: %s",
        db_session.query(Recommendation)
        .filter(
            Recommendation.default.is_(True),
            Recommendation.already_installed.is_(False)
        )
        .count()
    )

    logger.info("Starting process_export")
    process_t0 = time.perf_counter()
    result = process_export(payload, session=db_session)
    logger.info("process_export finished in %.2f seconds", time.perf_counter() - process_t0)

    # ----------------------------------------
    # 9) Assertions
    # ----------------------------------------
    assert "default_plans" in result
    df = result["default_plans"]
    assert not df.empty

    # This test was generated on a real portfolio, so we check the outcomes we expect from it.
    # 1) All packages should be "compliant", which in this case means the properties
    #    should get to EPC C.
    # NOTE(review): the assertions below pin how many rows currently fall short
    # (predicted_post_works_sap < 69) per property type, not that none do.
    failed = df[df["predicted_post_works_sap"] < 69]
    failed_property_types = failed["property_type"].value_counts().to_dict()

    # .get(..., 0) so a property type with no failing rows fails the count
    # comparison instead of raising KeyError.
    assert failed_property_types.get("Flat", 0) == 113
    assert failed_property_types.get("House", 0) == 8
    assert failed_property_types.get("Bungalow", 0) == 4
    assert failed_property_types.get("Maisonette", 0) == 1

    # Check the houses
    assert failed.shape[0]
    # Errors for me:
    # - should get to EPC C: https://ara.domna.homes/portfolio/569/building-passport/661051/plans
    # - Why doesn't this get to a C, under the plan?:
    # https://ara.domna.homes/portfolio/569/building-passport/660447/plans/1603913