mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
343 lines
13 KiB
Python
343 lines
13 KiB
Python
import msgpack
|
|
import pandas as pd
|
|
import numpy as np
|
|
from sqlalchemy.orm import sessionmaker
|
|
from datetime import datetime
|
|
|
|
from utils.s3 import read_from_s3, save_excel_to_s3
|
|
from backend.app.utils import sap_to_epc
|
|
from backend.app.db.connection import db_engine
|
|
from backend.app.db.models.portfolio import PropertyModel, PropertyDetailsEpcModel
|
|
from backend.app.db.models.recommendations import Recommendation, Plan, PlanRecommendations
|
|
|
|
|
|
class Outputs:
    """
    Create standard outputs for the backend, e.g. the excel output used for the
    MDS data sheet required by E.ON.

    Typical use is ``Outputs("mds", portfolio_id).export()``, which reads the
    portfolio from the database, builds one sheet per scenario and saves each
    sheet to S3 as an excel file.
    """

    FORMATS = ["mds"]

    # Bucket used both for the cleaned EPC lookup and for the generated excel files
    _S3_BUCKET = "retrofit-data-dev"

    MDS_MEASURE_MAPPING = {
        "external_wall_insulation": "EWI (Trad Const)",
        "cavity_wall_insulation": "CWI",
        "loft_insulation": "LI",
        "party_wall_insulation": "Party Wall Insu",
        "internal_wall_insulation": "IWI (POA - Prov Sum Only)",
        "suspended_floor_insulation": "U/F Insu (Manual install)",
        "solid_floor_insulation": "Solid floor insl (Out of scope - Prov sum only)",
        "air_source_heat_pump": "ASHP Htg",
        "ground_source_heat_pump": "GSHP Htg",
        "shared_ground_loops": "Shared ground loops",
        "communal_heat_networks": "Communal heat networks",
        "district_heating_networks": "District heating networks",
        "high_heat_retention_storage_heaters": "Elec Storage Htrs (Out of scope -Prov sum only)",
        "low_energy_lighting": "Low Energy Bulbs",
        "cylinder_insulation": "Cyl Insulation",
        "smart_controls": "Smart controls",
        "zone_controls": "Zone controls",
        "trvs": "Upgrade TRV's",
        "solar_pv": "Solar PV",
        "solar_thermal": "Solar Thermal",
        "double_glazing": "Double Glazing (POA - Prov sum only)",
        "draught_proofing": "Draught Proofing",
        "mechanical_ventilation": "Ventilation upgrade",
        "gas_boiler": "Gas Boiler Replacement",
        "flat_roof_insulation": "Flat roof (Out of scope - prov sum only)",
        "room_in_roof_insulation": "RIR (POA - Prov sum only)",
        "ev_charging": "EV Charging",
        "battery": "Battery",
    }

    # Property columns copied into the MDS sheet (before renaming)
    _PROPERTY_COLUMNS = [
        "property_id",
        "address",
        "postcode",
        "uprn",
        "current_epc_rating",
        "current_sap_points",
        "primary_energy_consumption",
        "property_type",
        "built_form",
        "total_floor_area",
        "walls",
        "tenure",
        "mainfuel",
    ]

    # The bills columns are split out - we include them and aggregate, without appliances
    _BILL_COLUMNS = [
        "heating_cost_current",
        "hot_water_cost_current",
        "lighting_cost_current",
        "gas_standing_charge",
        "electricity_standing_charge",
    ]

    # Database column name -> MDS sheet column name
    # NOTE(review): "current_sap_points" is exported under the header "EPC Source" and is
    # later used as the pre-retrofit SAP score - confirm this header against the MDS template.
    _MDS_COLUMN_NAMES = {
        "address": "Address",
        "postcode": "Postcode",
        "uprn": "UPRN",
        "current_epc_rating": "Pre EPC",
        "current_sap_points": "EPC Source",
        "primary_energy_consumption": "Existing Heating Demand Kwh/m2/y",
        "property_type": "Property Type",
        "built_form": "Built Form",
        "total_floor_area": "Floor area m2 (If known)",
        "walls": "Wall Type (Mandatory field)",
        "tenure": "Tenure",
    }

    # Recommendation columns that are summed per property to get the predicted impact
    _IMPACT_COLUMNS = ["sap_points", "heat_demand", "kwh_savings", "energy_cost_savings"]

    def __init__(self, format, portfolio_id):
        """
        This class handles the creation of standard outputs for the backend. For example, creation of
        an excel output, to be used for the MDS data sheet, required by E.ON

        :param format: The format of the output, e.g. mds
        :param portfolio_id: The id of the portfolio for which the output is being created
        :raises ValueError: If the format is not one of ``FORMATS``
        """

        # Validate first, so that an invalid format never touches the database or S3
        if format not in self.FORMATS:
            raise ValueError(f"Invalid format, should be one of {self.FORMATS}")

        self.format = format
        self.portfolio_id = portfolio_id
        # Date stamp used in the name of the exported files
        self.today = datetime.now().strftime("%Y-%m-%d")

        # Connect to the database
        self.session = sessionmaker(bind=db_engine)()

        # Download cleaned data and decode it with msgpack
        packed_lookup = read_from_s3(
            s3_file_name="cleaned_epc_data/cleaned.bson",
            bucket_name=self._S3_BUCKET,
        )
        self.cleaned_epc_lookup = msgpack.unpackb(packed_lookup, raw=False)

    def get_properties_from_db(self) -> list:
        """
        Get the properties of the portfolio, joined with their EPC details.

        :return: One dictionary per property, keyed by column name, holding every column of
            PropertyModel and PropertyDetailsEpcModel
        """
        rows = (
            self.session.query(PropertyModel, PropertyDetailsEpcModel)
            .join(PropertyDetailsEpcModel, PropertyModel.id == PropertyDetailsEpcModel.property_id)
            .filter(PropertyModel.portfolio_id == self.portfolio_id)  # Filter by portfolio ID
            .all()
        )

        property_columns = [col.name for col in PropertyModel.__table__.columns]
        details_columns = [col.name for col in PropertyDetailsEpcModel.__table__.columns]

        properties_data = []
        for prop, details in rows:
            record = {name: getattr(prop, name) for name in property_columns}
            # The EPC details are applied second, so they win when both tables share a column name
            record.update({name: getattr(details, name) for name in details_columns})
            properties_data.append(record)

        return properties_data

    def get_plans_from_db(self) -> list:
        """
        Get the plans of the portfolio.

        :return: One dictionary per plan, keyed by column name, holding every column of Plan
        """
        plans = self.session.query(Plan).filter(Plan.portfolio_id == self.portfolio_id).all()
        plan_columns = [col.name for col in Plan.__table__.columns]

        return [{name: getattr(plan, name) for name in plan_columns} for plan in plans]

    def get_recommendations_from_db(self, plan_ids) -> list:
        """
        Get the default recommendations that belong to the given plans.

        :param plan_ids: The ids of the plans for which recommendations are wanted
        :return: One dictionary per recommendation, holding every column of Recommendation plus
            "Scenario ID", the scenario of the plan the recommendation is attached to
        """
        plan_ids = list(plan_ids)
        if not plan_ids:
            # No plans means no recommendations; skip the query instead of sending an empty IN ()
            return []

        # Get recommendations through PlanRecommendations for those plans and that are default
        rows = (
            self.session.query(Recommendation, Plan.scenario_id)
            .join(PlanRecommendations, Recommendation.id == PlanRecommendations.recommendation_id)
            .join(Plan, Plan.id == PlanRecommendations.plan_id)  # Join with Plan to access scenario_id
            .filter(
                PlanRecommendations.plan_id.in_(plan_ids),
                Recommendation.default == True,  # noqa: E712 - builds a SQL expression
            )
            .all()
        )

        recommendation_columns = [col.name for col in Recommendation.__table__.columns]

        recommendations_data = []
        for recommendation, scenario_id in rows:
            record = {name: getattr(recommendation, name) for name in recommendation_columns}
            record["Scenario ID"] = scenario_id
            recommendations_data.append(record)

        return recommendations_data

    def make_mds_measure_matrix(self, scenario_recommendations) -> pd.DataFrame:
        """
        Pivot the recommendations of one scenario into one row per property.

        :param scenario_recommendations: Dataframe with at least the columns "property_id" and
            "measure_type"
        :return: Dataframe with one column per MDS measure label followed by "property_id". A
            cell holds the measure label when that measure is recommended for the property,
            and None otherwise. Properties appear in order of first appearance. A measure type
            that is not in MDS_MEASURE_MAPPING still creates the row for its property, but
            sets no cell.
        """
        all_measures = list(self.MDS_MEASURE_MAPPING.values())
        columns = all_measures + ["property_id"]

        if scenario_recommendations.empty:
            # Keep the columns, so that the result can still be merged on property_id
            return pd.DataFrame(columns=columns)

        # property_id -> row; dictionaries keep insertion order, so this also keeps row order
        rows_by_property = {}
        pairs = zip(scenario_recommendations["property_id"], scenario_recommendations["measure_type"])
        for property_id, measure_type in pairs:
            row = rows_by_property.get(property_id)
            if row is None:
                row = dict.fromkeys(all_measures)
                row["property_id"] = property_id
                rows_by_property[property_id] = row

            measure_label = self.MDS_MEASURE_MAPPING.get(measure_type)
            if measure_label is not None:
                row[measure_label] = measure_label

        return pd.DataFrame(list(rows_by_property.values()), columns=columns)

    def _load_export_data(self):
        """
        Read the properties, plans and default recommendations of the portfolio.

        :return: Tuple of (properties_data, plans_data, recommendations_data), each a list of
            dictionaries
        """
        self.session.begin()
        try:
            properties_data = self.get_properties_from_db()
            plans_data = self.get_plans_from_db()
            plan_ids = [plan["id"] for plan in plans_data]
            recommendations_data = self.get_recommendations_from_db(plan_ids)
        finally:
            # Always release the connection, also when one of the queries fails
            self.session.close()

        return properties_data, plans_data, recommendations_data

    def _empty_recommendations_frame(self) -> pd.DataFrame:
        """Return a recommendations dataframe with no rows, but with the columns the export reads."""
        columns = {
            name: pd.Series(dtype="object") for name in ("property_id", "measure_type", "Scenario ID")
        }
        columns.update({name: pd.Series(dtype="float64") for name in self._IMPACT_COLUMNS})
        return pd.DataFrame(columns)

    def _build_base_mds(self, properties_df) -> pd.DataFrame:
        """
        Build the part of the MDS sheet that is the same for every scenario.

        :param properties_df: Dataframe of the properties and their EPC details
        :return: Dataframe with one row per property and the pre-retrofit MDS columns
        """
        mds = (
            properties_df[self._PROPERTY_COLUMNS + self._BILL_COLUMNS]
            .copy()
            .rename(columns=self._MDS_COLUMN_NAMES)
        )

        # Add the bill components up, left to right
        estimated_bill = mds[self._BILL_COLUMNS[0]]
        for column in self._BILL_COLUMNS[1:]:
            estimated_bill = estimated_bill + mds[column]
        mds["Estimated bill (£ per year)"] = estimated_bill
        mds = mds.drop(columns=self._BILL_COLUMNS)

        # Formatting - Pre EPC is an enum; anything without a .value (e.g. None) is kept as it is
        mds["Pre EPC"] = [getattr(rating, "value", rating) for rating in mds["Pre EPC"].values]

        # Only keep the part of the wall description before the first comma
        wall_column = "Wall Type (Mandatory field)"
        mds[wall_column] = mds[wall_column].str.split(",").str[0]
        # Remove average thermal transmittance field
        mds[wall_column] = np.where(
            mds[wall_column].str.contains("Average thermal transmittance"),
            "",
            mds[wall_column],
        )

        # Translate the main fuel description into a fuel type through the cleaned EPC lookup.
        # Duplicated descriptions in the lookup would duplicate properties, so they are dropped.
        fuel_lookup = pd.DataFrame(self.cleaned_epc_lookup["main-fuel"])[
            ["clean_description", "fuel_type"]
        ].drop_duplicates(subset="clean_description")
        mds = mds.merge(
            fuel_lookup,
            left_on="mainfuel",
            right_on="clean_description",
            how="left",
        )

        return mds.rename(columns={"fuel_type": "Existing Fuel Type"}).drop(
            columns=["clean_description", "mainfuel"]
        )

    def _build_scenario_mds(self, mds, scenario_recommendations) -> pd.DataFrame:
        """
        Add the recommended measures of one scenario, and their predicted impact, to the sheet.

        :param mds: The scenario independent sheet, as built by _build_base_mds
        :param scenario_recommendations: The recommendations of a single scenario
        :return: The MDS sheet for the scenario
        """
        # For each measure, we create the measure matrix
        scenario_measure_matrix = self.make_mds_measure_matrix(scenario_recommendations)

        # Calculate the predicted impact on: SAP, heat demand, bills, kwh
        recommendation_impacts = (
            scenario_recommendations.groupby("property_id")[self._IMPACT_COLUMNS].sum().reset_index()
        )

        scenario_mds = mds.merge(
            scenario_measure_matrix, how="left", on="property_id"
        ).merge(
            recommendation_impacts, how="left", on="property_id"
        )

        # If we have no recommendations, sap_points, kwh_savings, heat_demand will be NaN.
        # Every missing value in the sheet (including measures that are not recommended)
        # is written as 0.
        scenario_mds = scenario_mds.fillna(0)

        scenario_mds["Post SAP"] = scenario_mds["EPC Source"] + scenario_mds["sap_points"]
        # Cut the decimals of Post SAP, to get an integer
        scenario_mds["Post SAP"] = scenario_mds["Post SAP"].apply(int)
        scenario_mds["Post EPC"] = scenario_mds["Post SAP"].apply(sap_to_epc)
        scenario_mds["Heating Demand Kwh/m2/y"] = (
            scenario_mds["Existing Heating Demand Kwh/m2/y"] - scenario_mds["heat_demand"]
        )

        return scenario_mds.rename(
            columns={
                "sap_points": "Predicted SAP Points",
                "kwh_savings": "Energy Saving (Kwh)",
                "energy_cost_savings": "Bill Reduction (£ per yr)",
            }
        )

    def export_mds(self):
        """
        This function will export the data in the MDS format
        Core data required:
        - Property address
        - Property postcode
        - uprn
        - recommended measures
        - pre-EPC
        - pre-SAP
        - pre Heat Demand
        - Property Type
        - Built form
        - Wall type
        - Tenure
        - Fuel type
        - Estimated bill
        - Recommended measures
        - Post EPC
        - Post heat demand
        - Bill savings
        - Kwh savings

        One excel file is saved to S3 for every scenario that has a plan in the portfolio.
        Nothing is saved when the portfolio has no properties or no plans. A portfolio without
        default recommendations is exported with an impact of zero.
        """
        properties_data, plans_data, recommendations_data = self._load_export_data()

        if not properties_data or not plans_data:
            # Nothing to export: there are no rows, or no scenarios to create a file for
            return

        # Convert these tables to dataframes
        properties_df = pd.DataFrame(properties_data)
        plans_df = pd.DataFrame(plans_data)
        recommendations_df = pd.DataFrame(recommendations_data)
        if recommendations_df.empty:
            recommendations_df = self._empty_recommendations_frame()

        scenario_ids = plans_df["scenario_id"].unique()

        # We start to create the MDS sheet
        mds = self._build_base_mds(properties_df)

        # Build every sheet before saving anything, so that a failure leaves no partial export
        mds_output_by_scenario = {}
        for scenario_id in scenario_ids:
            scenario_recommendations = recommendations_df[recommendations_df["Scenario ID"] == scenario_id]
            mds_output_by_scenario[scenario_id] = self._build_scenario_mds(mds, scenario_recommendations)

        # We now save them to s3 as excels
        for scenario_id, scenario_mds in mds_output_by_scenario.items():
            save_excel_to_s3(
                df=scenario_mds,
                file_key=f"engine_outputs/{self.format}/{self.today}_scenario_id={scenario_id}.xlsx",
                bucket_name=self._S3_BUCKET,
            )

    def export(self):
        """
        This function will export the data in the required format

        :raises NotImplementedError: If there is no export for the format of this instance
        """
        if self.format == "mds":
            self.export_mds()
            # Return here, otherwise a successful export would fall through to the error below
            return

        raise NotImplementedError("Export format not implemented")
|