Merge pull request #269 from Hestia-Homes/solar-recs

Solar recs
This commit is contained in:
KhalimCK 2024-01-16 11:45:38 +00:00 committed by GitHub
commit 8904467788
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
33 changed files with 1909 additions and 380 deletions

View file

@ -8,9 +8,9 @@ import pandas as pd
from etl.epc.DataProcessor import DataProcessor
from etl.epc.settings import POTENTIAL_COLUMNS, EFFICIENCY_FEATURES, BUILT_FORM_REMAP
from etl.epc_clean.epc_attributes.all_cleaners import all_cleaner_map
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
from utils.logger import setup_logger
from utils.s3 import read_dataframe_from_s3_parquet
from epc_api.client import EpcClient
from BaseUtility import Definitions
from recommendations.rdsap_tables import england_wales_age_band_lookup, FLOOR_LEVEL_MAP
from recommendations.recommendation_utils import (
@ -89,6 +89,8 @@ class Property(Definitions):
self.number_lighting_outlets = None
self.floor_level = None
self.number_of_windows = None
self.solar_pv_roof_area = None
self.solar_pv_percentage = None
self.current_adjusted_energy = None
self.expected_adjusted_energy = None
@ -148,7 +150,7 @@ class Property(Definitions):
"""
solar_pv = self.data["photo-supply"]
if solar_pv == "":
if solar_pv in ["", None]:
solar_pv = None
else:
solar_pv = float(solar_pv)
@ -168,6 +170,7 @@ class Property(Definitions):
"Y": True,
"N": False,
"": None,
None: None,
}
self.solar_hot_water = {
@ -221,11 +224,15 @@ class Property(Definitions):
setattr(self, attribute, value)
def get_components(self, cleaned):
def get_components(self, cleaned, photo_supply_lookup, floor_area_decile_thresholds):
"""
Given the cleaning that has been performed, we'll use this to identify the property
components, from roof to walls to windows, heating and hot water
:param cleaned: This is the dictionary of components found in cleaner.cleaned
:param photo_supply_lookup: This is the lookup table for the photo supply, used to estimate the percentage
of the roof that is suitable for solar panels
:param floor_area_decile_thresholds: This is the decile thresholds for the floor area, used in estimating the
solar pv roof area
:return:
"""
@ -239,8 +246,8 @@ class Property(Definitions):
# it
self.data["built-form"] = BUILT_FORM_REMAP.get(self.data["built-form"], self.data["built-form"])
if self.data["built-form"] in self.DATA_ANOMALY_MATCHES:
if self.data["property-type"] == "Flat":
self.data["built-form"] = "Semi-Detached"
if self.data["property-type"] in ["Flat", "Maisonette"]:
self.data["built-form"] = "End-Terrace"
self.set_year_built()
self.set_energy()
@ -295,6 +302,9 @@ class Property(Definitions):
self.set_floor_type()
self.set_floor_level()
self.set_windows_count()
self.set_solar_panel_area(
photo_supply_lookup=photo_supply_lookup, floor_area_decile_thresholds=floor_area_decile_thresholds
)
def set_age_band(self):
"""
@ -326,7 +336,9 @@ class Property(Definitions):
self.construction_age_band = 'England and Wales: 2012 onwards'
if self.age_band is None:
raise ValueError("age_band is missing")
logger.info("Age band is missing - filling with national average")
self.age_band = "C"
self.construction_age_band = "England and Wales: 1930-1949"
def set_spatial(self, spatial: pd.DataFrame):
"""
@ -385,7 +397,8 @@ class Property(Definitions):
map = {
"no corridor": False,
"unheated corridor": True,
"heated corridor": False
"heated corridor": False,
None: False
}
if self.data["heat-loss-corridor"] in self.DATA_ANOMALY_MATCHES:
@ -394,7 +407,7 @@ class Property(Definitions):
has_heat_loss_corridor = map[self.data["heat-loss-corridor"]]
length = self.data["unheated-corridor-length"]
if length == "":
if length in ["", None]:
length = None
else:
length = float(length)
@ -570,7 +583,7 @@ class Property(Definitions):
self.floor_area = float(self.data["total-floor-area"])
if not self.data["number-habitable-rooms"] or (
self.data["floor-height"] == "" or self.data["floor-height"] in self.DATA_ANOMALY_MATCHES
self.data["floor-height"] in ["", None] or self.data["floor-height"] in self.DATA_ANOMALY_MATCHES
):
if self.property_dimensions is None:
property_dimensions = read_dataframe_from_s3_parquet(
@ -592,7 +605,7 @@ class Property(Definitions):
else:
raise NotImplementedError("Implement me")
if self.data["floor-height"] == "" or self.data["floor-height"] in self.DATA_ANOMALY_MATCHES:
if self.data["floor-height"] in [None, ""] or self.data["floor-height"] in self.DATA_ANOMALY_MATCHES:
self.floor_height = float(self.property_dimensions["FLOOR_HEIGHT"].round(2))
else:
self.floor_height = float(self.data["floor-height"])
@ -617,7 +630,7 @@ class Property(Definitions):
def set_floor_level(self):
self.floor_level = (
FLOOR_LEVEL_MAP[self.data["floor-level"]] if
self.data["floor-level"] not in self.DATA_ANOMALY_MATCHES else None
self.data["floor-level"] not in list(self.DATA_ANOMALY_MATCHES) + [None] else None
)
if self.floor_level is None:
@ -785,7 +798,7 @@ class Property(Definitions):
:return:
"""
if self.data["fixed-lighting-outlets-count"] == "":
if self.data["fixed-lighting-outlets-count"] in [None, ""]:
# We check old EPCs and the full SAP EPC
@ -829,3 +842,37 @@ class Property(Definitions):
number_habitable_rooms=self.number_of_rooms,
extension_count=float(self.data["extension-count"]),
)
def set_solar_panel_area(self, photo_supply_lookup, floor_area_decile_thresholds):
    """
    Sets the approximate roof area available for solar panels (solar_pv_roof_area) together with the
    fraction of the roof that this represents (solar_pv_percentage).

    The fraction is the mean of the "photo_supply_median" column over the rows returned by
    SolarPhotoSupply.filter_photo_supply_lookup for this property, divided by 100. It is applied to
    insulation_floor_area when the roof is flat and to pitched_roof_area otherwise.
    :param photo_supply_lookup: lookup table handed to SolarPhotoSupply.filter_photo_supply_lookup
    :param floor_area_decile_thresholds: floor area decile thresholds handed to the same filter
    :raises ValueError: if neither area has been set, or if the area required for this roof type is missing
    :return:
    """
    if (self.insulation_floor_area is None) and (self.pitched_roof_area is None):
        raise ValueError(
            "Need to set insulation floor area and pitched roof area before setting solar pv roof area"
        )

    # Only one of the two areas is used below, so make sure that specific one exists. Without this
    # check a missing value would surface later as a TypeError from multiplying None.
    roof_is_flat = self.roof["is_flat"]
    base_area = self.insulation_floor_area if roof_is_flat else self.pitched_roof_area
    if base_area is None:
        missing = "insulation floor area" if roof_is_flat else "pitched roof area"
        raise ValueError(f"Need to set {missing} before setting solar pv roof area")

    photo_supply_matched = SolarPhotoSupply.filter_photo_supply_lookup(
        photo_supply_lookup=photo_supply_lookup,
        floor_area_decile_thresholds=floor_area_decile_thresholds,
        tenure=self.data["tenure"],
        built_form=self.data["built-form"],
        property_type=self.data["property-type"],
        construction_age_band=self.construction_age_band,
        is_flat=roof_is_flat,
        is_pitched=self.roof["is_pitched"],
        is_roof_room=self.roof["is_roof_room"],
        floor_area=self.floor_area
    )

    # The lookup stores a percentage, so convert it to a fraction before applying it to the area.
    # NOTE(review): presumably an empty match yields NaN here - confirm how callers should handle that
    percentage_of_roof = photo_supply_matched["photo_supply_median"].mean() / 100

    self.solar_pv_roof_area = base_area * percentage_of_roof
    self.solar_pv_percentage = percentage_of_roof

View file

@ -146,6 +146,7 @@ class SearchEpc:
max_retries: int = None,
uprn: [int, None] = None,
size=None,
property_type=None,
):
"""
Address lines 1 and postcode are mandatory fields. The other address lines are optional
@ -157,6 +158,7 @@ class SearchEpc:
:param uprn: int, optional, the uprn of the property
:param size: int, optional, the number of results to return. If not provided, defaults to 25 which is the api's
default
:param property_type: str, optional, the property type of the property, if known beforehand
"""
self.address1 = address1
@ -184,6 +186,8 @@ class SearchEpc:
self.size = size if size is not None else 25
self.property_type = property_type
@classmethod
def get_house_number(cls, address: str) -> str | None:
"""
@ -335,7 +339,7 @@ class SearchEpc:
return address, postcode
def extract_epc_data(self, property_type=None, address=None):
def extract_epc_data(self, address=None):
"""
Given a successful search, this method will format the data and return it
@ -351,7 +355,7 @@ class SearchEpc:
# Firstly, we should only have 1 uprn so if we have multiple, we'll need to filter down the
# property further
rows = self.filter_rows(rows, property_type=property_type, address=None)
rows = self.filter_rows(rows, property_type=self.property_type, address=None)
rows = self.filter_rows(rows, property_type=None, address=address)
# We now check for a full sap epc:
@ -366,9 +370,19 @@ class SearchEpc:
# Get the uprn from the newest record for this home
uprns = {r["uprn"] for r in rows if r["uprn"]}
if len(uprns) != 1:
raise ValueError("Multiple UPRNs found - investigate me")
uprn = uprns.pop()
# We can sometimes have no uprn for a property
if (len(uprns) == 0) and len(rows) > 0:
logger.warning("Found data but missing uprn")
elif len(uprns) != 1:
# There is a possibility that we have multiple UPRNs for a single property, which is an error
addresses = {r["address"] for r in rows}
if len(addresses) == 1:
# Take the uprn from the most recent
uprns = {newest_epc["uprn"]}
else:
raise ValueError("Multiple UPRNs found - investigate me")
uprn = uprns.pop() if uprns else None
return newest_epc, older_epcs, full_sap_epc, address_epc, postcode_epc, uprn
@ -458,7 +472,7 @@ class SearchEpc:
if not epc_data.empty:
# Further processing of the EPC data
epc_data['lodgement-datetime'] = pd.to_datetime(epc_data['lodgement-datetime'])
epc_data['lodgement-datetime'] = pd.to_datetime(epc_data['lodgement-datetime'], format='mixed')
epc_data = epc_data.sort_values("lodgement-datetime", ascending=False).groupby("uprn").head(1)
epc_data["house_number"] = epc_data["address"].apply(lambda add1: self.get_house_number(add1))
epc_data["numeric_house_number"] = epc_data["house_number"].apply(
@ -646,7 +660,7 @@ class SearchEpc:
return agg[key].values[0]
def find_property(self):
def find_property(self, skip_os=False):
"""
This method will attempt to identify a property. It will, at first, use the EPC api to try and
find the EPC for the property and the associated UPRN. If this fails, it will use the Ordnance Survey API to
@ -669,6 +683,22 @@ class SearchEpc:
return
# Step 2: If we don't have an EPC, we use the ordnance survey api to find the uprn
if skip_os:
if self.ordnance_survey_client.property_type is not None:
# We can try and estimate
estimated_epc = self.estimate_epc(
property_type=self.ordnance_survey_client.property_type,
built_form=self.ordnance_survey_client.built_form
)
self.newest_epc = estimated_epc
self.older_epcs = []
self.full_sap_epc = {}
# Finally, set a standardised address 1 and postcode
self.address_clean = self.ordnance_survey_client.address_os
self.postcode_clean = self.ordnance_survey_client.postcode_os
return
os_response = self.ordnance_survey_client.get_places_api()
if os_response["status"] != 200:

View file

@ -23,12 +23,13 @@ from backend.app.db.models.portfolio import rating_lookup
from backend.app.dependencies import validate_token
from backend.app.plan.schemas import PlanTriggerRequest
from backend.app.plan.utils import create_recommendation_scoring_data, get_cleaned
from backend.app.utils import epc_to_sap_lower_bound, read_csv_from_s3, read_parquet_from_s3, sap_to_epc
from backend.app.utils import epc_to_sap_lower_bound, read_csv_from_s3, sap_to_epc
from backend.ml_models.api import ModelApi
from backend.Property import Property
from etl.epc.DataProcessor import DataProcessor
from etl.epc.settings import COLUMNS_TO_MERGE_ON
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
from recommendations.optimiser.CostOptimiser import CostOptimiser
from recommendations.optimiser.GainOptimiser import GainOptimiser
from recommendations.optimiser.optimiser_functions import prepare_input_measures
@ -61,13 +62,6 @@ async def trigger_plan(body: PlanTriggerRequest):
logger.info("Getting the inputs")
plan_input = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path)
uprn_filenames = read_dataframe_from_s3_parquet(
bucket_name=get_settings().DATA_BUCKET, file_key="spatial/filename_meta.parquet"
)
cleaning_data = read_parquet_from_s3(
bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet",
)
input_properties = []
for config in plan_input:
# We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
@ -109,17 +103,25 @@ async def trigger_plan(body: PlanTriggerRequest):
if not input_properties:
return Response(status_code=204)
logger.info("Getting spatial data")
for p in input_properties:
p.get_spatial_data(uprn_filenames)
# The materials data could be cached or local so we don't need to make
# consistent requests to the backend for
# the same data
logger.info("Reading in materials and cleaned datasets")
logger.info("Reading in data sources required for the engine")
materials = get_materials(session)
cleaned = get_cleaned()
uprn_filenames = read_dataframe_from_s3_parquet(
bucket_name=get_settings().DATA_BUCKET, file_key="spatial/filename_meta.parquet"
)
cleaning_data = read_dataframe_from_s3_parquet(
bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet",
)
photo_supply_lookup, floor_area_decile_thresholds = SolarPhotoSupply.load(bucket=get_settings().DATA_BUCKET)
logger.info("Getting spatial data")
for p in input_properties:
p.get_spatial_data(uprn_filenames)
logger.info("Getting components and epc recommendations")
recommendations = {}
@ -129,7 +131,7 @@ async def trigger_plan(body: PlanTriggerRequest):
for p in input_properties:
# Property recommendations
p.get_components(cleaned)
p.get_components(cleaned, photo_supply_lookup, floor_area_decile_thresholds)
# This is temp - this should happen after scoring
cleaned_property_data = DataProcessor.apply_averages_cleaning(

View file

@ -194,12 +194,15 @@ def create_recommendation_scoring_data(
else:
raise ValueError("Invalid glazing type - implement me")
if recommendation["type"] == "solar_pv":
scoring_dict["PHOTO_SUPPLY_ENDING"] = recommendation["photo_supply"]
if recommendation["type"] not in [
"mechanical_ventilation", "sealing_open_fireplace", "low_energy_lighting",
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation",
"loft_insulation", "room_roof_insulation", "flat_roof_insulation",
"solid_floor_insulation", "suspended_floor_insulation", "exposed_floor_insulation",
"windows_glazing"
"windows_glazing", "solar_pv"
]:
raise NotImplementedError("Implement me")

View file

@ -121,19 +121,6 @@ def epc_to_sap_lower_bound(epc: str):
raise ValueError("EPC rating should be between A and G")
def read_parquet_from_s3(bucket_name, file_key):
    """
    Fetch a Parquet object from S3 and load it into a pandas DataFrame.
    :param bucket_name: name of the S3 bucket holding the object
    :param file_key: key of the Parquet object within the bucket
    :return: DataFrame parsed from the object's bytes
    """
    s3 = boto3.client('s3')
    # Pull the full object body into memory so pandas can parse it from a buffer
    parquet_bytes = s3.get_object(Bucket=bucket_name, Key=file_key)["Body"].read()
    return pd.read_parquet(BytesIO(parquet_bytes))
def save_dataframe_to_s3_parquet(df, bucket_name, file_key):
"""
Save a pandas DataFrame to S3 as a Parquet file.

View file

@ -19,7 +19,9 @@ class PropertyValuation:
100070505235: 344000, # Based on Zoopla's estimation of 131 School road, which is also semi-detached
100070513306: 182000, # Based on Zoopla's estimation of 61 Simmons Drive
100071306896: 77000, # Based on Flat 2 of 44 Wedgewood Road on Zoopla
100021192109: 650000 # Based on Zoopla
100021192109: 650000, # Based on Zoopla
766249482: 358000, # Based on Zoopla estimate for 19 Spring Lane, 3 bedroom semi-detached
100120703802: 277000, # Based on Zoopla
}
# We base our valuation uplifts on a number of sources

View file

@ -2,8 +2,7 @@ import pandas as pd
import requests
from requests.exceptions import RequestException
from utils.logger import setup_logger
from utils.s3 import save_dataframe_to_s3_parquet
from backend.app.utils import read_parquet_from_s3
from utils.s3 import save_dataframe_to_s3_parquet, read_dataframe_from_s3_parquet
logger = setup_logger()
@ -125,7 +124,7 @@ class ModelApi:
# Retrieve the predictions
predictions_df = pd.DataFrame(
read_parquet_from_s3(
read_dataframe_from_s3_parquet(
bucket_name=predictions_bucket,
file_key=response["storage_filepath"].split(predictions_bucket + "/")[1]
)

View file

@ -9,6 +9,7 @@ from etl.epc_clean.EpcClean import EpcClean
mock_epc_response = {
"rows": [
{
"tenure": "rental (social)",
"lmk-key": 1,
"uprn": 1,
"number-habitable-rooms": 5,
@ -17,7 +18,7 @@ mock_epc_response = {
"inspection-date": "2023-06-01",
'lodgement-datetime': '2023-06-01 20:29:01',
"some-other-key": "some-value",
"roof-description": "Roof Description",
"roof-description": "pitched, no insulation",
"walls-description": "Walls Description",
"windows-description": "Windows Description",
"mainheat-description": "Main Heating Description",
@ -168,29 +169,54 @@ mock_epc_response_dupe = {
class TestProperty:
@pytest.fixture(autouse=True)
def property_instance(self, mock_epc_client, mock_cleaner):
property_instance = Property(1, "AB12CD", "Test Address", epc_client=mock_epc_client)
def mock_photo_supply_lookup(self):
return pd.DataFrame(
[
dict(
tenure="rental (social)",
built_form="Detached",
property_type="House",
construction_age_band="England and Wales: 1967-1975",
is_flat=False,
is_pitched=True,
is_roof_room=False,
floor_area_decile=2,
photo_supply_median=40
)
]
)
@pytest.fixture(autouse=True)
def mock_floor_area_decile_thresholds(self):
return pd.DataFrame(
{"floor_area_decile_thresholds": [0, 10, 30, 50]}
)
@pytest.fixture(autouse=True)
def property_instance(self, mock_cleaner):
property_instance = Property(id=1, postcode="AB12CD", address="Test Address", data=mock_epc_response["rows"][0])
return property_instance
@pytest.fixture(autouse=True)
def property_instance_dupe_data(self, mock_epc_client_dupe_data):
property_instance_dupe_data = Property(2, "AB12CD", "Test Address", epc_client=mock_epc_client_dupe_data)
def property_instance_dupe_data(self):
property_instance_dupe_data = Property(id=2, postcode="AB12CD", address="Test Address")
return property_instance_dupe_data
@pytest.fixture
def mock_epc_client(self):
mock_epc_client = Mock(spec=EpcClient(auth_token="mocked_auth_token"))
mock_epc_client.domestic.search.return_value = mock_epc_response.copy()
mock_epc_client.auth_token = "mocked_auth_token"
return mock_epc_client
@pytest.fixture
def mock_epc_client_dupe_data(self):
mock_epc_client_dupe_data = Mock(spec=EpcClient(auth_token="mocked_auth_token"))
mock_epc_client_dupe_data.domestic.search.return_value = mock_epc_response_dupe.copy()
mock_epc_client_dupe_data.auth_token = "mocked_auth_token"
return mock_epc_client_dupe_data
# @pytest.fixture
# def mock_epc_client(self):
# mock_epc_client = Mock(spec=EpcClient(auth_token="mocked_auth_token"))
# mock_epc_client.domestic.search.return_value = mock_epc_response.copy()
# mock_epc_client.auth_token = "mocked_auth_token"
# return mock_epc_client
#
# @pytest.fixture
# def mock_epc_client_dupe_data(self):
# mock_epc_client_dupe_data = Mock(spec=EpcClient(auth_token="mocked_auth_token"))
# mock_epc_client_dupe_data.domestic.search.return_value = mock_epc_response_dupe.copy()
# mock_epc_client_dupe_data.auth_token = "mocked_auth_token"
# return mock_epc_client_dupe_data
@pytest.fixture
def mock_cleaner(self):
@ -229,7 +255,11 @@ class TestProperty:
}
mock_cleaner.cleaned = {
"roof-description": [{"original_description": "Roof Description"}],
"roof-description": [
{"original_description": "Roof Description"},
{"original_description": "pitched, no insulation", "is_pitched": True, "is_flat": False,
"is_roof_room": False}
],
"walls-description": [walls_data],
"windows-description": [{"original_description": "Windows Description"}],
"mainheat-description": [{"original_description": "Main Heating Description"}],
@ -240,37 +270,32 @@ class TestProperty:
}
return mock_cleaner
def test_init(self, mock_epc_client):
inst1 = Property(0, "AB12CD", "Test Address", epc_client=mock_epc_client)
# Should be mocked auth token
assert inst1.epc_client.auth_token == "mocked_auth_token"
def test_init(self):
inst1 = Property(0, postcode="AB12CD", address="Test Address")
inst2 = Property(3, "AB12CD", "Test Address", epc_client=mock_epc_client)
assert inst2.epc_client.auth_token
assert inst1.data is None
inst3 = Property(4, "AB12CD", "Test Address", data={"some": "data"}, epc_client=mock_epc_client)
assert inst3.data == {"some": "data"}
inst2 = Property(3, "AB12CD", "Test Address")
assert inst2.id == 3
data = inst3.search_address_epc()
assert data is None
inst3 = Property(4, "AB12CD", "Test Address", data={"some": "data", "uprn": 123})
assert inst3.data == {"some": "data", "uprn": 123}
def test_search_address_epc(self, property_instance):
# Call the method to test
property_instance.search_address_epc()
# Verify that the correct data is being returned
assert property_instance.data == mock_epc_response["rows"][0]
def test_search_address_epc_multiple_results(self, property_instance_dupe_data, mock_epc_client_dupe_data):
with pytest.raises(Exception, match="More than one result found for this address - investigate me"):
property_instance_dupe_data.search_address_epc()
def test_get_components(self, property_instance, mock_cleaner, mock_epc_client):
property_instance.search_address_epc()
property_instance.get_components(mock_cleaner.cleaned)
def test_get_components(
self, property_instance, mock_cleaner, mock_photo_supply_lookup, mock_floor_area_decile_thresholds
):
property_instance.get_components(
mock_cleaner.cleaned,
photo_supply_lookup=mock_photo_supply_lookup,
floor_area_decile_thresholds=mock_floor_area_decile_thresholds
)
# Verify that the components are set correctly
assert property_instance.roof == {"original_description": "Roof Description"}
assert property_instance.roof == {
'original_description': 'pitched, no insulation', 'is_pitched': True,
'is_flat': False, 'is_roof_room': False
}
assert property_instance.walls == {
"original_description": "Walls Description",
"is_cavity_wall": True,
@ -294,24 +319,15 @@ class TestProperty:
# Verify that ValueError is raised when EpcClean doesn't contain cleaned data
with pytest.raises(ValueError, match="Cleaner does not contain cleaned data"):
property_instance.get_components(mock_cleaner.cleaned)
property_instance.get_components(mock_cleaner.cleaned, pd.DataFrame(), pd.DataFrame())
def test_get_components_no_data(self, property_instance, mock_cleaner):
def test_get_components_no_attributes(
self, property_instance, mock_cleaner, mock_photo_supply_lookup, mock_floor_area_decile_thresholds
):
# Modify the mock cleaner to have no attributes for a specific description
mock_cleaner.cleaned = {
"roof-description": []
}
# Verify that ValueError is raised when no attributes are found
with pytest.raises(ValueError, match="Property does not contain data"):
property_instance.get_components(mock_cleaner.cleaned)
def test_get_components_no_attributes(self, property_instance, mock_cleaner):
# Modify the mock cleaner to have no attributes for a specific description
mock_cleaner.cleaned = {
"roof-description": []
}
property_instance.search_address_epc()
property_instance.data["roof-description"] = "Pitched, no insulation"
property_instance.walls = {
"original_description": "Walls Description",
@ -332,14 +348,17 @@ class TestProperty:
}
# Assert backup cleaning has been applied
property_instance.get_components(mock_cleaner.cleaned)
property_instance.get_components(
mock_cleaner.cleaned, mock_photo_supply_lookup, mock_floor_area_decile_thresholds
)
assert property_instance.roof["clean_description"] == "Pitched, no insulation"
assert property_instance.roof["is_pitched"]
def test_get_components_multiple_attributes(self, property_instance, mock_cleaner):
def test_get_components_multiple_attributes(
self, property_instance, mock_cleaner, mock_photo_supply_lookup, mock_floor_area_decile_thresholds
):
# This shouldn't happen - it would mean a cleaning error
property_instance.search_address_epc()
property_instance.data["roof-description"] = "Roof Description"
cleaned = {
"roof-description": [
@ -350,10 +369,10 @@ class TestProperty:
# Verify that ValueError is raised when multiple attributes are found
with pytest.raises(ValueError, match="Either No attributes or multiple found for roof-description"):
property_instance.get_components(cleaned)
property_instance.get_components(cleaned, mock_photo_supply_lookup, mock_floor_area_decile_thresholds)
def test_set_spatial(self, mock_epc_client):
prop = Property(1, "AB12CD", "Test Address", mock_epc_client)
def test_set_spatial(self):
prop = Property(1, postcode="AB12CD", address="Test Address")
spatial1 = pd.DataFrame([{
'X_COORDINATE': 411143.0, 'Y_COORDINATE': 281701.0, 'LATITUDE': 52.4331896, 'LONGITUDE': -1.8375238,
@ -367,7 +386,7 @@ class TestProperty:
assert prop.is_heritage
assert prop.restricted_measures
prop2 = Property(1, "AB12CD", "Test Address", mock_epc_client)
prop2 = Property(1, "AB12CD", "Test Address")
spatial2 = pd.DataFrame([{
'X_COORDINATE': 411143.0, 'Y_COORDINATE': 281701.0, 'LATITUDE': 52.4331896, 'LONGITUDE': -1.8375238,
@ -381,10 +400,10 @@ class TestProperty:
assert not prop2.is_heritage
assert not prop2.restricted_measures
def test_set_floor_level(self, mock_epc_client):
def test_set_floor_level(self):
# In this case, we have a flat which looks like it's on the first floor, but it's actually on the ground
# floor, so we should set floor_level to 0
prop = Property(1, "AB12CD", "Test Address", mock_epc_client)
prop = Property(1, postcode="AB12CD", address="Test Address")
prop.data = {'floor-level': '01', 'property-type': 'Flat'}
prop.floor = {
'original_description': 'Solid, no insulation (assumed)', 'clean_description': 'Solid, no insulation',
@ -400,7 +419,7 @@ class TestProperty:
# This property is labelled as being on the ground floor but actually has another property below
# so we set floor level to 1
prop2 = Property(1, "AB12CD", "Test Address", mock_epc_client)
prop2 = Property(1, postcode="AB12CD", address="Test Address")
prop2.data = {'floor-level': 'Ground', 'property-type': 'Flat'}
prop2.floor = {
'original_description': '(Another dwelling below)', 'clean_description': 'Solid, no insulation',
@ -415,7 +434,7 @@ class TestProperty:
assert prop2.floor_level == 1
# this property is correctly labelled as being on the 2nd floor
prop3 = Property(1, "AB12CD", "Test Address", mock_epc_client)
prop3 = Property(1, postcode="AB12CD", address="Test Address")
prop3.data = {'floor-level': '02', 'property-type': 'Flat'}
prop3.floor = {
'original_description': '(Another dwelling below)', 'clean_description': 'Solid, no insulation',
@ -430,7 +449,7 @@ class TestProperty:
assert prop3.floor_level == 2
# Example of a house
prop4 = Property(1, "AB12CD", "Test Address", mock_epc_client)
prop4 = Property(1, postcode="AB12CD", address="Test Address")
prop4.data = {'floor-level': '', 'property-type': 'House'}
prop4.floor = {
'original_description': '(Another dwelling below)', 'clean_description': 'Solid, no insulation',

View file

@ -2,13 +2,11 @@ from backend.Property import Property
from etl.epc.DataProcessor import DataProcessor
from backend.app.plan.utils import create_recommendation_scoring_data, get_cleaned
from etl.epc.settings import COLUMNS_TO_MERGE_ON
from epc_api.client import EpcClient
import pandas as pd
import pytest
import msgpack
from utils.s3 import read_dataframe_from_s3_parquet, read_from_s3
from tqdm import tqdm
# Handy code for selecting testing data
@ -122,7 +120,21 @@ class TestSapModelPrep:
cleaned = msgpack.unpackb(cleaned, raw=False)
return cleaned
def test_fill_cavity_wall(self, cleaned, cleaning_data):
@pytest.fixture
def photo_supply_lookup(self):
    """
    Fixture that reads the solar photo supply lookup table from the dev data bucket
    """
    return read_dataframe_from_s3_parquet(
        bucket_name="retrofit-data-dev",
        file_key="solar_pv_supply/photo_supply_lookup.parquet",
    )
@pytest.fixture
def floor_area_decile_thresholds(self):
    """
    Fixture that reads the floor area decile thresholds from the dev data bucket
    """
    return read_dataframe_from_s3_parquet(
        bucket_name="retrofit-data-dev",
        file_key="solar_pv_supply/floor_area_decile_thresholds.parquet",
    )
def test_fill_cavity_wall(self, cleaned, cleaning_data, photo_supply_lookup, floor_area_decile_thresholds):
"""
We ensure that the process that prepares the data in the engine code results in the same data as
the model is trained on
@ -288,11 +300,10 @@ class TestSapModelPrep:
home = Property(
id=0,
postcode=starting_epc["postcode"],
address1=starting_epc["address1"],
epc_client=EpcClient(auth_token="notoken"),
address=starting_epc["address1"],
data=starting_epc
)
home.get_components(cleaned)
home.get_components(cleaned, photo_supply_lookup, floor_area_decile_thresholds)
data_processor = DataProcessor(None, newdata=True)
data_processor.insert_data(pd.DataFrame([home.get_model_data()]))
@ -356,7 +367,7 @@ class TestSapModelPrep:
assert test_record[c].values[0] == row[c]
def test_internal_wall_insulation(self, cleaned, cleaning_data):
def test_internal_wall_insulation(self, cleaned, cleaning_data, photo_supply_lookup, floor_area_decile_thresholds):
starting_epc2 = {
'low-energy-fixed-light-count': '2', 'address': 'FLAT 12, WAREHOUSE W, 3 WESTERN GATEWAY',
@ -508,11 +519,10 @@ class TestSapModelPrep:
home2 = Property(
id=0,
postcode=starting_epc2["postcode"],
address1=starting_epc2["address1"],
epc_client=EpcClient(auth_token="notoken"),
address=starting_epc2["address1"],
data=starting_epc2
)
home2.get_components(cleaned)
home2.get_components(cleaned, photo_supply_lookup, floor_area_decile_thresholds)
home2.set_number_lighting_outlets(None)
data_processor2 = DataProcessor(None, newdata=True)
@ -578,7 +588,7 @@ class TestSapModelPrep:
assert test_record2[c].values[0] == row2[c]
def test_ventilation(self, cleaned, cleaning_data):
def test_ventilation(self, cleaned, cleaning_data, photo_supply_lookup, floor_area_decile_thresholds):
starting_epc3 = {
'low-energy-fixed-light-count': '', 'address': '45 Shepperson Road', 'uprn-source': 'Energy Assessor',
@ -728,11 +738,10 @@ class TestSapModelPrep:
home3 = Property(
id=0,
postcode=starting_epc3["postcode"],
address1=starting_epc3["address1"],
epc_client=EpcClient(auth_token="notoken"),
address=starting_epc3["address1"],
data=starting_epc3
)
home3.get_components(cleaned)
home3.get_components(cleaned, photo_supply_lookup, floor_area_decile_thresholds)
home3.set_number_lighting_outlets(None)
data_processor3 = DataProcessor(None, newdata=True)
@ -782,7 +791,7 @@ class TestSapModelPrep:
assert test_record3[c].values[0] == row3[c]
def test_fireplaces(self, cleaned, cleaning_data):
def test_fireplaces(self, cleaned, cleaning_data, photo_supply_lookup, floor_area_decile_thresholds):
starting_epc4 = {
'low-energy-fixed-light-count': '', 'address': '9 Glebe Road, Asfordby Hill',
@ -937,11 +946,10 @@ class TestSapModelPrep:
home4 = Property(
id=0,
postcode=starting_epc4["postcode"],
address1=starting_epc4["address1"],
epc_client=EpcClient(auth_token="notoken"),
address=starting_epc4["address1"],
data=starting_epc4
)
home4.get_components(cleaned)
home4.get_components(cleaned, photo_supply_lookup, floor_area_decile_thresholds)
home4.set_number_lighting_outlets(None)
data_processor4 = DataProcessor(None, newdata=True)

View file

@ -33,6 +33,7 @@ class Eligibility:
# If the loft has less than 100mm of insulation, we classify the home has needing loft insulation
LOFT_INSULATION_THRESHOLD = 100
HIGH_LOFT_INSULATION_THRESHOLD = 269
# Because EPCS have different values for tenure, we need to remap them to a common set of values
tenure_remap = {
@ -104,6 +105,8 @@ class Eligibility:
self.LOFT_INSULATION_THRESHOLD if loft_thickness_threshold is None else loft_thickness_threshold
)
high_loft_thickness_threshold = self.HIGH_LOFT_INSULATION_THRESHOLD
# We firstly check if the roof is a loft
is_loft = self.roof["is_pitched"] and (not self.roof["is_roof_room"])
@ -122,7 +125,22 @@ class Eligibility:
is_flat=self.roof["is_flat"]
)
if insulation_thickness > loft_thickness_threshold:
if insulation_thickness <= loft_thickness_threshold:
self.loft = {
"suitability": True,
"thickness": insulation_thickness,
"reason": None
}
if insulation_thickness <= high_loft_thickness_threshold:
self.loft = {
"suitability": True,
"thickness": insulation_thickness,
"reason": "high loft thickness but below regulation"
}
return
if insulation_thickness > high_loft_thickness_threshold:
# Insulation is already thick enough
self.loft = {
"suitability": False,
@ -131,12 +149,6 @@ class Eligibility:
}
return
self.loft = {
"suitability": True,
"thickness": insulation_thickness,
"reason": None
}
def cavity_insulation(self):
"""
@ -161,6 +173,17 @@ class Eligibility:
is_partial_filled_cavity = is_cavity and is_partial_filled
is_underperforming_cavity = is_cavity and is_underperforming
# Check if it has internal or external wall insulation
has_internal_wall_insulation = self.walls["internal_insulation"]
has_external_wall_insulation = self.walls["external_insulation"]
if has_internal_wall_insulation or has_external_wall_insulation:
self.cavity = {
"suitability": False,
"type": "internal or external wall insulation"
}
return
if is_unfilled_cavity:
self.cavity = {
"suitability": True,
@ -333,7 +356,8 @@ class Eligibility:
"""
current_sap = int(self.epc["current-energy-efficiency"])
if current_sap > 54:
if current_sap >= 69:
self.eco4_warmfront = {
"eligible": False,
"message": "sap too high"
@ -347,7 +371,19 @@ class Eligibility:
is_eligible = self.cavity["suitability"] & self.loft["suitability"]
if post_retrofit_sap is None:
message = "subject to post retrofit sap" if is_eligible else "not eligible"
if current_sap >= 55:
message = "Possibly eligible but property currently EPC D"
else:
message = "subject to post retrofit sap" if is_eligible else "not eligible"
# Update the message to flag properties that failed just because of a full cavity.
# We need to double check that the wall is a cavity, that the loft is suitable and that the
# sap is within reason
# We can then estimate the age of the cavity fill
if not is_eligible and (current_sap < 69) and self.loft["suitability"] and self.walls["is_cavity_wall"]:
message = "Failed due to full cavity - check cavity age"
self.eco4_warmfront = {
"eligible": is_eligible,
"message": message

View file

@ -11,13 +11,12 @@ import numpy as np
import msgpack
from datetime import datetime, timedelta
from utils.logger import setup_logger
from utils.s3 import read_from_s3
from utils.s3 import read_from_s3, read_dataframe_from_s3_parquet
from dotenv import load_dotenv
from backend.SearchEpc import SearchEpc
from backend.Property import Property
from etl.eligibility.Eligibility import Eligibility
from etl.epc.DataProcessor import DataProcessor
from backend.app.utils import read_parquet_from_s3
from backend.app.plan.utils import create_recommendation_scoring_data
from etl.epc.settings import COLUMNS_TO_MERGE_ON
from backend.ml_models.api import ModelApi
@ -247,6 +246,8 @@ def merge_ha_15(asset_list, identified_addresses):
identified_addresses = identified_addresses.drop_duplicates("merge_key")
# We pull out raw counts for the survey lists
# Check asset list for dupes
asset_list_dupes = asset_list["merge_key"].duplicated()
if asset_list_dupes.sum():
@ -337,7 +338,8 @@ def merge_ha_15(asset_list, identified_addresses):
def prepare_model_data_row(
property_id, modelling_epc, cleaned, cleaning_data, created_at, old_data=None, full_sap_epc=None
property_id, modelling_epc, cleaned, cleaning_data, created_at,
photo_supply_lookup, floor_area_decile_thresholds, old_data=None, full_sap_epc=None,
):
"""
This function prepares the data for modelling, in the same fashion as the recommendation engine
@ -348,17 +350,24 @@ def prepare_model_data_row(
p = Property(
id=property_id,
postcode=modelling_epc["postcode"],
address1=modelling_epc["address1"],
epc_client=None,
data=modelling_epc
address=modelling_epc["address1"],
data=modelling_epc,
old_data=old_data,
full_sap_epc=full_sap_epc
)
p.old_data = old_data
p.full_sap_epc = full_sap_epc
p.get_components(cleaned)
p.get_components(cleaned, photo_supply_lookup=photo_supply_lookup,
floor_area_decile_thresholds=floor_area_decile_thresholds)
# THIS IS TEMP AND SHOULDN'T BE HERE
data_to_clean = p.get_model_data()
if data_to_clean["NUMBER_HEATED_ROOMS"] in ['', None]:
data_to_clean["NUMBER_HEATED_ROOMS"] = data_to_clean["NUMBER_HABITABLE_ROOMS"]
p.data["number-heated-rooms"] = data_to_clean["NUMBER_HABITABLE_ROOMS"]
# This is temp - this should happen after scoring
cleaned_property_data = DataProcessor.apply_averages_cleaning(
data_to_clean=pd.DataFrame([dict(**p.get_model_data(), LOCAL_AUTHORITY=p.data["local-authority"])]),
data_to_clean=pd.DataFrame([dict(**data_to_clean, LOCAL_AUTHORITY=p.data["local-authority"])]),
cleaning_data=cleaning_data,
cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'],
)
@ -971,6 +980,8 @@ def analyse_ha_15_results(results_df, ha15, no_house_numbers):
results_df["warmfront_identified"]
]
warmfront_identified = warmfront_identified
n_identified = (warmfront_identified["gbis_eligible"] | warmfront_identified["eco4_eligible"]).sum()
success_rate = n_identified / warmfront_identified.shape[0]
@ -1021,6 +1032,11 @@ def analyse_ha_15_results(results_df, ha15, no_house_numbers):
(results_df["eco4_eligible"] == True)
].copy()
new_possibilities_gbis = results_df[
(~results_df["warmfront_identified"]) &
(results_df["eco4_eligible"] == False) & (results_df["gbis_eligible"] == True)
].copy()
# These are future possibilities
future_possibilities_eco = results_df[
(~results_df["warmfront_identified"]) &
@ -1087,7 +1103,7 @@ def app():
)
cleaned = msgpack.unpackb(cleaned, raw=False)
cleaning_data = read_parquet_from_s3(
cleaning_data = read_dataframe_from_s3_parquet(
bucket_name="retrofit-data-dev", file_key="sap_change_model/cleaning_dataset.parquet",
)

View file

@ -1,6 +1,6 @@
import os
import msgpack
import openpyxl
from openpyxl.styles.colors import COLOR_INDEX
from pathlib import Path
from datetime import datetime
import pandas as pd
@ -8,7 +8,7 @@ import numpy as np
from utils.s3 import read_from_s3
from utils.logger import setup_logger
from dotenv import load_dotenv
from backend.app.utils import read_parquet_from_s3
from utils.s3 import read_dataframe_from_s3_parquet
from tqdm import tqdm
from backend.SearchEpc import SearchEpc
from etl.eligibility.Eligibility import Eligibility
@ -16,10 +16,14 @@ from etl.eligibility.ha_15_32.app import prepare_model_data_row
from etl.epc.DataProcessor import DataProcessor
from etl.epc.settings import COLUMNS_TO_MERGE_ON
from backend.ml_models.api import ModelApi
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
from recommendations.recommendation_utils import calculate_cavity_age
from recommendation_utils import convert_thickness_to_numeric
import re
ENV_FILE = Path(__file__).parent / "etl" / "eligibility" / "ha_15_32" / ".env"
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
logger = setup_logger()
load_dotenv(ENV_FILE)
@ -250,24 +254,55 @@ def load_data():
return data, survey_list
def get_epc_data(data, cleaned, cleaning_data, created_at):
def get_epc_data(data, cleaned, cleaning_data, created_at, photo_supply_lookup, floor_area_decile_thresholds):
scoring_data = []
results = []
nodata = []
for _, property_meta in tqdm(data.iterrows(), total=len(data)):
property_type_lookup = {
'Semi Detached Bungalow': {"property-type": "Bungalow", "built-form": "Semi-Detached"},
'Mid Terraced House': {"property-type": "House", "built-form": "Mid-Terrace"},
'End Terraced House': {"property-type": "House", "built-form": "End-Terrace"},
'Low Rise Flat': {"property-type": "Flat", "built-form": "Mid-Terrace"},
'Semi-Detached House': {"property-type": "House", "built-form": "Semi-Detached"},
'Detached Bungalow': {"property-type": "Bungalow", "built-form": "Detached"},
'End Terraced Bungalow': {"property-type": "Bungalow", "built-form": "End-Terrace"},
'Mid Terraced Bungalow': {"property-type": "Bungalow", "built-form": "Mid-Terrace"},
'Medium Rise Flat': {"property-type": "Flat", "built-form": "Mid-Terrace"},
'Detached House': {"property-type": "House", "built-form": "Detached"},
'Cottage Flat': {"property-type": "Flat", "built-form": "Semi-Detached"},
'Maisonette Medium Rise': {"property-type": "Flat", "built-form": "Mid-Terrace"},
'Maisonette Over Shop': {"property-type": "Flat", "built-form": "Mid-Terrace"},
'End Terraced Town House': {"property-type": "House", "built-form": "End-Terrace"},
'Flat Over Shop': {"property-type": "Flat", "built-form": "Mid-Terrace"},
'Mid Terraced Town House': {"property-type": "House", "built-form": "Mid-Terrace"},
}
for index, property_meta in tqdm(data.iterrows(), total=len(data)):
searcher = SearchEpc(
address1=property_meta["HouseNo"],
postcode=property_meta["Postcode"],
size=1000
auth_token=EPC_AUTH_TOKEN,
os_api_key=None,
full_address=property_meta["Address"]
)
searcher.search()
searcher.ordnance_survey_client.property_type = property_type_lookup[property_meta["Type"]]["property-type"]
searcher.ordnance_survey_client.built_form = property_type_lookup[property_meta["Type"]]["built-form"]
searcher.find_property(skip_os=True)
if searcher.data is None:
if searcher.newest_epc is None:
nodata.append(property_meta)
continue
newest_epc, older_epcs, full_sap_epc = searcher.retrieve(address=property_meta["Address"])
if searcher.newest_epc.get("estimated"):
# We insert the row ID as our proxy for UPRN
proxy_uprn = int(property_meta["row_id"].split("_")[1])
searcher.newest_epc["uprn"] = proxy_uprn
newest_epc = searcher.newest_epc
older_epcs = searcher.older_epcs
full_sap_epc = searcher.full_sap_epc
# We also want to get the penultimate epc
penultimate_epc, _ = searcher.filter_newest_epc(older_epcs)
if not penultimate_epc:
@ -277,16 +312,27 @@ def get_epc_data(data, cleaned, cleaning_data, created_at):
eligibility.check_gbis_warmfront()
eligibility.check_eco4_warmfront()
if (not eligibility.eco4_warmfront["eligible"]) and (not eligibility.gbis_warmfront) and (
property_meta["warmfront_identified"]
):
if (not eligibility.eco4_warmfront["eligible"]) and (not eligibility.gbis_warmfront):
eligibility = Eligibility(epc=penultimate_epc, cleaned=cleaned)
eligibility.check_gbis_warmfront()
eligibility.check_eco4_warmfront()
# If this is the case, we need to update the older epcs
older_epcs = [
x for x in older_epcs if x["lmk-key"] not in [newest_epc["lmk-key"], penultimate_epc["lmk-key"]]
]
# We don't update just to make data cleaning easier
if penultimate_epc.get("estimated") is None:
older_epcs = [x for x in searcher.data["rows"] if x["lmk-key"] != penultimate_epc["lmk-key"]]
# If the property is a cavity wall and it's filled, we produce an estimate for the age of the cavity
# Loft MUST be suitable
cavity_age = None
if (
eligibility.walls["is_cavity_wall"] and
eligibility.walls["is_filled_cavity"] and
eligibility.loft["suitability"] and
eligibility.eco4_warmfront["message"] == "Failed due to full cavity - check cavity age"
):
# We check the age of the cavity and if it's particularly old, we flag it
cavity_age = calculate_cavity_age(newest_epc, older_epcs, cleaned)
# Full checks
eligibility.check_gbis()
@ -303,7 +349,9 @@ def get_epc_data(data, cleaned, cleaning_data, created_at):
cleaning_data=cleaning_data,
created_at=created_at,
old_data=older_epcs,
full_sap_epc=full_sap_epc
full_sap_epc=full_sap_epc,
photo_supply_lookup=photo_supply_lookup,
floor_area_decile_thresholds=floor_area_decile_thresholds
)
scoring_data.extend(scoring_dictionary)
@ -329,6 +377,10 @@ def get_epc_data(data, cleaned, cleaning_data, created_at):
"heating": eligibility.epc["mainheat-description"],
"tenure": eligibility.tenure,
"date_epc": eligibility.epc["lodgement-date"],
"loft_thickness": eligibility.roof["insulation_thickness"],
"cavity_age": cavity_age,
**eligibility.walls,
**eligibility.roof,
}
)
@ -426,13 +478,93 @@ def get_epc_data(data, cleaned, cleaning_data, created_at):
def analyse_results(results_df, data, survey_list):
analysis_data = data[["row_id", "survey_key", "warmfront_identified"]].merge(
analysis_data = data[["row_id", "survey_key", "warmfront_identified", "row_colour_name"]].merge(
results_df, how="left", on="row_id"
).merge(
survey_list[["survey_key", survey_list.columns[0]]].rename(columns={survey_list.columns[0]: "funding_scheme"}),
how="left", on="survey_key"
)
analysis_data["roof_insulation_thickness"] = np.where(
pd.isnull(analysis_data["roof_insulation_thickness"]), None, analysis_data["roof_insulation_thickness"]
)
analysis_data["roof_insulation_thickness_numeric"] = analysis_data["roof_insulation_thickness"].apply(
lambda x: convert_thickness_to_numeric(x, is_flat=False, is_pitched=True)
)
warmfront_sold_eco4 = analysis_data[
(analysis_data["warmfront_identified"] == True) & (
analysis_data["funding_scheme"].isin(["ECO4 A/W", "AFFORDABLE WARMTH"]))
] # 1407
warmfront_sold_gbis = analysis_data[
(analysis_data["warmfront_identified"] == True) & (
analysis_data["funding_scheme"].isin(["ECO4 GBIS (ECO+)"]))
]
ideal_eco4_warmfront_not_sold = analysis_data[
(analysis_data["eco4_eligible"] == True) & (analysis_data["warmfront_identified"] == False) & (
analysis_data["roof_insulation_thickness_numeric"] <= 100)
]
secondary_eco4_warmfront_not_sold = analysis_data[
(analysis_data["eco4_eligible"] == True) & (analysis_data["warmfront_identified"] == False) & (
analysis_data["roof_insulation_thickness_numeric"] > 100)
]
# underperforming cavities
underperforming_cavities = analysis_data[
(analysis_data["eco4_message"] == "Failed due to full cavity - check cavity age") & (
analysis_data["cavity_age"] > 10 * 365
) & (analysis_data["roof_insulation_thickness_numeric"] <= 100)
]
identified_gbis_not_sold = analysis_data[
(analysis_data["gbis_eligible"] == True) & (analysis_data["warmfront_identified"] == False) & (
analysis_data["eco4_eligible"] == False
)
]
eco_eligible = analysis_data[analysis_data["eco4_eligible"] == True]
eco_ineligible = analysis_data[analysis_data["eco4_eligible"] == False]
eco_ineligible["eco4_message"].value_counts()
# SAP too high:
sap_too_high = eco_ineligible[eco_ineligible["eco4_message"] == "sap too high"].copy()
further_possibilities = sap_too_high[
sap_too_high["walls"].isin(
[
"Cavity wall, as built, insulated",
"Cavity wall, as built, no insulation",
"Cavity wall, as built, partial insulation",
"Cavity wall, no insulation",
"Cavity wall, partial insulation"
]
)
]
filled_cavities = eco_ineligible[
eco_ineligible["eco4_message"] == "sap too high"
]
warmfront_identified = analysis_data[analysis_data["warmfront_identified"]]
warmfront_identified["walls"].value_counts()
all_identified_gbis = analysis_data[
(analysis_data["warmfront_identified"] & analysis_data["funding_scheme"].isin(
["ECO4 GBIS (ECO+)"])) |
(analysis_data["gbis_eligible"] & analysis_data["eco4_eligible"].isin([False, None]))
]
empty_cavity_desriptions = [
"Cavity wall, as built, no insulation", "Cavity wall, as built, partial insulation",
"Cavity wall, no insulation", "Cavity wall, partial insulation"
]
empty_cavities = analysis_data[analysis_data["walls"].isin(empty_cavity_desriptions)]
remaining_empty = empty_cavities[~empty_cavities["warmfront_identified"]]
warmfront_identified = analysis_data[analysis_data["warmfront_identified"]]
# Of the ECO jobs, what proportion do we get right
@ -482,17 +614,22 @@ def app():
)
cleaned = msgpack.unpackb(cleaned, raw=False)
cleaning_data = read_parquet_from_s3(
cleaning_data = read_dataframe_from_s3_parquet(
bucket_name="retrofit-data-dev", file_key="sap_change_model/cleaning_dataset.parquet",
)
created_at = datetime.now().isoformat()
results_df, scoring_data, nodata = get_epc_data(data, cleaned, cleaning_data, created_at)
photo_supply_lookup, floor_area_decile_thresholds = SolarPhotoSupply.load(bucket="retrofit-data-dev")
results_df, scoring_data, nodata = get_epc_data(
data, cleaned, cleaning_data, created_at, photo_supply_lookup, floor_area_decile_thresholds
)
# Store
# Old file was ha16.pickle
# import pickle
# with open("ha16.pickle", "wb") as f:
# with open("ha16_10_jan.pickle", "wb") as f:
# pickle.dump(
# {
# "scoring_data": scoring_data,
@ -500,3 +637,11 @@ def app():
# "nodata": nodata
# }, f
# )
# Read pickle
# import pickle
# with open("ha16_10_jan.pickle", "rb") as f:
# saved = pickle.load(f)
# scoring_data = saved["scoring_data"]
# results_df = saved["results"]
# nodata = saved["nodata"]

View file

@ -1,14 +1,13 @@
import os
import msgpack
import openpyxl
from openpyxl.styles.colors import COLOR_INDEX
from pathlib import Path
from datetime import datetime
import pandas as pd
import numpy as np
from utils.s3 import read_from_s3
from utils.s3 import read_from_s3, read_dataframe_from_s3_parquet
from utils.logger import setup_logger
from dotenv import load_dotenv
from backend.app.utils import read_parquet_from_s3
from tqdm import tqdm
from backend.SearchEpc import SearchEpc
from etl.eligibility.Eligibility import Eligibility
@ -16,9 +15,11 @@ from etl.eligibility.ha_15_32.app import prepare_model_data_row
from etl.epc.DataProcessor import DataProcessor
from etl.epc.settings import COLUMNS_TO_MERGE_ON
from backend.ml_models.api import ModelApi
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
from recommendations.recommendation_utils import calculate_cavity_age
from recommendation_utils import convert_thickness_to_numeric
import re
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
ENV_FILE = Path(__file__).parent / "etl" / "eligibility" / "ha_15_32" / ".env"
logger = setup_logger()
@ -170,24 +171,46 @@ def load_data():
return data, survey_list
def get_epc_data(data, cleaned, cleaning_data, created_at):
def get_epc_data(data, cleaned, cleaning_data, created_at, photo_supply_lookup, floor_area_decile_thresholds):
scoring_data = []
results = []
nodata = []
property_type_lookup = {
"01 HOUSE": "House",
"02 FLAT": "Flat",
"03 BUNGALOW": "Bungalow",
"05 BEDSIT": "Flat",
"04 MAISONETTE": "Maisonette",
"01 HOUSE MID": "House",
"10 PBUNGALOW": "Bungalow",
"14 SFLAT": "Flat",
"12 SBEDSIT": "Flat",
"11 PFLAT": "Flat",
"13 SBUNGALOW": "Bungalow",
" 01 HOUSE MID": "House",
"09 PBEDSIT": "Flat"
}
for _, property_meta in tqdm(data.iterrows(), total=len(data)):
searcher = SearchEpc(
address1=property_meta["HouseNo"],
postcode=property_meta["Postcode"],
size=1000
auth_token=EPC_AUTH_TOKEN,
os_api_key=None,
full_address=property_meta["Address"]
)
searcher.search()
searcher.ordnance_survey_client.property_type = property_type_lookup[property_meta["Property Type"]]
searcher.find_property(skip_os=True)
if searcher.data is None:
if searcher.newest_epc is None:
nodata.append(property_meta)
continue
newest_epc, older_epcs, full_sap_epc = searcher.retrieve(address=property_meta["Address"])
newest_epc = searcher.newest_epc
older_epcs = searcher.older_epcs
full_sap_epc = searcher.full_sap_epc
# We also want to get the penultimate epc
penultimate_epc, _ = searcher.filter_newest_epc(older_epcs)
if not penultimate_epc:
@ -197,23 +220,36 @@ def get_epc_data(data, cleaned, cleaning_data, created_at):
eligibility.check_gbis_warmfront()
eligibility.check_eco4_warmfront()
if (not eligibility.eco4_warmfront["eligible"]) and (not eligibility.gbis_warmfront) and (
property_meta["warmfront_identified"]
):
if (not eligibility.eco4_warmfront["eligible"]) and (not eligibility.gbis_warmfront):
eligibility = Eligibility(epc=penultimate_epc, cleaned=cleaned)
eligibility.check_gbis_warmfront()
eligibility.check_eco4_warmfront()
# If this is the case, we need to update the older epcs
older_epcs = [
x for x in older_epcs if x["lmk-key"] not in [newest_epc["lmk-key"], penultimate_epc["lmk-key"]]
]
# older_epcs = [
# x for x in older_epcs if x["lmk-key"] not in [newest_epc["lmk-key"], penultimate_epc["lmk-key"]]
# ]
# If this is the case, we need to update the older epcs
# We don't update just to make data cleaning easier
if penultimate_epc.get("estimated") is None:
older_epcs = [x for x in searcher.data["rows"] if x["lmk-key"] != penultimate_epc["lmk-key"]]
# Loft MUST be suitable
cavity_age = None
if (
eligibility.walls["is_cavity_wall"] and
eligibility.walls["is_filled_cavity"] and
eligibility.loft["suitability"] and
eligibility.eco4_warmfront["message"] == "Failed due to full cavity - check cavity age"
):
# We check the age of the cavity and if it's particularly old, we flag it
cavity_age = calculate_cavity_age(newest_epc, older_epcs, cleaned)
# Full checks
eligibility.check_gbis()
eligibility.check_eco4()
if eligibility.eco4_warmfront["eligible"]:
if eligibility.epc["uprn"] == "":
if eligibility.epc["uprn"] in ["", None]:
eligibility.epc["uprn"] = int(property_meta["row_id"].split("_")[1])
scoring_dictionary = prepare_model_data_row(
@ -223,7 +259,9 @@ def get_epc_data(data, cleaned, cleaning_data, created_at):
cleaning_data=cleaning_data,
created_at=created_at,
old_data=older_epcs,
full_sap_epc=full_sap_epc
full_sap_epc=full_sap_epc,
photo_supply_lookup=photo_supply_lookup,
floor_area_decile_thresholds=floor_area_decile_thresholds
)
scoring_data.extend(scoring_dictionary)
@ -249,6 +287,9 @@ def get_epc_data(data, cleaned, cleaning_data, created_at):
"heating": eligibility.epc["mainheat-description"],
"tenure": eligibility.tenure,
"date_epc": eligibility.epc["lodgement-date"],
"cavity_age": cavity_age,
**eligibility.walls,
**eligibility.roof,
}
)
@ -277,7 +318,7 @@ def get_epc_data(data, cleaned, cleaning_data, created_at):
scoring_df = DataProcessor.clean_efficiency_variables(scoring_df)
scoring_df["UPRN"] = scoring_df["UPRN"].astype(int)
model_api = ModelApi(portfolio_id="ha33-eligibility", timestamp=created_at)
model_api = ModelApi(portfolio_id="ha24-eligibility", timestamp=created_at)
all_predictions = model_api.predict_all(
df=scoring_df,
bucket="retrofit-data-dev",
@ -353,6 +394,54 @@ def analyse_results(results_df, data, survey_list):
how="left", on="survey_key"
)
# NEW
analysis_data["roof_insulation_thickness"] = np.where(
pd.isnull(analysis_data["roof_insulation_thickness"]), None, analysis_data["roof_insulation_thickness"]
)
analysis_data["roof_insulation_thickness_numeric"] = analysis_data["roof_insulation_thickness"].apply(
lambda x: convert_thickness_to_numeric(x, is_flat=False, is_pitched=True)
)
warmfront_sold_eco4 = analysis_data[
(analysis_data["warmfront_identified"] == True) & (
analysis_data["funding_scheme"].isin(["ECO4 A/W", "AFFORDABLE WARMTH"]))
]
warmfront_sold_gbis = analysis_data[
(analysis_data["warmfront_identified"] == True) & (
analysis_data["funding_scheme"].isin(["ECO4 GBIS (ECO+)"]))
]
# 1407
additional_eco4_warmfront_not_sold = analysis_data[
(analysis_data["eco4_eligible"] == True) & (analysis_data["warmfront_identified"] == False) & (
analysis_data["roof_insulation_thickness_numeric"] <= 100)
]
additional_gbis_warmfront_not_sold = analysis_data[
(analysis_data["gbis_eligible"] == True) & (analysis_data["warmfront_identified"] == False) & (
~analysis_data["row_id"].isin(additional_eco4_warmfront_not_sold["row_id"].values)
)
]
additional_gbis_warmfront_not_sold["walls"].value_counts()
analysis_data["walls"].value_counts()
# END NEW
all_identified_eco = analysis_data[
(analysis_data["warmfront_identified"] & analysis_data["funding_scheme"].isin(
["ECO4 A/W"])) |
(analysis_data["eco4_eligible"])
]
all_identified_gbis = analysis_data[
(analysis_data["warmfront_identified"] & analysis_data["funding_scheme"].isin(
["ECO4 GBIS (ECO+)"])) |
(analysis_data["gbis_eligible"] & analysis_data["eco4_eligible"].isin([False, None]))
]
warmfront_identified = analysis_data[analysis_data["warmfront_identified"]]
# Of the ECO jobs, what proportion do we get right
@ -403,17 +492,21 @@ def app():
)
cleaned = msgpack.unpackb(cleaned, raw=False)
cleaning_data = read_parquet_from_s3(
cleaning_data = read_dataframe_from_s3_parquet(
bucket_name="retrofit-data-dev", file_key="sap_change_model/cleaning_dataset.parquet",
)
created_at = datetime.now().isoformat()
results_df, scoring_data, nodata = get_epc_data(data, cleaned, cleaning_data, created_at)
photo_supply_lookup, floor_area_decile_thresholds = SolarPhotoSupply.load(bucket="retrofit-data-dev")
results_df, scoring_data, nodata = get_epc_data(
data, cleaned, cleaning_data, created_at, photo_supply_lookup, floor_area_decile_thresholds
)
# Pickle results just in case
# import pickle
# with open("ha24.pickle", "wb") as f:
# with open("ha24_10_jan.pickle", "wb") as f:
# pickle.dump(
# {
# "scoring_data": scoring_data,
@ -421,3 +514,11 @@ def app():
# "nodata": nodata
# }, f
# )
# Read in pickle
# import pickle
# with open("ha24_10_jan.pickle", "rb") as f:
# saved = pickle.load(f)
# scoring_data = saved["scoring_data"]
# results_df = saved["results"]
# nodata = saved["nodata"]

View file

@ -1,6 +1,6 @@
import os
import msgpack
import openpyxl
from openpyxl.styles.colors import COLOR_INDEX
from pathlib import Path
from datetime import datetime
import pandas as pd
@ -8,7 +8,7 @@ import numpy as np
from utils.s3 import read_from_s3
from utils.logger import setup_logger
from dotenv import load_dotenv
from backend.app.utils import read_parquet_from_s3
from utils.s3 import read_dataframe_from_s3_parquet
from tqdm import tqdm
from backend.SearchEpc import SearchEpc
from etl.eligibility.Eligibility import Eligibility
@ -16,9 +16,13 @@ from etl.eligibility.ha_15_32.app import prepare_model_data_row
from etl.epc.DataProcessor import DataProcessor
from etl.epc.settings import COLUMNS_TO_MERGE_ON
from backend.ml_models.api import ModelApi
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
from recommendations.recommendation_utils import calculate_cavity_age
from recommendation_utils import convert_thickness_to_numeric
import re
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
ENV_FILE = Path(__file__).parent / "etl" / "eligibility" / "ha_15_32" / ".env"
logger = setup_logger()
@ -272,55 +276,150 @@ def load_data():
)
data["warmfront_identified"] = data["warmfront_identified"].fillna(False)
return data, eco4_prospects_survey_list
lost_identified_properties = eco4_prospects_survey_list[
~eco4_prospects_survey_list["survey_key"].isin(matched["survey_key"])
]
return data, eco4_prospects_survey_list, lost_identified_properties
def get_epc_data(data, cleaned, cleaning_data, created_at):
def map_year_to_age_band(year):
    """
    Map a build year onto the matching "England and Wales: ..." construction age band label.

    :param year: The build year. Anything accepted by ``int()`` works (int, numeric string, float);
        floats are truncated towards zero by ``int()``.
    :return: The age band label for the year, or the sentinel string "Invalid Year" when the value
        cannot be converted to an integer (e.g. None, NaN, infinity, or a non-numeric string).
    """
    try:
        year = int(year)
    except (TypeError, ValueError, OverflowError):
        # int(None) raises TypeError and int(float("inf")) raises OverflowError; a missing build
        # year must yield the sentinel rather than abort the caller.
        return "Invalid Year"  # Or any other way you want to handle invalid inputs

    # Each entry is (last year covered by the band, label); entries are in ascending order so the
    # first band whose upper bound is >= year is the right one.
    age_bands = (
        (1899, "England and Wales: before 1900"),
        (1929, "England and Wales: 1900-1929"),
        (1949, "England and Wales: 1930-1949"),
        (1966, "England and Wales: 1950-1966"),
        (1975, "England and Wales: 1967-1975"),
        (1982, "England and Wales: 1976-1982"),
        (1990, "England and Wales: 1983-1990"),
        (1995, "England and Wales: 1991-1995"),
        (2002, "England and Wales: 1996-2002"),
        (2006, "England and Wales: 2003-2006"),
        (2011, "England and Wales: 2007-2011"),
    )
    for last_year, label in age_bands:
        if year <= last_year:
            return label

    # Assuming all remaining years are 2012 onwards
    return "England and Wales: 2012 onwards"
def get_epc_data(data, cleaned, cleaning_data, created_at, photo_supply_lookup, floor_area_decile_thresholds):
scoring_data = []
results = []
nodata = []
for _, property_meta in tqdm(data.iterrows(), total=len(data)):
property_type_lookup = {
"Flat": {"property-type": "Flat", "built-form": None},
"Mid Terrace House": {"property-type": "House", "built-form": "Mid-Terrace"},
"End Terrace House": {"property-type": "House", "built-form": "End-Terrace"},
"Maisonnette": {"property-type": "Flat", "built-form": None},
"Semi Detached House": {"property-type": "House", "built-form": "Semi-Detached"},
"Detached House": {"property-type": "House", "built-form": "Detached"},
"Coach House": {"property-type": "House", "built-form": "Detached"},
"Bungalow": {"property-type": "Bungalow", "built-form": None},
"Detached Bungalow": {"property-type": "Bungalow", "built-form": "Detached"},
"House": {"property-type": "House", "built-form": None},
"Semi Detached Bung": {"property-type": "Bungalow", "built-form": "Semi-Detached"},
"Bedspace": {"property-type": None, "built-form": None},
"Office Buildings": {"property-type": None, "built-form": None},
"End Terrace Bungalow": {"property-type": "Bungalow", "built-form": "End-Terrace"},
"Mid Terrace Bungalow": {"property-type": "Bungalow", "built-form": "Mid-Terrace"},
"Bedsit": {"property-type": "Flat", "built-form": None},
"Mid Terrace Housekeeping": {"property-type": "House", "built-form": "Mid-Terrace"},
"Mid Terrace Housekeeping ": {"property-type": "House", "built-form": "Mid-Terrace"},
"End Terrace Housex": {"property-type": "House", "built-form": "End-Terrace"},
"Guest Room": {"property-type": None, "built-form": None}
}
for _, property_meta in tqdm(data, total=len(data)):
searcher = SearchEpc(
address1=property_meta["HouseNo"],
postcode=property_meta["postcode"],
size=1000
auth_token=EPC_AUTH_TOKEN,
os_api_key=None,
full_address=property_meta["address"]
)
searcher.search()
searcher.ordnance_survey_client.property_type = property_type_lookup[property_meta["T1_AssetType"]][
"property-type"]
searcher.ordnance_survey_client.built_form = property_type_lookup[property_meta["T1_AssetType"]]["built-form"]
searcher.find_property(skip_os=True)
if searcher.data is None:
if searcher.newest_epc is None:
nodata.append(property_meta)
continue
newest_epc, older_epcs, full_sap_epc = searcher.retrieve(address=property_meta["T1_Address"])
if searcher.newest_epc.get("estimated"):
# We insert the row ID as our proxy for UPRN
proxy_uprn = int(property_meta["row_id"].split("_")[1])
searcher.newest_epc["uprn"] = proxy_uprn
newest_epc = searcher.newest_epc
older_epcs = searcher.older_epcs
full_sap_epc = searcher.full_sap_epc
# We also want to get the penultimate epc
penultimate_epc, _ = searcher.filter_newest_epc(older_epcs)
if not penultimate_epc:
penultimate_epc = newest_epc
# penultimate_epc, _ = searcher.filter_newest_epc(older_epcs)
# if not penultimate_epc:
# penultimate_epc = newest_epc
eligibility = Eligibility(epc=newest_epc, cleaned=cleaned)
eligibility.check_gbis_warmfront()
eligibility.check_eco4_warmfront()
if (not eligibility.eco4_warmfront["eligible"]) and (not eligibility.gbis_warmfront) and (
property_meta["warmfront_identified"]
# if (not eligibility.eco4_warmfront["eligible"]) and (not eligibility.gbis_warmfront):
# eligibility = Eligibility(epc=penultimate_epc, cleaned=cleaned)
# eligibility.check_gbis_warmfront()
# eligibility.check_eco4_warmfront()
# # If this is the case, we need to update the older epcs
# # We don't update just to make data cleaning easier
# if penultimate_epc.get("estimated") is None:
# older_epcs = [x for x in searcher.data["rows"] if x["lmk-key"] != penultimate_epc["lmk-key"]]
# If the property is a cavity wall and it's filled, we produce an estimate for the age of the cavity
# Loft MUST be suitable
cavity_age = None
if (
eligibility.walls["is_cavity_wall"] and
eligibility.walls["is_filled_cavity"] and
eligibility.loft["suitability"] and
eligibility.eco4_warmfront["message"] == "Failed due to full cavity - check cavity age"
):
eligibility = Eligibility(epc=penultimate_epc, cleaned=cleaned)
eligibility.check_gbis_warmfront()
eligibility.check_eco4_warmfront()
# If this is the case, we need to update the older epcs
older_epcs = [
x for x in older_epcs if x["lmk-key"] not in [newest_epc["lmk-key"], penultimate_epc["lmk-key"]]
]
# We check the age of the cavity and if it's particularly old, we flag it
cavity_age = calculate_cavity_age(newest_epc, older_epcs, cleaned)
# Full checks
eligibility.check_gbis()
eligibility.check_eco4()
if eligibility.eco4_warmfront["eligible"]:
if eligibility.epc["uprn"] == "":
if eligibility.epc["uprn"] in ["", None]:
eligibility.epc["uprn"] = int(property_meta["row_id"].split("_")[1])
if eligibility.epc["construction-age-band"] in ["", None]:
eligibility.epc["construction-age-band"] = map_year_to_age_band(property_meta["Build Yr"])
# This is not the right place to do this but this is temp
if eligibility.epc["extension-count"] in ["", None]:
eligibility.epc["extension-count"] = 0
# Not in the right place but temp
if eligibility.epc["built-form"] in ["", None]:
if not older_epcs:
eligibility.epc["built-form"] = "Mid-Terrace"
scoring_dictionary = prepare_model_data_row(
property_id=property_meta["row_id"],
modelling_epc=eligibility.epc,
@ -328,7 +427,9 @@ def get_epc_data(data, cleaned, cleaning_data, created_at):
cleaning_data=cleaning_data,
created_at=created_at,
old_data=older_epcs,
full_sap_epc=full_sap_epc
full_sap_epc=full_sap_epc,
photo_supply_lookup=photo_supply_lookup,
floor_area_decile_thresholds=floor_area_decile_thresholds,
)
scoring_data.extend(scoring_dictionary)
@ -354,6 +455,237 @@ def get_epc_data(data, cleaned, cleaning_data, created_at):
"heating": eligibility.epc["mainheat-description"],
"tenure": eligibility.tenure,
"date_epc": eligibility.epc["lodgement-date"],
"cavity_age": cavity_age,
**eligibility.walls,
**eligibility.roof,
}
)
scoring_df = pd.DataFrame(scoring_data)
# Perform the same cleaning as in the model - first clean number of room variables though
scoring_df = DataProcessor.apply_averages_cleaning(
data_to_clean=scoring_df,
cleaning_data=cleaning_data,
cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'],
colnames=["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"],
)
scoring_df = DataProcessor.apply_averages_cleaning(
data_to_clean=scoring_df,
cleaning_data=cleaning_data,
cols_to_merge_on=COLUMNS_TO_MERGE_ON + ["LOCAL_AUTHORITY"],
).drop(columns=["LOCAL_AUTHORITY"])
scoring_df = DataProcessor.clean_missings_after_description_process(
scoring_df,
ignore_cols=[c for c in scoring_df.columns if ("thermal_transmittance" in c) or (
"insulation_thickness" in c) or ("ENERGY_EFF" in c)]
)
scoring_df = DataProcessor.clean_efficiency_variables(scoring_df)
scoring_df["UPRN"] = scoring_df["UPRN"].astype(int)
model_api = ModelApi(portfolio_id="ha33-eligibility", timestamp=created_at)
all_predictions = model_api.predict_all(
df=scoring_df,
bucket="retrofit-data-dev",
prediction_buckets={
"sap_change_predictions": "retrofit-sap-predictions-dev",
"heat_demand_predictions": "retrofit-heat-predictions-dev",
"carbon_change_predictions": "retrofit-carbon-predictions-dev"
}
)
predictions = all_predictions["sap_change_predictions"].copy()
results_df = pd.DataFrame(results)
predictions = predictions.rename(columns={"property_id": "row_id"}).merge(
results_df[["row_id", "sap"]], how="left", on="row_id"
)
predictions["sap_uplift"] = predictions["predictions"] - predictions["sap"]
predictions = predictions.groupby("row_id")["sap_uplift"].sum().reset_index()
results_df = results_df.merge(
predictions[["sap_uplift", "row_id"]],
how="left",
on="row_id"
)
results_df["post_install_sap"] = results_df["sap"] + results_df["sap_uplift"]
eligibility_assessment = []
for _, row in results_df[results_df["eco4_eligible"] == True].iterrows():
# The upgrade requirements are dependent on the current SAP
# If the property is an F or G (SAP <= 38), it only needs to upgrade to a D (SAP 55+)
if row["sap"] <= 38:
if row["post_install_sap"] >= 57:
eligibility_classification = "highest confidence"
elif row["post_install_sap"] >= 55:
eligibility_classification = "high confidence"
elif row["post_install_sap"] >= 53:
eligibility_classification = "medium confidence"
else:
eligibility_classification = "unlikely"
else:
if row["post_install_sap"] >= 71:
eligibility_classification = "highest confidence"
elif row["post_install_sap"] >= 69:
eligibility_classification = "high confidence"
elif row["post_install_sap"] >= 67:
eligibility_classification = "medium confidence"
else:
eligibility_classification = "unlikely"
eligibility_assessment.append(
{
"row_id": row["row_id"],
"eligibility_classification": eligibility_classification
}
)
eligibility_assessment = pd.DataFrame(eligibility_assessment)
results_df = results_df.merge(
eligibility_assessment, how="left", on="row_id"
)
return results_df, scoring_data, nodata
def get_epc_data_for_lost_surveys(
        lost_identified_properties, cleaned, cleaning_data, created_at, photo_supply_lookup,
        floor_area_decile_thresholds
):
    """
    Look up EPC data for the "lost survey" properties and run the eligibility checks on each of them.

    A ``row_id`` column of the form ``lost_surveys_ha25_<i>`` is added to ``lost_identified_properties`` in place.
    For every property with a postcode we search for its EPCs, run the warmfront and full GBIS/ECO4 checks, and
    accumulate the outputs into three lists: ``scoring_data`` (model input rows), ``results`` (one summary dict
    per property) and ``nodata`` (properties for which no EPC was found).

    :param lost_identified_properties: Dataframe of properties; the columns read here are POSTCODE, NO,
        ADDRESS 1, ADDRESS 2, ADDRESS 3 and PROPERTY TYPE
    :param cleaned: Cleaned lookup passed through to Eligibility and prepare_model_data_row
    :param cleaning_data: Cleaning dataset passed through to prepare_model_data_row
    :param created_at: Timestamp passed through to prepare_model_data_row
    :param photo_supply_lookup: Photo supply lookup passed through to prepare_model_data_row
    :param floor_area_decile_thresholds: Floor area decile thresholds passed through to prepare_model_data_row
    """
    # NOTE(review): only the part of this function visible in the reviewed excerpt is reproduced here; any
    # post-processing of scoring_data / results / nodata after the loop is outside that excerpt.
    lost_identified_properties["row_id"] = [
        "lost_surveys_ha25_" + str(i) for i in range(0, len(lost_identified_properties))
    ]

    scoring_data = []
    results = []
    nodata = []

    property_type_lookup = {
        "MID-TERRACE": {"property-type": "House", "built-form": "Mid-Terrace"},
        "N/A": {"property-type": "House", "built-form": None},
        "END-TERRACE": {"property-type": "House", "built-form": "End-Terrace"},
        "GROUND-FLOOR": {"property-type": "House", "built-form": None},
        "TOP-FLOOR": {"property-type": "House", "built-form": None},
        "SEMI-DETACHED": {"property-type": "House", "built-form": "Semi-Detached"},
        "MID-FLOOR": {"property-type": "House", "built-form": None},
        "TOP-FLOOR FLAT": {"property-type": "House", "built-form": None},
        "DETACHED": {"property-type": "House", "built-form": "Detached"},
        "MID-FLOOR FLAT": {"property-type": "House", "built-form": None},
        "SEMI- DETACHED": {"property-type": "House", "built-form": "Semi-Detached"},
        "NO EPC ON GOV": {"property-type": "House", "built-form": None},
        "Top-floor flat": {"property-type": "House", "built-form": None},
        "GROUND-FLOOR FLAT": {"property-type": "House", "built-form": None},
        "NOT ON GOV SITE": {"property-type": "House", "built-form": None}
    }

    for _, property_meta in tqdm(lost_identified_properties.iterrows(), total=len(lost_identified_properties)):

        if property_meta["POSTCODE"] is None:
            continue

        # The row id is "lost_surveys_ha25_<i>", so the numeric index is the LAST underscore-separated token.
        # It is used as a stand-in wherever the EPC carries no UPRN.
        proxy_uprn = int(property_meta["row_id"].split("_")[-1])

        full_address = ", ".join(
            [str(x) for x in [
                property_meta["NO"], property_meta["ADDRESS 1"], property_meta["ADDRESS 2"], property_meta["ADDRESS 3"]
            ] if x is not None]
        )

        searcher = SearchEpc(
            address1=str(property_meta["NO"]),
            postcode=property_meta["POSTCODE"],
            auth_token=EPC_AUTH_TOKEN,
            os_api_key=None,
            full_address=full_address
        )

        property_type_key = property_meta["PROPERTY TYPE"]
        if property_type_key is not None:
            # An unknown property type still raises a KeyError, as before
            mapped_type = property_type_lookup[property_type_key.strip()]
            searcher.ordnance_survey_client.property_type = mapped_type["property-type"]
            searcher.ordnance_survey_client.built_form = mapped_type["built-form"]

        searcher.find_property(skip_os=True)

        if searcher.newest_epc is None:
            nodata.append(property_meta)
            continue

        if searcher.newest_epc.get("estimated"):
            # We insert the row ID as our proxy for UPRN
            searcher.newest_epc["uprn"] = proxy_uprn

        newest_epc = searcher.newest_epc
        older_epcs = searcher.older_epcs
        full_sap_epc = searcher.full_sap_epc

        # We also want to get the penultimate epc
        penultimate_epc, _ = searcher.filter_newest_epc(older_epcs)
        if not penultimate_epc:
            penultimate_epc = newest_epc

        eligibility = Eligibility(epc=newest_epc, cleaned=cleaned)
        eligibility.check_gbis_warmfront()
        eligibility.check_eco4_warmfront()

        if (not eligibility.eco4_warmfront["eligible"]) and (not eligibility.gbis_warmfront):
            # Nothing found on the newest EPC, so fall back to the penultimate one
            eligibility = Eligibility(epc=penultimate_epc, cleaned=cleaned)
            eligibility.check_gbis_warmfront()
            eligibility.check_eco4_warmfront()
            # If this is the case, we need to update the older epcs
            # We don't update just to make data cleaning easier
            if penultimate_epc.get("estimated") is None:
                older_epcs = [x for x in searcher.data["rows"] if x["lmk-key"] != penultimate_epc["lmk-key"]]

        # Full checks
        eligibility.check_gbis()
        eligibility.check_eco4()

        if eligibility.eco4_warmfront["eligible"] and (eligibility.epc["construction-age-band"] not in ["", None]):

            if eligibility.epc["uprn"] in ["", None]:
                eligibility.epc["uprn"] = proxy_uprn

            scoring_dictionary = prepare_model_data_row(
                property_id=property_meta["row_id"],
                modelling_epc=eligibility.epc,
                cleaned=cleaned,
                cleaning_data=cleaning_data,
                created_at=created_at,
                old_data=older_epcs,
                full_sap_epc=full_sap_epc,
                photo_supply_lookup=photo_supply_lookup,
                floor_area_decile_thresholds=floor_area_decile_thresholds,
            )

            scoring_data.extend(scoring_dictionary)

        results.append(
            {
                "row_id": property_meta["row_id"],
                "uprn": eligibility.epc["uprn"],
                "Address": property_meta["ADDRESS 1"],
                "Postcode": property_meta["POSTCODE"],
                "property_type": eligibility.epc["property-type"],
                "gbis_eligible": eligibility.gbis_warmfront,
                "eco4_eligible": eligibility.eco4_warmfront["eligible"],
                "eco4_message": eligibility.eco4_warmfront["message"],
                "sap": float(eligibility.epc["current-energy-efficiency"]),
                "gbis_eligible_future": eligibility.gbis["eligible"],
                "gbis_eligible_future_message": eligibility.gbis["message"],
                "eco4_eligible_future": eligibility.eco4["eligible"],
                "eco4_eligible_future_message": eligibility.eco4["message"],
                # Property components
                "roof": eligibility.roof["clean_description"],
                "walls": eligibility.walls["clean_description"],
                "cavity_type": eligibility.cavity["type"],
                "heating": eligibility.epc["mainheat-description"],
                "tenure": eligibility.tenure,
                "date_epc": eligibility.epc["lodgement-date"],
                **eligibility.walls,
                **eligibility.roof,
            }
        )
@ -455,43 +787,60 @@ def analyse_results(results_df, data, eco4_prospects_survey_list):
results_df, how="left", on="row_id"
)
warmfront_identified = analysis_data[analysis_data["warmfront_identified"]]
analysis_data = analysis_data.merge(
eco4_prospects_survey_list[["survey_key", "ADDRESS 1", "NO", "POSTCODE"]],
how="left", on="survey_key"
)
# Of the ECO jobs, what proportion do we get right
# NEW
analysis_data["roof_insulation_thickness"] = np.where(
pd.isnull(analysis_data["roof_insulation_thickness"]), None, analysis_data["roof_insulation_thickness"]
)
analysis_data["roof_insulation_thickness_numeric"] = analysis_data["roof_insulation_thickness"].apply(
lambda x: convert_thickness_to_numeric(x, is_flat=False, is_pitched=True)
)
success_rate = (warmfront_identified["eco4_eligible"] | warmfront_identified["gbis_eligible"]).sum() / \
warmfront_identified.shape[
0]
warmfront_identified = analysis_data[
(analysis_data["warmfront_identified"] == True)
] # 2204
# No gbis for this
# gbis_success_rate = warmfront_identified_gbis["gbis_eligible"].sum() / warmfront_identified_gbis.shape[0]
# Because we don't know which property is for which scheme, we'll just look at what we found
ideal_eco4 = analysis_data[
(analysis_data["eco4_eligible"] == True) &
(analysis_data["roof_insulation_thickness_numeric"] <= 100) &
(analysis_data["sap"] <= 54)
] # 335
# Additional identified
additional_identified_eco = analysis_data[
(analysis_data["eco4_eligible"] == True) & (analysis_data["warmfront_identified"] == False)
gbis = analysis_data[
(analysis_data["gbis_eligible"] == True) &
~analysis_data["row_id"].isin(ideal_eco4["row_id"].values)
]
additional_identified_eco["eligibility_classification"].value_counts()
ideal_eco4 = ideal_eco4[ideal_eco4["sap"] <= 54]
additional_identified_gbis = analysis_data[
(analysis_data["gbis_eligible"] == True) & (analysis_data["eco4_eligible"] == False) & (
analysis_data["warmfront_identified"] == False
)
].shape[0]
# Future
additional_identified_eco_future = analysis_data[
(analysis_data["eco4_eligible_future"] == True) & (analysis_data["warmfront_identified"] == False)
].shape[0]
additional_identified_gbis_future = analysis_data[
(analysis_data["gbis_eligible_future"] == True) & (analysis_data["eco4_eligible_future"] == False) & (
analysis_data["warmfront_identified"] == False
)
].shape[0]
def analyse_lost_surveys(results_df):
    """
    Exploratory breakdown of the lost-survey results into ECO4 and GBIS candidate sets.

    Two columns of ``results_df`` are written in place: ``roof_insulation_thickness`` (nulls replaced with None)
    and ``roof_insulation_thickness_numeric`` (derived via convert_thickness_to_numeric for a pitched roof).
    The two candidate subsets are computed for inspection only; nothing is returned.

    :param results_df: Results dataframe; the columns read here are roof_insulation_thickness, eco4_eligible,
        gbis_eligible, sap and row_id
    """

    def _thickness_to_numeric(raw_thickness):
        return convert_thickness_to_numeric(raw_thickness, is_flat=False, is_pitched=True)

    # Normalise any null-like thickness to None before converting
    thickness_is_missing = pd.isnull(results_df["roof_insulation_thickness"])
    results_df["roof_insulation_thickness"] = np.where(
        thickness_is_missing, None, results_df["roof_insulation_thickness"]
    )
    results_df["roof_insulation_thickness_numeric"] = results_df["roof_insulation_thickness"].apply(
        _thickness_to_numeric
    )

    # ECO4 eligible, at most 100 of roof insulation and a SAP of at most 54
    is_eco4 = results_df["eco4_eligible"] == True
    has_thin_loft = results_df["roof_insulation_thickness_numeric"] <= 100
    has_low_sap = results_df["sap"] <= 54
    ideal_eco4 = results_df[is_eco4 & has_thin_loft & has_low_sap]  # 25

    # GBIS eligible properties that are not already in the ideal ECO4 set
    is_gbis = results_df["gbis_eligible"] == True
    already_ideal_eco4 = results_df["row_id"].isin(ideal_eco4["row_id"].values)
    gbis = results_df[is_gbis & ~already_ideal_eco4]  # 82
def app():
data, eco4_prospects_survey_list = load_data()
data, eco4_prospects_survey_list, lost_identified_properties = load_data()
data["row_id"] = ["ha25_" + str(i) for i in range(0, len(data))]
@ -501,16 +850,21 @@ def app():
)
cleaned = msgpack.unpackb(cleaned, raw=False)
cleaning_data = read_parquet_from_s3(
cleaning_data = read_dataframe_from_s3_parquet(
bucket_name="retrofit-data-dev", file_key="sap_change_model/cleaning_dataset.parquet",
)
created_at = datetime.now().isoformat()
results_df, scoring_data, nodata = get_epc_data(data, cleaned, cleaning_data, created_at)
photo_supply_lookup, floor_area_decile_thresholds = SolarPhotoSupply.load(bucket="retrofit-data-dev")
results_df, scoring_data, nodata = get_epc_data(
data, cleaned, cleaning_data, created_at, photo_supply_lookup, floor_area_decile_thresholds
)
# Pickle the outputs
# Old data was ha25.pickle
# import pickle
# with open("ha25.pickle", "wb") as f:
# with open("ha25_10_jan.pickle", "wb") as f:
# pickle.dump(
# {
# "results_df": results_df,
@ -519,3 +873,11 @@ def app():
# },
# f
# )
# Load in pickle
import pickle
with open("ha25_10_jan.pickle", "rb") as f:
saved = pickle.load(f)
results_df = saved["results_df"]
scoring_data = saved["scoring_data"]
nodata = saved["nodata"]

View file

@ -1,3 +1,4 @@
import os
import msgpack
from pathlib import Path
from datetime import datetime
@ -6,7 +7,7 @@ import pandas as pd
from utils.s3 import read_from_s3
from utils.logger import setup_logger
from dotenv import load_dotenv
from backend.app.utils import read_parquet_from_s3
from utils.s3 import read_dataframe_from_s3_parquet
from tqdm import tqdm
from backend.SearchEpc import SearchEpc
from etl.eligibility.Eligibility import Eligibility
@ -14,9 +15,13 @@ from etl.eligibility.ha_15_32.app import prepare_model_data_row
from etl.epc.DataProcessor import DataProcessor
from etl.epc.settings import COLUMNS_TO_MERGE_ON
from backend.ml_models.api import ModelApi
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
from recommendations.recommendation_utils import calculate_cavity_age
from recommendation_utils import convert_thickness_to_numeric
import re
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
ENV_FILE = Path(__file__).parent / "etl" / "eligibility" / "ha_15_32" / ".env"
logger = setup_logger()
@ -52,7 +57,7 @@ def standardise_ha_4(data):
return data
def get_ha_4_data(data, cleaned, cleaning_data, created_at):
def get_ha_4_data(data, cleaned, cleaning_data, created_at, photo_supply_lookup, floor_area_decile_thresholds):
scoring_data = []
results = []
nodata = []
@ -62,19 +67,33 @@ def get_ha_4_data(data, cleaned, cleaning_data, created_at):
searcher = SearchEpc(
address1=property_meta["Address Line 1"],
postcode=property_meta["Post Code"],
size=1000
auth_token=EPC_AUTH_TOKEN,
os_api_key=None,
property_type=property_type_lookup.get(house["Archetype"]),
)
searcher.search()
searcher.find_property(skip_os=True)
if searcher.data is None:
if searcher.newest_epc is None:
searcher = SearchEpc(
address1=property_meta["Location Name"],
postcode=property_meta["Post Code"],
size=1000
auth_token=EPC_AUTH_TOKEN,
os_api_key=None,
property_type=property_type_lookup.get(house["Archetype"]),
)
searcher.search()
if searcher.newest_epc is None:
nodata.append(house["row_id"])
continue
newest_epc = searcher.newest_epc
older_epcs = searcher.older_epcs
full_sap_epc = searcher.full_sap_epc
searcher.search()
if searcher.data is None:
nodata.append(property_meta.to_dict())
continue
@ -273,17 +292,21 @@ def app():
)
cleaned = msgpack.unpackb(cleaned, raw=False)
cleaning_data = read_parquet_from_s3(
cleaning_data = read_dataframe_from_s3_parquet(
bucket_name="retrofit-data-dev", file_key="sap_change_model/cleaning_dataset.parquet",
)
created_at = datetime.now().isoformat()
photo_supply_lookup, floor_area_decile_thresholds = SolarPhotoSupply.load(bucket="retrofit-data-dev")
results_df, scoring_data, nodata = get_ha_4_data(
data=data,
cleaned=cleaned,
cleaning_data=cleaning_data,
created_at=created_at
created_at=created_at,
photo_supply_lookup=photo_supply_lookup,
floor_area_decile_thresholds=floor_area_decile_thresholds
)
# Store the data locally as a pickle

View file

@ -1,3 +1,4 @@
import os
import msgpack
import openpyxl
from openpyxl.styles.colors import COLOR_INDEX
@ -5,10 +6,9 @@ from pathlib import Path
from datetime import datetime
import pandas as pd
import numpy as np
from utils.s3 import read_from_s3
from utils.s3 import read_from_s3, read_dataframe_from_s3_parquet
from utils.logger import setup_logger
from dotenv import load_dotenv
from backend.app.utils import read_parquet_from_s3
from tqdm import tqdm
from backend.SearchEpc import SearchEpc
from etl.eligibility.Eligibility import Eligibility
@ -16,14 +16,18 @@ from etl.eligibility.ha_15_32.app import prepare_model_data_row
from etl.epc.DataProcessor import DataProcessor
from etl.epc.settings import COLUMNS_TO_MERGE_ON
from backend.ml_models.api import ModelApi
import re
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
from recommendations.recommendation_utils import calculate_cavity_age
from recommendation_utils import convert_thickness_to_numeric
ENV_FILE = Path(__file__).parent / "etl" / "eligibility" / "ha_15_32" / ".env"
logger = setup_logger()
load_dotenv(ENV_FILE)
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
OS_API_KEY = os.getenv("ORDNANCE_SURVEY_API_KEY")
def load_data():
"""
@ -66,12 +70,16 @@ def load_data():
return df
def get_ha7_data(data, cleaned, cleaning_data, created_at):
def get_ha7_data(data, cleaned, cleaning_data, created_at, photo_supply_lookup, floor_area_decile_thresholds):
property_type_lookup = {
"Mid Terrace": "Mid-Terrace",
"End Terrace": "End-Terrace",
"Semi Detached": "Semi-Detached",
"Detached": "Detached",
# "Mid Terrace": "Mid-Terrace",
# "End Terrace": "End-Terrace",
# "Semi Detached": "Semi-Detached",
# "Detached": "Detached",
"House": "House",
"Flat": "Flat",
"Bungalow": "Bungalow",
"Maisonette": "Maisonette",
}
scoring_data = []
@ -79,25 +87,46 @@ def get_ha7_data(data, cleaned, cleaning_data, created_at):
nodata = []
for _, house in tqdm(data.iterrows(), total=len(data)):
if house["Address"]:
address = house["Address"]
else:
address = house["Address2"]
searcher = SearchEpc(
address1=house["Address"],
postcode=house["Postcode"]
address1=address,
postcode=house["Postcode"],
auth_token=EPC_AUTH_TOKEN,
os_api_key=None,
property_type=property_type_lookup.get(house["Archetype"]),
)
response = searcher.search()
if response["status"] == 204:
nodata.append(house)
searcher.find_property(skip_os=True)
if searcher.newest_epc is None:
nodata.append(house["row_id"])
continue
newest_epc, older_epcs, full_sap_epc = searcher.retrieve(
property_type=property_type_lookup.get(house["Property Type"], None),
address=house["Address"],
)
newest_epc = searcher.newest_epc
older_epcs = searcher.older_epcs
full_sap_epc = searcher.full_sap_epc
eligibility = Eligibility(epc=newest_epc, cleaned=cleaned)
eligibility.check_gbis_warmfront()
eligibility.check_eco4_warmfront()
# If the property is a cavity wall and it's filled, we produce an estimate for the age of the cavity
# Loft MUST be suitable
cavity_age = None
if (
eligibility.walls["is_cavity_wall"] and
eligibility.walls["is_filled_cavity"] and
eligibility.loft["suitability"] and
eligibility.eco4_warmfront["message"] == "Failed due to full cavity - check cavity age"
):
# We check the age of the cavity and if it's particularly old, we flag it
cavity_age = calculate_cavity_age(newest_epc, older_epcs, cleaned)
# If the house is not identified, we do a full gbis and eco4 check
eligibility.check_gbis()
eligibility.check_eco4()
@ -110,7 +139,9 @@ def get_ha7_data(data, cleaned, cleaning_data, created_at):
cleaning_data=cleaning_data,
created_at=created_at,
old_data=older_epcs,
full_sap_epc=full_sap_epc
full_sap_epc=full_sap_epc,
photo_supply_lookup=photo_supply_lookup,
floor_area_decile_thresholds=floor_area_decile_thresholds
)
scoring_data.extend(scoring_dictionary)
@ -134,6 +165,10 @@ def get_ha7_data(data, cleaned, cleaning_data, created_at):
"heating": eligibility.epc["mainheat-description"],
"tenure": eligibility.tenure,
"date_epc": eligibility.epc["lodgement-date"],
**newest_epc,
"cavity_age": cavity_age,
**eligibility.walls,
**eligibility.roof,
}
)
@ -233,13 +268,62 @@ def get_ha7_data(data, cleaned, cleaning_data, created_at):
def analyse_ha_7(results_df, data):
df = results_df.merge(
data[["row_id", "row_code", "Property Type"]], how="left", on="row_id"
analysis_data = results_df.merge(
data[["row_id", "row_code", "Property Type", "Construction Year Band"]], how="left", on="row_id"
)
warmfront_identification = df["row_code"].value_counts()
warmfront_identified = df[df["row_code"] == "potential ECO4"]
property_types = df["Property Type"].value_counts()
analysis_data["row_code"].value_counts()
# NEW
analysis_data["roof_insulation_thickness"] = np.where(
pd.isnull(analysis_data["roof_insulation_thickness"]), None, analysis_data["roof_insulation_thickness"]
)
analysis_data["roof_insulation_thickness_numeric"] = analysis_data["roof_insulation_thickness"].apply(
lambda x: convert_thickness_to_numeric(x, is_flat=False, is_pitched=True)
)
ideal_eco4 = analysis_data[
(analysis_data["eco4_eligible"] == True) & (
analysis_data["roof_insulation_thickness_numeric"] <= 100)
]
secondary_eco4_warmfront_not_sold = analysis_data[
(analysis_data["eco4_eligible"] == True) & (
analysis_data["roof_insulation_thickness_numeric"] > 100)
]
# underperforming cavities
underperforming_cavities = analysis_data[
(analysis_data["eco4_message"] == "Failed due to full cavity - check cavity age") & (
analysis_data["cavity_age"] > 9 * 365
) & (analysis_data["roof_insulation_thickness_numeric"] <= 100)
]
identified_gbis_not_sold = analysis_data[
(analysis_data["gbis_eligible"] == True) & (
analysis_data["eco4_eligible"] == False
)
]
wf_identified = analysis_data[
(analysis_data["row_code"] == "potential ECO4")
]
# END NEW
warmfront_identification = analysis_data["row_code"].value_counts()
warmfront_identified = analysis_data[analysis_data["row_code"] == "potential ECO4"]
warmfront_identified["walls"].value_counts(normalize=True)
analysis_data["Construction Year Band"].value_counts(normalize=True)
# Number of days from today
days_to_today = (datetime.now() - pd.to_datetime(warmfront_identified["date_epc"])).dt.days
days_to_today.mean()
property_types = analysis_data["Property Type"].value_counts()
n_identified = (results_df["gbis_eligible"] | results_df["eco4_eligible"]).sum()
@ -273,15 +357,27 @@ def app():
)
cleaned = msgpack.unpackb(cleaned, raw=False)
cleaning_data = read_parquet_from_s3(
cleaning_data = read_dataframe_from_s3_parquet(
bucket_name="retrofit-data-dev", file_key="sap_change_model/cleaning_dataset.parquet",
)
photo_supply_lookup, floor_area_decile_thresholds = SolarPhotoSupply.load(bucket="retrofit-data-dev")
created_at = datetime.now().isoformat()
results_df, scoring_data, nodata = get_ha7_data(data, cleaned, cleaning_data, created_at)
results_df, scoring_data, nodata = get_ha7_data(
data, cleaned, cleaning_data, created_at, photo_supply_lookup, floor_area_decile_thresholds
)
# Pickle results
# import pickle
# with open("ha7_results.pkl", "wb") as f:
# with open("ha7_results_jan_10.pkl", "wb") as f:
# pickle.dump({"results_df": results_df, "scoring_data": scoring_data, "nodata": nodata}, f)
# Read in the old data
# import pickle
# with open("ha7_results_jan_10.pkl", "rb") as f:
# old_data = pickle.load(f)
# results_df = old_data["results_df"]
# scoring_data = old_data["scoring_data"]
# nodata = old_data["nodata"]

View file

@ -0,0 +1,244 @@
import pandas as pd
from tqdm import tqdm
from utils.s3 import save_dataframe_to_s3_parquet, read_dataframe_from_s3_parquet
from utils.logger import setup_logger
logger = setup_logger()
class SolarPhotoSupply:
    """
    Builds, stores and queries a lookup of observed PHOTO_SUPPLY values from EPC certificate data.

    The lookup is the median and mean PHOTO_SUPPLY of certificates that have a non-zero PHOTO_SUPPLY, grouped by
    property type, built form, tenure, roof flags, construction age band and floor area decile.
    """

    DATASET_COLUMNS = [
        "UPRN", "PROPERTY_TYPE", "TENURE", "BUILT_FORM", "ROOF_DESCRIPTION", "PHOTO_SUPPLY", "TOTAL_FLOOR_AREA",
        "CONSTRUCTION_AGE_BAND", "SOLAR_WATER_HEATING_FLAG"
    ]

    def __init__(self, file_directories, cleaned_lookup):
        """
        Initialize the SolarPhotoSupply class with file directories and a cleaned lookup. Currently, this class
        just works with locally stored data, but this could be extended to work with data stored in S3.

        :param file_directories: A list of directories where files are stored. Each directory is expected to
            contain a certificates.csv file.
        :param cleaned_lookup: A dictionary containing cleaned lookup data. Only the "roof-description" entry is
            used.
        """
        self.file_directories = file_directories
        self.results = []
        self.decile_thresholds = None
        # A missing "roof-description" entry gives an empty dataframe, which create_dataset rejects
        self.roof_lookup = pd.DataFrame(cleaned_lookup.get("roof-description"))

        self.photo_supply_lookup = pd.DataFrame()
        self.floor_area_decile_thresholds = pd.DataFrame()

    def create_dataset(self):
        """
        Create a dataset from the provided file directories. This method processes the data files,
        applies transformations, and aggregates data into a useful format.

        Populates self.results, self.decile_thresholds, self.photo_supply_lookup and
        self.floor_area_decile_thresholds.

        :raises ValueError: If there is no roof lookup data, or no certificate data was read.
        """
        if self.roof_lookup.empty:
            raise ValueError("No roof lookup data")

        frames = []
        logger.info("Creating solar photo supply dataset")
        for directory in tqdm(self.file_directories):
            filepath = directory / "certificates.csv"
            df = pd.read_csv(filepath, low_memory=False)

            df = df[~pd.isnull(df["UPRN"])]
            df["UPRN"] = df["UPRN"].astype(int).astype(str)

            # Drop rows that have a missing PROPERTY_TYPE, BUILT_FORM, CONSTRUCTION_AGE_BAND, TOTAL_FLOOR_AREA
            for col in ["PROPERTY_TYPE", "BUILT_FORM", "CONSTRUCTION_AGE_BAND", "TOTAL_FLOOR_AREA"]:
                df = df[~pd.isnull(df[col])]

            # Take newest LODGEMENT_DATE per UPRN
            df = df.sort_values(by="LODGEMENT_DATE", ascending=False).drop_duplicates(subset=["UPRN"])

            # Only certificates with a non-zero photo supply are kept; a missing value is treated as zero
            data = df[self.DATASET_COLUMNS].copy()
            data["PHOTO_SUPPLY"] = data["PHOTO_SUPPLY"].fillna(0)
            data = data[data["PHOTO_SUPPLY"] != 0]

            frames.append(data)

        if not frames:
            # pd.concat would raise a ValueError here anyway; this message says what actually went wrong
            raise ValueError("No certificate data found in the provided file directories")

        self.results = pd.concat(frames)

        # Convert total floor area to deciles
        self.decile_thresholds = self.results["TOTAL_FLOOR_AREA"].quantile(
            [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]
        ).values

        self.results["floor_area_decile"] = pd.cut(
            self.results["TOTAL_FLOOR_AREA"],
            bins=[0] + list(self.decile_thresholds) + [float('inf')],
            labels=False,
            include_lowest=True
        )

        # Convert tenure to lower
        self.results["TENURE"] = self.results["TENURE"].str.lower()

        # Bring in the remaining roof lookup columns, matching on the raw roof description
        self.results = self.results.merge(
            self.roof_lookup.drop(
                columns=[
                    "clean_description", "thermal_transmittance", "thermal_transmittance_unit", "insulation_thickness",
                    "is_assumed"
                ]
            ),
            left_on="ROOF_DESCRIPTION",
            right_on="original_description",
            how="left"
        )

        self.photo_supply_lookup = self.results.groupby(
            [
                "PROPERTY_TYPE", "BUILT_FORM", "TENURE", "is_pitched", "is_roof_room", "is_flat",
                "CONSTRUCTION_AGE_BAND", "floor_area_decile"
            ],
            observed=True
        ).agg(
            {
                "PHOTO_SUPPLY": ["median", "mean"],
            }
        ).reset_index()

        # Flatten the two-level column index produced by agg, e.g. ("PHOTO_SUPPLY", "median") becomes
        # "PHOTO_SUPPLY_median" and ("TENURE", "") becomes "TENURE_"
        self.photo_supply_lookup.columns = ['_'.join(col).strip() for col in self.photo_supply_lookup.columns.values]
        # Remove trailing underscore from columns
        self.photo_supply_lookup.columns = [
            col[:-1] if col.endswith("_") else col for col in self.photo_supply_lookup.columns.values
        ]

        # Convert columns to lowercase
        self.photo_supply_lookup.columns = [col.lower() for col in self.photo_supply_lookup.columns.values]

        self.floor_area_decile_thresholds = pd.DataFrame(
            self.decile_thresholds,
            columns=["floor_area_decile_thresholds"]
        )

    @staticmethod
    def classify_floor_area(new_area, thresholds):
        """
        Classify a given floor area into a decile based on provided thresholds.

        :param new_area: The new floor area to be classified.
        :param thresholds: A list of thresholds used for classification, in ascending order.
        :return: An integer representing the decile index. This is the index of the first threshold that is
            greater than or equal to new_area, or len(thresholds) if new_area is above all of them.
        """
        for i, threshold in enumerate(thresholds):
            if new_area <= threshold:
                return i  # Returns the decile index (0 to 9)
        return len(thresholds)

    def save(self, bucket="retrofit-data-dev"):
        """
        Save the processed data to an S3 bucket in the parquet format. This method also handles
        logging and validation to ensure data is present before saving.

        :param bucket: The name of the S3 bucket to save to. Defaults to the bucket that was previously
            hard-coded, so existing callers are unaffected.
        :raises ValueError: If the photo supply lookup is empty.
        """
        if self.photo_supply_lookup.empty:
            raise ValueError("No data to save")

        logger.info("Storing outputs to S3")
        # Store this data in s3 as a parquet file; the file keys match the ones read back by load()
        save_dataframe_to_s3_parquet(
            df=self.photo_supply_lookup,
            bucket_name=bucket,
            file_key="solar_pv_supply/photo_supply_lookup.parquet",
        )

        save_dataframe_to_s3_parquet(
            df=self.floor_area_decile_thresholds,
            bucket_name=bucket,
            file_key="solar_pv_supply/floor_area_decile_thresholds.parquet",
        )

    @staticmethod
    def load(bucket):
        """
        Load datasets from an S3 bucket.

        :param bucket: The name of the S3 bucket to load data from.
        :return: A tuple containing photo supply lookup and floor area decile thresholds dataframes.
        """
        photo_supply_lookup = read_dataframe_from_s3_parquet(
            bucket_name=bucket, file_key="solar_pv_supply/photo_supply_lookup.parquet",
        )

        floor_area_decile_thresholds = read_dataframe_from_s3_parquet(
            bucket_name=bucket, file_key="solar_pv_supply/floor_area_decile_thresholds.parquet",
        )

        return photo_supply_lookup, floor_area_decile_thresholds

    @classmethod
    def filter_photo_supply_lookup(
            cls,
            photo_supply_lookup: pd.DataFrame,
            floor_area_decile_thresholds: pd.DataFrame,
            tenure: str,
            built_form: str,
            property_type: str,
            construction_age_band: str,
            is_flat: bool,
            is_pitched: bool,
            is_roof_room: bool,
            floor_area: float
    ):
        """
        Filter the photo supply lookup to find the most appropriate photo supply for a given property.

        :param photo_supply_lookup: The photo supply lookup dataframe.
        :param floor_area_decile_thresholds: The floor area decile thresholds dataframe.
        :param tenure: The tenure of the property.
        :param built_form: The built form of the property.
        :param property_type: The property type of the property.
        :param construction_age_band: The construction age band of the property.
        :param is_flat: Whether the property has a flat roof.
        :param is_pitched: Whether the property has a pitched roof.
        :param is_roof_room: Whether the property has a roof room.
        :param floor_area: The floor area of the property.
        :return: The matching rows of photo_supply_lookup, narrowed to the property's floor area decile when
            that decile is present among the matches.
        :raises ValueError: If nothing matches on tenure, built form and property type.
        :raises AttributeError: If tenure is not a string.
        """
        # Convert the tenure to lower case, as is done in the creation of the dataset
        tenure = tenure.lower()
        # We remap the "not defined"
        tenure = {
            "not defined - use in the case of a new dwelling for which the intended tenure in not known. it is not to "
            "be used for an existing dwelling":
                "not defined - use in the case of a new dwelling for which the intended tenure in not known. it is no"
        }.get(tenure, tenure)

        photo_supply_matched = photo_supply_lookup[
            (photo_supply_lookup["tenure"] == tenure) &
            (photo_supply_lookup["built_form"] == built_form) &
            (photo_supply_lookup["property_type"] == property_type) &
            (photo_supply_lookup["construction_age_band"] == construction_age_band) &
            (photo_supply_lookup["is_flat"] == is_flat) &
            (photo_supply_lookup["is_pitched"] == is_pitched) &
            (photo_supply_lookup["is_roof_room"] == is_roof_room)
        ]

        if photo_supply_matched.empty:
            # There are a small number of cases where we don't get a full match so try again with a more aggregated
            # average
            photo_supply_matched = photo_supply_lookup[
                (photo_supply_lookup["tenure"] == tenure) &
                (photo_supply_lookup["built_form"] == built_form) &
                (photo_supply_lookup["property_type"] == property_type)
            ]

            # Only narrow by age band when that leaves us with something
            if construction_age_band in photo_supply_matched["construction_age_band"].values:
                photo_supply_matched = photo_supply_matched[
                    photo_supply_matched["construction_age_band"] == construction_age_band
                ]

            if photo_supply_matched.empty:
                raise ValueError("No photo supply matches")

        floor_area_decile = cls.classify_floor_area(
            floor_area, floor_area_decile_thresholds["floor_area_decile_thresholds"].values
        )

        # Only narrow by floor area decile when that decile is present in the matches
        if floor_area_decile in photo_supply_matched["floor_area_decile"].values:
            photo_supply_matched = photo_supply_matched[
                photo_supply_matched["floor_area_decile"] == floor_area_decile
            ]

        return photo_supply_matched

31
etl/solar/app.py Normal file
View file

@ -0,0 +1,31 @@
from pathlib import Path
from etl.epc.property_change_app import get_cleaned
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
DATA_DIRECTORY = Path(__file__).parent / "local_data" / "all-domestic-certificates"
def app():
    """
    Reads in the EPC data and attempts to produce a reasonable figure for the photo-supply variable, which is
    defined as follows:

    "Percentage of photovoltaic area as a percentage of total roof area. 0% indicates that a Photovoltaic Supply
    is not present in the property."

    When recommending solar, we want to simulate the retrofit by increasing this value from 0, so we need a
    sensible figure to increase it to. This script builds the dataset used to work out what a sensible figure
    would be, and stores it.
    :return:
    """
    # Every sub-directory of the data directory is handed to the dataset builder
    certificate_directories = []
    for entry in DATA_DIRECTORY.iterdir():
        if entry.is_dir():
            certificate_directories.append(entry)

    solar_data_client = SolarPhotoSupply(
        file_directories=certificate_directories,
        cleaned_lookup=get_cleaned(),
    )

    solar_data_client.create_dataset()
    solar_data_client.save()

View file

@ -0,0 +1,109 @@
import unittest
import pandas as pd
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
class TestSolarPhotoSupply(unittest.TestCase):
    """Exercises SolarPhotoSupply.filter_photo_supply_lookup and SolarPhotoSupply.classify_floor_area."""

    def setUp(self):
        # Minimal two-row stand-ins for the photo supply lookup and the floor area decile thresholds
        lookup_rows = [
            {
                "tenure": "leasehold",
                "built_form": "detached",
                "property_type": "house",
                "construction_age_band": "pre-1900",
                "is_flat": False,
                "is_pitched": True,
                "is_roof_room": False,
                "floor_area_decile": 0,
                "photo_supply": 100,
            },
            {
                "tenure": "freehold",
                "built_form": "semi-detached",
                "property_type": "flat",
                "construction_age_band": "1900-1929",
                "is_flat": True,
                "is_pitched": False,
                "is_roof_room": True,
                "floor_area_decile": 1,
                "photo_supply": 200,
            },
        ]
        self.photo_supply_lookup = pd.DataFrame(lookup_rows)
        self.floor_area_decile_thresholds = pd.DataFrame({"floor_area_decile_thresholds": [50, 100]})
        self.solar_photo_supply = SolarPhotoSupply([], {})

    def _filter_lookup(self, *property_details):
        """Call filter_photo_supply_lookup with the fixture tables followed by the given positional details."""
        return self.solar_photo_supply.filter_photo_supply_lookup(
            self.photo_supply_lookup, self.floor_area_decile_thresholds, *property_details
        )

    def test_correct_filtering(self):
        matched = self._filter_lookup("leasehold", "detached", "house", "pre-1900", False, True, False, 45)

        self.assertEqual(len(matched), 1)
        self.assertEqual(matched.iloc[0]["photo_supply"], 100)

    def test_no_matches(self):
        # "unknown" is not a built form present in the lookup, so nothing can match
        with self.assertRaises(ValueError):
            self._filter_lookup("leasehold", "unknown", "house", "pre-1900", False, True, False, 45)

    def test_floor_area_decile_matching(self):
        matched = self._filter_lookup("freehold", "semi-detached", "flat", "1900-1929", True, False, True, 60)

        self.assertEqual(len(matched), 1)
        self.assertEqual(matched.iloc[0]["photo_supply"], 200)

    def test_invalid_parameters(self):
        with self.assertRaises(AttributeError):
            # 123 is an invalid type for tenure
            self._filter_lookup(123, "detached", "house", "pre-1900", False, True, False, 45)

    def test_classify_floor_area(self):
        thresholds = [10, 20, 30, 40, 50]
        classifier = SolarPhotoSupply([], {})

        # (floor area, expected decile, message shown on failure)
        cases = (
            # A floor area that falls inside the thresholds
            (25, 2, "Decile classification did not match expected result"),
            # A floor area beyond the largest threshold
            (60, len(thresholds), "Decile classification for out of range value is incorrect"),
        )
        for floor_area, expected_decile, failure_message in cases:
            self.assertEqual(
                classifier.classify_floor_area(floor_area, thresholds), expected_decile, failure_message
            )
# Allow this test module to be run directly as a script
if __name__ == "__main__":
    unittest.main()

View file

@ -73,7 +73,9 @@ def app():
df["UPRN"] = df["UPRN"].astype("Int64").astype("str")
df = df[~pd.isnull(df["UPRN"])]
uprn_sample = sample(df["UPRN"].unique().tolist(), DIR_SAMPLE_SIZE)
# uprn_sample = sample(df["UPRN"].unique().tolist(), DIR_SAMPLE_SIZE)
# Take a fixed sample based on the first DIR_SAMPLE_SIZE uprns
uprn_sample = sorted(df["UPRN"].unique().tolist())[:DIR_SAMPLE_SIZE]
df_sample = df[df["UPRN"].isin(uprn_sample)]
# Take the record with the newest LODGEMENT_DATETIME by uprn
df_sample = df_sample.sort_values("LODGEMENT_DATETIME", ascending=False).drop_duplicates("UPRN")
@ -149,6 +151,8 @@ def app():
# 0.7859617377809409
# 0.5348837209302325
# Fixed sample, sqrt weights
# Group by tenure
by_tenure = results_df.groupby("tenure").agg(
{"numeric_success": "median", "categorical_success": "median", "uprn": "count"}

View file

@ -18,6 +18,25 @@ regional_labour_variations = [
{"Region": "Northern Ireland", "Adjustment_Factor": 0.76}
]
# This data is based on the MCS database
# Keys are either "average_cost_per_kwh" (the figure with no region suffix) or "average_cost_per_kwh-<region>".
# NOTE(review): despite the "per_kwh" naming, Costs.solar_pv multiplies these figures by the system size in kW
# (wattage / 1000), so they are applied as a cost per kW of peak capacity. Currency is presumably GBP - confirm.
# Inner and Outer London currently carry the same figure.
MCS_SOLAR_PV_COST_DATA = {
"last_updated": "2024-01-04",
"average_cost_per_kwh": 2013.94,
"average_cost_per_kwh-Outer London": 2618.75,
"average_cost_per_kwh-Inner London": 2618.75,
"average_cost_per_kwh-South East England": 2083.33,
"average_cost_per_kwh-South West England": 2113,
"average_cost_per_kwh-East of England": 1973.86,
"average_cost_per_kwh-East Midlands": 1981.86,
"average_cost_per_kwh-West Midlands": 1926.55,
"average_cost_per_kwh-North East England": 2028.49,
"average_cost_per_kwh-North West England": 1620.42,
"average_cost_per_kwh-Yorkshire and the Humber": 2060.9,
"average_cost_per_kwh-Wales": 1898.83,
"average_cost_per_kwh-Scotland": 1967.97,
"average_cost_per_kwh-Northern Ireland": 2126.09,
}
class Costs:
"""
@ -42,7 +61,7 @@ class Costs:
# We use a higher contingency rate for internal wall insulation because of the potential for issues with moving
# fittings and trimming doors, as well as scope for damage to the existing wall during preparation.
IWI_CONTINGENCY = 0.15
IWI_CONTINGENCY = 0.2
# Where there is more uncertainty, a higher contingency rate is used
HIGH_RISK_CONTINGENCY = 0.2
@ -58,8 +77,8 @@ class Costs:
# have a preliminaries of 12-14% so we use 12% as the median for the preliminaries rate.
# For External wall insulation (EWI), we use 25% as the preliminaries rate if we think the property might
# need scaffolding, otherwise we use 20%. This is to account for any site preparation that might be required
EWI_NO_SCAFFOLDING_PRELIMINARIES = 0.15
EWI_SCAFFOLDING_PRELIMINARIES = 0.20
EWI_NO_SCAFFOLDING_PRELIMINARIES = 0.2
EWI_SCAFFOLDING_PRELIMINARIES = 0.25
VAT_RATE = 0.2
PROFIT_MARGIN = 0.2
@ -157,12 +176,16 @@ class Costs:
"""
material_cost_per_m2 = material["material_cost"]
# We inflate material costs due to recent price increases
material_cost_per_m2 = material_cost_per_m2 * 1.5
base_material_cost = material_cost_per_m2 * floor_area
labour_cost = material["labour_cost"] * floor_area * self.labour_adjustment_factor
subtotal_before_profit = base_material_cost + labour_cost
contingency_cost = subtotal_before_profit * self.CONTINGENCY
# We use high risk contingency because of the possibility of access issues and clearing existing insulation
contingency_cost = subtotal_before_profit * self.HIGH_RISK_CONTINGENCY
preliminaries_cost = subtotal_before_profit * self.PRELIMINARIES
profit_cost = subtotal_before_profit * self.PROFIT_MARGIN
@ -811,3 +834,39 @@ class Costs:
"labour_cost": labour_cost,
"labour_days": labour_days
}
def solar_pv(self, wattage: float):
    """
    Calculates the total cost for solar PV based on data provided by the MCS dashboard, which contains
    costing data for installations of renewable and clean energy measures.

    The data in the dashboard is filtered on domestic building installations and then the data across the
    various regions is manually collected. There is currently no automated way to get the data from the MCS
    dashboard

    Price can also be benchmarked against this checkatrade article:
    https://www.checkatrade.com/blog/cost-guides/cost-of-solar-panel-installation/

    The regional figure is looked up from MCS_SOLAR_PV_COST_DATA using self.region. When there is no entry for
    the region, the un-suffixed "average_cost_per_kwh" figure is used instead of failing.

    :param wattage: Peak wattage of the solar PV system
    :return: dictionary with the keys "total", "subtotal", "vat", "labour_hours" and "labour_days"
    """
    # Get the cost data relevant to the region, falling back to the overall average for unknown regions.
    # Despite the key name, the figure is applied per kW of system size below.
    national_average_cost = MCS_SOLAR_PV_COST_DATA["average_cost_per_kwh"]
    regional_cost = MCS_SOLAR_PV_COST_DATA.get(
        f"average_cost_per_kwh-{self.region}", national_average_cost
    )

    kw = wattage / 1000
    total_cost = kw * regional_cost

    # The looked-up cost is treated as VAT inclusive, so the VAT is backed out of the total
    subtotal_before_vat = total_cost / (1 + self.VAT_RATE)
    vat = total_cost - subtotal_before_vat

    # Labour hours are based on estimates from online research: an average team seems to consist of 3 people
    # and most jobs take around 2 days.
    # NOTE(review): 3 people x 8 hours x 2 days is 48 hours, not the 72 returned below (72 would imply 12 hour
    # days). The value is left unchanged because existing tests assert 72 - confirm which figure is intended.
    return {
        "total": total_cost,
        "subtotal": subtotal_before_vat,
        "vat": vat,
        "labour_hours": 72,
        "labour_days": 2,
    }

View file

@ -6,6 +6,7 @@ from recommendations.RoofRecommendations import RoofRecommendations
from recommendations.VentilationRecommendations import VentilationRecommendations
from recommendations.FireplaceRecommendations import FireplaceRecommendations
from recommendations.LightingRecommendations import LightingRecommendations
from recommendations.SolarPvRecommendations import SolarPvRecommendations
from recommendations.WindowsRecommendations import WindowsRecommendations
from backend.ml_models.AnnualBillSavings import AnnualBillSavings
@ -37,6 +38,7 @@ class Recommendations:
self.fireplace_recommender = FireplaceRecommendations(property_instance=property_instance)
self.lighting_recommender = LightingRecommendations(property_instance=property_instance, materials=materials)
self.windows_recommender = WindowsRecommendations(property_instance=property_instance, materials=materials)
self.solar_recommender = SolarPvRecommendations(property_instance=property_instance)
def recommend(self):
@ -84,6 +86,11 @@ class Recommendations:
if self.windows_recommender.recommendation:
property_recommendations.append(self.windows_recommender.recommendation)
# Solar recommendations
self.solar_recommender.recommend()
if self.solar_recommender.recommendation:
property_recommendations.append(self.solar_recommender.recommendation)
# We insert temporary ids into the recommendations which is important for the optimiser later
property_recommendations = self.insert_temp_recommendation_id(property_recommendations)

View file

@ -0,0 +1,65 @@
import numpy as np
from recommendations.Costs import Costs
class SolarPvRecommendations:
    """
    Produces a solar PV recommendation for a property, sized from the roof area available for panels and
    costed through Costs.solar_pv.
    """

    # Approximate area of the solar panels (assumed to be m2 per panel - TODO confirm)
    SOLAR_PANEL_AREA = 1.6
    # Wattage per panel
    SOLAR_PANEL_WATTAGE = 360

    def __init__(self, property_instance):
        """
        :param property_instance: Instance of the Property class, for the home associated to property_id
        """
        self.property = property_instance

        self.costs = Costs(self.property)

        self.recommendation = []

    def _has_no_existing_solar_pv(self):
        """
        Decide whether the EPC data indicates that there is no solar PV on the property.

        The photo-supply field counts as "no solar PV" when it is missing, None, an empty string, one of the
        property's DATA_ANOMALY_MATCHES values, or a value that parses to 0 (the field can arrive as a string
        such as "0" or "0.0").

        :return: True when no existing solar PV is indicated, otherwise False
        """
        photo_supply = self.property.data.get("photo-supply")
        if photo_supply is None or photo_supply == "":
            return True

        anomalies = self.property.DATA_ANOMALY_MATCHES
        if isinstance(anomalies, (list, tuple, set, frozenset)):
            # A collection of anomaly markers needs a membership test, not a comparison with the collection
            if photo_supply in anomalies:
                return True
        elif photo_supply == anomalies:
            return True

        try:
            return float(photo_supply) == 0
        except (TypeError, ValueError):
            # An unparseable value is not evidence that there is no solar PV, so we do not recommend
            return False

    def recommend(self):
        """
        We check if a property is potentially suitable for solar PV based on the following criteria:
        - The property is a house or bungalow
        - The property has a flat roof, a pitched roof or a roof room
        - The property does not have existing solar pv

        When the property is suitable, and the roof area estimate allows for at least one panel,
        self.recommendation is set to a single-element list describing the installation. Otherwise
        self.recommendation is left untouched.
        :return:
        """

        is_valid_property_type = self.property.data["property-type"] in ["House", "Bungalow"]
        roof = self.property.roof
        is_valid_roof_type = bool(roof.get("is_flat") or roof.get("is_pitched") or roof.get("is_roof_room"))

        if not (is_valid_property_type and is_valid_roof_type and self._has_no_existing_solar_pv()):
            return

        # Without the roof area and percentage estimates we can neither size nor simulate the system
        roof_area = self.property.solar_pv_roof_area
        roof_percentage = self.property.solar_pv_percentage
        if roof_area is None or roof_percentage is None:
            return

        # We now have a property which is potentially suitable for solar PV
        number_solar_panels = np.floor(roof_area / self.SOLAR_PANEL_AREA)
        # Written this way round so that a NaN panel count is rejected as well
        if not number_solar_panels >= 1:
            return
        solar_panel_wattage = number_solar_panels * self.SOLAR_PANEL_WATTAGE

        # Given the wattage, we estimate the cost of the solar PV system. This is based on the MCS database
        # of solar PV installations
        cost_result = self.costs.solar_pv(wattage=solar_panel_wattage)

        kw = int(np.round(solar_panel_wattage / 1000))

        self.recommendation = [
            {
                "parts": [],
                "type": "solar_pv",
                "description": f"Install a {kw} kilowatt-peak (kWp) solar photovoltaic (PV) panel system on the roof",
                "starting_u_value": None,
                "new_u_value": None,
                "sap_points": None,
                **cost_result,
                # This is required for simulating the SAP impact. solar_pv_percentage is between 0 & 1 so we scale
                # back up here
                "photo_supply": 100 * roof_percentage
            }
        ]

View file

@ -1,4 +1,5 @@
import math
from datetime import datetime
from copy import deepcopy
import numpy as np
@ -713,3 +714,26 @@ def estimate_windows(
raise ValueError("Window count cannot be negative.")
return window_count
def calculate_cavity_age(newest_epc, older_epcs, cleaned):
    """
    Work out how many days ago a filled cavity wall was most recently recorded on the property's EPCs.

    Each EPC's "walls-description" is matched against the cleaned wall descriptions. EPCs whose cleaned wall
    record is flagged as both a cavity wall and a filled cavity are kept, and the age is measured from the
    latest "lodgement-date" among them to now.

    NOTE(review): the most recent matching lodgement date is used, as in the original implementation. If the
    intent is the age of the fill itself, the earliest matching date may be more appropriate - confirm.

    :param newest_epc: The most recent EPC record, with "walls-description" and "lodgement-date" keys
    :param older_epcs: List of earlier EPC records with the same keys (may be empty or None)
    :param cleaned: The dictionary of cleaned components; only cleaned["walls-description"] is read, and each
        entry needs "original_description", "is_cavity_wall" and "is_filled_cavity"
    :return: Number of whole days since the latest EPC recording a filled cavity wall, or None when no EPC
        can be matched to a filled cavity wall
    """
    wall_lookup = cleaned["walls-description"]

    filled_cavity_records = []
    for epc in [newest_epc] + list(older_epcs or []):
        # Get the cleaned mapping
        mapped = [y for y in wall_lookup if y["original_description"] == epc["walls-description"]]
        if not mapped:
            continue

        filled_cavity_records.append(
            {
                **mapped[0],
                "inspection-date": epc["lodgement-date"],
            }
        )

    # With nothing mapped the frame would have no columns, so the filter below would raise a KeyError
    if not filled_cavity_records:
        return None

    df = pd.DataFrame(filled_cavity_records)
    df = df[df["is_cavity_wall"] & df["is_filled_cavity"]]

    # No EPC records a filled cavity, so there is no date to measure the age from
    if df.empty:
        return None

    cavity_age = (datetime.now() - pd.to_datetime(df["inspection-date"].max())).days

    return cavity_age

View file

@ -1,6 +1,7 @@
from recommendations.Costs import Costs
from unittest.mock import Mock
import datetime
import pytest
class TestCosts:
@ -58,9 +59,9 @@ class TestCosts:
)
assert loft_results == {
'total': 430.21445040000003, 'subtotal': 358.512042, 'vat': 71.70240840000001,
'contingency': 25.608003000000004, 'preliminaries': 25.608003000000004, 'material': 198.29923000000002,
'profit': 51.21600600000001, 'labour_hours': 3.685, 'labour_cost': 57.7808, 'labour_days': 0.460625
'total': 639.4133610000001, 'subtotal': 532.8444675000001, 'vat': 106.56889350000002,
'contingency': 71.045929, 'preliminaries': 35.5229645, 'material': 297.448845, 'profit': 71.045929,
'labour_hours': 3.685, 'labour_cost': 57.7808, 'labour_days': 0.460625
}
def test_internal_wall_insulation(self):
@ -176,11 +177,9 @@ class TestCosts:
)
assert iwi_results == {
'total': 6650.889456921851, 'subtotal': 5542.407880768209, 'vat': 1108.4815761536418,
'contingency': 573.3525393898148, 'preliminaries': 382.2350262598765,
'material': 1747.488000615996,
'profit': 764.470052519753, 'labour_hours': 88.23759388401297,
'labour_days': 2.757424808875405,
'total': 6880.2304726777775, 'subtotal': 5733.525393898148, 'vat': 1146.7050787796295,
'contingency': 764.470052519753, 'preliminaries': 382.2350262598765, 'material': 1747.488000615996,
'profit': 764.470052519753, 'labour_hours': 88.23759388401297, 'labour_days': 2.757424808875405,
'labour_cost': 1927.1602026551818
}
@ -414,8 +413,8 @@ class TestCosts:
)
assert ewi_results == {
'total': 14561.688989159393, 'subtotal': 12134.740824299493, 'vat': 2426.948164859899,
'contingency': 808.9827216199662, 'preliminaries': 1617.9654432399325, 'material': 4020.565147410677,
'total': 15047.078622131372, 'subtotal': 12539.232185109477, 'vat': 2507.8464370218953,
'contingency': 808.9827216199662, 'preliminaries': 2022.4568040499155, 'material': 4020.565147410677,
'profit': 1617.9654432399325, 'labour_hours': 187.02533486285358, 'labour_days': 5.8445417144641745,
'labour_cost': 3921.5600094613983
}
@ -499,3 +498,48 @@ class TestCosts:
'labour_hours': 24.79, 'labour_days': 1.549375, 'labour_cost': 186.9032}
assert costs.labour_adjustment_factor == 0.88
# Parametrised fixture for the regional tests: (county, expected region, expected total for a 4000W system)
@pytest.fixture(params=[
    ("Northamptonshire", "East Midlands", 7927.44),
    ("Greater London Authority", "Inner London", 10475.0),
    ("Adur", "South East England", 8333.32),
    ("Bournemouth", "South West England", 8452),
    ("Basildon", "East of England", 7895.44),
    ("Birmingham", "West Midlands", 7706.2),
    ("County Durham", "North East England", 8113.96),
    ("Allerdale", "North West England", 6481.68),
    ("York", "Yorkshire and the Humber", 8243.6),
    ("Cardiff", "Wales", 7595.32),
    ("Glasgow City", "Scotland", 7871.88),
    ("Belfast", "Northern Ireland", 8504.36)
])
def mock_property_with_region(self, request):
    """Return (mock property, expected region, expected cost) for the current parametrisation."""
    county_name, region_name, cost_for_system = request.param
    # Only the county is set on the mock's data
    return Mock(data={"county": county_name}), region_name, cost_for_system
# Test for different wattages
@pytest.mark.parametrize("wattage, expected_cost", [
    (3000, 5945.58),
    (4000, 7927.44),
    (5000, 9909.3),
    (6000, 11891.16),
])
def test_solar_pv_different_wattages(self, wattage, expected_cost):
    """The total should scale linearly with wattage using the East Midlands rate (Mansfield)."""
    mock_property = Mock()
    mock_property.data = {"county": "Mansfield"}
    costs = Costs(mock_property)
    result = costs.solar_pv(wattage)
    # The expected values are exact products of the regional rate, so only float noise is tolerated. A 1%
    # tolerance would also accept the rate of a neighbouring region.
    assert result['total'] == pytest.approx(expected_cost, rel=1e-6)
def test_solar_pv_regional_variation(self, mock_property_with_region):
    """Each county should map to its expected region and be costed with that region's rate."""
    # Test for regional cost variations
    property_instance, expected_region, expected_cost = mock_property_with_region
    costs = Costs(property_instance)
    assert costs.region == expected_region
    result = costs.solar_pv(4000)  # Testing with a fixed wattage of 4000
    # Several regional rates differ by less than 1% (e.g. East of England vs East Midlands), so a tight
    # tolerance is needed for this assertion to detect the wrong rate being applied
    assert result['total'] == pytest.approx(expected_cost, rel=1e-6)

View file

@ -6,7 +6,7 @@ from recommendations.FireplaceRecommendations import FireplaceRecommendations
class TestFirepaceRecommendations:
def test_no_fireplaces(self):
property_instance = Property(id=0, address1="fake", postcode="fake", epc_client=Mock())
property_instance = Property(id=0, address="fake", postcode="fake")
property_instance.data = {
"number-open-fireplaces": 0
}
@ -22,7 +22,7 @@ class TestFirepaceRecommendations:
assert recommender.recommendation is None
def test_one_fireplace(self):
property_instance = Property(id=0, address1="fake", postcode="fake", epc_client=Mock())
property_instance = Property(id=0, address="fake", postcode="fake")
property_instance.data = {
"number-open-fireplaces": 1
}
@ -40,7 +40,7 @@ class TestFirepaceRecommendations:
assert recommender.recommendation[0]["total"] == 300
def test_multiple_fireplaces(self):
property_instance = Property(id=0, address1="fake", postcode="fake", epc_client=Mock())
property_instance = Property(id=0, address="fake", postcode="fake")
property_instance.data = {
"number-open-fireplaces": 3
}

View file

@ -21,16 +21,6 @@ class TestFloorRecommendations:
) as f:
return pickle.load(f)
@pytest.fixture
def mock_floor_rec_instance(self):
# Creating a mock instance of WallRecommendations with the necessary attributes
property_mock = Mock()
property_mock.full_sap_epc = {"lodgement-date": "2000-01-01"}
property_mock.data = {"county": "York"}
mock_wall_rec_instance = FloorRecommendations(property_mock, materials)
return mock_wall_rec_instance
def test_init(self, input_properties):
input_properties[0].insulation_floor_area = 50
input_properties[0].insulation_wall_area = 90

View file

@ -9,7 +9,7 @@ from recommendations.tests.test_data.materials import materials
class TestLightingRecommendations:
def test_init_invalid_materials(self):
input_property0 = Property(id=1, postcode="F4k3 6", address1="623 fake street", epc_client=Mock())
input_property0 = Property(id=1, postcode="F4k3 6", address="623 fake street")
input_property0.lighting = {"low_energy_proportion": 0}
input_property0.data = {"county": "Greater London Authority"}
# Test for invalid materials
@ -18,7 +18,7 @@ class TestLightingRecommendations:
def test_recommend_no_action_needed(self):
# Case where no recommendation is needed
input_property1 = Property(id=1, postcode="F4k3 6", address1="623 fake street", epc_client=Mock())
input_property1 = Property(id=1, postcode="F4k3 6", address="623 fake street")
input_property1.lighting = {"low_energy_proportion": 100}
input_property1.data = {"county": "Greater London Authority"}
@ -28,7 +28,7 @@ class TestLightingRecommendations:
def test_recommend_action_needed(self):
# Case where recommendation is needed
input_property1 = Property(id=1, postcode="F4k3 6", address1="623 fake street", epc_client=Mock())
input_property1 = Property(id=1, postcode="F4k3 6", address="623 fake street")
input_property1.lighting = {"low_energy_proportion": 100}
input_property1.data = {"county": "Greater London Authority"}
input_property1.lighting = {"low_energy_proportion": 0.80}

View file

@ -1,5 +1,4 @@
from backend.Property import Property
from unittest.mock import Mock
from recommendations.RoofRecommendations import RoofRecommendations
from recommendations.tests.test_data.materials import materials
@ -7,7 +6,7 @@ from recommendations.tests.test_data.materials import materials
class TestRoofRecommendations:
def test_loft_insulation_recommendation_no_insulation(self):
property_instance = Property(id=0, address1="fake", postcode="fake", epc_client=Mock())
property_instance = Property(id=0, address="fake", postcode="fake")
property_instance.age_band = "F"
property_instance.insulation_floor_area = 100
property_instance.roof = {
@ -32,7 +31,7 @@ class TestRoofRecommendations:
assert len(roof_recommender.recommendations)
def test_loft_insulation_recommendation_50mm_insulation(self):
property_instance2 = Property(id=0, address1="fake", postcode="fake", epc_client=Mock())
property_instance2 = Property(id=0, address="fake", postcode="fake")
property_instance2.age_band = "F"
property_instance2.insulation_floor_area = 100
property_instance2.roof = {
@ -54,11 +53,11 @@ class TestRoofRecommendations:
assert len(roof_recommender2.recommendations) == 1
assert roof_recommender2.recommendations[0]["total"] == 1310.56464
assert roof_recommender2.recommendations[0]["total"] == 1936.9206000000004
assert roof_recommender2.recommendations[0]["new_u_value"] == 0.14
assert roof_recommender2.recommendations[0]["starting_u_value"] == 0.68
property_instance3 = Property(id=0, address1="fake", postcode="fake", epc_client=Mock())
property_instance3 = Property(id=0, address="fake", postcode="fake")
property_instance3.age_band = "F"
property_instance3.insulation_floor_area = 100
property_instance3.roof = {
@ -83,7 +82,7 @@ class TestRoofRecommendations:
assert roof_recommender3.recommendations[0]["parts"][0]["depth"] == 270
def test_loft_insulation_recommendation_150mm_insulation(self):
property_instance4 = Property(id=0, address1="fake", postcode="fake", epc_client=Mock())
property_instance4 = Property(id=0, address="fake", postcode="fake")
property_instance4.age_band = "F"
property_instance4.insulation_floor_area = 100
property_instance4.roof = {
@ -105,12 +104,12 @@ class TestRoofRecommendations:
assert len(roof_recommender4.recommendations) == 4
assert roof_recommender4.recommendations[0]["total"] == 788.0544
assert roof_recommender4.recommendations[0]["total"] == 1128.744
assert roof_recommender4.recommendations[0]["new_u_value"] == 0.15
assert roof_recommender4.recommendations[0]["starting_u_value"] == 0.3
assert roof_recommender4.recommendations[0]["parts"][0]["depth"] == 150
property_instance5 = Property(id=0, address1="fake", postcode="fake", epc_client=Mock())
property_instance5 = Property(id=0, address="fake", postcode="fake")
property_instance5.age_band = "F"
property_instance5.insulation_floor_area = 100
property_instance5.roof = {
@ -137,7 +136,7 @@ class TestRoofRecommendations:
def test_loft_insulation_recommendation_270mm_insulation(self):
# We shouldn't recommend anything in this case
property_instance6 = Property(id=0, address1="fake", postcode="fake", epc_client=Mock())
property_instance6 = Property(id=0, address="fake", postcode="fake")
property_instance6.age_band = "F"
property_instance6.insulation_floor_area = 100
property_instance6.roof = {
@ -278,7 +277,7 @@ class TestRoofRecommendations:
# "Insulate your room roof with 270mm of Example room roof insulation"
def test_flat_no_insulation(self):
property_instance11 = Property(id=11, address1="fake", postcode="fake", epc_client=Mock())
property_instance11 = Property(id=11, address="fake", postcode="fake")
property_instance11.age_band = "D"
property_instance11.insulation_floor_area = 33.5
property_instance11.perimeter = 24
@ -307,7 +306,7 @@ class TestRoofRecommendations:
"Insulate the home's flat roof with 150mm of Ecotherm Eco-Versal General Purpose Insulation Board"
def test_flat_insulated(self):
property_instance12 = Property(id=12, address1="fake", postcode="fake", epc_client=Mock())
property_instance12 = Property(id=12, address="fake", postcode="fake")
property_instance12.age_band = "D"
property_instance12.insulation_floor_area = 40
property_instance12.perimeter = 30
@ -331,7 +330,7 @@ class TestRoofRecommendations:
assert not roof_recommender12.recommendations
def test_flat_limited_insulation(self):
property_instance13 = Property(id=12, address1="fake", postcode="fake", epc_client=Mock())
property_instance13 = Property(id=12, address="fake", postcode="fake")
property_instance13.age_band = "D"
property_instance13.insulation_floor_area = 40
property_instance13.perimeter = 40
@ -363,7 +362,7 @@ class TestRoofRecommendations:
"Insulate the home's flat roof with 150mm of Ecotherm Eco-Versal General Purpose Insulation Board"
def test_property_above(self):
property_instance14 = Property(id=0, address1="fake", postcode="fake", epc_client=Mock())
property_instance14 = Property(id=0, address="fake", postcode="fake")
property_instance14.age_band = "F"
property_instance14.insulation_floor_area = 100
property_instance14.roof = {

View file

@ -0,0 +1,79 @@
import pytest
from recommendations.SolarPvRecommendations import SolarPvRecommendations
from backend.Property import Property
class TestSolarPvRecommendations:
    """Tests for the suitability checks and the output of SolarPvRecommendations.recommend."""

    @pytest.fixture
    def property_instance_invalid_type(self):
        # Setup the property_instance with an invalid property type. The roof and photo-supply are valid so
        # that the property type is the only reason for there being no recommendation
        property_instance_invalid_type = Property(id=1, address="", postcode="")
        property_instance_invalid_type.data = {
            "property-type": "InvalidType", "county": "Broxbourne", "photo-supply": None
        }
        property_instance_invalid_type.roof = {"is_flat": False, "is_pitched": True, "is_roof_room": False}
        return property_instance_invalid_type

    @pytest.fixture
    def property_instance_invalid_roof(self):
        # Setup the property_instance with invalid roof type
        property_instance_invalid_roof = Property(id=1, address="", postcode="")
        property_instance_invalid_roof.data = {
            "county": "Huntingdonshire", "property-type": "House", "photo-supply": None
        }
        property_instance_invalid_roof.roof = {"is_flat": False, "is_pitched": False, "is_roof_room": False}
        return property_instance_invalid_roof

    @pytest.fixture
    def property_instance_has_solar_pv(self):
        # Setup the property_instance with existing solar pv (a non-zero photo-supply)
        property_instance_has_solar_pv = Property(id=1, address="", postcode="")
        property_instance_has_solar_pv.data = {
            "photo-supply": "40", "county": "Huntingdonshire", "property-type": "House"
        }
        property_instance_has_solar_pv.roof = {"is_flat": True, "is_pitched": False, "is_roof_room": False}
        return property_instance_has_solar_pv

    @pytest.fixture
    def property_instance_valid_all(self):
        # Setup a valid property_instance that passes all conditions
        property_instance_valid_all = Property(id=1, address="", postcode="")
        property_instance_valid_all.solar_pv_roof_area = 20
        # solar_pv_percentage is a fraction between 0 and 1, which the recommendation scales up to a percentage
        property_instance_valid_all.solar_pv_percentage = 0.4
        property_instance_valid_all.data = {
            "property-type": "House", "photo-supply": None, "county": "Huntingdonshire"
        }
        property_instance_valid_all.roof = {"is_flat": True, "is_pitched": False, "is_roof_room": False}
        return property_instance_valid_all

    def test_invalid_property_type(self, property_instance_invalid_type):
        solar_pv = SolarPvRecommendations(property_instance_invalid_type)
        solar_pv.recommend()
        assert not solar_pv.recommendation

    def test_invalid_roof_type(self, property_instance_invalid_roof):
        solar_pv = SolarPvRecommendations(property_instance_invalid_roof)
        solar_pv.recommend()
        assert not solar_pv.recommendation

    def test_existing_solar_pv(self, property_instance_has_solar_pv):
        solar_pv = SolarPvRecommendations(property_instance_has_solar_pv)
        solar_pv.recommend()
        assert not solar_pv.recommendation

    def test_valid_all_conditions(self, property_instance_valid_all):
        solar_pv = SolarPvRecommendations(property_instance_valid_all)
        solar_pv.recommend()

        assert len(solar_pv.recommendation) == 1
        # Copy so that popping the numeric fields does not modify the recommender's own result
        result = dict(solar_pv.recommendation[0])

        # 20m2 of roof at 1.6m2 per panel gives 12 panels of 360W (4320W), costed at the East of England rate.
        # Floating point results are compared approximately rather than by exact equality
        assert result.pop("total") == pytest.approx(8527.0752)
        assert result.pop("subtotal") == pytest.approx(7105.896)
        assert result.pop("vat") == pytest.approx(1421.1792)
        assert result.pop("photo_supply") == pytest.approx(40)

        assert result == {
            'parts': [],
            'type': 'solar_pv',
            'description': 'Install a 4 kilowatt-peak (kWp) solar photovoltaic (PV) panel system on the roof',
            'starting_u_value': None,
            'new_u_value': None,
            'sap_points': None,
            'labour_hours': 72,
            'labour_days': 2,
        }

View file

@ -1,5 +1,4 @@
from backend.Property import Property
from unittest.mock import Mock
from recommendations.VentilationRecommendations import VentilationRecommendations
from recommendations.tests.test_data.materials import materials
@ -7,7 +6,7 @@ from recommendations.tests.test_data.materials import materials
class TestVentilationRecommendations:
def test_natural_ventilation(self):
input_property1 = Property(id=1, postcode="F4k3 6", address1="623 fake street", epc_client=Mock())
input_property1 = Property(id=1, postcode="F4k3 6", address="623 fake street")
input_property1.data = {"mechanical-ventilation": "natural"}
recommender = VentilationRecommendations(
@ -28,7 +27,7 @@ class TestVentilationRecommendations:
assert recommender.recommendation[0]["parts"][0]["quantity"] == 2
def test_missing_ventilation(self):
input_property2 = Property(id=1, postcode="F4k3 6", address1="623 fake street", epc_client=Mock())
input_property2 = Property(id=1, postcode="F4k3 6", address="623 fake street")
input_property2.data = {"mechanical-ventilation": None}
recommender2 = VentilationRecommendations(
@ -49,7 +48,7 @@ class TestVentilationRecommendations:
assert recommender2.recommendation[0]["parts"][0]["quantity"] == 2
def test_nodata_ventilation(self):
input_property3 = Property(id=1, postcode="F4k3 6", address1="623 fake street", epc_client=Mock())
input_property3 = Property(id=1, postcode="F4k3 6", address="623 fake street")
input_property3.data = {"mechanical-ventilation": "NO DATA!!"}
recommender3 = VentilationRecommendations(
@ -70,7 +69,7 @@ class TestVentilationRecommendations:
assert recommender3.recommendation[0]["parts"][0]["quantity"] == 2
def test_existing_ventilation_1(self):
input_property4 = Property(id=1, postcode="F4k3 6", address1="623 fake street", epc_client=Mock())
input_property4 = Property(id=1, postcode="F4k3 6", address="623 fake street")
input_property4.data = {"mechanical-ventilation": 'mechanical, extract only'}
recommender4 = VentilationRecommendations(
@ -86,7 +85,7 @@ class TestVentilationRecommendations:
assert recommender4.has_ventilaion
def test_existing_ventilation_2(self):
input_property5 = Property(id=1, postcode="F4k3 6", address1="623 fake street", epc_client=Mock())
input_property5 = Property(id=1, postcode="F4k3 6", address="623 fake street")
input_property5.data = {"mechanical-ventilation": 'mechanical, supply and extract'}
recommender5 = VentilationRecommendations(

View file

@ -231,7 +231,7 @@ class TestWallRecommendationsBase:
class TestCavityWallRecommensations:
def test_fill_empty_cavity(self):
input_property = Property(id=1, postcode="F4k3", address1="123 fake street", epc_client=Mock())
input_property = Property(id=1, postcode="F4k3", address="123 fake street")
input_property.walls = {
'original_description': 'Cavity wall, as built, no insulation (assumed)',
'clean_description': 'Cavity wall, as built, no insulation',
@ -265,7 +265,7 @@ class TestCavityWallRecommensations:
assert np.isclose(recommender.recommendations[1]["total"], 2004.6600000000003)
def test_fill_partial_filled_cavity(self):
input_property = Property(id=1, postcode="F4k3", address1="123 fake street", epc_client=Mock())
input_property = Property(id=1, postcode="F4k3", address="123 fake street")
input_property.walls = {
'original_description': 'Cavity wall, as built, partial insulation (assumed)',
'clean_description': 'Cavity wall, as built, partial insulation',
@ -299,7 +299,7 @@ class TestCavityWallRecommensations:
assert np.isclose(recommender.recommendations[1]["total"], 1999.9350000000002)
def test_system_built_wall(self):
input_property2 = Property(id=1, postcode="F4k3 2", address1="223 fake street", epc_client=Mock())
input_property2 = Property(id=1, postcode="F4k3 2", address="223 fake street")
input_property2.walls = {
'original_description': 'System built, as built, no insulation (assumed)',
'clean_description': 'System built, as built, no insulation',
@ -331,22 +331,22 @@ class TestCavityWallRecommensations:
assert len(recommender2.recommendations) == 9
assert recommender2.estimated_u_value == 1
assert np.isclose(recommender2.recommendations[0]["new_u_value"], 0.19)
assert np.isclose(recommender2.recommendations[0]["total"], 15899.9616)
assert np.isclose(recommender2.recommendations[0]["total"], 16429.960320000002)
assert recommender2.recommendations[0]["parts"][0]["type"] == "external_wall_insulation"
assert recommender2.recommendations[0]["parts"][0]["depth"] == 100
assert np.isclose(recommender2.recommendations[8]["new_u_value"], 0.23)
assert np.isclose(recommender2.recommendations[8]["total"], 10916.3424)
assert np.isclose(recommender2.recommendations[8]["total"], 11292.768)
assert recommender2.recommendations[8]["parts"][0]["type"] == "internal_wall_insulation"
assert recommender2.recommendations[8]["parts"][0]["depth"] == 72.5
assert np.isclose(recommender2.recommendations[6]["new_u_value"], 0.29)
assert np.isclose(recommender2.recommendations[6]["total"], 10621.934399999998)
assert np.isclose(recommender2.recommendations[6]["total"], 10988.208)
assert recommender2.recommendations[6]["parts"][0]["type"] == "internal_wall_insulation"
assert recommender2.recommendations[6]["parts"][0]["depth"] == 52.5
def test_timber_frame_wall(self):
input_property3 = Property(id=1, postcode="F4k3 2", address1="223 fake street", epc_client=Mock())
input_property3 = Property(id=1, postcode="F4k3 2", address="223 fake street")
input_property3.walls = {
'original_description': 'Timber frame, as built, no insulation (assumed)',
'clean_description': 'Timber frame, as built, no insulation',
@ -378,17 +378,17 @@ class TestCavityWallRecommensations:
assert len(recommender3.recommendations) == 6
assert recommender3.estimated_u_value == 1.9
assert np.isclose(recommender3.recommendations[0]["new_u_value"], 0.2)
assert np.isclose(recommender3.recommendations[0]["total"], 13117.46832)
assert np.isclose(recommender3.recommendations[0]["total"], 13554.717263999999)
assert recommender3.recommendations[0]["parts"][0]["type"] == "external_wall_insulation"
assert recommender3.recommendations[0]["parts"][0]["depth"] == 100.0
assert np.isclose(recommender3.recommendations[1]["new_u_value"], 0.23)
assert np.isclose(recommender3.recommendations[1]["total"], 34070.50944)
assert np.isclose(recommender3.recommendations[1]["total"], 35206.19308800001)
assert recommender3.recommendations[1]["parts"][0]["type"] == "external_wall_insulation"
assert recommender3.recommendations[1]["parts"][0]["depth"] == 150.0
def test_granite_or_whinstone_wall(self):
input_property4 = Property(id=1, postcode="F4k3 2", address1="223 fake street", epc_client=Mock())
input_property4 = Property(id=1, postcode="F4k3 2", address="223 fake street")
input_property4.walls = {
'original_description': 'Granite or whinstone, as built, no insulation (assumed)',
'clean_description': 'Granite or whinstone, as built, no insulation',
@ -420,17 +420,17 @@ class TestCavityWallRecommensations:
assert len(recommender4.recommendations) == 6
assert recommender4.estimated_u_value == 2.3
assert np.isclose(recommender4.recommendations[0]["new_u_value"], 0.21)
assert np.isclose(recommender4.recommendations[0]["total"], 28562.514352)
assert np.isclose(recommender4.recommendations[0]["total"], 29547.42864)
assert recommender4.recommendations[0]["parts"][0]["type"] == "external_wall_insulation"
assert recommender4.recommendations[0]["parts"][0]["depth"] == 100
assert np.isclose(recommender4.recommendations[1]["new_u_value"], 0.23)
assert np.isclose(recommender4.recommendations[1]["total"], 74186.52678400002)
assert np.isclose(recommender4.recommendations[1]["total"], 76744.68288000001)
assert recommender4.recommendations[1]["parts"][0]["type"] == "external_wall_insulation"
assert recommender4.recommendations[1]["parts"][0]["depth"] == 150
def test_cob_wall(self):
input_property5 = Property(id=1, postcode="F4k3 2", address1="223 fake street", epc_client=Mock())
input_property5 = Property(id=1, postcode="F4k3 2", address="223 fake street")
input_property5.walls = {
'original_description': 'Cob, as built',
'clean_description': 'Cob, as built',
@ -462,17 +462,17 @@ class TestCavityWallRecommensations:
assert len(recommender5.recommendations) == 5
assert recommender5.estimated_u_value == 0.8
assert np.isclose(recommender5.recommendations[0]["new_u_value"], 0.29)
assert np.isclose(recommender5.recommendations[0]["total"], 8665.040384000002)
assert np.isclose(recommender5.recommendations[0]["total"], 8963.834880000002)
assert recommender5.recommendations[0]["parts"][0]["type"] == "external_wall_insulation"
assert recommender5.recommendations[0]["parts"][0]["depth"] == 50
assert np.isclose(recommender5.recommendations[3]["new_u_value"], 0.26)
assert np.isclose(recommender5.recommendations[3]["total"], 20078.742992)
assert np.isclose(recommender5.recommendations[3]["total"], 20771.11344)
assert recommender5.recommendations[3]["parts"][0]["type"] == "internal_wall_insulation"
assert recommender5.recommendations[3]["parts"][0]["depth"] == 100
def test_sandstone_or_limestone_wall(self):
input_property6 = Property(id=1, postcode="F4k3 6", address1="623 fake street", epc_client=Mock())
input_property6 = Property(id=1, postcode="F4k3 6", address="623 fake street")
input_property6.walls = {
'original_description': 'Sandstone or limestone, as built, no insulation (assumed)',
'clean_description': 'Sandstone or limestone, as built, no insulation',
@ -504,16 +504,16 @@ class TestCavityWallRecommensations:
assert len(recommender6.recommendations) == 9
assert recommender6.estimated_u_value == 1
assert np.isclose(recommender6.recommendations[0]["new_u_value"], 0.19)
assert np.isclose(recommender6.recommendations[0]["total"], 44829.0584)
assert np.isclose(recommender6.recommendations[0]["total"], 46374.888000000006)
assert recommender6.recommendations[0]["parts"][0]["type"] == "external_wall_insulation"
assert recommender6.recommendations[0]["parts"][0]["depth"] == 100
assert np.isclose(recommender6.recommendations[2]["new_u_value"], 0.21)
assert np.isclose(recommender6.recommendations[2]["total"], 116436.25280000002)
assert np.isclose(recommender6.recommendations[2]["total"], 120451.29600000002)
assert recommender6.recommendations[2]["parts"][0]["type"] == "external_wall_insulation"
assert recommender6.recommendations[2]["parts"][0]["depth"] == 150
assert np.isclose(recommender6.recommendations[4]["new_u_value"], 0.28)
assert np.isclose(recommender6.recommendations[4]["total"], 91267.0136)
assert np.isclose(recommender6.recommendations[4]["total"], 94414.15199999999)
assert recommender6.recommendations[4]["parts"][0]["type"] == "internal_wall_insulation"
assert recommender6.recommendations[4]["parts"][0]["depth"] == 100

View file

@ -1,6 +1,5 @@
from recommendations.WindowsRecommendations import WindowsRecommendations
from backend.Property import Property
from unittest.mock import Mock
from recommendations.tests.test_data.materials import materials
@ -15,11 +14,11 @@ class TestWindowRecommendations:
property_1 = Property(
id=1,
postcode='1',
address1='1',
epc_client=Mock(),
address='1',
data={
"county": "Wychavon",
"multi-glaze-proportion": 0
"multi-glaze-proportion": 0,
"uprn": 0
}
)
property_1.windows = {
@ -52,11 +51,11 @@ class TestWindowRecommendations:
property_2 = Property(
id=1,
postcode='1',
address1='1',
epc_client=Mock(),
address='1',
data={
"county": "Wychavon",
"multi-glaze-proportion": 33
"multi-glaze-proportion": 33,
"uprn": 0
}
)
property_2.windows = {'original_description': 'Mostly double glazing', 'has_glazing': True,
@ -86,11 +85,11 @@ class TestWindowRecommendations:
property_3 = Property(
id=1,
postcode='1',
address1='1',
epc_client=Mock(),
address='1',
data={
"county": "Wychavon",
"multi-glaze-proportion": 80
"multi-glaze-proportion": 80,
"uprn": 0
}
)
property_3.windows = {'original_description': 'Fully double glazed', 'has_glazing': True,
@ -110,11 +109,11 @@ class TestWindowRecommendations:
property_4 = Property(
id=1,
postcode='1',
address1='1',
epc_client=Mock(),
address='1',
data={
"county": "Wychavon",
"multi-glaze-proportion": 100
"multi-glaze-proportion": 100,
"uprn": 0
}
)
property_4.windows = {'original_description': 'Full secondary glazing', 'has_glazing': True,
@ -134,11 +133,11 @@ class TestWindowRecommendations:
property_5 = Property(
id=1,
postcode='1',
address1='1',
epc_client=Mock(),
address='1',
data={
"county": "Wychavon",
"multi-glaze-proportion": 50
"multi-glaze-proportion": 50,
"uprn": 0
}
)
property_5.windows = {'original_description': 'Partial secondary glazing', 'has_glazing': True,
@ -164,11 +163,11 @@ class TestWindowRecommendations:
property_6 = Property(
id=1,
postcode='1',
address1='1',
epc_client=Mock(),
address='1',
data={
"county": "Wychavon",
"multi-glaze-proportion": 0
"multi-glaze-proportion": 0,
"uprn": 0
}
)
property_6.windows = {'original_description': 'Single glazed', 'has_glazing': False, 'glazing_coverage': None,
@ -199,11 +198,11 @@ class TestWindowRecommendations:
property_7 = Property(
id=1,
postcode='1',
address1='1',
epc_client=Mock(),
address='1',
data={
"county": "Wychavon",
"multi-glaze-proportion": 100
"multi-glaze-proportion": 100,
"uprn": 0
}
)
property_7.windows = {'original_description': 'Fully triple glazed', 'has_glazing': True,
@ -227,11 +226,11 @@ class TestWindowRecommendations:
property_8 = Property(
id=1,
postcode='1',
address1='1',
epc_client=Mock(),
address='1',
data={
"county": "Wychavon",
"multi-glaze-proportion": 80
"multi-glaze-proportion": 80,
"uprn": 1
}
)
property_8.windows = {'original_description': 'Mostly triple glazing', 'has_glazing': True,