Merge pull request #345 from Hestia-Homes/mds-consolidation

Mds consolidation
This commit is contained in:
KhalimCK 2024-10-07 11:17:22 +01:00 committed by GitHub
commit 88e4630c25
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
34 changed files with 912 additions and 544 deletions

343
backend/Outputs.py Normal file
View file

@ -0,0 +1,343 @@
import msgpack
import pandas as pd
import numpy as np
from sqlalchemy.orm import sessionmaker
from datetime import datetime
from utils.s3 import read_from_s3, save_excel_to_s3
from backend.app.utils import sap_to_epc
from backend.app.db.connection import db_engine
from backend.app.db.models.portfolio import PropertyModel, PropertyDetailsEpcModel
from backend.app.db.models.recommendations import Recommendation, Plan, PlanRecommendations
class Outputs:
FORMATS = ["mds"]
MDS_MEASURE_MAPPING = {
"external_wall_insulation": "EWI (Trad Const)",
"cavity_wall_insulation": "CWI",
"loft_insulation": "LI",
"party_wall_insulation": "Party Wall Insu",
"internal_wall_insulation": "IWI (POA - Prov Sum Only)",
"suspended_floor_insulation": "U/F Insu (Manual install)",
"solid_floor_insulation": "Solid floor insl (Out of scope - Prov sum only)",
"air_source_heat_pump": "ASHP Htg",
"ground_source_heat_pump": "GSHP Htg",
"shared_ground_loops": "Shared ground loops",
"communal_heat_networks": "Communal heat networks",
"district_heating_networks": "District heating networks",
"high_heat_retention_storage_heaters": "Elec Storage Htrs (Out of scope -Prov sum only)",
"low_energy_lighting": "Low Energy Bulbs",
"cylinder_insulation": "Cyl Insulation",
"smart_controls": "Smart controls",
"zone_controls": "Zone controls",
"trvs": "Upgrade TRV's",
"solar_pv": "Solar PV",
"solar_thermal": "Solar Thermal",
"double_glazing": "Double Glazing (POA - Prov sum only)",
"draught_proofing": "Draught Proofing",
"mechanical_ventilation": "Ventilation upgrade",
"gas_boiler": "Gas Boiler Replacement",
"flat_roof_insulation": "Flat roof (Out of scope - prov sum only)",
"room_in_roof_insulation": "RIR (POA - Prov sum only)",
"ev_charging": "EV Charging",
"battery": "Battery"
}
def __init__(self, format, portfolio_id):
"""
This class handles the creation of standard outputs for the backend. For example, creation of
an excel output, to be used for the MDS data sheet, required by E.ON
:param format: The format of the output, e.g. mds
:param portfolio_id: The id of the portfolio for which the output is being created
"""
if format not in self.FORMATS:
raise ValueError("Invalid format, should be one of {}".format(self.FORMATS))
self.format = format
self.portfolio_id = portfolio_id
self.today = datetime.now().strftime("%Y-%m-%d")
# Connect to the database
self.session = sessionmaker(bind=db_engine)()
# Download cleaned data
self.cleaned_epc_lookup = read_from_s3(
s3_file_name="cleaned_epc_data/cleaned.bson",
bucket_name="retrofit-data-dev"
)
self.cleaned_epc_lookup = msgpack.unpackb(self.cleaned_epc_lookup, raw=False)
def get_properties_from_db(self):
# Get properties and their details for a specific portfolio
properties_query = self.session.query(
PropertyModel,
PropertyDetailsEpcModel
).join(
PropertyDetailsEpcModel,
PropertyModel.id == PropertyDetailsEpcModel.property_id
).filter(
PropertyModel.portfolio_id == self.portfolio_id # Filter by portfolio ID
).all()
# Transform properties data to include all fields dynamically
properties_data = [
{**{col.name: getattr(prop.PropertyModel, col.name) for col in PropertyModel.__table__.columns},
**{col.name: getattr(prop.PropertyDetailsEpcModel, col.name) for col in
PropertyDetailsEpcModel.__table__.columns}}
for prop in properties_query
]
return properties_data
def get_plans_from_db(self):
plans_query = self.session.query(Plan).filter(Plan.portfolio_id == self.portfolio_id).all()
# Transform plans data to include all fields dynamically
plans_data = [
{col.name: getattr(plan, col.name) for col in Plan.__table__.columns}
for plan in plans_query
]
return plans_data
def get_recommendations_from_db(self, plan_ids):
# Get recommendations through PlanRecommendations for those plans and that are default
recommendations_query = self.session.query(
Recommendation,
Plan.scenario_id
).join(
PlanRecommendations, Recommendation.id == PlanRecommendations.recommendation_id
).join(
Plan, Plan.id == PlanRecommendations.plan_id # Join with Plan to access scenario_id
).filter(
PlanRecommendations.plan_id.in_(plan_ids),
Recommendation.default == True # Filtering for default recommendations
).all()
# Transform recommendations data to include all fields dynamically and include scenario_id
recommendations_data = [
{
**{
col.name: getattr(rec.Recommendation, col.name) if
hasattr(rec, 'Recommendation') else getattr(rec, col.name)
for col in Recommendation.__table__.columns
},
"Scenario ID": rec.scenario_id
} for rec in recommendations_query
]
return recommendations_data
def make_mds_measure_matrix(self, scenario_recommendations):
all_measures = list(self.MDS_MEASURE_MAPPING.values())
# Collect rows in a list
rows = []
# Populate the rows list
for idx, row in scenario_recommendations.iterrows():
property_id = row["property_id"]
measure_type = row["measure_type"]
# Get the label for the current type
measure_label = self.MDS_MEASURE_MAPPING.get(measure_type, None)
# If the property_id already exists in the collected rows, update it
existing_row = next((item for item in rows if item["property_id"] == property_id), None)
if existing_row is None:
# Create a new row if the property_id doesn't exist
new_row = {measure: None for measure in all_measures}
new_row["property_id"] = property_id
rows.append(new_row)
else:
new_row = existing_row
# Set the corresponding measure label in the row
new_row[measure_label] = measure_label
# Convert the list of dictionaries to a DataFrame
matrix = pd.DataFrame(rows)
# Reset the index for cleanliness
matrix.reset_index(drop=True, inplace=True)
return matrix
def export_mds(self):
"""
This function will export the data in the MDS format
Core data required:
- Property address
- Property postcode
- uprn
- recommended measures
- pre-EPC
- pre-SAP
- pre Heat Demand
- Property Type
- Built form
- Wall type
- Tenure
- Fuel type
- Estimated bill
- Recommended measures
- Post EPC
- Post heat demand
- Bill savings
- Kwh savings
"""
self.session.begin()
properties_data = self.get_properties_from_db()
plans_data = self.get_plans_from_db()
plan_ids = [plan['id'] for plan in plans_data]
recommendations_data = self.get_recommendations_from_db(plan_ids)
self.session.close()
# Convert these tables to dataframes
properties_df = pd.DataFrame(properties_data)
plans_df = pd.DataFrame(plans_data)
recommendations_df = pd.DataFrame(recommendations_data)
scenario_ids = plans_df["scenario_id"].unique()
# We start to create the MDS sheet
mds = properties_df[
[
"property_id",
"address",
"postcode",
"uprn",
"current_epc_rating",
"current_sap_points",
"primary_energy_consumption",
"property_type",
"built_form",
"total_floor_area",
"walls",
"tenure",
"mainfuel",
# The bills columns are split out - we include them and aggregate, without appliances
"heating_cost_current",
"hot_water_cost_current",
"lighting_cost_current",
"gas_standing_charge",
"electricity_standing_charge"
]
].copy().rename(
columns={
"address": "Address",
"postcode": "Postcode",
"uprn": "UPRN",
"current_epc_rating": "Pre EPC",
"current_sap_points": "EPC Source",
"primary_energy_consumption": "Existing Heating Demand Kwh/m2/y",
"property_type": "Property Type",
"built_form": "Built Form",
"total_floor_area": "Floor area m2 (If known)",
"walls": "Wall Type (Mandatory field)",
"tenure": "Tenure",
}
)
mds["Estimated bill (£ per year)"] = (
mds["heating_cost_current"] +
mds["hot_water_cost_current"] +
mds["lighting_cost_current"] +
mds["gas_standing_charge"] +
mds["electricity_standing_charge"]
)
mds = mds.drop(
columns=[
"heating_cost_current",
"hot_water_cost_current",
"lighting_cost_current",
"gas_standing_charge",
"electricity_standing_charge"
]
)
# Formatting - Pre EPC is an enum
mds["Pre EPC"] = [x.value for x in mds["Pre EPC"].values]
mds["Wall Type (Mandatory field)"] = mds["Wall Type (Mandatory field)"].str.split(",").str[0]
# Remove average thermal transmittance field
mds["Wall Type (Mandatory field)"] = np.where(
mds["Wall Type (Mandatory field)"].str.contains("Average thermal transmittance"),
"",
mds["Wall Type (Mandatory field)"]
)
mds = mds.merge(
pd.DataFrame(self.cleaned_epc_lookup["main-fuel"])[["clean_description", "fuel_type"]],
left_on="mainfuel",
right_on="clean_description",
how="left"
)
mds = mds.rename(columns={"fuel_type": "Existing Fuel Type"}).drop(columns=["clean_description", "mainfuel"])
mds["Existing Fuel Type"].value_counts()
mds_output_by_scenario = {}
for scenario_id in scenario_ids:
scenario_recommendations = recommendations_df[recommendations_df["Scenario ID"] == scenario_id]
# For each measure, we create the measure matrix
scenario_measure_matrix = self.make_mds_measure_matrix(scenario_recommendations)
# Calculate the predicted impact on: SAP, heat demand, bills, kwh
recommendation_impacts = scenario_recommendations.groupby("property_id")[
["sap_points", "heat_demand", "kwh_savings", "energy_cost_savings"]
].sum().reset_index()
scenario_mds = mds.merge(
scenario_measure_matrix, how="left", on="property_id"
).merge(
recommendation_impacts, how="left", on="property_id"
)
# If we have no recommendations, sap_points, kwh_savings, head_demand will be NaN
to_clean = [c for c in recommendation_impacts.columns if c != "property_id"]
for col in to_clean:
scenario_mds[col].fillna(0, inplace=True)
scenario_mds.fillna(0, inplace=True)
scenario_mds["Post SAP"] = scenario_mds["EPC Source"] + scenario_mds["sap_points"]
# Round Post SAP down to the nearest integer
scenario_mds["Post SAP"] = scenario_mds["Post SAP"].apply(lambda x: int(x))
scenario_mds["Post EPC"] = scenario_mds["Post SAP"].apply(lambda x: sap_to_epc(x))
scenario_mds["Heating Demand Kwh/m2/y"] = (
scenario_mds["Existing Heating Demand Kwh/m2/y"] - scenario_mds["heat_demand"]
)
scenario_mds = scenario_mds.rename(
columns={
"sap_points": "Predicted SAP Points",
"kwh_savings": "Energy Saving (Kwh)",
"energy_cost_savings": "Bill Reduction (£ per yr)"
}
)
mds_output_by_scenario[scenario_id] = scenario_mds
# We now save them to s3 as excels
for scenario_id, scenario_mds in mds_output_by_scenario.items():
save_excel_to_s3(
df=scenario_mds,
file_key=f"engine_outputs/{self.format}/{self.today}_scenario_id={scenario_id}.xlsx",
bucket_name="retrofit-data-dev"
)
def export(self):
"""
This function will export the data in the required format
"""
if self.format == "mds":
self.export_mds()
raise NotImplementedError("Export format not implemented")

View file

@ -18,6 +18,7 @@ from recommendations.recommendation_utils import (
get_wall_type,
estimate_external_wall_area,
estimate_windows,
estimate_pitched_roof_area
)
from backend.ml_models.AnnualBillSavings import AnnualBillSavings
from backend.app.utils import sap_to_epc
@ -111,6 +112,8 @@ class Property:
self.measures = ast.literal_eval(measures) if measures else None
self.uprn = epc_record.get("uprn")
self.uprn_source = self.data.get("uprn-source")
self.full_sap_epc = epc_record.get("full_sap_epc")
self.in_conservation_area, self.is_listed, self.is_heritage = None, None, None
self.restricted_measures = False
@ -618,20 +621,24 @@ class Property:
self.set_windows_count()
self.set_energy_source()
self.find_energy_sources()
self.set_current_energy_bill(kwh_client, kwh_predictions)
self.set_current_energy(kwh_client, kwh_predictions)
def set_solar_panel_configuration(
self, solar_panel_configuration, roof_area
):
def set_solar_panel_configuration(self, solar_panel_configuration):
"""
This funtion inserts the solar panel configuration into the property object
"""
self.solar_panel_configuration = solar_panel_configuration
# We also set the roof area
self.roof_area = roof_area
if not self.roof["is_flat"]:
default_roof_area = estimate_pitched_roof_area(
floor_area=self.insulation_floor_area,
)
else:
default_roof_area = self.insulation_floor_area
def set_current_energy_bill(self, kwh_client, kwh_predictions):
self.roof_area = default_roof_area
def set_current_energy(self, kwh_client, kwh_predictions):
"""
Given what we know about the property now, estimates the current energy consumption using the UCL paper
https://www.sciencedirect.com/science/article/pii/S0378778823002542
@ -788,6 +795,9 @@ class Property:
def get_property_details_epc(self, portfolio_id: int, rating_lookup):
if self.current_energy_bill is None:
raise ValueError("Current energy bill has not been set")
property_details_epc = {
"property_id": self.id,
"portfolio_id": portfolio_id,
@ -845,6 +855,7 @@ class Property:
"current_energy_demand": self.current_energy_consumption,
"current_energy_demand_heating_hotwater": self.current_energy_consumption_heating_hotwater,
"estimated": self.data.get("estimated", False),
**self.current_energy_bill
}
return property_details_epc

View file

@ -126,6 +126,9 @@ class SearchEpc:
combinations about the home to find the property
"""
# If we create the uprn based on a hash, we mark it as simulated
UPRN_SOURCE_SIMULATED = "SIMULATED"
MAX_RETRIES = 5
SUCCESS = {
@ -405,7 +408,11 @@ class SearchEpc:
else:
raise ValueError("Multiple UPRNs found - investigate me")
uprn = uprns.pop() if uprns else None
if uprns:
uprn = uprns.pop()
else:
newest_epc["uprn-source"] = self.UPRN_SOURCE_SIMULATED
uprn = hash(self.address1 + self.postcode)
if self.fast:
return newest_epc, [], {}, "", "", None

View file

@ -1,16 +1,22 @@
import time
import requests
import pandas as pd
import numpy as np
from recommendations.Costs import MCS_SOLAR_PV_COST_DATA
from backend.ml_models.AnnualBillSavings import AnnualBillSavings
import requests
from typing import List
from functools import lru_cache
import time
from backend.app.db.functions.solar_functions import get_solar_data, store_batch_data
from utils.logger import setup_logger
from sklearn.preprocessing import MinMaxScaler
from recommendations.Costs import Costs
from tqdm import tqdm
from math import sin, cos, sqrt, atan2, radians
from utils.logger import setup_logger
from recommendations.Costs import Costs, MCS_SOLAR_PV_COST_DATA
from etl.bill_savings.EnergyConsumptionModel import EnergyConsumptionModel
from backend.ml_models.AnnualBillSavings import AnnualBillSavings
from backend.Property import Property
from backend.app.db.functions.solar_functions import get_solar_data, store_batch_data
import backend.app.assumptions as assumptions
from backend.app.plan.schemas import PlanTriggerRequest
logger = setup_logger()
@ -589,3 +595,269 @@ class GoogleSolarApi:
# we need to do is perform the solar analysis and then half the results. We set an indicator which
# implies we should do this
self.double_property = True
@staticmethod
def prepare_input_data(
input_properties: List[Property],
energy_consumption_client: EnergyConsumptionModel,
body: PlanTriggerRequest
):
"""
:param input_properties: List of properties
:param energy_consumption_client: EnergyConsumptionModel instance
:param body: PlanTriggerRequest instance
This sets up the data required to make the solar api request
:return:
"""
building_solar_config = [
{
"building_id": p.building_id,
"longitude": p.spatial["longitude"],
"latitude": p.spatial["latitude"],
# Energy consumption is adjusted for the property's expected post retrofit state
# We set the target rating to EPC C, which is the typical EPC rating we would expect the
# property to achieve post retrofit of just the fabric
"energy_consumption": energy_consumption_client.estimate_new_consumption(
current_energy_efficiency=p.data["current-energy-efficiency"],
target_efficiency="69",
current_consumption=p.estimate_electrical_consumption(
assumed_ashp_efficiency=assumptions.AVERAGE_ASHP_EFFICIENCY, exclusions=body.exclusions
)
),
"property_id": p.id,
"uprn": p.uprn
} for p in input_properties if p.building_id is not None
]
unit_solar_config = [
{
"longitude": p.spatial["longitude"],
"latitude": p.spatial["latitude"],
# Energy consumption is adjusted for the property's expected post retrofit state
# We set the target rating to EPC C, which is the typical EPC rating we would expect the
# property to achieve post retrofit of just the fabric
"energy_consumption": energy_consumption_client.estimate_new_consumption(
current_energy_efficiency=p.data["current-energy-efficiency"],
target_efficiency="69",
current_consumption=p.estimate_electrical_consumption(
assumed_ashp_efficiency=assumptions.AVERAGE_ASHP_EFFICIENCY, exclusions=body.exclusions
),
),
"property_id": p.id,
"uprn": p.uprn
} for p in input_properties if p.building_id is None
]
return building_solar_config, unit_solar_config
@classmethod
def building_solar_analysis(
cls, building_solar_config: List, input_properties: List[Property], session, google_solar_api_key: str
):
"""
Perform the solar analysis for the building level
:param building_solar_config: List of building solar configurations
:param input_properties: List of properties
:param session: Database session
:param google_solar_api_key: Google Solar API key
:return:
"""
if not building_solar_config:
return input_properties
# Find the unique longitude and latitude pairs for each building id
unique_coordinates = {}
building_uprns = {}
for entry in building_solar_config:
building_id = entry['building_id']
coordinate_pair = {'longitude': entry['longitude'], 'latitude': entry['latitude']}
if building_id not in unique_coordinates:
unique_coordinates[building_id] = []
if coordinate_pair not in unique_coordinates[building_id]:
unique_coordinates[building_id].append(coordinate_pair)
if building_id not in building_uprns:
building_uprns[building_id] = []
if entry['uprn'] not in building_uprns[building_id]:
building_uprns[building_id].append(
{
"uprn": entry['uprn'], "longitude": entry['longitude'], "latitude": entry['latitude']
}
)
solar_panel_configuration = {}
for building_id, coordinates in unique_coordinates.items():
if len(coordinates) > 1:
raise NotImplementedError("more than one coordinate for a building - handle me")
coordinates = coordinates[0]
energy_consumption = sum(
[entry['energy_consumption'] for entry in building_solar_config if entry['building_id'] == building_id]
)
solar_api_client = cls(api_key=google_solar_api_key)
solar_api_client.get(
longitude=coordinates["longitude"],
latitude=coordinates["latitude"],
energy_consumption=energy_consumption,
is_building=True,
session=session
)
solar_panel_configuration[building_id] = {
"insights_data": solar_api_client.insights_data,
"panel_performance": solar_api_client.panel_performance,
"n_units": len([entry for entry in building_solar_config if entry['building_id'] == building_id])
}
# Store the data in the database
# TODO: Rather than just doing a straight insert, we should overwrite what's already there if it
# exists
solar_api_client.save_to_db(
session=session, uprns_to_location=building_uprns[building_id], scenario_type="building"
)
# Insert this into the properties that have this building id
for p in input_properties:
if p.building_id == building_id:
unit_solar_panel_configuration = solar_panel_configuration[building_id].copy()
unit_solar_panel_configuration["unit_share_of_energy"] = (
[x for x in building_solar_config if x["property_id"] == p.id][0]["energy_consumption"] /
energy_consumption
)
p.set_solar_panel_configuration(unit_solar_panel_configuration)
return input_properties
@classmethod
def unit_solar_analysis(
cls, unit_solar_config: List, input_properties: List[Property], session, body, google_solar_api_key: str
):
if not unit_solar_config:
return input_properties
# Model the solar potential at the property level
for unit in tqdm(unit_solar_config):
# We don't need to do this if we have global inclusions that don't include solar
if body.inclusions:
if "solar_pv" not in body.inclusions:
continue
property_instance = [p for p in input_properties if p.id == unit["property_id"]][0]
# At this level, we check if the property is suitable for solar and if now, skip
# Or if we have a solar non-invasive recommendation
if (
(not property_instance.is_solar_pv_valid()) or
[r for r in property_instance.non_invasive_recommendations if r["type"] == "solar_pv"]
):
continue
if unit["longitude"] is None or unit["latitude"] is None:
# At this point, we've checked that solar PV is valid, and so we provide some defaults
property_instance.set_solar_panel_configuration(
solar_panel_configuration={
"insights_data": None,
"panel_performance": cls.default_panel_performance(property_instance=property_instance),
"unit_share_of_energy": 1
},
)
continue
solar_api_client = cls(api_key=google_solar_api_key)
solar_api_client.get(
longitude=unit["longitude"],
latitude=unit["latitude"],
energy_consumption=unit["energy_consumption"],
is_building=False,
session=session,
uprn=unit["uprn"],
property_instance=property_instance
)
# Store the data in the database
solar_api_client.save_to_db(
session=session,
uprns_to_location=[
{
"uprn": property_instance.uprn,
"longitude": property_instance.spatial["longitude"],
"latitude": property_instance.spatial["latitude"]
}
],
scenario_type="unit"
)
property_instance.set_solar_panel_configuration(
solar_panel_configuration={
"insights_data": solar_api_client.insights_data,
"panel_performance": solar_api_client.panel_performance,
"unit_share_of_energy": 1
},
roof_area=solar_api_client.roof_area
)
return input_properties
@classmethod
def default_panel_performance(cls, property_instance):
"""
In a small number of cases, where properties have simulated uprns, we do not have a longitude and latitude
value and therefore we just return a default panel performance
:param property_instance:
:return:
"""
cost_instance = Costs(property_instance=property_instance)
# We return a 2.4 and 4 kwp system
panel_performance = pd.DataFrame(
[
{
'n_panels': 10,
'yearly_dc_energy': 4000 * 0.99, # Assumed 99% efficient wattage -> dc
'total_cost': cost_instance.solar_pv(
n_panels=10, has_battery=False, n_floors=property_instance.number_of_floors
)["total"],
'weighted_ratio': None,
'panneled_roof_area': 10 * 1.8,
'array_wattage': 4000,
'initial_ac_kwh_per_year': 4000 * 0.95, # Assumed 95% efficient wattage -> ac
'lifetime_ac_kwh': None,
'lifetime_dc_kwh': None,
'roi': None,
'generation_value': None,
'generation_deficit': None,
'expected_payback_years': None,
'surplus': None,
'combined_score': None,
'rank': None
},
{
'n_panels': 6,
'yearly_dc_energy': 2400 * 0.99, # Assumed 99% efficient wattage -> dc
'total_cost': cost_instance.solar_pv(
n_panels=6, has_battery=False, n_floors=property_instance.number_of_floors
)["total"],
'weighted_ratio': None,
'panneled_roof_area': 6 * 1.8,
'array_wattage': 2400,
'initial_ac_kwh_per_year': 2400 * 0.95, # Assumed 95% efficient wattage -> ac
'lifetime_ac_kwh': None,
'lifetime_dc_kwh': None,
'roi': None,
'generation_value': None,
'generation_deficit': None,
'expected_payback_years': None,
'surplus': None,
'combined_score': None,
'rank': None
},
]
)
return panel_performance

View file

@ -1,7 +1,7 @@
# Assumes that the average efficiency of an air source heat pump is 250%, taking the median of the 200-400% range,
# which is often quoted as a sensible efficiency range for air source heat pumps.
PESSIMISTIC_ASHP_EFFICIENCY = 200
AVERAGE_ASHP_EFFICIENCY = 300
AVERAGE_ASHP_EFFICIENCY = 250
# Conservative estimate of the proportion of electricity that will be consumed, whereas the rest will
# be exported
@ -11,34 +11,36 @@ DESCRIPTIONS_TO_FUEL_TYPES = {
"Air source heat pump, radiators, electric": {
"fuel": "Electricity", "cop": AVERAGE_ASHP_EFFICIENCY / 100
},
"Boiler and radiators, mains gas": {"fuel": 'Natural Gas', "cop": 0.9},
"Boiler and radiators, mains gas": {"fuel": 'Natural Gas', "cop": 0.85},
'Electric storage heaters': {"fuel": 'Electricity', "cop": 1},
"Electric immersion, off-peak": {"fuel": 'Electricity', "cop": 1},
"Electric storage heaters, radiators": {"fuel": 'Electricity', "cop": 1},
"Room heaters, electric": {"fuel": 'Electricity', "cop": 1},
"Electric immersion, standard tariff": {"fuel": 'Electricity', "cop": 1},
"Portable electric heaters assumed for most rooms": {"fuel": 'Electricity', "cop": 1},
"Boiler and radiators, LPG": {"fuel": 'LPG', "cop": 0.9},
"Boiler and radiators, LPG": {"fuel": 'LPG', "cop": 0.85},
"Room heaters, dual fuel (mineral and wood)": {"fuel": 'Wood Logs', "cop": 1},
"Room heaters, mains gas": {"fuel": 'Natural Gas', "cop": 0.9},
"Warm air, mains gas": {"fuel": 'Natural Gas', "cop": 0.9},
"Boiler, mains gas": {"fuel": 'Natural Gas', "cop": 0.9},
"Gas multipoint": {"fuel": "Natural Gas", "cop": 0.9},
"Room heaters, mains gas": {"fuel": 'Natural Gas', "cop": 0.85},
"Warm air, mains gas": {"fuel": 'Natural Gas', "cop": 0.85},
"Boiler, mains gas": {"fuel": 'Natural Gas', "cop": 0.85},
"Gas multipoint": {"fuel": "Natural Gas", "cop": 0.85},
"Warm air, Electricaire": {"fuel": "Electricity", "cop": 1},
"Gas boiler/circulator": {"fuel": "Natural Gas", "cop": 0.9},
"Boiler and underfloor heating, mains gas": {"fuel": "Natural Gas", "cop": 0.9},
"Gas boiler/circulator": {"fuel": "Natural Gas", "cop": 0.85},
"Boiler and underfloor heating, mains gas": {"fuel": "Natural Gas", "cop": 0.85},
"No system present: electric heaters assumed": {"fuel": "Electricity", "cop": 1},
"Electric instantaneous at point of use": {"fuel": "Electricity", "cop": 1},
"Boiler and radiators, oil": {"fuel": "Oil", "cop": 0.9},
"Boiler and radiators, oil": {"fuel": "Oil", "cop": 0.85},
"Electric storage heaters, Electric storage heaters": {"fuel": "Electricity", "cop": 1},
"Boiler and radiators, electric": {"fuel": "Electricity", "cop": 0.9},
"Gas boiler/circulator, no cylinder thermostat": {"fuel": "Natural Gas", "cop": 0.9},
"Boiler and radiators, dual fuel (mineral and wood)": {"fuel": "Wood Logs", "cop": 0.9},
"Boiler and radiators, electric": {"fuel": "Electricity", "cop": 0.85},
"Gas boiler/circulator, no cylinder thermostat": {"fuel": "Natural Gas", "cop": 0.85},
"Boiler and radiators, dual fuel (mineral and wood)": {"fuel": "Wood Logs", "cop": 0.85},
"Electric immersion, standard tariff, plus solar": {"fuel": "Electricity + Solar Thermal", "cop": 1},
"From main system, flue gas heat recovery": {"fuel": "Natural Gas", "cop": 0.9},
"From main system, flue gas heat recovery": {"fuel": "Natural Gas", "cop": 0.85},
"Electric underfloor heating": {"fuel": "Electricity", "cop": 1},
"No system present: electric immersion assumed": {"fuel": "Electricity", "cop": 1},
"Air source heat pump, underfloor, electric": {
"fuel": "Electricity", "cop": AVERAGE_ASHP_EFFICIENCY / 100
},
"Gas instantaneous at point of use": {"fuel": "Natural Gas", "cop": 0.85},
"Room heaters, wood logs": {"fuel": "Wood Logs", "cop": 1},
}

View file

@ -108,6 +108,7 @@ def upload_recommendations(session: Session, recommendations_to_upload, property
{
"property_id": property_id,
"type": rec["type"],
"measure_type": rec["measure_type"],
"description": rec["description"],
"estimated_cost": rec["total"],
"default": rec["default"],
@ -121,6 +122,7 @@ def upload_recommendations(session: Session, recommendations_to_upload, property
"energy_cost_savings": rec["energy_cost_savings"],
"labour_days": rec["labour_days"],
"already_installed": rec["already_installed"],
"heat_demand": rec["heat_demand"]
}
for rec in recommendations_to_upload
]

View file

@ -173,6 +173,13 @@ class PropertyDetailsEpcModel(Base):
current_energy_demand = Column(Float)
current_energy_demand_heating_hotwater = Column(Float)
estimated = Column(Boolean, default=False)
# Include estimates for energy bills, across the different types of energy
heating_cost_current = Column(Float)
hot_water_cost_current = Column(Float)
lighting_cost_current = Column(Float)
appliances_cost_current = Column(Float)
gas_standing_charge = Column(Float)
electricity_standing_charge = Column(Float)
class PropertyDetailsSpatial(Base):

View file

@ -15,6 +15,7 @@ class Recommendation(Base):
property_id = Column(BigInteger, ForeignKey(PropertyModel.id), nullable=False)
created_at = Column(TIMESTAMP, nullable=False, server_default=func.now())
type = Column(String, nullable=False)
measure_type = Column(String)
description = Column(String, nullable=False)
estimated_cost = Column(Float)
default = Column(Boolean, nullable=False)

View file

@ -10,7 +10,6 @@ from sqlalchemy.exc import IntegrityError, OperationalError
from sqlalchemy.orm import sessionmaker
from starlette.responses import Response
import backend.app.assumptions as assumptions
from backend.app.config import get_settings, get_prediction_buckets
from backend.app.db.connection import db_engine
from backend.app.db.functions.materials_functions import get_materials
@ -127,8 +126,8 @@ def extract_portfolio_aggregation_data(
pre_retrofit_co2 = p.data["co2-emissions-current"]
post_retrofit_co2 = pre_retrofit_co2 - carbon_savings
pre_retrofit_energy_bill = p.current_energy_bill
post_retrofit_energy_bill = p.current_energy_bill - sum(
pre_retrofit_energy_bill = sum(p.current_energy_bill.values())
post_retrofit_energy_bill = sum(p.current_energy_bill.values()) - sum(
[r["energy_cost_savings"] for r in default_recommendations]
)
@ -511,166 +510,32 @@ async def trigger_plan(body: PlanTriggerRequest):
input_properties = OpenUprnClient.set_spatial_data(input_properties, bucket_name=get_settings().DATA_BUCKET)
[p.set_features(cleaned=cleaned, kwh_client=kwh_client, kwh_predictions=kwh_preds) for p in input_properties]
logger.info("Performing solar analysis")
# TODO: Tidy this up
# TODO: If a property is semi-detached, we might get roof surfaces for the main building + the neighbour
# TODO: If we can't get high image quality, should we use the solar API? Maybe just for semi-detached units with
# extensions, since it doesn't seem to do a great job
# TODO: For simple properties, we should do a comparison/check between the solar API's roof area and the
# basic estimate of roof area
building_ids = [
{
"building_id": p.building_id,
"longitude": p.spatial["longitude"],
"latitude": p.spatial["latitude"],
# Energy consumption is adjusted for the property's expected post retrofit state
# We set the target rating to EPC C, which is the typical EPC rating we would expect the
# property to achieve post retrofit of just the fabric
"energy_consumption": energy_consumption_client.estimate_new_consumption(
current_energy_efficiency=p.data["current-energy-efficiency"],
target_efficiency="69",
current_consumption=p.estimate_electrical_consumption(
assumed_ashp_efficiency=assumptions.AVERAGE_ASHP_EFFICIENCY, exclusions=body.exclusions
)
),
"property_id": p.id,
"uprn": p.uprn
} for p in input_properties if p.building_id is not None
]
individual_units = [
{
"longitude": p.spatial["longitude"],
"latitude": p.spatial["latitude"],
# Energy consumption is adjusted for the property's expected post retrofit state
# We set the target rating to EPC C, which is the typical EPC rating we would expect the
# property to achieve post retrofit of just the fabric
"energy_consumption": energy_consumption_client.estimate_new_consumption(
current_energy_efficiency=p.data["current-energy-efficiency"],
target_efficiency="69",
current_consumption=p.estimate_electrical_consumption(
assumed_ashp_efficiency=assumptions.AVERAGE_ASHP_EFFICIENCY, exclusions=body.exclusions
),
),
"property_id": p.id,
"uprn": p.uprn
} for p in input_properties if p.building_id is None
]
if building_ids:
# Find the unique longitude and latitude pairs for each building id
unique_coordinates = {}
building_uprns = {}
for entry in building_ids:
building_id = entry['building_id']
coordinate_pair = {'longitude': entry['longitude'], 'latitude': entry['latitude']}
logger.info("Performing solar analysis")
building_solar_config, unit_solar_config = GoogleSolarApi.prepare_input_data(
input_properties=input_properties,
energy_consumption_client=energy_consumption_client,
body=body
)
if building_id not in unique_coordinates:
unique_coordinates[building_id] = []
input_properties = GoogleSolarApi.building_solar_analysis(
building_solar_config=building_solar_config,
input_properties=input_properties,
session=session,
google_solar_api_key=get_settings().GOOGLE_SOLAR_API_KEY
)
if coordinate_pair not in unique_coordinates[building_id]:
unique_coordinates[building_id].append(coordinate_pair)
if building_id not in building_uprns:
building_uprns[building_id] = []
if entry['uprn'] not in building_uprns[building_id]:
building_uprns[building_id].append(
{
"uprn": entry['uprn'], "longitude": entry['longitude'], "latitude": entry['latitude']
}
)
solar_panel_configuration = {}
for building_id, coordinates in unique_coordinates.items():
if len(coordinates) > 1:
raise NotImplementedError("more than one coordinate for a building - handle me")
coordinates = coordinates[0]
energy_consumption = sum(
[entry['energy_consumption'] for entry in building_ids if entry['building_id'] == building_id]
)
solar_api_client = GoogleSolarApi(api_key=get_settings().GOOGLE_SOLAR_API_KEY)
solar_api_client.get(
longitude=coordinates["longitude"],
latitude=coordinates["latitude"],
energy_consumption=energy_consumption,
is_building=True,
session=session
)
solar_panel_configuration[building_id] = {
"insights_data": solar_api_client.insights_data,
"panel_performance": solar_api_client.panel_performance,
"n_units": len([entry for entry in building_ids if entry['building_id'] == building_id])
}
# Store the data in the database
# TODO: Rather than just doing a straight insert, we should overwrite what's already there if it
# exists
solar_api_client.save_to_db(
session=session, uprns_to_location=building_uprns[building_id], scenario_type="building"
)
# Insert this into the properties that have this building id
for p in input_properties:
if p.building_id == building_id:
unit_solar_panel_configuration = solar_panel_configuration[building_id].copy()
unit_solar_panel_configuration["unit_share_of_energy"] = (
[x for x in building_ids if x["property_id"] == p.id][0]["energy_consumption"] /
energy_consumption
)
p.set_solar_panel_configuration(unit_solar_panel_configuration)
if individual_units:
# Model the solar potential at the property level
for unit in tqdm(individual_units):
# TODO: Tidy up this code
# We don't need to do this if we have global inclusions that don't include solar
if body.inclusions:
if "solar_pv" not in body.inclusions:
continue
property_instance = [p for p in input_properties if p.id == unit["property_id"]][0]
# At this level, we check if the property is suitable for solar and if now, skip
if not property_instance.is_solar_pv_valid():
continue
# We check if we have a solar non-invasive recommendation
if [r for r in property_instance.non_invasive_recommendations if r["type"] == "solar_pv"]:
continue
solar_api_client = GoogleSolarApi(api_key=get_settings().GOOGLE_SOLAR_API_KEY)
solar_api_client.get(
longitude=unit["longitude"],
latitude=unit["latitude"],
energy_consumption=unit["energy_consumption"],
is_building=False,
session=session,
uprn=unit["uprn"],
property_instance=property_instance
)
# Store the data in the database
solar_api_client.save_to_db(
session=session,
uprns_to_location=[
{
"uprn": property_instance.uprn,
"longitude": property_instance.spatial["longitude"],
"latitude": property_instance.spatial["latitude"]
}
],
scenario_type="unit"
)
property_instance.set_solar_panel_configuration(
solar_panel_configuration={
"insights_data": solar_api_client.insights_data,
"panel_performance": solar_api_client.panel_performance,
"unit_share_of_energy": 1
},
roof_area=solar_api_client.roof_area
)
input_properties = GoogleSolarApi.unit_solar_analysis(
unit_solar_config=unit_solar_config,
input_properties=input_properties,
session=session,
body=body,
google_solar_api_key=get_settings().GOOGLE_SOLAR_API_KEY
)
logger.info("Identifying property recommendations")
recommendations = {}
@ -729,171 +594,6 @@ async def trigger_plan(body: PlanTriggerRequest):
scoring_epcs.extend(property_instance.updated_simulation_epcs)
recommendations[property_id] = recommendations_with_impact
# For Debugging
# recommendation_impact_df = []
# for property_id in recommendations.keys():
# for recs_by_type in recommendations[property_id]:
# for rec in recs_by_type:
# recommendation_impact_df.append(
# {
# "property_id": property_id,
# "uprn": [p.uprn for p in input_properties if p.id == property_id][0],
# "address": [p.address for p in input_properties if p.id == property_id][0],
# "recommendation_id": rec["recommendation_id"],
# "type": rec["type"],
# "description": rec["description"],
# "sap_points": rec["sap_points"],
# "co2_equivalent_savings": rec["co2_equivalent_savings"],
# "heat_demand": rec["heat_demand"]
# }
# )
# recommendation_impact_df = pd.DataFrame(recommendation_impact_df)
#
# surveyed_uprns = [
# 10024087855, 121016117, 121016124,
# 10024087902, 121016121, 121016128
# ]
# recommendation_impact_df = recommendation_impact_df[recommendation_impact_df["uprn"].isin(surveyed_uprns)]
# # recommendation_impact_df = recommendation_impact_df[recommendation_impact_df["type"].isin(
# # ["windows_glazing", "internal_wall_insulation"])
# # ]
#
# actual_impacts_df = pd.DataFrame(
# [
# # 10024087855
# {"uprn": 10024087855, "type": "internal_wall_insulation", "actual_sap_points": 5},
# {"uprn": 10024087855, "type": "draught_proofing", "actual_sap_points": 2},
# {"uprn": 10024087855, "type": "low_energy_lighting", "actual_sap_points": 0},
# {"uprn": 10024087855, "type": "windows_glazing", "actual_sap_points": 4},
# # 121016117
# {"uprn": 121016117, "type": "internal_wall_insulation", "actual_sap_points": 6},
# {"uprn": 121016117, "type": "draught_proofing", "actual_sap_points": 1},
# {"uprn": 121016117, "type": "low_energy_lighting", "actual_sap_points": 1},
# {"uprn": 121016117, "type": "windows_glazing", "actual_sap_points": 4},
# # 121016124
# {"uprn": 121016124, "type": "internal_wall_insulation", "actual_sap_points": 8},
# {"uprn": 121016124, "type": "low_energy_lighting", "actual_sap_points": 2},
# {"uprn": 121016124, "type": "windows_glazing", "actual_sap_points": 5},
# # 10024087902
# {"uprn": 10024087902, "type": "room_roof_insulation", "actual_sap_points": 16},
# {"uprn": 10024087902, "type": "internal_wall_insulation", "actual_sap_points": 2},
# {"uprn": 10024087902, "type": "low_energy_lighting", "actual_sap_points": 0},
# # 121016121
# {"uprn": 121016121, "type": "internal_wall_insulation", "actual_sap_points": 5},
# {"uprn": 121016121, "type": "suspended_floor_insulation", "actual_sap_points": 2},
# {"uprn": 121016121, "type": "draught_proofing", "actual_sap_points": 1},
# {"uprn": 121016121, "type": "windows_glazing", "actual_sap_points": 3},
# # 121016128
# {"uprn": 121016128, "type": "internal_wall_insulation", "actual_sap_points": 6},
# {"uprn": 121016128, "type": "suspended_floor_insulation", "actual_sap_points": 1},
# {"uprn": 121016128, "type": "draught_proofing", "actual_sap_points": 1},
# {"uprn": 121016128, "type": "low_energy_lighting", "actual_sap_points": 1},
# {"uprn": 121016128, "type": "windows_glazing", "actual_sap_points": 3},
# ]
# )
#
# comparison = recommendation_impact_df.merge(
# actual_impacts_df, how="inner", on=["uprn", "type"]
# )
#
# print(recommendation_impact_df.groupby(["uprn"])["sap_points"].sum())
# property_recs = recommendation_impact_df[recommendation_impact_df["uprn"] == 121016128]
# property = [p for p in input_properties if p.uprn == 121016128][0]
# print(property.data["current-energy-efficiency"])
# print(property_recs["sap_points"].sum())
# print(property_recs["type"])
# print(float(property.data["current-energy-efficiency"]) + property_recs["sap_points"].sum())
# recommendations[property.id][2][0]["simulation_config"]
# from utils.s3 import read_dataframe_from_s3_parquet
# training_data = read_dataframe_from_s3_parquet(
# bucket_name="retrofit-data-dev",
# file_key="sap_change_model/2024-08-06-11-19-49/dataset_rooms.parquet"
# )
# import pickle
# with open("delete_me.pkl", "wb") as f:
# pickle.dump(training_data, f)
# Read in the pickle
import pickle
with open("delete_me.pkl", "rb") as f:
training_data = pickle.load(f)
# How do we simulate windows:
ending_cols = [col for col in training_data.columns if col.endswith("_ending")]
starting = {}
for c in ending_cols:
starting_colname = c.replace("_ending", "_starting")
if starting_colname in training_data.columns:
starting[c] = starting_colname
else:
starting[c] = c.replace("_ending", "")
allowed_to_change = [
# Windows
"windows_energy_eff_ending",
"glazed_type_ending",
"glazing_type_ending",
"multi_glaze_proportion_ending",
# Other
"sap_ending",
"heat_demand_ending",
"carbon_ending",
"estimated_perimeter_ending",
"lodgement_year_ending",
"lodgement_month_ending",
"days_to_ending",
"number_habitable_rooms_ending",
"number_heated_rooms_ending",
]
fixed = [c for c in ending_cols if c not in allowed_to_change + ["uprn"]]
training_fixed = training_data.copy()
for col in fixed:
starting_col = starting[col]
training_fixed = training_fixed[training_fixed[col] == training_fixed[starting_col]]
training_fixed = training_fixed.reset_index(drop=True)
# Get the recommendation config for this uprn
uprn = 121016121
property_instance = [p for p in input_properties if p.uprn == uprn][0]
property_recs = recommendations[property_instance.id]
window_recs = [r for r in property_recs if r[0]["type"] == "windows_glazing"][0]
window_recs[0].keys()
window_recs[0]["description_simulation"]["multi-glaze-proportion"]
# TODO: - In description_simulation for windows, we update glazed-type but in the model training data there
# is a column called "glazing-type".
# - We don't update glazed-area (should be "Much More Than Typical" most likely? Or Normal??)
# TODO: I think we update eveything that we actually need to, when simulating the recommendation impact for the
# ML models
# TODO: Secondary glazing appears to go to "Good", not "Average". Investigate why
# TODO: For the two properties, force recommendations for double glazing and check impact
z = training_data[training_data["glazed_type_ending"] == "secondary glazing"]
z = z[z["multi_glaze_proportion_ending"] == 100]
z["windows_energy_eff_ending"].value_counts()
# Find the things that change
example = training_fixed.iloc[3]
for _, example in training_fixed.iterrows():
things_that_change = []
for c in ending_cols:
if example[c] != example[starting[c]]:
things_that_change.append(c)
if len(things_that_change) > 4:
print(things_that_change)
print(example["uprn"])
# blah
# 100051011370 (doesn't change in actual glazing)
# example["glazed_type_ending"]
# double glazing installed before 2002
# example["glazed_type_starting"]
# double glazing, unknown install date
# 100040925015
# We call the API with the scoring epcs
scoring_epcs = pd.DataFrame(scoring_epcs)
scoring_epcs = kwh_client.transform(data=scoring_epcs, cleaned=cleaned)
@ -913,10 +613,12 @@ async def trigger_plan(body: PlanTriggerRequest):
property_recommendations = recommendations.get(property_id, [])
property_instance = [p for p in input_properties if p.id == property_id][0]
property_current_energy_bill = Recommendations.calculate_recommendation_tenant_savings(
property_instance=property_instance,
kwh_simulation_predictions=kwh_simulation_predictions,
property_recommendations=property_recommendations
property_current_energy_bill = (
Recommendations.calculate_recommendation_tenant_savings(
property_instance=property_instance,
kwh_simulation_predictions=kwh_simulation_predictions,
property_recommendations=property_recommendations
)
)
property_instance.current_energy_bill = property_current_energy_bill

View file

@ -64,6 +64,7 @@ MEASURE_MAP = {
"floor_insulation": ["suspended_floor_insulation", "solid_floor_insulation"],
"heating": ["boiler_upgrade", "high_heat_retention_storage_heater", "air_source_heat_pump"],
"windows": ["double_glazing", "secondary_glazing"],
"heating_controls": ["roomstat_programmer_trvs", "time_temperature_zone_control"]
}

View file

@ -22,68 +22,78 @@ class AirSourceHeatPumpEfficiency:
def create_dataset(self):
logger.info("Creating solar photo supply dataset")
all_counts = []
heating_data = []
for dir in tqdm(self.file_directories):
filepath = dir / "certificates.csv"
df = pd.read_csv(filepath, low_memory=False)
df = df[~pd.isnull(df["UPRN"])]
df["UPRN"] = df["UPRN"].astype(int).astype(str)
# df = df[~pd.isnull(df["UPRN"])]
# df["UPRN"] = df["UPRN"].astype(int).astype(str)
# Take entries after SAP12
df["LODGEMENT_DATE"] = pd.to_datetime(df["LODGEMENT_DATE"])
df = df[df["LODGEMENT_DATE"] > EARLIEST_EPC_DATE]
df = df[
~df["TENURE"].isin(
[
"unknown",
"Not defined - use in the case of a new dwelling for which the intended tenure in not known. "
"It is not to be used for an existing dwelling"
]
)
]
# df = df[
# ~df["TENURE"].isin(
# [
# "unknown",
# "Not defined - use in the case of a new dwelling for which the intended tenure in not known. "
# "It is not to be used for an existing dwelling"
# ]
# )
# ]
# Take entries that contain an air source heat pump
df = df[
df["MAINHEAT_DESCRIPTION"].str.contains("air source heat pump", case=False, na=False)
]
(
# Air source heat pumps
(df["MAINHEAT_DESCRIPTION"] == "Air source heat pump, radiators, electric") &
(df["MAINHEATCONT_DESCRIPTION"] == "Time and temperature zone control")
) |
(
# High heat retention storage
df["MAINHEATCONT_DESCRIPTION"] == "Controls for high heat retention storage heaters"
)
]
# Drop rows that have a missing PROPERTY_TYPE, BUILT_FORM, CONSTRUCTION_AGE_BAND, TOTAL_FLOOR_AREA
for col in ["PROPERTY_TYPE", "BUILT_FORM", "CONSTRUCTION_AGE_BAND", "TOTAL_FLOOR_AREA"]:
df = df[~pd.isnull(df[col])]
# Get the columns we're interested in
df = df[
[
"PROPERTY_TYPE",
"BUILT_FORM",
"MAINHEAT_DESCRIPTION",
"MAINHEAT_ENERGY_EFF",
"MAINHEATCONT_DESCRIPTION",
"MAINHEATC_ENERGY_EFF",
"MAIN_FUEL",
"HOTWATER_DESCRIPTION",
"HOT_WATER_ENERGY_EFF",
"MAINS_GAS_FLAG"
]
heating_data.append(df)
# temp
# import pickle
# with open("heating_data - delete me.pkl", "wb") as f:
# pickle.dump(heating_data, f)
heating_df = pd.concat(heating_data)
# Clean construction age band
from etl.epc.DataProcessor import EPCDataProcessor
heating_df["CONSTRUCTION_AGE_BAND_CLEAN"] = heating_df["CONSTRUCTION_AGE_BAND"].apply(
lambda x: EPCDataProcessor.clean_construction_age_band(x)
)
ashp_df = heating_df[
(heating_df["MAINHEAT_DESCRIPTION"] == "Air source heat pump, radiators, electric") &
# ~heating_df["CONSTRUCTION_AGE_BAND"].str.contains("England and Wales")
(~heating_df["CONSTRUCTION_AGE_BAND"].isin(["NO DATA!", "INVALID!"])) &
(heating_df["LODGEMENT_DATE"] >= pd.to_datetime("2019-01-01"))
]
counts = df.groupby(
ashp_efficiencies = (
ashp_df.groupby(
[
"PROPERTY_TYPE",
"BUILT_FORM",
"MAINHEAT_DESCRIPTION",
"CONSTRUCTION_AGE_BAND_CLEAN",
# "WALLS_DESCRIPTION",
# "ROOF_DESCRIPTION",
"MAINHEAT_ENERGY_EFF",
"MAINHEATCONT_DESCRIPTION",
"MAINHEATC_ENERGY_EFF",
"MAIN_FUEL",
"HOTWATER_DESCRIPTION",
"HOT_WATER_ENERGY_EFF",
"MAINS_GAS_FLAG"
]
).size().reset_index(name="count")
)["LMK_KEY"].count().reset_index()
)
all_counts.append(counts)
ashp_df["MAINHEAT_ENERGY_EFF"].value_counts()
all_counts = pd.concat(all_counts)
ashp_efficiencies["CONSTRUCTION_AGE_BAND_CLEAN"].value_counts()
ashp_efficiency_agg
all_counts_agg = all_counts.groupby(
[

View file

@ -1,8 +1,10 @@
import inspect
from pathlib import Path
from backend.app.plan.utils import get_cleaned
from etl.air_source_heat_pump.AirSourceHeatPumpEfficiency import AirSourceHeatPumpEfficiency
DATA_DIRECTORY = Path(__file__).parent / "local_data" / "all-domestic-certificates"
file_src = inspect.getfile(lambda: None)
DATA_DIRECTORY = Path(file_src).parent / "local_data" / "all-domestic-certificates"
def app():

View file

@ -259,6 +259,9 @@ class KwhData:
# Create new features:
data['estimate_annual_kwh'] = data['energy-consumption-current'] * data['total-floor-area']
# Ensure this is string, because we could have mixed types
data["lodgement-datetime"] = data["lodgement-datetime"].astype(str)
if save:
self.model_training_data_filepath = f"energy_consumption/{self.run_date}/training_data.parquet"
logger.info(f"Storing energy consumption dataset in s3 at {self.consumption_data_filepath}")

View file

@ -2,12 +2,11 @@ import time
import pandas as pd
from utils.s3 import read_excel_from_s3
from backend.SearchEpc import SearchEpc
from dotenv import load_dotenv
import os
from tqdm import tqdm
from utils.s3 import save_csv_to_s3
from utils.s3 import save_csv_to_s3, read_excel_from_s3
# Read in the .env file in backend
load_dotenv(dotenv_path="backend/.env")
@ -172,9 +171,6 @@ def app():
# Let's just pull the full EPC data for this
asset_list_with_uprn = []
for row, property_meta in tqdm(raw_asset_list_base.iterrows(), total=raw_asset_list_base.shape[0]):
if row <= 104:
continue
time.sleep(1.1)
searcher = SearchEpc(
address1=property_meta["address"],
postcode=property_meta["postcode"],
@ -183,24 +179,26 @@ def app():
full_address=", ".join([property_meta["address"], property_meta["postcode"]])
)
# Let's just find the UPRN
searcher.ordnance_survey_client.get_places_api()
uprn = searcher.ordnance_survey_client.most_relevant_result["UPRN"]
searcher.find_property(skip_os=True)
if searcher.newest_epc["uprn-source"] == SearchEpc.UPRN_SOURCE_SIMULATED:
uprn = None
else:
uprn = searcher.uprn
# searcher.find_property(skip_os=False)
asset_list_with_uprn.append(
{
**property_meta,
"uprn": uprn,
"matched_address": searcher.address1,
"matched_postcode": searcher.postcode
}
)
# Store this as a backup
# import pandas as pd
# asset_list_with_uprn_df = pd.DataFrame(asset_list_with_uprn)
# asset_list_with_uprn_df.to_csv("eon_asset_list_with_uprn.csv", index=False)
# asset_list_with_uprn_df.to_csv("eon_asset_list_with_uprn_2.csv", index=False)
# Read in
# asset_list_with_uprn = pd.read_csv("eon_asset_list_with_uprn.csv").to_dict(orient="records")

View file

@ -5,6 +5,7 @@ import geopandas as gpd
from utils.logger import setup_logger
from utils.s3 import read_io_from_s3, save_dataframe_to_s3_parquet, read_dataframe_from_s3_parquet
from backend.Property import Property
from backend.SearchEpc import SearchEpc
logger = setup_logger()
@ -151,7 +152,7 @@ class OpenUprnClient:
bucket_name=bucket_name, file_key="spatial/filename_meta.parquet"
)
uprns = [p.uprn for p in input_properties]
uprns = [p.uprn for p in input_properties if p.uprn_source != SearchEpc.UPRN_SOURCE_SIMULATED]
uprn_map = cls.make_uprn_map(uprns, uprn_filenames)
for filename, associated_uprn in tqdm(uprn_map.items(), total=len(uprn_map)):
@ -165,6 +166,9 @@ class OpenUprnClient:
if p.uprn in associated_uprn:
p.set_spatial(spatial_df[spatial_df["UPRN"] == p.uprn])
if p.uprn_source == SearchEpc.UPRN_SOURCE_SIMULATED:
p.set_spatial(cls.empty_spatial_df())
# Perform a final check to ensure that all properties have spatial data
for p in input_properties:
if p.spatial is None:
@ -172,6 +176,22 @@ class OpenUprnClient:
return input_properties
@staticmethod
def empty_spatial_df():
return pd.DataFrame(
[
{
"X_COORDINATE": None,
"Y_COORDINATE": None,
"LATITUDE": None,
"LONGITUDE": None,
"conservation_status": False,
"is_listed_building": False,
"is_heritage_building": False,
}
]
)
@classmethod
def get_spatial_data(cls, uprns: list[int], bucket_name):
"""

View file

@ -38,6 +38,7 @@ class DraughtProofingRecommendations:
"phase": None,
"parts": [],
"type": "draught_proofing",
"measure_type": "draught_proofing",
"description": description,
"starting_u_value": None,
"new_u_value": None,

View file

@ -41,6 +41,7 @@ class FireplaceRecommendations(Definitions):
"phase": phase,
"parts": [],
"type": "sealing_open_fireplace",
"measure_type": "sealing_open_fireplace",
"description": "Seal %s open fireplaces" % str(number_open_fireplaces),
"starting_u_value": None,
"new_u_value": None,

View file

@ -241,6 +241,7 @@ class FloorRecommendations(Definitions):
),
],
"type": material["type"],
"measure_type": material["type"], # This is distinct between suspended and solid floor
"description": self._make_floor_description(material),
"starting_u_value": u_value,
"new_u_value": new_u_value,

View file

@ -216,6 +216,7 @@ class HeatingControlRecommender:
self.recommendation.append(
{
"type": "heating_control",
"measure_type": "roomstat_programmer_trvs",
"parts": [],
"description": description,
**cost_result,
@ -289,6 +290,7 @@ class HeatingControlRecommender:
self.recommendation.append(
{
"type": "heating_control",
"measure_type": "time_temperature_zone_control",
"parts": [],
"description": description,
**cost_result,

View file

@ -220,6 +220,8 @@ class HeatingRecommender:
for k in ["total", "subtotal", "vat", "labour_hours", "labour_days"]:
combined_rec[k] = rec[k] + rec2[k]
combined_rec["measure_type"] = "+".join([rec["measure_type"], rec2["measure_type"]])
combined_recommendations.append(combined_rec)
self.heating_recommendations.extend(combined_recommendations)
@ -432,7 +434,7 @@ class HeatingRecommender:
ashp_costs_with_controls[key] += controls_rec[key]
if controls_rec is None:
description = "Install an air source heat pump."
description = "Install a Mitsubish air source heat pump."
elif already_installed:
description = "The property already has an air source heat pump, no further action needed."
else:
@ -455,8 +457,8 @@ class HeatingRecommender:
)
simulation_config = {
"mainheat_energy_eff_ending": "Good",
"hot_water_energy_eff_ending": "Good"
"mainheat_energy_eff_ending": "Very Good",
"hot_water_energy_eff_ending": "Very Good"
}
description_simulation = {
"mainheat-description": new_heating_description,
@ -512,10 +514,9 @@ class HeatingRecommender:
ashp_recommendation = {
"phase": phase,
"parts": [
# TODO
],
"parts": [],
"type": "heating",
"measure_type": "air_source_heat_pump",
"description": description,
"starting_u_value": None,
"new_u_value": None,
@ -556,7 +557,8 @@ class HeatingRecommender:
phase,
heating_controls_only,
system_change,
system_type
system_type,
measure_type
):
"""
Given a recommendation for heating controls, and a recommendation for the heating system, we combine the two
@ -572,7 +574,8 @@ class HeatingRecommender:
current system. If we have a system change and we have a heat control recommendation, we only recommend
both heating and controls together
:param system_type: The type of heating system we are recommending
:return:
:param measure_type: The type of measure we are recommending - more granular than the "type" field, allowing us
to distinguish between different types of heating recommendations
"""
# We produce recommendations with & without heating controls
@ -616,10 +619,9 @@ class HeatingRecommender:
recommendation = {
"phase": phase,
"parts": [
# TODO
],
"parts": [],
"type": "heating",
"measure_type": measure_type,
"description": recommendation_description,
"starting_u_value": None,
"new_u_value": None,
@ -723,7 +725,7 @@ class HeatingRecommender:
description_prefix = ""
controls_recommender.recommend(
heating_description="Electric storage heaters, radiators", description_prefix=description_prefix
heating_description="Electric storage heaters", description_prefix=description_prefix
)
has_hhr = self.is_hhr_already_installed()
@ -738,7 +740,7 @@ class HeatingRecommender:
self.property.main_heating["clean_description"]
]["hhr"]["mainheating_description"]
else:
new_heating_description = "Electric storage heaters, radiators"
new_heating_description = "Electric storage heaters"
# Set up artefacts, suitable for the simulation and regardless of controls
heating_ending_config = MainHeatAttributes(new_heating_description).process()
@ -799,7 +801,8 @@ class HeatingRecommender:
phase=phase,
heating_controls_only=heating_controls_only,
system_change=system_change,
system_type="high_heat_retention_storage_heater"
system_type="high_heat_retention_storage_heater",
measure_type="high_heat_retention_storage_heater"
)
if _return:
return recommendations
@ -977,10 +980,9 @@ class HeatingRecommender:
boiler_recommendation = {
"phase": recommendation_phase,
"parts": [
# TODO
],
"parts": [],
"type": "heating",
"measure_type": "boiler_upgrade",
"description": description,
"starting_u_value": None,
"new_u_value": None,
@ -1027,7 +1029,8 @@ class HeatingRecommender:
phase=recommendation_phase,
heating_controls_only=False,
system_change=True,
system_type="boiler_upgrade"
system_type="boiler_upgrade",
measure_type="boiler_upgrade",
)
combined_recommendations.extend(combined_recommendation)

View file

@ -58,10 +58,9 @@ class HotwaterRecommendations:
self.recommendations.append(
{
"phase": phase,
"parts": [
# TODO
],
"parts": [],
"type": "hot_water_tank_insulation",
"measure_type": "hot_water_tank_insulation",
"description": description,
"starting_u_value": None,
"new_u_value": None,
@ -107,6 +106,7 @@ class HotwaterRecommendations:
"phase": phase,
"parts": [],
"type": "cylinder_thermostat",
"measure_type": "cylinder_thermostat",
"description": description,
"starting_u_value": None,
"new_u_value": None,

View file

@ -152,6 +152,7 @@ class LightingRecommendations:
"phase": phase,
"parts": [],
"type": "low_energy_lighting",
"measure_type": "low_energy_lighting",
"description": description,
"starting_u_value": None,
"new_u_value": None,

View file

@ -267,6 +267,11 @@ class Recommendations:
property_recommendations,
)
# Check to make sure measure_type is populated
for recs in property_recommendations:
if any(pd.isnull(rec.get("measure_type")) for rec in recs):
raise ValueError("Measure type is not populated")
return property_recommendations, property_representative_recommendations
@staticmethod
@ -797,13 +802,14 @@ class Recommendations:
electricity_standing_charge = AnnualBillSavings.DAILY_STANDARD_CHARGE_ELECTRICITY * 365
current_energy_bill = (
starting_figures["heating_cost"] +
starting_figures["hotwater_cost"] +
property_instance.energy_cost_estimates["unadjusted"]["lighting"] +
property_instance.energy_cost_estimates["unadjusted"]["appliances"] +
gas_standing_charge +
electricity_standing_charge
)
# We return a dictionary that contains the individual costs, that can be stored to the database
current_energy_bill = {
"heating_cost_current": starting_figures["heating_cost"],
"hot_water_cost_current": starting_figures["hotwater_cost"],
"lighting_cost_current": property_instance.energy_cost_estimates["unadjusted"]["lighting"],
"appliances_cost_current": property_instance.energy_cost_estimates["unadjusted"]["appliances"],
"gas_standing_charge": gas_standing_charge,
"electricity_standing_charge": electricity_standing_charge,
}
return current_energy_bill

View file

@ -84,12 +84,17 @@ class RoofRecommendations:
return (self.insulation_thickness > self.MINIMUM_LOFT_ISULATION_MM) and self.property.roof["is_pitched"]
def is_room_roof_insulated(self):
def is_room_roof_insulated_or_unsuitable(self, measures):
"""
Check if the room roof is already insulated
"""
# If the roof is a room roof room roof is not included in the measures, we deem the recommendation unsuitable
unsuitable = "room_roof_insulation" not in measures and self.property.roof["is_roof_room"]
if unsuitable:
return True
full_insulated_room_roof = (
self.property.roof["is_roof_room"] and
self.property.roof["insulation_thickness"] in ["average", "above_average"]
@ -123,7 +128,7 @@ class RoofRecommendations:
if (self.insulation_thickness >= self.MINIMUM_FLAT_ROOF_ISULATION_MM) and self.property.roof["is_flat"]:
return
if self.is_room_roof_insulated():
if self.is_room_roof_insulated_or_unsuitable(measures):
return
# If we have a u-value already, need to implement this
@ -245,8 +250,10 @@ class RoofRecommendations:
if is_pitched:
insulation_materials = self.loft_insulation_materials
measure_type = "loft_insulation"
elif is_flat:
insulation_materials = self.flat_roof_insulation_materials
measure_type = "flat_roof_insulation"
else:
raise ValueError("Roof is not pitched or flat")
@ -362,6 +369,7 @@ class RoofRecommendations:
)
],
"type": material["type"],
"measure_type": measure_type,
"description": self.make_roof_insulation_description(material),
"starting_u_value": u_value,
"new_u_value": new_u_value,
@ -485,10 +493,9 @@ class RoofRecommendations:
recommendations.append(
{
"phase": phase,
"parts": [
# TODO
],
"parts": [],
"type": "room_roof_insulation",
"measure_type": "room_roof_insulation",
"description": "Insulate room in roof at rafters and re-decorate",
"starting_u_value": u_value,
"new_u_value": new_u_value,

View file

@ -52,6 +52,7 @@ class SecondaryHeating:
"phase": phase,
"parts": [],
"type": "secondary_heating",
"measure_type": "secondary_heating",
"description": description,
"starting_u_value": None,
"new_u_value": None,

View file

@ -2,7 +2,7 @@ import numpy as np
import pandas as pd
from recommendations.Costs import Costs
from recommendations.recommendation_utils import override_costs, esimtate_pitched_roof_area
from recommendations.recommendation_utils import override_costs, estimate_pitched_roof_area
class SolarPvRecommendations:
@ -126,6 +126,7 @@ class SolarPvRecommendations:
"phase": phase,
"parts": [],
"type": "solar_pv",
"measure_type": "solar_pv",
"description": description,
"starting_u_value": None,
"new_u_value": None,
@ -174,7 +175,7 @@ class SolarPvRecommendations:
if self.property.roof["is_flat"]:
roof_area = self.property.insulation_floor_area
else:
roof_area = esimtate_pitched_roof_area(
roof_area = estimate_pitched_roof_area(
floor_area=self.property.insulation_floor_area, floor_height=self.property.data["floor-height"]
)
solar_configurations = pd.DataFrame(
@ -221,6 +222,7 @@ class SolarPvRecommendations:
"phase": phase,
"parts": [],
"type": "solar_pv",
"measure_type": "solar_pv",
"description": description,
"starting_u_value": None,
"new_u_value": None,

View file

@ -66,6 +66,7 @@ class VentilationRecommendations(Definitions):
"phase": None,
"parts": part,
"type": part[0]["type"],
"measure_type": "mechanical_ventilation",
"description": f"Install {n_units} {part[0]['description']} units",
"starting_u_value": None,
"new_u_value": None,
@ -106,6 +107,7 @@ class VentilationRecommendations(Definitions):
"phase": None,
"parts": [],
"type": "trickle_vents",
"measure_type": "trickle_vents",
"description": description,
"starting_u_value": None,
"new_u_value": None,

View file

@ -374,6 +374,7 @@ class WallRecommendations(Definitions):
)
],
"type": "cavity_wall_insulation",
"measure_type": "cavity_wall_insulation",
"description": description,
"starting_u_value": u_value,
"new_u_value": new_u_value,
@ -545,6 +546,7 @@ class WallRecommendations(Definitions):
)
],
"type": material["type"],
"measure_type": material["type"], # This is distinguished between EWI & IWI
"description": self._make_description(material),
"starting_u_value": u_value,
"new_u_value": new_u_value,

View file

@ -223,6 +223,7 @@ class WindowsRecommendations:
"phase": phase,
"parts": [],
"type": "windows_glazing",
"measure_type": "double_glazing" if not is_secondary_glazing else "secondary_glazing",
"description": description,
"starting_u_value": None,
"new_u_value": None,
@ -272,6 +273,7 @@ class WindowsRecommendations:
"phase": phase,
"parts": [],
"type": "mixed_glazing",
"measure_type": "mixed_glazing",
"description": description,
"starting_u_value": None,
"new_u_value": None,

View file

@ -205,7 +205,7 @@ def get_wall_u_value(
mapped_value = wall_uvalues_df[
wall_uvalues_df["Wall_type"] == mapped_description
][age_band].values[0]
][age_band].values[0]
if pd.isnull(mapped_value) and "Park home" in mapped_description:
# We don't know enough in this case so we default to 0
@ -505,7 +505,7 @@ def get_floor_u_value(
insulation_lookup = s11[
s11["Age_band"].str.contains(age_band) & s11["Floor_construction"]
== floor_type
]
]
if insulation_lookup.empty:
insulation_thickness = 0
else:
@ -700,34 +700,17 @@ def convert_thickness_to_numeric(string_thickness, is_pitched, is_flat):
return int(string_thickness)
def esimtate_pitched_roof_area(floor_area: float, floor_height: float) -> float:
def estimate_pitched_roof_area(floor_area: float) -> float:
"""
This function will estimate the area of a pitched roof, given the floor area below the roof and the floor
height of the property.
Given limited information about the home, this is a very rough method to estimate the roof area and we
assume the the room is a gable roof.
We assume a roughly average pitch of 45 degrees
Note that both floor area and height should be in the same units. E.g. if floor area is meters squared,
floor height should be in meters
This function mimics the methodology for calculating floor area in Elmhurst, so that we can simulate the outcomes
in a way that is consistent with the Elmhurst methodology.
:param floor_area: area of the home's floor
:param floor_height: height of the home's floors
:return: Numerical estimate of the surface area of the top of the pitched roof
"""
# We estimate the length of the wall by just modelling the house as a square
wall_width = np.sqrt(floor_area)
# We're modelling the roof as two triangles where we know two of the three sides.
# The floor height makes up one side and half of the wall width makes up the other side
slope = np.sqrt(np.square(wall_width / 2) + np.square(floor_height))
area = 2 * (slope * wall_width)
return area
scalar = 1.0571283428862048
return scalar * (floor_area / np.cos(np.radians(30)))
def estimate_windows(

View file

@ -41,12 +41,18 @@ class TestLightingRecommendations:
assert len(lr.recommendation) == 1
assert lr.recommendation == [
{'phase': 0, 'parts': [], 'type': 'low_energy_lighting',
'description': 'Install low energy lighting in 4 outlets', 'starting_u_value': None, 'new_u_value': None,
'already_installed': False, 'sap_points': 0.4, 'kwh_savings': 219.0, 'co2_equivalent_savings': 0.035478,
'description_simulation': {'lighting-energy-eff': 'Very Good',
'lighting-description': 'Low energy lighting in all fixed outlets',
'low-energy-lighting': 100}, 'total': 240.24, 'subtotal': 200.20000000000002,
'vat': 40.040000000000006, 'contingency': 14.3, 'preliminaries': 14.3, 'material': 80.0, 'profit': 28.6,
'labour_hours': 3.2, 'labour_days': 0.4, 'labour_cost': 63.0, 'survey': False}
{
'phase': 0, 'parts': [], 'type': 'low_energy_lighting', 'measure_type': 'low_energy_lighting',
'description': 'Install low energy lighting in 4 outlets', 'starting_u_value': None,
'new_u_value': None,
'already_installed': False, 'sap_points': 0.4, 'kwh_savings': 219.0, 'co2_equivalent_savings': 0.035478,
'description_simulation': {
'lighting-energy-eff': 'Very Good',
'lighting-description': 'Low energy lighting in all fixed outlets',
'low-energy-lighting': 100
},
'total': 240.24, 'subtotal': 200.20000000000002,
'vat': 40.040000000000006, 'contingency': 14.3, 'preliminaries': 14.3, 'material': 80.0, 'profit': 28.6,
'labour_hours': 3.2, 'labour_days': 0.4, 'labour_cost': 63.0, 'survey': False
}
]

View file

@ -359,60 +359,36 @@ def test_park_home():
) == 0
def test_esimtate_pitched_roof_area():
roof_area1 = recommendation_utils.esimtate_pitched_roof_area(
floor_area=100, floor_height=2
def test_estimate_pitched_roof_area():
roof_area0 = recommendation_utils.estimate_pitched_roof_area(
floor_area=80,
)
assert np.isclose(roof_area0, 97.65333333333334)
roof_area1 = recommendation_utils.estimate_pitched_roof_area(
floor_area=100,
)
assert np.isclose(roof_area1, 107.70329614269008)
assert np.isclose(roof_area1, 122.06666666666666)
# As the floor height gets bigger, the area should get bigger
roof_area2 = recommendation_utils.esimtate_pitched_roof_area(
floor_area=100, floor_height=3
roof_area2 = recommendation_utils.estimate_pitched_roof_area(
floor_area=45,
)
assert np.isclose(roof_area2, 116.61903789690601)
assert np.isclose(roof_area2, 54.93)
# As the floor area gets smaller, the area should get smaller
roof_area3 = recommendation_utils.esimtate_pitched_roof_area(
floor_area=100, floor_height=1
roof_area3 = recommendation_utils.estimate_pitched_roof_area(
floor_area=60,
)
assert np.isclose(roof_area3, 101.9803902718557)
assert np.isclose(roof_area3, 73.24)
# As the floor area decreases, area should decrease
roof_area4 = recommendation_utils.esimtate_pitched_roof_area(
floor_area=50, floor_height=2
)
assert np.isclose(roof_area4, 57.44562646538029)
# As the floor area increases, area should increase
roof_area5 = recommendation_utils.esimtate_pitched_roof_area(
floor_area=150, floor_height=2
)
assert np.isclose(roof_area5, 157.797338380595)
zero_roof_area = recommendation_utils.esimtate_pitched_roof_area(
floor_area=0, floor_height=1000
zero_roof_area = recommendation_utils.estimate_pitched_roof_area(
floor_area=0,
)
assert zero_roof_area == 0
# If the floor height zero, we don't have a traingle, it's a flat roof
flat_roof_area = recommendation_utils.esimtate_pitched_roof_area(
floor_area=1000, floor_height=0
)
assert flat_roof_area == 1000
zero_roof_area2 = recommendation_utils.esimtate_pitched_roof_area(
floor_area=0, floor_height=0
)
assert zero_roof_area2 == 0
def test_external_wall_area():
# Arrange: Define the test cases

View file

@ -3,12 +3,6 @@ from recommendations.SolarPvRecommendations import SolarPvRecommendations
from backend.Property import Property
from etl.epc.Record import EPCRecord
import pandas as pd
from datetime import datetime
from utils.s3 import read_dataframe_from_s3_parquet, read_from_s3
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
from recommendations.Recommendations import Recommendations
from backend.ml_models.api import ModelApi
import msgpack
class TestSolarPvRecommendations:
@ -86,9 +80,10 @@ class TestSolarPvRecommendations:
def test_valid_all_conditions(self, property_instance_valid_all):
solar_pv = SolarPvRecommendations(property_instance_valid_all)
solar_pv.recommend(phase=0)
assert len(solar_pv.recommendation) == 2
assert solar_pv.recommendation == [
{
'phase': 0, 'parts': [], 'type': 'solar_pv',
'phase': 0, 'parts': [], 'type': 'solar_pv', 'measure_type': 'solar_pv',
'description': 'Install a 4.0 kilowatt-peak (kWp) solar photovoltaic (PV) panel system on 50% the '
'roof.',
'starting_u_value': None, 'new_u_value': None, 'sap_points': None, 'already_installed': False,
@ -97,17 +92,13 @@ class TestSolarPvRecommendations:
'description_simulation': {'photo-supply': 50.0}
},
{
'phase': 0, 'parts': [], 'type': 'solar_pv',
'description': 'Install a 4.0 kilowatt-peak (kWp) '
'solar photovoltaic (PV) panel system '
'on 50% the roof, with a battery '
'storage system.',
'starting_u_value': None, 'new_u_value': None,
'sap_points': None, 'already_installed': False,
'total': 7550.0, 'subtotal': 6291.666666666667,
'vat': 1258.333333333333, 'labour_hours': 48,
'labour_days': 2, 'photo_supply': 50.0,
'has_battery': True, 'initial_ac_kwh_per_year': 3800,
'phase': 0, 'parts': [], 'type': 'solar_pv', 'measure_type': 'solar_pv',
'description': 'Install a 4.0 kilowatt-peak (kWp) solar photovoltaic (PV) panel system on 50% the '
'roof, '
'with a battery storage system.',
'starting_u_value': None, 'new_u_value': None, 'sap_points': None, 'already_installed': False,
'total': 7550.0, 'subtotal': 6291.666666666667, 'vat': 1258.333333333333, 'labour_hours': 48,
'labour_days': 2, 'photo_supply': 50.0, 'has_battery': True, 'initial_ac_kwh_per_year': 3800,
'description_simulation': {'photo-supply': 50.0}
}
]

View file

@ -43,7 +43,7 @@ class TestWindowRecommendations:
assert recommender.recommendation == [
{
'phase': 0, 'parts': [], 'type': 'windows_glazing',
'phase': 0, 'parts': [], 'type': 'windows_glazing', "measure_type": "double_glazing",
'description': 'Install double glazing to all windows',
'starting_u_value': None, 'new_u_value': None, 'sap_points': None, 'already_installed': False,
'total': 7980.0, 'labour_hours': 0.0, 'labour_days': 0.0, 'is_secondary_glazing': False,
@ -92,7 +92,7 @@ class TestWindowRecommendations:
assert recommender2.recommendation == [
{
'phase': 0, 'parts': [], 'type': 'windows_glazing',
'phase': 0, 'parts': [], 'type': 'windows_glazing', "measure_type": "double_glazing",
'description': 'Install double glazing to the remaining windows', 'starting_u_value': None,
'new_u_value': None, 'sap_points': None, 'already_installed': False, 'total': 5700.0,
'labour_hours': 0.0,
@ -193,7 +193,7 @@ class TestWindowRecommendations:
assert recommender5.recommendation == [
{
'phase': 0, 'parts': [], 'type': 'windows_glazing',
'phase': 0, 'parts': [], 'type': 'windows_glazing', 'measure_type': 'secondary_glazing',
'description': 'Install secondary glazing to the remaining windows', 'starting_u_value': None,
'new_u_value': None, 'sap_points': None, 'already_installed': False, 'total': 4560.0,
'labour_hours': 0.0, 'labour_days': 0.0, 'is_secondary_glazing': True,
@ -240,7 +240,7 @@ class TestWindowRecommendations:
assert recommender6.recommendation == [
{
'phase': 0, 'parts': [], 'type': 'windows_glazing',
'phase': 0, 'parts': [], 'type': 'windows_glazing', 'measure_type': 'secondary_glazing',
'description': 'Install secondary glazing to all windows. Secondary glazing recommended due to '
'herigate building status',
'starting_u_value': None, 'new_u_value': None, 'sap_points': None, 'already_installed': False,
@ -396,7 +396,7 @@ class TestWindowRecommendations:
assert recommender9.recommendation == [
{
'phase': 0, 'parts': [], 'type': 'windows_glazing',
'phase': 0, 'parts': [], 'type': 'windows_glazing', 'measure_type': 'double_glazing',
'description': 'Install double glazing to all windows', 'starting_u_value': None, 'new_u_value': None,
'sap_points': None, 'already_installed': False, 'total': 7980.0, 'labour_hours': 0.0,
'labour_days': 0.0, 'is_secondary_glazing': False,
@ -638,7 +638,8 @@ class TestWindowRecommendations:
'glazing_coverage_ending': 'full', 'id': '1+1'
}
assert simulated_data[0] == expected_simulated_outcome
# Make sure all keys are the same, apart from days_to_ending
assert all([v == expected_simulated_outcome[k] for k, v in simulated_data[0].items() if k != "days_to_ending"])
# has_glazing_ending and glazing_coverage_ending are not in the starting record - test for this in case it
# changes
@ -648,7 +649,7 @@ class TestWindowRecommendations:
# Check which keys are different
different = []
for k in simulated_data[0].keys():
if k in ["id", 'has_glazing_ending', 'glazing_coverage_ending']:
if k in ["id", 'has_glazing_ending', 'glazing_coverage_ending', 'days_to_ending']:
continue
if simulated_data[0][k] != starting_record[k]:
different.append(
@ -666,7 +667,6 @@ class TestWindowRecommendations:
'simulated': 'double glazing installed during or after 2002'},
{'variable': 'multi_glaze_proportion_ending', 'starting': 0.0, 'simulated': 100},
{'variable': 'windows_energy_eff_ending', 'starting': 'Very Poor', 'simulated': 'Average'},
{'variable': 'days_to_ending', 'starting': 3642, 'simulated': 3713}
]
assert different == expected_different