Model/backend/Outputs.py
2024-10-03 12:10:45 +01:00

343 lines
13 KiB
Python

import msgpack
import pandas as pd
import numpy as np
from sqlalchemy.orm import sessionmaker
from datetime import datetime
from utils.s3 import read_from_s3, save_excel_to_s3
from backend.app.utils import sap_to_epc
from backend.app.db.connection import db_engine
from backend.app.db.models.portfolio import PropertyModel, PropertyDetailsEpcModel
from backend.app.db.models.recommendations import Recommendation, Plan, PlanRecommendations
class Outputs:
    """Create standard export files for a portfolio.

    The only supported format at the moment is ``"mds"``: one Excel workbook
    per scenario, combining the portfolio's property details with the default
    recommendations of each plan, saved to S3.
    """

    FORMATS = ["mds"]

    # S3 locations used by this class (bucket for both the lookup and the outputs).
    _S3_BUCKET = "retrofit-data-dev"
    _CLEANED_EPC_KEY = "cleaned_epc_data/cleaned.bson"

    # Recommendation columns that are summed per property to give the scenario impact.
    _IMPACT_COLUMNS = ["sap_points", "heat_demand", "kwh_savings", "energy_cost_savings"]

    # Property columns that are summed into "Estimated bill (£ per year)" and then dropped.
    _BILL_COLUMNS = [
        "heating_cost_current",
        "hot_water_cost_current",
        "lighting_cost_current",
        "gas_standing_charge",
        "electricity_standing_charge",
    ]

    # Internal measure type -> column label used in the MDS sheet.
    MDS_MEASURE_MAPPING = {
        "external_wall_insulation": "EWI (Trad Const)",
        "cavity_wall_insulation": "CWI",
        "loft_insulation": "LI",
        "party_wall_insulation": "Party Wall Insu",
        "internal_wall_insulation": "IWI (POA - Prov Sum Only)",
        "suspended_floor_insulation": "U/F Insu (Manual install)",
        "solid_floor_insulation": "Solid floor insl (Out of scope - Prov sum only)",
        "air_source_heat_pump": "ASHP Htg",
        "ground_source_heat_pump": "GSHP Htg",
        "shared_ground_loops": "Shared ground loops",
        "communal_heat_networks": "Communal heat networks",
        "district_heating_networks": "District heating networks",
        "high_heat_retention_storage_heaters": "Elec Storage Htrs (Out of scope -Prov sum only)",
        "low_energy_lighting": "Low Energy Bulbs",
        "cylinder_insulation": "Cyl Insulation",
        "smart_controls": "Smart controls",
        "zone_controls": "Zone controls",
        "trvs": "Upgrade TRV's",
        "solar_pv": "Solar PV",
        "solar_thermal": "Solar Thermal",
        "double_glazing": "Double Glazing (POA - Prov sum only)",
        "draught_proofing": "Draught Proofing",
        "mechanical_ventilation": "Ventilation upgrade",
        "gas_boiler": "Gas Boiler Replacement",
        "flat_roof_insulation": "Flat roof (Out of scope - prov sum only)",
        "room_in_roof_insulation": "RIR (POA - Prov sum only)",
        "ev_charging": "EV Charging",
        "battery": "Battery"
    }

    def __init__(self, format, portfolio_id):
        """
        Validate the requested format, open a database session and load the
        cleaned EPC lookup from S3.

        :param format: The format of the output, e.g. mds
        :param portfolio_id: The id of the portfolio for which the output is being created
        :raises ValueError: If ``format`` is not one of ``FORMATS``
        """
        if format not in self.FORMATS:
            raise ValueError("Invalid format, should be one of {}".format(self.FORMATS))

        self.format = format
        self.portfolio_id = portfolio_id
        # Date stamp used in the names of the exported files
        self.today = datetime.now().strftime("%Y-%m-%d")

        # Connect to the database
        self.session = sessionmaker(bind=db_engine)()

        # Download the cleaned EPC lookup; the payload is decoded with msgpack
        packed_lookup = read_from_s3(
            s3_file_name=self._CLEANED_EPC_KEY,
            bucket_name=self._S3_BUCKET
        )
        self.cleaned_epc_lookup = msgpack.unpackb(packed_lookup, raw=False)

    def get_properties_from_db(self):
        """
        Fetch every property of the portfolio joined with its EPC details.

        :return: One dict per property keyed by column name. Where both tables
            have a column with the same name, the value from
            ``PropertyDetailsEpcModel`` wins because it is written last.
        """
        rows = self.session.query(
            PropertyModel,
            PropertyDetailsEpcModel
        ).join(
            PropertyDetailsEpcModel,
            PropertyModel.id == PropertyDetailsEpcModel.property_id
        ).filter(
            PropertyModel.portfolio_id == self.portfolio_id  # Filter by portfolio ID
        ).all()

        property_columns = [col.name for col in PropertyModel.__table__.columns]
        details_columns = [col.name for col in PropertyDetailsEpcModel.__table__.columns]

        properties_data = []
        for prop, details in rows:
            record = {name: getattr(prop, name) for name in property_columns}
            # Applied second so EPC detail values override same-named property columns
            record.update({name: getattr(details, name) for name in details_columns})
            properties_data.append(record)
        return properties_data

    def get_plans_from_db(self):
        """
        Fetch every plan that belongs to the portfolio.

        :return: One dict per plan keyed by the ``Plan`` table's column names
        """
        plans = self.session.query(Plan).filter(Plan.portfolio_id == self.portfolio_id).all()
        plan_columns = [col.name for col in Plan.__table__.columns]
        return [
            {name: getattr(plan, name) for name in plan_columns}
            for plan in plans
        ]

    def get_recommendations_from_db(self, plan_ids):
        """
        Fetch the default recommendations attached to the given plans.

        :param plan_ids: Ids of the plans whose recommendations are wanted
        :return: One dict per recommendation keyed by the ``Recommendation``
            table's column names, plus a ``"Scenario ID"`` entry taken from
            the plan the recommendation is linked to
        """
        plan_ids = list(plan_ids)
        if not plan_ids:
            # Nothing to look up; avoids sending an empty IN () clause to the database
            return []

        rows = self.session.query(
            Recommendation,
            Plan.scenario_id
        ).join(
            PlanRecommendations, Recommendation.id == PlanRecommendations.recommendation_id
        ).join(
            Plan, Plan.id == PlanRecommendations.plan_id  # Join with Plan to access scenario_id
        ).filter(
            PlanRecommendations.plan_id.in_(plan_ids),
            Recommendation.default == True  # noqa: E712 - SQL expression, not a Python comparison
        ).all()

        recommendation_columns = [col.name for col in Recommendation.__table__.columns]
        recommendations_data = []
        for recommendation, scenario_id in rows:
            record = {name: getattr(recommendation, name) for name in recommendation_columns}
            record["Scenario ID"] = scenario_id
            recommendations_data.append(record)
        return recommendations_data

    def make_mds_measure_matrix(self, scenario_recommendations):
        """
        Pivot a scenario's recommendations into one row per property.

        :param scenario_recommendations: DataFrame with at least the columns
            ``property_id`` and ``measure_type``
        :return: DataFrame with one column per label in ``MDS_MEASURE_MAPPING``
            plus ``property_id``. A cell holds the measure label when that
            measure is recommended for the property and is empty otherwise.
            The columns are always present, even when there are no rows, so
            the result can be merged on ``property_id``.
        """
        columns = list(self.MDS_MEASURE_MAPPING.values()) + ["property_id"]
        if scenario_recommendations.empty:
            return pd.DataFrame(columns=columns)

        # Keyed by property id: dicts keep insertion order, so rows come out in
        # order of first appearance without re-scanning the list for every recommendation
        rows_by_property = {}
        for property_id, measure_type in zip(
            scenario_recommendations["property_id"],
            scenario_recommendations["measure_type"],
        ):
            row = rows_by_property.get(property_id)
            if row is None:
                row = dict.fromkeys(columns)
                row["property_id"] = property_id
                rows_by_property[property_id] = row

            measure_label = self.MDS_MEASURE_MAPPING.get(measure_type)
            if measure_label is None:
                # Measure has no MDS column; keep the property row but do not
                # invent a column named None for it
                continue
            row[measure_label] = measure_label

        return pd.DataFrame(list(rows_by_property.values()), columns=columns)

    def _load_export_frames(self):
        """Query properties, plans and default recommendations and return them as DataFrames."""
        self.session.begin()
        try:
            properties_data = self.get_properties_from_db()
            plans_data = self.get_plans_from_db()
            plan_ids = [plan['id'] for plan in plans_data]
            recommendations_data = self.get_recommendations_from_db(plan_ids)
        finally:
            # Release the connection even when one of the queries fails
            self.session.close()

        recommendations_df = pd.DataFrame(recommendations_data)
        if recommendations_df.empty:
            # Guarantee the columns the export relies on when nothing was recommended
            recommendations_df = pd.DataFrame(
                columns=["property_id", "measure_type", "Scenario ID"] + self._IMPACT_COLUMNS
            )
        return pd.DataFrame(properties_data), pd.DataFrame(plans_data), recommendations_df

    def _build_base_mds(self, properties_df):
        """Build the scenario-independent part of the MDS sheet from the property details."""
        mds = properties_df[
            [
                "property_id",
                "address",
                "postcode",
                "uprn",
                "current_epc_rating",
                "current_sap_points",
                "primary_energy_consumption",
                "property_type",
                "built_form",
                "total_floor_area",
                "walls",
                "tenure",
                "mainfuel",
                # The bills columns are split out - we include them and aggregate, without appliances
            ] + self._BILL_COLUMNS
        ].copy().rename(
            columns={
                "address": "Address",
                "postcode": "Postcode",
                "uprn": "UPRN",
                "current_epc_rating": "Pre EPC",
                "current_sap_points": "EPC Source",
                "primary_energy_consumption": "Existing Heating Demand Kwh/m2/y",
                "property_type": "Property Type",
                "built_form": "Built Form",
                "total_floor_area": "Floor area m2 (If known)",
                "walls": "Wall Type (Mandatory field)",
                "tenure": "Tenure",
            }
        )

        mds["Estimated bill (£ per year)"] = (
            mds["heating_cost_current"] +
            mds["hot_water_cost_current"] +
            mds["lighting_cost_current"] +
            mds["gas_standing_charge"] +
            mds["electricity_standing_charge"]
        )
        mds = mds.drop(columns=self._BILL_COLUMNS)

        # Formatting - Pre EPC is an enum; values without a .value (e.g. None) pass through
        mds["Pre EPC"] = [getattr(rating, "value", rating) for rating in mds["Pre EPC"].values]

        # Keep only the first comma-separated part of the wall description
        wall_column = "Wall Type (Mandatory field)"
        mds[wall_column] = mds[wall_column].str.split(",").str[0]
        # Remove average thermal transmittance field
        mds[wall_column] = np.where(
            mds[wall_column].str.contains("Average thermal transmittance"),
            "",
            mds[wall_column]
        )

        # Translate the cleaned main fuel description into a fuel type.
        # The lookup is de-duplicated on the join key so that a repeated
        # description cannot multiply property rows in the left merge.
        fuel_lookup = pd.DataFrame(self.cleaned_epc_lookup["main-fuel"])[
            ["clean_description", "fuel_type"]
        ].drop_duplicates(subset="clean_description")
        mds = mds.merge(
            fuel_lookup,
            left_on="mainfuel",
            right_on="clean_description",
            how="left"
        )
        return mds.rename(
            columns={"fuel_type": "Existing Fuel Type"}
        ).drop(columns=["clean_description", "mainfuel"])

    def _build_scenario_mds(self, mds, scenario_recommendations):
        """Add the measure matrix and the predicted impacts of one scenario to the base sheet."""
        # For each measure, we create the measure matrix
        scenario_measure_matrix = self.make_mds_measure_matrix(scenario_recommendations)

        # Calculate the predicted impact on: SAP, heat demand, bills, kwh
        recommendation_impacts = scenario_recommendations.groupby("property_id")[
            self._IMPACT_COLUMNS
        ].sum().reset_index()

        scenario_mds = mds.merge(
            scenario_measure_matrix, how="left", on="property_id"
        ).merge(
            recommendation_impacts, how="left", on="property_id"
        )

        # Properties without recommendations have NaN impacts and measures after
        # the left merges. Every missing cell in the sheet is filled with 0.
        # Reassigning (rather than a chained in-place fillna on a column) keeps
        # this effective under pandas Copy-on-Write.
        scenario_mds = scenario_mds.fillna(0)

        # Post SAP is truncated to an integer before being converted to an EPC band
        scenario_mds["Post SAP"] = (
            scenario_mds["EPC Source"] + scenario_mds["sap_points"]
        ).apply(int)
        scenario_mds["Post EPC"] = scenario_mds["Post SAP"].apply(sap_to_epc)

        scenario_mds["Heating Demand Kwh/m2/y"] = (
            scenario_mds["Existing Heating Demand Kwh/m2/y"] - scenario_mds["heat_demand"]
        )

        return scenario_mds.rename(
            columns={
                "sap_points": "Predicted SAP Points",
                "kwh_savings": "Energy Saving (Kwh)",
                "energy_cost_savings": "Bill Reduction (£ per yr)"
            }
        )

    def export_mds(self):
        """
        This function will export the data in the MDS format, writing one
        Excel file per scenario to S3. Nothing is written when the portfolio
        has no plans.

        Core data required:
        - Property address
        - Property postcode
        - uprn
        - recommended measures
        - pre-EPC
        - pre-SAP
        - pre Heat Demand
        - Property Type
        - Built form
        - Wall type
        - Tenure
        - Fuel type
        - Estimated bill
        - Recommended measures
        - Post EPC
        - Post heat demand
        - Bill savings
        - Kwh savings
        """
        properties_df, plans_df, recommendations_df = self._load_export_frames()

        if plans_df.empty:
            # No plans means no scenarios, so there is nothing to export
            return

        scenario_ids = plans_df["scenario_id"].unique()

        # We start to create the MDS sheet
        mds = self._build_base_mds(properties_df)

        mds_output_by_scenario = {}
        for scenario_id in scenario_ids:
            scenario_recommendations = recommendations_df[
                recommendations_df["Scenario ID"] == scenario_id
            ]
            mds_output_by_scenario[scenario_id] = self._build_scenario_mds(
                mds, scenario_recommendations
            )

        # We now save them to s3 as excels
        for scenario_id, scenario_mds in mds_output_by_scenario.items():
            save_excel_to_s3(
                df=scenario_mds,
                file_key=f"engine_outputs/{self.format}/{self.today}_scenario_id={scenario_id}.xlsx",
                bucket_name=self._S3_BUCKET
            )

    def export(self):
        """
        This function will export the data in the required format

        :raises NotImplementedError: If no exporter exists for ``self.format``
        """
        if self.format == "mds":
            self.export_mds()
            # Return here: without it the NotImplementedError below was raised
            # after every successful export
            return
        raise NotImplementedError("Export format not implemented")