Model/backend/Property.py
2025-12-05 09:40:24 +00:00

1443 lines
61 KiB
Python

import os
import ast
from itertools import groupby
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from etl.epc.Dataset import TrainingDataset
from etl.epc.Record import EPCRecord
from etl.epc.settings import LATEST_FIELD, MANDATORY_FIXED_FEATURES
from etl.epc_clean.epc_attributes.all_cleaners import all_cleaner_map
from utils.logger import setup_logger
from utils.s3 import read_dataframe_from_s3_parquet
from etl.epc.settings import DATA_ANOMALY_MATCHES
from recommendations.rdsap_tables import FLOOR_LEVEL_MAP
from recommendations.recommendation_utils import (
estimate_perimeter,
get_wall_type,
estimate_external_wall_area,
estimate_windows,
estimate_pitched_roof_area
)
from backend.ml_models.AnnualBillSavings import AnnualBillSavings
from backend.app.utils import sap_to_epc
from backend.Funding import Funding
import backend.app.assumptions as assumptions
ENVIRONMENT = os.environ.get("ENVIRONMENT", "dev")
DATA_BUCKET = os.environ.get(
"DATA_BUCKET", "retrofit-data-dev" if ENVIRONMENT == "dev" else None
)
logger = setup_logger()
class Property:
ATTRIBUTE_MAP = {
"floor-description": "floor",
"hotwater-description": "hotwater",
"main-fuel": "main_fuel",
"mainheat-description": "main_heating",
"mainheatcont-description": "main_heating_controls",
"roof-description": "roof",
"walls-description": "walls",
"windows-description": "windows",
"lighting-description": "lighting",
}
floor = None
hotwater = None
main_fuel = None
main_heating = None
main_heating_controls = None
roof = None
walls = None
windows = None
lighting = None
energy_source = None
spatial = None
base_difference_record = None
DATA_ANOMALY_MATCHES = DATA_ANOMALY_MATCHES
# Surplus information, that can be provided as optional inputs, by a customer
n_bathrooms = None
n_bedrooms = None
landlord_property_id = None # unique reference for the property as recognised by the landlord
building_id = None # Used to group properties together into a single building
# Contains the solar panel optimisation results from the Google Solar API
solar_panel_configuration = None
# If true, indicates the floor area has actually been given to us by the owner, and we should use this figure
# instead of the one in the EPC, when we simulate
owner_floor_area = False
def __init__(
self,
id,
postcode,
address,
epc_record,
property_valuation=None,
already_installed=None,
non_invasive_recommendations=None,
measures=None,
energy_assessment=None,
is_new=True,
inspections=None,
**kwargs
):
self.epc_record = epc_record
self.id = id
self.is_new = is_new
self.address = address
self.postcode = postcode
self.data = {
k.replace("_", "-"): v for k, v in epc_record.get("prepared_epc").items()
}
self.old_data = epc_record.get("old_data")
self.property_dimensions = None
# This is a list of measures that have already been installed in the property, typically found as a result
# of the non-invasive surveys. We reflect that this has been installed in the recommendations, but remove the
# cost and instead, provide a message that the measure has already been installed
self.already_installed = already_installed
self.non_invasive_recommendations = (
non_invasive_recommendations['recommendations'] if
non_invasive_recommendations else []
)
# This is a list of measures that have been recommended for the property
if isinstance(measures, list):
self.measures = measures
else:
self.measures = ast.literal_eval(measures) if measures else None
self.valuation = property_valuation
self.uprn = epc_record.get("uprn")
self.uprn_source = self.data.get("uprn-source")
self.full_sap_epc = epc_record.get("full_sap_epc")
self.in_conservation_area, self.is_listed, self.is_heritage = None, None, None
self.restricted_measures = False
self.year_built = epc_record.get("year_built")
self.number_of_rooms = epc_record.prepared_epc.get("number_habitable_rooms")
self.age_band = epc_record.get("age_band")
self.construction_age_band = epc_record.get("construction_age_band")
self.number_of_floors = epc_record.get("number_of_floors")
self.perimeter = None
self.wall_type = None
self.floor_type = None
self.energy_cost_estimates = {}
self.energy_consumption_estimates = {}
# when storing the energy, we'll also
self.energy = {
"primary_energy_consumption": epc_record.get("energy_consumption_current"),
"epc_co2_emissions": epc_record.get("co2_emissions_current"),
# These will be added in once we estimate the amount of emissions from appliances - using the carbon
# intensity of electricity
"appliances_co2_emissions": None,
"co2_emissions": None
}
self.ventilation = {
"ventilation": epc_record.get("mechanical_ventilation"),
}
self.solar_pv = {
"solar_pv": epc_record.get("photo_supply"),
}
self.solar_hot_water = {
"solar_hot_water": epc_record.get("solar_water_heating_flag"),
"solar_hot_water_boolean": epc_record.get("solar_water_heating_flag_bool"),
}
self.wind_turbine = {
"wind_turbine": epc_record.prepared_epc.get("wind_turbine_count"),
}
self.number_of_open_fireplaces = {
"number_of_open_fireplaces": epc_record.prepared_epc.get(
"number_open_fireplaces"
),
}
self.number_of_extensions = {
"number_of_extensions": epc_record.prepared_epc.get("extension_count"),
}
self.number_of_storeys = {
"number_of_storeys": epc_record.prepared_epc.get("flat_storey_count"),
}
self.heat_loss_corridor = {
"heat_loss_corridor": epc_record.prepared_epc.get("heat_loss_corridor"),
"length": epc_record.prepared_epc.get("unheated_corridor_length"),
"heat_loss_corridor_boolean": epc_record.get("heat_loss_corridor_bool"),
}
self.mains_gas = epc_record.prepared_epc.get("mains_gas_flag")
self.floor_height = epc_record.prepared_epc.get("floor_height")
self.insulation_wall_area = None
self.floor_area = epc_record.prepared_epc.get("total_floor_area")
self.roof_area = None
self.insulation_floor_area = None
self.number_lighting_outlets = epc_record.prepared_epc.get(
"fixed_lighting_outlets_count"
)
self.floor_level = None
self.number_of_windows = None
self.windows_area = None
self.solar_pv_percentage = None
self.current_energy_consumption = None
self.current_energy_consumption_heating_hotwater = None
self.current_energy_bill = None
self.expected_energy_bill = None
self.heating_energy_source = None
self.hot_water_energy_source = None
self.recommendations_scoring_data = []
self.simulation_epcs = {}
self.updated_simulation_epcs = []
# This additional condition data should change how we pass kwargs to this. We should no longer need to pass
# kwargs to this class, but instead, we should pass the energy assessment condition data
energy_assessment = (
{"condition": {}, "energy_assessment_is_newer": False} if energy_assessment is None else energy_assessment
)
self.energy_assessment_condition_data = energy_assessment["condition"]
self.energy_assessment_is_newer = energy_assessment["energy_assessment_is_newer"]
# Store inspections
self.inspections = inspections
# TODO: We keep this but only temporarily until we add bathrooms, bedrooms, building id to the condition data
self.parse_kwargs(kwargs)
# Funding
# self.gbis_eligibiltiy = None
# self.eco4_eligibility = None
# self.whlg_eligibility = None
self.scheme = None
self.funded_measures = None
self.project_funding = None
self.total_uplift = None
self.full_project_score = None
self.partial_project_score = None
self.uplift_project_score = None
# Ventilation
self.has_ventilation = self.identify_ventilation()
@classmethod
def extract_kwargs(cls, kwargs):
"""
This method is to be used in the router, to extract the kwargs from the request and prevent any errors such as
non-integer values, or inputs that clash with the __init__ method of this class
:param kwargs:
:return:
"""
# Note - none of this data is contained in an energy asssessment, but we should consider how this is done
# as we collect more data from the energy assessment
n_bathrooms = kwargs.get("n_bathrooms", None)
# We add on a small value to ensure that the number of bathrooms is rounded up, in case the value is 0.5
n_bathrooms = int(round(float(n_bathrooms) + 1e-5)) if n_bathrooms not in [None, ""] else None
n_bedrooms = kwargs.get("n_bedrooms", None)
n_bedrooms = int(round(float(n_bedrooms) + 1e-5)) if n_bedrooms not in [None, ""] else None
number_of_floors = kwargs.get("number_of_floors", None)
number_of_floors = int(round(float(number_of_floors) + 1e-5)) if number_of_floors not in [None, ""] else None
insulation_floor_area = kwargs.get("insulation_floor_area", None)
insulation_floor_area = float(insulation_floor_area) if insulation_floor_area not in [None, ""] else None
insulation_wall_area = kwargs.get("insulation_wall_area", None)
insulation_wall_area = float(insulation_wall_area) if insulation_wall_area not in [None, ""] else None
# We allow for the asset owner to provide us with total floor area, in the event of it being incorrect
floor_area = kwargs.get("floor_area", None)
floor_area = float(floor_area) if floor_area not in [None, ""] else None
return {
"n_bathrooms": n_bathrooms,
"n_bedrooms": n_bedrooms,
"number_of_floors": number_of_floors,
"insulation_floor_area": insulation_floor_area,
"insulation_wall_area": insulation_wall_area,
"building_id": kwargs.get("building_id", kwargs.get("landlord_block_reference", None)),
"floor_area": floor_area,
"landlord_property_id": kwargs.get("landlord_property_id"),
}
def parse_kwargs(self, kwargs):
# We extract the elements from kwargs that we recognise. Anything additional is ignored
for arg, val in kwargs.items():
if val is not None:
if arg == "floor_area":
self.owner_floor_area = True
setattr(self, arg, val)
def create_base_difference_epc_record(self, cleaned_lookup: dict):
"""
Creates a EPCDifferenceRecord object, which is used to store the difference between the current and
expected EPC
It will be the same starting and ending EPC, as we don't have the expected EPC yet
"""
fixed_data_col_names = MANDATORY_FIXED_FEATURES + LATEST_FIELD
fixed_data_col_names = [
x.lower().replace("_", "-") for x in fixed_data_col_names
]
fixed_data = {
k.replace("-", "_"): v
for k, v in self.data.items()
if k in fixed_data_col_names
}
difference_record = self.epc_record.create_EPCDifferenceRecord(
self.epc_record, fixed_data
)
self.base_difference_record = TrainingDataset(datasets=[difference_record], cleaned_lookup=cleaned_lookup)
# If we have variables that have been given to us by the landlord that we know are correct, whereas the EPC
# may not be, we use them
if self.owner_floor_area is not None:
self.base_difference_record.df["total_floor_area_ending"] = self.floor_area
self.base_difference_record.df["estimated_perimeter_ending"] = self.perimeter
def simulate_all_representative_recommendations(
self, property_representative_recommendations,
):
"""
This method was put together to simulate the impact of the representative recommendations on the property
all at once, for usage within the mds report
:return:
"""
recommendation_record = self.base_difference_record.df.to_dict("records")[
0
].copy()
scoring_dict = self.create_recommendation_scoring_data(
property_id=self.id,
recommendation_record=recommendation_record,
recommendations=property_representative_recommendations,
primary_recommendation_id=self.id,
non_invasive_recommendations=self.non_invasive_recommendations,
)
return scoring_dict
def adjust_difference_record_with_recommendations(
self, property_recommendations, property_representative_recommendations
):
"""
This method will adjust the difference record, based on the recommendations made for the property
In order to score the measures, we need to consider the phase of the retrofit.
:param property_recommendations: dictionary of recommendations for the property
:param property_representative_recommendations: dictionary of representative recommendations for the property
"""
self.recommendations_scoring_data = []
self.simulation_epcs = {}
phases = sorted(
[
r[0]["phase"]
for r in property_recommendations
if r[0]["phase"] is not None
]
)
simulation_lodgment_date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
for phase in phases:
property_recommendations_by_phase = [
r for r in property_recommendations if r[0]["phase"] == phase
][0]
previous_phases = [p for p in phases if p < phase]
previous_phase_representatives = [
r
for r in property_representative_recommendations
if r["phase"] in previous_phases
]
# For solid wall insulation, we will actually have 2 representative recommendations, since we consider
# both internal and external wall insulation as possible measures. We will use the representative that
# has the lowest efficiency.
# Take the representative with the lowest efficiency, by phase
# To be safe, we sort by phase
previous_phase_representatives = sorted(
previous_phase_representatives, key=lambda x: x["phase"]
)
previous_phase_representatives = [
min(group, key=lambda x: x["efficiency"])
for _, group in groupby(
previous_phase_representatives, key=lambda x: x["phase"]
)
]
recommendation_record = self.base_difference_record.df.to_dict("records")[
0
].copy()
recommendation_record["days_to_ending"] = EPCRecord._calculate_days_to(
lodgement_date=simulation_lodgment_date,
)
for rec in property_recommendations_by_phase:
# We simulate the impact of the recommendation at this current phase, and all of the prior phases
if rec["type"] in ["trickle_vents", "draught_proofing"]:
continue
scoring_dict = self.create_recommendation_scoring_data(
property_id=self.id,
recommendation_record=recommendation_record,
recommendations=previous_phase_representatives + [rec],
primary_recommendation_id=rec["recommendation_id"],
)
self.recommendations_scoring_data.append(scoring_dict)
simulation_epc = self.epc_record.prepared_epc.copy()
# Insert static values
simulation_epc["lodgement_date"] = simulation_lodgment_date
simulation_epc = {k.replace("_", "-"): v for k, v in simulation_epc.items()}
types = [x["type"] for x in previous_phase_representatives]
if "external_wall_insulation" in types and "internal_wall_insulation" in types:
raise Exception("We shouldn't have this in the representative recommendations")
# We include previous phases + the recommendation itself in the EPC transformations
epc_transformations = [
x["description_simulation"] for x in previous_phase_representatives + [rec]
]
# It is possible that we could have two simulations applied to the same descriptions
# We extract these out
phase_epc_transformation = {}
for config in epc_transformations:
for k, v in config.items():
if k in phase_epc_transformation:
if "-energy-eff" in k:
# We take the highest value
if phase_epc_transformation[k] == "Very Good":
continue
elif phase_epc_transformation[k] == "Good":
if v == "Very Good":
phase_epc_transformation[k] = v
elif phase_epc_transformation[k] == "Average":
if v in ["Good", "Very Good"]:
phase_epc_transformation[k] = v
elif phase_epc_transformation[k] == "Poor":
if v in ["Average", "Good", "Very Good"]:
phase_epc_transformation[k] = v
else:
phase_epc_transformation[k] = v
continue
if phase_epc_transformation[k] == v:
continue
if k == "hotwater-description":
if (
v == "From main system"
) and (
phase_epc_transformation["mainheat-description"] == "Electric storage heaters"
) and (
"Electric immersion" in phase_epc_transformation["hotwater-description"]
):
# It means we've recommended HHR with electric immersion, and shouldn't overwrite
# the hot water description
continue
raise NotImplementedError(
"Already have this key in the phase_epc_transformation - implement me"
)
phase_epc_transformation[k] = v
simulation_epc.update(phase_epc_transformation)
self.simulation_epcs[rec["recommendation_id"]] = simulation_epc
def update_simulation_epcs(self, impact_summary):
"""
This method will insert the high level measures, such as SAP, heat demand, carbon, etc
:return:
"""
if self.simulation_epcs is None:
raise ValueError("Simulation EPCs have not been created")
rec_ids = list(self.simulation_epcs.keys())
updated_simulation_epcs = []
for rec_id in rec_ids:
sim_epc = self.simulation_epcs[rec_id].copy()
rec_impact = [x for x in impact_summary if x["recommendation_id"] == rec_id][0]
# We update all of the features that should have an impact on the kwh model
sim_epc.update(
{
# CO₂ emissions per square metre floor area per year in kg/m². Since CO₂ emissions are in tonnes
# per year, we multiply by 1000 to get kg/m²
"co2-emiss-curr-per-floor-area": round(
1000 * (rec_impact["carbon"] / self.data["total-floor-area"])
),
"co2-emissions-current": rec_impact["carbon"],
"current-energy-rating": sap_to_epc(rec_impact["sap"]),
"current-energy-efficiency": int(np.floor(rec_impact["sap"])),
"energy-consumption-current": rec_impact["heat_demand"],
"id": "+".join([str(self.id), rec_id])
}
)
updated_simulation_epcs.append(sim_epc)
# Now we havet this data inthe
self.updated_simulation_epcs = updated_simulation_epcs
@staticmethod
def create_recommendation_scoring_data(
property_id,
recommendation_record,
recommendations: list,
primary_recommendation_id: int,
):
"""
This function will iterate through a list of recommendations and apply a simulation for each recommendation
This allows us to later multiple measures and see the impact of the measures on the property
:param property_id: The id of the property
:param recommendation_record: The record of the property, which will be updated
:param recommendations: The list of recommendations to apply
:param primary_recommendation_id: The id of the primary recommendation, which is used to identify the record
:return: The updated recommendation record
"""
output = recommendation_record.copy()
for col in [
"walls_insulation_thickness",
"floor_insulation_thickness",
"roof_insulation_thickness",
]:
if output[col] is None:
output[col] = "none"
for recommendation in recommendations:
# For the list of recommendations we have, we iteratively update the output
if recommendation["type"] == "sealing_open_fireplace":
output["number_open_fireplaces_ending"] = 0
if recommendation["type"] == "low_energy_lighting":
output["low_energy_lighting_ending"] = 100
output["lighting_energy_eff_ending"] = "Very Good"
if recommendation["type"] in [
"heating", "hot_water_tank_insulation", "heating_control", "secondary_heating",
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation",
"cylinder_thermostat", "loft_insulation", "room_roof_insulation", "flat_roof_insulation",
"solid_floor_insulation", "suspended_floor_insulation", "mixed_glazing",
"windows_glazing", "mechanical_ventilation", "solar_pv"
]:
# We update the data, as defined in the recommendaton
for prefix in ["walls", "roof", "floor"]:
if output[f"{prefix}_insulation_thickness_ending"] is None:
output[f"{prefix}_insulation_thickness_ending"] = "none"
simulation_config = recommendation["simulation_config"].copy()
# If any entries in simulation_config are None, we will set them to "Unknown" which is the cleaning
# value
for key, value in simulation_config.items():
if value is None:
simulation_config[key] = "Unknown"
output.update(simulation_config)
if recommendation["type"] not in [
"sealing_open_fireplace", "low_energy_lighting",
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation",
"loft_insulation", "room_roof_insulation", "flat_roof_insulation",
"solid_floor_insulation", "suspended_floor_insulation",
"windows_glazing", "solar_pv", "heating", "hot_water_tank_insulation",
"heating_control", "secondary_heating", "cylinder_thermostat", "mixed_glazing",
"extension_cavity_wall_insulation", "mechanical_ventilation",
]:
raise NotImplementedError(
"Implement me, given type %s" % recommendation["type"]
)
output["id"] = "+".join([str(property_id), str(primary_recommendation_id)])
return output
def set_features(
self,
cleaned,
kwh_client,
kwh_predictions
):
"""
Given the cleaning that has been performed, we'll use this to identify the property
components, from roof to walls to windows, heating and hot water
:param cleaned: This is the dictionary of components found in cleaner.cleaned
:param kwh_client: The client that will be used to convert the energy costs to today's costs
:param kwh_predictions: Contains the kwh predictions for heating and hot water
:return:
"""
if not cleaned:
raise ValueError("Cleaner does not contain cleaned data")
if not self.data:
raise ValueError("Property does not contain data")
for description, attribute in cleaned.items():
if self.data[description] in self.DATA_ANOMALY_MATCHES:
template = cleaned[description][0]
fill_dict = dict(zip(template.keys(), [None] * len(template)))
fill_dict.update(
{
"original_description": self.data[description],
"clean_description": self.data[description],
}
)
setattr(
self,
self.ATTRIBUTE_MAP[description],
fill_dict,
)
continue
attributes = [
x
for x in cleaned[description]
if x["original_description"] == self.data[description]
]
if len(attributes) > 1:
raise ValueError(
"Either No attributes or multiple found for %s" % description
)
if len(attributes) == 0:
# We attempt to perform the clean on the fly
cleaner_cls = all_cleaner_map[description]
if description == "lighting-description":
cleaner_cls = cleaner_cls(self.data[description], averages=None)
else:
cleaner_cls = cleaner_cls(self.data[description])
processed = {
"original_description": self.data[description],
"clean_description": cleaner_cls.description.replace(
"(assumed)", ""
)
.rstrip()
.capitalize(),
**cleaner_cls.process(),
}
attributes = [processed]
setattr(self, self.ATTRIBUTE_MAP[description], attributes[0])
self.set_basic_property_dimensions()
self.set_wall_type()
self.set_floor_type()
self.set_floor_level()
self.set_windows_count()
self.set_energy_source()
self.find_energy_sources()
self.set_current_energy(kwh_client, kwh_predictions)
def set_solar_panel_configuration(self, solar_panel_configuration):
"""
This funtion inserts the solar panel configuration into the property object
"""
self.solar_panel_configuration = solar_panel_configuration
def set_current_energy(self, kwh_client, kwh_predictions):
"""
Given what we know about the property now, estimates the current energy consumption using the UCL paper
https://www.sciencedirect.com/science/article/pii/S0378778823002542
:return:
"""
# We get the following things:
# 1) Today's cost. This give us a basline figure for what the cost is today
# 2) Predicted KwH
# Today's costs
todays_lighting_cost = kwh_client.convert_cost_to_today(
original_cost=float(self.data["lighting-cost-current"]),
lodgement_date=pd.Timestamp(self.epc_record.prepared_epc["lodgement_date"]).tz_localize(None)
)
# If we have the kwh figures, we don't need to predict them
condition_data = self.energy_assessment_condition_data.copy()
heating_kwh_predictions = kwh_predictions["heating_kwh_predictions"]
hotwater_kwh_predictions = kwh_predictions["hotwater_kwh_predictions"]
heating_prediction = (
condition_data.get("space_heating_kwh") if condition_data.get("space_heating_kwh") is not None else
heating_kwh_predictions[
heating_kwh_predictions["id"].astype(int) == self.uprn
]["predictions"].values[0]
)
hot_water_prediction = (
condition_data.get("water_heating_kwh") if condition_data.get("water_heating_kwh") is not None else
hotwater_kwh_predictions[
hotwater_kwh_predictions["id"].astype(int) == self.uprn
]["predictions"].values[0]
)
# We convert the lighting cost into kwh, just using the price cap
lighting_kwh = todays_lighting_cost / AnnualBillSavings.ELECTRICITY_PRICE_CAP
appliances_kwh = AnnualBillSavings.estimate_appliances_energy_use(total_floor_area=self.floor_area)
unadjusted_kwh_estimates = {
"heating": float(heating_prediction),
"hot_water": float(hot_water_prediction),
"lighting": float(lighting_kwh),
"appliances": float(appliances_kwh)
}
unadjusted_heating_costs = {
"heating": None,
"hot_water": None,
"lighting": float(todays_lighting_cost),
"appliances": float(appliances_kwh) * AnnualBillSavings.ELECTRICITY_PRICE_CAP
}
# Sum up the adjusted kwh figures
self.current_energy_consumption = sum(list(unadjusted_kwh_estimates.values()))
self.current_energy_consumption_heating_hotwater = (
unadjusted_kwh_estimates["heating"] + unadjusted_kwh_estimates["hot_water"]
)
self.energy_cost_estimates = {
"unadjusted": unadjusted_heating_costs,
# Don't think we need the EPC
# "epc": {
# "heating": float(self.data["heating-cost-current"]),
# "hot_water": float(self.data["hot-water-cost-current"]),
# "lighting": float(self.data["lighting-cost-current"]),
# }
}
self.energy_consumption_estimates = {
"unadjusted": unadjusted_kwh_estimates
}
# Update carbon with appliances
self.energy["appliances_co2_emissions"] = (
(unadjusted_kwh_estimates["appliances"] * assumptions.ELECTRICITY_CARBON_INTENSITY) / 1000
)
# Re-calculate total CO2 emissions
self.energy["co2_emissions"] = float(np.round(
self.energy["epc_co2_emissions"] + self.energy["appliances_co2_emissions"], 2
))
def set_spatial(self, spatial: pd.DataFrame):
"""
Sets whether the property is in a conservation area given the output of the ConservationAreaClient
Will store a dictionary, spatial, which is used to populate the property spatial table in the database
:param spatial: Dataframe, containing the spatial data for the property
"""
self.in_conservation_area = spatial["conservation_status"].values[0]
self.is_listed = spatial["is_listed_building"].values[0]
self.is_heritage = spatial["is_heritage_building"].values[0]
# We do an equals True, in the case of one of these variables being True
if (
(self.in_conservation_area == True)
| (self.is_listed == True)
| (self.is_heritage == True)
):
self.restricted_measures = True
spatial_dict = spatial.to_dict("records")[0]
self.spatial = {
"x_coordinate": spatial_dict["X_COORDINATE"],
"y_coordinate": spatial_dict["Y_COORDINATE"],
"latitude": spatial_dict["LATITUDE"],
"longitude": spatial_dict["LONGITUDE"],
"conservation_status": spatial_dict["conservation_status"],
"is_listed_building": spatial_dict["is_listed_building"],
"is_heritage_building": spatial_dict["is_heritage_building"],
}
def _clean_upload_data(self, to_update):
for k, v in to_update.items():
if v in self.DATA_ANOMALY_MATCHES:
to_update[k] = None
return to_update
def get_full_property_data(self, current_valuation=None):
"""
This method extracts the data which is pushed to the database, containing core information, from the EPC
about a property
:return:
"""
property_data = {
"creation_status": "READY",
"uprn": int(self.data["uprn"]),
"building_reference_number": (
int(self.data["building-reference-number"]) if
self.data["building-reference-number"] is not None else None
),
"has_pre_condition_report": True,
"has_recommendations": True,
"property_type": self.data["property-type"],
"built_form": self.data["built-form"],
"local_authority": self.data["local-authority-label"],
"constituency": self.data["constituency-label"],
"number_of_rooms": self.number_of_rooms,
"year_built": self.year_built,
"tenure": self.data["tenure"],
"current_epc_rating": self.data["current-energy-rating"],
"current_sap_points": self.data["current-energy-efficiency"],
"current_valuation": current_valuation,
}
property_data = self._clean_upload_data(property_data)
return property_data
@classmethod
def _prepare_rating_field(cls, field, rating_lookup):
"""
Utility function for usage in the lambda, for preparing the _rating fields
"""
return (
rating_lookup[field].value
if (field not in cls.DATA_ANOMALY_MATCHES) and (field is not None)
else None
)
def get_property_details_epc(self, portfolio_id: int, rating_lookup):
if self.current_energy_bill is None:
raise ValueError("Current energy bill has not been set")
property_details_epc = {
"property_id": self.id,
"portfolio_id": portfolio_id,
"full_address": self.data["address"],
"total_floor_area": float(self.data["total-floor-area"]),
"walls": self.walls["clean_description"],
"walls_rating": self._prepare_rating_field(
self.data["walls-energy-eff"], rating_lookup
),
"roof": self.roof["clean_description"],
"roof_rating": self._prepare_rating_field(
self.data["roof-energy-eff"], rating_lookup
),
"floor": self.floor["clean_description"],
"floor_rating": self._prepare_rating_field(
self.data["floor-energy-eff"], rating_lookup
),
"windows": self.windows["clean_description"],
"windows_rating": self._prepare_rating_field(
self.data["windows-energy-eff"], rating_lookup
),
"heating": self.main_heating["clean_description"],
"heating_rating": self._prepare_rating_field(
self.data["mainheat-energy-eff"], rating_lookup
),
"heating_controls": self.main_heating_controls["clean_description"],
"heating_controls_rating": self._prepare_rating_field(
self.data["mainheatc-energy-eff"], rating_lookup
),
"hot_water": self.hotwater["clean_description"],
"hot_water_rating": self._prepare_rating_field(
self.data["hot-water-energy-eff"], rating_lookup
),
"lighting": self.lighting["clean_description"],
"lighting_rating": self._prepare_rating_field(
self.data["lighting-energy-eff"], rating_lookup
),
"mainfuel": self.main_fuel["clean_description"],
"ventilation": self.ventilation["ventilation"],
"solar_pv": self.solar_pv["solar_pv"],
"solar_hot_water": self.solar_hot_water["solar_hot_water_boolean"],
"wind_turbine": self.wind_turbine["wind_turbine"],
"floor_height": self.floor_height,
"heat_loss_corridor": self.heat_loss_corridor["heat_loss_corridor_boolean"],
"unheated_corridor_length": self.heat_loss_corridor["length"],
"number_of_open_fireplaces": self.number_of_open_fireplaces[
"number_of_open_fireplaces"
],
"number_of_extensions": self.number_of_extensions["number_of_extensions"],
"number_of_storeys": self.number_of_storeys["number_of_storeys"],
"mains_gas": self.mains_gas,
"energy_tariff": self.data["energy-tariff"],
"primary_energy_consumption": self.energy["primary_energy_consumption"],
"co2_emissions": self.energy["co2_emissions"],
"current_energy_demand": self.current_energy_consumption,
"current_energy_demand_heating_hotwater": self.current_energy_consumption_heating_hotwater,
"estimated": self.data.get("estimated", False),
# We indicate if we've overwritten a SAP 05 EPC
"sap_05_overwritten": self.data.get("sap_05_overwritten", False),
**self.current_energy_bill
}
return property_details_epc
def get_spatial_data(self, uprn_filenames):
"""
Given a property's UPRN, this method will pull the associated spatial data from s3
:return:
"""
if self.uprn is None:
logger.warning(
"We do not have a UPRN for this property - this needs to be implemented"
)
self.in_conservation_area = False
self.is_listed = False
self.is_heritage = False
self.restricted_measures = True
return
# We get the file name for the uprn
filtered_df = uprn_filenames[
(uprn_filenames["lower"] <= self.uprn)
& (uprn_filenames["upper"] >= self.uprn)
]
if filtered_df.empty:
logger.warning("Could not find file containing UPRNS")
return None
filename = filtered_df.iloc[0]["filenames"]
spatial_data = read_dataframe_from_s3_parquet(
bucket_name=DATA_BUCKET, file_key=f"spatial/{filename}"
)
spatial = spatial_data[spatial_data["UPRN"] == self.uprn]
# Pull out spatial features
self.set_spatial(spatial)
def _filter_property_dimensions(self, property_dimensions):
"""
Will filter the property dimensions dataframe to only include the relevant rows for the property
:param property_dimensions:
:return: filtered property dimensions dataframe
"""
result = property_dimensions[
(property_dimensions["PROPERTY_TYPE"] == self.data["property-type"])
]
if (
self.construction_age_band is not None
and self.construction_age_band not in self.DATA_ANOMALY_MATCHES
):
result = result[
(result["CONSTRUCTION_AGE_BAND"] == self.construction_age_band)
]
if (
self.data["built-form"] not in self.DATA_ANOMALY_MATCHES
and self.data["built-form"] in result["BUILT_FORM"]
):
result = result[(result["BUILT_FORM"] == self.data["built-form"])]
return result[
["NUMBER_HABITABLE_ROOMS", "TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]
].mean()
def set_basic_property_dimensions(self):
"""
This method sets the number of floors of the property, using a simple approach based on an estimate for
average room size, number of rooms and total floor area
It sets the perimeter of the property, using a simple approach based on an estimate for average room size,
number of rooms and total floor area
Also sets floor area, number of rooms, using backup cleaned values if this data is not present, based on
medians across the EPC data
:return:
"""
# Many of these pieces of information are now contained in the condition data
condition_data = self.energy_assessment_condition_data.copy()
# We can update the number of floors if we have this information in the condition data
self.number_of_floors = int(self.energy_assessment_condition_data["number_of_floors"]) \
if (condition_data.get("number_of_floors") is not None) and (self.number_of_floors is not None) \
else self.number_of_floors
# If we already have this, we re-engineer the perimeter
if self.insulation_floor_area is not None:
self.perimeter = np.sqrt(self.insulation_floor_area) * 4
else:
self.perimeter = float(self.energy_assessment_condition_data["perimeter"]) \
if condition_data.get("perimeter") is not None \
else estimate_perimeter(
floor_area=self.floor_area / self.number_of_floors,
num_rooms=self.number_of_rooms / self.number_of_floors
)
self.insulation_wall_area = float(self.energy_assessment_condition_data["insulation_wall_area"]) \
if (condition_data.get("insulation_wall_area") is not None) and (self.insulation_wall_area is not None) \
else estimate_external_wall_area(
num_floors=self.number_of_floors,
floor_height=self.floor_height,
perimeter=self.perimeter,
built_form=self.data["built-form"],
)
if self.insulation_floor_area is None:
self.insulation_floor_area = float(
self.energy_assessment_condition_data["main_dwelling_ground_floor_area"]
) if (condition_data.get("main_dwelling_ground_floor_area") is not None) else (
self.floor_area / self.number_of_floors
)
if not self.roof["is_flat"]:
self.roof_area = estimate_pitched_roof_area(
floor_area=self.insulation_floor_area,
)
else:
self.roof_area = self.insulation_floor_area
def set_floor_level(self):
self.floor_level = (
FLOOR_LEVEL_MAP[self.data["floor-level"]]
if self.data["floor-level"] not in self.DATA_ANOMALY_MATCHES
and self.data["floor-level"] is not None
else None
)
if self.floor_level is None:
if self.data["property-type"] != "Flat":
return
if self.floor["another_property_below"]:
self.floor_level = 1
else:
self.floor_level = 0
return
# We perform some extra checks, if the property is not on the ground floor, as we have found cases
# where a property is marked as being on the first floor
if self.floor_level > 0:
# We check if there is another property below (for a non-sap assessment)
if not self.floor["another_property_below"] and self.floor["thermal_transmittance_unit"] is None:
self.floor_level = 0
return
if self.floor_level == 0:
# Check if another property below
if self.floor["another_property_below"]:
self.floor_level = 1
return
def set_wall_type(self):
"""
This method sets the wall type of the property, using a simple approach based on the wall description
:return:
"""
self.wall_type = get_wall_type(**self.walls)
def set_floor_type(self):
"""
This method sets the floor type of the property, which is used for calculating u-values
Section 5.6 of the BRE indicates that
"to simplify data collection no distinction is made in terms of U-value between an exposed floor (to
outside air below) and a semi-exposed floor (to an enclosed but unheated space below)
and the U-values in Table S12 are used.
Therefore, we treat the exposed floor and suspended floor as the same type of floor, which is used for
calculating u-values
"""
if self.floor["is_suspended"] | self.floor["another_property_below"]:
self.floor_type = "suspended"
elif self.floor["is_solid"]:
self.floor_type = "solid"
elif self.floor["is_to_unheated_space"] | self.floor["is_to_external_air"]:
self.floor_type = "exposed_floor"
elif self.floor["thermal_transmittance"] is not None:
self.floor_type = "solid"
else:
# in this case, it's not super clear what the floor type is, so we default - this is a temp
logger.warning(
f"Could not determine floor type, given: '{self.floor['original_description']}', defaulting to "
f"suspended for property {self.uprn}"
)
self.floor_type = "suspended"
@staticmethod
def _extract_component(
component_data, component_rename_cols, component_drop_cols, rename_prefix=None
):
for k in component_rename_cols:
component_data[f"{rename_prefix}_{k}"] = component_data.get(k)
component_data = {
k: v
for k, v in component_data.items()
if k not in component_drop_cols + component_rename_cols
}
return component_data
def set_windows_count(self):
"""
Using the estimate_windows function, this method will set the number of windows in the property
:return:
"""
condition_data = self.energy_assessment_condition_data.copy()
self.number_of_windows = int(condition_data["number_of_windows"]) \
if condition_data.get("number_of_windows") is not None \
else estimate_windows(
property_type=self.data["property-type"],
built_form=self.data["built-form"],
construction_age_band=self.construction_age_band,
floor_area=self.floor_area,
number_habitable_rooms=self.number_of_rooms,
)
self.windows_area = float(condition_data["windows_area"]) \
if condition_data.get("windows_area") is not None \
else None
def set_energy_source(self):
"""
This method sets the energy source of the property, based on the mains gas flag and energy tariff.
"""
# Default to "electricity_and_gas" to cover most scenarios including when mains_gas_flag is True
energy_source = "electricity_and_gas"
# If the tariff explicitly indicates electricity use without a dual indication and mains_gas_flag is not True
# We check for the common electricity tariffs
if not self.data["mains-gas-flag"] and self.data["energy-tariff"] in [
"Single",
"off-peak 7 hour",
"off-peak 10 hour",
"off-peak 18 hour",
"standard tariff",
"24 hour",
]:
energy_source = "electricity"
# Set the energy source based on the conditions above
self.energy_source = energy_source
def find_energy_sources(self):
# Based on the heating and the hot water
heating_fuel_mapping = {
'has_mains_gas': 'Natural Gas',
'has_electric': 'Electricity',
'has_oil': 'Oil',
'has_wood_logs': 'Wood Logs',
'has_coal': 'Coal',
'has_anthracite': 'Anthracite',
'has_smokeless_fuel': 'Smokeless Fuel',
'has_lpg': 'LPG',
'has_b30k': 'B30K Biofuel',
'has_air_source_heat_pump': 'Electricity',
'has_ground_source_heat_pump': 'Electricity',
'has_water_source_heat_pump': 'Electricity',
'has_electric_heat_pump': 'Electricity',
'has_solar_assisted_heat_pump': 'Electricity',
'has_exhaust_source_heat_pump': 'Electricity',
'has_community_heat_pump': 'Electricity',
'has_wood_pellets': 'Wood Pellets',
'has_community_scheme': 'Varied (Community Scheme)',
"has_dual_fuel_mineral_and_wood": 'Wood Logs',
"has_electricaire": 'Electricity',
"has_wood_chips": 'Wood Logs'
}
# Hot water
heater_type_to_fuel = {
'gas instantaneous': 'Natural Gas',
'electric heat pump': 'Electricity',
'electric immersion': 'Electricity',
'gas boiler': 'Natural Gas',
'oil boiler': 'Oil',
'electric instantaneous': 'Electricity',
'gas multipoint': 'Natural Gas',
'heat pump': 'Electricity',
'solid fuel boiler': 'Solid Fuel',
'solid fuel range cooker': 'Solid Fuel',
'room heaters': 'Varied', # Could be any fuel, further specifics needed based on context
"single-point gas": "Natural Gas"
}
# Define a mapping from system types to general categories or modifications of fuel types
system_type_modification = {
'from main system': 'Main System',
'from secondary system': 'Secondary System',
'from second main heating system': 'Secondary System',
'community scheme': 'Community Scheme'
}
hotwater_appliance_to_fuel = {
'gas range cooker': 'Natural Gas',
'oil range cooker': 'Oil'
}
fuel_map = {
None: "Natural Gas (Community Scheme)",
"mains gas": "Natural Gas (Community Scheme)",
"biomass": "Smokeless Fuel",
"electricity": "Electricity",
"biogas": "Smokeless Fuel",
}
self.heating_energy_source = list({
fuel for key, fuel in heating_fuel_mapping.items() if self.main_heating.get(key, False)
})
if set(self.heating_energy_source) == {'Electricity', 'Natural Gas'}:
# It means they have mixed heating so we take the primary one, based on main fuel
# This will probably happen in the case of an extension
if self.main_fuel["clean_description"] in ["Mains gas not community", "Mains gas community"]:
self.heating_energy_source = ['Natural Gas']
else:
self.heating_energy_source = ['Electricity']
if set(self.heating_energy_source) == {'Natural Gas', 'Wood Logs'}:
# It means they have mixed heating so we take the primary one, based on main fuel
# This will probably happen in the case of an extension
if self.main_fuel["clean_description"] in ["Mains gas not community", "Mains gas community"]:
self.heating_energy_source = ['Natural Gas']
else:
self.heating_energy_source = ['Wood Logs']
if len(self.heating_energy_source) > 1 and "Varied (Community Scheme)" not in self.heating_energy_source:
# We might have something like heating energy source equal to ['Natural Gas', 'Varied (Community Scheme)']
# so we treat this as community heating
raise Exception("Investigate me")
if len(self.heating_energy_source) == 0:
heating_flags = {
v for k, v in self.main_heating.items() if k not in ["original_description", "clean_description"]
}
hotwater_flags = {
v for k, v in self.hotwater.items() if k not in ["original_description", "clean_description"]
}
# If all flags are zero, we have a no data example
if (heating_flags == {False} or hotwater_flags == {None}) and (
hotwater_flags == {False} or hotwater_flags == {None}):
# We have nodata so we try and rely on main fuel
if self.main_fuel["fuel_type"] in fuel_map: # We assume when None as it's unknown
mapped_fuel = fuel_map[self.main_fuel["fuel_type"]]
self.heating_energy_source = mapped_fuel
self.hot_water_energy_source = mapped_fuel
return
else:
raise NotImplementedError(f"Unhandled fuel {self.main_fuel['fuel_type']}")
if len(self.heating_energy_source) > 1:
# We treat this as a community scheme
self.heating_energy_source = ["Varied (Community Scheme)"]
self.heating_energy_source = self.heating_energy_source[0]
if self.heating_energy_source == "Varied (Community Scheme)":
if self.main_fuel["fuel_type"] in fuel_map: # We assume when None as it's unknown
self.heating_energy_source = fuel_map[self.main_fuel["fuel_type"]]
else:
raise NotImplementedError(f"Unhandled fuel {self.main_fuel['fuel_type']}")
if self.hotwater["heater_type"] is not None:
self.hot_water_energy_source = heater_type_to_fuel[self.hotwater["heater_type"]]
if self.hotwater["extra_features"] == "plus solar":
self.hot_water_energy_source = self.heating_energy_source + " + Solar Thermal"
return
elif self.hotwater["system_type"] is not None:
fuel = system_type_modification[self.hotwater["system_type"]]
if self.hotwater["extra_features"] == "plus solar":
self.hot_water_energy_source = self.heating_energy_source + " + Solar Thermal"
return
if fuel in ['Main System', "Community Scheme"]:
self.hot_water_energy_source = self.heating_energy_source
elif fuel in ['Secondary System']:
# Check the secondary heating system
secondary_heating = self.data["secondheat-description"]
self.hot_water_energy_source = assumptions.DESCRIPTIONS_TO_FUEL_TYPES[secondary_heating]["fuel"]
else:
raise NotImplementedError(f"Investiage me - unhandled hot water fuel {fuel}")
else:
self.hot_water_energy_source = hotwater_appliance_to_fuel[self.hotwater["appliance"]]
def is_ashp_valid(self, measures):
if "air_source_heat_pump" in self.non_invasive_recommendations:
return True
if "air_source_heat_pump" not in measures:
return False
# If we have a house over a floor area threshold, we recommend an ASHP
if (
self.data["property-type"] in ["House", "Bungalow"] and
self.floor_area > assumptions.ASHP_FLOOR_AREA_THRESHOLD
):
return True
suitable_property_type = (
self.data["property-type"] in ["House", "Bungalow"] and
self.data["built-form"] not in ["Enclosed Mid-Terrace", "Enclosed End-Terrace"]
)
has_air_source_heat_pump = self.main_heating["has_air_source_heat_pump"]
return suitable_property_type and not has_air_source_heat_pump
def is_solar_pv_valid(self):
# If the property is a flat but we are looking at building solar potential, we can include this
if (self.building_id is not None) and (self.solar_panel_configuration is not None):
return True
# If the property is in a conservation area, is listed or is a heriage building, solar panels
# become a difficult measure to generally get through planning restrictions and so we do not recommend
# solar panels
if self.is_listed or self.is_heritage:
# If the property is in a conservation area, we can still recommend solar panels
# but they need to be done in a way that is sympathetic to the building. E.g. the panels
# may be installed such that they are not visible from the street
return False
if (self.data["property-type"] in ["House", "Bungalow"]) and (
not pd.isnull(self.roof["thermal_transmittance"])
):
return True
is_valid_property_type = self.data["property-type"] in ["House", "Bungalow", "Maisonette"]
is_valid_roof_type = (
self.roof["is_flat"] or self.roof["is_pitched"] or self.roof["is_roof_room"]
)
# If there is no existing solar PV, the photo-supply field will be None or a missing value
# We use inspections data to tell us this
if getattr(self.inspections, "roof_orientation", None):
has_no_existing_solar_pv = self.inspections.roof_orientation.value not in [
"already has solar pv", "roof too small", "no roof"
]
else:
has_no_existing_solar_pv = self.data["photo-supply"] in [
None, 0, self.DATA_ANOMALY_MATCHES
]
return is_valid_property_type and is_valid_roof_type and has_no_existing_solar_pv
def estimate_electrical_consumption(self, assumed_ashp_efficiency, exclusions):
"""
Given a property, this method estimates the electrical consumption of the property, based on the energy
consumption, the assumed efficiency of an ASHP and the exclusions.
What we're trying to do here is size up the future electricicty demand of the property, assuming that the
home is eligible for an ASHP. If the property is not eligible for an ASHP, we don't need to adjust the
consumption.
This figure is used to size up solar panels, so they can cover heat generation, even if the property
today doesn't generate its heat from electricity
:param assumed_ashp_efficiency:
:param exclusions:
:return:
"""
exclusions = [] if exclusions is None else exclusions
if "air_source_heat_pump" in exclusions:
return self.current_energy_consumption
# If the property currently has an ASHP, we don't gain from any efficiency improvements
if not self.is_ashp_valid(measures=["air_source_heat_pump"]):
return self.current_energy_consumption
# If the property currently has an electric boiler, it will still benefit from the ASHP efficiency gain
remap_fuel_sources = [
"Natural Gas", "LPG", "Wood Logs", "Oil", "Electricity", "Coal", "Smokeless Fuel",
"Natural Gas + Solar Thermal", "Anthracite", "Wood Pellets", "LPG + Solar Thermal",
"Natural Gas (Community Scheme)"
]
heating_energy_source = self.heating_energy_source
hot_water_energy_source = self.hot_water_energy_source
heating_consumption = self.energy_consumption_estimates["unadjusted"]["heating"]
hotwater_consumption = self.energy_consumption_estimates["unadjusted"]["hot_water"]
if (heating_energy_source not in remap_fuel_sources) or (
hot_water_energy_source not in remap_fuel_sources + ["Electricity + Solar Thermal"]
):
raise NotImplementedError("Have not implemented estimating electrical consumption for this fuel type")
if heating_energy_source in remap_fuel_sources:
# Adjust the heating consumption to reflect the expected efficiency of an ASHP
heating_consumption = heating_consumption / (assumed_ashp_efficiency / 100)
if hot_water_energy_source in remap_fuel_sources:
# Adjust the hot water consumption to reflect the expected efficiency of an ASHP
hotwater_consumption = hotwater_consumption / (assumed_ashp_efficiency / 100)
electric_consumption = (
heating_consumption +
hotwater_consumption +
self.energy_consumption_estimates["unadjusted"]["lighting"] +
self.energy_consumption_estimates["unadjusted"]["appliances"]
)
return electric_consumption
def insert_funding(
self,
scheme,
funded_measures,
project_funding,
total_uplift,
full_project_score,
partial_project_score,
uplift_project_score
):
"""
This method inserts the funding into the property object
"""
self.scheme = scheme
self.funded_measures = funded_measures
self.project_funding = project_funding
self.total_uplift = total_uplift
self.full_project_score = full_project_score
self.partial_project_score = partial_project_score
self.uplift_project_score = uplift_project_score
def identify_ventilation(self):
ventilation_descriptions = [
'mechanical, extract only',
'mechanical, supply and extract'
]
return self.data.get("mechanical-ventilation") in ventilation_descriptions