Model/backend/Property.py
2024-01-16 16:57:45 +00:00

722 lines
30 KiB
Python

from datetime import datetime
import re
import os
import numpy as np
import pandas as pd
from etl.epc.DataProcessor import EPCDataProcessor
from etl.epc.Dataset import TrainingDataset
from etl.epc.settings import LATEST_FIELD, MANDATORY_FIXED_FEATURES, POTENTIAL_COLUMNS, EFFICIENCY_FEATURES, BUILT_FORM_REMAP
from etl.epc_clean.epc_attributes.all_cleaners import all_cleaner_map
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
from utils.logger import setup_logger
from utils.s3 import read_dataframe_from_s3_parquet
from BaseUtility import Definitions
from recommendations.rdsap_tables import england_wales_age_band_lookup, FLOOR_LEVEL_MAP
from recommendations.recommendation_utils import (
estimate_perimeter, get_wall_type, estimate_external_wall_area, esimtate_pitched_roof_area, estimate_windows
)
# Name of the deployment environment; "dev" is assumed when the variable is not set
ENVIRONMENT = os.environ.get("ENVIRONMENT", "dev")
# An explicit DATA_BUCKET always wins; only the dev environment has a built-in fallback bucket
if ENVIRONMENT == "dev":
    DATA_BUCKET = os.environ.get("DATA_BUCKET", "retrofit-data-dev")
else:
    DATA_BUCKET = os.environ.get("DATA_BUCKET")
logger = setup_logger()
class Property(Definitions):
    """
    A single property, described by the building components cleaned from its EPC record
    """
    # EPC description field -> name of the attribute that stores the cleaned component
    ATTRIBUTE_MAP = {
        "floor-description": "floor",
        "hotwater-description": "hotwater",
        "main-fuel": "main_fuel",
        "mainheat-description": "main_heating",
        "mainheatcont-description": "main_heating_controls",
        "roof-description": "roof",
        "walls-description": "walls",
        "windows-description": "windows",
        "lighting-description": "lighting",
    }
    # Component placeholders, one per ATTRIBUTE_MAP value; they stay None until an instance sets them
    floor = hotwater = main_fuel = main_heating = main_heating_controls = None
    roof = walls = windows = lighting = None
    # Spatial placeholder; stays None until an instance sets it
    spatial = None
def __init__(self, id, postcode, address, epc_record, data=None):
self.epc_record = epc_record
self.id = id
self.address = address
self.postcode = postcode
self.data = {k.replace("_", "-"): v for k,v in epc_record.get("prepared_epc").items()}
self.old_data = epc_record.get("old_data")
self.property_dimensions = None
self.uprn = epc_record.get("uprn")
self.full_sap_epc = epc_record.get("full_sap_epc")
self.in_conservation_area, self.is_listed, self.is_heritage = None, None, None
self.restricted_measures = False
self.year_built = epc_record.get("year_built")
self.number_of_rooms = epc_record.prepared_epc.get("number_of_rooms")
self.age_band = epc_record.get("age_band")
self.construction_age_band = epc_record.get("construction_age_band")
self.number_of_floors = epc_record.get("number_of_floors")
self.perimeter = None
self.wall_type = None
self.floor_type = None
self.energy = {
"primary_energy_consumption": epc_record.get("energy_consumption_current"),
"co2_emissions": epc_record.get("co2_emissions_current"),
}
self.ventilation = {
"ventilation": epc_record.get("mechanical_ventilation"),
}
self.solar_pv = {
"solar_pv": epc_record.get("photo_supply"),
}
self.solar_hot_water = {
"solar_hot_water": epc_record.get("solar_water_heating_flag"),
}
self.wind_turbine = {
"wind_turbine": epc_record.prepared_epc.get("wind_turbine_count"),
}
self.number_of_open_fireplaces = {
"number_of_open_fireplaces": epc_record.prepared_epc.get("number_of_open_fireplaces"),
}
self.number_of_extensions = {
"number_of_extensions": epc_record.prepared_epc.get("number_of_extensions"),
}
self.number_of_storeys = {
"number_of_storeys": epc_record.prepared_epc.get("number_of_storeys"),
}
self.heat_loss_corridor = {
"heat_loss_corridor": epc_record.prepared_epc.get("heat_loss_corridor"),
"length": epc_record.prepared_epc.get("unheated_corridor_length"),
}
self.mains_gas = epc_record.prepared_epc.get('mains_gas_flag')
self.floor_height = epc_record.prepared_epc.get('floor_height')
self.insulation_wall_area = None
self.floor_area = epc_record.prepared_epc.get('total_floor_area')
self.pitched_roof_area = None
self.insulation_floor_area = None
self.number_lighting_outlets = epc_record.prepared_epc.get("fixed_lighting_outlets_count")
self.floor_level = None
self.number_of_windows = None
self.solar_pv_roof_area = None
self.solar_pv_percentage = None
self.current_adjusted_energy = None
self.expected_adjusted_energy = None
self.recommendations_scoring_data = []
def create_base_difference_epc_record(self, cleaned_lookup: dict):
    """
    Creates the baseline difference dataset for the property and stores it on ``self.base_difference_record``

    The EPC record is subtracted from itself, so the starting and ending EPC are the same: we don't have the
    expected EPC yet. The fixed features present in ``self.data`` are appended to the difference record
    before it is wrapped in a TrainingDataset

    :param cleaned_lookup: passed through to TrainingDataset as ``cleaned_lookup``
    :return:
    """
    difference_record = self.epc_record - self.epc_record
    # TODO: change these lower and replace in the settings file, so the settings already use lower case
    #  names with dashes and the conversion below can be removed
    fixed_data_col_names = {
        name.lower().replace("_", "-") for name in MANDATORY_FIXED_FEATURES + LATEST_FIELD
    }
    # self.data is keyed with dashes; the difference record expects underscores
    fixed_data = {
        key.replace("-", "_"): value for key, value in self.data.items() if key in fixed_data_col_names
    }
    difference_record.append_fixed_data(fixed_data)
    self.base_difference_record = TrainingDataset(datasets=[difference_record], cleaned_lookup=cleaned_lookup)
    # TODO: adjust the base difference record with the previously calculated u values + features
    # estimated_perimeter is different to the perimeter in the epc record
    # self.base_difference_record.df
def adjust_difference_record_with_recommendations(self, property_recommendations):
"""
This method will adjust the difference record, based on the recommendations made for the property
:param recommendations: dictionary of recommendations for the property
:return:
"""
for recommendations_by_type in property_recommendations:
for i, rec in enumerate(recommendations_by_type):
scoring_dict = self.create_recommendation_scoring_data(
recommendation=rec,
)
scoring_dict['id'] = "+".join([str(self.id), str(rec["recommendation_id"])])
self.recommendations_scoring_data.append(scoring_dict)
def create_recommendation_scoring_data(self, recommendation: dict):
recommendation_record = self.base_difference_record.df.to_dict("records")[0].copy()
for col in [
"walls_insulation_thickness", "floor_insulation_thickness", "roof_insulation_thickness"
]:
if recommendation_record[col] is None:
recommendation_record[col] = "none"
# We update the description to indicate it's insulated
if recommendation["type"] in ["internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation"]:
# The upgrade made here is to the u-value of the walls and the description of the
# insulation thickness
recommendation_record["walls_thermal_transmittance_ending"] = recommendation["new_u_value"]
recommendation_record["walls_insulation_thickness_ending"] = "above average"
recommendation_record["walls_energy_eff_ending"] = "Good"
else:
wind_turbine_count = int(wind_turbine_count)
self.wind_turbine = {
"wind_turbine": wind_turbine_count,
}
def set_count_variables(self):
"""
For EPC fields that are just counts, we'll set them here
These are fields that are integers but may contain additional values such as "" so we can't do a direct
conversion straight to an integer
:return:
"""
fields = {
"number_of_open_fireplaces": "number-open-fireplaces",
"number_of_extensions": "extension-count",
"number_of_storeys": "flat-storey-count",
"number_of_rooms": "number-habitable-rooms",
}
null_attributes = ["number_of_storeys", "number_of_rooms"]
for attribute, epc_field in fields.items():
value = self.data["extension-count"]
if value == "" or value in self.DATA_ANOMALY_MATCHES:
if attribute in null_attributes:
value = None
else:
value = 0
else:
value = int(value)
setattr(self, attribute, value)
def get_components(self, cleaned, photo_supply_lookup, floor_area_decile_thresholds):
"""
Given the cleaning that has been performed, we'll use this to identify the property
components, from roof to walls to windows, heating and hot water
:param cleaned: This is the dictionary of components found in cleaner.cleaned
:param photo_supply_lookup: This is the lookup table for the photo supply, used to estimate the percentage
of the roof that is suitable for solar panels
:param floor_area_decile_thresholds: This is the decile thresholds for the floor area, used in estimating the
solar pv roof area
:return:
"""
if not cleaned:
raise ValueError("Cleaner does not contain cleaned data")
if not self.data:
raise ValueError("Property does not contain data")
self.set_basic_property_dimensions()
for description, attribute in cleaned.items():
if self.data[description] in self.DATA_ANOMALY_MATCHES:
template = cleaned[description][0]
fill_dict = dict(zip(template.keys(), [None] * len(template)))
fill_dict.update({
"original_description": self.data[description],
"clean_description": self.data[description],
})
setattr(
self,
self.ATTRIBUTE_MAP[description],
fill_dict,
)
continue
attributes = [
x for x in cleaned[description] if x["original_description"] == self.data[description]
]
if len(attributes) > 1:
raise ValueError("Either No attributes or multiple found for %s" % description)
if len(attributes) == 0:
# We attempt to perform the clean on the fly
cleaner_cls = all_cleaner_map[description]
cleaner_cls = cleaner_cls(self.data[description])
processed = {
"original_description": self.data[description],
"clean_description": cleaner_cls.description.replace("(assumed)", "").rstrip().capitalize(),
**cleaner_cls.process()
}
attributes = [processed]
setattr(self, self.ATTRIBUTE_MAP[description], attributes[0])
self.set_wall_type()
self.set_floor_type()
self.set_floor_level()
self.set_windows_count()
self.set_solar_panel_area(
photo_supply_lookup=photo_supply_lookup, floor_area_decile_thresholds=floor_area_decile_thresholds
)
def set_spatial(self, spatial: pd.DataFrame):
"""
Sets whether the property is in a conservation area given the output of the ConservationAreaClient
Will store a dictionary, spatial, which is used to populate the property spatial table in the database
:param spatial: Dataframe, containing the spatial data for the property
"""
self.in_conservation_area = spatial["conservation_status"].values[0]
self.is_listed = spatial["is_listed_building"].values[0]
self.is_heritage = spatial["is_heritage_building"].values[0]
# We do an equals True, in the case of one of these variables being True
if (self.in_conservation_area == True) | (self.is_listed == True) | (self.is_heritage == True):
self.restricted_measures = True
spatial_dict = spatial.to_dict("records")[0]
self.spatial = {
"x_coordinate": spatial_dict["X_COORDINATE"],
"y_coordinate": spatial_dict["Y_COORDINATE"],
"latitude": spatial_dict["LATITUDE"],
"longitude": spatial_dict["LONGITUDE"],
"conservation_status": spatial_dict["conservation_status"],
"is_listed_building": spatial_dict["is_listed_building"],
"is_heritage_building": spatial_dict["is_heritage_building"],
}
def _clean_upload_data(self, to_update):
for k, v in to_update.items():
if v in self.DATA_ANOMALY_MATCHES:
to_update[k] = None
return to_update
def get_full_property_data(self):
"""
This method extracts the data which is pushed to the database, containing core information, from the EPC
about a property
:return:
"""
property_data = {
"creation_status": "READY",
"uprn": int(self.data["uprn"]),
"building_reference_number": int(self.data["building-reference-number"]),
"has_pre_condition_report": True,
"has_recommendations": True,
"property_type": self.data["property-type"],
"built_form": self.data["built-form"],
"local_authority": self.data["local-authority-label"],
"constituency": self.data["constituency-label"],
"number_of_rooms": self.number_of_rooms,
"year_built": self.year_built,
"tenure": self.data["tenure"],
"current_epc_rating": self.data["current-energy-rating"],
"current_sap_points": self.data["current-energy-efficiency"],
}
property_data = self._clean_upload_data(property_data)
return property_data
@classmethod
def _prepare_rating_field(cls, field, rating_lookup):
"""
Utility function for usage in the lambda, for preparing the _rating fields
"""
return rating_lookup[field].value if (field not in cls.DATA_ANOMALY_MATCHES) and (field is not None) else None
def get_property_details_epc(self, portfolio_id: int, rating_lookup):
property_details_epc = {
"property_id": self.id,
"portfolio_id": portfolio_id,
"full_address": self.data["address"],
"total_floor_area": float(self.data["total-floor-area"]),
"walls": self.walls["clean_description"],
"walls_rating": self._prepare_rating_field(self.data["walls-energy-eff"], rating_lookup),
"roof": self.roof["clean_description"],
"roof_rating": self._prepare_rating_field(self.data["roof-energy-eff"], rating_lookup),
"floor": self.floor["clean_description"],
"floor_rating": self._prepare_rating_field(self.data["floor-energy-eff"], rating_lookup),
"windows": self.windows["clean_description"],
"windows_rating": self._prepare_rating_field(self.data["windows-energy-eff"], rating_lookup),
"heating": self.main_heating["clean_description"],
"heating_rating": self._prepare_rating_field(self.data["mainheat-energy-eff"], rating_lookup),
"heating_controls": self.main_heating_controls["clean_description"],
"heating_controls_rating": self._prepare_rating_field(self.data["mainheatc-energy-eff"], rating_lookup),
"hot_water": self.hotwater["clean_description"],
"hot_water_rating": self._prepare_rating_field(self.data["hot-water-energy-eff"], rating_lookup),
"lighting": self.lighting["clean_description"],
"lighting_rating": self._prepare_rating_field(self.data["lighting-energy-eff"], rating_lookup),
"mainfuel": self.main_fuel["clean_description"],
"ventilation": self.ventilation["ventilation"],
"solar_pv": self.solar_pv["solar_pv"],
"solar_hot_water": self.solar_hot_water["solar_hot_water"],
"wind_turbine": self.wind_turbine["wind_turbine"],
"floor_height": self.floor_height,
"heat_loss_corridor": self.heat_loss_corridor["heat_loss_corridor"],
"unheated_corridor_length": self.heat_loss_corridor["length"],
"number_of_open_fireplaces": self.number_of_open_fireplaces,
"number_of_extensions": self.number_of_extensions,
"number_of_storeys": self.number_of_storeys,
"mains_gas": self.mains_gas,
"energy_tariff": self.data["energy-tariff"],
"primary_energy_consumption": self.energy["primary_energy_consumption"],
"co2_emissions": self.energy["co2_emissions"],
"adjusted_energy_consumption": self.current_adjusted_energy,
"estimated": self.data.get("estimated", False)
}
return property_details_epc
def get_spatial_data(self, uprn_filenames):
    """
    Given a property's UPRN, this method will pull the associated spatial data from s3 and store it through
    set_spatial

    Without a UPRN the three planning flags are set to False and the property is marked as restricted.
    If no file covers the UPRN, or the file has no row for it, a warning is logged and nothing is stored

    :param uprn_filenames: Dataframe with the 'lower' and 'upper' UPRN bounds of each spatial file and its
    name under 'filenames'
    :return:
    """
    if self.uprn is None:
        logger.warning("We do not have a UPRN for this property - this needs to be implemented")
        self.in_conservation_area = False
        self.is_listed = False
        self.is_heritage = False
        # NOTE(review): presumably restricted by default as a conservative choice - confirm
        self.restricted_measures = True
        return
    # We get the file name for the uprn
    covers_uprn = (uprn_filenames['lower'] <= self.uprn) & (uprn_filenames['upper'] >= self.uprn)
    filtered_df = uprn_filenames[covers_uprn]
    if filtered_df.empty:
        logger.warning("Could not find file containing UPRNS")
        return None
    filename = filtered_df.iloc[0]['filenames']
    spatial_data = read_dataframe_from_s3_parquet(
        bucket_name=DATA_BUCKET, file_key=f"spatial/{filename}"
    )
    spatial = spatial_data[spatial_data["UPRN"] == self.uprn]
    if spatial.empty:
        # set_spatial reads the first row, so an empty match would fail with an IndexError
        logger.warning("Could not find spatial data for the UPRN in the spatial file")
        return None
    # Pull out spatial features
    self.set_spatial(spatial)
def _filter_property_dimensions(self, property_dimensions):
"""
Will filter the property dimensions dataframe to only include the relevant rows for the property
:param property_dimensions:
:return: filtered property dimensions dataframe
"""
result = property_dimensions[(property_dimensions["PROPERTY_TYPE"] == self.data["property-type"])]
if self.construction_age_band is not None and self.construction_age_band not in self.DATA_ANOMALY_MATCHES:
result = result[(result["CONSTRUCTION_AGE_BAND"] == self.construction_age_band)]
if self.data["built-form"] not in self.DATA_ANOMALY_MATCHES and self.data["built-form"] in result["BUILT_FORM"]:
result = result[(result["BUILT_FORM"] == self.data["built-form"])]
return result[["NUMBER_HABITABLE_ROOMS", "TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]].mean()
def set_basic_property_dimensions(self):
    """
    This method sets the basic geometric estimates of the property: the perimeter, the external wall area
    available for insulation, the floor area available for insulation and the pitched roof area

    The floor area and number of rooms are divided evenly over the number of floors; the per-floor floor
    area is what is stored as the insulation floor area and what the pitched roof estimate is based on
    :return:
    """
    floors = self.number_of_floors
    floor_area_per_floor = self.floor_area / floors
    rooms_per_floor = self.number_of_rooms / floors
    self.perimeter = estimate_perimeter(floor_area_per_floor, rooms_per_floor)
    wall_inputs = dict(
        num_floors=floors,
        floor_height=self.floor_height,
        perimeter=self.perimeter,
        built_form=self.data["built-form"],
    )
    self.insulation_wall_area = estimate_external_wall_area(**wall_inputs)
    self.insulation_floor_area = floor_area_per_floor
    self.pitched_roof_area = esimtate_pitched_roof_area(
        floor_area=self.insulation_floor_area, floor_height=self.floor_height
    )
def set_floor_level(self):
self.floor_level = (
FLOOR_LEVEL_MAP[self.data["floor-level"]] if
self.data["floor-level"] not in self.DATA_ANOMALY_MATCHES and self.data['floor-level'] is not None
else None
)
if self.floor_level is None:
if self.data["property-type"] != "Flat":
return
if self.floor["another_property_below"]:
self.floor_level = 1
else:
self.floor_level = 0
return
# We perform some extra checks, if the property is not on the ground floor, as we have found cases
# where a property is marked as being on the first floor
if self.floor_level > 0:
# We check if there is another property below
if not self.floor["another_property_below"]:
self.floor_level = 0
return
if self.floor_level == 0:
# Check if another property below
if self.floor["another_property_below"]:
self.floor_level = 1
return
def set_wall_type(self):
    """
    This method sets the wall type of the property from the cleaned wall component
    Every entry of the wall component is passed to get_wall_type as a keyword argument
    :return:
    """
    wall_attributes = self.walls
    self.wall_type = get_wall_type(**wall_attributes)
def set_floor_type(self):
"""
This method sets the floor type of the property, which is used for calculating u-values
Section 5.6 of the BRE indicates that
"to simplify data collection no distinction is made in terms of U-value between an exposed floor (to
outside air below) and a semi-exposed floor (to an enclosed but unheated space below)
and the U-values in Table S12 are used.
Therefore, we treat the exposed floor and suspended floor as the same type of floor, which is used for
calculating u-values
"""
if self.floor["is_suspended"] | self.floor["another_property_below"]:
self.floor_type = "suspended"
elif self.floor["is_solid"]:
self.floor_type = "solid"
elif self.floor["is_to_unheated_space"] | self.floor["is_to_external_air"]:
self.floor_type = "exposed_floor"
elif self.floor["thermal_transmittance"] is not None:
self.floor_type = "solid"
else:
raise NotImplementedError("Implement this floor type")
@staticmethod
def _extract_component(component_data, component_rename_cols, component_drop_cols, rename_prefix=None):
for k in component_rename_cols:
component_data[f"{rename_prefix}_{k}"] = component_data.get(k)
component_data = {
k: v for k, v in component_data.items() if k not in component_drop_cols + component_rename_cols
}
return component_data
def get_model_data(self):
"""
This method extracts cleaned data from the property object, which is used in our machine learning models
This will use many of the cleaned properties, extracted from the epc data, or methods in DataProcessor.
For future iterations of this, we probably want to implement a singular method in DataProcessor, which can
be used in the etl code and in here
The result is merged in this order: the extracted components, the values derived on the property, the raw
EPC columns, and finally BUILT_FORM and POSTCODE. When a key occurs more than once, the last value wins
:return: dictionary of model data to be scored in the model
:raises NotImplementedError: if no built form could be determined for the property
"""
# The descriptions are dropped from every component
drop_cols = ["original_description", "clean_description"]
# Walls, roof and floor share the same extra drop columns and the same renamed columns
insulation_drop_cols = ["thermal_transmittance_unit", "is_assumed", "is_valid"]
insulation_rename_cols = ["thermal_transmittance", "insulation_thickness"]
walls = self._extract_component(self.walls, insulation_rename_cols, insulation_drop_cols + drop_cols, "walls")
roof = self._extract_component(self.roof, insulation_rename_cols, insulation_drop_cols + drop_cols, "roof")
floor = self._extract_component(self.floor, insulation_rename_cols, insulation_drop_cols + drop_cols, "floor")
# No columns are renamed for the windows, so no prefix is given
windows = self._extract_component(self.windows, [], drop_cols + ["no_data"])
# The tariff type is exposed as "main-fuel_tariff_type" (prefix, underscore, column name)
fuel = self._extract_component(self.main_fuel, ["tariff_type"], drop_cols + ["tariff_type"], "main-fuel")
main_heating = self._extract_component(self.main_heating, [], drop_cols + ["has_assumed"])
main_heating_controls = self._extract_component(self.main_heating_controls, [], drop_cols)
# The tariff type is exposed as "hotwater_tariff_type"
hotwater = self._extract_component(self.hotwater, ["tariff_type"], drop_cols + ['assumed'], "hotwater")
# We'll need to clean second heating
second_heating = self.data["secondheat-description"]
# Raw EPC columns that are passed to the model as they are
epc_raw_columns = POTENTIAL_COLUMNS + EFFICIENCY_FEATURES + [
'TRANSACTION_TYPE',
'ENERGY_TARIFF',
'PROPERTY_TYPE',
'UPRN',
'NUMBER_OPEN_FIREPLACES',
'MULTI_GLAZE_PROPORTION',
'MECHANICAL_VENTILATION',
'PHOTO_SUPPLY',
'LOW_ENERGY_LIGHTING',
'SOLAR_WATER_HEATING_FLAG',
'GLAZED_TYPE',
'CONSTITUENCY',
'NUMBER_HEATED_ROOMS',
'EXTENSION_COUNT',
]
# self.data is keyed in lower case with dashes, so each column name is converted before the lookup;
# a column that is missing from self.data raises a KeyError
epc_raw_data = {
k: self.data[k.lower().replace("_", "-")] for k in epc_raw_columns
}
# Fallback built form per property type, used when the EPC built form is not usable
built_form_cleaning_map = {
"Flat": "Mid-Terrace",
"House": "Semi-Detached",
"Bungalow": "Detached",
"Maisonette": "Mid-Terrace"
}
built_form = self.data["built-form"]
# NOTE(review): the indentation of this copy of the file is lost, so it is unclear whether the
# "if not built_form" check below is nested inside the anomaly branch - confirm before restructuring
if built_form in self.DATA_ANOMALY_MATCHES:
# TODO: If built form isn't captured, we use the most common value for that property type - we shall
# improve this methodology
built_form = built_form_cleaning_map.get(self.data["property-type"])
if not built_form:
raise NotImplementedError("Not handled this property type when cleaning built form")
property_data = {
**walls,
**roof,
**floor,
**fuel,
**main_heating,
**main_heating_controls,
**hotwater,
**windows,
"SECONDHEAT_DESCRIPTION": second_heating,
"DAYS_TO": EPCDataProcessor.calculate_days_to(self.data["lodgement-date"]),
"SAP": float(self.data["current-energy-efficiency"]),
"CARBON": float(self.data["co2-emissions-current"]),
"HEAT_DEMAND": float(self.data["energy-consumption-current"]),
"estimated_perimeter": self.perimeter,
"CONSTRUCTION_AGE_BAND": self.construction_age_band,
"FLOOR_HEIGHT": self.floor_height,
"NUMBER_HABITABLE_ROOMS": self.number_of_rooms,
"TOTAL_FLOOR_AREA": self.floor_area,
"FIXED_LIGHTING_OUTLETS_COUNT": self.number_lighting_outlets,
# The raw columns come after the derived values, so a raw column with the same key replaces them
**epc_raw_data,
# BUILT_FORM comes after the raw columns, so the cleaned value replaces a raw BUILT_FORM if there is one
"BUILT_FORM": built_form,
"POSTCODE": self.data["postcode"],
}
return property_data
def set_number_lighting_outlets(self, cleaned_property_data):
"""
Extracts and cleans the estimated number of lighting outlets
:return:
"""
if self.data["fixed-lighting-outlets-count"] in [None, ""]:
# We check old EPCs and the full SAP EPC
lighting_data = []
if len(self.old_data):
lighting_data.extend([
int(x["fixed-lighting-outlets-count"]) for x in self.old_data if
x["fixed-lighting-outlets-count"] != ""
])
if len(self.full_sap_epc):
if self.full_sap_epc["fixed-lighting-outlets-count"] != "":
lighting_data.append(int(self.full_sap_epc["fixed-lighting-outlets-count"]))
if lighting_data:
self.number_lighting_outlets = round(np.median(lighting_data))
else:
self.number_lighting_outlets = round(cleaned_property_data["FIXED_LIGHTING_OUTLETS_COUNT"].values[0])
else:
self.number_lighting_outlets = float(self.data["fixed-lighting-outlets-count"])
def set_adjusted_energy(self, current_adjusted_energy, expected_adjusted_energy):
"""
Stores these values for usage later
"""
self.current_adjusted_energy = current_adjusted_energy
self.expected_adjusted_energy = expected_adjusted_energy
def set_windows_count(self):
    """
    Using the estimate_windows function, this method will set the number of windows in the property
    The estimate is based on the property type, built form, construction age band, floor area, number of
    rooms and the extension count of the EPC converted to a float
    :return:
    """
    epc = self.data
    window_inputs = dict(
        property_type=epc["property-type"],
        built_form=epc["built-form"],
        construction_age_band=self.construction_age_band,
        floor_area=self.floor_area,
        number_habitable_rooms=self.number_of_rooms,
        extension_count=float(epc["extension-count"]),
    )
    self.number_of_windows = estimate_windows(**window_inputs)
def set_solar_panel_area(self, photo_supply_lookup, floor_area_decile_thresholds):
"""
Sets the approximate area of the solar panels
:return:
"""
if (self.insulation_floor_area is None) and (self.pitched_roof_area is None):
raise ValueError(
"Need to set insulation floor area and pitched roof area before setting solar pv roof area"
)
photo_supply_matched = SolarPhotoSupply.filter_photo_supply_lookup(
photo_supply_lookup=photo_supply_lookup,
floor_area_decile_thresholds=floor_area_decile_thresholds,
tenure=self.data["tenure"],
built_form=self.data["built-form"],
property_type=self.data["property-type"],
construction_age_band=self.construction_age_band,
is_flat=self.roof["is_flat"],
is_pitched=self.roof["is_pitched"],
is_roof_room=self.roof["is_roof_room"],
floor_area=self.floor_area
)
percentage_of_roof = photo_supply_matched["photo_supply_median"].mean()
percentage_of_roof = percentage_of_roof / 100
self.solar_pv_roof_area = (
self.insulation_floor_area * percentage_of_roof if self.roof["is_flat"] else
self.pitched_roof_area * percentage_of_roof
)
self.solar_pv_percentage = percentage_of_roof