Merge pull request #272 from Hestia-Homes/new-etl-unit-testing

New etl unit testing
This commit is contained in:
KhalimCK 2024-01-29 12:33:53 +00:00 committed by GitHub
commit 5bd6366ad2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
33 changed files with 3225 additions and 1375 deletions

2
.idea/Model.iml generated
View file

@ -7,7 +7,7 @@
<sourceFolder url="file://$MODULE_DIR$/open_uprn" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/recommendations" isTestSource="false" />
</content>
<orderEntry type="jdk" jdkName="Python 3.10 (backend)" jdkType="Python SDK" />
<orderEntry type="jdk" jdkName="Python 3.10 (model_data)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyNamespacePackagesService">

2
.idea/misc.xml generated
View file

@ -3,7 +3,7 @@
<component name="Black">
<option name="sdkName" value="Python 3.10 (backend)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (backend)" project-jdk-type="Python SDK" />
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (model_data)" project-jdk-type="Python SDK" />
<component name="PythonCompatibilityInspectionAdvertiser">
<option name="version" value="3" />
</component>

View file

@ -45,7 +45,9 @@ class Definitions:
# contain a null value. A resolution to correct these anomalies will be considered for future data releases.
"NULL",
# We sometimes see fields populated with just an empty string.
""
"",
# An older value which rarely shows up but has been seen in the data.
"UNKNOWN",
}
DATA_ANOMALY_SUBSTRINGS = {

View file

@ -13,7 +13,7 @@ from etl.epc_clean.epc_attributes.all_cleaners import all_cleaner_map
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
from utils.logger import setup_logger
from utils.s3 import read_dataframe_from_s3_parquet
from BaseUtility import Definitions
from etl.epc.settings import DATA_ANOMALY_MATCHES
from recommendations.rdsap_tables import england_wales_age_band_lookup, FLOOR_LEVEL_MAP
from recommendations.recommendation_utils import (
estimate_perimeter, get_wall_type, estimate_external_wall_area, esimtate_pitched_roof_area, estimate_windows
@ -25,7 +25,7 @@ DATA_BUCKET = os.environ.get('DATA_BUCKET', 'retrofit-data-dev' if ENVIRONMENT =
logger = setup_logger()
class Property(Definitions):
class Property:
ATTRIBUTE_MAP = {
"floor-description": "floor",
"hotwater-description": "hotwater",
@ -51,6 +51,8 @@ class Property(Definitions):
spatial = None
base_difference_record = None
DATA_ANOMALY_MATCHES = DATA_ANOMALY_MATCHES
def __init__(self, id, postcode, address, epc_record):
self.epc_record = epc_record
@ -68,7 +70,7 @@ class Property(Definitions):
self.in_conservation_area, self.is_listed, self.is_heritage = None, None, None
self.restricted_measures = False
self.year_built = epc_record.get("year_built")
self.number_of_rooms = epc_record.prepared_epc.get("number_of_rooms")
self.number_of_rooms = epc_record.prepared_epc.get("number_habitable_rooms")
self.age_band = epc_record.get("age_band")
self.construction_age_band = epc_record.get("construction_age_band")
self.number_of_floors = epc_record.get("number_of_floors")
@ -88,22 +90,24 @@ class Property(Definitions):
}
self.solar_hot_water = {
"solar_hot_water": epc_record.get("solar_water_heating_flag"),
"solar_hot_water_boolean": epc_record.get("solar_water_heating_flag_bool"),
}
self.wind_turbine = {
"wind_turbine": epc_record.prepared_epc.get("wind_turbine_count"),
}
self.number_of_open_fireplaces = {
"number_of_open_fireplaces": epc_record.prepared_epc.get("number_of_open_fireplaces"),
"number_of_open_fireplaces": epc_record.prepared_epc.get("number_open_fireplaces"),
}
self.number_of_extensions = {
"number_of_extensions": epc_record.prepared_epc.get("number_of_extensions"),
"number_of_extensions": epc_record.prepared_epc.get("extension_count"),
}
self.number_of_storeys = {
"number_of_storeys": epc_record.prepared_epc.get("number_of_storeys"),
"number_of_storeys": epc_record.prepared_epc.get("flat_storey_count"),
}
self.heat_loss_corridor = {
"heat_loss_corridor": epc_record.prepared_epc.get("heat_loss_corridor"),
"length": epc_record.prepared_epc.get("unheated_corridor_length"),
"heat_loss_corridor_boolean": epc_record.get("heat_loss_corridor_bool"),
}
self.mains_gas = epc_record.prepared_epc.get('mains_gas_flag')
self.floor_height = epc_record.prepared_epc.get('floor_height')
@ -222,7 +226,10 @@ class Property(Definitions):
proposed_depth = min(valid_numeric_values, key=lambda x: abs(x - proposed_depth))
recommendation_record["roof_insulation_thickness_ending"] = str(proposed_depth)
recommendation_record["roof_energy_eff_ending"] = "Very Good"
if recommendation["type"] == "loft_insulation":
recommendation_record["roof_energy_eff_ending"] = "Good"
else:
recommendation_record["roof_energy_eff_ending"] = "Very Good"
else:
# Fill missing roof u-values - this fill is not based on recommended upgrades
if recommendation_record["roof_thermal_transmittance_ending"] is None:
@ -297,6 +304,7 @@ class Property(Definitions):
self.set_basic_property_dimensions()
for description, attribute in cleaned.items():
if self.data[description] in self.DATA_ANOMALY_MATCHES:
template = cleaned[description][0]
fill_dict = dict(zip(template.keys(), [None] * len(template)))
@ -314,6 +322,7 @@ class Property(Definitions):
attributes = [
x for x in cleaned[description] if x["original_description"] == self.data[description]
]
if len(attributes) > 1:
raise ValueError("Either No attributes or multiple found for %s" % description)
@ -433,10 +442,10 @@ class Property(Definitions):
"mainfuel": self.main_fuel["clean_description"],
"ventilation": self.ventilation["ventilation"],
"solar_pv": self.solar_pv["solar_pv"],
"solar_hot_water": self.solar_hot_water["solar_hot_water"],
"solar_hot_water": self.solar_hot_water["solar_hot_water_boolean"],
"wind_turbine": self.wind_turbine["wind_turbine"],
"floor_height": self.floor_height,
"heat_loss_corridor": self.heat_loss_corridor["heat_loss_corridor"],
"heat_loss_corridor": self.heat_loss_corridor["heat_loss_corridor_boolean"],
"unheated_corridor_length": self.heat_loss_corridor["length"],
"number_of_open_fireplaces": self.number_of_open_fireplaces["number_of_open_fireplaces"],
"number_of_extensions": self.number_of_extensions["number_of_extensions"],

View file

@ -472,7 +472,7 @@ class SearchEpc:
if not epc_data.empty:
# Further processing of the EPC data
epc_data['lodgement-datetime'] = pd.to_datetime(epc_data['lodgement-datetime'], format='mixed')
epc_data['lodgement-datetime'] = pd.to_datetime(epc_data['lodgement-datetime'], errors='coerce')
epc_data = epc_data.sort_values("lodgement-datetime", ascending=False).groupby("uprn").head(1)
epc_data["house_number"] = epc_data["address"].apply(lambda add1: self.get_house_number(add1))
epc_data["numeric_house_number"] = epc_data["house_number"].apply(

View file

@ -28,8 +28,6 @@ from backend.app.utils import epc_to_sap_lower_bound, read_csv_from_s3, sap_to_e
from backend.ml_models.api import ModelApi
from backend.Property import Property
from etl.epc.DataProcessor import EPCDataProcessor
from etl.epc.settings import COLUMNS_TO_MERGE_ON
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
from recommendations.optimiser.CostOptimiser import CostOptimiser
@ -68,7 +66,6 @@ async def trigger_plan(body: PlanTriggerRequest):
)
input_properties = []
for config in plan_input:
# We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
@ -96,13 +93,16 @@ async def trigger_plan(body: PlanTriggerRequest):
)
epc_records = {
'original_epc': epc_searcher.newest_epc,
'full_sap_epc': epc_searcher.full_sap_epc,
'old_data': epc_searcher.older_epcs,
'original_epc': epc_searcher.newest_epc.copy(),
'full_sap_epc': epc_searcher.full_sap_epc.copy(),
'old_data': epc_searcher.older_epcs.copy(),
}
prepared_epc = EPCRecord(epc_records=epc_records, run_mode="newdata",
cleaning_data=cleaning_data) # This uses all the epc records to clean the data
prepared_epc = EPCRecord(
epc_records=epc_records,
run_mode="newdata",
cleaning_data=cleaning_data
)
input_properties.append(
Property(
@ -173,8 +173,6 @@ async def trigger_plan(body: PlanTriggerRequest):
"carbon_change_predictions": get_settings().CARBON_PREDICTIONS_BUCKET
}
)
# all_predictions["heat_demand_predictions"]= all_predictions["sap_change_predictions"].copy()
# all_predictions["carbon_change_predictions"] = all_predictions["sap_change_predictions"].copy()
# Insert the predictions into the recommendations and run the optimiser
logger.info("Optimising recommendations")
@ -310,10 +308,6 @@ async def trigger_plan(body: PlanTriggerRequest):
}
)
# all_combined_predictions["heat_demand_predictions"]= all_combined_predictions["sap_change_predictions"].copy()
# all_combined_predictions["carbon_change_predictions"] = all_combined_predictions[
# "sap_change_predictions"].copy()
# We update the carbon and heat demand predictions
for property_id, property_recommendations in recommendations.items():
combined_heat_demand = all_combined_predictions["heat_demand_predictions"]

View file

@ -22,6 +22,8 @@ class PropertyValuation:
100021192109: 650000, # Based on Zoopla
766249482: 358000, # Based on Zoopla estimate for 19 Spring Lane, 3 bedroom semi-detached
100120703802: 277000, # Based on Zoopla
10014469685: 286000, # Based on Zoopla
10001328782: 196000, # Based on Zoopla
}
# We base our valuation uplifts on a number of sources
@ -96,11 +98,11 @@ class PropertyValuation:
if not value:
return {
"current_value": None,
"lower_bound_increased_value": None,
"upper_bound_increased_value": None,
"average_increased_value": None,
"average_increase": None
"current_value": 0,
"lower_bound_increased_value": 0,
"upper_bound_increased_value": 0,
"average_increased_value": 0,
"average_increase": 0
}
current_epc = property_instance.data["current-energy-rating"]

View file

@ -1,9 +1,9 @@
import pandas as pd
import pytest
from unittest.mock import Mock
from epc_api.client import EpcClient
from backend.Property import Property
from etl.epc_clean.EpcClean import EpcClean
from etl.epc.Record import EPCRecord
# Define some test data
mock_epc_response = {
@ -196,12 +196,21 @@ class TestProperty:
@pytest.fixture(autouse=True)
def property_instance(self, mock_cleaner):
property_instance = Property(id=1, postcode="AB12CD", address="Test Address", data=mock_epc_response["rows"][0])
epc_record = EPCRecord()
epc_record.prepared_epc = mock_epc_response["rows"][0]
property_instance = Property(id=1, postcode="AB12CD", address="Test Address", epc_record=epc_record)
property_instance.number_of_floors = 2
property_instance.number_of_rooms = 5
property_instance.floor_area = 100
property_instance.floor_height = 2.5
return property_instance
@pytest.fixture(autouse=True)
def property_instance_dupe_data(self):
property_instance_dupe_data = Property(id=2, postcode="AB12CD", address="Test Address")
epc_record = EPCRecord()
epc_record.prepared_epc = mock_epc_response_dupe["rows"][0]
property_instance_dupe_data = Property(id=2, postcode="AB12CD", address="Test Address", epc_record=epc_record)
return property_instance_dupe_data
# @pytest.fixture
@ -271,15 +280,17 @@ class TestProperty:
return mock_cleaner
def test_init(self):
inst1 = Property(0, postcode="AB12CD", address="Test Address")
epc_record = EPCRecord()
epc_record.prepared_epc = {"uprn": 1}
inst1 = Property(0, postcode="AB12CD", address="Test Address", epc_record=epc_record)
assert inst1.data is None
assert inst1.data is not None
inst2 = Property(3, "AB12CD", "Test Address")
inst2 = Property(3, "AB12CD", "Test Address", epc_record=epc_record)
assert inst2.id == 3
inst3 = Property(4, "AB12CD", "Test Address", data={"some": "data", "uprn": 123})
assert inst3.data == {"some": "data", "uprn": 123}
inst3 = Property(4, "AB12CD", "Test Address", epc_record=epc_record)
assert inst3.data == {"uprn": 1}
def test_get_components(
self, property_instance, mock_cleaner, mock_photo_supply_lookup, mock_floor_area_decile_thresholds
@ -372,7 +383,9 @@ class TestProperty:
property_instance.get_components(cleaned, mock_photo_supply_lookup, mock_floor_area_decile_thresholds)
def test_set_spatial(self):
prop = Property(1, postcode="AB12CD", address="Test Address")
epc_record = EPCRecord()
epc_record.prepared_epc = mock_epc_response["rows"][0]
prop = Property(1, postcode="AB12CD", address="Test Address", epc_record=epc_record)
spatial1 = pd.DataFrame([{
'X_COORDINATE': 411143.0, 'Y_COORDINATE': 281701.0, 'LATITUDE': 52.4331896, 'LONGITUDE': -1.8375238,
@ -386,7 +399,7 @@ class TestProperty:
assert prop.is_heritage
assert prop.restricted_measures
prop2 = Property(1, "AB12CD", "Test Address")
prop2 = Property(1, "AB12CD", "Test Address", epc_record=epc_record)
spatial2 = pd.DataFrame([{
'X_COORDINATE': 411143.0, 'Y_COORDINATE': 281701.0, 'LATITUDE': 52.4331896, 'LONGITUDE': -1.8375238,
@ -403,8 +416,9 @@ class TestProperty:
def test_set_floor_level(self):
# In this case, we have a flat which looks like it's on the first floor, but it's actually on the ground
# floor, so we should set floor_level to 0
prop = Property(1, postcode="AB12CD", address="Test Address")
prop.data = {'floor-level': '01', 'property-type': 'Flat'}
epc_record = EPCRecord()
epc_record.prepared_epc = {'floor-level': '01', 'property-type': 'Flat'}
prop = Property(1, postcode="AB12CD", address="Test Address", epc_record=epc_record)
prop.floor = {
'original_description': 'Solid, no insulation (assumed)', 'clean_description': 'Solid, no insulation',
'thermal_transmittance': None, 'thermal_transmittance_unit': None, 'is_assumed': True,
@ -419,8 +433,9 @@ class TestProperty:
# This property is labelled as being on the ground floor but actually has another property below
# so we set floor level to 1
prop2 = Property(1, postcode="AB12CD", address="Test Address")
prop2.data = {'floor-level': 'Ground', 'property-type': 'Flat'}
epc_record = EPCRecord()
epc_record.prepared_epc = {'floor-level': 'Ground', 'property-type': 'Flat'}
prop2 = Property(1, postcode="AB12CD", address="Test Address", epc_record=epc_record)
prop2.floor = {
'original_description': '(Another dwelling below)', 'clean_description': 'Solid, no insulation',
'thermal_transmittance': None, 'thermal_transmittance_unit': None, 'is_assumed': False,
@ -434,8 +449,9 @@ class TestProperty:
assert prop2.floor_level == 1
# this property is correctly labelled as being on the 2nd floor
prop3 = Property(1, postcode="AB12CD", address="Test Address")
prop3.data = {'floor-level': '02', 'property-type': 'Flat'}
epc_record = EPCRecord()
epc_record.prepared_epc = {'floor-level': '02', 'property-type': 'Flat'}
prop3 = Property(1, postcode="AB12CD", address="Test Address", epc_record=epc_record)
prop3.floor = {
'original_description': '(Another dwelling below)', 'clean_description': 'Solid, no insulation',
'thermal_transmittance': None, 'thermal_transmittance_unit': None, 'is_assumed': False,
@ -449,8 +465,9 @@ class TestProperty:
assert prop3.floor_level == 2
# Example of a house
prop4 = Property(1, postcode="AB12CD", address="Test Address")
prop4.data = {'floor-level': '', 'property-type': 'House'}
epc_record = EPCRecord()
epc_record.prepared_epc = {'floor-level': '', 'property-type': 'House'}
prop4 = Property(1, postcode="AB12CD", address="Test Address", epc_record=epc_record)
prop4.floor = {
'original_description': '(Another dwelling below)', 'clean_description': 'Solid, no insulation',
'thermal_transmittance': None, 'thermal_transmittance_unit': None, 'is_assumed': False,

File diff suppressed because it is too large Load diff

View file

@ -114,7 +114,8 @@ class Eligibility:
self.loft = {
"suitability": False,
"thickness": None,
"reason": "roof not loft"
"reason": "roof not loft",
"thickness_classification": None
}
return
@ -125,29 +126,34 @@ class Eligibility:
is_flat=self.roof["is_flat"]
)
if insulation_thickness <= 100:
thickness_classification = "0-100mm"
elif insulation_thickness <= high_loft_thickness_threshold:
thickness_classification = "100-270mm"
else:
thickness_classification = "270mm+"
if insulation_thickness <= loft_thickness_threshold:
# We produce a thickness classification for the loft
# 0 - 100mm insulation
# 100 - 270mm insulation
# 270mm+ insulation
self.loft = {
"suitability": True,
"thickness": insulation_thickness,
"reason": None
"reason": None,
"thickness_classification": thickness_classification
}
if insulation_thickness <= high_loft_thickness_threshold:
self.loft = {
"suitability": True,
"thickness": insulation_thickness,
"reason": "high loft thickness but below regulation"
}
return
if insulation_thickness > high_loft_thickness_threshold:
# Insulation is already thick enough
self.loft = {
"suitability": False,
"thickness": insulation_thickness,
"reason": "existing insulation"
}
return
# Insulation is already thick enough
self.loft = {
"suitability": False,
"thickness": insulation_thickness,
"reason": "existing insulation",
"thickness_classification": thickness_classification
}
return
def cavity_insulation(self):
@ -161,15 +167,13 @@ class Eligibility:
is_empty = (not self.walls["is_filled_cavity"]) or (
self.walls["is_as_built"] and self.walls["insulation_thickness"] not in ["average", "above average"]
)
is_partial_filled = (
self.walls["is_as_built"] and self.walls["insulation_thickness"] not in ["below average"]
)
is_partial_filled = "partial" in self.walls["clean_description"].lower()
# We look for potentially under performing cavities - anything that is assumed, as built and insulated
is_underperforming = (
self.walls["is_as_built"] and self.walls["insulation_thickness"] in ["average"] and self.walls["is_assumed"]
)
is_unfilled_cavity = is_cavity and is_empty
is_unfilled_cavity = is_cavity and (is_empty and not is_partial_filled)
is_partial_filled_cavity = is_cavity and is_partial_filled
is_underperforming_cavity = is_cavity and is_underperforming
@ -233,6 +237,13 @@ class Eligibility:
def room_roof_insulation(self):
is_room_roof = self.roof["is_roof_room"]
if not is_room_roof:
self.room_roof = {
"suitability": False,
"thickness": None
}
return
insulation_thickness = convert_thickness_to_numeric(
self.roof["insulation_thickness"],
self.roof["is_pitched"],
@ -246,6 +257,14 @@ class Eligibility:
def flat_roof_insulation(self):
is_flat = self.roof["is_flat"]
if not is_flat:
self.flat_roof = {
"suitability": False,
"thickness": None
}
return
insulation_thickness = convert_thickness_to_numeric(
self.roof["insulation_thickness"],
self.roof["is_pitched"],
@ -356,20 +375,21 @@ class Eligibility:
"""
current_sap = int(self.epc["current-energy-efficiency"])
if current_sap >= 69:
self.eco4_warmfront = {
"eligible": False,
"message": "sap too high"
}
return
self.cavity_insulation()
self.loft_insulation()
# make sure conditions 2 and 3 are true
is_eligible = self.cavity["suitability"] & self.loft["suitability"]
if current_sap >= 69:
self.eco4_warmfront = {
"eligible": False,
"message": "sap too high",
"cavity_type": self.cavity["type"],
"loft_type": self.loft["thickness_classification"]
}
return
if post_retrofit_sap is None:
if current_sap >= 55:
@ -386,7 +406,9 @@ class Eligibility:
self.eco4_warmfront = {
"eligible": is_eligible,
"message": message
"message": message,
"cavity_type": self.cavity["type"],
"loft_type": self.loft["thickness_classification"]
}
return
@ -394,7 +416,9 @@ class Eligibility:
self.eco4_warmfront = {
"eligible": is_eligible,
"message": None
"message": None,
"cavity_type": self.cavity["type"],
"loft_type": self.loft["thickness_classification"]
}
return

View file

@ -0,0 +1,665 @@
import numpy as np
import pandas as pd
ECO4_NEW_RATES = 1710
GBIS_NEW_RATES = 600
def app():
# Load in the excel
nov_ha_data = pd.read_excel(
'etl/eligibility/ha_15_32/ALL HA FIGURES AND ASSIGNED INSTALLERS 21.11.2023 with sales data.xlsx',
)
# Drop rows where HA name is null
nov_ha_data = nov_ha_data.dropna(subset=["HA Name"])
nov_ha_data["ha_number"] = nov_ha_data["HA Name"].str.extract(r"(\d+)").astype(int)
nov_ha_data = nov_ha_data.sort_values("ha_number", ascending=True)
variance_explanations = pd.read_excel(
'etl/eligibility/ha_15_32/ALL HA FIGURES AND ASSIGNED INSTALLERS 21.11.2023 with sales data.xlsx',
sheet_name="Variance explanations"
)
september_figures = pd.read_excel(
"etl/eligibility/ha_15_32/ALL HA FIGURES AND ASSIGNED INSTALLERS SEP 23 UPDATE (2).xlsx",
sheet_name="HA Stats"
)
historical_invoices = pd.read_excel(
"etl/eligibility/ha_15_32/ALL HA FIGURES AND ASSIGNED INSTALLERS 21.11.2023 with sales data.xlsx",
sheet_name="Jul 22 to Oct 23"
)
# Drop rows where installer rates is null
historical_invoices = historical_invoices[~pd.isnull(historical_invoices["INSTALLER RATES"])]
historical_invoices = historical_invoices[historical_invoices["INSTALLER RATES"] != "NA "]
# By Scheme, take a weighted mean of the INSTALLER RATES, weighted on the number of rows
n_invoices = historical_invoices.groupby(["Scheme", "INSTALLER RATES"])["Invoice number"].count().reset_index()
n_invoices = n_invoices[n_invoices["Scheme"].isin(["Eco 4", "GBIS"])]
historical_scheme_rates = n_invoices.groupby("Scheme").apply(
lambda x: np.average(x["INSTALLER RATES"], weights=x["Invoice number"])
).reset_index().rename(columns={0: "Historical rates"})
# We take just the sales data entries that have sales > 0
sales_data = nov_ha_data[nov_ha_data["Sales"] > 0]
# We now need to adjust sales data depending on the variance explanations
sales_data = sales_data.merge(
variance_explanations[["HA", 'Which figure is correct']],
how="left",
left_on="ha_number",
right_on="HA"
)
def adjust_sales(row):
# Choose the sales figure to use for a single row, driven by the row's
# "Which figure is correct" value. Returns either row["Sales"] or
# row['No. of Tech surveys complete']; raises ValueError for any value
# that is not handled below, so unrecognised entries are never silently
# accepted.
#
# Null value: return the row's "Sales" figure unchanged.
# NOTE(review): presumably null means the left merge on the variance
# explanations found no entry for this HA - confirm against the sheet.
if pd.isnull(row["Which figure is correct"]):
return row["Sales"]
# The "HA facts & figures" number is the trusted one: use the completed
# tech survey count instead of "Sales".
if row["Which figure is correct"] == "HA facts & figures":
return row['No. of Tech surveys complete']
# "Billed amount" is the trusted one: "Sales" is returned as-is.
if row["Which figure is correct"] == "Billed amount":
return row["Sales"]
# Both figures are marked correct: "Sales" is returned as-is.
if row["Which figure is correct"] in ["Both correct, HA facts and figures includes November", "Both correct"]:
return row["Sales"]
# Any other value is unexpected; fail loudly rather than guess.
raise ValueError(f"Unknown value for 'Which figure is correct': {row['Which figure is correct']}")
# We now need to adjust sales data depending on the variance explanations
sales_data["adjusted_sales"] = sales_data.apply(lambda row: adjust_sales(row), axis=1)
# We therefore adjust GBIS and ECO4 sales data based on adjusted sales
sales_data["adjusted_eco4_sales"] = sales_data["No. of Tech surveys complete - Eco 4"] / sales_data["Sales"] * \
sales_data["adjusted_sales"]
sales_data["adjusted_gbis_sales"] = sales_data["No. of Tech surveys complete - GBIS"] / sales_data["Sales"] * \
sales_data["adjusted_sales"]
sales_data["cancellation_rate"] = (sales_data["Sales"] - sales_data["adjusted_sales"]) / sales_data["Sales"]
# The difference between the adjusted sales and the actual sales is the cancellation
cancellations = (sales_data["adjusted_sales"].sum() - sales_data["Sales"].sum()) / sales_data["Sales"].sum()
# Given the cancellations, we can now adjust the expected remaining surveys
sales_data["No. of Tech surveys remaining"] = sales_data["No. of Tech surveys remaining"] * (
1 - sales_data["cancellation_rate"]
)
# We now merge on the expected values for September
sales_data = sales_data.merge(
september_figures[["Redacted HA", "ECO4", "GBIS"]].rename(
columns={"Redacted HA": "HA Name", "ECO4": "Sept Expected ECO4", "GBIS": "Sept Expected GBIS"}
),
how="left",
on="HA Name",
)
sales_data["Sept Expected ECO4"] = sales_data["Sept Expected ECO4"].fillna(0)
sales_data["Sept Expected GBIS"] = sales_data["Sept Expected GBIS"].fillna(0)
# We calculate the ECO4 and GBIS conversion rates with the adjusted numbers
sales_data["ECO4 Conversion"] = sales_data["adjusted_eco4_sales"] / sales_data["adjusted_sales"]
sales_data["GBIS Conversion"] = sales_data["adjusted_gbis_sales"] / sales_data["adjusted_sales"]
# We now calculate the expected remaining ECO4 and GBIS sales
# We take the number of remaining surveys and multiply by the conversion rate for each scheme, which tells us
# how many more we should expect to see
sales_data["Expected Remaining ECO4"] = sales_data["No. of Tech surveys remaining"] * sales_data["ECO4 Conversion"]
sales_data["Expected Remaining GBIS"] = sales_data["No. of Tech surveys remaining"] * sales_data["GBIS Conversion"]
# We now produce a forecasted ECO4 and GBIS sales figure
sales_data["Forecasted ECO4 Sales"] = sales_data["adjusted_eco4_sales"] + sales_data["Expected Remaining ECO4"]
sales_data["Forecasted GBIS Sales"] = sales_data["adjusted_gbis_sales"] + sales_data["Expected Remaining GBIS"]
# Take the columns we're interested in
# HA # Properties Sept ECO4 Figures Sept GBIS Figures Nov Total Sales Nov ECO4 Sales Nov GBIS Sales
# Remaining Surveys ECO4 conversion GBIS conversion Forecasted ECO4 Sales Forecasted GBIS sales ECO4 Change
# GBIS Change
sales_data_formatted = sales_data[[
"HA Name",
"ASSET LIST no.",
"Sept Expected ECO4",
"Sept Expected GBIS",
"adjusted_sales",
"adjusted_eco4_sales",
"adjusted_gbis_sales",
"No. of Tech surveys remaining",
"ECO4 Conversion",
"GBIS Conversion",
"Forecasted ECO4 Sales",
"Forecasted GBIS Sales"
]].rename(
columns={
"adjusted_sales": "Oct Total Sales (adjusted for variance)",
"adjusted_eco4_sales": "Oct ECO4 Sales (adjusted for variance)",
"adjusted_gbis_sales": "Oct GBIS Sales (adjusted for variance)",
"No. of Tech surveys remaining": "Remaining Surveys",
}
)
# Convert columns which should be integers to integers
for col in ["ASSET LIST no.", "Remaining Surveys", "Sept Expected ECO4", "Sept Expected GBIS",
"Oct Total Sales (adjusted for variance)", "Oct ECO4 Sales (adjusted for variance)",
"Oct GBIS Sales (adjusted for variance)", "Forecasted ECO4 Sales", "Forecasted GBIS Sales"]:
sales_data_formatted[col] = sales_data_formatted[col].fillna(0)
sales_data_formatted[col] = sales_data_formatted[col].astype(int)
# Remove HA 17 because this was EPCs only. We also remove HA33 because they do not have access to the full portfolio
sales_data_formatted = sales_data_formatted[
~sales_data_formatted["HA Name"].isin(["HA 17", "HA 33"])
]
# September expected ECO4 and GBIS
sept_expected_eco4 = sales_data_formatted["Sept Expected ECO4"].sum()
sept_expected_gbis = sales_data_formatted["Sept Expected GBIS"].sum()
# Completed so far
oct_eco4_sales = sales_data_formatted["Oct ECO4 Sales (adjusted for variance)"].sum()
oct_gbis_sales = sales_data_formatted["Oct GBIS Sales (adjusted for variance)"].sum()
# Forecasted figures
forecasted_eco4_sales = sales_data_formatted["Forecasted ECO4 Sales"].sum()
forecasted_gbis_sales = sales_data_formatted["Forecasted GBIS Sales"].sum()
# Expected remaining sales
expected_remaining_eco4_sales = forecasted_eco4_sales - oct_eco4_sales
expected_remaining_gbis_sales = forecasted_gbis_sales - oct_gbis_sales
# Forecast change vs September
forecasted_eco4_change = 100 * (forecasted_eco4_sales - sept_expected_eco4) / sept_expected_eco4
forecasted_gbis_change = 100 * (forecasted_gbis_sales - sept_expected_gbis) / sept_expected_gbis
aggregates = pd.DataFrame(
columns=["Scheme", "Sept Expected", "Oct Completed", "Forecasted Remaining Sales", "Forecasted Total Sales",
"Forecasted Change vs Sept"],
data=[
["ECO4", sept_expected_eco4, oct_eco4_sales, expected_remaining_eco4_sales, forecasted_eco4_sales,
forecasted_eco4_change],
["GBIS", sept_expected_gbis, oct_gbis_sales, expected_remaining_gbis_sales, forecasted_gbis_sales,
forecasted_gbis_change],
]
)
# Multiply by historical rates to get revenue
# For ECO4, this is ~£1456 and for GBIS it's ~£432
historical_gbis_price = historical_scheme_rates[
historical_scheme_rates["Scheme"] == "GBIS"
]["Historical rates"].iloc[0]
historical_eco4_price = historical_scheme_rates[
historical_scheme_rates["Scheme"] == "Eco 4"
]["Historical rates"].iloc[0]
aggregates["Sept Expected Revenue"] = np.where(
aggregates["Scheme"] == "ECO4",
aggregates["Sept Expected"] * historical_eco4_price,
aggregates["Sept Expected"] * historical_gbis_price
)
aggregates["Completed Revenue"] = np.where(
aggregates["Scheme"] == "ECO4",
aggregates["Oct Completed"] * historical_eco4_price,
aggregates["Oct Completed"] * historical_gbis_price
)
# We use the new rates for the forecasted revenue
aggregates["Forecasted Remaining Revenue"] = np.where(
aggregates["Scheme"] == "ECO4",
aggregates["Forecasted Remaining Sales"] * ECO4_NEW_RATES,
aggregates["Forecasted Remaining Sales"] * GBIS_NEW_RATES
)
# We also calculate the forecasted remaining revenue at the original price
aggregates["Forecasted Remaining Revenue (original price)"] = np.where(
aggregates["Scheme"] == "ECO4",
aggregates["Forecasted Remaining Sales"] * historical_eco4_price,
aggregates["Forecasted Remaining Sales"] * historical_gbis_price
)
aggregates["Forecasted Revenue"] = aggregates["Completed Revenue"] + aggregates["Forecasted Remaining Revenue"]
# Forecasted revenue with original price
aggregates["Forecasted Revenue (original price)"] = (
aggregates["Completed Revenue"] + aggregates["Forecasted Remaining Revenue (original price)"]
)
# Create a totals row which sums up the two rows
forecasted_change_vs_sept = 100 * (
aggregates["Forecasted Total Sales"].sum() - aggregates["Sept Expected"].sum()
) / aggregates["Sept Expected"].sum()
aggregates = pd.concat(
[
aggregates,
pd.DataFrame(
[
["Total", aggregates["Sept Expected"].sum(), aggregates["Oct Completed"].sum(),
aggregates["Forecasted Remaining Sales"].sum(), aggregates["Forecasted Total Sales"].sum(),
forecasted_change_vs_sept,
aggregates["Sept Expected Revenue"].sum(), aggregates["Completed Revenue"].sum(),
aggregates["Forecasted Remaining Revenue"].sum(),
aggregates["Forecasted Remaining Revenue (original price)"].sum(),
aggregates["Forecasted Revenue"].sum(),
aggregates["Forecasted Revenue (original price)"].sum(),
]
],
columns=aggregates.columns
)
]
)
# For each property in the asset list, we now calculate an average conversion rate to ECO4 and GBIS
# We do this by taking the forecasted sales values for each scheme and dividing by the number of properties
number_properties = sales_data_formatted["ASSET LIST no."].sum()
eco4_conversion_rate = forecasted_eco4_sales / number_properties
gbis_conversion_rate = forecasted_gbis_sales / number_properties
# We also attribute a future value per property
future_eco4_value = ECO4_NEW_RATES * eco4_conversion_rate
future_gbis_value = GBIS_NEW_RATES * gbis_conversion_rate
# We also calculate a revenue figure for the old rates
historical_eco4_value = historical_eco4_price * eco4_conversion_rate
historical_gbis_value = historical_gbis_price * gbis_conversion_rate
# For the HAs that have not begun selling, we estimate the value of the projects
# We start with some problem HAs
# HA 7, HA 24, HA 25
# These HAs have no sales data, so we use the expected figures
problem_has_data = nov_ha_data[
(nov_ha_data["HA Name"].isin(["HA 7", "HA 24", "HA 25"]))
].copy()
# Merge on the september expected figures
problem_has_data = problem_has_data.merge(
september_figures[["Redacted HA", "ECO4", "GBIS"]].rename(
columns={"Redacted HA": "HA Name", "ECO4": "Sept Expected ECO4", "GBIS": "Sept Expected GBIS"}
),
how="left",
on="HA Name",
)
# Fill NAs
problem_has_data["Sept Expected ECO4"] = problem_has_data["Sept Expected ECO4"].fillna(0)
problem_has_data["Sept Expected GBIS"] = problem_has_data["Sept Expected GBIS"].fillna(0)
# We now calculate the expected ECO4 and GBIS sales based on the average conversion rates
problem_has_data["Expected ECO4 Sales"] = problem_has_data["ASSET LIST no."] * eco4_conversion_rate
problem_has_data["Expected GBIS Sales"] = problem_has_data["ASSET LIST no."] * gbis_conversion_rate
# Filter just on columns we're interested in
problem_has_data = problem_has_data[[
"HA Name",
"ASSET LIST no.",
"Sept Expected ECO4",
"Sept Expected GBIS",
"ECO4",
"GBIS",
"Expected ECO4 Sales",
"Expected GBIS Sales"
]].rename(
columns={
"ECO4": "Nov Expected ECO4",
"GBIS": "Nov Expected GBIS",
}
)
# Fill NAs
problem_has_data["Nov Expected ECO4"] = problem_has_data["Nov Expected ECO4"].fillna(0)
problem_has_data["Nov Expected GBIS"] = problem_has_data["Nov Expected GBIS"].fillna(0)
# We calculate HA level Sept, Nov expected revenue, based on historical rates and then forecasted revenue
problem_has_data["Sept Expected ECO4 Value"] = problem_has_data["Sept Expected ECO4"] * historical_eco4_price
problem_has_data["Sept Expected GBIS Value"] = problem_has_data["Sept Expected GBIS"] * historical_gbis_price
problem_has_data["Nov Expected ECO4 Value"] = problem_has_data["Nov Expected ECO4"] * historical_eco4_price
problem_has_data["Nov Expected GBIS Value"] = problem_has_data["Nov Expected GBIS"] * historical_gbis_price
problem_has_data["Forecasted ECO4 Revenue"] = problem_has_data["ASSET LIST no."] * future_eco4_value
problem_has_data["Forecasted GBIS Revenue"] = problem_has_data["ASSET LIST no."] * future_gbis_value
# Totals
problem_has_data["Sept Expected Total Value"] = problem_has_data["Sept Expected ECO4 Value"] + \
problem_has_data["Sept Expected GBIS Value"]
problem_has_data["Nov Expected Total Value"] = problem_has_data["Nov Expected ECO4 Value"] + \
problem_has_data["Nov Expected GBIS Value"]
problem_has_data["Forecasted Total Revenue"] = problem_has_data["Forecasted ECO4 Revenue"] + \
problem_has_data["Forecasted GBIS Revenue"]
# We calculate a total expected value for September, November and then forecasted
problem_has_expected_eco4_value = problem_has_data["Sept Expected ECO4"].sum() * historical_eco4_price
problem_has_expected_gbis_value = problem_has_data["Sept Expected GBIS"].sum() * historical_gbis_price
problem_has_expected_total_value = problem_has_expected_eco4_value + problem_has_expected_gbis_value
problem_has_nov_eco4_value = problem_has_data["Nov Expected ECO4"].sum() * historical_eco4_price
problem_has_nov_gbis_value = problem_has_data["Nov Expected GBIS"].sum() * historical_gbis_price
problem_has_nov_total_value = problem_has_nov_eco4_value + problem_has_nov_gbis_value
forecasted_eco4_value = problem_has_data["ASSET LIST no."].sum() * future_eco4_value
forecasted_gbis_value = problem_has_data["ASSET LIST no."].sum() * future_gbis_value
problem_has_forecasted_total_value = forecasted_eco4_value + forecasted_gbis_value
problem_has_summary = pd.DataFrame(
columns=["Scheme", "Sept Expected", "Nov Expected", "Forecasted"],
data=[
["ECO4", problem_has_expected_eco4_value, problem_has_nov_eco4_value, forecasted_eco4_value],
["GBIS", problem_has_expected_gbis_value, problem_has_nov_gbis_value, forecasted_gbis_value],
["Total", problem_has_expected_total_value, problem_has_nov_total_value, problem_has_forecasted_total_value]
]
)
# We now also estimate the value of the remaining HAs based on historical sales performance and new rates
# We take the HAs that are not in the sales data
remaining_has = nov_ha_data[
~nov_ha_data["HA Name"].isin(sales_data_formatted["HA Name"])
].copy()
# Merge on the september expected figures
remaining_has = remaining_has.merge(
september_figures[["Redacted HA", "ECO4", "GBIS"]].rename(
columns={"Redacted HA": "HA Name", "ECO4": "Sept Expected ECO4", "GBIS": "Sept Expected GBIS"}
),
how="left",
on="HA Name",
)
# We update the asset list size for HA 33, because they do not have access to the full portfolio
remaining_has.loc[remaining_has["HA Name"] == "HA 33", "ASSET LIST no."] = 20699
# We also remove HA 17
remaining_has = remaining_has[~remaining_has["HA Name"].isin(["HA 17"])]
# We now calculate the expected ECO4 and GBIS sales based on the average conversion rates
remaining_has["Expected ECO4 Sales"] = remaining_has["ASSET LIST no."] * eco4_conversion_rate
remaining_has["Expected GBIS Sales"] = remaining_has["ASSET LIST no."] * gbis_conversion_rate
# Filter just on columns we're interested in
remaining_has = remaining_has[[
"HA Name",
"ASSET LIST no.",
"Sept Expected ECO4",
"Sept Expected GBIS",
"ECO4",
"GBIS",
]].rename(
columns={
"ECO4": "Nov Expected ECO4",
"GBIS": "Nov Expected GBIS",
}
)
remaining_has = remaining_has.fillna(0)
# We take just HAs that had an initial september expectation for ECO4 or GBIS, or that now have a Nov expectation
remaining_has = remaining_has[
(remaining_has["Sept Expected ECO4"] > 0) | (remaining_has["Sept Expected GBIS"] > 0) |
(remaining_has["Nov Expected ECO4"] > 0) | (remaining_has["Nov Expected GBIS"] > 0)
]
# Expected sales based on asset list size and conversion rate
remaining_has["Forecasted Sales ECO4"] = remaining_has["ASSET LIST no."] * eco4_conversion_rate
remaining_has["Forecasted Sales GBIS"] = remaining_has["ASSET LIST no."] * gbis_conversion_rate
# Calculate the total expected value for September and November
remaining_has["Sept Expected ECO4 Value"] = remaining_has["Sept Expected ECO4"] * historical_eco4_price
remaining_has["Sept Expected GBIS Value"] = remaining_has["Sept Expected GBIS"] * historical_gbis_price
remaining_has["Nov Expected ECO4 Value"] = remaining_has["Nov Expected ECO4"] * historical_eco4_price
remaining_has["Nov Expected GBIS Value"] = remaining_has["Nov Expected GBIS"] * historical_gbis_price
# Calculate forecasted revenue
remaining_has["Forecasted ECO4 Revenue"] = remaining_has["ASSET LIST no."] * future_eco4_value
remaining_has["Forecasted GBIS Revenue"] = remaining_has["ASSET LIST no."] * future_gbis_value
# We also calculate forecasted revenue with the original price
remaining_has["Forecasted ECO4 Revenue (original price)"] = remaining_has["ASSET LIST no."] * historical_eco4_value
remaining_has["Forecasted GBIS Revenue (original price)"] = remaining_has["ASSET LIST no."] * historical_gbis_value
# Calculate totals for each scheme
remaining_has_september_eco4_sales = remaining_has["Sept Expected ECO4"].sum()
remaining_has_september_gbis_sales = remaining_has["Sept Expected GBIS"].sum()
remaining_has_november_eco4_sales = remaining_has["Nov Expected ECO4"].sum()
remaining_has_november_gbis_sales = remaining_has["Nov Expected GBIS"].sum()
remaining_has_forecasted_eco4_sales = remaining_has["Forecasted Sales ECO4"].sum()
remaining_has_forecasted_gbis_sales = remaining_has["Forecasted Sales GBIS"].sum()
remaining_has_september_eco4_value = remaining_has["Sept Expected ECO4 Value"].sum()
remaining_has_september_gbis_value = remaining_has["Sept Expected GBIS Value"].sum()
remaining_has_november_eco4_value = remaining_has["Nov Expected ECO4 Value"].sum()
remaining_has_november_gbis_value = remaining_has["Nov Expected GBIS Value"].sum()
remaining_has_forecasted_eco4_value = remaining_has["Forecasted ECO4 Revenue"].sum()
remaining_has_forecasted_gbis_value = remaining_has["Forecasted GBIS Revenue"].sum()
remaining_has_forecasted_eco4_value_original_price = remaining_has["Forecasted ECO4 Revenue (original price)"].sum()
remaining_has_forecasted_gbis_value_original_price = remaining_has["Forecasted GBIS Revenue (original price)"].sum()
# Calculate the change in forecasted sales against the September expected sales
remaining_has_foecast_change_eco4 = 100 * (
remaining_has["Forecasted Sales ECO4"].sum() - remaining_has["Sept Expected ECO4"].sum()
) / remaining_has["Sept Expected ECO4"].sum()
remaining_has_foecast_change_gbis = 100 * (
remaining_has["Forecasted Sales GBIS"].sum() - remaining_has["Sept Expected GBIS"].sum()
) / remaining_has["Sept Expected GBIS"].sum()
# Total change
remaining_has_foecast_change_total = 100 * (
remaining_has["Forecasted Sales ECO4"].sum() + remaining_has["Forecasted Sales GBIS"].sum() -
remaining_has["Sept Expected ECO4"].sum() - remaining_has["Sept Expected GBIS"].sum()
) / (remaining_has["Sept Expected ECO4"].sum() + remaining_has["Sept Expected GBIS"].sum())
asset_list_size = remaining_has["ASSET LIST no."].sum()
# Create a summary table of the rest with the totals for ECO4, GBIS and then a total row
remaining_has_aggregate = pd.DataFrame(
columns=["Scheme", "Asset List Size", "Sept Expected Sales", "Nov Expected Sales", "Forecasted Sales",
"Forecasted Change vs Sept",
"Sept Expected Value", "Nov Expected Value", "Forecasted Value", "Forecasted Value (original price)"],
data=[
[
"ECO4", asset_list_size, remaining_has_september_eco4_sales, remaining_has_november_eco4_sales,
remaining_has_forecasted_eco4_sales, remaining_has_foecast_change_eco4,
remaining_has_september_eco4_value,
remaining_has_november_eco4_value, remaining_has_forecasted_eco4_value,
remaining_has_forecasted_eco4_value_original_price
],
[
"GBIS", asset_list_size, remaining_has_september_gbis_sales, remaining_has_november_gbis_sales,
remaining_has_forecasted_gbis_sales, remaining_has_foecast_change_gbis,
remaining_has_september_gbis_value,
remaining_has_november_gbis_value, remaining_has_forecasted_gbis_value,
remaining_has_forecasted_gbis_value_original_price
],
[
"Total", asset_list_size, remaining_has_september_eco4_sales + remaining_has_september_gbis_sales,
remaining_has_november_eco4_sales + remaining_has_november_gbis_sales,
remaining_has_forecasted_eco4_sales + remaining_has_forecasted_gbis_sales,
remaining_has_foecast_change_total,
remaining_has_september_eco4_value + remaining_has_september_gbis_value,
remaining_has_november_eco4_value + remaining_has_november_gbis_value,
remaining_has_forecasted_eco4_value + remaining_has_forecasted_gbis_value,
remaining_has_forecasted_eco4_value_original_price +
remaining_has_forecasted_gbis_value_original_price
]
]
)
# Calculate pipeline value
pipeline_value = aggregates[["Scheme", "Completed Revenue", "Forecasted Remaining Revenue"]].merge(
remaining_has_aggregate[["Scheme", "Forecasted Value"]].rename(
columns={"Forecasted Value": "Forecasted Revenue, Unconfirmed HAs"}
), how="inner", on="Scheme"
)
# Calculate the total
pipeline_value["Total Value"] = (
pipeline_value["Completed Revenue"] + pipeline_value["Forecasted Remaining Revenue"] + pipeline_value[
"Forecasted Revenue, Unconfirmed HAs"]
)
# TODO: Insert model figures
model_results = pd.DataFrame(
[
{
# This one, we don't have sales data
"HA Name": "HA 15",
"Model Expected Additional ECO4 (unit level)": None,
"Model Expected Total ECO4 (unit level)": 296,
"Model Expected Additional GBIS (unit level)": None,
"Model Expected Total GBIS (unit level)": 209,
},
{
"HA Name": "HA 16",
# Old before re-run
# "Model Expected Additional ECO4 (unit level)": 418,
# "Model Expected Total ECO4 (unit level)": 1820,
# "Model Expected Additional GBIS (unit level)": 576,
# "Model Expected Total GBIS (unit level)": 612,
# IN the partial sales data, WFT have completed 1407 ECO4, 36 GBIS
"Model Expected Additional ECO4 (unit level)": 411 + 342 + 235,
"Model Expected Total ECO4 (unit level)": 1407 + 411 + 342 + 235,
"Model Expected Additional GBIS (unit level)": 223,
"Model Expected Total GBIS (unit level)": 36 + 223,
},
{
"HA Name": "HA 24",
"Model Expected Additional ECO4 (unit level)": 224,
"Model Expected Total ECO4 (unit level)": 848,
"Model Expected Additional GBIS (unit level)": 552,
"Model Expected Total GBIS (unit level)": 552,
},
{
"HA Name": "HA 25",
"Model Expected Additional ECO4 (unit level)": None,
"Model Expected Total ECO4 (unit level)": 1709 + 59,
"Model Expected Additional GBIS (unit level)": None,
"Model Expected Total GBIS (unit level)": 2004 + 107,
}
]
)
sales_data_formatted["Remaining ECO4 Sales"] = (
sales_data_formatted["Forecasted ECO4 Sales"] - sales_data_formatted["Oct ECO4 Sales (adjusted for variance)"]
)
sales_data_formatted["Remaining GBIS Sales"] = (
sales_data_formatted["Forecasted GBIS Sales"] - sales_data_formatted["Oct GBIS Sales (adjusted for variance)"]
)
sales_data_formatted["Completed ECO4 Revenue"] = (sales_data_formatted[
"Oct ECO4 Sales (adjusted for variance)"] *
historical_eco4_price)
sales_data_formatted["Completed GBIS Revenue"] = (sales_data_formatted[
"Oct GBIS Sales (adjusted for variance)"] *
historical_gbis_price)
ha_subset_with_sales = ["HA 15", "HA 16", "HA 24"]
has_subset_with_sales_value = sales_data_formatted[
sales_data_formatted["HA Name"].isin(ha_subset_with_sales)
].copy()[
[
"HA Name",
"Oct ECO4 Sales (adjusted for variance)",
"Oct GBIS Sales (adjusted for variance)",
"Remaining ECO4 Sales",
"Remaining GBIS Sales",
"Forecasted ECO4 Sales",
"Forecasted GBIS Sales",
"Completed ECO4 Revenue",
"Completed GBIS Revenue"
]
]
has_subset_with_sales_value["Remaining ECO4 Revenue"] = has_subset_with_sales_value[
"Remaining ECO4 Sales"] * ECO4_NEW_RATES
has_subset_with_sales_value["Remaining GBIS Revenue"] = has_subset_with_sales_value[
"Remaining GBIS Sales"] * GBIS_NEW_RATES
has_subset_with_sales_value["Remaining Total Revenue"] = (
has_subset_with_sales_value["Remaining ECO4 Revenue"] + has_subset_with_sales_value["Remaining GBIS Revenue"]
)
model_results["Model Expected Additional ECO4 Revenue"] = (
model_results["Model Expected Additional ECO4 (unit level)"] * ECO4_NEW_RATES
)
model_results["Model Expected Additional GBIS revenue"] = (
model_results["Model Expected Additional GBIS (unit level)"] * GBIS_NEW_RATES
)
model_results["Model Expected Additional Total Revenue"] = (
model_results["Model Expected Additional ECO4 Revenue"] + model_results[
"Model Expected Additional GBIS revenue"]
)
# Show more columns with pandas
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)
# Look at HA 16
ha16_model = model_results[model_results["HA Name"] == "HA 16"]
has_subset_with_sales_value[has_subset_with_sales_value["HA Name"] == "HA 16"]
# WFT: For HA 16: 4,598,190 ECO4, 57,000 GBIS
# Model:
# Look at HA 24
ha24_model = model_results[model_results["HA Name"] == "HA 24"]
has_subset_with_sales_value[has_subset_with_sales_value["HA Name"] == "HA 24"]
# Look at HA 15
ha15_data = has_subset_with_sales_value[has_subset_with_sales_value["HA Name"] == "HA 15"]
ha15_portfolio_value = ha15_data["Completed ECO4 Revenue"] + ha15_data[
"Completed GBIS Revenue"] + ha15_data["Remaining Total Revenue"]
# # This doesn't have sales data so in the model analysis, we just value the ha as a whole
ha15_model = model_results[model_results["HA Name"] == "HA 15"]
ha15_value = ha15_model["Model Expected Total ECO4 (unit level)"].iloc[0] * ECO4_NEW_RATES + \
ha15_model["Model Expected Total GBIS (unit level)"].iloc[0] * GBIS_NEW_RATES
model_results["Expected ECO4 Revenue"] = model_results["Model Expected Total ECO4 (unit level)"] * ECO4_NEW_RATES
model_results["Expected GBIS Revenue"] = model_results["Model Expected Total GBIS (unit level)"] * GBIS_NEW_RATES
model_results["Expected Total Revenue"] = model_results["Expected ECO4 Revenue"] + model_results[
"Expected GBIS Revenue"]
model_results[model_results["HA Name"].isin(["HA 15"])]
# We now create a final excel with all of the data
# We want:
# 1) aggregates
# 2) sales_data_formatted
# 3) remaining_has_aggregate
# 4) remaining_has
# 5) problem_has_summary
# Function to get the maximum column width
def get_col_widths(dataframe):
    """
    Compute display widths (in characters) for a dataframe's index and columns.

    :param dataframe: The pandas DataFrame to measure.
    :return: A list of ints. The first entry is the width of the index (the longest
        of the stringified index labels and the stringified index name); the
        remaining entries are one per column, each the longer of the stringified
        column label and the longest stringified cell in that column.
    """
    # Index width: longest label, or the index name if that is longer
    index_lengths = [len(str(label)) for label in dataframe.index.values]
    index_lengths.append(len(str(dataframe.index.name)))
    widths = [max(index_lengths)]

    for col in dataframe.columns:
        # Column labels are not guaranteed to be strings (e.g. integer labels)
        header_length = len(str(col))
        cell_lengths = dataframe[col].astype(str).map(len)
        # An empty column has no cells to measure; max() would give NaN there
        longest_cell = int(cell_lengths.max()) if len(cell_lengths) else 0
        widths.append(max(longest_cell, header_length))
    return widths
# Create a Pandas Excel writer using XlsxWriter as the engine
with pd.ExcelWriter('HA Pipeline Analysis.xlsx', engine='xlsxwriter') as writer:
    # Write each dataframe to its own worksheet; the index is deliberately not written
    for df, sheet in [
        (aggregates, 'Forecasted Sales'),
        (sales_data_formatted, 'Sales Data'),
        (remaining_has_aggregate, 'Remaining HAs Value'),
        (remaining_has, 'Remaining HAs data'),
        (pipeline_value, 'Pipeline Value'),
        (problem_has_summary, 'Problem HAs Analysis'),
        (problem_has_data, 'Problem HAs Data'),
    ]:
        df.to_excel(writer, sheet_name=sheet, index=False)
        # Auto-adjust columns' width. get_col_widths() returns the index width as its
        # first entry, but the index is not written (index=False), so that entry is
        # skipped to keep each width aligned with the worksheet column it describes.
        for i, width in enumerate(get_col_widths(df)[1:]):
            writer.sheets[sheet].set_column(i, i, width)

View file

@ -4,6 +4,7 @@ used by the Warmfront team, to identify which properties are eligible for ECO4 a
work is being done in December 2023, prior to completion of acquisition
"""
import pickle
from etl.epc.Record import EPCRecord
from pathlib import Path
from tqdm import tqdm
import pandas as pd
@ -16,8 +17,6 @@ from dotenv import load_dotenv
from backend.SearchEpc import SearchEpc
from backend.Property import Property
from etl.eligibility.Eligibility import Eligibility
from etl.epc.DataProcessor import DataProcessor
from backend.app.plan.utils import create_recommendation_scoring_data
from etl.epc.settings import COLUMNS_TO_MERGE_ON
from backend.ml_models.api import ModelApi
@ -347,48 +346,31 @@ def prepare_model_data_row(
:param modelling_epc:
:return:
"""
epc_records = {
'original_epc': modelling_epc.copy(),
'full_sap_epc': full_sap_epc.copy(),
'old_data': old_data.copy(),
}
prepared_epc = EPCRecord(
epc_records=epc_records,
run_mode="newdata",
cleaning_data=cleaning_data
)
p = Property(
id=property_id,
postcode=modelling_epc["postcode"],
address=modelling_epc["address1"],
data=modelling_epc,
old_data=old_data,
full_sap_epc=full_sap_epc
epc_record=prepared_epc
)
p.get_components(cleaned, photo_supply_lookup=photo_supply_lookup,
floor_area_decile_thresholds=floor_area_decile_thresholds)
# THIS IS TEMP AND SHOULDN'T BE HERE
data_to_clean = p.get_model_data()
if data_to_clean["NUMBER_HEATED_ROOMS"] in ['', None]:
data_to_clean["NUMBER_HEATED_ROOMS"] = data_to_clean["NUMBER_HABITABLE_ROOMS"]
p.data["number-heated-rooms"] = data_to_clean["NUMBER_HABITABLE_ROOMS"]
# This is temp - this should happen after scoring
cleaned_property_data = DataProcessor.apply_averages_cleaning(
data_to_clean=pd.DataFrame([dict(**data_to_clean, LOCAL_AUTHORITY=p.data["local-authority"])]),
cleaning_data=cleaning_data,
cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'],
p.get_components(
cleaned, photo_supply_lookup=photo_supply_lookup, floor_area_decile_thresholds=floor_area_decile_thresholds
)
p.set_number_lighting_outlets(cleaned_property_data)
data_processor = DataProcessor(None, newdata=True)
data_processor.insert_data(pd.DataFrame([p.get_model_data()]))
data_processor.pre_process()
starting_epc_data = data_processor.get_component_features(suffix="_STARTING")
ending_epc_data = data_processor.get_component_features(suffix="_ENDING")
fixed_data = data_processor.get_fixed_features()
# We update the ending record with the recommended updates and we set lodgement date to today
ending_epc_data["DAYS_TO_ENDING"] = data_processor.calculate_days_to(created_at)
# We simulate the impact of the retrofit using expected performance of the wall and roof,
# after retrofit. We use the minimal u-values required to meet building regulations part L
# TODO: Check the performance of the materials warmfront's installers use, particularly for
# cavity
p.create_base_difference_epc_record(cleaned_lookup=cleaned)
cavity_simulation = {
"recommendation_id": "-".join([property_id, "cavity"]),
@ -404,21 +386,16 @@ def prepare_model_data_row(
"parts": [{"depth": 270}]
}
cavity_scoring = create_recommendation_scoring_data(
property=p,
recommendation=cavity_simulation,
starting_epc_data=starting_epc_data,
ending_epc_data=ending_epc_data,
fixed_data=fixed_data,
)
simulations = [
[cavity_simulation],
[loft_simulation]
]
loft_scoring = create_recommendation_scoring_data(
property=p,
recommendation=loft_simulation,
starting_epc_data=starting_epc_data,
ending_epc_data=ending_epc_data,
fixed_data=fixed_data,
)
p.adjust_difference_record_with_recommendations(simulations)
# Make sure we definitely have the correct data
cavity_scoring = [x for x in p.recommendations_scoring_data if "cavity" in x["id"]][0]
loft_scoring = [x for x in p.recommendations_scoring_data if "loft" in x["id"]][0]
return [cavity_scoring, loft_scoring]

View file

@ -0,0 +1,113 @@
import openpyxl
import pandas as pd
import numpy as np
def get_excel_survey_list(workbook_path, worksheet_name=None):
    """
    Read a survey list worksheet into a dataframe, keeping each row's fill colour.

    :param workbook_path: Path of the Excel workbook to open.
    :param worksheet_name: Name of the worksheet to read; when None the workbook's
        active worksheet is used.
    :return: DataFrame whose columns are the values of the first worksheet row and
        whose rows are the worksheet rows from row 2 onwards, plus a "row_colour"
        column holding the fill start-colour index of each row's first cell (None
        when that index is '00000000').
    """
    workbook = openpyxl.load_workbook(workbook_path)
    sheet = workbook.active if worksheet_name is None else workbook[worksheet_name]

    values = []
    colours = []
    # Row 1 is assumed to be the header, so data starts at row 2
    for cells in sheet.iter_rows(min_row=2, values_only=False):
        values.append([cell.value for cell in cells])
        fill_index = cells[0].fill.start_color.index
        colours.append(None if fill_index == '00000000' else fill_index)

    header = [cell.value for cell in sheet[1]]
    frame = pd.DataFrame(values, columns=header)
    frame["row_colour"] = colours
    return frame
def load_data():
    """
    Load the HA 16, HA 24 and HA 25 survey lists and combine them into one table of
    confirmed survey outcomes.

    HA 16 and HA 24 carry their status in the "INSTALLED OR CANCELLED" column; HA 25
    has no such column, so its status is derived from the fill colour of each row.

    :return: DataFrame with the columns "HA", "survey_status" and
        "cancellation_reason". Rows whose status is "no update" or "loft remaining"
        are excluded. The original per-file row indices are kept, so index values
        may repeat across HAs.
    """
    # Load for HA 16 - ECO 4
    ha16_survey_list = get_excel_survey_list('etl/eligibility/ha_15_32/HESTIA- HA 16 ECO4 SURVEY LIST.xlsx')
    # Load for HA 24 - ECO 4
    ha24_survey_list = get_excel_survey_list('etl/eligibility/ha_15_32/HESTIA - HA 24 ECO4 SURVEY LIST.xlsx')
    # Load for HA 25 - ECO 3
    ha25_survey_list = get_excel_survey_list(
        'etl/eligibility/ha_15_32/HESTIA - HA 25 ECO3 SURVEY LIST.xlsx', worksheet_name="CAVITY"
    )
    # Drop columns that contain no data at all
    ha25_survey_list = ha25_survey_list.dropna(axis=1, how='all')

    # Standardise the installation status columns. The two maps are kept separate
    # because each sheet uses a slightly different set of raw values.
    ha16_survey_list["survey_status"] = ha16_survey_list["INSTALLED OR CANCELLED"].replace(
        {
            "NO UPDATE - CHECKED 2.10.23": "no update",
            "NO UPDATE - CHECKED 18.12.23": "no update",
            "INSTALLED": "installed",
            "CANCELLED": "cancelled",
            "LOFT STILL TO BE INSTALLED": "loft remaining",
        }
    )
    ha24_survey_list["survey_status"] = ha24_survey_list["INSTALLED OR CANCELLED"].replace(
        {
            "NO UPDATE - CHECKED 21.11.23": "no update",
            "NO UPDATE - CHECKED 18.12.23": "no update",
            "INSTALLED": "installed",
            "CANCELLED": "cancelled",
            "LOFT STILL TO BE INSTALLED": "loft remaining",
            "SEE NOTES >>": "see notes",
        }
    )

    # We need to prepare HA25 differently: the status is encoded in the row colour.
    # Any colour not listed here (including no fill) is treated as "unknown".
    ha25_colour_to_status = {
        "FF7030A0": "installed",
        "FF92D050": "installed",
        "FFFF0000": "cancelled",
        "FFFFFF00": "filler row - drop",
        "FF38FD23": "installed",
    }
    ha25_survey_list["survey_status"] = (
        ha25_survey_list["row_colour"].map(ha25_colour_to_status).fillna("unknown")
    )
    # Copy after filtering so the column assignments below act on an independent
    # frame rather than on a slice of the unfiltered one
    ha25_survey_list = ha25_survey_list[ha25_survey_list["survey_status"] != "filler row - drop"].copy()

    # We standardise the cancellation reasons - just create a new column
    ha16_survey_list["cancellation_reason"] = ha16_survey_list["INSTALLERS NOTES ; REASONS FOR CANCELLATIONS"].copy()
    ha24_survey_list["cancellation_reason"] = ha24_survey_list["INSTALLERS NOTES ; REASONS FOR CANCELLATIONS"].copy()
    # There's no cancellation reason for HA25
    ha25_survey_list["cancellation_reason"] = "No reason provided"

    # Combine the dataframes
    ha16_survey_list["HA"] = "HA 16"
    ha24_survey_list["HA"] = "HA 24"
    ha25_survey_list["HA"] = "HA 25"
    output_columns = ["HA", "survey_status", "cancellation_reason"]
    cancellation_data = pd.concat(
        [
            ha16_survey_list[output_columns],
            ha24_survey_list[output_columns],
            ha25_survey_list[output_columns],
        ]
    )

    # Take just rows that we have a confirmed status for
    cancellation_data = cancellation_data[~cancellation_data["survey_status"].isin(["no update", "loft remaining"])]
    return cancellation_data
def app():
    """
    Entry point for analysing the cancellation data provided by warmfront.

    :return: None
    """
    # Jobs that reached an invasive survey but where the installer could not
    # conclude the work
    cancellations = load_data()

File diff suppressed because it is too large Load diff

View file

@ -33,7 +33,6 @@ NO_SUFFIX_COMPONENT_COLS = [x.lower() for x in NO_SUFFIX_COMPONENT_COLS]
ENDING_SUFFIX_COMPONENT_COLS = [x.lower() for x in ENDING_SUFFIX_COMPONENT_COLS]
POTENTIAL_COLUMNS = [x.lower() for x in POTENTIAL_COLUMNS]
# These lookups are used to clean the construction age band
construction_age_bounds_map = {
"England and Wales: before 1900": {"l": 0, "u": 1899},
@ -74,7 +73,8 @@ class EPCDataProcessor:
Handle data loading and data preprocessing
"""
def __init__(self, data: pd.DataFrame | None = None, cleaning_averages: pd.DataFrame | None = None, run_mode: str = "training", violation_mode: bool = False) -> None:
def __init__(self, data: pd.DataFrame | None = None, cleaning_averages: pd.DataFrame | None = None,
run_mode: str = "training", violation_mode: bool = False) -> None:
"""
:param filepath: If specified, is the physical location of the data
:param is_newdata: Indicates if we are processing new, testing data.
@ -82,23 +82,23 @@ class EPCDataProcessor:
want to perform, such as confine_data()
"""
is_data_a_dataframe = isinstance(data, pd.DataFrame)
self.data : pd.DataFrame = data if is_data_a_dataframe else pd.DataFrame()
self.data: pd.DataFrame = data if is_data_a_dataframe else pd.DataFrame()
is_cleaning_averages_a_dataframe = isinstance(cleaning_averages, pd.DataFrame)
self.cleaning_averages : pd.DataFrame = cleaning_averages if is_cleaning_averages_a_dataframe else pd.DataFrame()
self.cleaning_averages: pd.DataFrame = cleaning_averages if is_cleaning_averages_a_dataframe else pd.DataFrame()
# FOR NOW IF VIOLATION MODE IS ON, WE USE RUN MODE AS NEWDATA
self.violation_mode = violation_mode
if run_mode not in ["training", "newdata"]:
raise ValueError("Run mode must be either training or newdata")
self.run_mode = run_mode if not violation_mode else "newdata"
def prepare_data(self, filepath: Path | str | None = None) -> None:
"""
Given the run mode, we apply the relevant pipeline steps
Ignore step is used to highlight which steps are not needed in newdata
"""
ignore_step = True if self.run_mode == "newdata" else False
if filepath is not None:
@ -126,7 +126,7 @@ class EPCDataProcessor:
self.fill_na_fields()
self.sort_data_by_uprn_lodgement_date(ignore_step=ignore_step)
# Final re-casting after data transformed and prepared
self.recast_df_columns(column_mappings=COLUMNTYPES, auto_subset_columns=True)
self.recast_all_data(column_mappings=COLUMNTYPES, auto_subset_columns=True)
@ -138,31 +138,35 @@ class EPCDataProcessor:
self.add_local_authority_to_cleaning_average(ignore_step=ignore_step)
# TODO: check if this has impact on training dataset
cleaned_data = self.apply_averages_cleaning(
data_to_clean=self.data,
cleaning_data=self.cleaning_averages,
cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'],
colnames=["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"],
)
# cleaned_data = self.apply_averages_cleaning(
# data_to_clean=self.data,
# cleaning_data=self.cleaning_averages,
# cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'],
# colnames=["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"],
# )
# When running in newdata mode, cleaning_averages has lower cases so we co-erce back to upper
cleaning_averages = self.cleaning_averages.copy()
if self.run_mode == "newdata":
cleaning_averages.columns = cleaning_averages.columns.str.upper()
cleaned_data = self.apply_averages_cleaning(
data_to_clean=self.data,
cleaning_data=self.cleaning_averages,
cols_to_merge_on=COLUMNS_TO_MERGE_ON,
)
data_to_clean=self.data,
cleaning_data=cleaning_averages,
cols_to_merge_on=COLUMNS_TO_MERGE_ON,
)
self.data = self.data if cleaned_data is None else cleaned_data
self.cast_cleaning_averages_columns_to_lower(ignore_step=ignore_step)
self.cast_data_columns_to_lower()
def cast_data_columns_to_lower(self):
"""
Convert all columns names to lower
"""
self.data.columns = self.data.columns.str.lower()
def cast_cleaning_averages_columns_to_lower(self, ignore_step: bool = False):
"""
Convert all column names to lower
@ -171,9 +175,9 @@ class EPCDataProcessor:
if ignore_step:
return
self.cleaning_averages.columns = self.cleaning_averages.columns.str.lower()
def add_local_authority_to_cleaning_average(self, ignore_step: bool = False):
"""
Add the Local authority column to the cleaning averages
@ -182,7 +186,7 @@ class EPCDataProcessor:
if ignore_step:
return
self.cleaning_averages["LOCAL_AUTHORITY"] = self.data["LOCAL_AUTHORITY"].values[0]
def fill_invalid_constituency_fields(self, ignore_step: bool = False):
@ -195,7 +199,7 @@ class EPCDataProcessor:
if ignore_step:
return
self.data = self.data.fillna({"CONSTITUENCY": self.data["CONSTITUENCY"].mode().values[0]})
def sort_data_by_uprn_lodgement_date(self, ignore_step: bool = False):
@ -218,7 +222,6 @@ class EPCDataProcessor:
for col in convert_to_lower:
self.data[col] = self.data[col].str.lower()
def remap_build_form(self):
"""
Remap build form to standard values
@ -226,7 +229,6 @@ class EPCDataProcessor:
"""
self.data["BUILT_FORM"] = self.data["BUILT_FORM"].replace(BUILT_FORM_REMAP)
def remap_anomalies(self):
"""
Remap anomalies to None
@ -258,7 +260,7 @@ class EPCDataProcessor:
if ignore_step:
return
self.data["FLOOR_LEVEL"] = self.data["FLOOR_LEVEL"].replace(FLOOR_LEVEL_MAP)
def load_data(self, filepath, low_memory=False) -> None:
@ -404,7 +406,8 @@ class EPCDataProcessor:
# self.data = self.data.sort_values(["UPRN", "LODGEMENT_DATE"], ascending=True)
# # Final re-casting after data transformed and prepared
# coltypes = {k: v for k, v in COLUMNTYPES.items() if k in self.data.columns} if self.is_newdata else COLUMNTYPES
# coltypes = {k: v for k, v in COLUMNTYPES.items() if k in self.data.columns} if self.is_newdata else
# COLUMNTYPES
# for k, v in coltypes.items():
# self.data[k] = self.data[k].astype(v)
# self.data = self.data.astype(coltypes)
@ -423,7 +426,7 @@ class EPCDataProcessor:
# cleaning_data=self.cleaning_averages,
# cols_to_merge_on=COLUMNS_TO_MERGE_ON
# )
# self.cleaning_averages["LOCAL_AUTHORITY"] = self.data["LOCAL_AUTHORITY"].values[0]
# self.cleaning_averages.columns = self.cleaning_averages.columns.str.lower()
@ -431,7 +434,6 @@ class EPCDataProcessor:
# return self.data, self.cleaning_averages
def na_remapping(self, auto_subset_columns: bool = False):
fill_na_map_apply = {
@ -578,7 +580,7 @@ class EPCDataProcessor:
if self.violation_mode:
# TODO: to fill in
return
if ignore_step:
return
@ -604,15 +606,15 @@ class EPCDataProcessor:
self.data[key] = self.data[key].astype(value)
else:
self.data[key] = self.data[key].astype(values)
def recast_all_data(self, column_mappings: dict, auto_subset_columns: bool = False) -> None:
"""
Using a dictionary to recast all columns at once
"""
"""
if auto_subset_columns:
column_mappings = {k: v for k, v in column_mappings.items() if k in self.data.columns}
self.data = self.data.astype(column_mappings)
def confine_data(self, ignore_step: bool = False):
@ -642,7 +644,7 @@ class EPCDataProcessor:
violation_missing_hotwater_description,
violation_missing_roof_description,
violation_invalid_property_type,
], axis=1,
], axis=1,
keys=[
"violation_uprn_missing",
"violation_old_lodgment_date",
@ -654,8 +656,8 @@ class EPCDataProcessor:
"violation_missing_roof_description",
"violation_invalid_property_type",
]
)
)
self.data = pd.concat([self.data, violation_df], axis=1)
if ignore_step:
@ -703,7 +705,7 @@ class EPCDataProcessor:
if self.violation_mode:
# TODO:
return
if ignore_step:
return
@ -721,7 +723,9 @@ class EPCDataProcessor:
self.data["PHOTO_SUPPLY"] = self.data["PHOTO_SUPPLY"].fillna(0)
@staticmethod
def apply_averages_cleaning(data_to_clean, cleaning_data, cols_to_merge_on, colnames=None, ignore_step: bool = False):
def apply_averages_cleaning(
data_to_clean, cleaning_data, cols_to_merge_on, colnames=None, ignore_step: bool = False
):
"""
Clean the input DataFrame using averages from a cleaning DataFrame.

View file

@ -15,6 +15,37 @@ from recommendations.recommendation_utils import (
get_wall_type,
)
# Column names that are treated as boolean flags. The missing-value handling in TrainingDataset checks
# `col in BOOLEAN_VARIABLES` so that these columns are filled with False even when a column contains neither
# True nor False among its unique values.
# NOTE(review): names ending in `_ending` look like the "after" side of a before/after pair - confirm against
# the code that generates these columns.
# TODO: Can probably produce this in the property change app and store in S3
BOOLEAN_VARIABLES = [
'is_cavity_wall', 'is_filled_cavity', 'is_solid_brick', 'is_system_built', 'is_timber_frame',
'is_granite_or_whinstone', 'is_as_built', 'is_cob', 'is_sandstone_or_limestone', 'is_park_home',
'external_insulation', 'internal_insulation', 'is_park_home_ending', 'external_insulation_ending',
'internal_insulation_ending', 'is_to_unheated_space', 'is_to_external_air', 'is_suspended', 'is_solid',
'another_property_below', 'is_pitched', 'is_roof_room', 'is_loft', 'is_flat', 'is_thatched', 'is_at_rafters',
'has_dwelling_above', 'has_radiators', 'has_fan_coil_units', 'has_pipes_in_screed_above_insulation',
'has_pipes_in_insulated_timber_floor', 'has_pipes_in_concrete_slab', 'has_boiler', 'has_air_source_heat_pump',
'has_room_heaters', 'has_electric_storage_heaters', 'has_warm_air', 'has_electric_underfloor_heating',
'has_electric_ceiling_heating', 'has_community_scheme', 'has_ground_source_heat_pump', 'has_no_system_present',
'has_portable_electric_heaters', 'has_water_source_heat_pump', 'has_electric_heat_pump', 'has_micro-cogeneration',
'has_solar_assisted_heat_pump', 'has_exhaust_source_heat_pump', 'has_community_heat_pump', 'has_electric',
'has_mains_gas', 'has_wood_logs', 'has_coal', 'has_oil', 'has_wood_pellets', 'has_anthracite',
'has_dual_fuel_mineral_and_wood', 'has_smokeless_fuel', 'has_lpg', 'has_b30k', 'has_electricaire',
'has_assumed_for_most_rooms', 'has_underfloor_heating', 'has_radiators_ending', 'has_fan_coil_units_ending',
'has_pipes_in_screed_above_insulation_ending', 'has_pipes_in_insulated_timber_floor_ending',
'has_pipes_in_concrete_slab_ending', 'has_boiler_ending', 'has_air_source_heat_pump_ending',
'has_room_heaters_ending', 'has_electric_storage_heaters_ending', 'has_warm_air_ending',
'has_electric_underfloor_heating_ending', 'has_electric_ceiling_heating_ending', 'has_community_scheme_ending',
'has_ground_source_heat_pump_ending', 'has_no_system_present_ending', 'has_portable_electric_heaters_ending',
'has_water_source_heat_pump_ending', 'has_electric_heat_pump_ending', 'has_micro-cogeneration_ending',
'has_solar_assisted_heat_pump_ending', 'has_exhaust_source_heat_pump_ending', 'has_community_heat_pump_ending',
'has_electric_ending', 'has_mains_gas_ending', 'has_wood_logs_ending', 'has_coal_ending', 'has_oil_ending',
'has_wood_pellets_ending', 'has_anthracite_ending', 'has_dual_fuel_mineral_and_wood_ending',
'has_smokeless_fuel_ending', 'has_lpg_ending', 'has_b30k_ending', 'has_electricaire_ending',
'has_assumed_for_most_rooms_ending', 'has_underfloor_heating_ending', 'multiple_room_thermostats',
'multiple_room_thermostats_ending', 'is_community', 'no_individual_heating_or_community_network',
'is_community_ending', 'no_individual_heating_or_community_network_ending'
]
class BaseDataset:
"""
@ -616,7 +647,7 @@ class TrainingDataset(BaseDataset):
for col in missings.index:
unique_values = self.df[col].unique()
if True in unique_values or False in unique_values:
if (True in unique_values) or (False in unique_values) or (col in BOOLEAN_VARIABLES):
self.df[col] = self.df[col].fillna(False)
if "none" in unique_values:
self.df[col] = self.df[col].fillna("none")

View file

@ -105,6 +105,8 @@ class EPCRecord:
year_built: int = None
number_of_floors: int = None
number_of_open_fireplaces: int = None
heat_loss_corridor_bool: bool = None
solar_water_heating_flag_bool: bool = None
def __post_init__(self):
# We can have validation and cleaning steps for each of the fields
@ -378,9 +380,8 @@ class EPCRecord:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc["floor-level"] = (
FLOOR_LEVEL_MAP[self.prepared_epc["floor-level"]]
if self.prepared_epc["floor-level"] not in DATA_ANOMALY_MATCHES
else None
FLOOR_LEVEL_MAP[self.prepared_epc["floor-level"]] if
self.prepared_epc["floor-level"] not in DATA_ANOMALY_MATCHES else None
)
def _clean_number_lighting_outlets(self):
@ -390,7 +391,7 @@ class EPCRecord:
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
if self.prepared_epc["fixed-lighting-outlets-count"] == "":
if self.prepared_epc["fixed-lighting-outlets-count"] in DATA_ANOMALY_MATCHES:
# We check old EPCs and the full SAP EPC
lighting_data = []
@ -415,18 +416,19 @@ class EPCRecord:
np.median(lighting_data)
)
else:
# Use averages from the cleaning dataset, based on the property type, built form, construction age band and local authority
# Use averages from the cleaning dataset, based on the property type, built form, construction age
# band and local authority
cleaning_data = self.cleaning_data.copy()
# When running in new-data mode, the columns will have been coerced to lower case so we push them
# back to upper case
if self.run_mode == "newdata":
cleaning_data.columns = [x.upper() for x in cleaning_data.columns]
cleaned_property_data = EPCDataProcessor.apply_averages_cleaning(
data_to_clean=self.epc_record_as_dataframe(
"prepared_epc", replace_empty_string=True
),
cleaning_data=self.cleaning_data,
cols_to_merge_on=[
"PROPERTY_TYPE",
"BUILT_FORM",
"CONSTRUCTION_AGE_BAND",
"LOCAL_AUTHORITY",
],
data_to_clean=self.epc_record_as_dataframe("prepared_epc", replace_empty_string=True),
cleaning_data=cleaning_data,
cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'],
)
self.prepared_epc["fixed-lighting-outlets-count"] = round(
cleaned_property_data["FIXED_LIGHTING_OUTLETS_COUNT"].values[0]
@ -535,19 +537,14 @@ class EPCRecord:
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
map = {
mains_gas_map = {
"Y": True,
"N": False,
}
self.prepared_epc["mains-gas-flag"] = (
None
if (
self.prepared_epc["mains-gas-flag"] == ""
or self.prepared_epc["mains-gas-flag"] in DATA_ANOMALY_MATCHES
)
else map[self.prepared_epc["mains-gas-flag"]]
)
self.prepared_epc["mains-gas-flag"] = None if (
self.prepared_epc["mains-gas-flag"] == "" or self.prepared_epc["mains-gas-flag"] in DATA_ANOMALY_MATCHES
) else mains_gas_map[self.prepared_epc["mains-gas-flag"]]
def _clean_heat_loss_corridor(self):
"""
@ -556,24 +553,33 @@ class EPCRecord:
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
map = {
valid_values = [
"no corridor",
"unheated corridor",
"heated corridor"
]
boolean_map = {
"no corridor": False,
"unheated corridor": True,
"heated corridor": False,
}
self.prepared_epc["heat-loss-corridor"] = (
False
if self.prepared_epc["heat-loss-corridor"] in DATA_ANOMALY_MATCHES
else map[self.prepared_epc["heat-loss-corridor"]]
"no corridor" if self.prepared_epc["heat-loss-corridor"] in DATA_ANOMALY_MATCHES else
self.prepared_epc["heat-loss-corridor"]
)
if self.prepared_epc["heat-loss-corridor"] not in valid_values:
self.prepared_epc["heat-loss-corridor"] = "no corridor"
self.prepared_epc["unheated-corridor-length"] = (
float(self.prepared_epc["unheated-corridor-length"])
if self.prepared_epc["unheated-corridor-length"] != ""
else None
float(self.prepared_epc["unheated-corridor-length"]) if
self.prepared_epc["unheated-corridor-length"] not in ["", None] else None
)
# We create boolean versions of heat-loss-corridor
self.heat_loss_corridor_bool = boolean_map[self.prepared_epc["heat-loss-corridor"]]
def _clean_count_variables(self):
"""
This method will clean the count variables, if empty or invalid
@ -581,26 +587,24 @@ class EPCRecord:
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
fields = {
"number_of_open_fireplaces": "number-open-fireplaces",
"number_of_extensions": "extension-count",
"number_of_storeys": "flat-storey-count",
"number_of_rooms": "number-habitable-rooms",
}
fields = [
"number-open-fireplaces",
"extension-count",
"flat-storey-count",
"number-habitable-rooms"
]
null_attributes = ["number_of_storeys", "number_of_rooms"]
null_attributes = ["flat-storey-count", "number-habitable-rooms"]
for attribute, epc_field in fields.items():
# TODO: check this
# value = self.data["extension-count"]
value = self.prepared_epc[epc_field]
if value == "" or value in DATA_ANOMALY_MATCHES:
for attribute in fields:
value = self.prepared_epc[attribute]
if value in DATA_ANOMALY_MATCHES:
if attribute in null_attributes:
value = None
else:
value = 0
else:
value = int(value)
value = int(float(value))
self.prepared_epc[attribute] = value
@ -611,11 +615,9 @@ class EPCRecord:
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc["wind-turbine-count"] = (
int(self.prepared_epc["wind-turbine-count"])
if self.prepared_epc["wind-turbine-count"] != ""
else None
)
self.prepared_epc['wind-turbine-count'] = int(
self.prepared_epc['wind-turbine-count']
) if self.prepared_epc['wind-turbine-count'] not in DATA_ANOMALY_MATCHES else None
def _clean_solar_hot_water(self):
"""
@ -625,15 +627,24 @@ class EPCRecord:
raise ValueError("EPC Recrod doesn not contain epc data")
value_map = {
"Y": "Y",
"N": "N",
"": "N",
None: "N"
}
boolean_map = {
"Y": True,
"N": False,
"": None,
}
self.prepared_epc["solar-water-heating-flag"] = value_map[
self.prepared_epc["solar-water-heating-flag"]
]
# Create a boolean version for storage in the database
self.solar_water_heating_flag_bool = boolean_map[self.prepared_epc['solar-water-heating-flag']]
def _clean_solar_pv(self):
"""
This method will clean the solar pv, if empty or invalid
@ -641,11 +652,8 @@ class EPCRecord:
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc["photo-supply"] = (
float(self.prepared_epc["photo-supply"])
if self.prepared_epc["photo-supply"] != ""
else None
)
self.prepared_epc['photo-supply'] = float(self.prepared_epc['photo-supply']) if (
self.prepared_epc['photo-supply'] not in DATA_ANOMALY_MATCHES) else None
def _clean_energy(self):
"""
@ -668,12 +676,13 @@ class EPCRecord:
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc["built-form"] = BUILT_FORM_REMAP.get(
self.prepared_epc['built-form'] = BUILT_FORM_REMAP.get(
self.prepared_epc["built-form"], self.prepared_epc["built-form"]
)
if self.prepared_epc["built-form"] in DATA_ANOMALY_MATCHES:
if self.prepared_epc["property-type"] == "Flat":
self.prepared_epc["built-form"] = "Semi-Detached"
if self.prepared_epc["property-type"] in ["Flat", "Maisonette"]:
self.prepared_epc["built-form"] = "End-Terrace"
def _clean_age_band(self):
"""
@ -682,10 +691,11 @@ class EPCRecord:
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.construction_age_band = EPCDataProcessor.clean_construction_age_band(
self.prepared_epc["construction-age-band"] = EPCDataProcessor.clean_construction_age_band(
self.prepared_epc["construction-age-band"]
)
if self.construction_age_band in DATA_ANOMALY_MATCHES:
if self.prepared_epc["construction-age-band"] in DATA_ANOMALY_MATCHES:
if self.old_data:
# Take the most recent
max_datetime = max(
@ -693,31 +703,31 @@ class EPCRecord:
old_record["lodgement-datetime"]
for old_record in self.old_data
if old_record["construction-age-band"]
not in DATA_ANOMALY_MATCHES
not in DATA_ANOMALY_MATCHES
]
)
most_recent = [
old_record
for old_record in self.old_data
if old_record["lodgement-datetime"] == max_datetime
]
self.construction_age_band = (
EPCDataProcessor.clean_construction_age_band(
most_recent[0]["construction-age-band"]
)
most_recent = [old_record for old_record in self.old_data if
old_record["lodgement-datetime"] == max_datetime]
self.prepared_epc["construction-age-band"] = EPCDataProcessor.clean_construction_age_band(
most_recent[0]["construction-age-band"]
)
self.construction_age_band = self.prepared_epc["construction-age-band"]
self.age_band = england_wales_age_band_lookup.get(self.construction_age_band)
if (self.prepared_epc["transaction-type"] == "new dwelling") and (
self.age_band is None
):
self.age_band = "L"
self.construction_age_band = "England and Wales: 2012 onwards"
self.construction_age_band = 'England and Wales: 2012 onwards'
self.prepared_epc["construction-age-band"] = self.construction_age_band
if self.age_band is None:
raise ValueError("age_band is missing")
self.age_band = "C"
self.construction_age_band = "England and Wales: 1930-1949"
self.prepared_epc["construction-age-band"] = self.construction_age_band
def _clean_year_built(self):
"""
@ -750,13 +760,10 @@ class EPCRecord:
"""
This method will clean the ventilation, if empty or invalid
"""
self.prepared_epc["mechanical-ventilation"] = (
None
if (
self.mechanical_ventilation == ""
or self.mechanical_ventilation in DATA_ANOMALY_MATCHES
)
else self.mechanical_ventilation
self.prepared_epc['mechanical-ventilation'] = None if (
self.prepared_epc['mechanical-ventilation'] in DATA_ANOMALY_MATCHES
) else (
self.prepared_epc['mechanical-ventilation']
)
def _field_validation(self):
@ -793,13 +800,15 @@ class EPCRecord:
validation_config["function"](field_value)
except:
raise ValueError(
f"Field {record_key} has value {field_value} which does not pass the validation function {validation_config['function']}"
f"Field {record_key} has value {field_value} which does not pass the validation function "
f"{validation_config['function']}"
)
if validation_config["acceptable_values"] is not None:
if field_value not in validation_config["acceptable_values"]:
raise ValueError(
f"Field {record_key} has value {field_value} which is not in the acceptable values of {validation_config['acceptable_values']}"
f"Field {record_key} has value {field_value} which is not in the acceptable values of "
f"{validation_config['acceptable_values']}"
)
def _validate_float(
@ -818,7 +827,8 @@ class EPCRecord:
validation_config["function"](field_value)
except:
raise ValueError(
f"Field {record_key} has value {field_value} which does not pass the validation function {validation_config['function']}"
f"Field {record_key} has value {field_value} which does not pass the validation function "
f"{validation_config['function']}"
)
if validation_config["range"] is not None:
@ -827,7 +837,8 @@ class EPCRecord:
or field_value > validation_config["range"][1]
):
raise ValueError(
f"Field {record_key} has value {field_value} which is not in the acceptable range of {validation_config['range']}"
f"Field {record_key} has value {field_value} which is not in the acceptable range of "
f"{validation_config['range']}"
)
def __sub__(self, other):
@ -1045,7 +1056,8 @@ class EPCDifferenceRecord:
def ensure_adequate_data(self) -> bool:
"""
This method will ensure that the difference record has adequate data, to keep record, even if rdsap change is zero
This method will ensure that the difference record has adequate data, to keep record, even if rdsap change is
zero
Can move into the initiation of the difference record
"""
wall_check = self.record1.walls_description == self.record2.walls_description

View file

@ -43,7 +43,11 @@ DATA_ANOMALY_MATCHES = {
# contain a null value. A resolution to correct these anomalies will be considered for future data releases.
"NULL",
# We sometimes see fields populated with just an empty string.
""
"",
# We sometimes find None values - particularly when we produce an estimated EPC
None,
# An older value which rarely shows up but has been seen in the data.
"UNKNOWN",
}
DATA_ANOMALY_SUBSTRINGS = {

View file

@ -0,0 +1,358 @@
import pytest
from utils.s3 import read_dataframe_from_s3_parquet
from etl.epc.Record import EPCRecord
from etl.epc.settings import DATA_ANOMALY_MATCHES
import random
class TestEpcRecord:
    """
    Unit tests for the individual ``_clean_*`` steps of EPCRecord.

    Each test builds an EPCRecord, injects a minimal ``prepared_epc`` dictionary containing only the fields the
    step under test reads, calls that single cleaning method and checks the cleaned value.
    """

    @pytest.fixture()
    def cleaning_data(self):
        # The cleaning (averages) dataset passed to every EPCRecord. This is read from S3, so running the tests
        # requires access to the dev bucket.
        cleaning_data = read_dataframe_from_s3_parquet(
            bucket_name="retrofit-data-dev", file_key="sap_change_model/cleaning_dataset.parquet",
        )
        return cleaning_data

    @pytest.fixture()
    def epc_records_1(self):
        # A full set of EPC records for a single property, with no full SAP EPC and no older EPCs
        epc_records_1 = {
            'original_epc': {
                'low-energy-fixed-light-count': '', 'address': '139 School Road, Hall Green',
                'uprn-source': 'Energy Assessor', 'floor-height': '2.6', 'heating-cost-potential': '1138',
                'unheated-corridor-length': '', 'hot-water-cost-potential': '175',
                'construction-age-band': 'England and Wales: 1900-1929', 'potential-energy-rating': 'B',
                'mainheat-energy-eff': 'Good', 'windows-env-eff': 'Average', 'lighting-energy-eff': 'Very Good',
                'environment-impact-potential': '82', 'glazed-type': 'double glazing, unknown install date',
                'heating-cost-current': '2711', 'address3': '',
                'mainheatcont-description': 'Programmer, TRVs and bypass',
                'sheating-energy-eff': 'N/A', 'property-type': 'House', 'local-authority-label': 'Birmingham',
                'fixed-lighting-outlets-count': '11', 'energy-tariff': 'Single', 'mechanical-ventilation': 'natural',
                'hot-water-cost-current': '310', 'county': '', 'postcode': 'B28 8JF', 'solar-water-heating-flag': 'N',
                'constituency': 'E14000562', 'co2-emissions-potential': '2.0', 'number-heated-rooms': '4',
                'floor-description': 'Suspended, no insulation (assumed)', 'energy-consumption-potential': '107',
                'local-authority': 'E08000025', 'built-form': 'Semi-Detached', 'number-open-fireplaces': '0',
                'windows-description': 'Fully double glazed', 'glazed-area': 'Normal', 'inspection-date': '2023-07-05',
                'mains-gas-flag': 'Y', 'co2-emiss-curr-per-floor-area': '65', 'address1': '139 School Road',
                'heat-loss-corridor': '', 'flat-storey-count': '', 'constituency-label': 'Birmingham, Hall Green',
                'roof-energy-eff': 'Average', 'total-floor-area': '103.0', 'building-reference-number': '10004697322',
                'environment-impact-current': '43', 'co2-emissions-current': '6.7',
                'roof-description': 'Pitched, 100 mm loft insulation', 'floor-energy-eff': 'N/A',
                'number-habitable-rooms': '4', 'address2': 'Hall Green', 'hot-water-env-eff': 'Good',
                'posttown': 'BIRMINGHAM', 'mainheatc-energy-eff': 'Average', 'main-fuel': 'mains gas (not community)',
                'lighting-env-eff': 'Very Good', 'windows-energy-eff': 'Average', 'floor-env-eff': 'N/A',
                'sheating-env-eff': 'N/A', 'lighting-description': 'Low energy lighting in 82% of fixed outlets',
                'roof-env-eff': 'Average', 'walls-energy-eff': 'Very Poor', 'photo-supply': '0.0',
                'lighting-cost-potential': '182', 'mainheat-env-eff': 'Good', 'multi-glaze-proportion': '100',
                'main-heating-controls': '', 'lodgement-datetime': '2023-07-13 08:23:07', 'flat-top-storey': '',
                'current-energy-rating': 'E', 'secondheat-description': 'None', 'walls-env-eff': 'Very Poor',
                'transaction-type': 'rental', 'uprn': '100070505235', 'current-energy-efficiency': '51',
                'energy-consumption-current': '366', 'mainheat-description': 'Boiler and radiators, mains gas',
                'lighting-cost-current': '182', 'lodgement-date': '2023-07-13', 'extension-count': '0',
                'mainheatc-env-eff': 'Average',
                'lmk-key': 'c1d137711da433fb3cced74b1a6848da8bbc1159d076455d26d7b4668982601e',
                'wind-turbine-count': '0',
                'tenure': 'Rented (social)', 'floor-level': '', 'potential-energy-efficiency': '84',
                'hot-water-energy-eff': 'Good', 'low-energy-lighting': '82',
                'walls-description': 'Solid brick, as built, no insulation (assumed)',
                'hotwater-description': 'From main system'}, 'full_sap_epc': {}, 'old_data': []
        }
        return epc_records_1

    def test_clean_mechanical_ventilation(self, cleaning_data):
        # We have an epc with Natural ventilation - the resulting epc should also have natural ventilation
        record = EPCRecord(cleaning_data=cleaning_data)
        record.prepared_epc = {
            "mechanical-ventilation": "natural"
        }
        record._clean_ventilation()
        assert record.prepared_epc["mechanical-ventilation"] == "natural"

        # An empty string should be cleaned to None
        record2 = EPCRecord(cleaning_data=cleaning_data)
        record2.prepared_epc = {
            "mechanical-ventilation": ""
        }
        record2._clean_ventilation()
        assert record2.prepared_epc["mechanical-ventilation"] is None

        # None should stay None
        record3 = EPCRecord(cleaning_data=cleaning_data)
        record3.prepared_epc = {
            "mechanical-ventilation": None
        }
        record3._clean_ventilation()
        assert record3.prepared_epc["mechanical-ventilation"] is None

        # "INVALID" is expected to be treated as an anomaly value and cleaned to None
        record4 = EPCRecord(cleaning_data=cleaning_data)
        record4.prepared_epc = {
            "mechanical-ventilation": "INVALID"
        }
        record4._clean_ventilation()
        assert record4.prepared_epc["mechanical-ventilation"] is None

    def test_clean_energy_valid_values(self, cleaning_data):
        record = EPCRecord(cleaning_data=cleaning_data)
        record.prepared_epc = {
            "energy-consumption-current": "200",
            "co2-emissions-current": "5.5"
        }
        record._clean_energy()
        assert record.prepared_epc["energy-consumption-current"] == 200.0
        assert record.prepared_epc["co2-emissions-current"] == 5.5

    def test_clean_energy_empty_values(self, cleaning_data):
        # We cannot have invalid values so this should raise an exception
        record = EPCRecord(cleaning_data=cleaning_data)
        record.prepared_epc = {
            "energy-consumption-current": "",
            "co2-emissions-current": ""
        }
        with pytest.raises(ValueError):
            record._clean_energy()

    def test_clean_built_form_valid_remap(self, cleaning_data):
        record = EPCRecord(cleaning_data=cleaning_data)
        # "Semi-Detached" is already a valid built form, so it is expected to come through unchanged. The
        # property type is only used as a fallback when the built form is an anomaly value.
        record.prepared_epc = {
            "built-form": "Semi-Detached",
            "property-type": "Flat"
        }
        record._clean_built_form()
        assert record.prepared_epc["built-form"] == "Semi-Detached"

    def test_clean_built_form_anomaly(self, cleaning_data):
        # A flat with a missing built form should be defaulted to End-Terrace
        record = EPCRecord(cleaning_data=cleaning_data)
        record.prepared_epc = {
            "built-form": "",
            "property-type": "Flat"
        }
        record._clean_built_form()
        assert record.prepared_epc["built-form"] == "End-Terrace"

    def test_clean_floor_area_valid(self, cleaning_data):
        record = EPCRecord(cleaning_data=cleaning_data)
        record.prepared_epc = {
            "total-floor-area": "120.5"
        }
        record._clean_floor_area()
        assert record.prepared_epc["total-floor-area"] == 120.5

    def test_clean_floor_area_empty(self, cleaning_data):
        record = EPCRecord(cleaning_data=cleaning_data)
        record.prepared_epc = {
            "total-floor-area": ""
        }
        # We have no known case of missing floor area
        with pytest.raises(ValueError):
            record._clean_floor_area()

    def test_clean_heat_loss_corridor_valid(self, cleaning_data):
        # Empty string for the corridor length
        record = EPCRecord(cleaning_data=cleaning_data)
        record.prepared_epc = {
            "heat-loss-corridor": "unheated corridor",
            "unheated-corridor-length": ""
        }
        record._clean_heat_loss_corridor()
        assert record.prepared_epc["heat-loss-corridor"] == "unheated corridor"
        assert record.prepared_epc["unheated-corridor-length"] is None
        # Only an unheated corridor counts as a heat loss corridor in the boolean version
        assert record.heat_loss_corridor_bool is True

        # None for the corridor length
        record = EPCRecord(cleaning_data=cleaning_data)
        record.prepared_epc = {
            "heat-loss-corridor": "unheated corridor",
            "unheated-corridor-length": None
        }
        record._clean_heat_loss_corridor()
        assert record.prepared_epc["heat-loss-corridor"] == "unheated corridor"
        assert record.prepared_epc["unheated-corridor-length"] is None
        assert record.heat_loss_corridor_bool is True

    def test_clean_heat_loss_corridor_anomaly(self, cleaning_data):
        record = EPCRecord(cleaning_data=cleaning_data)
        # "InvalidCorridor" is not one of the valid corridor values, so it should fall back to "no corridor"
        record.prepared_epc = {
            "heat-loss-corridor": "InvalidCorridor",
            "unheated-corridor-length": ""
        }
        record._clean_heat_loss_corridor()
        assert record.prepared_epc["heat-loss-corridor"] == "no corridor"
        assert record.heat_loss_corridor_bool is False

    def test_clean_mains_gas_valid(self, cleaning_data):
        record = EPCRecord(cleaning_data=cleaning_data)
        record.prepared_epc = {
            "mains-gas-flag": "Y"
        }
        record._clean_mains_gas()
        assert record.prepared_epc["mains-gas-flag"] is True

    def test_clean_mains_gas_anomaly(self, cleaning_data):
        record = EPCRecord(cleaning_data=cleaning_data)
        record.prepared_epc = {
            "mains-gas-flag": "InvalidValue"
        }
        # It should always be Y or N or an anomaly value
        with pytest.raises(KeyError):
            record._clean_mains_gas()

        # Check every known anomaly value rather than a randomly chosen one, so that the test is deterministic
        # and a failure always points at the offending value
        for anomaly in DATA_ANOMALY_MATCHES:
            record = EPCRecord(cleaning_data=cleaning_data)
            record.prepared_epc = {
                "mains-gas-flag": anomaly
            }
            record._clean_mains_gas()
            assert record.prepared_epc["mains-gas-flag"] is None, f"anomaly value {anomaly!r} was not cleaned"

    def test_clean_solar_hot_water_valid(self, cleaning_data):
        record = EPCRecord(cleaning_data=cleaning_data)
        record.prepared_epc = {
            "solar-water-heating-flag": "Y"
        }
        record._clean_solar_hot_water()
        assert record.prepared_epc["solar-water-heating-flag"] == "Y"
        assert record.solar_water_heating_flag_bool is True

    def test_clean_solar_hot_water_empty(self, cleaning_data):
        record = EPCRecord(cleaning_data=cleaning_data)
        record.prepared_epc = {
            "solar-water-heating-flag": ""
        }
        record._clean_solar_hot_water()
        assert record.prepared_epc["solar-water-heating-flag"] == "N"
        assert record.solar_water_heating_flag_bool is False

    def test_clean_number_lighting_outlets_valid(self, cleaning_data, epc_records_1):
        record = EPCRecord(cleaning_data=cleaning_data, epc_records=epc_records_1)
        record.prepared_epc = {
            "fixed-lighting-outlets-count": "5"
        }
        record._clean_number_lighting_outlets()
        assert record.prepared_epc["fixed-lighting-outlets-count"] == 5.0

    def test_clean_number_lighting_outlets_empty(self, cleaning_data):
        # With no old EPCs and no full SAP EPC, the value is taken from the averages in the cleaning dataset
        record = EPCRecord(cleaning_data=cleaning_data)
        record.run_mode = "newdata"
        record.prepared_epc = {
            "fixed-lighting-outlets-count": "",
            "property-type": "Flat",
            "built-form": "Semi-Detached",
            "construction-age-band": "England and Wales: 1900-1929",
            "local-authority": "E08000025",
            "number-habitable-rooms": "4",
            "number-heated-rooms": "4",
        }
        record.old_data = []
        record.full_sap_epc = []
        record._clean_number_lighting_outlets()
        assert record.prepared_epc["fixed-lighting-outlets-count"] == 8.0

    def test_clean_count_variables(self, cleaning_data):
        record = EPCRecord(cleaning_data=cleaning_data)
        record.prepared_epc = {
            "number-open-fireplaces": "1",
            "extension-count": None,
            "flat-storey-count": "",
            "number-habitable-rooms": "INVALID!",
        }
        record._clean_count_variables()
        assert record.prepared_epc["number-open-fireplaces"] == 1.0
        # Missing extension counts default to 0, whereas storey and room counts are set to None
        assert record.prepared_epc["extension-count"] == 0
        assert record.prepared_epc["flat-storey-count"] is None
        assert record.prepared_epc["number-habitable-rooms"] is None

    def test_clean_floor_level(self, cleaning_data):
        record = EPCRecord(cleaning_data=cleaning_data)
        record.prepared_epc = {
            "floor-level": "1",
        }
        record._clean_floor_level()
        assert record.prepared_epc["floor-level"] == 1.0

        record = EPCRecord(cleaning_data=cleaning_data)
        record.prepared_epc = {
            "floor-level": "",
        }
        record._clean_floor_level()
        assert record.prepared_epc["floor-level"] is None

        record = EPCRecord(cleaning_data=cleaning_data)
        record.prepared_epc = {
            "floor-level": None,
        }
        record._clean_floor_level()
        assert record.prepared_epc["floor-level"] is None

    def test_clean_solar_hot_water(self, cleaning_data):
        record = EPCRecord(cleaning_data=cleaning_data)
        record.prepared_epc = {
            "solar-water-heating-flag": "Y",
        }
        record._clean_solar_hot_water()
        assert record.prepared_epc["solar-water-heating-flag"] == "Y"
        assert record.solar_water_heating_flag_bool is True

        record = EPCRecord(cleaning_data=cleaning_data)
        record.prepared_epc = {
            "solar-water-heating-flag": "N",
        }
        record._clean_solar_hot_water()
        assert record.prepared_epc["solar-water-heating-flag"] == "N"
        assert record.solar_water_heating_flag_bool is False

        record = EPCRecord(cleaning_data=cleaning_data)
        record.prepared_epc = {
            "solar-water-heating-flag": "",
        }
        record._clean_solar_hot_water()
        assert record.prepared_epc["solar-water-heating-flag"] == "N"
        assert record.solar_water_heating_flag_bool is False

        record = EPCRecord(cleaning_data=cleaning_data)
        record.prepared_epc = {
            "solar-water-heating-flag": None,
        }
        record._clean_solar_hot_water()
        assert record.prepared_epc["solar-water-heating-flag"] == "N"
        assert record.solar_water_heating_flag_bool is False

View file

@ -0,0 +1,38 @@
"""
This script will create an input csv for the recommendation engine and upload it to S3, which can be used for
testing
"""
import os
import pandas as pd
from utils.s3 import save_csv_to_s3
# Read from the environment; None when the variable is not set. Not referenced by the code visible in this script.
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN", None)
# USER_ID and PORTFOLIO_ID form the leading path segments of the S3 key the pilot file is stored under;
# PORTFOLIO_ID is also sent (as a string) in the printed request body.
USER_ID = 8
PORTFOLIO_ID = 61
def app():
    """
    Build the two-row pilot file, store it in S3 as a csv and print the request body that references it.
    """
    rows = [
        {"address": "42, Foxes Field", "postcode": "TR18 3RJ", "Notes": None},
        {"address": "11, Cranley Gardens", "postcode": "TQ13 8UT", "Notes": None},
    ]
    s3_key = f"{USER_ID}/{PORTFOLIO_ID}/livewest_pilot_file.csv"

    # Store the data in s3
    save_csv_to_s3(
        dataframe=pd.DataFrame(rows),
        bucket_name="retrofit-plan-inputs-dev",
        file_name=s3_key,
    )

    # The body points at the file that has just been uploaded
    print(
        {
            "portfolio_id": str(PORTFOLIO_ID),
            "housing_type": "Social",
            "goal": "Increase EPC",
            "goal_value": "C",
            "trigger_file_path": s3_key,
        }
    )

View file

@ -0,0 +1,38 @@
"""
This script will create an input csv for the recommendation engine and upload it to S3, which can be used for
testing
"""
import os
import pandas as pd
from utils.s3 import save_csv_to_s3
# Read from the environment; None when the variable is not set. Not referenced by the code visible in this script.
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN", None)
# USER_ID and PORTFOLIO_ID form the leading path segments of the S3 key the pilot file is stored under;
# PORTFOLIO_ID is also sent (as a string) in the printed request body.
USER_ID = 8
PORTFOLIO_ID = 59
def app():
    """
    Build the two-row pilot file, store it in S3 as a csv and print the request body that references it.
    """
    rows = [
        {"address": "10 Elm Close", "postcode": "CV37 8XL", "Notes": None},
        {"address": "21, Spring Lane", "postcode": "MK17 0QP", "Notes": None},
    ]
    s3_key = f"{USER_ID}/{PORTFOLIO_ID}/the_guiness_partnership_pilot_file.csv"

    # Store the data in s3
    save_csv_to_s3(
        dataframe=pd.DataFrame(rows),
        bucket_name="retrofit-plan-inputs-dev",
        file_name=s3_key,
    )

    # The body points at the file that has just been uploaded
    print(
        {
            "portfolio_id": str(PORTFOLIO_ID),
            "housing_type": "Social",
            "goal": "Increase EPC",
            "goal_value": "C",
            "trigger_file_path": s3_key,
        }
    )

View file

@ -109,6 +109,7 @@ class FloorRecommendations(Definitions):
insulation_thickness=self.property.floor["insulation_thickness"],
wall_type=self.property.wall_type
)
self.estimated_u_value = u_value
if u_value < self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE:

View file

@ -91,6 +91,7 @@ class RoofRecommendations:
raise NotImplementedError("Implement me")
u_value = get_roof_u_value(**{**self.property.roof, "age_band": self.property.age_band})
self.estimated_u_value = u_value
if u_value <= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE:
# The Roof is already compliant

View file

@ -29,4 +29,34 @@ floor_uvalue_test_cases = [
"insulation_thickness": None,
"expected": ValueError,
},
# 16 Glastonbury road EPR - the EPR has 0.71 due to the property having 320mm wall thickness, but default being 250
{
"floor_type": "suspended",
"area": 34.5,
"perimeter": 16.7,
"age_band": "D",
"wall_type": "cavity",
"insulation_thickness": None,
"expected": 0.72,
},
# 31 Loddon Way - the EPR has 0.5 due to the property having 320mm wall thickness, but default being 250
{
"floor_type": "solid",
"area": 52.08,
"perimeter": 16.2,
"age_band": "E",
"wall_type": "cavity",
"insulation_thickness": None,
"expected": 0.52,
},
# 62 Pearmain Drive
{
"floor_type": "solid",
"area": 38.64,
"perimeter": 18.1,
"age_band": "E",
"wall_type": "cavity",
"insulation_thickness": None,
"expected": 0.69,
},
]

View file

@ -76,5 +76,33 @@ wall_uvalue_test_cases = [
"is_granite_or_whinstone": False,
"is_sandstone_or_limestone": False,
"uvalue": 0
},
{
"clean_description": "Cavity wall, as built, insulated",
"age_band": "F",
"is_granite_or_whinstone": False,
"is_sandstone_or_limestone": False,
"uvalue": 0.4
},
{
"clean_description": "Cavity wall, as built, insulated",
"age_band": "D",
"is_granite_or_whinstone": False,
"is_sandstone_or_limestone": False,
"uvalue": 0.7
},
{
"clean_description": "Cavity wall, filled cavity",
"age_band": "E",
"is_granite_or_whinstone": False,
"is_sandstone_or_limestone": False,
"uvalue": 0.7
},
{
"clean_description": "Cavity wall, as built, no insulation",
"age_band": "E",
"is_granite_or_whinstone": False,
"is_sandstone_or_limestone": False,
"uvalue": 1.5
}
]

View file

@ -1,16 +1,18 @@
from backend.Property import Property
from unittest.mock import Mock
from recommendations.FireplaceRecommendations import FireplaceRecommendations
from etl.epc.Record import EPCRecord
class TestFirepaceRecommendations:
def test_no_fireplaces(self):
property_instance = Property(id=0, address="fake", postcode="fake")
property_instance.data = {
"number-open-fireplaces": 0
epc_record = EPCRecord()
epc_record.prepared_epc = {
"number-open-fireplaces": 0,
}
property_instance = Property(id=0, address="fake", postcode="fake", epc_record=epc_record)
recommender = FireplaceRecommendations(
property_instance=property_instance
)
@ -22,10 +24,11 @@ class TestFirepaceRecommendations:
assert recommender.recommendation is None
def test_one_fireplace(self):
property_instance = Property(id=0, address="fake", postcode="fake")
property_instance.data = {
"number-open-fireplaces": 1
epc_record = EPCRecord()
epc_record.prepared_epc = {
"number-open-fireplaces": 1,
}
property_instance = Property(id=0, address="fake", postcode="fake", epc_record=epc_record)
recommender = FireplaceRecommendations(
property_instance=property_instance
@ -40,10 +43,11 @@ class TestFirepaceRecommendations:
assert recommender.recommendation[0]["total"] == 300
def test_multiple_fireplaces(self):
property_instance = Property(id=0, address="fake", postcode="fake")
property_instance.data = {
"number-open-fireplaces": 3
epc_record = EPCRecord()
epc_record.prepared_epc = {
"number-open-fireplaces": 3,
}
property_instance = Property(id=0, address="fake", postcode="fake", epc_record=epc_record)
recommender = FireplaceRecommendations(
property_instance=property_instance

View file

@ -1,5 +1,5 @@
import pytest
from unittest.mock import Mock
from etl.epc.Record import EPCRecord
from backend.Property import Property
from recommendations.LightingRecommendations import LightingRecommendations
@ -9,18 +9,20 @@ from recommendations.tests.test_data.materials import materials
class TestLightingRecommendations:
def test_init_invalid_materials(self):
input_property0 = Property(id=1, postcode="F4k3 6", address="623 fake street")
epc_record = EPCRecord()
epc_record.prepared_epc = {"county": "Greater London Authority"}
input_property0 = Property(id=1, postcode="F4k3 6", address="623 fake street", epc_record=epc_record)
input_property0.lighting = {"low_energy_proportion": 0}
input_property0.data = {"county": "Greater London Authority"}
# Test for invalid materials
with pytest.raises(ValueError):
LightingRecommendations(input_property0, [])
def test_recommend_no_action_needed(self):
# Case where no recommendation is needed
input_property1 = Property(id=1, postcode="F4k3 6", address="623 fake street")
epc_record = EPCRecord()
epc_record.prepared_epc = {"county": "Greater London Authority"}
input_property1 = Property(id=1, postcode="F4k3 6", address="623 fake street", epc_record=epc_record)
input_property1.lighting = {"low_energy_proportion": 100}
input_property1.data = {"county": "Greater London Authority"}
lr = LightingRecommendations(input_property1, materials)
lr.recommend()
@ -28,9 +30,9 @@ class TestLightingRecommendations:
def test_recommend_action_needed(self):
# Case where recommendation is needed
input_property1 = Property(id=1, postcode="F4k3 6", address="623 fake street")
input_property1.lighting = {"low_energy_proportion": 100}
input_property1.data = {"county": "Greater London Authority"}
epc_record = EPCRecord()
epc_record.prepared_epc = {"county": "Greater London Authority"}
input_property1 = Property(id=1, postcode="F4k3 6", address="623 fake street", epc_record=epc_record)
input_property1.lighting = {"low_energy_proportion": 0.80}
input_property1.number_lighting_outlets = 20

View file

@ -1,12 +1,17 @@
from backend.Property import Property
from recommendations.RoofRecommendations import RoofRecommendations
from recommendations.tests.test_data.materials import materials
from etl.epc.Record import EPCRecord
class TestRoofRecommendations:
def test_loft_insulation_recommendation_no_insulation(self):
property_instance = Property(id=0, address="fake", postcode="fake")
epc_record = EPCRecord()
epc_record.prepared_epc = {
"county": "Cambridgeshire",
}
property_instance = Property(id=0, address="fake", postcode="fake", epc_record=epc_record)
property_instance.age_band = "F"
property_instance.insulation_floor_area = 100
property_instance.roof = {
@ -18,9 +23,6 @@ class TestRoofRecommendations:
'is_at_rafters': False, 'is_assumed': True, 'has_dwelling_above': False, 'is_valid': True,
'insulation_thickness': 'none', 'roof_thermal_transmittance': None, 'roof_insulation_thickness': 'none'
}
property_instance.data = {
"county": "Cambridgeshire",
}
roof_recommender = RoofRecommendations(property_instance=property_instance, materials=materials)
@ -31,7 +33,9 @@ class TestRoofRecommendations:
assert len(roof_recommender.recommendations)
def test_loft_insulation_recommendation_50mm_insulation(self):
property_instance2 = Property(id=0, address="fake", postcode="fake")
epc_record = EPCRecord()
epc_record.prepared_epc = {"county": "Kent"}
property_instance2 = Property(id=0, address="fake", postcode="fake", epc_record=epc_record)
property_instance2.age_band = "F"
property_instance2.insulation_floor_area = 100
property_instance2.roof = {
@ -43,7 +47,6 @@ class TestRoofRecommendations:
'is_at_rafters': False, 'is_assumed': True, 'has_dwelling_above': False, 'is_valid': True,
'insulation_thickness': '50', 'roof_thermal_transmittance': None, 'roof_insulation_thickness': 'none'
}
property_instance2.data = {"county": "Kent"}
roof_recommender2 = RoofRecommendations(property_instance=property_instance2, materials=materials)
@ -57,7 +60,9 @@ class TestRoofRecommendations:
assert roof_recommender2.recommendations[0]["new_u_value"] == 0.14
assert roof_recommender2.recommendations[0]["starting_u_value"] == 0.68
property_instance3 = Property(id=0, address="fake", postcode="fake")
epc_record = EPCRecord()
epc_record.prepared_epc = {"county": "Greater London Authority"}
property_instance3 = Property(id=0, address="fake", postcode="fake", epc_record=epc_record)
property_instance3.age_band = "F"
property_instance3.insulation_floor_area = 100
property_instance3.roof = {
@ -69,7 +74,6 @@ class TestRoofRecommendations:
'is_at_rafters': False, 'is_assumed': True, 'has_dwelling_above': False, 'is_valid': True,
'insulation_thickness': '50', 'roof_thermal_transmittance': None, 'roof_insulation_thickness': 'none'
}
property_instance3.data = {"county": "Greater London Authority"}
roof_recommender3 = RoofRecommendations(property_instance=property_instance3, materials=materials)
@ -82,7 +86,9 @@ class TestRoofRecommendations:
assert roof_recommender3.recommendations[0]["parts"][0]["depth"] == 270
def test_loft_insulation_recommendation_150mm_insulation(self):
property_instance4 = Property(id=0, address="fake", postcode="fake")
epc_record = EPCRecord()
epc_record.prepared_epc = {"county": "North East Lincolnshire"}
property_instance4 = Property(id=0, address="fake", postcode="fake", epc_record=epc_record)
property_instance4.age_band = "F"
property_instance4.insulation_floor_area = 100
property_instance4.roof = {
@ -94,7 +100,6 @@ class TestRoofRecommendations:
'is_at_rafters': False, 'is_assumed': True, 'has_dwelling_above': False, 'is_valid': True,
'insulation_thickness': '150', 'roof_thermal_transmittance': None, 'roof_insulation_thickness': 'none'
}
property_instance4.data = {"county": "North East Lincolnshire"}
roof_recommender4 = RoofRecommendations(property_instance=property_instance4, materials=materials)
@ -109,7 +114,9 @@ class TestRoofRecommendations:
assert roof_recommender4.recommendations[0]["starting_u_value"] == 0.3
assert roof_recommender4.recommendations[0]["parts"][0]["depth"] == 150
property_instance5 = Property(id=0, address="fake", postcode="fake")
epc_record = EPCRecord()
epc_record.prepared_epc = {"county": "Somerset"}
property_instance5 = Property(id=0, address="fake", postcode="fake", epc_record=epc_record)
property_instance5.age_band = "F"
property_instance5.insulation_floor_area = 100
property_instance5.roof = {
@ -121,7 +128,6 @@ class TestRoofRecommendations:
'is_at_rafters': False, 'is_assumed': True, 'has_dwelling_above': False, 'is_valid': True,
'insulation_thickness': '150', 'roof_thermal_transmittance': None, 'roof_insulation_thickness': 'none'
}
property_instance5.data = {"county": "Somerset"}
roof_recommender5 = RoofRecommendations(property_instance=property_instance5, materials=materials)
@ -136,7 +142,9 @@ class TestRoofRecommendations:
def test_loft_insulation_recommendation_270mm_insulation(self):
# We shouldn't recommend anything in this case
property_instance6 = Property(id=0, address="fake", postcode="fake")
epc_record = EPCRecord()
epc_record.prepared_epc = {"county": "Portsmouth"}
property_instance6 = Property(id=0, address="fake", postcode="fake", epc_record=epc_record)
property_instance6.age_band = "F"
property_instance6.insulation_floor_area = 100
property_instance6.roof = {
@ -148,7 +156,6 @@ class TestRoofRecommendations:
'is_at_rafters': False, 'is_assumed': True, 'has_dwelling_above': False, 'is_valid': True,
'insulation_thickness': '270', 'roof_thermal_transmittance': None, 'roof_insulation_thickness': 'none'
}
property_instance6.data = {"county": "Portsmouth"}
roof_recommender6 = RoofRecommendations(property_instance=property_instance6, materials=materials)
@ -277,7 +284,9 @@ class TestRoofRecommendations:
# "Insulate your room roof with 270mm of Example room roof insulation"
def test_flat_no_insulation(self):
property_instance11 = Property(id=11, address="fake", postcode="fake")
epc_record = EPCRecord()
epc_record.prepared_epc = {"county": "Swindon"}
property_instance11 = Property(id=11, address="fake", postcode="fake", epc_record=epc_record)
property_instance11.age_band = "D"
property_instance11.insulation_floor_area = 33.5
property_instance11.perimeter = 24
@ -288,7 +297,6 @@ class TestRoofRecommendations:
'is_roof_room': False, 'is_loft': False, 'is_flat': True, 'is_thatched': False, 'is_at_rafters': False,
'is_assumed': True, 'has_dwelling_above': False, 'is_valid': True, 'insulation_thickness': 'none'
}
property_instance11.data = {"county": "Swindon"}
roof_recommender11 = RoofRecommendations(property_instance=property_instance11, materials=materials)
@ -306,7 +314,9 @@ class TestRoofRecommendations:
"Insulate the home's flat roof with 150mm of Ecotherm Eco-Versal General Purpose Insulation Board"
def test_flat_insulated(self):
property_instance12 = Property(id=12, address="fake", postcode="fake")
epc_record = EPCRecord()
epc_record.prepared_epc = {"county": "Thurrock"}
property_instance12 = Property(id=12, address="fake", postcode="fake", epc_record=epc_record)
property_instance12.age_band = "D"
property_instance12.insulation_floor_area = 40
property_instance12.perimeter = 30
@ -319,7 +329,6 @@ class TestRoofRecommendations:
'is_loft': False, 'is_flat': True, 'is_thatched': False, 'is_at_rafters': False, 'is_assumed': True,
'has_dwelling_above': False, 'is_valid': True, 'insulation_thickness': 'average'
}
property_instance12.data = {"county": "Thurrock"}
roof_recommender12 = RoofRecommendations(property_instance=property_instance12, materials=materials)
@ -330,7 +339,9 @@ class TestRoofRecommendations:
assert not roof_recommender12.recommendations
def test_flat_limited_insulation(self):
property_instance13 = Property(id=12, address="fake", postcode="fake")
epc_record = EPCRecord()
epc_record.prepared_epc = {"county": "Tyne and Wear"}
property_instance13 = Property(id=12, address="fake", postcode="fake", epc_record=epc_record)
property_instance13.age_band = "D"
property_instance13.insulation_floor_area = 40
property_instance13.perimeter = 40
@ -342,7 +353,6 @@ class TestRoofRecommendations:
'is_loft': False, 'is_flat': True, 'is_thatched': False, 'is_at_rafters': False, 'is_assumed': True,
'has_dwelling_above': False, 'is_valid': True, 'insulation_thickness': 'below average'
}
property_instance13.data = {"county": "Tyne and Wear"}
roof_recommender13 = RoofRecommendations(property_instance=property_instance13, materials=materials)
@ -362,7 +372,9 @@ class TestRoofRecommendations:
"Insulate the home's flat roof with 150mm of Ecotherm Eco-Versal General Purpose Insulation Board"
def test_property_above(self):
property_instance14 = Property(id=0, address="fake", postcode="fake")
epc_record = EPCRecord()
epc_record.prepared_epc = {"county": "Suffolk"}
property_instance14 = Property(id=0, address="fake", postcode="fake", epc_record=epc_record)
property_instance14.age_band = "F"
property_instance14.insulation_floor_area = 100
property_instance14.roof = {
@ -373,7 +385,6 @@ class TestRoofRecommendations:
'is_assumed': False, 'has_dwelling_above': True, 'is_valid': True,
'insulation_thickness': None
}
property_instance14.data = {"county": "Suffolk"}
roof_recommender14 = RoofRecommendations(property_instance=property_instance14, materials=materials)

View file

@ -1,45 +1,50 @@
import pytest
from recommendations.SolarPvRecommendations import SolarPvRecommendations
from backend.Property import Property
from etl.epc.Record import EPCRecord
class TestSolarPvRecommendations:
@pytest.fixture
def property_instance_invalid_type(self):
# Setup the property_instance with an invalid property type
property_instance_invalid_type = Property(id=1, address="", postcode="")
property_instance_invalid_type.data = {
epc_record = EPCRecord()
epc_record.prepared_epc = {
"property-type": "InvalidType", "county": "Broxbourne", "photo-supply": None
}
property_instance_invalid_type = Property(id=1, address="", postcode="", epc_record=epc_record)
property_instance_invalid_type.roof = {"is_flat": False, "is_pitched": False, "is_roof_room": False}
return property_instance_invalid_type
@pytest.fixture
def property_instance_invalid_roof(self):
# Setup the property_instance with invalid roof type
property_instance_invalid_roof = Property(id=1, address="", postcode="")
property_instance_invalid_roof.data = {
epc_record = EPCRecord()
epc_record.prepared_epc = {
"county": "Huntingdonshire", "property-type": "House", "photo-supply": None
}
property_instance_invalid_roof = Property(id=1, address="", postcode="", epc_record=epc_record)
property_instance_invalid_roof.roof = {"is_flat": False, "is_pitched": False, "is_roof_room": False}
return property_instance_invalid_roof
@pytest.fixture
def property_instance_has_solar_pv(self):
# Setup the property_instance with existing solar pv
property_instance_has_solar_pv = Property(id=1, address="", postcode="")
property_instance_has_solar_pv.data = {"photo-supply": "40", "county": "Huntingdonshire",
"property-type": "House"}
epc_record = EPCRecord()
epc_record.prepared_epc = {"photo-supply": "40", "county": "Huntingdonshire",
"property-type": "House"}
property_instance_has_solar_pv = Property(id=1, address="", postcode="", epc_record=epc_record)
property_instance_has_solar_pv.roof = {"is_flat": True}
return property_instance_has_solar_pv
@pytest.fixture
def property_instance_valid_all(self):
# Setup a valid property_instance that passes all conditions
property_instance_valid_all = Property(id=1, address="", postcode="")
epc_record = EPCRecord()
epc_record.prepared_epc = {"property-type": "House", "photo-supply": None, "county": "Huntingdonshire"}
property_instance_valid_all = Property(id=1, address="", postcode="", epc_record=epc_record)
property_instance_valid_all.solar_pv_roof_area = 20
property_instance_valid_all.solar_pv_percentage = 40
property_instance_valid_all.data = {"property-type": "House", "photo-supply": None, "county": "Huntingdonshire"}
property_instance_valid_all.roof = {"is_flat": True}
return property_instance_valid_all

View file

@ -1,13 +1,15 @@
from backend.Property import Property
from recommendations.VentilationRecommendations import VentilationRecommendations
from recommendations.tests.test_data.materials import materials
from etl.epc.Record import EPCRecord
class TestVentilationRecommendations:
def test_natural_ventilation(self):
input_property1 = Property(id=1, postcode="F4k3 6", address="623 fake street")
input_property1.data = {"mechanical-ventilation": "natural"}
epc_record = EPCRecord()
epc_record.prepared_epc = {"mechanical-ventilation": "natural"}
input_property1 = Property(id=1, postcode="F4k3 6", address="623 fake street", epc_record=epc_record)
recommender = VentilationRecommendations(
property_instance=input_property1,
@ -27,8 +29,9 @@ class TestVentilationRecommendations:
assert recommender.recommendation[0]["parts"][0]["quantity"] == 2
def test_missing_ventilation(self):
input_property2 = Property(id=1, postcode="F4k3 6", address="623 fake street")
input_property2.data = {"mechanical-ventilation": None}
epc_record = EPCRecord()
epc_record.prepared_epc = {"mechanical-ventilation": None}
input_property2 = Property(id=1, postcode="F4k3 6", address="623 fake street", epc_record=epc_record)
recommender2 = VentilationRecommendations(
property_instance=input_property2,
@ -48,8 +51,9 @@ class TestVentilationRecommendations:
assert recommender2.recommendation[0]["parts"][0]["quantity"] == 2
def test_nodata_ventilation(self):
input_property3 = Property(id=1, postcode="F4k3 6", address="623 fake street")
input_property3.data = {"mechanical-ventilation": "NO DATA!!"}
epc_record = EPCRecord()
epc_record.prepared_epc = {"mechanical-ventilation": "NO DATA!!"}
input_property3 = Property(id=1, postcode="F4k3 6", address="623 fake street", epc_record=epc_record)
recommender3 = VentilationRecommendations(
property_instance=input_property3,
@ -69,8 +73,9 @@ class TestVentilationRecommendations:
assert recommender3.recommendation[0]["parts"][0]["quantity"] == 2
def test_existing_ventilation_1(self):
input_property4 = Property(id=1, postcode="F4k3 6", address="623 fake street")
input_property4.data = {"mechanical-ventilation": 'mechanical, extract only'}
epc_record = EPCRecord()
epc_record.prepared_epc = {"mechanical-ventilation": "mechanical, extract only"}
input_property4 = Property(id=1, postcode="F4k3 6", address="623 fake street", epc_record=epc_record)
recommender4 = VentilationRecommendations(
property_instance=input_property4,
@ -85,8 +90,9 @@ class TestVentilationRecommendations:
assert recommender4.has_ventilaion
def test_existing_ventilation_2(self):
input_property5 = Property(id=1, postcode="F4k3 6", address="623 fake street")
input_property5.data = {"mechanical-ventilation": 'mechanical, supply and extract'}
epc_record = EPCRecord()
epc_record.prepared_epc = {"mechanical-ventilation": "mechanical, supply and extract"}
input_property5 = Property(id=1, postcode="F4k3 6", address="623 fake street", epc_record=epc_record)
recommender5 = VentilationRecommendations(
property_instance=input_property5,

View file

@ -7,6 +7,7 @@ from recommendations.WallRecommendations import WallRecommendations
from backend.Property import Property
from recommendations.recommendation_utils import is_diminishing_returns
from recommendations.tests.test_data.materials import materials
from etl.epc.Record import EPCRecord
# with open(
@ -231,7 +232,9 @@ class TestWallRecommendationsBase:
class TestCavityWallRecommensations:
def test_fill_empty_cavity(self):
input_property = Property(id=1, postcode="F4k3", address="123 fake street")
epc_record = EPCRecord()
epc_record.prepared_epc = {"county": "Derbyshire"}
input_property = Property(id=1, postcode="F4k3", address="123 fake street", epc_record=epc_record)
input_property.walls = {
'original_description': 'Cavity wall, as built, no insulation (assumed)',
'clean_description': 'Cavity wall, as built, no insulation',
@ -245,7 +248,6 @@ class TestCavityWallRecommensations:
}
input_property.age_band = "C"
input_property.insulation_wall_area = 50
input_property.data = {"county": "Derbyshire"}
recommender = WallRecommendations(
property_instance=input_property,
@ -265,7 +267,9 @@ class TestCavityWallRecommensations:
assert np.isclose(recommender.recommendations[1]["total"], 2004.6600000000003)
def test_fill_partial_filled_cavity(self):
input_property = Property(id=1, postcode="F4k3", address="123 fake street")
epc_record = EPCRecord()
epc_record.prepared_epc = {"county": "County Durham"}
input_property = Property(id=1, postcode="F4k3", address="123 fake street", epc_record=epc_record)
input_property.walls = {
'original_description': 'Cavity wall, as built, partial insulation (assumed)',
'clean_description': 'Cavity wall, as built, partial insulation',
@ -279,7 +283,6 @@ class TestCavityWallRecommensations:
}
input_property.age_band = "C"
input_property.insulation_wall_area = 50
input_property.data = {"county": "County Durham"}
recommender = WallRecommendations(
property_instance=input_property,
@ -299,7 +302,9 @@ class TestCavityWallRecommensations:
assert np.isclose(recommender.recommendations[1]["total"], 1999.9350000000002)
def test_system_built_wall(self):
input_property2 = Property(id=1, postcode="F4k3 2", address="223 fake street")
epc_record = EPCRecord()
epc_record.prepared_epc = {"property-type": "House", "county": "Derbyshire", "built-form": "Detached"}
input_property2 = Property(id=1, postcode="F4k3 2", address="223 fake street", epc_record=epc_record)
input_property2.walls = {
'original_description': 'System built, as built, no insulation (assumed)',
'clean_description': 'System built, as built, no insulation',
@ -314,7 +319,6 @@ class TestCavityWallRecommensations:
input_property2.age_band = "F"
input_property2.insulation_wall_area = 120
input_property2.restricted_measures = False
input_property2.data = {"property-type": "House", "county": "Derbyshire", "built-form": "Detached"}
assert input_property2.walls["is_system_built"]
@ -346,7 +350,9 @@ class TestCavityWallRecommensations:
assert recommender2.recommendations[6]["parts"][0]["depth"] == 52.5
def test_timber_frame_wall(self):
input_property3 = Property(id=1, postcode="F4k3 2", address="223 fake street")
epc_record = EPCRecord()
epc_record.prepared_epc = {"property-type": "House", "county": "Derbyshire", "built-form": "Semi-Detached"}
input_property3 = Property(id=1, postcode="F4k3 2", address="223 fake street", epc_record=epc_record)
input_property3.walls = {
'original_description': 'Timber frame, as built, no insulation (assumed)',
'clean_description': 'Timber frame, as built, no insulation',
@ -361,7 +367,6 @@ class TestCavityWallRecommensations:
input_property3.age_band = "B"
input_property3.insulation_wall_area = 99
input_property3.restricted_measures = False
input_property3.data = {"property-type": "House", "county": "Derbyshire", "built-form": "Semi-Detached"}
assert input_property3.walls["is_timber_frame"]
@ -388,7 +393,9 @@ class TestCavityWallRecommensations:
assert recommender3.recommendations[1]["parts"][0]["depth"] == 150.0
def test_granite_or_whinstone_wall(self):
input_property4 = Property(id=1, postcode="F4k3 2", address="223 fake street")
epc_record = EPCRecord()
epc_record.prepared_epc = {"property-type": "Bungalow", "county": "Derbyshire", "built-form": "Detached"}
input_property4 = Property(id=1, postcode="F4k3 2", address="223 fake street", epc_record=epc_record)
input_property4.walls = {
'original_description': 'Granite or whinstone, as built, no insulation (assumed)',
'clean_description': 'Granite or whinstone, as built, no insulation',
@ -403,7 +410,6 @@ class TestCavityWallRecommensations:
input_property4.age_band = "A"
input_property4.insulation_wall_area = 223
input_property4.restricted_measures = False
input_property4.data = {"property-type": "Bungalow", "county": "Derbyshire", "built-form": "Detached"}
assert input_property4.walls["is_granite_or_whinstone"]
@ -430,7 +436,9 @@ class TestCavityWallRecommensations:
assert recommender4.recommendations[1]["parts"][0]["depth"] == 150
def test_cob_wall(self):
input_property5 = Property(id=1, postcode="F4k3 2", address="223 fake street")
epc_record = EPCRecord()
epc_record.prepared_epc = {"property-type": "Bungalow", "county": "Derbyshire", "built-form": "Detached"}
input_property5 = Property(id=1, postcode="F4k3 2", address="223 fake street", epc_record=epc_record)
input_property5.walls = {
'original_description': 'Cob, as built',
'clean_description': 'Cob, as built',
@ -445,7 +453,6 @@ class TestCavityWallRecommensations:
input_property5.age_band = "E"
input_property5.insulation_wall_area = 77
input_property5.restricted_measures = False
input_property5.data = {"property-type": "Bungalow", "county": "Derbyshire", "built-form": "Detached"}
assert input_property5.walls["is_cob"]
@ -472,7 +479,9 @@ class TestCavityWallRecommensations:
assert recommender5.recommendations[3]["parts"][0]["depth"] == 100
def test_sandstone_or_limestone_wall(self):
input_property6 = Property(id=1, postcode="F4k3 6", address="623 fake street")
epc_record = EPCRecord()
epc_record.prepared_epc = {"property-type": "House", "county": "Derbyshire", "built-form": "Mid-Terrace"}
input_property6 = Property(id=1, postcode="F4k3 6", address="623 fake street", epc_record=epc_record)
input_property6.walls = {
'original_description': 'Sandstone or limestone, as built, no insulation (assumed)',
'clean_description': 'Sandstone or limestone, as built, no insulation',
@ -487,7 +496,6 @@ class TestCavityWallRecommensations:
input_property6.age_band = "F"
input_property6.insulation_wall_area = 350
input_property6.restricted_measures = False
input_property6.data = {"property-type": "House", "county": "Derbyshire", "built-form": "Mid-Terrace"}
assert input_property6.walls["is_sandstone_or_limestone"]

View file

@ -1,6 +1,7 @@
from recommendations.WindowsRecommendations import WindowsRecommendations
from backend.Property import Property
from recommendations.tests.test_data.materials import materials
from etl.epc.Record import EPCRecord
class TestWindowRecommendations:
@ -10,16 +11,17 @@ class TestWindowRecommendations:
For this property, we expect all windows to be single glazed and should recommend full double glazing
:return:
"""
epc_record = EPCRecord()
epc_record.prepared_epc = {
"county": "Wychavon",
"multi-glaze-proportion": 0,
"uprn": 0
}
property_1 = Property(
id=1,
postcode='1',
address='1',
data={
"county": "Wychavon",
"multi-glaze-proportion": 0,
"uprn": 0
}
epc_record=epc_record
)
property_1.windows = {
'original_description': 'Single glazed', 'has_glazing': False, 'glazing_coverage': 'full',
@ -47,16 +49,17 @@ class TestWindowRecommendations:
double glazing
:return:
"""
epc_record = EPCRecord()
epc_record.prepared_epc = {
"county": "Wychavon",
"multi-glaze-proportion": 33,
"uprn": 0
}
property_2 = Property(
id=1,
postcode='1',
address='1',
data={
"county": "Wychavon",
"multi-glaze-proportion": 33,
"uprn": 0
}
epc_record=epc_record
)
property_2.windows = {'original_description': 'Mostly double glazing', 'has_glazing': True,
'glazing_coverage': 'most',
@ -81,16 +84,17 @@ class TestWindowRecommendations:
This property has full double glazing so we shouldn't recommend anything
:return:
"""
epc_record = EPCRecord()
epc_record.prepared_epc = {
"county": "Wychavon",
"multi-glaze-proportion": 100,
"uprn": 0
}
property_3 = Property(
id=1,
postcode='1',
address='1',
data={
"county": "Wychavon",
"multi-glaze-proportion": 80,
"uprn": 0
}
epc_record=epc_record
)
property_3.windows = {'original_description': 'Fully double glazed', 'has_glazing': True,
'glazing_coverage': 'full',
@ -106,15 +110,17 @@ class TestWindowRecommendations:
assert not recommender3.recommendation
def test_fully_secondary_glazed(self):
epc_record = EPCRecord()
epc_record.prepared_epc = {
"county": "Wychavon",
"multi-glaze-proportion": 100,
"uprn": 0
}
property_4 = Property(
id=1,
postcode='1',
address='1',
data={
"county": "Wychavon",
"multi-glaze-proportion": 100,
"uprn": 0
}
epc_record=epc_record
)
property_4.windows = {'original_description': 'Full secondary glazing', 'has_glazing': True,
'glazing_coverage': 'full',
@ -130,15 +136,17 @@ class TestWindowRecommendations:
assert not recommender4.recommendation
def test_partial_secondary_glazing(self):
epc_record = EPCRecord()
epc_record.prepared_epc = {
"county": "Wychavon",
"multi-glaze-proportion": 50,
"uprn": 0
}
property_5 = Property(
id=1,
postcode='1',
address='1',
data={
"county": "Wychavon",
"multi-glaze-proportion": 50,
"uprn": 0
}
epc_record=epc_record
)
property_5.windows = {'original_description': 'Partial secondary glazing', 'has_glazing': True,
'glazing_coverage': 'partial',
@ -160,15 +168,18 @@ class TestWindowRecommendations:
'labour_days': 0.8125, 'is_secondary_glazing': True}]
def test_single_glazed_restricted_measures(self):
epc_record = EPCRecord()
epc_record.prepared_epc = {
"county": "Wychavon",
"multi-glaze-proportion": 0,
"uprn": 0
}
property_6 = Property(
id=1,
postcode='1',
address='1',
data={
"county": "Wychavon",
"multi-glaze-proportion": 0,
"uprn": 0
}
epc_record=epc_record
)
property_6.windows = {'original_description': 'Single glazed', 'has_glazing': False, 'glazing_coverage': None,
'glazing_type': 'single',
@ -195,15 +206,17 @@ class TestWindowRecommendations:
]
def test_full_triple_glazed(self):
epc_record = EPCRecord()
epc_record.prepared_epc = {
"county": "Wychavon",
"multi-glaze-proportion": 100,
"uprn": 0
}
property_7 = Property(
id=1,
postcode='1',
address='1',
data={
"county": "Wychavon",
"multi-glaze-proportion": 100,
"uprn": 0
}
epc_record=epc_record
)
property_7.windows = {'original_description': 'Fully triple glazed', 'has_glazing': True,
'glazing_coverage': 'full',
@ -222,16 +235,17 @@ class TestWindowRecommendations:
"""
We should just recommend double glazing to the remaining windows, since it's a cheaper option
"""
epc_record = EPCRecord()
epc_record.prepared_epc = {
"county": "Wychavon",
"multi-glaze-proportion": 80,
"uprn": 1
}
property_8 = Property(
id=1,
postcode='1',
address='1',
data={
"county": "Wychavon",
"multi-glaze-proportion": 80,
"uprn": 1
}
epc_record=epc_record
)
property_8.windows = {'original_description': 'Mostly triple glazing', 'has_glazing': True,
'glazing_coverage': 'most',

View file

@ -1,3 +1,4 @@
import pickle
import boto3
from io import BytesIO, StringIO
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
@ -141,5 +142,56 @@ def save_csv_to_s3(dataframe, bucket_name, file_name):
s3.put_object(Body=csv_buffer.getvalue(), Bucket=bucket_name, Key=file_name)
return True
except Exception as e:
print(f"An error occurred: {e}")
logger.error(f"An error occurred: {e}")
return False
def save_pickle_to_s3(data, bucket_name, s3_file_name):
    """
    Save an object to an S3 bucket as a pickle file.

    If the object cannot be pickled, the failure is logged, nothing is uploaded and the
    function returns early. The upload itself is delegated to save_data_to_s3.

    :param data: The data to save
    :param bucket_name: The name of the S3 bucket
    :param s3_file_name: The file name to use for the saved data in S3 (should end in .pkl)
    :return: None
    """
    # Serialize data to a pickle format
    try:
        serialized_data = pickle.dumps(data)
    except Exception as e:
        # Report through the module logger (as the other S3 helpers do) rather than stdout,
        # so the failure ends up in the application logs.
        logger.error(f'Failed to serialize data: {str(e)}')
        return

    # Use save_data_to_s3 function to upload the serialized data to S3
    save_data_to_s3(serialized_data, bucket_name, s3_file_name)
def read_pickle_from_s3(bucket_name, s3_file_name):
    """
    Read a pickle file from an S3 bucket and return the data.

    Every failure (missing/incomplete credentials, download error, corrupt pickle) is
    logged and reported to the caller as ``None``. Note that a successfully loaded
    pickled ``None`` is therefore indistinguishable from a failure.

    :param bucket_name: The name of the S3 bucket
    :param s3_file_name: The file name of the pickle file in S3
    :return: The data read from the pickle file, or None if it could not be read
    """
    try:
        s3 = boto3.client('s3')
        s3_response = s3.get_object(Bucket=bucket_name, Key=s3_file_name)
        serialized_data = s3_response['Body'].read()
    except NoCredentialsError:
        logger.error("Credentials not available.")
        return None
    except PartialCredentialsError:
        logger.error("Incomplete credentials provided.")
        return None
    except Exception as e:
        logger.error(f'Failed to download data from {bucket_name}/{s3_file_name}: {str(e)}')
        return None

    # Deserialize data from pickle format
    # NOTE(review): pickle.loads can execute arbitrary code embedded in the payload; only
    # use this on buckets/keys whose contents are written by trusted code.
    try:
        data = pickle.loads(serialized_data)
    except Exception as e:
        logger.error(f'Failed to deserialize data: {str(e)}')
        return None

    return data