mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
1258 lines
54 KiB
Python
1258 lines
54 KiB
Python
import os
|
|
import ast
|
|
from itertools import groupby
|
|
import pandas as pd
|
|
import numpy as np
|
|
from typing import Set
|
|
from datetime import datetime, timedelta
|
|
|
|
from etl.epc.Dataset import TrainingDataset
|
|
from etl.epc.Record import EPCRecord
|
|
from etl.epc.settings import LATEST_FIELD, MANDATORY_FIXED_FEATURES
|
|
from etl.epc_clean.epc_attributes.all_cleaners import all_cleaner_map
|
|
from utils.logger import setup_logger
|
|
from utils.s3 import read_dataframe_from_s3_parquet
|
|
from etl.epc.settings import DATA_ANOMALY_MATCHES
|
|
from recommendations.rdsap_tables import FLOOR_LEVEL_MAP
|
|
from recommendations.recommendation_utils import (
|
|
estimate_perimeter,
|
|
get_wall_type,
|
|
estimate_external_wall_area,
|
|
estimate_windows,
|
|
estimate_pitched_roof_area
|
|
)
|
|
from backend.ml_models.AnnualBillSavings import AnnualBillSavings
|
|
from backend.app.utils import sap_to_epc
|
|
import backend.app.assumptions as assumptions
|
|
from backend.app.db.models.portfolio import rating_lookup
|
|
|
|
# Name of the deployment environment; falls back to "dev" when the variable is unset.
ENVIRONMENT = os.environ.get("ENVIRONMENT", "dev")

# Data bucket name. Only the dev environment has a built-in fallback; in any other
# environment an unset DATA_BUCKET variable leaves this as None.
DATA_BUCKET = os.environ.get("DATA_BUCKET", "retrofit-data-dev" if ENVIRONMENT == "dev" else None)

# Module-level logger shared by everything in this file.
logger = setup_logger()
|
|
|
|
|
|
class Property:
|
|
# Maps each EPC description field to the name of the attribute on this class that
# set_features() fills with the cleaned component for that field.
ATTRIBUTE_MAP = {
    "floor-description": "floor",
    "hotwater-description": "hotwater",
    "main-fuel": "main_fuel",
    "mainheat-description": "main_heating",
    "mainheatcont-description": "main_heating_controls",
    "roof-description": "roof",
    "walls-description": "walls",
    "windows-description": "windows",
    "lighting-description": "lighting",
}

# Cleaned components, one per ATTRIBUTE_MAP value. They stay None until set_features() runs.
floor = None
hotwater = None
main_fuel = None
main_heating = None
main_heating_controls = None
roof = None
walls = None
windows = None
lighting = None

# Spatial summary dictionary, populated by set_spatial()
spatial = None
# Baseline scoring dataset, populated by create_base_difference_epc_record()
base_difference_record = None

# Values that are treated as "no usable data" when they appear in an EPC field
DATA_ANOMALY_MATCHES: Set = DATA_ANOMALY_MATCHES

# Surplus information, that can be provided as optional inputs, by a customer
n_bathrooms = None
n_bedrooms = None
landlord_property_id = None  # unique reference for the property as recognised by the landlord
building_id = None  # Used to group properties together into a single building

# Contains the solar panel optimisation results from the Google Solar API
solar_panel_configuration = None

# If true, indicates the floor area has actually been given to us by the owner, and we should use this figure
# instead of the one in the EPC, when we simulate
owner_floor_area = False
|
|
|
|
def __init__(
    self,
    id,
    postcode,
    address,
    epc_record,
    uprn=None,  # Pass as an optional input
    property_valuation=None,
    already_installed=None,
    find_my_epc_components=None,
    non_invasive_recommendations=None,
    measures=None,
    energy_assessment=None,
    is_new=True,
    inspections=None,
    **kwargs
):
    """
    Build a property from its EPC record plus any optional, customer-supplied information.

    :param id: identifier stored as ``self.id``
    :param postcode: postcode of the property
    :param address: address of the property
    :param epc_record: EPC record object; a number of its attributes are copied onto the property
    :param uprn: optional UPRN override; when None, the UPRN of ``epc_record`` is used
    :param property_valuation: stored as ``self.valuation``
    :param already_installed: measures that have already been installed in the property
    :param find_my_epc_components: stored unchanged
    :param non_invasive_recommendations: mapping with a 'recommendations' entry; a falsy value gives []
    :param measures: a list, or a string literal of one (parsed with ``ast.literal_eval``); falsy gives None
    :param energy_assessment: mapping with "condition" and "energy_assessment_is_newer" entries; None
        is replaced by an empty condition and a False flag
    :param is_new: stored as ``self.is_new``
    :param inspections: stored as ``self.inspections``
    :param kwargs: additional attributes, applied via ``parse_kwargs`` after the EPC-derived values
    """
    self.epc_record = epc_record

    self.id = id
    self.is_new = is_new

    self.address = address
    self.postcode = postcode

    self.old_data = epc_record.old_data

    # Measures that have already been installed in the property, typically found as a result of the
    # non-invasive surveys. They are still reflected in the recommendations, but without a cost and with a
    # message saying the measure is already installed.
    self.already_installed = already_installed
    if non_invasive_recommendations:
        self.non_invasive_recommendations = non_invasive_recommendations['recommendations']
    else:
        self.non_invasive_recommendations = []
    # Store the find my epc components
    self.find_my_epc_components = find_my_epc_components

    # Measures that have been recommended for the property: either a list already, or its literal string form
    if isinstance(measures, list):
        self.measures = measures
    elif measures:
        self.measures = ast.literal_eval(measures)
    else:
        self.measures = None

    self.valuation = property_valuation

    # An explicitly provided UPRN wins over the one on the EPC record
    self.uprn = epc_record.uprn if uprn is None else uprn
    self.uprn_source = epc_record.uprn_source

    self.full_sap_epc = epc_record.full_sap_epc
    self.in_conservation_area = None
    self.is_listed = None
    self.is_heritage = None
    self.restricted_measures = False
    self.year_built = epc_record.year_built
    self.number_of_rooms = epc_record.number_habitable_rooms
    self.age_band = epc_record.age_band
    self.construction_age_band = epc_record.construction_age_band
    self.number_of_floors = epc_record.number_of_floors
    self.perimeter = None
    self.wall_type = None
    self.floor_type = None

    self.energy_cost_estimates = {}
    self.energy_consumption_estimates = {}

    self.energy = {
        "primary_energy_consumption": epc_record.energy_consumption_current,
        "epc_co2_emissions": epc_record.co2_emissions_current,
        # These are filled in once the emissions from appliances have been estimated, using the carbon
        # intensity of electricity
        "appliances_co2_emissions": None,
        "co2_emissions": None,
    }
    self.mains_gas = epc_record.mains_gas_flag
    self.floor_height = epc_record.floor_height
    self.insulation_wall_area = None
    self.floor_area = epc_record.total_floor_area
    self.roof_area = None
    self.insulation_floor_area = None
    self.number_lighting_outlets = epc_record.fixed_lighting_outlets_count
    self.floor_level = None
    self.number_of_windows = None
    self.windows_area = None

    self.current_energy_consumption = None
    self.current_energy_consumption_heating_hotwater = None
    self.current_energy_bill = None

    self.recommendations_scoring_data = []
    self.simulation_epcs = {}
    self.updated_simulation_epcs = []

    # This additional condition data should change how we pass kwargs to this. We should no longer need to pass
    # kwargs to this class, but instead, we should pass the energy assessment condition data
    if energy_assessment is None:
        energy_assessment = {"condition": {}, "energy_assessment_is_newer": False}
    self.energy_assessment_condition_data = energy_assessment["condition"]
    self.energy_assessment_is_newer = energy_assessment["energy_assessment_is_newer"]

    # Store inspections
    self.inspections = inspections

    # Applied after the EPC-derived values above, so owner-supplied values (e.g. floor_area) override them
    self.parse_kwargs(kwargs)

    self.scheme = None
    self.funded_measures = None
    self.project_funding = None
    self.total_uplift = None
    self.full_project_score = None
    self.partial_project_score = None
    self.uplift_project_score = None

    # Ventilation
    self.has_ventilation = self.identify_ventilation()
|
|
|
|
@staticmethod
|
|
def _safe_int(value: str | int | float | None) -> int | None:
|
|
if value in [None, ""]:
|
|
return None
|
|
return int(round(float(value) + 1e-5))
|
|
|
|
@classmethod
|
|
def extract_kwargs(cls, kwargs):
|
|
"""
|
|
This method is to be used in the router, to extract the kwargs from the request and prevent any errors such as
|
|
non-integer values, or inputs that clash with the __init__ method of this class
|
|
:param kwargs:
|
|
:return:
|
|
"""
|
|
|
|
# Note - none of this data is contained in an energy asssessment, but we should consider how this is done
|
|
# as we collect more data from the energy assessment
|
|
|
|
n_bathrooms = kwargs.get("n_bathrooms")
|
|
# We add on a small value to ensure that the number of bathrooms is rounded up, in case the value is 0.5
|
|
n_bathrooms = cls._safe_int(n_bathrooms) if n_bathrooms not in [None, ""] else None
|
|
|
|
n_bedrooms = kwargs.get("n_bedrooms")
|
|
n_bedrooms = cls._safe_int(n_bedrooms) if n_bedrooms not in [None, ""] else None
|
|
|
|
number_of_floors = kwargs.get("number_of_floors")
|
|
number_of_floors = cls._safe_int(number_of_floors) if number_of_floors not in [None, ""] else None
|
|
|
|
insulation_floor_area = kwargs.get("insulation_floor_area")
|
|
insulation_floor_area = float(insulation_floor_area) if insulation_floor_area not in [None, ""] else None
|
|
|
|
insulation_wall_area = kwargs.get("insulation_wall_area")
|
|
insulation_wall_area = float(insulation_wall_area) if insulation_wall_area not in [None, ""] else None
|
|
|
|
# We allow for the asset owner to provide us with total floor area, in the event of it being incorrect
|
|
floor_area = kwargs.get("floor_area")
|
|
floor_area = float(floor_area) if floor_area not in [None, ""] else None
|
|
|
|
return {
|
|
"n_bathrooms": n_bathrooms,
|
|
"n_bedrooms": n_bedrooms,
|
|
"number_of_floors": number_of_floors,
|
|
"insulation_floor_area": insulation_floor_area,
|
|
"insulation_wall_area": insulation_wall_area,
|
|
"building_id": kwargs.get("building_id", kwargs.get("landlord_block_reference", None)),
|
|
"floor_area": floor_area,
|
|
"landlord_property_id": kwargs.get("landlord_property_id"),
|
|
}
|
|
|
|
def parse_kwargs(self, kwargs):
|
|
# We extract the elements from kwargs that we recognise. Anything additional is ignored
|
|
for arg, val in kwargs.items():
|
|
if val is not None:
|
|
if arg == "floor_area":
|
|
self.owner_floor_area = True
|
|
setattr(self, arg, val)
|
|
|
|
def create_base_difference_epc_record(self, cleaned_lookup: dict):
    """
    Build the baseline scoring dataset for this property and store it in ``base_difference_record``.

    The difference record is created from the property's EPC against itself, since there is no expected
    EPC yet, so the starting and ending state are the same.
    :param cleaned_lookup: passed straight through to ``TrainingDataset``
    """
    fixed_column_names = [name.lower() for name in MANDATORY_FIXED_FEATURES + LATEST_FIELD]

    # Collect the fixed features from the EPC record, with any hyphens in the names swapped for underscores
    fixed_data = {}
    for attr_name, attr_value in vars(self.epc_record).items():
        if attr_name in fixed_column_names:
            fixed_data[attr_name.replace("-", "_")] = attr_value

    difference_record = self.epc_record.create_epc_difference_record(self.epc_record, fixed_data)

    # We have rare cases where entire description columns are missing, which EPC records hold as None.
    # Creating the scoring record is sensitive to missing data, so these are replaced by an empty string.
    record_values = difference_record.difference_record
    empty_descriptions = [
        column for column in record_values
        if "_description" in column and record_values[column] is None
    ]
    for column in empty_descriptions:
        record_values[column] = ""

    self.base_difference_record = TrainingDataset(datasets=[difference_record], cleaned_lookup=cleaned_lookup)

    # Values given to us by the landlord are known to be correct, whereas the EPC may not be, so they win
    if self.owner_floor_area:
        scoring_frame = self.base_difference_record.df
        scoring_frame["total_floor_area_ending"] = self.floor_area
        scoring_frame["estimated_perimeter_ending"] = self.perimeter
|
|
|
|
def simulate_all_representative_recommendations(
|
|
self, property_representative_recommendations,
|
|
):
|
|
"""
|
|
This method was put together to simulate the impact of the representative recommendations on the property
|
|
all at once, for usage within the mds report
|
|
:return:
|
|
"""
|
|
|
|
recommendation_record = self.base_difference_record.df.to_dict("records")[
|
|
0
|
|
].copy()
|
|
|
|
scoring_dict = self.create_recommendation_scoring_data(
|
|
property_id=self.id,
|
|
recommendation_record=recommendation_record,
|
|
recommendations=property_representative_recommendations,
|
|
primary_recommendation_id=self.id,
|
|
)
|
|
|
|
return scoring_dict
|
|
|
|
@staticmethod
def _merge_epc_transformations(epc_transformations):
    """
    Merge a sequence of EPC description transformations into a single dictionary.

    It is possible that two simulations are applied to the same description. When that happens:
    - for "-energy-eff" fields the highest rating is kept (Poor < Average < Good < Very Good), and a
      stored value outside of that scale is replaced by the incoming one
    - an HHR storage heater with electric immersion hot water keeps its hot water description, rather
      than being overwritten with "From main system"
    - otherwise, the later value wins
    :param epc_transformations: list of dictionaries, in the order they should be applied
    :return: the merged dictionary
    """
    rating_order = ("Poor", "Average", "Good", "Very Good")
    merged = {}

    for config in epc_transformations:
        for key, value in config.items():
            if key in merged:
                current = merged[key]

                if "-energy-eff" in key:
                    if current not in rating_order:
                        merged[key] = value
                    elif value in rating_order and rating_order.index(value) > rating_order.index(current):
                        merged[key] = value
                    continue

                if current == value:
                    continue

                # .get is used for the heating description: a hot water change can be merged without
                # any heating change being present, in which case there is nothing to protect
                if (
                    key == "hotwater-description"
                    and value == "From main system"
                    and merged.get("mainheat-description") == "Electric storage heaters"
                    and "Electric immersion" in current
                ):
                    # It means we've recommended HHR with electric immersion, and shouldn't overwrite
                    # the hot water description
                    continue

            merged[key] = value

    return merged

def adjust_difference_record_with_recommendations(
    self, property_recommendations, property_representative_recommendations
):
    """
    This method will adjust the difference record, based on the recommendations made for the property

    In order to score the measures, we need to consider the phase of the retrofit: each recommendation is
    simulated on top of the representative recommendation of every earlier phase.

    Resets and then fills ``recommendations_scoring_data`` (one scoring record per recommendation) and
    ``simulation_epcs`` (one simulated EPC per recommendation id). Recommendations of type
    "trickle_vents" and "draught_proofing" are not simulated.

    :param property_recommendations: list of recommendation groups; the phase of a group is read from
        its first recommendation, and groups whose phase is None are skipped
    :param property_representative_recommendations: list of representative recommendations for the property
    :raises Exception: if both internal and external wall insulation are among the representatives of
        the earlier phases
    """

    self.recommendations_scoring_data = []
    self.simulation_epcs = {}
    phases = sorted(
        r[0]["phase"] for r in property_recommendations if r[0]["phase"] is not None
    )
    simulation_lodgment_date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")

    for phase in phases:
        property_recommendations_by_phase = next(
            r for r in property_recommendations if r[0]["phase"] == phase
        )
        previous_phases = [p for p in phases if p < phase]

        # groupby only groups neighbouring items, so the representatives are sorted by phase first
        previous_phase_representatives = sorted(
            (r for r in property_representative_recommendations if r["phase"] in previous_phases),
            key=lambda x: x["phase"],
        )
        # For solid wall insulation, we will actually have 2 representative recommendations, since we consider
        # both internal and external wall insulation as possible measures. We take the representative with
        # the lowest efficiency, by phase
        previous_phase_representatives = [
            min(group, key=lambda x: x["efficiency"])
            for _, group in groupby(previous_phase_representatives, key=lambda x: x["phase"])
        ]

        recommendation_record = self.base_difference_record.df.to_dict("records")[0].copy()
        recommendation_record["days_to_ending"] = EPCRecord._calculate_days_to(
            lodgement_date=simulation_lodgment_date,
        )

        for rec in property_recommendations_by_phase:
            # We simulate the impact of the recommendation at this current phase, and all of the prior phases
            if rec["type"] in ["trickle_vents", "draught_proofing"]:
                continue

            applied_recommendations = previous_phase_representatives + [rec]

            scoring_dict = self.create_recommendation_scoring_data(
                property_id=self.id,
                recommendation_record=recommendation_record,
                recommendations=applied_recommendations,
                primary_recommendation_id=rec["recommendation_id"],
            )
            self.recommendations_scoring_data.append(scoring_dict)

            simulation_epc = self.epc_record.to_dict(case="kebab", source="prepared")

            types = [x["type"] for x in previous_phase_representatives]
            if "external_wall_insulation" in types and "internal_wall_insulation" in types:
                raise Exception("We shouldn't have this in the representative recommendations")

            # We include previous phases + the recommendation itself in the EPC transformations
            phase_epc_transformation = self._merge_epc_transformations(
                [x["description_simulation"] for x in applied_recommendations]
            )
            simulation_epc.update(phase_epc_transformation)
            self.simulation_epcs[rec["recommendation_id"]] = simulation_epc
|
|
|
|
def update_simulation_epcs(self, impact_summary):
    """
    This method will insert the high level measures, such as SAP, heat demand, carbon, etc, into a copy
    of each simulated EPC, and store the copies in ``updated_simulation_epcs``.

    :param impact_summary: list of dictionaries, each with "recommendation_id", "carbon", "sap" and
        "heat_demand" entries. The first entry matching a recommendation id is used.
    :raises ValueError: if ``simulation_epcs`` is None
    :raises IndexError: if a simulated recommendation has no entry in ``impact_summary``
    """
    if self.simulation_epcs is None:
        raise ValueError("Simulation EPCs have not been created")

    updated_simulation_epcs = []
    for rec_id, simulated_epc in self.simulation_epcs.items():
        sim_epc = simulated_epc.copy()
        rec_impact = [x for x in impact_summary if x["recommendation_id"] == rec_id][0]

        # We update all features that should have an impact on the kwh model
        sim_epc.update(
            {
                # CO₂ emissions per square metre floor area per year in kg/m². Since CO₂ emissions are in tonnes
                # per year, we multiply by 1000 to get kg/m²
                "co2-emiss-curr-per-floor-area": round(
                    1000 * (rec_impact["carbon"] / self.epc_record.total_floor_area)
                ),
                "co2-emissions-current": rec_impact["carbon"],
                "current-energy-rating": sap_to_epc(rec_impact["sap"]),
                "current-energy-efficiency": int(np.floor(rec_impact["sap"])),
                "energy-consumption-current": rec_impact["heat_demand"],
                # Recommendation ids may be integers, and str.join only accepts strings. This matches
                # how the scoring records build their "id".
                "id": "+".join([str(self.id), str(rec_id)]),
            }
        )
        updated_simulation_epcs.append(sim_epc)

    # The simulated EPCs now carry the headline figures for each recommendation
    self.updated_simulation_epcs = updated_simulation_epcs
|
|
|
|
@staticmethod
|
|
def create_recommendation_scoring_data(
|
|
property_id,
|
|
recommendation_record,
|
|
recommendations: list,
|
|
primary_recommendation_id: int,
|
|
):
|
|
"""
|
|
This function will iterate through a list of recommendations and apply a simulation for each recommendation
|
|
This allows us to later multiple measures and see the impact of the measures on the property
|
|
:param property_id: The id of the property
|
|
:param recommendation_record: The record of the property, which will be updated
|
|
:param recommendations: The list of recommendations to apply
|
|
:param primary_recommendation_id: The id of the primary recommendation, which is used to identify the record
|
|
:return: The updated recommendation record
|
|
"""
|
|
|
|
output = recommendation_record.copy()
|
|
|
|
for col in [
|
|
"walls_insulation_thickness",
|
|
"floor_insulation_thickness",
|
|
"roof_insulation_thickness",
|
|
]:
|
|
if output[col] is None:
|
|
output[col] = "none"
|
|
|
|
for recommendation in recommendations:
|
|
# For the list of recommendations we have, we iteratively update the output
|
|
|
|
if recommendation["type"] == "sealing_open_fireplace":
|
|
output["number_open_fireplaces_ending"] = 0
|
|
|
|
if recommendation["type"] == "low_energy_lighting":
|
|
output["low_energy_lighting_ending"] = 100
|
|
output["lighting_energy_eff_ending"] = "Very Good"
|
|
|
|
if recommendation["type"] in [
|
|
"heating", "hot_water_tank_insulation", "heating_control", "secondary_heating",
|
|
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation",
|
|
"cylinder_thermostat", "loft_insulation", "room_roof_insulation", "flat_roof_insulation",
|
|
"solid_floor_insulation", "suspended_floor_insulation", "mixed_glazing",
|
|
"windows_glazing", "mechanical_ventilation", "solar_pv", "sloping_ceiling_insulation"
|
|
]:
|
|
# We update the data, as defined in the recommendaton
|
|
for prefix in ["walls", "roof", "floor"]:
|
|
if output[f"{prefix}_insulation_thickness_ending"] is None:
|
|
output[f"{prefix}_insulation_thickness_ending"] = "none"
|
|
|
|
simulation_config = recommendation["simulation_config"].copy()
|
|
# If any entries in simulation_config are None, we will set them to "Unknown" which is the cleaning
|
|
# value
|
|
for key, value in simulation_config.items():
|
|
if value is None:
|
|
simulation_config[key] = "Unknown"
|
|
|
|
output.update(simulation_config)
|
|
|
|
if recommendation["type"] not in [
|
|
"sealing_open_fireplace", "low_energy_lighting",
|
|
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation",
|
|
"loft_insulation", "room_roof_insulation", "flat_roof_insulation",
|
|
"solid_floor_insulation", "suspended_floor_insulation",
|
|
"windows_glazing", "solar_pv", "heating", "hot_water_tank_insulation",
|
|
"heating_control", "secondary_heating", "cylinder_thermostat", "mixed_glazing",
|
|
"extension_cavity_wall_insulation", "mechanical_ventilation", "sloping_ceiling_insulation"
|
|
]:
|
|
raise NotImplementedError(
|
|
"Implement me, given type %s" % recommendation["type"]
|
|
)
|
|
|
|
output["id"] = "+".join([str(property_id), str(primary_recommendation_id)])
|
|
|
|
return output
|
|
|
|
def set_features(self, cleaned, kwh_client, kwh_predictions):
    """
    Populate the property components (roof, walls, windows, heating, hot water, ...) from the cleaned
    EPC descriptions, then derive the dimensions, types and current energy figures.

    For each description the cleaned entry matching the EPC value is stored. A value that is a known
    data anomaly is cleaned as an empty description, and a value with no cleaned entry is cleaned on
    the fly.
    :param cleaned: This is the dictionary of components found in cleaner.cleaned
    :param kwh_client: The client that will be used to convert the energy costs to today's costs
    :param kwh_predictions: Contains the kwh predictions for heating and hot water
    :raises ValueError: if ``cleaned`` or the EPC record is empty, or if more than one cleaned entry
        matches a description
    """
    if not cleaned:
        raise ValueError("Cleaner does not contain cleaned data")

    if not self.epc_record:
        raise ValueError("Property does not contain data")

    for description, candidates in cleaned.items():
        cleaner_cls = all_cleaner_map[description]
        raw_description = getattr(self.epc_record, description.replace("-", "_"))
        # The lighting cleaner takes an extra "averages" argument
        is_lighting = description == "lighting-description"

        if raw_description in self.DATA_ANOMALY_MATCHES:
            # Anomalous values are cleaned as if the description was empty, but the raw value is kept
            cleaner = cleaner_cls("", averages=None) if is_lighting else cleaner_cls("")
            fill_dict = {
                "original_description": raw_description,
                "clean_description": raw_description,
                **cleaner.process()
            }
            setattr(self, self.ATTRIBUTE_MAP[description], fill_dict)
            continue

        matches = [
            entry for entry in candidates
            if entry["original_description"] == raw_description
        ]

        if len(matches) > 1:
            raise ValueError(
                "Either No attributes or multiple found for %s" % description
            )

        if not matches:
            # We attempt to perform the clean on the fly
            if is_lighting:
                cleaner = cleaner_cls(raw_description, averages=None)
            else:
                cleaner = cleaner_cls(raw_description)
            clean_description = cleaner.description.replace("(assumed)", "").rstrip().capitalize()
            matches = [
                {
                    "original_description": raw_description,
                    "clean_description": clean_description,
                    **cleaner.process(),
                }
            ]

        setattr(self, self.ATTRIBUTE_MAP[description], matches[0])

    self.set_basic_property_dimensions()
    self.set_wall_type()
    self.set_floor_type()
    self.set_floor_level()
    self.set_windows_count()
    self.set_current_energy(kwh_client, kwh_predictions)
|
|
|
|
def set_solar_panel_configuration(self, solar_panel_configuration):
|
|
"""
|
|
This funtion inserts the solar panel configuration into the property object
|
|
"""
|
|
self.solar_panel_configuration = solar_panel_configuration
|
|
|
|
def set_current_energy(self, kwh_client, kwh_predictions):
    """
    Given what we know about the property now, estimates the current energy consumption using the UCL paper
    https://www.sciencedirect.com/science/article/pii/S0378778823002542

    Sets ``current_energy_consumption``, ``current_energy_consumption_heating_hotwater``,
    ``energy_cost_estimates`` and ``energy_consumption_estimates``, and adds the appliance and total
    CO2 emissions to ``energy``.
    :param kwh_client: client used to convert the EPC lighting cost into today's cost
    :param kwh_predictions: dictionary with "heating_kwh_predictions" and "hotwater_kwh_predictions"
        entries, each indexable by "id" and "predictions"
    """
    epc = self.epc_record

    # Today's lighting cost gives us a baseline figure for what the cost is today
    todays_lighting_cost = kwh_client.convert_cost_to_today(
        original_cost=float(epc.lighting_cost_current),
        lodgement_date=pd.Timestamp(epc.lodgement_date).tz_localize(None),
    )

    condition_data = self.energy_assessment_condition_data
    heating_kwh_predictions = kwh_predictions["heating_kwh_predictions"]
    hotwater_kwh_predictions = kwh_predictions["hotwater_kwh_predictions"]

    def _measured_or_predicted(condition_key, predictions):
        # If we have the kwh figure from the energy assessment, we don't need the prediction. The
        # prediction for this property's UPRN is only looked up when the figure is missing.
        measured = condition_data.get(condition_key)
        if measured is not None:
            return measured
        matching_rows = predictions[predictions["id"].astype(int) == self.uprn]
        return matching_rows["predictions"].values[0]

    heating_prediction = _measured_or_predicted("space_heating_kwh", heating_kwh_predictions)
    hot_water_prediction = _measured_or_predicted("water_heating_kwh", hotwater_kwh_predictions)

    # We convert the lighting cost into kwh, just using the price cap
    lighting_kwh = todays_lighting_cost / AnnualBillSavings.ELECTRICITY_PRICE_CAP
    appliances_kwh = AnnualBillSavings.estimate_appliances_energy_use(total_floor_area=self.floor_area)

    unadjusted_kwh_estimates = {
        "heating": float(heating_prediction),
        "hot_water": float(hot_water_prediction),
        "lighting": float(lighting_kwh),
        "appliances": float(appliances_kwh),
    }

    # Heating and hot water costs are not estimated here, so they are left as None
    unadjusted_costs = {
        "heating": None,
        "hot_water": None,
        "lighting": float(todays_lighting_cost),
        "appliances": float(appliances_kwh) * AnnualBillSavings.ELECTRICITY_PRICE_CAP,
    }

    # Sum up the kwh figures
    self.current_energy_consumption = sum(unadjusted_kwh_estimates.values())
    self.current_energy_consumption_heating_hotwater = (
        unadjusted_kwh_estimates["heating"] + unadjusted_kwh_estimates["hot_water"]
    )

    self.energy_cost_estimates = {"unadjusted": unadjusted_costs}
    self.energy_consumption_estimates = {"unadjusted": unadjusted_kwh_estimates}

    # Update carbon with appliances
    appliances_co2 = (unadjusted_kwh_estimates["appliances"] * assumptions.ELECTRICITY_CARBON_INTENSITY) / 1000
    self.energy["appliances_co2_emissions"] = appliances_co2
    # Re-calculate total CO2 emissions
    total_co2 = self.energy["epc_co2_emissions"] + self.energy["appliances_co2_emissions"]
    self.energy["co2_emissions"] = float(np.round(total_co2, 2))
|
|
|
|
def set_spatial(self, spatial: pd.DataFrame):
|
|
"""
|
|
Sets whether the property is in a conservation area given the output of the ConservationAreaClient
|
|
|
|
Will store a dictionary, spatial, which is used to populate the property spatial table in the database
|
|
|
|
:param spatial: Dataframe, containing the spatial data for the property
|
|
"""
|
|
self.in_conservation_area = spatial["conservation_status"].values[0]
|
|
self.is_listed = spatial["is_listed_building"].values[0]
|
|
self.is_heritage = spatial["is_heritage_building"].values[0]
|
|
|
|
# We do an equals True, in the case of one of these variables being True
|
|
if (
|
|
(self.in_conservation_area == True)
|
|
| (self.is_listed == True)
|
|
| (self.is_heritage == True)
|
|
):
|
|
self.restricted_measures = True
|
|
|
|
spatial_dict = spatial.to_dict("records")[0]
|
|
self.spatial = {
|
|
"x_coordinate": spatial_dict["X_COORDINATE"],
|
|
"y_coordinate": spatial_dict["Y_COORDINATE"],
|
|
"latitude": spatial_dict["LATITUDE"],
|
|
"longitude": spatial_dict["LONGITUDE"],
|
|
"conservation_status": spatial_dict["conservation_status"],
|
|
"is_listed_building": spatial_dict["is_listed_building"],
|
|
"is_heritage_building": spatial_dict["is_heritage_building"],
|
|
}
|
|
|
|
def _clean_upload_data(self, to_update):
|
|
for k, v in to_update.items():
|
|
if v in self.DATA_ANOMALY_MATCHES:
|
|
to_update[k] = None
|
|
return to_update
|
|
|
|
def get_full_property_data(self, current_valuation=None, needs_rebaselining=False, rebaselining_sap=0):
    """
    Extract the core information about the property, taken from the EPC, which is pushed to the database.

    :param current_valuation: stored unchanged under "current_valuation"
    :param needs_rebaselining: when truthy, ``rebaselining_sap`` is added to the SAP points of the EPC
    :param rebaselining_sap: the SAP point adjustment for measures that are already installed
    :return: dictionary of property data, with known data anomaly values replaced by None
    """
    epc = self.epc_record

    current_sap_rating = float(epc.current_energy_efficiency)
    if needs_rebaselining:
        current_sap_rating += rebaselining_sap

    # The rating band follows the (possibly adjusted) SAP points
    current_epc_rating = sap_to_epc(current_sap_rating)

    building_reference = epc.building_reference_number

    property_data = {
        "creation_status": "READY",
        "uprn": int(epc.uprn),
        "building_reference_number": None if building_reference is None else int(building_reference),
        "has_pre_condition_report": True,
        "has_recommendations": True,
        "property_type": epc.property_type,
        "built_form": epc.built_form,
        "local_authority": epc.local_authority_label,
        "constituency": epc.constituency_label,
        "number_of_rooms": self.number_of_rooms,
        "year_built": self.year_built,
        "tenure": epc.tenure,
        "current_epc_rating": current_epc_rating,
        "current_sap_points": current_sap_rating,
        "current_valuation": current_valuation,
        # The SAP points of the original EPC, without any adjustment
        "original_sap_points": epc.original_epc["current-energy-efficiency"],
        "is_sap_points_adjusted_for_installed_measures": needs_rebaselining,
        "installed_measures_sap_point_adjustment": rebaselining_sap,
    }

    return self._clean_upload_data(property_data)
|
|
|
|
@classmethod
|
|
def _prepare_rating_field(cls, field):
|
|
"""
|
|
Utility function for usage in the lambda, for preparing the _rating fields
|
|
"""
|
|
return (
|
|
rating_lookup[field].value
|
|
if (field not in cls.DATA_ANOMALY_MATCHES) and (field is not None)
|
|
else None
|
|
)
|
|
|
|
def get_property_details_epc(
    self, portfolio_id: int, needs_rebaselining: bool = False, rebaselining_carbon: float = 0,
    rebaselining_heat_demand: float = 0, rebaselining_kwh: float = 0, rebaselining_bills: float = 0
):
    """
    Assemble the detailed EPC record for this property: fabric descriptions and ratings, energy figures,
    SAP05 overwrite details and the re-baselining adjustments.

    :param portfolio_id: stored as-is under "portfolio_id"
    :param needs_rebaselining: when truthy, the adjustments below are subtracted from the current figures
    :param rebaselining_carbon: amount subtracted from the CO2 emissions
    :param rebaselining_heat_demand: amount subtracted from the primary energy consumption
    :param rebaselining_kwh: amount subtracted from both kwh figures (total and heating + hot water)
    :param rebaselining_bills: reported in the output only
    :return: dictionary of EPC details; the entries of ``self.current_energy_bill`` are merged into it
    :raises ValueError: if the current energy bill has not been set, or if the EPC is flagged as a SAP05
        overwrite but there is no old data to read the SAP05 figures from
    """
    if self.current_energy_bill is None:
        raise ValueError("Current energy bill has not been set")

    epc = self.epc_record
    rate = self._prepare_rating_field

    # When a SAP05 EPC has been overwritten, report the score and band of the most recently lodged old EPC
    sap_05_overwritten = epc.sap_05_overwritten
    sap_05_score = None
    sap_05_epc_rating = None
    if sap_05_overwritten:
        if not self.old_data:
            raise ValueError("Trying to fetch SAP05 EPC but no old data available")
        latest_old_epc = max(self.old_data, key=lambda old_epc: pd.to_datetime(old_epc["lodgement-date"]))
        sap_05_score = int(latest_old_epc["current-energy-efficiency"])
        sap_05_epc_rating = latest_old_epc["current-energy-rating"]

    lodgement_date = epc.lodgement_date
    is_expired = self.epc_is_expired

    # Current figures start from the stored values and are reduced when re-baselining is required
    co2_emissions = self.energy["co2_emissions"]
    primary_energy_consumption = self.energy["primary_energy_consumption"]
    current_kwh_demand = self.current_energy_consumption
    current_kwh_heating_hotwater = self.current_energy_consumption_heating_hotwater
    if needs_rebaselining:
        co2_emissions -= rebaselining_carbon
        primary_energy_consumption -= rebaselining_heat_demand
        # The same kwh adjustment is applied to the total and to the heating + hot water figure
        current_kwh_demand -= rebaselining_kwh
        current_kwh_heating_hotwater -= rebaselining_kwh

    return {
        "property_id": self.id,
        "portfolio_id": portfolio_id,
        "lodgement_date": datetime.fromisoformat(lodgement_date),
        "is_expired": is_expired,
        "full_address": epc.address,
        "total_floor_area": float(epc.total_floor_area),
        # Fabric and services: cleaned description alongside the prepared rating
        "walls": self.walls["clean_description"],
        "walls_rating": rate(epc.walls_energy_eff),
        "roof": self.roof["clean_description"],
        "roof_rating": rate(epc.roof_energy_eff),
        "floor": self.floor["clean_description"],
        "floor_rating": rate(epc.floor_energy_eff),
        "windows": self.windows["clean_description"],
        "windows_rating": rate(epc.windows_energy_eff),
        "heating": self.main_heating["clean_description"],
        "heating_rating": rate(epc.mainheat_energy_eff),
        "heating_controls": self.main_heating_controls["clean_description"],
        "heating_controls_rating": rate(epc.mainheatc_energy_eff),
        "hot_water": self.hotwater["clean_description"],
        "hot_water_rating": rate(epc.hot_water_energy_eff),
        "lighting": self.lighting["clean_description"],
        "lighting_rating": rate(epc.lighting_energy_eff),
        "mainfuel": self.main_fuel["clean_description"],
        "ventilation": epc.mechanical_ventilation,
        "solar_pv": epc.photo_supply,
        "solar_hot_water": epc.solar_water_heating_flag_bool,
        "wind_turbine": epc.wind_turbine_count,
        "floor_height": self.floor_height,
        "heat_loss_corridor": epc.heat_loss_corridor_bool,
        "unheated_corridor_length": epc.unheated_corridor_length,
        "number_of_open_fireplaces": epc.number_open_fireplaces,
        "number_of_extensions": epc.extension_count,
        "number_of_storeys": epc.flat_storey_count,
        "mains_gas": self.mains_gas,
        "energy_tariff": epc.energy_tariff,
        "primary_energy_consumption": primary_energy_consumption,
        "co2_emissions": co2_emissions,
        "current_energy_demand": current_kwh_demand,  # kwh, despite the name
        "current_energy_demand_heating_hotwater": current_kwh_heating_hotwater,  # kwh
        "estimated": epc.estimated,
        # Flag and figures for an overwritten SAP 05 EPC
        "sap_05_overwritten": sap_05_overwritten,
        "sap_05_score": sap_05_score,
        "sap_05_epc_rating": sap_05_epc_rating,
        # Bill entries are merged at this position: they override the keys above, the keys below override them
        **self.current_energy_bill,
        # Unadjusted figures, kept next to the adjustments that were applied
        "original_co2_emissions": self.energy["co2_emissions"],
        "original_primary_energy_consumption": self.energy["primary_energy_consumption"],
        "original_current_energy_demand": self.current_energy_consumption,  # kwh, despite the name
        "original_current_energy_demand_heating_hotwater": self.current_energy_consumption_heating_hotwater,  # kwh
        "installed_measures_co2_adjustment": rebaselining_carbon,
        "installed_measures_energy_demand_adjustment": rebaselining_kwh,  # kwh
        "installed_measures_total_energy_bill_adjustment": rebaselining_bills,
        "installed_measures_heat_demand_adjustment": rebaselining_heat_demand,
        "is_epc_adjusted_for_installed_measures": needs_rebaselining,
        # Re-baselining variables - to replace already installed variables entirely
        "lodged_co2_emissions": float(epc.original_epc["co2-emissions-current"]),
        "lodged_heat_demand": float(epc.original_epc["energy-consumption-current"]),
        "has_been_remodelled": epc.has_been_remodelled,
    }
|
|
|
|
def get_spatial_data(self, uprn_filenames):
    """
    Look up the spatial data for this property's UPRN and hand the matching rows to ``self.set_spatial``.

    Without a UPRN no lookup is attempted: the conservation area, listed and heritage flags are set to
    False and ``restricted_measures`` is set to True.

    :param uprn_filenames: dataframe with "lower", "upper" and "filenames" columns, giving the UPRN range
        covered by each spatial file
    :return: None in every case; when no file covers the UPRN a warning is logged and nothing is set
    """
    property_uprn = self.uprn

    if property_uprn is None:
        logger.warning(
            "We do not have a UPRN for this property - this needs to be implemented"
        )
        self.in_conservation_area = False
        self.is_listed = False
        self.is_heritage = False
        self.restricted_measures = True
        return

    # Find the file whose [lower, upper] range contains the UPRN
    starts_at_or_before = uprn_filenames["lower"] <= property_uprn
    ends_at_or_after = uprn_filenames["upper"] >= property_uprn
    candidate_files = uprn_filenames[starts_at_or_before & ends_at_or_after]
    if candidate_files.empty:
        logger.warning("Could not find file containing UPRNS")
        return None

    # The first matching file is used
    filename = candidate_files.iloc[0]["filenames"]
    spatial_data = read_dataframe_from_s3_parquet(
        bucket_name=DATA_BUCKET, file_key=f"spatial/{filename}"
    )

    # Keep only this property's rows and pull the spatial features out of them
    self.set_spatial(spatial_data[spatial_data["UPRN"] == property_uprn])
|
|
|
|
def _filter_property_dimensions(self, property_dimensions):
    """
    Average the reference dimensions of the rows that are comparable to this property.

    The dataframe is narrowed to rows with this property's type, then to its construction age band (when
    the band is known), then to its built form. The built form filter is only applied when at least one
    remaining row has that built form, so an unseen built form does not empty the selection.

    :param property_dimensions: dataframe with PROPERTY_TYPE, CONSTRUCTION_AGE_BAND, BUILT_FORM,
        NUMBER_HABITABLE_ROOMS, TOTAL_FLOOR_AREA and FLOOR_HEIGHT columns
    :return: series with the mean NUMBER_HABITABLE_ROOMS, TOTAL_FLOOR_AREA and FLOOR_HEIGHT of the
        selected rows
    """
    result = property_dimensions[
        property_dimensions["PROPERTY_TYPE"] == self.epc_record.property_type
    ]

    age_band = self.construction_age_band
    if age_band is not None and age_band not in self.DATA_ANOMALY_MATCHES:
        result = result[result["CONSTRUCTION_AGE_BAND"] == age_band]

    built_form = self.epc_record.built_form
    if built_form not in self.DATA_ANOMALY_MATCHES:
        # `built_form in result["BUILT_FORM"]` would test the Series *index*, not its values, so the
        # presence check has to be done on the comparison mask instead
        built_form_mask = result["BUILT_FORM"] == built_form
        if built_form_mask.any():
            result = result[built_form_mask]

    return result[
        ["NUMBER_HABITABLE_ROOMS", "TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]
    ].mean()
|
|
|
|
def set_basic_property_dimensions(self):
    """
    Set the basic dimensions of the property: number of floors, perimeter, insulation wall area,
    insulation floor area and roof area.

    Values held in the energy assessment condition data are used where the checks below allow it;
    otherwise the figure is derived from what is already known about the property or from the estimate
    helpers.
    :return:
    """
    # Many of these pieces of information are now contained in the condition data
    assessment = self.energy_assessment_condition_data
    condition_data = assessment.copy()

    # Number of floors: only overwritten when the condition data has a value AND one is already set
    if (condition_data.get("number_of_floors") is not None) and (self.number_of_floors is not None):
        floors = int(assessment["number_of_floors"])
    else:
        floors = self.number_of_floors
    self.number_of_floors = floors

    # Perimeter: re-engineered from a known floor area, else the condition data, else an estimate
    if self.insulation_floor_area is not None:
        self.perimeter = np.sqrt(self.insulation_floor_area) * 4
    elif condition_data.get("perimeter") is not None:
        self.perimeter = float(assessment["perimeter"])
    else:
        self.perimeter = estimate_perimeter(
            floor_area=self.floor_area / self.number_of_floors,
            num_rooms=self.number_of_rooms / self.number_of_floors
        )

    # Wall area: condition data is used only when a value is already set as well, else an estimate
    if (condition_data.get("insulation_wall_area") is not None) and (self.insulation_wall_area is not None):
        self.insulation_wall_area = float(assessment["insulation_wall_area"])
    else:
        self.insulation_wall_area = estimate_external_wall_area(
            num_floors=self.number_of_floors,
            floor_height=self.floor_height,
            perimeter=self.perimeter,
            built_form=self.epc_record.built_form,
        )

    # Floor area: only filled in when missing - ground floor area from the condition data, else the
    # total floor area spread evenly over the floors
    if self.insulation_floor_area is None:
        if condition_data.get("main_dwelling_ground_floor_area") is not None:
            self.insulation_floor_area = float(assessment["main_dwelling_ground_floor_area"])
        else:
            self.insulation_floor_area = self.floor_area / self.number_of_floors

    # Roof area: a flat roof matches the floor area, anything else goes through the pitched estimate
    if self.roof["is_flat"]:
        self.roof_area = self.insulation_floor_area
    else:
        self.roof_area = estimate_pitched_roof_area(
            floor_area=self.insulation_floor_area,
        )
|
|
|
|
def set_floor_level(self):
    """
    Set ``self.floor_level`` from the EPC floor level, then sanity check it against the floor description.

    - No usable EPC floor level: flats are put on level 1 when there is another property below and on
      level 0 otherwise; every other property type keeps a floor level of None.
    - Level above 0: reset to 0 when there is no property below and the floor has no thermal
      transmittance unit.
    - Level 0: moved up to 1 when there is another property below.
    :return:
    """
    recorded_level = self.epc_record.floor_level
    level_is_usable = recorded_level not in self.DATA_ANOMALY_MATCHES and recorded_level is not None
    self.floor_level = FLOOR_LEVEL_MAP[recorded_level] if level_is_usable else None

    if self.floor_level is None:
        # Only flats get an inferred floor level
        if self.epc_record.property_type == "Flat":
            self.floor_level = 1 if self.floor["another_property_below"] else 0
        return

    if self.floor_level > 0:
        # We have found cases where a property is marked as being above the ground floor, so we check
        # if there is another property below (for a non-sap assessment)
        nothing_below = not self.floor["another_property_below"]
        if nothing_below and self.floor["thermal_transmittance_unit"] is None:
            self.floor_level = 0
    elif self.floor_level == 0:
        # A ground floor property cannot have another property below it
        if self.floor["another_property_below"]:
            self.floor_level = 1
|
|
|
|
def set_wall_type(self):
    """
    Set ``self.wall_type`` by passing the entries of ``self.walls`` to ``get_wall_type`` as keyword
    arguments.
    :return:
    """
    wall_attributes = self.walls
    self.wall_type = get_wall_type(**wall_attributes)
|
|
|
|
def set_floor_type(self):
    """
    Set ``self.floor_type``, which is used for calculating u-values.

    The first rule that matches wins:
      1. suspended floor, or another property below -> "suspended"
      2. solid floor -> "solid"
      3. floor to an unheated space or to external air -> "exposed_floor"
      4. a thermal transmittance is recorded -> "solid"
      5. anything else -> "suspended" (temporary default, a warning is logged)

    Background from the original note: section 5.6 of the BRE indicates that "to simplify data collection
    no distinction is made in terms of U-value between an exposed floor (to outside air below) and a
    semi-exposed floor (to an enclosed but unheated space below) and the U-values in Table S12 are used",
    which is why both of those cases share rule 3.
    """
    floor = self.floor

    if floor["is_suspended"] | floor["another_property_below"]:
        self.floor_type = "suspended"
        return

    if floor["is_solid"]:
        self.floor_type = "solid"
        return

    if floor["is_to_unheated_space"] | floor["is_to_external_air"]:
        self.floor_type = "exposed_floor"
        return

    if floor["thermal_transmittance"] is not None:
        self.floor_type = "solid"
        return

    # In this case it's not super clear what the floor type is, so we default - this is a temp
    logger.warning(
        f"Could not determine floor type, given: '{self.floor['original_description']}', defaulting to "
        f"suspended for property {self.uprn}"
    )
    self.floor_type = "suspended"
|
|
|
|
def set_windows_count(self):
    """
    Set the number of windows and the windows area of the property.

    The number of windows comes from the energy assessment condition data when it is recorded there,
    otherwise from the estimate_windows function. The windows area is only ever taken from the condition
    data and is None when it is not recorded.
    :return:
    """
    condition_data = self.energy_assessment_condition_data.copy()

    if condition_data.get("number_of_windows") is not None:
        self.number_of_windows = int(condition_data["number_of_windows"])
    else:
        self.number_of_windows = estimate_windows(
            property_type=self.epc_record.property_type,
            built_form=self.epc_record.built_form,
            construction_age_band=self.construction_age_band,
            floor_area=self.floor_area,
            number_habitable_rooms=self.number_of_rooms,
        )

    if condition_data.get("windows_area") is not None:
        self.windows_area = float(condition_data["windows_area"])
    else:
        self.windows_area = None
|
|
|
|
def is_ashp_valid(self, measures):
    """
    Decide whether an air source heat pump can be recommended for this property.

    :param measures: collection of measure names under consideration
    :return: True when the heat pump is among the non-invasive recommendations; False when it is not
        among ``measures``; True for a House/Bungalow above the ASHP floor area threshold; otherwise True
        only for a House/Bungalow that is not an enclosed terrace and has no air source heat pump already
    """
    heat_pump = "air_source_heat_pump"

    if heat_pump in self.non_invasive_recommendations:
        return True

    if heat_pump not in measures:
        return False

    is_house_or_bungalow = self.epc_record.property_type in ["House", "Bungalow"]

    # If we have a house over a floor area threshold, we recommend an ASHP
    if is_house_or_bungalow and self.floor_area > assumptions.ASHP_FLOOR_AREA_THRESHOLD:
        return True

    # Enclosed terraces are not treated as suitable
    suitable_property_type = is_house_or_bungalow and (
        self.epc_record.built_form not in ["Enclosed Mid-Terrace", "Enclosed End-Terrace"]
    )
    already_has_heat_pump = self.main_heating["has_air_source_heat_pump"]

    return suitable_property_type and not already_has_heat_pump
|
|
|
|
def is_solar_pv_valid(self):
    """
    Decide whether solar PV can be recommended for this property.

    The checks run in order and the first one that applies decides:
      1. a building id and a solar panel configuration are both set -> valid (building level solar
         potential, which also covers flats)
      2. the property is listed or is a heritage building -> not valid
      3. House/Bungalow whose roof has a thermal transmittance recorded -> valid
      4. otherwise the property type must be House, Bungalow or Maisonette, the roof must be flat, pitched
         or a roof room, and there must be no existing solar PV

    :return: truthy when solar PV can be recommended, falsy otherwise
    """
    # If the property is a flat but we are looking at building solar potential, we can include this
    if (self.building_id is not None) and (self.solar_panel_configuration is not None):
        return True

    # If the property is listed or is a heritage building, solar panels become a difficult measure to
    # get through planning restrictions and so we do not recommend them. A conservation area alone does
    # not rule them out: the panels need to be installed in a way that is sympathetic to the building,
    # e.g. such that they are not visible from the street
    if self.is_listed or self.is_heritage:
        return False

    if (self.epc_record.property_type in ["House", "Bungalow"]) and (
        not pd.isnull(self.roof["thermal_transmittance"])
    ):
        return True

    is_valid_property_type = self.epc_record.property_type in ["House", "Bungalow", "Maisonette"]
    is_valid_roof_type = (
        self.roof["is_flat"] or self.roof["is_pitched"] or self.roof["is_roof_room"]
    )

    # Inspections data, when it has a roof orientation, tells us about existing solar PV
    roof_orientation = getattr(self.inspections, "roof_orientation", None)
    if roof_orientation:
        has_no_existing_solar_pv = roof_orientation.value not in [
            "already has solar pv", "roof too small", "no roof"
        ]
    else:
        # If there is no existing solar PV, the photo-supply field will be None, 0 or a missing value.
        # The anomaly markers have to be tested by membership: placing the whole collection inside a
        # list compares photo_supply with the collection itself, which never matches
        photo_supply = self.epc_record.photo_supply
        has_no_existing_solar_pv = (
            photo_supply is None
            or photo_supply == 0
            or photo_supply in self.DATA_ANOMALY_MATCHES
        )

    return is_valid_property_type and is_valid_roof_type and has_no_existing_solar_pv
|
|
|
|
def estimate_electrical_consumption(self, assumed_ashp_efficiency, exclusions):
    """
    Estimate the future electricity demand of the property, assuming heating and hot water move to an
    ASHP where the property is eligible for one.

    The figure is used to size up solar panels, so that they can cover heat generation even if the
    property does not generate its heat from electricity today. When the heat pump is excluded, or is not
    valid for the property, the current energy consumption is returned unadjusted.

    :param assumed_ashp_efficiency: ASHP efficiency as a percentage; it is divided by 100 before use
    :param exclusions: measures which are excluded; None is treated as no exclusions
    :return: the current energy consumption, or the sum of the efficiency adjusted heating and hot water
        consumption and the unadjusted lighting and appliances consumption
    """
    heat_pump = "air_source_heat_pump"
    excluded_measures = exclusions if exclusions is not None else []

    # No adjustment when the heat pump is ruled out, or when it is not valid for this property
    # (e.g. the property currently has an ASHP, so there is no efficiency improvement to gain)
    if heat_pump in excluded_measures or not self.is_ashp_valid(measures=[heat_pump]):
        return self.current_energy_consumption

    unadjusted = self.energy_consumption_estimates["unadjusted"]
    heating_consumption = unadjusted["heating"]
    hotwater_consumption = unadjusted["hot_water"]

    # Heating and hot water are both scaled by the expected efficiency of an ASHP
    efficiency_ratio = assumed_ashp_efficiency / 100
    heating_consumption = heating_consumption / efficiency_ratio
    hotwater_consumption = hotwater_consumption / efficiency_ratio

    return heating_consumption + hotwater_consumption + unadjusted["lighting"] + unadjusted["appliances"]
|
|
|
|
def insert_funding(
    self,
    scheme,
    funded_measures,
    project_funding,
    total_uplift,
    full_project_score,
    partial_project_score,
    uplift_project_score
):
    """
    Store the funding details on the property object.

    Every argument is saved, unchanged, on the attribute of the same name.
    """
    funding_details = {
        "scheme": scheme,
        "funded_measures": funded_measures,
        "project_funding": project_funding,
        "total_uplift": total_uplift,
        "full_project_score": full_project_score,
        "partial_project_score": partial_project_score,
        "uplift_project_score": uplift_project_score,
    }
    for attribute, value in funding_details.items():
        setattr(self, attribute, value)
|
|
|
|
def identify_ventilation(self):
    """
    Check whether the EPC records a mechanical ventilation system.

    :return: True when the mechanical ventilation value of the EPC record is 'mechanical, extract only' or
        'mechanical, supply and extract', otherwise False
    """
    mechanical_systems = {
        'mechanical, supply and extract',
        'mechanical, extract only',
    }
    recorded_ventilation = self.epc_record.mechanical_ventilation

    return recorded_ventilation in mechanical_systems
|
|
|
|
@property
def epc_is_expired(self) -> bool:
    """
    This property indicates that the EPC is expired. This is based on the lodgement date, where an EPC is
    valid for 10 years.

    The validity period is measured in calendar years rather than as a fixed 3650 days, so the leap days
    inside the period do not cause the EPC to be reported as expired a few days early.
    :return: boolean indicating whether the EPC is expired
    """
    lodged_on = pd.to_datetime(self.epc_record.lodgement_date)
    valid_until = lodged_on + pd.DateOffset(years=10)

    # The current time is naive local time, as is assumed for the lodgement date
    return bool(valid_until < datetime.now())
|
|
|
|
@property
def epc_is_estimated(self) -> bool:
    """
    This property indicates that the EPC is estimated. It passes through the "estimated" flag held on the
    EPC record.
    :return: the value of the "estimated" flag of the EPC record
    """
    estimated_flag = self.epc_record.estimated
    return estimated_flag
|