"""
This script prepares the data for the financial model.

For every scenario in ``SCENARIOS`` it loads the portfolio's properties and the
default, not-yet-installed recommendations, pivots the estimated cost of each
measure into one column per measure type, adds the predicted post-works
SAP/EPC and writes the result to an Excel workbook.
"""
from collections import defaultdict

import numpy as np
import pandas as pd
from sqlalchemy.orm import sessionmaker

from backend.app.db.connection import db_engine, db_read_session
from backend.app.db.functions.materials_functions import get_materials
from backend.app.db.models.portfolio import PropertyModel, PropertyDetailsEpcModel, PropertyDetailsSpatial
from backend.app.db.models.recommendations import (
    Recommendation,
    Plan,
    PlanRecommendations,
    RecommendationMaterials,
)
from backend.app.utils import sap_to_epc
from utils.s3 import read_csv_from_s3, read_excel_from_s3

# PORTFOLIO_ID = 206
# SCENARIOS = [389]
PORTFOLIO_ID = 449  # Peabody
SCENARIOS = [
    922
]

# Scenario id -> label used as the prefix of the exported workbook's file name
scenario_names = {
    922: "EPC C",
}

# Property columns carried into the exported sheet ("property_id" is only kept
# as the merge key and is dropped before export)
_PROPERTY_EXPORT_COLUMNS = [
    "landlord_property_id",
    "property_id",
    "uprn",
    "address",
    "postcode",
    "property_type",
    "walls",
    "roof",
    "heating",
    "windows",
    "current_epc_rating",
    "current_sap_points",
    "total_floor_area",
    "number_of_rooms",
]


def _columns_as_dict(instance, model):
    """Return ``{column name: value}`` for every table column of ``model``, read off ``instance``."""
    return {col.name: getattr(instance, col.name) for col in model.__table__.columns}


def get_data(portfolio_id, scenario_ids):
    """
    Load properties, plans and recommendations for a portfolio and its scenarios.

    :param portfolio_id: value matched against ``PropertyModel.portfolio_id``
    :param scenario_ids: iterable of values matched against ``Plan.scenario_id``
    :return: tuple ``(properties_data, plans_data, recommendations_data)``, each a
        list of dicts keyed by column name. Recommendation dicts additionally carry
        ``"scenario_id"`` and ``"materials"`` (a list of material dicts, possibly empty).
    """
    session = sessionmaker(bind=db_engine)()
    try:
        session.begin()

        # --------------------
        # Properties
        # --------------------
        properties_query = session.query(
            PropertyModel,
            PropertyDetailsEpcModel
        ).join(
            PropertyDetailsEpcModel,
            PropertyModel.id == PropertyDetailsEpcModel.property_id
        ).filter(
            PropertyModel.portfolio_id == portfolio_id
        ).all()

        # When both tables share a column name, the EPC-details value wins
        # because it is unpacked last.
        properties_data = [
            {
                **_columns_as_dict(p.PropertyModel, PropertyModel),
                **_columns_as_dict(p.PropertyDetailsEpcModel, PropertyDetailsEpcModel),
            }
            for p in properties_query
        ]

        # --------------------
        # Plans
        # --------------------
        plans_query = session.query(Plan).filter(
            Plan.scenario_id.in_(scenario_ids)
        ).all()

        plans_data = [_columns_as_dict(plan, Plan) for plan in plans_query]
        plan_ids = [p["id"] for p in plans_data]

        # --------------------
        # Recommendations (NO materials yet)
        # --------------------
        recommendations_query = session.query(
            Recommendation,
            Plan.scenario_id
        ).join(
            PlanRecommendations,
            Recommendation.id == PlanRecommendations.recommendation_id
        ).join(
            Plan,
            Plan.id == PlanRecommendations.plan_id
        ).filter(
            PlanRecommendations.plan_id.in_(plan_ids),
            Recommendation.default.is_(True),
            Recommendation.already_installed.is_(False)
        ).all()

        recommendations_data = [
            {
                **_columns_as_dict(r.Recommendation, Recommendation),
                "scenario_id": r.scenario_id,
                "materials": []  # placeholder
            }
            for r in recommendations_query
        ]
        recommendation_ids = [r["id"] for r in recommendations_data]

        # --------------------
        # Recommendation materials (SEPARATE QUERY)
        # --------------------
        materials_query = session.query(
            RecommendationMaterials
        ).filter(
            RecommendationMaterials.recommendation_id.in_(recommendation_ids)
        ).all()

        # Group materials by recommendation_id
        materials_by_recommendation = defaultdict(list)
        for m in materials_query:
            materials_by_recommendation[m.recommendation_id].append({
                "material_id": m.material_id,
                "depth": m.depth,
                "quantity": m.quantity,
                "quantity_unit": m.quantity_unit,
                "estimated_cost": m.estimated_cost,
            })

        # Attach materials safely (no filtering side effects)
        for r in recommendations_data:
            r["materials"] = materials_by_recommendation.get(r["id"], [])

        return properties_data, plans_data, recommendations_data
    finally:
        # Always release the connection, including when a query raises
        session.close()


properties_data, plans_data, recommendations_data = get_data(portfolio_id=PORTFOLIO_ID, scenario_ids=SCENARIOS)

properties_df = pd.DataFrame(properties_data)
plans_df = pd.DataFrame(plans_data)
recommendations_df = pd.DataFrame(recommendations_data)

with db_read_session() as session:
    # Built while the session is still open, in case the result is lazily loaded
    materials = pd.DataFrame(get_materials(session))

# material id -> {"type": ..., "includes_battery": ...}
material_lookup = (
    materials
    .set_index("id")[["type", "includes_battery"]]
    .to_dict("index")
)


def has_solar_with_battery(materials_list):
    """
    Tell whether any listed material is a solar PV material that includes a battery.

    :param materials_list: list of dicts with a ``"material_id"`` key; a falsy
        value (``None``, empty list) is treated as "no materials"
    :return: ``True`` if at least one material found in ``material_lookup`` has
        type ``"solar_pv"`` and a truthy ``includes_battery``, else ``False``.
        Materials missing from ``material_lookup`` are ignored.
    """
    for entry in materials_list or []:
        material = material_lookup.get(entry["material_id"])
        if material and material["type"] == "solar_pv" and material["includes_battery"]:
            return True
    return False


recommendations_df["has_solar_with_battery"] = (
    recommendations_df["materials"].apply(has_solar_with_battery)
)

# Adjust measure type to indicate if there is a battery included
recommendations_df["measure_type"] = np.where(
    recommendations_df["has_solar_with_battery"],
    recommendations_df["measure_type"] + "_with_battery",
    recommendations_df["measure_type"]
)

# asset_list = read_excel_from_s3(
#     bucket_name="retrofit-plan-inputs-dev", file_key="2/404/20251211T163200754Z/asset_list.xlsx",
#     header_row=0, sheet_name="Standardised Asset List"
# )

for scenario_id in SCENARIOS:
    # Get recs for this scenario, keeping default measures only
    scenario_recs = recommendations_df[recommendations_df["scenario_id"] == scenario_id]
    scenario_recs = scenario_recs[scenario_recs["default"]]

    recommended_measures_df = scenario_recs[["property_id", "measure_type", "estimated_cost"]]

    # Sum up the sap points by property id
    # NOTE(review): this sum is taken before the duplicate rows are dropped below,
    # so a duplicated recommendation counts towards SAP points but not towards
    # cost -- confirm that is intended.
    post_install_sap = scenario_recs.groupby(["property_id"])[["sap_points"]].sum().reset_index()

    # Find dupes by property id and measure type
    dupes = recommended_measures_df.duplicated(subset=["property_id", "measure_type"], keep=False)
    if dupes.any():
        # Drop dupes - happened due to a funny bug. pivot() below raises on
        # repeated (property_id, measure_type) pairs, so only the first is kept.
        recommended_measures_df = recommended_measures_df.drop_duplicates(
            subset=["property_id", "measure_type"], keep='first'
        )

    # One row per property, one cost column per measure type
    recommendations_measures_pivot = recommended_measures_df.pivot(
        index='property_id', columns='measure_type', values='estimated_cost'
    ).reset_index()

    # Total cost is the row sum, excluding the property_id column
    recommendations_measures_pivot["total_retrofit_cost"] = recommendations_measures_pivot.drop(
        columns=["property_id"]
    ).sum(axis=1)

    df = properties_df[_PROPERTY_EXPORT_COLUMNS].merge(
        recommendations_measures_pivot, how="left", on="property_id"
    ).merge(
        post_install_sap, how="left", on="property_id"
    )
    df = df.drop(columns=["property_id"])

    # Properties without any recommendation gain no SAP points
    df["sap_points"] = df["sap_points"].fillna(0)
    df["predicted_post_works_sap"] = (df["current_sap_points"] + df["sap_points"]).round()
    df["predicted_post_works_epc"] = df["predicted_post_works_sap"].apply(sap_to_epc)

    df["uprn"] = df["uprn"].astype(str)

    # Create excel to store to
    filename = f"{scenario_names[scenario_id]}-clarion.xlsx"
    with pd.ExcelWriter(filename) as writer:
        df.to_excel(writer, sheet_name="properties", index=False)

# (Junte) don't need anything below this
# asset_list = pd.DataFrame(asset_list)
# asset_list = asset_list.rename(
#     columns={
#         "postcode": "domna_postcode"
#     }
# )
# if "domna_full_address":
#     # For Peabody
#     asset_list["domna_full_address"] = asset_list["domna_address_1"]
#
# asset_list = asset_list[["domna_full_address", "domna_postcode", "epc_os_uprn", ]].copy()
# asset_list = asset_list.rename(columns={"epc_os_uprn": "uprn"})
# asset_list["uprn"] = asset_list["uprn"].astype("Int64").astype(str)
# asset_list = asset_list.merge(
#     df.drop(columns=["address", "postcode", "property_type", "total_floor_area"]),
#     how="left",
#     on="uprn"
# )

# Get conservation area data from property details spatial.
# based on the UPRNs
def get_conservation_area_data(uprns):
    """
    Fetch the ``PropertyDetailsSpatial`` rows whose ``uprn`` is in ``uprns``.

    :param uprns: list of UPRN values passed to the ``IN`` filter
    :return: DataFrame with one row per matching record and one column per table
        column of ``PropertyDetailsSpatial``. The columns are present even when
        nothing matches, so callers can select them without a KeyError.
    """
    column_names = [col.name for col in PropertyDetailsSpatial.__table__.columns]

    session = sessionmaker(bind=db_engine)()
    try:
        session.begin()

        # Query to get conservation area data
        spatial_query = session.query(
            PropertyDetailsSpatial
        ).filter(
            PropertyDetailsSpatial.uprn.in_(uprns)  # Filter by UPRNs
        ).all()

        # Transform spatial data to include all fields dynamically
        spatial_data = [
            {name: getattr(spatial, name) for name in column_names}
            for spatial in spatial_query
        ]
    finally:
        # Always release the connection, including when the query raises
        session.close()

    return pd.DataFrame(spatial_data, columns=column_names)


# NOTE(review): `asset_list` is not assigned by any live statement visible in this
# script (the code that loads it is commented out further up). It has to be
# defined before this section runs, otherwise the next statement raises NameError.

# UPRNs to look up: missing, blank and non-numeric values are discarded
uprns = (
    pd.to_numeric(asset_list["uprn"], errors="coerce")
    .dropna()
    .astype(int)
    .unique()
    .tolist()
)

conservation_area_data = get_conservation_area_data(uprns)
# Cast to str before merging on "uprn".
# NOTE(review): this presumes asset_list["uprn"] holds strings -- confirm.
conservation_area_data["uprn"] = conservation_area_data["uprn"].astype(str)

asset_list = asset_list.merge(
    conservation_area_data[["uprn", "conservation_status", "is_listed_building", "is_heritage_building"]],
    how="left",
    on="uprn"
)

# For exporting
# `df` is whatever the per-scenario export loop above assigned last
df.to_excel(
    "EPC C -without floors proposed measures - "
    "with ID.xlsx",
    index=False
)
# asset_list.to_excel(
#     "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Lincs Rural/epc_measures.xlsx",
#     index=False
# )

# NOTE(review): absolute path on one developer's machine -- will not resolve elsewhere.
condition_costs = pd.read_excel(
    "/Users/khalimconn-kowlessar/Documents/hestia/sfr/Spring JV/Condition costs.xlsx",
    sheet_name="Prices - Khalim",
    header=35
)
# Remove unnamed columns and reset index
condition_costs = condition_costs.loc[:, ~condition_costs.columns.str.contains('^Unnamed')]
condition_costs = condition_costs.reset_index(drop=True)


# We now estimate condition cost
def _unit_costs_for_condition(condition_costs, condition):
    """Return the first cost row whose ``Condition`` equals ``condition``, without the ``Condition`` column."""
    matching = condition_costs[condition_costs["Condition"] == condition]
    return matching.drop(columns=["Condition"]).iloc[0]


def _total_condition_cost(unit_costs, floor_area, n_bathrooms):
    """Scale every unit cost by ``floor_area``, additionally scale ``Bathroom`` by ``n_bathrooms``, and sum."""
    scaled = unit_costs * floor_area  # new Series, the cost table is not modified
    scaled["Bathroom"] = scaled["Bathroom"] * n_bathrooms
    return scaled.sum()


def simulate_condition(asset_list, condition_costs):
    """
    This function is for testing, and will simulate condition cost from 1-10 for each property to see what
    the costing array looks like.

    :param asset_list: DataFrame with ``uprn``, ``bathrooms`` and ``total_floor_area`` columns
    :param condition_costs: cost table with a ``Condition`` column, a ``Bathroom`` column and one
        row for each condition 1-10
    :return: ``asset_list`` left-merged on ``uprn`` with columns ``"Condition 10"`` ... ``"Condition 1"``
        holding the total cost at each condition
    """
    # The cost rows do not depend on the property, so look them up once
    unit_costs_by_condition = {
        condition: _unit_costs_for_condition(condition_costs, condition)
        for condition in reversed(range(1, 11))
    }

    condition_rows = []
    for _, row in asset_list.iterrows():
        # NOTE(review): reads "bathrooms" here while the condition-cost loop
        # below reads "n_bathrooms" -- confirm which column is the right one.
        n_bathrooms = row["bathrooms"]
        conditions = {
            "Condition " + str(condition): _total_condition_cost(
                unit_costs, row["total_floor_area"], n_bathrooms
            )
            for condition, unit_costs in unit_costs_by_condition.items()
        }
        condition_rows.append(
            {
                "uprn": row["uprn"],
                **conditions
            }
        )

    condition_df = pd.DataFrame(condition_rows)
    asset_list = asset_list.merge(
        condition_df, how="left", on="uprn"
    )
    return asset_list


# asset_list = simulate_condition(asset_list, condition_costs)

# We calculate the condition cost based on the condition
# Create the output column up front so it exists even if every row is skipped
if "domna_condition_cost" not in asset_list.columns:
    asset_list["domna_condition_cost"] = float("nan")

for _, row in asset_list.iterrows():
    condition = row["condition_score"]
    # Skip properties with no condition score (None, NaN or empty string)
    if pd.isna(condition) or condition == "":
        continue
    condition = int(float(condition))

    unit_costs = _unit_costs_for_condition(condition_costs, condition)
    # Each cost is scaled by floor area; the bathroom cost also by bathroom count
    total_condition_cost = _total_condition_cost(
        unit_costs, float(row["total_floor_area"]), float(row["n_bathrooms"])
    )

    # Written to every row sharing this UPRN
    asset_list.loc[asset_list["uprn"] == row["uprn"], "domna_condition_cost"] = total_condition_cost

# Store output
asset_list.to_excel(
    "fabric_clarian_packages.xlsx",
    index=False
)

# NOTE(review): "decoration_sum_min " has a trailing space -- confirm it matches
# the actual column name in asset_list.
condition_cost_comparison = asset_list[
    ["condition_score", "decoration_sum_min ", "decoration_sum_max", "domna_condition_cost"]
]