diff --git a/.idea/Model.iml b/.idea/Model.iml
index 762580d9..df6c4faa 100644
--- a/.idea/Model.iml
+++ b/.idea/Model.iml
@@ -7,7 +7,7 @@
-
+
diff --git a/.idea/misc.xml b/.idea/misc.xml
index c916a158..50cad4ca 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -3,7 +3,7 @@
-
+
diff --git a/backend/SearchEpc.py b/backend/SearchEpc.py
index 2d658c04..8ec4fdbe 100644
--- a/backend/SearchEpc.py
+++ b/backend/SearchEpc.py
@@ -96,7 +96,7 @@ vartypes = {
'walls-env-eff': 'str',
'transaction-type': 'str',
# 'uprn': "Int64",
- 'current-energy-efficiency': 'float',
+ 'current-energy-efficiency': 'Int64',
'energy-consumption-current': 'float',
'mainheat-description': 'str',
'lighting-cost-current': 'float',
@@ -342,8 +342,12 @@ class SearchEpc:
rows_filtered = [r for r in rows if ", ".join([r["address"], r["posttown"]]) == best_match[0]]
else:
best_match = process.extractOne(address, [r["address"] for r in rows], score_cutoff=0)
+ # Get the UPRN for the best match
+ best_match_uprn = {r["uprn"] for r in rows if r["address"] == best_match[0]}.pop()
# Get all of the scores
- rows_filtered = [r for r in rows if r["address"] == best_match[0]]
+ rows_filtered = [
+ r for r in rows if (r["address"] == best_match[0]) or (r["uprn"] == best_match_uprn)
+ ]
if rows_filtered:
return rows_filtered
@@ -642,6 +646,7 @@ class SearchEpc:
estimation_data = epc_data[[key, "weight", "lodgement-datetime"]].copy()
estimation_data = estimation_data[~pd.isnull(estimation_data[key])]
estimation_data = estimation_data[~estimation_data[key].isin(Definitions.DATA_ANOMALY_MATCHES)]
+
if vartype == "Int64":
# We have some edge cases where we get the error "invalid literal for int() with base 10: '1.0'"
# so this handles this
@@ -653,6 +658,13 @@ class SearchEpc:
estimated_epc[key] = None
continue
+ if key == "floor-height":
+ # We speficially handle this, to avoid extreme values
+ # We check if we have any rows less than 3.5m
+ if estimation_data[estimation_data["floor-height"].astype(float) <= 3.5].shape[0] > 0:
+ # Perform the filter
+ estimation_data = estimation_data[estimation_data["floor-height"].astype(float) <= 3.5]
+
if vartype == "Int64":
estimated_value = self._estimate_int(estimation_data, key)
elif vartype == "float":
@@ -675,6 +687,14 @@ class SearchEpc:
estimated_epc["current-energy-rating"] = sap_to_epc(estimated_epc["current-energy-efficiency"])
+ # Convert the cost current and potential variables - to string integers
+ for variable in ["heating-cost-current", "hot-water-cost-current", "lighting-cost-current",
+ "heating-cost-potential", "hot-water-cost-potential", "lighting-cost-potential"]:
+ estimated_epc[variable] = str(int(estimated_epc[variable]))
+
+ # This is a string
+ estimated_epc["low-energy-fixed-light-count"] = str(estimated_epc["low-energy-fixed-light-count"])
+
estimated_epc["postcode"] = self.postcode
estimated_epc["uprn"] = self.uprn
estimated_epc["address"] = self.full_address
diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py
index 65a6c32c..3b6f3985 100644
--- a/backend/app/plan/router.py
+++ b/backend/app/plan/router.py
@@ -393,6 +393,13 @@ async def trigger_plan(body: PlanTriggerRequest):
session.begin()
logger.info("Getting the inputs")
plan_input = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path)
+ # Check for duplicate UPRNS
+ input_uprns = [x.get("uprn") for x in plan_input if "uprn" in x]
+ if input_uprns:
+ # Check for dupes
+ if len(input_uprns) != len(set(input_uprns)):
+ raise ValueError("Duplicate UPRNs in the input data")
+
# If we have patches or overrides, we should read them in here
patches, already_installed, non_invasive_recommendations, valuation_data = get_request_property_data(body)
@@ -848,6 +855,7 @@ async def trigger_plan(body: PlanTriggerRequest):
# Commit final changes
session.commit()
+
except IntegrityError:
logger.error("Database integrity error occurred", exc_info=True)
session.rollback()
diff --git a/etl/customers/aiha/xml_extraction.py b/etl/customers/aiha/xml_extraction.py
index 531b6752..f96744ec 100644
--- a/etl/customers/aiha/xml_extraction.py
+++ b/etl/customers/aiha/xml_extraction.py
@@ -701,7 +701,7 @@ def main():
"starting_sap": 53,
"recommended_measures": [
{
- "measure": "Cyliner Insulation",
+ "measure": "Cylinder Insulation",
"description": "80mm cylinder insulation",
"sap_points": 2,
"ending_sap": 55,
diff --git a/etl/customers/ksquared/Wave3 Modelling.py b/etl/customers/ksquared/Wave3 Modelling.py
index 023ae25c..b96b261f 100644
--- a/etl/customers/ksquared/Wave3 Modelling.py
+++ b/etl/customers/ksquared/Wave3 Modelling.py
@@ -1,8 +1,17 @@
+import os
import time
+from dotenv import load_dotenv
from tqdm import tqdm
import pandas as pd
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
+from backend.SearchEpc import SearchEpc
+from utils.s3 import save_csv_to_s3
+
+load_dotenv(dotenv_path="backend/.env")
+EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
+USER_ID = 8
+PORTFOLIO_ID = 117
def app():
@@ -32,18 +41,118 @@ def app():
for col in ["Address letter or number", "Street address", "Postcode"]:
hornsey_asset_list[col] = hornsey_asset_list[col].str.replace(" ", " ")
+ hornsey_asset_list = hornsey_asset_list[hornsey_asset_list["Address letter or number"] != ""]
+
+ missed_uprns = {
+ "Flat 13A Stowell House": 100021213098,
+ "Flat 24 Stowell House": 100021213110,
+ "Flat 1 36 Haringey Park": None
+ }
extracted_data = []
+ asset_list = []
for _, home in tqdm(hornsey_asset_list.iterrows(), total=len(hornsey_asset_list)):
- time.sleep(0.5)
+
+ if home["Address letter or number"] == "Flat 1 36 Haringey Park":
+ continue
+
# Some properties do not have an epc
if not home["Energy starting band (EPC)"]:
+ asset_list.append(
+ {
+ "uprn": missed_uprns[home["Address letter or number"]],
+ "address": home["Address letter or number"],
+ "postcode": home["Postcode"],
+ "property_type": "Flat", # They're all flats
+ }
+ )
continue
+
unit_number = home["Address letter or number"]
street = home["Street address"]
postcode = home["Postcode"]
address = ", ".join([x for x in [unit_number, street] if x])
- searcher = RetrieveFindMyEpc(address=address, postcode=postcode)
- epc_data = searcher.retrieve_newest_find_my_epc_data()
- extracted_data.append(epc_data)
+ find_epc_searcher = RetrieveFindMyEpc(address=address, postcode=postcode)
+ find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
+ time.sleep(0.5)
+ # We need uprn
+ searcher = SearchEpc(
+ address1=address,
+ postcode=postcode,
+ auth_token=EPC_AUTH_TOKEN,
+ os_api_key="",
+ full_address=address,
+ )
+ searcher.find_property(skip_os=True)
+ newest_epc = searcher.newest_epc
+ if newest_epc["current-energy-efficiency"] != home["Energy starting band (EPC)"].split("-")[1]:
+ raise Exception("Something went wrong with the EPC data")
+
+ extracted_data.append(
+ {
+ "uprn": newest_epc["uprn"],
+ **find_epc_data,
+ "hotwater-description": newest_epc["hotwater-description"],
+ }
+ )
+
+ asset_list.append(
+ {
+ "uprn": newest_epc["uprn"],
+ "address": home["Address letter or number"],
+ "postcode": home["Postcode"],
+ "property_type": "Flat", # They're all flats
+ }
+ )
# We format the extracted data so that is has the same structure as non-intrusive recommendations
+ # We then get the UPRNs and create the asset list
+
+ non_invasive_recommendations = [
+ {
+ "uprn": r["uprn"],
+ "recommendations": r["recommendations"]
+ } for r in extracted_data
+ ]
+ for r in non_invasive_recommendations:
+ new_recommendations = []
+ extracted = [r for r in extracted_data if r["uprn"] == r["uprn"]][0]
+ for rec in r["recommendations"]:
+ if extracted["hotwater-description"] == "Gas boiler/circulator, no cylinder thermostat":
+ if rec["type"] in ["hot_water_tank_insulation", "cylinder_thermostat"]:
+ continue
+ rec["survey"] = False
+ new_recommendations.append(rec)
+ r["recommendations"] = new_recommendations
+
+ # Store the asset list in s3
+ filename = f"{USER_ID}/{PORTFOLIO_ID}/asset_list.csv"
+ save_csv_to_s3(
+ dataframe=pd.DataFrame(asset_list),
+ bucket_name="retrofit-plan-inputs-dev",
+ file_name=filename
+ )
+
+ # Store the non-invasive recommendations in s3
+ non_invasive_recommendations_filename = f"{USER_ID}/{PORTFOLIO_ID}/non_invasive_recommendations.csv"
+ save_csv_to_s3(
+ dataframe=pd.DataFrame(non_invasive_recommendations),
+ bucket_name="retrofit-plan-inputs-dev",
+ file_name=non_invasive_recommendations_filename
+ )
+
+ body = {
+ "portfolio_id": str(PORTFOLIO_ID),
+ "housing_type": "Social",
+ "goal": "Increasing EPC",
+ "goal_value": "C",
+ "trigger_file_path": filename,
+ "already_installed_file_path": "",
+ "patches_file_path": "",
+ "non_invasive_recommendations_file_path": non_invasive_recommendations_filename,
+ "valuation_file_path": "",
+ "scenario_name": "Wave 3 Packages",
+ "multi_plan": True,
+ "budget": None,
+ "exclusions": ["boiler_upgrade"]
+ }
+ print(body)
diff --git a/etl/epc/Record.py b/etl/epc/Record.py
index 4c1a912b..558dbacb 100644
--- a/etl/epc/Record.py
+++ b/etl/epc/Record.py
@@ -359,6 +359,7 @@ class EPCRecord:
self._clean_property_dimensions()
self._clean_number_lighting_outlets()
self._clean_floor_level()
+ self._clean_floor_height()
# self._clean_potential_energy_efficiency()
# self._clean_environment_impact_potential()
@@ -387,6 +388,20 @@ class EPCRecord:
return df
+ def _clean_floor_height(self):
+ """ Remaps anomalies in floor height to the average floor height for the property type """
+ floor_height_data = self.cleaning_data[
+ (self.cleaning_data["property_type"] == self.prepared_epc["property-type"]) &
+ (self.cleaning_data["built_form"] == self.prepared_epc["built-form"])
+ ]
+ average = floor_height_data["floor_height"].mean()
+ sd = floor_height_data["floor_height"].std()
+ # If we're in the top 0.5 percentile of floor heights, we'll set it to the average
+ if self.prepared_epc["floor-height"] > average + 10 * sd:
+ self.prepared_epc["floor-height"] = average
+ if self.prepared_epc["floor-height"] <= 1.665:
+ self.prepared_epc["floor-height"] = average
+
def _clean_floor_level(self):
"""
This method will clean the floor level, if empty or invalid
diff --git a/recommendations/HotwaterRecommendations.py b/recommendations/HotwaterRecommendations.py
index 636a7be0..5ff7ae4f 100644
--- a/recommendations/HotwaterRecommendations.py
+++ b/recommendations/HotwaterRecommendations.py
@@ -21,11 +21,44 @@ class HotwaterRecommendations:
"""
# Reset the recommendations
self.recommendations = []
+ non_invasive_recommendations = self.property.non_invasive_recommendations
+ if non_invasive_recommendations:
+ measures = [
+ r["type"] for r in non_invasive_recommendations if
+ r["type"] in ["hot_water_tank_insulation", "cylinder_thermostat"]
+ ]
+
+ recommendations_phase = phase
+ for m in measures:
+ non_invasive_rec = [
+ r for r in non_invasive_recommendations if r["type"] == m
+ ][0]
+ if m == "hot_water_tank_insulation":
+ # We need to be able to stack these recommendations
+ self.recommend_tank_insulation(
+ phase=recommendations_phase,
+ sap_points=non_invasive_rec["sap_points"],
+ survey=non_invasive_rec["survey"],
+ )
+
+ recommendations_phase += 1
+ elif m == "cylinder_thermostat":
+ self.recommend_cylinder_thermostat(
+ phase=recommendations_phase,
+ sap_points=non_invasive_rec["sap_points"],
+ survey=non_invasive_rec["survey"],
+ )
+ recommendations_phase += 1
# This first iteration of the recommender will provide very basic recommendation
# We recommend heating controls based on the main heating system
- # If there is no system present, but access to the mains, we
+ if self.property.hotwater["clean_description"] == "Gas boiler/circulator, no cylinder thermostat":
+ # Handle this case specifically:
+ self.recommend_cylinder_thermostat_gas_boiler_circulator(phase=phase)
+ return
+
+ # If there is no system present, but access to the mains, we
if (
(self.property.hotwater["heater_type"] in ["electric immersion"]) &
@@ -39,7 +72,7 @@ class HotwaterRecommendations:
self.recommend_cylinder_thermostat(phase=phase)
return
- def recommend_tank_insulation(self, phase):
+ def recommend_tank_insulation(self, phase, sap_points=None, survey=False, _return=False):
"""
If the home has a very poor hot water system, this is often indicative of a lack of insulation on the hot water
tank. This is a very simple and cost effective improvement that can be made to the home. It will likely
@@ -55,27 +88,30 @@ class HotwaterRecommendations:
else:
description = "Insulate hot water tank"
- self.recommendations.append(
- {
- "phase": phase,
- "parts": [],
- "type": "hot_water_tank_insulation",
- "measure_type": "hot_water_tank_insulation",
- "description": description,
- "starting_u_value": None,
- "new_u_value": None,
- "sap_points": None,
- "already_installed": already_installed,
- **recommendation_cost,
- "simulation_config": {"hot_water_energy_eff_ending": "Poor"},
- "description_simulation": {
- "hot-water-energy-eff": "Poor"
- }
- }
- )
+ to_append = {
+ "phase": phase,
+ "parts": [],
+ "type": "hot_water_tank_insulation",
+ "measure_type": "hot_water_tank_insulation",
+ "description": description,
+ "starting_u_value": None,
+ "new_u_value": None,
+ "sap_points": sap_points,
+ "already_installed": already_installed,
+ **recommendation_cost,
+ "simulation_config": {"hot_water_energy_eff_ending": "Poor"},
+ "description_simulation": {
+ "hot-water-energy-eff": "Poor"
+ },
+ "survey": survey
+ }
+ if _return:
+ return to_append
+
+ self.recommendations.append(to_append)
return
- def recommend_cylinder_thermostat(self, phase):
+ def recommend_cylinder_thermostat(self, phase, sap_points=None, survey=False, _return=False):
"""
If the home has a very poor hot water system, this is often indicative of a lack of insulation on the hot water
tank. This is a very simple and cost effective improvement that can be made to the home.
@@ -101,23 +137,86 @@ class HotwaterRecommendations:
**hotwater_simulation_config
}
- self.recommendations.append(
- {
- "phase": phase,
- "parts": [],
- "type": "cylinder_thermostat",
- "measure_type": "cylinder_thermostat",
- "description": description,
- "starting_u_value": None,
- "new_u_value": None,
- "sap_points": None,
- "already_installed": already_installed,
- **recommendation_cost,
- "simulation_config": simulation_config,
- "description_simulation": {
- "hot-water-energy-eff": self.property.data["hot-water-energy-eff"],
- "hotwater-description": new_epc_description,
- }
- }
- )
+ to_append = {
+ "phase": phase,
+ "parts": [],
+ "type": "cylinder_thermostat",
+ "measure_type": "cylinder_thermostat",
+ "description": description,
+ "starting_u_value": None,
+ "new_u_value": None,
+ "sap_points": sap_points,
+ "already_installed": already_installed,
+ **recommendation_cost,
+ "simulation_config": simulation_config,
+ "description_simulation": {
+ "hot-water-energy-eff": self.property.data["hot-water-energy-eff"],
+ "hotwater-description": new_epc_description,
+ },
+ "survey": survey
+ }
+ if _return:
+ return to_append
+
+ self.recommendations.append(to_append)
+ return
+
+ def recommend_cylinder_thermostat_gas_boiler_circulator(self, phase):
+ """
+ If the home has a very poor hot water system, this is often indicative of a lack of insulation on the
+ hot water
+ tank. This is a very simple and cost effective improvement that can be made to the home.
+ """
+
+ thermostat_recommendation_cost = self.costs.cylinder_thermostat()
+ cylinder_recommendation_cost = self.costs.hot_water_tank_insulation()
+ # Add them
+ total_cost = {
+ k: thermostat_recommendation_cost[k] + cylinder_recommendation_cost[k] for k in
+ thermostat_recommendation_cost.keys()
+ }
+
+ already_installed = "cylinder_thermostat" in self.property.already_installed
+ if already_installed:
+ total_cost = override_costs(total_cost)
+ description = "Cylinder thermostat & insulation has already been installed, no further action required"
+ else:
+ description = "Install a smart cylinder thermostat and insulate the hot water tank with 80mm insulation"
+
+ new_epc_description = "From main system"
+ hotwater_ending_config = HotWaterAttributes(new_epc_description).process()
+ hotwater_simulation_config = check_simulation_difference(
+ new_config=hotwater_ending_config, old_config=self.property.hotwater
+ )
+
+ if self.property.data["hot-water-energy-eff"] in ["Very Poor", "Poor", "Average"]:
+ new_efficiency = "Good"
+ else:
+ new_efficiency = self.property.data["hot-water-energy-eff"]
+
+ simulation_config = {
+ "hot_water_energy_eff_ending": new_efficiency,
+ **hotwater_simulation_config
+ }
+
+ to_append = {
+ "phase": phase,
+ "parts": [],
+ "type": "cylinder_thermostat",
+ "measure_type": "cylinder_thermostat",
+ "description": description,
+ "starting_u_value": None,
+ "new_u_value": None,
+ "sap_points": None,
+ "already_installed": already_installed,
+ **total_cost,
+ "simulation_config": simulation_config,
+ "description_simulation": {
+ "hot-water-energy-eff": simulation_config["hot_water_energy_eff_ending"],
+ "hotwater-description": new_epc_description,
+ },
+ "survey": False
+ }
+
+ self.recommendations.append(to_append)
return
diff --git a/recommendations/Recommendations.py b/recommendations/Recommendations.py
index a1183d33..ed6a8526 100644
--- a/recommendations/Recommendations.py
+++ b/recommendations/Recommendations.py
@@ -142,12 +142,9 @@ class Recommendations:
# Ventilation recommendations
# We only produce a ventilation recommendation if the property is recommended to have wall or roof
- # insulation
- # We will not attribute a SAP impact to the ventilation recommendation, since we've seen that this
- # has no
- # real impact on the SAP score. Therefore, we don't need to include phasing for ventilation. If we
- # have any
- # wall or roof recommendations, we will ensure that ventilation is included in the simulation
+ # insulation We will not attribute a SAP impact to the ventilation recommendation, since we've seen that this
+ # has no real impact on the SAP score. Therefore, we don't need to include phasing for ventilation. If we
+ # have any wall or roof recommendations, we will ensure that ventilation is included in the simulation
if (
(self.wall_recomender.recommendations or self.roof_recommender.recommendations) and
("ventilation" in measures)
@@ -253,8 +250,13 @@ class Recommendations:
if "hot_water" in measures:
self.hotwater_recommender.recommend(phase=phase)
if self.hotwater_recommender.recommendations:
- property_recommendations.append(self.hotwater_recommender.recommendations)
- phase += 1
+ if len(self.hotwater_recommender.recommendations) > 1:
+ for r in self.hotwater_recommender.recommendations:
+ property_recommendations.append([r])
+ phase += 1
+ else:
+ property_recommendations.append(self.hotwater_recommender.recommendations)
+ phase += 1
if "secondary_heating" in measures:
self.secondary_heating_recommender.recommend(phase=phase)
diff --git a/recommendations/RoofRecommendations.py b/recommendations/RoofRecommendations.py
index acc78359..51264b75 100644
--- a/recommendations/RoofRecommendations.py
+++ b/recommendations/RoofRecommendations.py
@@ -152,6 +152,9 @@ class RoofRecommendations:
if self.is_room_roof_insulated_or_unsuitable(measures):
return
+ if self.property.roof["is_thatched"]:
+ return
+
# If we have a u-value already, need to implement this
if u_value:
if u_value <= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE:
diff --git a/recommendations/WallRecommendations.py b/recommendations/WallRecommendations.py
index c7917911..f77ae5a0 100644
--- a/recommendations/WallRecommendations.py
+++ b/recommendations/WallRecommendations.py
@@ -540,15 +540,10 @@ class WallRecommendations(Definitions):
lowest_selected_u_value = None
recommendations = []
-
- iwi_non_invasive_recommendations = next(
- (r for r in self.property.non_invasive_recommendations if r["type"] == "internal_wall_insulation"), {}
+ non_invasive_recommendations = next(
+ (r for r in self.property.non_invasive_recommendations if
+ r["type"] == insulation_materials["type"].values[0]), {}
)
- ewi_non_invasive_recommendations = next(
- (r for r in self.property.non_invasive_recommendations if r["type"] == "external_wall_insulation"), {}
- )
- if ewi_non_invasive_recommendations:
- raise NotImplementedError("Implement ewi non-invasive recommendations")
for _, insulation_material_group in insulation_materials.groupby("description"):
@@ -590,31 +585,25 @@ class WallRecommendations(Definitions):
if already_installed:
cost_result = override_costs(cost_result)
+ if non_invasive_recommendations.get("cost") is not None:
+ raise NotImplementedError(
+ "Not handled passing costs from non-invasive recommendations for iwi"
+ )
+
if material["type"] == "internal_wall_insulation":
-
- if iwi_non_invasive_recommendations.get("cost") is not None:
- raise NotImplementedError(
- "Not handled passing costs from non-invasive recommendations for iwi"
- )
-
- sap_points = iwi_non_invasive_recommendations.get("sap_points", None)
- survey = iwi_non_invasive_recommendations.get("survey", False)
-
new_description = self.get_internal_external_wall_description(
self.INTERNALLY_INSULATED_WALL_DESCRIPTIONS, new_u_value
)
-
elif material["type"] == "external_wall_insulation":
-
- sap_points = ewi_non_invasive_recommendations.get("sap_points", None)
- survey = ewi_non_invasive_recommendations.get("survey", False)
-
new_description = self.get_internal_external_wall_description(
self.EXTERNALLY_INSULATED_WALL_DESCRIPTIONS, new_u_value
)
else:
raise ValueError("Invalid material type")
+ sap_points = non_invasive_recommendations.get("sap_points", None)
+ survey = non_invasive_recommendations.get("survey", False)
+
wall_ending_config = WallAttributes(new_description).process()
walls_simulation_config = check_simulation_difference(
diff --git a/recommendations/rdsap_tables.py b/recommendations/rdsap_tables.py
index 16c7d26e..e56faf7c 100644
--- a/recommendations/rdsap_tables.py
+++ b/recommendations/rdsap_tables.py
@@ -257,7 +257,7 @@ epc_wall_description_map = {
"Timber frame, as built, partial insulation": "Timber frame as built",
"Timber frame, as built, no insulation": "Timber frame as built",
"Timber frame, with external insulation": "Timber frame with internal insulation",
-
+ "Timber frame, with internal insulation": "Timber frame with internal insulation",
############################
# Sandstone/limestones wall mappings
############################