import os import ast from itertools import groupby import pandas as pd from datetime import datetime, timedelta from etl.epc.Dataset import TrainingDataset from etl.epc.Record import EPCRecord from etl.epc.settings import LATEST_FIELD, MANDATORY_FIXED_FEATURES from etl.epc_clean.epc_attributes.all_cleaners import all_cleaner_map from etl.solar.SolarPhotoSupply import SolarPhotoSupply from utils.logger import setup_logger from utils.s3 import read_dataframe_from_s3_parquet from etl.epc.settings import DATA_ANOMALY_MATCHES from recommendations.rdsap_tables import FLOOR_LEVEL_MAP from recommendations.recommendation_utils import ( estimate_perimeter, get_wall_type, estimate_external_wall_area, esimtate_pitched_roof_area, estimate_windows, ) from backend.ml_models.AnnualBillSavings import AnnualBillSavings ENVIRONMENT = os.environ.get("ENVIRONMENT", "dev") DATA_BUCKET = os.environ.get( "DATA_BUCKET", "retrofit-data-dev" if ENVIRONMENT == "dev" else None ) logger = setup_logger() class Property: ATTRIBUTE_MAP = { "floor-description": "floor", "hotwater-description": "hotwater", "main-fuel": "main_fuel", "mainheat-description": "main_heating", "mainheatcont-description": "main_heating_controls", "roof-description": "roof", "walls-description": "walls", "windows-description": "windows", "lighting-description": "lighting", } floor = None hotwater = None main_fuel = None main_heating = None main_heating_controls = None roof = None walls = None windows = None lighting = None energy_source = None spatial = None base_difference_record = None DATA_ANOMALY_MATCHES = DATA_ANOMALY_MATCHES # Surplus information, that can be provided as optional inputs, by a customer n_bathrooms = None n_bedrooms = None building_id = None # Used to group properties together into a single building # Contains the solar panel optimisation results from the Google Solar API solar_panel_configuration = None def __init__( self, id, postcode, address, epc_record, already_installed=None, non_invasive_recommendations=None, measures=None, **kwargs ): self.epc_record = epc_record self.id = id self.address = address self.postcode = postcode self.data = { k.replace("_", "-"): v for k, v in epc_record.get("prepared_epc").items() } self.old_data = epc_record.get("old_data") self.property_dimensions = None # This is a list of measures that have already been installed in the property, typically found as a result # of the non-invasive surveys. We reflect that this has been installed in the recommendations, but remove the # cost and instead, provide a message that the measure has already been installed self.already_installed = ast.literal_eval(already_installed['already_installed']) if already_installed else [] self.non_invasive_recommendations = ( ast.literal_eval(non_invasive_recommendations['recommendations']) if non_invasive_recommendations else [] ) # This is a list of measures that have been recommended for the property if isinstance(measures, list): self.measures = measures else: self.measures = ast.literal_eval(measures) if measures else None self.uprn = epc_record.get("uprn") self.full_sap_epc = epc_record.get("full_sap_epc") self.in_conservation_area, self.is_listed, self.is_heritage = None, None, None self.restricted_measures = False self.year_built = epc_record.get("year_built") self.number_of_rooms = epc_record.prepared_epc.get("number_habitable_rooms") self.age_band = epc_record.get("age_band") self.construction_age_band = epc_record.get("construction_age_band") self.number_of_floors = epc_record.get("number_of_floors") self.perimeter = None self.wall_type = None self.floor_type = None self.energy_cost_estimates = {} self.energy_consumption_estimates = {} self.energy = { "primary_energy_consumption": epc_record.get("energy_consumption_current"), "co2_emissions": epc_record.get("co2_emissions_current"), } self.ventilation = { "ventilation": epc_record.get("mechanical_ventilation"), } self.solar_pv = { "solar_pv": epc_record.get("photo_supply"), } self.solar_hot_water = { "solar_hot_water": epc_record.get("solar_water_heating_flag"), "solar_hot_water_boolean": epc_record.get("solar_water_heating_flag_bool"), } self.wind_turbine = { "wind_turbine": epc_record.prepared_epc.get("wind_turbine_count"), } self.number_of_open_fireplaces = { "number_of_open_fireplaces": epc_record.prepared_epc.get( "number_open_fireplaces" ), } self.number_of_extensions = { "number_of_extensions": epc_record.prepared_epc.get("extension_count"), } self.number_of_storeys = { "number_of_storeys": epc_record.prepared_epc.get("flat_storey_count"), } self.heat_loss_corridor = { "heat_loss_corridor": epc_record.prepared_epc.get("heat_loss_corridor"), "length": epc_record.prepared_epc.get("unheated_corridor_length"), "heat_loss_corridor_boolean": epc_record.get("heat_loss_corridor_bool"), } self.mains_gas = epc_record.prepared_epc.get("mains_gas_flag") self.floor_height = epc_record.prepared_epc.get("floor_height") self.insulation_wall_area = None self.floor_area = epc_record.prepared_epc.get("total_floor_area") self.pitched_roof_area = None self.insulation_floor_area = None self.number_lighting_outlets = epc_record.prepared_epc.get( "fixed_lighting_outlets_count" ) self.floor_level = None self.number_of_windows = None self.solar_pv_percentage = None self.current_adjusted_energy = None self.expected_adjusted_energy = None self.current_energy_bill = None self.expected_energy_bill = None self.heating_energy_source = None self.hot_water_energy_source = None self.recommendations_scoring_data = [] self.simulation_epcs = {} self.parse_kwargs(kwargs) @classmethod def extract_kwargs(cls, kwargs): """ This method is to be used in the router, to extract the kwargs from the request and prevent any errors such as non-integer values, or inputs that clash with the __init__ method of this class :param kwargs: :return: """ n_bathrooms = kwargs.get("n_bathrooms", None) if n_bathrooms not in [None, ""]: # We add on a small value to ensure that the number of bathrooms is rounded up, in case the value is 0.5 n_bathrooms = int(round(float(n_bathrooms) + 1e-5)) n_bedrooms = kwargs.get("n_bedrooms", None) if n_bedrooms not in [None, ""]: n_bedrooms = int(round(float(n_bedrooms) + 1e-5)) return { "n_bathrooms": n_bathrooms, "n_bedrooms": n_bedrooms, "building_id": kwargs.get("building_id", None), } def parse_kwargs(self, kwargs): # We extract the elements from kwargs that we recognise. Anything additional is ignored self.n_bathrooms = kwargs.get("n_bathrooms", None) self.n_bedrooms = kwargs.get("n_bedrooms", None) self.building_id = kwargs.get("building_id", None) def create_base_difference_epc_record(self, cleaned_lookup: dict): """ Creates a EPCDifferenceRecord object, which is used to store the difference between the current and expected EPC It will be the same starting and ending EPC, as we don't have the expected EPC yet """ # difference_record = self.epc_record - self.epc_record # TODO: change these lower and replace in the settings file # print( # "CHANGE THE LATEST FIELD TO REMOVE NUMBER HABITABLE ROOMS IF WE WANT TO USE STARTING/ENDING" # ) fixed_data_col_names = MANDATORY_FIXED_FEATURES + LATEST_FIELD # print("NEED TO CHANGE THE DASH TO LOWER CASE") fixed_data_col_names = [ x.lower().replace("_", "-") for x in fixed_data_col_names ] fixed_data = { k.replace("-", "_"): v for k, v in self.data.items() if k in fixed_data_col_names } # difference_record.append_fixed_data(fixed_data) difference_record = self.epc_record.create_EPCDifferenceRecord( self.epc_record, fixed_data ) self.base_difference_record = TrainingDataset( datasets=[difference_record], cleaned_lookup=cleaned_lookup ) # TODO: adjust the base difference record with the previously calculated u values + features # estimated_perimeter is different to the perimeter in the epc record # self.base_difference_record.df def simulate_all_representative_recommendations( self, property_representative_recommendations, ): """ This method was put together to simulate the impact of the representative recommendations on the property all at once, for usage within the mds report :return: """ recommendation_record = self.base_difference_record.df.to_dict("records")[ 0 ].copy() scoring_dict = self.create_recommendation_scoring_data( property_id=self.id, recommendation_record=recommendation_record, recommendations=property_representative_recommendations, primary_recommendation_id=self.id, non_invasive_recommendations=self.non_invasive_recommendations, ) return scoring_dict def adjust_difference_record_with_recommendations( self, property_recommendations, property_representative_recommendations ): """ This method will adjust the difference record, based on the recommendations made for the property In order to score the measures, we need to consider the phase of the retrofit. :param property_recommendations: dictionary of recommendations for the property :param property_representative_recommendations: dictionary of representative recommendations for the property """ self.recommendations_scoring_data = [] self.simulation_epcs = {} phases = sorted( [ r[0]["phase"] for r in property_recommendations if r[0]["phase"] is not None ] ) simulation_lodgment_date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d") for phase in phases: property_recommendations_by_phase = [ r for r in property_recommendations if r[0]["phase"] == phase ][0] previous_phases = [p for p in phases if p < phase] previous_phase_representatives = [ r for r in property_representative_recommendations if r["phase"] in previous_phases ] # For solid wall insulation, we will actually have 2 representative recommendations, since we consider # both internal and external wall insulation as possible measures. We will use the representative that # has the lowest efficiency. # Take the representative with the lowest efficiency, by phase # To be safe, we sort by phase previous_phase_representatives = sorted( previous_phase_representatives, key=lambda x: x["phase"] ) previous_phase_representatives = [ min(group, key=lambda x: x["efficiency"]) for _, group in groupby( previous_phase_representatives, key=lambda x: x["phase"] ) ] recommendation_record = self.base_difference_record.df.to_dict("records")[ 0 ].copy() recommendation_record["days_to_ending"] = EPCRecord._calculate_days_to( lodgement_date=simulation_lodgment_date, ) for rec in property_recommendations_by_phase: # We simulate the impact of the recommendation at this current phase, and all of the prior phases if rec["type"] == "mechanical_ventilation": continue scoring_dict = self.create_recommendation_scoring_data( property_id=self.id, recommendation_record=recommendation_record, recommendations=previous_phase_representatives + [rec], primary_recommendation_id=rec["recommendation_id"], non_invasive_recommendations=self.non_invasive_recommendations, ) self.recommendations_scoring_data.append(scoring_dict) # We also use the representative recommendations to produce transformed EPCs represenative_recs_to_this_phase = [ r for r in property_representative_recommendations if r["phase"] <= phase ] epc_transformations = [x["description_simulation"] for x in represenative_recs_to_this_phase] # It is possible that we could have two simulations applied to the same descriptions # We extract these out phase_epc_transformation = {} for config in epc_transformations: for k, v in config.items(): if k in phase_epc_transformation: raise NotImplementedError( "Already have this key in the phase_epc_transformation - implement me") phase_epc_transformation[k] = v simulation_epc = self.epc_record.prepared_epc.copy() # Insert static values simulation_epc["lodgement_date"] = simulation_lodgment_date # Replace the understores with hyphens simulation_epc = {k.replace("_", "-"): v for k, v in simulation_epc.items()} simulation_epc.update(phase_epc_transformation) self.simulation_epcs[phase] = simulation_epc @staticmethod def create_recommendation_scoring_data( property_id, recommendation_record, recommendations: list, primary_recommendation_id: int, non_invasive_recommendations: list = None, ): """ This function will iterate through a list of recommendations and apply a simulation for each recommendation This allows us to later multiple measures and see the impact of the measures on the property :param property_id: The id of the property :param recommendation_record: The record of the property, which will be updated :param recommendations: The list of recommendations to apply :param primary_recommendation_id: The id of the primary recommendation, which is used to identify the record :param non_invasive_recommendations: The list of non-invasive recommendations :return: The updated recommendation record """ output = recommendation_record.copy() non_invasive_recommendations = [] if non_invasive_recommendations is None else non_invasive_recommendations for col in [ "walls_insulation_thickness", "floor_insulation_thickness", "roof_insulation_thickness", ]: if output[col] is None: output[col] = "none" for recommendation in recommendations: # For the list of recommendations we have, we iteratively update the output # Update description to indicate it's insulate if recommendation["type"] in [ "solid_floor_insulation", "suspended_floor_insulation", "exposed_floor_insulation", ]: if len(recommendation["parts"]) > 1: raise NotImplementedError( "Have more than 1 floor insulation part - handle this case" ) # We don't really see above average for this in the training data output["floor_insulation_thickness_ending"] = "average" else: if output["floor_thermal_transmittance_ending"] is None: raise ValueError("We should not have a None value for the u value") if output["floor_insulation_thickness_ending"] is None: output["floor_insulation_thickness_ending"] = "none" if recommendation["type"] in [ "loft_insulation", "room_roof_insulation", "flat_roof_insulation", ]: output["roof_thermal_transmittance_ending"] = recommendation[ "new_u_value" ] parts = recommendation["parts"] if len(parts) != 1: raise ValueError( "More than one part for roof insulation - investiage me" ) # This is based on the values we have in the training data valid_numeric_values = [ 12, 25, 50, 75, 100, 150, 200, 250, 270, 300, 350, 400, ] proposed_depth = recommendation["new_thickness"] if proposed_depth not in valid_numeric_values: # Take the nearest value for scoring proposed_depth = min( valid_numeric_values, key=lambda x: abs(x - proposed_depth) ) output["roof_insulation_thickness_ending"] = str(int(proposed_depth)) if recommendation["type"] == "loft_insulation": if proposed_depth >= 270: output["roof_energy_eff_ending"] = "Very Good" else: if output["roof_energy_eff_ending"] not in ["Good", "Very Good"]: output["roof_energy_eff_ending"] = "Good" else: output["roof_energy_eff_ending"] = "Very Good" else: # Fill missing roof u-values - this fill is not based on recommended upgrades if output["roof_thermal_transmittance_ending"] is None: raise ValueError("We should not have a None value for the u value") if output["roof_insulation_thickness_ending"] is None: output["roof_insulation_thickness_ending"] = "none" if recommendation["type"] == "sealing_open_fireplace": output["number_open_fireplaces_ending"] = 0 if recommendation["type"] == "low_energy_lighting": output["low_energy_lighting_ending"] = 100 output["lighting_energy_eff_ending"] = "Very Good" if recommendation["type"] == "windows_glazing": output["multi_glaze_proportion_ending"] = 100 if output["windows_energy_eff_ending"] not in ["Average", "Good", "Very Good"]: output["windows_energy_eff_ending"] = "Average" is_secondary_glazing = recommendation["is_secondary_glazing"] if output["glazing_type_ending"] == "multiple": pass elif output["glazing_type_ending"] == "single": output["glazing_type_ending"] = ( "secondary" if is_secondary_glazing else "double" ) elif output["glazing_type_ending"] == "double": output["glazing_type_ending"] = ( "multiple" if is_secondary_glazing else "double" ) elif output["glazing_type_ending"] == "secondary": output["glazing_type_ending"] = ( "secondary" if is_secondary_glazing else "multiple" ) elif output["glazing_type_ending"] in ["triple", "high performance"]: output["glazing_type_ending"] = "multiple" else: raise ValueError("Invalid glazing type - implement me") if is_secondary_glazing: output["glazed_type_ending"] = "secondary glazing" else: output["glazed_type_ending"] = ( "double glazing installed during or after 2002" ) if recommendation["type"] in [ "heating", "hot_water_tank_insulation", "heating_control", "secondary_heating", "internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation", ]: # We update the data, as defined in the recommendaton if output["walls_insulation_thickness_ending"] is None: output["walls_insulation_thickness_ending"] = "none" simulation_config = recommendation["simulation_config"] # If any entries in simulation_config are None, we will set them to "Unknown" which is the cleaning # value for key, value in simulation_config.items(): if value is None: simulation_config[key] = "Unknown" output.update(simulation_config) if recommendation["type"] == "solar_pv": output["photo_supply_ending"] = recommendation["photo_supply"] if recommendation["type"] not in [ "sealing_open_fireplace", "low_energy_lighting", "internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation", "loft_insulation", "room_roof_insulation", "flat_roof_insulation", "solid_floor_insulation", "suspended_floor_insulation", "exposed_floor_insulation", "windows_glazing", "solar_pv", "heating", "hot_water_tank_insulation", "heating_control", "secondary_heating" ]: raise NotImplementedError( "Implement me, given type %s" % recommendation["type"] ) output["id"] = "+".join([str(property_id), str(primary_recommendation_id)]) return output def get_components( self, cleaned, photo_supply_lookup, floor_area_decile_thresholds, energy_consumption_client ): """ Given the cleaning that has been performed, we'll use this to identify the property components, from roof to walls to windows, heating and hot water :param cleaned: This is the dictionary of components found in cleaner.cleaned :param photo_supply_lookup: This is the lookup table for the photo supply, used to estimate the percentage of the roof that is suitable for solar panels :param floor_area_decile_thresholds: This is the decile thresholds for the floor area, used in estimating the solar pv roof area :param energy_consumption_client: Contains the heating and hot water kwh models - used to predict current energy annual consumption in kWh :return: """ if not cleaned: raise ValueError("Cleaner does not contain cleaned data") if not self.data: raise ValueError("Property does not contain data") self.set_basic_property_dimensions() for description, attribute in cleaned.items(): if self.data[description] in self.DATA_ANOMALY_MATCHES: template = cleaned[description][0] fill_dict = dict(zip(template.keys(), [None] * len(template))) fill_dict.update( { "original_description": self.data[description], "clean_description": self.data[description], } ) setattr( self, self.ATTRIBUTE_MAP[description], fill_dict, ) continue attributes = [ x for x in cleaned[description] if x["original_description"] == self.data[description] ] if len(attributes) > 1: raise ValueError( "Either No attributes or multiple found for %s" % description ) if len(attributes) == 0: # We attempt to perform the clean on the fly cleaner_cls = all_cleaner_map[description] cleaner_cls = cleaner_cls(self.data[description]) processed = { "original_description": self.data[description], "clean_description": cleaner_cls.description.replace( "(assumed)", "" ) .rstrip() .capitalize(), **cleaner_cls.process(), } attributes = [processed] setattr(self, self.ATTRIBUTE_MAP[description], attributes[0]) self.set_wall_type() self.set_floor_type() self.set_floor_level() self.set_windows_count() self.set_solar_panel_area( photo_supply_lookup=photo_supply_lookup, floor_area_decile_thresholds=floor_area_decile_thresholds, ) self.set_energy_source() self.find_energy_sources() self.set_current_energy_bill(energy_consumption_client) def set_solar_panel_configuration(self, solar_panel_configuration): """ This funtion inserts the solar panel configuration into the property object """ self.solar_panel_configuration = solar_panel_configuration def set_current_energy_bill(self, energy_consumption_client): """ Given what we know about the property now, estimates the current energy consumption using the UCL paper https://www.sciencedirect.com/science/article/pii/S0378778823002542 :return: """ # We get the following things: # 1) Today's cost. This give us a basline figure for what the cost is today # 2) Predicted KwH # Today's costs todays_heating_cost = energy_consumption_client.convert_cost_to_today( original_cost=float(self.data["heating-cost-current"]), lodgement_date=pd.Timestamp(self.epc_record.prepared_epc["lodgement_date"]) ) todays_hot_water_cost = energy_consumption_client.convert_cost_to_today( original_cost=float(self.data["hot-water-cost-current"]), lodgement_date=pd.Timestamp(self.epc_record.prepared_epc["lodgement_date"]) ) todays_lighting_cost = energy_consumption_client.convert_cost_to_today( original_cost=float(self.data["lighting-cost-current"]), lodgement_date=pd.Timestamp(self.epc_record.prepared_epc["lodgement_date"]) ) scoring_df = pd.DataFrame([self.epc_record.prepared_epc]) # Change columns from underscores to hyphens scoring_df.columns = [ x.lower().replace("_", "-") for x in scoring_df.columns ] for col in ["heating_kwh", "hot_water_kwh"]: scoring_df[col] = None energy_consumption_client.data = None heating_prediction = energy_consumption_client.score_new_data( new_data=scoring_df, target="heating_kwh" )[0] hot_water_prediction = energy_consumption_client.score_new_data( new_data=scoring_df, target="hot_water_kwh" )[0] # We convert the lighting cost into kwh, just using the price cap lighting_kwh = float(self.data["lighting-cost-current"]) / AnnualBillSavings.ELECTRICITY_PRICE_CAP appliances_kwh = AnnualBillSavings.estimate_appliances_energy_use(total_floor_area=self.floor_area) adjusted_heating_kwh = AnnualBillSavings.adjust_energy_to_metered( epc_energy=heating_prediction, current_epc_rating=self.data["current-energy-rating"], ) adjusted_hot_water_kwh = AnnualBillSavings.adjust_energy_to_metered( epc_energy=hot_water_prediction, current_epc_rating=self.data["current-energy-rating"], ) adjusted_lighting_kwh = AnnualBillSavings.adjust_energy_to_metered( epc_energy=lighting_kwh, current_epc_rating=self.data["current-energy-rating"], ) adjusted_applicances_kwh = AnnualBillSavings.adjust_energy_to_metered( epc_energy=appliances_kwh, current_epc_rating=self.data["current-energy-rating"], ) # Adjust today's cost figures with the UCL model adjusted_heating_cost = AnnualBillSavings.adjust_energy_to_metered( epc_energy=todays_heating_cost, current_epc_rating=self.data["current-energy-rating"], ) adjusted_hot_water_cost = AnnualBillSavings.adjust_energy_to_metered( epc_energy=todays_hot_water_cost, current_epc_rating=self.data["current-energy-rating"], ) adjusted_lighting_cost = AnnualBillSavings.adjust_energy_to_metered( epc_energy=todays_lighting_cost, current_epc_rating=self.data["current-energy-rating"], ) adjusted_appliances_cost = AnnualBillSavings.adjust_energy_to_metered( epc_energy=appliances_kwh * AnnualBillSavings.ELECTRICITY_PRICE_CAP, current_epc_rating=self.data["current-energy-rating"], ) # Sum up the adjusted kwh figures self.current_adjusted_energy = ( adjusted_heating_kwh + adjusted_hot_water_kwh + adjusted_lighting_kwh + adjusted_applicances_kwh ) self.current_energy_bill = ( adjusted_heating_cost + adjusted_hot_water_cost + adjusted_lighting_cost + adjusted_appliances_cost ) self.energy_cost_estimates = { "adjusted": { "heating": adjusted_heating_cost, "hot_water": adjusted_hot_water_cost, "lighting": adjusted_lighting_cost, "appliances": adjusted_appliances_cost }, "unadjusted": { "heating": todays_heating_cost, "hot_water": todays_hot_water_cost, "lighting": todays_lighting_cost, "appliances": appliances_kwh * AnnualBillSavings.ELECTRICITY_PRICE_CAP }, "epc": { "heating": float(self.data["heating-cost-current"]), "hot_water": float(self.data["hot-water-cost-current"]), "lighting": float(self.data["lighting-cost-current"]), } } self.energy_consumption_estimates = { "adjusted": { "heating": adjusted_heating_kwh, "hot_water": adjusted_hot_water_kwh, "lighting": adjusted_lighting_kwh, "appliances": adjusted_applicances_kwh }, "unadjusted": { "heating": heating_prediction, "hot_water": hot_water_prediction, "lighting": lighting_kwh, "appliances": appliances_kwh } } def set_spatial(self, spatial: pd.DataFrame): """ Sets whether the property is in a conservation area given the output of the ConservationAreaClient Will store a dictionary, spatial, which is used to populate the property spatial table in the database :param spatial: Dataframe, containing the spatial data for the property """ self.in_conservation_area = spatial["conservation_status"].values[0] self.is_listed = spatial["is_listed_building"].values[0] self.is_heritage = spatial["is_heritage_building"].values[0] # We do an equals True, in the case of one of these variables being True if ( (self.in_conservation_area == True) | (self.is_listed == True) | (self.is_heritage == True) ): self.restricted_measures = True spatial_dict = spatial.to_dict("records")[0] self.spatial = { "x_coordinate": spatial_dict["X_COORDINATE"], "y_coordinate": spatial_dict["Y_COORDINATE"], "latitude": spatial_dict["LATITUDE"], "longitude": spatial_dict["LONGITUDE"], "conservation_status": spatial_dict["conservation_status"], "is_listed_building": spatial_dict["is_listed_building"], "is_heritage_building": spatial_dict["is_heritage_building"], } def _clean_upload_data(self, to_update): for k, v in to_update.items(): if v in self.DATA_ANOMALY_MATCHES: to_update[k] = None return to_update def get_full_property_data(self, current_valuation=None): """ This method extracts the data which is pushed to the database, containing core information, from the EPC about a property :return: """ property_data = { "creation_status": "READY", "uprn": int(self.data["uprn"]), "building_reference_number": int(self.data["building-reference-number"]), "has_pre_condition_report": True, "has_recommendations": True, "property_type": self.data["property-type"], "built_form": self.data["built-form"], "local_authority": self.data["local-authority-label"], "constituency": self.data["constituency-label"], "number_of_rooms": self.number_of_rooms, "year_built": self.year_built, "tenure": self.data["tenure"], "current_epc_rating": self.data["current-energy-rating"], "current_sap_points": self.data["current-energy-efficiency"], "current_valuation": current_valuation, } property_data = self._clean_upload_data(property_data) return property_data @classmethod def _prepare_rating_field(cls, field, rating_lookup): """ Utility function for usage in the lambda, for preparing the _rating fields """ return ( rating_lookup[field].value if (field not in cls.DATA_ANOMALY_MATCHES) and (field is not None) else None ) def get_property_details_epc(self, portfolio_id: int, rating_lookup): property_details_epc = { "property_id": self.id, "portfolio_id": portfolio_id, "full_address": self.data["address"], "total_floor_area": float(self.data["total-floor-area"]), "walls": self.walls["clean_description"], "walls_rating": self._prepare_rating_field( self.data["walls-energy-eff"], rating_lookup ), "roof": self.roof["clean_description"], "roof_rating": self._prepare_rating_field( self.data["roof-energy-eff"], rating_lookup ), "floor": self.floor["clean_description"], "floor_rating": self._prepare_rating_field( self.data["floor-energy-eff"], rating_lookup ), "windows": self.windows["clean_description"], "windows_rating": self._prepare_rating_field( self.data["windows-energy-eff"], rating_lookup ), "heating": self.main_heating["clean_description"], "heating_rating": self._prepare_rating_field( self.data["mainheat-energy-eff"], rating_lookup ), "heating_controls": self.main_heating_controls["clean_description"], "heating_controls_rating": self._prepare_rating_field( self.data["mainheatc-energy-eff"], rating_lookup ), "hot_water": self.hotwater["clean_description"], "hot_water_rating": self._prepare_rating_field( self.data["hot-water-energy-eff"], rating_lookup ), "lighting": self.lighting["clean_description"], "lighting_rating": self._prepare_rating_field( self.data["lighting-energy-eff"], rating_lookup ), "mainfuel": self.main_fuel["clean_description"], "ventilation": self.ventilation["ventilation"], "solar_pv": self.solar_pv["solar_pv"], "solar_hot_water": self.solar_hot_water["solar_hot_water_boolean"], "wind_turbine": self.wind_turbine["wind_turbine"], "floor_height": self.floor_height, "heat_loss_corridor": self.heat_loss_corridor["heat_loss_corridor_boolean"], "unheated_corridor_length": self.heat_loss_corridor["length"], "number_of_open_fireplaces": self.number_of_open_fireplaces[ "number_of_open_fireplaces" ], "number_of_extensions": self.number_of_extensions["number_of_extensions"], "number_of_storeys": self.number_of_storeys["number_of_storeys"], "mains_gas": self.mains_gas, "energy_tariff": self.data["energy-tariff"], "primary_energy_consumption": self.energy["primary_energy_consumption"], "co2_emissions": self.energy["co2_emissions"], "adjusted_energy_consumption": self.current_adjusted_energy, "estimated": self.data.get("estimated", False), } return property_details_epc def get_spatial_data(self, uprn_filenames): """ Given a property's UPRN, this method will pull the associated spatial data from s3 :return: """ if self.uprn is None: logger.warning( "We do not have a UPRN for this property - this needs to be implemented" ) self.in_conservation_area = False self.is_listed = False self.is_heritage = False self.restricted_measures = True return # We get the file name for the uprn filtered_df = uprn_filenames[ (uprn_filenames["lower"] <= self.uprn) & (uprn_filenames["upper"] >= self.uprn) ] if filtered_df.empty: logger.warning("Could not find file containing UPRNS") return None filename = filtered_df.iloc[0]["filenames"] spatial_data = read_dataframe_from_s3_parquet( bucket_name=DATA_BUCKET, file_key=f"spatial/{filename}" ) spatial = spatial_data[spatial_data["UPRN"] == self.uprn] # Pull out spatial features self.set_spatial(spatial) def _filter_property_dimensions(self, property_dimensions): """ Will filter the property dimensions dataframe to only include the relevant rows for the property :param property_dimensions: :return: filtered property dimensions dataframe """ result = property_dimensions[ (property_dimensions["PROPERTY_TYPE"] == self.data["property-type"]) ] if ( self.construction_age_band is not None and self.construction_age_band not in self.DATA_ANOMALY_MATCHES ): result = result[ (result["CONSTRUCTION_AGE_BAND"] == self.construction_age_band) ] if ( self.data["built-form"] not in self.DATA_ANOMALY_MATCHES and self.data["built-form"] in result["BUILT_FORM"] ): result = result[(result["BUILT_FORM"] == self.data["built-form"])] return result[ ["NUMBER_HABITABLE_ROOMS", "TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"] ].mean() def set_basic_property_dimensions(self): """ This method sets the number of floors of the property, using a simple approach based on an estimate for average room size, number of rooms and total floor area It sets the perimeter of the property, using a simple approach based on an estimate for average room size, number of rooms and total floor area Also sets floor area, number of rooms, using backup cleaned values if this data is not present, based on medians across the EPC data :return: """ # TODO: These functions should work on an EPCRecord object, so that the format is more standardised. # They could also be added as attributes to the EPC Record self.perimeter = estimate_perimeter( self.floor_area / self.number_of_floors, self.number_of_rooms / self.number_of_floors, ) self.insulation_wall_area = estimate_external_wall_area( num_floors=self.number_of_floors, floor_height=self.floor_height, perimeter=self.perimeter, built_form=self.data["built-form"], ) self.insulation_floor_area = self.floor_area / self.number_of_floors self.pitched_roof_area = esimtate_pitched_roof_area( floor_area=self.insulation_floor_area, floor_height=self.floor_height ) def set_floor_level(self): self.floor_level = ( FLOOR_LEVEL_MAP[self.data["floor-level"]] if self.data["floor-level"] not in self.DATA_ANOMALY_MATCHES and self.data["floor-level"] is not None else None ) if self.floor_level is None: if self.data["property-type"] != "Flat": return if self.floor["another_property_below"]: self.floor_level = 1 else: self.floor_level = 0 return # We perform some extra checks, if the property is not on the ground floor, as we have found cases # where a property is marked as being on the first floor if self.floor_level > 0: # We check if there is another property below (for a non-sap assessment) if not self.floor["another_property_below"] and self.floor["thermal_transmittance_unit"] is None: self.floor_level = 0 return if self.floor_level == 0: # Check if another property below if self.floor["another_property_below"]: self.floor_level = 1 return def set_wall_type(self): """ This method sets the wall type of the property, using a simple approach based on the wall description :return: """ self.wall_type = get_wall_type(**self.walls) def set_floor_type(self): """ This method sets the floor type of the property, which is used for calculating u-values Section 5.6 of the BRE indicates that "to simplify data collection no distinction is made in terms of U-value between an exposed floor (to outside air below) and a semi-exposed floor (to an enclosed but unheated space below) and the U-values in Table S12 are used. Therefore, we treat the exposed floor and suspended floor as the same type of floor, which is used for calculating u-values """ if self.floor["is_suspended"] | self.floor["another_property_below"]: self.floor_type = "suspended" elif self.floor["is_solid"]: self.floor_type = "solid" elif self.floor["is_to_unheated_space"] | self.floor["is_to_external_air"]: self.floor_type = "exposed_floor" elif self.floor["thermal_transmittance"] is not None: self.floor_type = "solid" else: raise NotImplementedError("Implement this floor type") @staticmethod def _extract_component( component_data, component_rename_cols, component_drop_cols, rename_prefix=None ): for k in component_rename_cols: component_data[f"{rename_prefix}_{k}"] = component_data.get(k) component_data = { k: v for k, v in component_data.items() if k not in component_drop_cols + component_rename_cols } return component_data def set_adjusted_energy( self, expected_adjusted_energy, expected_energy_bill ): """ Stores these values for usage later """ self.expected_adjusted_energy = expected_adjusted_energy self.expected_energy_bill = expected_energy_bill def set_windows_count(self): """ Using the estimate_windows function, this method will set the number of windows in the property :return: """ self.number_of_windows = estimate_windows( property_type=self.data["property-type"], built_form=self.data["built-form"], construction_age_band=self.construction_age_band, floor_area=self.floor_area, number_habitable_rooms=self.number_of_rooms, ) def set_solar_panel_area(self, photo_supply_lookup, floor_area_decile_thresholds): """ Sets the approximate area of the solar panels :return: """ if (self.insulation_floor_area is None) and (self.pitched_roof_area is None): raise ValueError( "Need to set insulation floor area and pitched roof area before setting solar pv roof area" ) photo_supply_matched = SolarPhotoSupply.filter_photo_supply_lookup( photo_supply_lookup=photo_supply_lookup, floor_area_decile_thresholds=floor_area_decile_thresholds, tenure=self.data["tenure"], built_form=self.data["built-form"], property_type=self.data["property-type"], construction_age_band=self.construction_age_band, is_flat=self.roof["is_flat"], is_pitched=self.roof["is_pitched"], is_roof_room=self.roof["is_roof_room"], floor_area=self.floor_area, ) percentage_of_roof = photo_supply_matched["photo_supply_median"].mean() percentage_of_roof = percentage_of_roof / 100 self.solar_pv_percentage = percentage_of_roof def get_solar_pv_roof_area(self, percentage_of_roof): """ Given a percentage of the roof, this method will return the estimated area of the solar panels :param percentage_of_roof: :return: """ return ( self.insulation_floor_area * percentage_of_roof if self.roof["is_flat"] else self.pitched_roof_area * percentage_of_roof ) def set_energy_source(self): """ This method sets the energy source of the property, based on the mains gas flag and energy tariff. """ # Default to "electricity_and_gas" to cover most scenarios including when mains_gas_flag is True energy_source = "electricity_and_gas" # If the tariff explicitly indicates electricity use without a dual indication and mains_gas_flag is not True # We check for the common electricity tariffs if not self.data["mains-gas-flag"] and self.data["energy-tariff"] in [ "Single", "off-peak 7 hour", "off-peak 10 hour", "off-peak 18 hour", "standard tariff", "24 hour", ]: energy_source = "electricity" # Set the energy source based on the conditions above self.energy_source = energy_source def find_energy_sources(self): # Based on the heating and the hot water heating_fuel_mapping = { 'has_mains_gas': 'Natural Gas', 'has_electric': 'Electricity', 'has_oil': 'Oil', 'has_wood_logs': 'Wood Logs', 'has_coal': 'Coal', 'has_anthracite': 'Anthracite', 'has_smokeless_fuel': 'Smokeless Fuel', 'has_lpg': 'LPG', 'has_b30k': 'B30K Biofuel', 'has_air_source_heat_pump': 'Electricity', 'has_ground_source_heat_pump': 'Electricity', 'has_water_source_heat_pump': 'Electricity', 'has_electric_heat_pump': 'Electricity', 'has_solar_assisted_heat_pump': 'Electricity', 'has_exhaust_source_heat_pump': 'Electricity', 'has_community_heat_pump': 'Electricity', 'has_wood_pellets': 'Wood Pellets', 'has_community_scheme': 'Varied (Community Scheme)' } # Hot water heater_type_to_fuel = { 'gas instantaneous': 'Natural Gas', 'electric heat pump': 'Electricity', 'electric immersion': 'Electricity', 'gas boiler': 'Natural Gas', 'oil boiler': 'Oil', 'electric instantaneous': 'Electricity', 'gas multipoint': 'Natural Gas', 'heat pump': 'Electricity', 'solid fuel boiler': 'Solid Fuel', 'solid fuel range cooker': 'Solid Fuel', 'room heaters': 'Varied' # Could be any fuel, further specifics needed based on context } # Define a mapping from system types to general categories or modifications of fuel types system_type_modification = { 'from main system': 'Main System', 'from secondary system': 'Secondary System', 'from second main heating system': 'Secondary System', 'community scheme': 'Community Scheme' } self.heating_energy_source = [ fuel for key, fuel in heating_fuel_mapping.items() if self.main_heating.get(key, False) ] if len(self.heating_energy_source) == 0 or len(self.heating_energy_source) > 1: raise Exception("Investigate em") self.heating_energy_source = self.heating_energy_source[0] if self.hotwater["heater_type"] is not None: self.hot_water_energy_source = heater_type_to_fuel[self.hotwater["heater_type"]] else: fuel = system_type_modification[self.hotwater["system_type"]] if fuel == 'Main System': self.hot_water_energy_source = self.heating_energy_source else: raise Exception("Investiage me")