import pandas as pd from sqlalchemy.orm import sessionmaker from backend.app.utils import sap_to_epc from backend.app.db.connection import db_engine from backend.app.db.models.portfolio import PropertyModel, PropertyDetailsEpcModel from backend.app.db.models.recommendations import Recommendation, Plan, PlanRecommendations class Outputs: FORMATS = ["mds"] MDS_MEASURE_MAPPING = { "external_wall_insulation": "EWI (Trad Const)", "cavity_wall_insulation": "CWI", "loft_insulation": "LI", "party_wall_insulation": "Party Wall Insu", "internal_wall_insulation": "IWI (POA - Prov Sum Only)", "suspended_floor_insulation": "U/F Insu (Manual install)", "solid_floor_insulation": "Solid floor insl (Out of scope - Prov sum only)", "air_source_heat_pump": "ASHP Htg", "ground_source_heat_pump": "GSHP Htg", "shared_ground_loops": "Shared ground loops", "communal_heat_networks": "Communal heat networks", "district_heating_networks": "District heating networks", "high_heat_retention_storage_heaters": "Elec Storage Htrs (Out of scope -Prov sum only)", "low_energy_lighting": "Low Energy Bulbs", "cylinder_insulation": "Cyl Insulation", "smart_controls": "Smart controls", "zone_controls": "Zone controls", "trvs": "Upgrade TRV's", "solar_pv": "Solar PV", "solar_thermal": "Solar Thermal", "double_glazing": "Double Glazing (POA - Prov sum only)", "draught_proofing": "Draught Proofing", "mechanical_ventilation": "Ventilation upgrade", "gas_boiler": "Gas Boiler Replacement", "flat_roof_insulation": "Flat roof (Out of scope - prov sum only)", "room_in_roof_insulation": "RIR (POA - Prov sum only)", "ev_charging": "EV Charging", "battery": "Battery" } def __init__(self, format, portfolio_id): """ This class handles the creation of standard outputs for the backend. For example, creation of an excel output, to be used for the MDS data sheet, required by E.ON :param format: The format of the output, e.g. mds :param portfolio_id: The id of the portfolio for which the output is being created """ if format not in self.FORMATS: raise ValueError("Invalid format, should be one of {}".format(self.FORMATS)) self.format = format self.portfolio_id = portfolio_id # Connect to the database self.session = sessionmaker(bind=db_engine)() def get_properties_from_db(self): # Get properties and their details for a specific portfolio properties_query = self.session.query( PropertyModel, PropertyDetailsEpcModel ).join( PropertyDetailsEpcModel, PropertyModel.id == PropertyDetailsEpcModel.property_id ).filter( PropertyModel.portfolio_id == self.portfolio_id # Filter by portfolio ID ).all() # Transform properties data to include all fields dynamically properties_data = [ {**{col.name: getattr(prop.PropertyModel, col.name) for col in PropertyModel.__table__.columns}, **{col.name: getattr(prop.PropertyDetailsEpcModel, col.name) for col in PropertyDetailsEpcModel.__table__.columns}} for prop in properties_query ] return properties_data def get_plans_from_db(self): plans_query = self.session.query(Plan).filter(Plan.portfolio_id == self.portfolio_id).all() # Transform plans data to include all fields dynamically plans_data = [ {col.name: getattr(plan, col.name) for col in Plan.__table__.columns} for plan in plans_query ] return plans_data def get_recommendations_from_db(self, plan_ids): # Get recommendations through PlanRecommendations for those plans and that are default recommendations_query = self.session.query( Recommendation, Plan.scenario_id ).join( PlanRecommendations, Recommendation.id == PlanRecommendations.recommendation_id ).join( Plan, Plan.id == PlanRecommendations.plan_id # Join with Plan to access scenario_id ).filter( PlanRecommendations.plan_id.in_(plan_ids), Recommendation.default == True # Filtering for default recommendations ).all() # Transform recommendations data to include all fields dynamically and include scenario_id recommendations_data = [ { **{ col.name: getattr(rec.Recommendation, col.name) if hasattr(rec, 'Recommendation') else getattr(rec, col.name) for col in Recommendation.__table__.columns }, "Scenario ID": rec.scenario_id } for rec in recommendations_query ] return recommendations_data def make_mds_measure_matrix(self, scenario_recommendations): all_measures = list(self.MDS_MEASURE_MAPPING.values()) # Collect rows in a list rows = [] # Populate the rows list for idx, row in scenario_recommendations.iterrows(): property_id = row["property_id"] measure_type = row["measure_type"] # Get the label for the current type measure_label = self.MDS_MEASURE_MAPPING.get(measure_type, None) # If the property_id already exists in the collected rows, update it existing_row = next((item for item in rows if item["property_id"] == property_id), None) if existing_row is None: # Create a new row if the property_id doesn't exist new_row = {measure: None for measure in all_measures} new_row["property_id"] = property_id rows.append(new_row) else: new_row = existing_row # Set the corresponding measure label in the row new_row[measure_label] = measure_label # Convert the list of dictionaries to a DataFrame matrix = pd.DataFrame(rows) # Reset the index for cleanliness matrix.reset_index(drop=True, inplace=True) return matrix def export_mds(self): """ This function will export the data in the MDS format Core data required: - Property address - Property postcode - uprn - recommended measures - pre-EPC - pre-SAP - pre Heat Demand - Property Type - Built form - Wall type - Tenure - Fuel type - Estimated bill - Recommended measures - Post EPC - Post heat demand - Bill savings - Kwh savings """ self.session.begin() properties_data = self.get_properties_from_db() plans_data = self.get_plans_from_db() plan_ids = [plan['id'] for plan in plans_data] recommendations_data = self.get_recommendations_from_db(plan_ids) self.session.close() # Convert these tables to dataframes properties_df = pd.DataFrame(properties_data) plans_df = pd.DataFrame(plans_data) recommendations_df = pd.DataFrame(recommendations_data) scenario_ids = plans_df["scenario_id"].unique() # We start to create the MDS sheet mds = properties_df[ [ "property_id", "address", "postcode", "uprn", "current_epc_rating", "current_sap_points", # TODO: Need to add current heat demand "property_type", "built_form", "total_floor_area", "walls", "tenure", "mainfuel", # TODO: For estimated bill, this should probably be without the cost of appliances ] ].copy().rename( columns={ "address": "Address", "postcode": "Postcode", "uprn": "UPRN", "current_epc_rating": "Pre EPC", "current_sap_points": "EPC Source", # TODO: Need to add current heat demand "property_type": "Property Type", "built_form": "Built Form", "total_floor_area": "Floor area m2 (If known)", "walls": "Wall Type (Mandatory field)", "tenure": "Tenure", "mainfuel": "Existing Fuel Type" # TODO: For estimated bill, this should probably be without the cost of appliances } ) # TODO - format # 1) property type # 2) walls # 3) tenure # 4) mainfuel # 5) Epc Rating mds_output_by_scenario = {} for scenario_id in scenario_ids: scenario_recommendations = recommendations_df[recommendations_df["Scenario ID"] == scenario_id] # For each measure, we create the measure matrix scenario_measure_matrix = self.make_mds_measure_matrix(scenario_recommendations) # Calculate the predicted impact on: SAP, heat demand, bills, kwh recommendation_impacts = scenario_recommendations.groupby("property_id")[ ["sap_points", "heat_demand", "kwh_savings", "energy_cost_savings"] ].sum().reset_index() scenario_mds = mds.merge( scenario_measure_matrix, how="left", on="property_id" ).merge( recommendation_impacts, how="left", on="property_id" ) # If we have no recommendations, sap_points, kwh_savings, head_demand will be NaN to_clean = [c for c in recommendation_impacts.columns if c != "property_id"] for col in to_clean: scenario_mds[col].fillna(0, inplace=True) scenario_mds.fillna(0, inplace=True) scenario_mds["Post SAP"] = scenario_mds["EPC Source"] + scenario_mds["sap_points"] # Round Post SAP down to the nearest integer scenario_mds["Post SAP"] = scenario_mds["Post SAP"].apply(lambda x: int(x)) scenario_mds["Post EPC"] = scenario_mds["Post SAP"].apply(lambda x: sap_to_epc(x)) # TODO: Post heat demand scenario_mds = scenario_mds.rename( columns={ "sap_points": "Predicted SAP Points", "kwh_savings": "Energy Saving (Kwh)", "energy_cost_savings": "Bill Reduction (£ per yr)" } ) def export(self): """ This function will export the data in the required format """ if self.format == "mds": self.export_mds()